diff --git a/ompi/mca/osc/rdma/osc_rdma.c b/ompi/mca/osc/rdma/osc_rdma.c index cb5f462588..48e6e229cf 100644 --- a/ompi/mca/osc/rdma/osc_rdma.c +++ b/ompi/mca/osc/rdma/osc_rdma.c @@ -85,9 +85,10 @@ ompi_osc_rdma_free(ompi_win_t *win) OBJ_DESTRUCT(&module->pending_acc); - osc_rdma_request_gc_clean (module); + osc_rdma_gc_clean (module); assert (0 == opal_list_get_size (&module->request_gc)); OBJ_DESTRUCT(&module->request_gc); + OBJ_DESTRUCT(&module->buffer_gc); if (NULL != module->peers) { free(module->peers); diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index d316cd7d7f..5c2cd005a7 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -193,6 +193,8 @@ struct ompi_osc_rdma_module_t { ompi_request_t *frag_request; opal_list_t request_gc; + opal_list_t buffer_gc; + /* enforce accumulate semantics */ opal_atomic_lock_t accumulate_lock; opal_list_t pending_acc; @@ -554,21 +556,27 @@ static inline void osc_rdma_copy_for_send (void *target, size_t target_len, void /** * osc_rdma_request_gc_clean: * - * @short Release finished PML requests. + * @short Release finished PML requests and accumulate buffers. * * @param[in] module - OSC RDMA module * * @long This function exists because it is not possible to free a PML request - * from a request completion callback. We instead put the request on the - * module's garbage collection list and release it at a later time. + * or buffer from a request completion callback. We instead put requests + * and buffers on the module's garbage collection lists and release them + * at a later time. 
*/ -static inline void osc_rdma_request_gc_clean (ompi_osc_rdma_module_t *module) +static inline void osc_rdma_gc_clean (ompi_osc_rdma_module_t *module) { ompi_request_t *request; + opal_list_item_t *item; while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) { ompi_request_free (&request); } + + while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) { + OBJ_RELEASE(item); + } } #define OSC_RDMA_FRAG_TAG 0x10000 diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c index e99817c8a3..c1e8f4f5c4 100644 --- a/ompi/mca/osc/rdma/osc_rdma_component.c +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ -352,6 +352,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); OBJ_CONSTRUCT(&module->request_gc, opal_list_t); + OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); /* options */ diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c index dc5447c1c1..2b0a85a167 100644 --- a/ompi/mca/osc/rdma/osc_rdma_data_move.c +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -49,17 +49,46 @@ * @short Data associated with an in-progress accumulation operation. 
*/ struct osc_rdma_accumulate_data_t { + opal_list_item_t super; ompi_osc_rdma_module_t* module; void *target; void *source; size_t source_len; ompi_proc_t *proc; int count; + int peer; ompi_datatype_t *datatype; ompi_op_t *op; int request_count; }; +typedef struct osc_rdma_accumulate_data_t osc_rdma_accumulate_data_t; +static void osc_rdma_accumulate_data_constructor (osc_rdma_accumulate_data_t *acc_data) +{ + acc_data->source = NULL; + acc_data->datatype = NULL; + acc_data->op = NULL; +} + +static void osc_rdma_accumulate_data_destructor (osc_rdma_accumulate_data_t *acc_data) +{ + if (acc_data->source) { + /* the source buffer is always alloc'd */ + free (acc_data->source); + } + + if (acc_data->datatype) { + OBJ_RELEASE(acc_data->datatype); + } + + if (acc_data->op) { + OBJ_RELEASE(acc_data->op); + } +} + +OBJ_CLASS_DECLARATION(osc_rdma_accumulate_data_t); +OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t, opal_list_item_t, osc_rdma_accumulate_data_constructor, + osc_rdma_accumulate_data_destructor); /** * osc_rdma_pending_acc_t: @@ -375,11 +404,10 @@ static int osc_rdma_incomming_req_omplete (ompi_request_t *request) { ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data; /* we need to peer rank. get it from the pml request */ - mca_pml_base_request_t *pml_request = (mca_pml_base_request_t *) request; int rank = MPI_PROC_NULL; if (request->req_status.MPI_TAG & 0x01) { - rank = pml_request->req_peer; + rank = request->req_status.MPI_SOURCE; } mark_incoming_completion (module, rank); @@ -528,18 +556,19 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t * This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb * as the ctx parameter with accumulate_cb as the cb parameter. 
*/ -static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, void *target, void *source, size_t source_len, +static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op, - int request_count, struct osc_rdma_accumulate_data_t **acc_data_out) + int request_count, osc_rdma_accumulate_data_t **acc_data_out) { - struct osc_rdma_accumulate_data_t *acc_data; + osc_rdma_accumulate_data_t *acc_data; - acc_data = malloc (sizeof (*acc_data)); + acc_data = OBJ_NEW(osc_rdma_accumulate_data_t); if (OPAL_UNLIKELY(NULL == acc_data)) { return OMPI_ERR_OUT_OF_RESOURCE; } acc_data->module = module; + acc_data->peer = peer; acc_data->target = target; acc_data->source = source; acc_data->source_len = source_len; @@ -556,17 +585,6 @@ static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, void *t return OMPI_SUCCESS; } -static void osc_rdma_accumulate_free (struct osc_rdma_accumulate_data_t *acc_data) -{ - /* the source is always a temporary buffer */ - free (acc_data->source); - - OBJ_RELEASE(acc_data->datatype); - OBJ_RELEASE(acc_data->op); - - free (acc_data); -} - /** * @short Execute the accumulate once the request counter reaches 0. 
* @@ -579,30 +597,36 @@ static void osc_rdma_accumulate_free (struct osc_rdma_accumulate_data_t *acc_dat static int accumulate_cb (ompi_request_t *request) { struct osc_rdma_accumulate_data_t *acc_data = (struct osc_rdma_accumulate_data_t *) request->req_complete_cb_data; + ompi_osc_rdma_module_t *module = acc_data->module; + int rank = MPI_PROC_NULL; int ret = OMPI_SUCCESS; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "accumulate_cb, request_count = %d", acc_data->request_count)); - request->req_complete_cb_data = acc_data->module; - osc_rdma_incomming_req_omplete (request); - - --acc_data->request_count; - - if (0 != acc_data->request_count) { - /* more requests needed before the buffer can be accumulated */ - return OMPI_SUCCESS; + if (request->req_status.MPI_TAG & 0x01) { + rank = acc_data->peer; } - if (acc_data->source) { - ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len, - acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op); + mark_incoming_completion (module, rank); + + OPAL_THREAD_LOCK(&module->lock); + if (0 == --acc_data->request_count) { + /* no more requests needed before the buffer can be accumulated */ + + if (acc_data->source) { + ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len, + acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op); + } + + /* drop the accumulate lock */ + ompi_osc_rdma_accumulate_unlock (module); + + opal_list_append (&module->buffer_gc, &acc_data->super); } - /* drop the accumulate lock */ - ompi_osc_rdma_accumulate_unlock (acc_data->module); - - osc_rdma_accumulate_free (acc_data); + opal_list_append (&module->request_gc, (opal_list_item_t *) request); + OPAL_THREAD_UNLOCK(&module->lock); return ret; } @@ -752,7 +776,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou break; } - ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, 
acc_header->count, + ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (buffer); @@ -762,7 +786,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou ret = ompi_osc_rdma_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); } } while (0); @@ -804,7 +828,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, assert (NULL != proc); do { - ret = osc_rdma_accumulate_allocate (module, target, data, data_len, proc, get_acc_header->count, + ret = osc_rdma_accumulate_allocate (module, source, target, data, data_len, proc, get_acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -813,7 +837,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag, module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); } } while (0); @@ -865,8 +889,8 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, break; } - ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, get_acc_header->count, - datatype, op, 2, &acc_data); + ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, get_acc_header->count, + datatype, op, 2, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } @@ -874,7 +898,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, ret = ompi_osc_rdma_irecv_w_cb (buffer, get_acc_header->count, datatype, source, get_acc_header->tag, module->comm, 
&recv_request, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); break; } @@ -883,7 +907,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) { /* cancel the receive and free the accumulate data */ ompi_request_cancel (recv_request); - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); break; } } while (0); @@ -1417,7 +1441,8 @@ static int ompi_osc_rdma_callback (ompi_request_t *request) mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL); - osc_rdma_request_gc_clean (module); + osc_rdma_gc_clean (module); + opal_list_append (&module->request_gc, (opal_list_item_t *) request); ompi_osc_rdma_frag_start_receive (module);