From 29e00f91613acc73023a68bdd6b9379ab8786fab Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 12 Mar 2014 14:39:03 +0000 Subject: [PATCH] osc/rdma: fix issues with mpi_leave_pinned when using rdma capable btls It seems we can't release accumulate buffers in completion callbacks because the btls don't release registration resources until after the callback has fired. The fix is to keep track of the unused buffers and free them later. This should resolve issues when running IMB-EXT and IMB-RMA. cmr=v1.7.5:reviewer=jsquyres This commit was SVN r31029. --- ompi/mca/osc/rdma/osc_rdma.c | 3 +- ompi/mca/osc/rdma/osc_rdma.h | 16 +++- ompi/mca/osc/rdma/osc_rdma_component.c | 1 + ompi/mca/osc/rdma/osc_rdma_data_move.c | 107 +++++++++++++++---------- 4 files changed, 81 insertions(+), 46 deletions(-) diff --git a/ompi/mca/osc/rdma/osc_rdma.c b/ompi/mca/osc/rdma/osc_rdma.c index cb5f462588..48e6e229cf 100644 --- a/ompi/mca/osc/rdma/osc_rdma.c +++ b/ompi/mca/osc/rdma/osc_rdma.c @@ -85,9 +85,10 @@ ompi_osc_rdma_free(ompi_win_t *win) OBJ_DESTRUCT(&module->pending_acc); - osc_rdma_request_gc_clean (module); + osc_rdma_gc_clean (module); assert (0 == opal_list_get_size (&module->request_gc)); OBJ_DESTRUCT(&module->request_gc); + OBJ_DESTRUCT(&module->buffer_gc); if (NULL != module->peers) { free(module->peers); diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index d316cd7d7f..5c2cd005a7 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -193,6 +193,8 @@ struct ompi_osc_rdma_module_t { ompi_request_t *frag_request; opal_list_t request_gc; + opal_list_t buffer_gc; + /* enforce accumulate semantics */ opal_atomic_lock_t accumulate_lock; opal_list_t pending_acc; @@ -554,21 +556,27 @@ static inline void osc_rdma_copy_for_send (void *target, size_t target_len, void /** * osc_rdma_request_gc_clean: * - * @short Release finished PML requests. + * @short Release finished PML requests and accumulate buffers. 
* * @param[in] module - OSC RDMA module * * @long This function exists because it is not possible to free a PML request - * from a request completion callback. We instead put the request on the - * module's garbage collection list and release it at a later time. + * or buffer from a request completion callback. We instead put requests + * and buffers on the module's garbage collection lists and release them + * at a later time. */ -static inline void osc_rdma_request_gc_clean (ompi_osc_rdma_module_t *module) +static inline void osc_rdma_gc_clean (ompi_osc_rdma_module_t *module) { ompi_request_t *request; + opal_list_item_t *item; while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) { ompi_request_free (&request); } + + while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) { + OBJ_RELEASE(item); + } } #define OSC_RDMA_FRAG_TAG 0x10000 diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c index e99817c8a3..c1e8f4f5c4 100644 --- a/ompi/mca/osc/rdma/osc_rdma_component.c +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ -352,6 +352,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); OBJ_CONSTRUCT(&module->request_gc, opal_list_t); + OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); /* options */ diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c index dc5447c1c1..2b0a85a167 100644 --- a/ompi/mca/osc/rdma/osc_rdma_data_move.c +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -49,17 +49,46 @@ * @short Data associated with an in-progress accumulation operation. 
*/ struct osc_rdma_accumulate_data_t { + opal_list_item_t super; ompi_osc_rdma_module_t* module; void *target; void *source; size_t source_len; ompi_proc_t *proc; int count; + int peer; ompi_datatype_t *datatype; ompi_op_t *op; int request_count; }; +typedef struct osc_rdma_accumulate_data_t osc_rdma_accumulate_data_t; +static void osc_rdma_accumulate_data_constructor (osc_rdma_accumulate_data_t *acc_data) +{ + acc_data->source = NULL; + acc_data->datatype = NULL; + acc_data->op = NULL; +} + +static void osc_rdma_accumulate_data_destructor (osc_rdma_accumulate_data_t *acc_data) +{ + if (acc_data->source) { + /* the source buffer is always alloc'd */ + free (acc_data->source); + } + + if (acc_data->datatype) { + OBJ_RELEASE(acc_data->datatype); + } + + if (acc_data->op) { + OBJ_RELEASE(acc_data->op); + } +} + +OBJ_CLASS_DECLARATION(osc_rdma_accumulate_data_t); +OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t, opal_list_item_t, osc_rdma_accumulate_data_constructor, + osc_rdma_accumulate_data_destructor); /** * osc_rdma_pending_acc_t: @@ -375,11 +404,10 @@ static int osc_rdma_incomming_req_omplete (ompi_request_t *request) { ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data; /* we need to peer rank. get it from the pml request */ - mca_pml_base_request_t *pml_request = (mca_pml_base_request_t *) request; int rank = MPI_PROC_NULL; if (request->req_status.MPI_TAG & 0x01) { - rank = pml_request->req_peer; + rank = request->req_status.MPI_SOURCE; } mark_incoming_completion (module, rank); @@ -528,18 +556,19 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t * This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb * as the ctx parameter with accumulate_cb as the cb parameter. 
*/ -static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, void *target, void *source, size_t source_len, +static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op, - int request_count, struct osc_rdma_accumulate_data_t **acc_data_out) + int request_count, osc_rdma_accumulate_data_t **acc_data_out) { - struct osc_rdma_accumulate_data_t *acc_data; + osc_rdma_accumulate_data_t *acc_data; - acc_data = malloc (sizeof (*acc_data)); + acc_data = OBJ_NEW(osc_rdma_accumulate_data_t); if (OPAL_UNLIKELY(NULL == acc_data)) { return OMPI_ERR_OUT_OF_RESOURCE; } acc_data->module = module; + acc_data->peer = peer; acc_data->target = target; acc_data->source = source; acc_data->source_len = source_len; @@ -556,17 +585,6 @@ static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, void *t return OMPI_SUCCESS; } -static void osc_rdma_accumulate_free (struct osc_rdma_accumulate_data_t *acc_data) -{ - /* the source is always a temporary buffer */ - free (acc_data->source); - - OBJ_RELEASE(acc_data->datatype); - OBJ_RELEASE(acc_data->op); - - free (acc_data); -} - /** * @short Execute the accumulate once the request counter reaches 0. 
* @@ -579,30 +597,36 @@ static void osc_rdma_accumulate_free (struct osc_rdma_accumulate_data_t *acc_dat static int accumulate_cb (ompi_request_t *request) { struct osc_rdma_accumulate_data_t *acc_data = (struct osc_rdma_accumulate_data_t *) request->req_complete_cb_data; + ompi_osc_rdma_module_t *module = acc_data->module; + int rank = MPI_PROC_NULL; int ret = OMPI_SUCCESS; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "accumulate_cb, request_count = %d", acc_data->request_count)); - request->req_complete_cb_data = acc_data->module; - osc_rdma_incomming_req_omplete (request); - - --acc_data->request_count; - - if (0 != acc_data->request_count) { - /* more requests needed before the buffer can be accumulated */ - return OMPI_SUCCESS; + if (request->req_status.MPI_TAG & 0x01) { + rank = acc_data->peer; } - if (acc_data->source) { - ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len, - acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op); + mark_incoming_completion (module, rank); + + OPAL_THREAD_LOCK(&module->lock); + if (0 == --acc_data->request_count) { + /* no more requests needed before the buffer can be accumulated */ + + if (acc_data->source) { + ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len, + acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op); + } + + /* drop the accumulate lock */ + ompi_osc_rdma_accumulate_unlock (module); + + opal_list_append (&module->buffer_gc, &acc_data->super); } - /* drop the accumulate lock */ - ompi_osc_rdma_accumulate_unlock (acc_data->module); - - osc_rdma_accumulate_free (acc_data); + opal_list_append (&module->request_gc, (opal_list_item_t *) request); + OPAL_THREAD_UNLOCK(&module->lock); return ret; } @@ -752,7 +776,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou break; } - ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, 
acc_header->count, + ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (buffer); @@ -762,7 +786,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou ret = ompi_osc_rdma_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); } } while (0); @@ -804,7 +828,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, assert (NULL != proc); do { - ret = osc_rdma_accumulate_allocate (module, target, data, data_len, proc, get_acc_header->count, + ret = osc_rdma_accumulate_allocate (module, source, target, data, data_len, proc, get_acc_header->count, datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -813,7 +837,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag, module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); } } while (0); @@ -865,8 +889,8 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, break; } - ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, get_acc_header->count, - datatype, op, 2, &acc_data); + ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, get_acc_header->count, + datatype, op, 2, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; } @@ -874,7 +898,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, ret = ompi_osc_rdma_irecv_w_cb (buffer, get_acc_header->count, datatype, source, get_acc_header->tag, module->comm, 
&recv_request, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); break; } @@ -883,7 +907,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) { /* cancel the receive and free the accumulate data */ ompi_request_cancel (recv_request); - osc_rdma_accumulate_free (acc_data); + OBJ_RELEASE(acc_data); break; } } while (0); @@ -1417,7 +1441,8 @@ static int ompi_osc_rdma_callback (ompi_request_t *request) mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL); - osc_rdma_request_gc_clean (module); + osc_rdma_gc_clean (module); + opal_list_append (&module->request_gc, (opal_list_item_t *) request); ompi_osc_rdma_frag_start_receive (module);