/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2006 The Trustees of the University of Tennessee. * All rights reserved. * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "osc_pt2pt.h" #include "osc_pt2pt_header.h" #include "osc_pt2pt_data_move.h" #include "osc_pt2pt_frag.h" #include "osc_pt2pt_request.h" #include "opal/util/arch.h" #include "opal/sys/atomic.h" #include "opal/align.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/pml/base/pml_base_sendreq.h" #include "opal/mca/btl/btl.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" #include "ompi/datatype/ompi_datatype.h" #include "ompi/op/op.h" #include "ompi/memchecker.h" /** * struct osc_pt2pt_accumulate_data_t: * * @brief Data associated with an in-progress accumulation operation. */ struct osc_pt2pt_accumulate_data_t { opal_list_item_t super; ompi_osc_pt2pt_module_t* module; void *target; void *source; size_t source_len; ompi_proc_t *proc; int count; int peer; ompi_datatype_t *datatype; ompi_op_t *op; int request_count; }; typedef struct osc_pt2pt_accumulate_data_t osc_pt2pt_accumulate_data_t; static void osc_pt2pt_accumulate_data_constructor (osc_pt2pt_accumulate_data_t *acc_data) { acc_data->source = NULL; acc_data->datatype = NULL; acc_data->op = NULL; } static void osc_pt2pt_accumulate_data_destructor (osc_pt2pt_accumulate_data_t *acc_data) { if (acc_data->source) { /* the source buffer is always alloc'd */ free (acc_data->source); } if (acc_data->datatype) { OBJ_RELEASE(acc_data->datatype); } if (acc_data->op) { OBJ_RELEASE(acc_data->op); } } OBJ_CLASS_DECLARATION(osc_pt2pt_accumulate_data_t); OBJ_CLASS_INSTANCE(osc_pt2pt_accumulate_data_t, opal_list_item_t, osc_pt2pt_accumulate_data_constructor, osc_pt2pt_accumulate_data_destructor); /** * osc_pt2pt_pending_acc_t: * * @brief Keep track of accumulate and cswap operations that are * waiting on the accumulate lock. * * Since accumulate operations may take several steps to * complete we need to lock the accumulate lock until the operation * is complete. While the lock is held it is possible that additional * accumulate operations will arrive. This structure keep track of * those operations. */ struct osc_pt2pt_pending_acc_t { opal_list_item_t super; ompi_osc_pt2pt_header_t header; int source; void *data; size_t data_len; ompi_datatype_t *datatype; }; typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t; static void osc_pt2pt_pending_acc_constructor (osc_pt2pt_pending_acc_t *pending) { pending->data = NULL; pending->datatype = NULL; } static void osc_pt2pt_pending_acc_destructor (osc_pt2pt_pending_acc_t *pending) { if (NULL != pending->data) { free (pending->data); } if (NULL != pending->datatype) { OBJ_RELEASE(pending->datatype); } } OBJ_CLASS_DECLARATION(osc_pt2pt_pending_acc_t); OBJ_CLASS_INSTANCE(osc_pt2pt_pending_acc_t, opal_list_item_t, osc_pt2pt_pending_acc_constructor, osc_pt2pt_pending_acc_destructor); /* end ompi_osc_pt2pt_pending_acc_t class */ /** * @brief Class for large datatype descriptions * * This class is used to keep track of buffers for large datatype desctiotions * (datatypes that do not fit in an eager fragment). The structure is designed * to take advantage of the small datatype description code path. */ struct ompi_osc_pt2pt_ddt_buffer_t { /** allows this class to be stored in the buffer garbage collection * list */ opal_list_item_t super; /** OSC PT2PT module */ ompi_osc_pt2pt_module_t *module; /** source of this header */ int source; /** header + datatype data */ ompi_osc_pt2pt_header_t *header; }; typedef struct ompi_osc_pt2pt_ddt_buffer_t ompi_osc_pt2pt_ddt_buffer_t; static void ompi_osc_pt2pt_ddt_buffer_constructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer) { ddt_buffer->header = NULL; } static void ompi_osc_pt2pt_ddt_buffer_destructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer) { if (ddt_buffer->header) { free (ddt_buffer->header); ddt_buffer->header = NULL; } } OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_ddt_buffer_t); OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_ddt_buffer_t, opal_list_item_t, ompi_osc_pt2pt_ddt_buffer_constructor, ompi_osc_pt2pt_ddt_buffer_destructor); /* end ompi_osc_pt2pt_ddt_buffer_t class */ /** * datatype_buffer_length: * * @brief Determine the buffer size needed to hold count elements of datatype. * * @param[in] datatype - Element type * @param[in] count - Element count * * @returns buflen Buffer length needed to hold count elements of datatype */ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count) { ompi_datatype_t *primitive_datatype = NULL; uint32_t primitive_count; size_t buflen; ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count); primitive_count *= count; /* figure out how big a buffer we need */ ompi_datatype_type_size(primitive_datatype, &buflen); return buflen * primitive_count; } /** * ompi_osc_pt2pt_control_send: * * @brief send a control message as part of a fragment * * @param[in] module - OSC PT2PT module * @param[in] target - Target peer's rank * @param[in] data - Data to send * @param[in] len - Length of data * * @returns error OMPI error code or OMPI_SUCCESS * * "send" a control messages. Adds it to the active fragment, so the * caller will still need to explicitly flush (either to everyone or * to a target) before this is sent. */ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, void *data, size_t len) { ompi_osc_pt2pt_frag_t *frag; char *ptr; int ret; ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { memcpy (ptr, data, len); ret = ompi_osc_pt2pt_frag_finish(module, frag); } return ret; } static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request) { void *ctx = request->req_complete_cb_data; ompi_osc_pt2pt_module_t *module; /* get module pointer and data */ module = *(ompi_osc_pt2pt_module_t **)ctx; /* mark this send as complete */ mark_outgoing_completion (module); /* free the temporary buffer */ free (ctx); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); return OMPI_SUCCESS; } /** * ompi_osc_pt2pt_control_send_unbuffered: * * @brief Send an unbuffered control message to a peer. * * @param[in] module - OSC PT2PT module * @param[in] target - Target rank * @param[in] data - Data to send * @param[in] len - Length of data * * Directly send a control message. This does not allocate a * fragment, so should only be used when sending other messages would * be erroneous (such as complete messages, when there may be queued * transactions from an overlapping post that has already heard back * from its peer). The buffer specified by data will be available * when this call returns. */ int ompi_osc_pt2pt_control_send_unbuffered(ompi_osc_pt2pt_module_t *module, int target, void *data, size_t len) { void *ctx, *data_copy; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: sending unbuffered fragment to %d", target)); /* allocate a temporary buffer for this send */ ctx = malloc (sizeof(ompi_osc_pt2pt_module_t*) + len); if (OPAL_UNLIKELY(NULL == ctx)) { return OMPI_ERR_OUT_OF_RESOURCE; } /* increment outgoing signal count. this send is not part of a passive epoch * so there it would be erroneous to increment the epoch counters. */ ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1); /* store module pointer and data in the buffer */ *(ompi_osc_pt2pt_module_t**)ctx = module; data_copy = (ompi_osc_pt2pt_module_t**)ctx + 1; memcpy (data_copy, data, len); return ompi_osc_pt2pt_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_PT2PT_FRAG_TAG, module->comm, ompi_osc_pt2pt_control_send_unbuffered_cb, ctx); } /** * datatype_create: * * @brief Utility function that creates a new datatype from a packed * description. * * @param[in] module - OSC PT2PT module * @param[in] peer - Peer rank * @param[out] datatype - New datatype. Must be released with OBJ_RELEASE. * @param[out] proc - Optional. Proc for peer. * @param[inout] data - Pointer to a pointer where the description is stored. This * pointer will be updated to the location after the packed * description. */ static inline int datatype_create (ompi_osc_pt2pt_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data) { ompi_datatype_t *new_datatype = NULL; ompi_proc_t *peer_proc; int ret = OMPI_SUCCESS; do { peer_proc = ompi_comm_peer_lookup(module->comm, peer); if (OPAL_UNLIKELY(NULL == peer_proc)) { OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "%d: datatype_create: could not resolve proc pointer for peer %d", ompi_comm_rank(module->comm), peer)); ret = OMPI_ERROR; break; } new_datatype = ompi_osc_base_datatype_create(peer_proc, data); if (OPAL_UNLIKELY(NULL == new_datatype)) { OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "%d: datatype_create: could not resolve datatype for peer %d", ompi_comm_rank(module->comm), peer)); ret = OMPI_ERROR; } } while (0); *datatype = new_datatype; if (proc) *proc = peer_proc; return ret; } /** * process_put: * * @shoer Process a put w/ data message * * @param[in] module - OSC PT2PT module * @param[in] source - Message source * @param[in] put_header - Message header + data * * Process a put message and copy the message data to the specified * memory region. Note, this function does not handle any bounds * checking at the moment. */ static inline int process_put(ompi_osc_pt2pt_module_t* module, int source, ompi_osc_pt2pt_header_put_t* put_header) { char *data = (char*) (put_header + 1); ompi_proc_t *proc; struct ompi_datatype_t *datatype; size_t data_len; void *target = (unsigned char*) module->baseptr + ((unsigned long) put_header->displacement * module->disp_unit); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_put: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, &proc, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header); osc_pt2pt_copy_on_recv (target, data, data_len, proc, put_header->count, datatype); OBJ_RELEASE(datatype); return put_header->len; } static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source, ompi_osc_pt2pt_header_put_t* put_header) { char *data = (char*) (put_header + 1); struct ompi_datatype_t *datatype; void *target = (unsigned char*) module->baseptr + ((unsigned long) put_header->displacement * module->disp_unit); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_put_long: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } ret = ompi_osc_pt2pt_component_irecv (module, target, put_header->count, datatype, source, put_header->tag, module->comm); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "%d: process_put_long: irecv error: %d", ompi_comm_rank(module->comm), ret)); return OMPI_ERROR; } OBJ_RELEASE(datatype); return put_header->len; } /** * osc_pt2pt_incoming_req_complete: * * @brief Completion callback for a receive associate with an access * epoch. * * @param[in] request - PML request with an OSC RMDA module as the callback data. * * This function is called when a send or recieve associated with an * access epoch completes. When fired this function will increment the * passive or active incoming count. */ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request) { ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data; int rank = MPI_PROC_NULL; if (request->req_status.MPI_TAG & 0x01) { rank = request->req_status.MPI_SOURCE; } mark_incoming_completion (module, rank); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); return OMPI_SUCCESS; } struct osc_pt2pt_get_post_send_cb_data_t { ompi_osc_pt2pt_module_t *module; int peer; }; static int osc_pt2pt_get_post_send_cb (ompi_request_t *request) { struct osc_pt2pt_get_post_send_cb_data_t *data = (struct osc_pt2pt_get_post_send_cb_data_t *) request->req_complete_cb_data; ompi_osc_pt2pt_module_t *module = data->module; int rank = data->peer; free (data); /* mark this as a completed "incoming" request */ mark_incoming_completion (module, rank); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); return OMPI_SUCCESS; } /** * @brief Post a send to match the remote receive for a get operation. * * @param[in] module - OSC PT2PT module * @param[in] source - Source buffer * @param[in] count - Number of elements in the source buffer * @param[in] datatype - Type of source elements. * @param[in] peer - Remote process that has the receive posted * @param[in] tag - Tag for the send * * This function posts a send to match the receive posted as part * of a get operation. When this send is complete the get is considered * complete at the target (this process). */ static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *source, int count, ompi_datatype_t *datatype, int peer, int tag) { struct osc_pt2pt_get_post_send_cb_data_t *data; int ret; data = malloc (sizeof (*data)); if (OPAL_UNLIKELY(NULL == data)) { return OMPI_ERR_OUT_OF_RESOURCE; } data->module = module; /* for incoming completion we need to know the peer (MPI_PROC_NULL if this is * in an active target epoch) */ data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL; /* data will be freed by the callback */ ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm, osc_pt2pt_get_post_send_cb, (void *) data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (data); } return ret; } /** * process_get: * * @brief Process a get message from a remote peer * * @param[in] module - OSC PT2PT module * @param[in] target - Peer process * @param[in] get_header - Incoming message header */ static inline int process_get (ompi_osc_pt2pt_module_t* module, int target, ompi_osc_pt2pt_header_get_t* get_header) { char *data = (char *) (get_header + 1); struct ompi_datatype_t *datatype; void *source = (unsigned char*) module->baseptr + ((unsigned long) get_header->displacement * module->disp_unit); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_get: received message from %d", ompi_comm_rank(module->comm), target)); ret = datatype_create (module, target, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } /* send get data */ ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, get_header->tag); OBJ_RELEASE(datatype); return OMPI_SUCCESS == ret ? (int) get_header->len : ret; } /** * osc_pt2pt_accumulate_buffer: * * @brief Accumulate data into the target buffer. * * @param[in] target - Target buffer * @param[in] source - Source buffer * @param[in] source_len - Length of source buffer in bytes * @param[in] proc - Source proc * @param[in] count - Number of elements in target buffer * @param[in] datatype - Type of elements in target buffer * @param[in] op - Operation to be performed */ static inline int osc_pt2pt_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op) { int ret; assert (NULL != target && NULL != source); if (op == &ompi_mpi_op_replace.op) { osc_pt2pt_copy_on_recv (target, source, source_len, proc, count, datatype); return OMPI_SUCCESS; } #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) { ompi_datatype_t *primitive_datatype = NULL; uint32_t primitive_count; size_t buflen; void *buffer; ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count); primitive_count *= count; /* figure out how big a buffer we need */ ompi_datatype_type_size(primitive_datatype, &buflen); buflen *= primitive_count; buffer = malloc (buflen); if (OPAL_UNLIKELY(NULL == buffer)) { return OMPI_ERR_OUT_OF_RESOURCE; } osc_pt2pt_copy_on_recv (buffer, source, source_len, proc, primitive_count, primitive_datatype); ret = ompi_osc_base_process_op(target, buffer, source_len, datatype, count, op); free(buffer); } else #endif /* copy the data from the temporary buffer into the user window */ ret = ompi_osc_base_process_op(target, source, source_len, datatype, count, op); return ret; } /** * @brief Create an accumulate data object. * * @param[in] module - PT2PT OSC module * @param[in] target - Target for the accumulation * @param[in] source - Source of accumulate data. Must be allocated with malloc/calloc/etc * @param[in] source_len - Length of the source buffer in bytes * @param[in] proc - Source proc * @param[in] count - Number of elements to accumulate * @param[in] datatype - Datatype to accumulate * @oaram[in] op - Operator * @param[in] request_count - Number of prerequisite requests * @param[out] acc_data_out - New accumulation data * * This function is used to create a copy of the data needed to perform an accumulation. * This data should be provided to ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb * as the ctx parameter with accumulate_cb as the cb parameter. */ static int osc_pt2pt_accumulate_allocate (ompi_osc_pt2pt_module_t *module, int peer, void *target, void *source, size_t source_len, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op, int request_count, osc_pt2pt_accumulate_data_t **acc_data_out) { osc_pt2pt_accumulate_data_t *acc_data; acc_data = OBJ_NEW(osc_pt2pt_accumulate_data_t); if (OPAL_UNLIKELY(NULL == acc_data)) { return OMPI_ERR_OUT_OF_RESOURCE; } acc_data->module = module; acc_data->peer = peer; acc_data->target = target; acc_data->source = source; acc_data->source_len = source_len; acc_data->proc = proc; acc_data->count = count; acc_data->datatype = datatype; OBJ_RETAIN(datatype); acc_data->op = op; OBJ_RETAIN(op); acc_data->request_count = request_count; *acc_data_out = acc_data; return OMPI_SUCCESS; } /** * @brief Execute the accumulate once the request counter reaches 0. * * @param[in] request - request * * The request should be created with ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb * with ctx allocated by osc_pt2pt_accumulate_allocate. This callback will free the accumulate * data once the accumulation operation is complete. */ static int accumulate_cb (ompi_request_t *request) { struct osc_pt2pt_accumulate_data_t *acc_data = (struct osc_pt2pt_accumulate_data_t *) request->req_complete_cb_data; ompi_osc_pt2pt_module_t *module = acc_data->module; int rank = MPI_PROC_NULL; int ret = OMPI_SUCCESS; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "accumulate_cb, request_count = %d", acc_data->request_count)); if (request->req_status.MPI_TAG & 0x01) { rank = acc_data->peer; } mark_incoming_completion (module, rank); if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) { /* no more requests needed before the buffer can be accumulated */ if (acc_data->source) { ompi_datatype_t *primitive_datatype = NULL; uint32_t primitive_count; assert (NULL != acc_data->target && NULL != acc_data->source); ompi_osc_base_get_primitive_type_info(acc_data->datatype, &primitive_datatype, &primitive_count); primitive_count *= acc_data->count; if (acc_data->op == &ompi_mpi_op_replace.op) { ret = ompi_datatype_sndrcv(acc_data->source, primitive_count, primitive_datatype, acc_data->target, acc_data->count, acc_data->datatype); } else { ret = ompi_osc_base_process_op(acc_data->target, acc_data->source, acc_data->source_len, acc_data->datatype, acc_data->count, acc_data->op); } } /* drop the accumulate lock */ ompi_osc_pt2pt_accumulate_unlock (module); osc_pt2pt_gc_add_buffer (module, &acc_data->super); } /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); return ret; } static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source, char *data, size_t data_len, ompi_datatype_t *datatype) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); osc_pt2pt_pending_acc_t *pending_acc; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: queuing accumulate operation", ompi_comm_size (module->comm))); pending_acc = OBJ_NEW(osc_pt2pt_pending_acc_t); if (OPAL_UNLIKELY(NULL == pending_acc)) { return OMPI_ERR_OUT_OF_RESOURCE; } /* NTH: ensure we don't leave wait/process_flush/etc until this * accumulate operation is complete. */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); pending_acc->source = source; /* save any inline data (eager acc, gacc only) */ pending_acc->data_len = data_len; if (data_len) { pending_acc->data = malloc (data_len); memcpy (pending_acc->data, data, data_len); } /* save the datatype */ pending_acc->datatype = datatype; OBJ_RETAIN(datatype); /* save the header */ switch (header->base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_ACC: case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG: case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC: case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG: pending_acc->header.acc = header->acc; break; case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP: pending_acc->header.cswap = header->cswap; break; default: /* it is a coding error if any other header types are queued this way */ assert (0); } /* add to the pending acc queue */ OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super)); return OMPI_SUCCESS; } static int replace_cb (ompi_request_t *request) { ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data; int rank = MPI_PROC_NULL; if (request->req_status.MPI_TAG & 0x01) { rank = request->req_status.MPI_SOURCE; } mark_incoming_completion (module, rank); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); /* unlock the accumulate lock */ ompi_osc_pt2pt_accumulate_unlock (module); return OMPI_SUCCESS; } /** * ompi_osc_pt2pt_acc_start: * * @brief Start an accumulate with data operation. * * @param[in] module - OSC PT2PT module * @param[in] source - Source rank * @param[in] data - Accumulate data * @param[in] data_len - Length of the accumulate data * @param[in] datatype - Accumulation datatype * @param[in] acc_header - Accumulate header * * The module's accumulation lock must be held before calling this * function. It will release the lock when the operation is complete. */ static int ompi_osc_pt2pt_acc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len, ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header) { void *target = (unsigned char*) module->baseptr + ((unsigned long) acc_header->displacement * module->disp_unit); struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op); ompi_proc_t *proc; int ret; proc = ompi_comm_peer_lookup(module->comm, source); assert (NULL != proc); ret = osc_pt2pt_accumulate_buffer (target, data, data_len, proc, acc_header->count, datatype, op); OBJ_RELEASE(op); ompi_osc_pt2pt_accumulate_unlock (module); return ret; } /** * ompi_osc_pt2pt_acc_start: * * @brief Start a long accumulate operation. * * @param[in] module - OSC PT2PT module * @param[in] source - Source rank * @param[in] datatype - Accumulation datatype * @param[in] acc_header - Accumulate header * * The module's accumulation lock must be held before calling this * function. It will release the lock when the operation is complete. */ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header) { struct osc_pt2pt_accumulate_data_t *acc_data; size_t buflen; void *buffer; ompi_proc_t *proc; void *target = (unsigned char*) module->baseptr + ((unsigned long) acc_header->displacement * module->disp_unit); struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op); ompi_datatype_t *primitive_datatype; uint32_t primitive_count; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_acc_long_start starting...")); proc = ompi_comm_peer_lookup(module->comm, source); assert (NULL != proc); do { if (op == &ompi_mpi_op_replace.op) { ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype, source, acc_header->tag, module->comm, NULL, replace_cb, module); break; } ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } primitive_count *= acc_header->count; buflen = datatype_buffer_length (datatype, acc_header->count); /* allocate a temporary buffer to receive the accumulate data */ buffer = malloc (buflen); if (OPAL_UNLIKELY(NULL == buffer)) { ret = OMPI_ERR_OUT_OF_RESOURCE; break; } ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (buffer); break; } ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source, acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); } } while (0); OBJ_RELEASE(op); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { ompi_osc_pt2pt_accumulate_unlock (module); } return ret; } /** * ompi_osc_pt2pt_gacc_start: * * @brief Start a accumulate with data + get operation. * * @param[in] module - OSC PT2PT module * @param[in] source - Source rank * @param[in] data - Accumulate data. Must be allocated on the heap. * @param[in] data_len - Length of the accumulate data * @param[in] datatype - Accumulation datatype * @param[in] get_acc_header - Accumulate header * * The module's accumulation lock must be held before calling this * function. It will release the lock when the operation is complete. */ static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len, ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header) { void *target = (unsigned char*) module->baseptr + ((unsigned long) acc_header->displacement * module->disp_unit); struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op); struct osc_pt2pt_accumulate_data_t *acc_data; ompi_proc_t *proc; int ret; proc = ompi_comm_peer_lookup(module->comm, source); assert (NULL != proc); do { ret = osc_pt2pt_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag, module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); } } while (0); OBJ_RELEASE(op); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { ompi_osc_pt2pt_accumulate_unlock (module); } return ret; } /** * ompi_osc_pt2pt_gacc_long_start: * * @brief Start a long accumulate + get operation. * * @param[in] module - OSC PT2PT module * @param[in] source - Source rank * @param[in] datatype - Accumulation datatype * @param[in] acc_header - Accumulate header * * The module's accumulation lock must be held before calling this * function. It will release the lock when the operation is complete. */ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header) { void *target = (unsigned char*) module->baseptr + ((unsigned long) acc_header->displacement * module->disp_unit); struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op); struct osc_pt2pt_accumulate_data_t *acc_data; ompi_datatype_t *primitive_datatype; ompi_request_t *recv_request; uint32_t primitive_count; ompi_proc_t *proc; size_t buflen; void *buffer; int ret; proc = ompi_comm_peer_lookup(module->comm, source); assert (NULL != proc); /* allocate a temporary buffer to receive the accumulate data */ buflen = datatype_buffer_length (datatype, acc_header->count); do { ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } primitive_count *= acc_header->count; buffer = malloc (buflen); if (OPAL_UNLIKELY(NULL == buffer)) { ret = OMPI_ERR_OUT_OF_RESOURCE; break; } ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, datatype, op, 2, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (buffer); break; } ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag, module->comm, &recv_request, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); break; } ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag, module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* cancel the receive and free the accumulate data */ ompi_request_cancel (recv_request); OBJ_RELEASE(acc_data); break; } } while (0); OBJ_RELEASE(op); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { ompi_osc_pt2pt_accumulate_unlock (module); } return ret; } /** * ompi_osc_pt2pt_cswap_start: * * @brief Start a compare and swap operation * * @param[in] module - OSC PT2PT module * @param[in] source - Source rank * @param[in] data - Compare and swap data * @param[in] data_len - Length of the compare and swap data. Must be exactly * twice the size of the datatype. * @param[in] datatype - Compare and swap datatype * @param[in] cswap_header - Compare and swap header * * The module's accumulation lock must be held before calling this * function. It will release the lock when the operation is complete. */ static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int source, void *data, ompi_datatype_t *datatype, ompi_osc_pt2pt_header_cswap_t *cswap_header) { void *target = (unsigned char*) module->baseptr + ((unsigned long) cswap_header->displacement * module->disp_unit); void *compare_addr, *origin_addr; size_t datatype_size; ompi_proc_t *proc; int ret; proc = ompi_comm_peer_lookup(module->comm, source); assert (NULL != proc); datatype_size = datatype->super.size; origin_addr = data; compare_addr = (void *)((uintptr_t) data + datatype_size); do { /* no reason to do a non-blocking send here */ ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD, module->comm)); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } /* increment the incoming fragment count so it matches what is expected */ mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL); if (0 == memcmp (target, compare_addr, datatype_size)) { osc_pt2pt_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype); } } while (0); ompi_osc_pt2pt_accumulate_unlock (module); return ret; } /** * ompi_osc_pt2pt_progress_pending_acc: * * @brief Progress one pending accumulation or compare and swap operation. * * @param[in] module - OSC PT2PT module * * If the accumulation lock can be aquired progress one pending * accumulate or compare and swap operation. */ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module) { osc_pt2pt_pending_acc_t *pending_acc; int ret; /* try to aquire the lock. it will be unlocked when the accumulate or cswap * operation completes */ if (ompi_osc_pt2pt_accumulate_trylock (module)) { return OMPI_SUCCESS; } pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc); if (OPAL_UNLIKELY(NULL == pending_acc)) { /* called without any pending accumulation operations */ ompi_osc_pt2pt_accumulate_unlock (module); return OMPI_SUCCESS; } switch (pending_acc->header.base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_ACC: ret = ompi_osc_pt2pt_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len, pending_acc->datatype, &pending_acc->header.acc); free (pending_acc->data); break; case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG: ret = ompi_osc_pt2pt_acc_long_start (module, pending_acc->source, pending_acc->datatype, &pending_acc->header.acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC: ret = ompi_osc_pt2pt_gacc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len, pending_acc->datatype, &pending_acc->header.acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG: ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype, &pending_acc->header.acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP: ret = ompi_osc_pt2pt_cswap_start (module, pending_acc->source, pending_acc->data, pending_acc->datatype, &pending_acc->header.cswap); break; default: ret = OMPI_ERROR; /* it is a coding error if this point is reached */ assert (0); } /* signal that an operation is complete */ mark_incoming_completion (module, pending_acc->source); pending_acc->data = NULL; OBJ_RELEASE(pending_acc); return ret; } static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; uint64_t data_len; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_acc: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } data_len = acc_header->len - ((char*) data - (char*) acc_header); /* try to aquire the accumulate lock */ if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) { ret = ompi_osc_pt2pt_acc_start (module, source, data, data_len, datatype, acc_header); } else { /* couldn't aquire the accumulate lock so queue up the accumulate operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source, data, data_len, datatype); } /* Release datatype & op */ OBJ_RELEASE(datatype); return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret; } static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source, ompi_osc_pt2pt_header_acc_t* acc_header) { char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_acc_long: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) { ret = ompi_osc_pt2pt_acc_long_start (module, source, datatype, acc_header); } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source, NULL, 0, datatype); } /* Release datatype & op */ OBJ_RELEASE(datatype); return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret; } static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; void *buffer = NULL; uint64_t data_len; ompi_proc_t * proc; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_get_acc: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, &proc, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } data_len = acc_header->len - ((char*) data - (char*) acc_header); if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) { /* make a copy of the data since the buffer needs to be returned */ if (data_len) { ompi_datatype_t *primitive_datatype = NULL; uint32_t primitive_count; buffer = malloc (data_len); if (OPAL_UNLIKELY(NULL == buffer)) { OBJ_RELEASE(datatype); return OMPI_ERR_OUT_OF_RESOURCE; } ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count); primitive_count *= acc_header->count; osc_pt2pt_copy_on_recv (buffer, data, data_len, proc, primitive_count, primitive_datatype); } ret = ompi_osc_pt2pt_gacc_start (module, source, buffer, data_len, datatype, acc_header); } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source, data, data_len, datatype); } /* Release datatype & op */ OBJ_RELEASE(datatype); return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret; } static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_acc: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) { ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header); } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source, NULL, 0, datatype); } /* Release datatype & op */ OBJ_RELEASE(datatype); return OMPI_SUCCESS == ret ? (int) acc_header->len : ret; } static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_cswap_t *cswap_header) { char *data = (char*) (cswap_header + 1); struct ompi_datatype_t *datatype; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_cswap: received message from %d", ompi_comm_rank(module->comm), source)); ret = datatype_create (module, source, NULL, &datatype, (void **) &data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) { ret = ompi_osc_pt2pt_cswap_start (module, source, data, datatype, cswap_header); } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source, data, 2 * datatype->super.size, datatype); } /* Release datatype */ OBJ_RELEASE(datatype); return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret; } static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_complete_t *complete_header) { /* the current fragment is not part of the frag_count so we need to add it here */ osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1); return sizeof (*complete_header); } /* flush and unlock headers cannot be processed from the request callback * because some btls do not provide re-entrant progress functions. these * fragment will be progressed by the pt2pt component's progress function */ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_flush_t *flush_header) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "process_flush header = {.frag_count = %d}", flush_header->frag_count)); /* increase signal count by incoming frags */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "%d: process_flush: received message from %d. passive_incoming_frag_count = %d", ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count)); ret = ompi_osc_pt2pt_process_flush (module, source, flush_header); if (OMPI_SUCCESS != ret) { ompi_osc_pt2pt_pending_t *pending; pending = OBJ_NEW(ompi_osc_pt2pt_pending_t); pending->module = module; pending->source = source; pending->header.flush = *flush_header; osc_pt2pt_add_pending (pending); } /* signal incomming will increment this counter */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); return sizeof (*flush_header); } static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_unlock_t *unlock_header) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "process_unlock header = {.frag_count = %d}", unlock_header->frag_count)); /* increase signal count by incoming frags */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d", source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count)); ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header); if (OMPI_SUCCESS != ret) { ompi_osc_pt2pt_pending_t *pending; pending = OBJ_NEW(ompi_osc_pt2pt_pending_t); pending->module = module; pending->source = source; pending->header.unlock = *unlock_header; osc_pt2pt_add_pending (pending); } /* signal incomming will increment this counter */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); return sizeof (*unlock_header); } static int process_large_datatype_request_cb (ompi_request_t *request) { ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer = (ompi_osc_pt2pt_ddt_buffer_t *) request->req_complete_cb_data; ompi_osc_pt2pt_module_t *module = ddt_buffer->module; ompi_osc_pt2pt_header_t *header = ddt_buffer->header; int source = ddt_buffer->source; /* process the request */ switch (header->base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG: (void) process_put_long (module, source, &header->put); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET: (void) process_get (module, source, &header->get); break; case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG: (void) process_acc_long (module, source, &header->acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG: (void) process_get_acc_long (module, source, &header->acc); break; default: /* developer error */ assert (0); return OMPI_ERROR; } /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); /* free the datatype buffer */ osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super); return OMPI_SUCCESS; } /** * @short process a request with a large datatype * * @param[in] module - OSC PT2PT module * @param[in] source - header source * @param[in] header - header to process * * It is possible to construct datatypes whos description is too large * to fit in an OSC PT2PT fragment. In this case the remote side posts * a send of the datatype description. This function posts the matching * receive and processes the header on completion. */ static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_t *header) { ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer; int header_len, tag, ret; uint64_t ddt_len; /* determine the header size and receive tag */ switch (header->base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG: header_len = sizeof (header->put); tag = header->put.tag; break; case OMPI_OSC_PT2PT_HDR_TYPE_GET: header_len = sizeof (header->get); tag = header->get.tag; break; case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG: header_len = sizeof (header->acc); tag = header->acc.tag; break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG: header_len = sizeof (header->acc); tag = header->acc.tag; break; default: /* developer error */ opal_output (0, "Unsupported header/flag combination"); return OMPI_ERROR; } ddt_len = *((uint64_t *)((uintptr_t) header + header_len)); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "process_large_datatype_request: processing fragment with type %d. ddt_len %lu", header->base.type, (unsigned long) ddt_len)); ddt_buffer = OBJ_NEW(ompi_osc_pt2pt_ddt_buffer_t); if (OPAL_UNLIKELY(NULL == ddt_buffer)) { return OMPI_ERR_OUT_OF_RESOURCE; } ddt_buffer->module = module; ddt_buffer->source = source; ddt_buffer->header = malloc (ddt_len + header_len); if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) { OBJ_RELEASE(ddt_buffer); return OMPI_ERR_OUT_OF_RESOURCE; } memcpy (ddt_buffer->header, header, header_len); ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len), ddt_len, MPI_BYTE, source, tag, module->comm, NULL, process_large_datatype_request_cb, ddt_buffer); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(ddt_buffer); return ret; } return header_len + 8; } /* * Do all the data movement associated with a fragment */ static inline int process_frag (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_header_t *frag) { ompi_osc_pt2pt_header_t *header; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: process_frag: from %d, ops %d", (int) frag->source, (int) frag->num_ops)); header = (ompi_osc_pt2pt_header_t *) (frag + 1); for (int i = 0 ; i < frag->num_ops ; ++i) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: process_frag: type 0x%x. flag 0x%x. offset %u", header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag), header->base.flags)); if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE))) { osc_pt2pt_ntoh(header); switch (header->base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_PUT: ret = process_put(module, frag->source, &header->put); break; case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG: ret = process_put_long(module, frag->source, &header->put); break; case OMPI_OSC_PT2PT_HDR_TYPE_ACC: ret = process_acc(module, frag->source, &header->acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG: ret = process_acc_long (module, frag->source, &header->acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ: ret = ompi_osc_pt2pt_process_lock(module, frag->source, &header->lock); if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { ret = sizeof (header->lock); } break; case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ: ret = process_unlock(module, frag->source, &header->unlock); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET: ret = process_get (module, frag->source, &header->get); break; case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP: ret = process_cswap (module, frag->source, &header->cswap); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC: ret = process_get_acc (module, frag->source, &header->acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG: ret = process_get_acc_long (module, frag->source, &header->acc); break; case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ: ret = process_flush (module, frag->source, &header->flush); break; case OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE: ret = process_complete (module, frag->source, &header->complete); break; default: opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type); abort(); /* FIX ME */ } } else { ret = process_large_datatype_request (module, frag->source, header); } if (ret <= 0) { opal_output(0, "Error processing fragment: %d", ret); abort(); /* FIX ME */ } /* the next header will start on an 8-byte boundary. this is done to ensure * that the next header and the packed datatype is properly aligned */ header = (ompi_osc_pt2pt_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t); } return OMPI_SUCCESS; } /* dispatch for callback on message completion */ static int ompi_osc_pt2pt_callback (ompi_request_t *request) { ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data; ompi_osc_pt2pt_header_t *base_header = (ompi_osc_pt2pt_header_t *) module->incoming_buffer; size_t incoming_length = request->req_status._ucount; int source = request->req_status.MPI_SOURCE; OPAL_THREAD_UNLOCK(&ompi_request_lock); assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t)); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received pt2pt callback for fragment. source = %d, count = %u, type = 0x%x", source, (unsigned) incoming_length, base_header->base.type)); osc_pt2pt_ntoh(base_header); switch (base_header->base.type) { case OMPI_OSC_PT2PT_HDR_TYPE_FRAG: process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header); /* only data fragments should be included in the completion counters */ mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL); break; case OMPI_OSC_PT2PT_HDR_TYPE_POST: osc_pt2pt_incoming_post (module, source); break; case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK: ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header); break; case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK: ompi_osc_pt2pt_process_flush_ack (module, source, (ompi_osc_pt2pt_header_flush_ack_t *) base_header); break; case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK: ompi_osc_pt2pt_process_unlock_ack (module, source, (ompi_osc_pt2pt_header_unlock_ack_t *) base_header); break; default: OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received unexpected message of type %x", (int) base_header->base.type)); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "finished processing incoming messages")); osc_pt2pt_gc_clean (module); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); ompi_osc_pt2pt_frag_start_receive (module); OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "finished posting receive request")); return OMPI_SUCCESS; } int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module) { return ompi_osc_pt2pt_irecv_w_cb (module->incoming_buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t), MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->frag_request, ompi_osc_pt2pt_callback, module); } int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf, size_t count, struct ompi_datatype_t *datatype, int src, int tag, struct ompi_communicator_t *comm) { return ompi_osc_pt2pt_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL, osc_pt2pt_incoming_req_complete, module); } static int isend_completion_cb(ompi_request_t *request) { ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, "isend_completion_cb called")); mark_outgoing_completion(module); /* put this request on the garbage colletion list */ osc_pt2pt_gc_add_request (module, request); return OMPI_SUCCESS; } int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, const void *buf, size_t count, struct ompi_datatype_t *datatype, int dest, int tag, struct ompi_communicator_t *comm) { return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm, isend_completion_cb, module); } int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag, ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx) { ompi_request_t *request; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: ompi_osc_pt2pt_isend_w_cb sending %d bytes to %d with tag %d", count, target, tag)); ret = MCA_PML_CALL(isend_init((void *)ptr, count, datatype, target, tag, MCA_PML_BASE_SEND_STANDARD, comm, &request)); if (OMPI_SUCCESS != ret) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "error sending fragment. ret = %d", ret)); return ret; } request->req_complete_cb = cb; request->req_complete_cb_data = ctx; ret = MCA_PML_CALL(start(1, &request)); return ret; } int ompi_osc_pt2pt_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag, ompi_communicator_t *comm, ompi_request_t **request_out, ompi_request_complete_fn_t cb, void *ctx) { ompi_request_t *request; int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: ompi_osc_pt2pt_irecv_w_cb receiving %d bytes from %d with tag %d", count, target, tag)); ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, &request)); if (OMPI_SUCCESS != ret) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "error posting receive. ret = %d", ret)); return ret; } request->req_complete_cb = cb; request->req_complete_cb_data = ctx; if (request_out) { *request_out = request; } ret = MCA_PML_CALL(start(1, &request)); return ret; }