diff --git a/ompi/mca/osc/rdma/osc_rdma.c b/ompi/mca/osc/rdma/osc_rdma.c index cc1afbb795..1cd276e455 100644 --- a/ompi/mca/osc/rdma/osc_rdma.c +++ b/ompi/mca/osc/rdma/osc_rdma.c @@ -85,10 +85,7 @@ ompi_osc_rdma_free(ompi_win_t *win) OBJ_DESTRUCT(&module->pending_acc); - osc_rdma_gc_clean (module); - assert (0 == opal_list_get_size (&module->request_gc)); - OBJ_DESTRUCT(&module->request_gc); - OBJ_DESTRUCT(&module->buffer_gc); + osc_rdma_gc_clean (); if (NULL != module->peers) { free(module->peers); diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index 65a0e378bc..d971d20497 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -71,6 +71,12 @@ struct ompi_osc_rdma_component_t { /** Is the progress function enabled? */ bool progress_enable; + + /** List of requests that need to be freed */ + opal_list_t request_gc; + + /** List of buffers that need to be freed */ + opal_list_t buffer_gc; }; typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t; @@ -191,9 +197,6 @@ struct ompi_osc_rdma_module_t { unsigned char *incoming_buffer; ompi_request_t *frag_request; - opal_list_t request_gc; - - opal_list_t buffer_gc; /* enforce accumulate semantics */ opal_atomic_lock_t accumulate_lock; @@ -558,25 +561,41 @@ static inline void osc_rdma_copy_for_send (void *target, size_t target_len, void * * @short Release finished PML requests and accumulate buffers. * - * @param[in] module - OSC RDMA module - * * @long This function exists because it is not possible to free a PML request * or buffer from a request completion callback. We instead put requests * and buffers on the module's garbage collection lists and release then * at a later time. 
*/ -static inline void osc_rdma_gc_clean (ompi_osc_rdma_module_t *module) +static inline void osc_rdma_gc_clean (void) { ompi_request_t *request; opal_list_item_t *item; - while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) { + OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock); + + while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&mca_osc_rdma_component.request_gc))) { ompi_request_free (&request); } - while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) { + while (NULL != (item = opal_list_remove_first (&mca_osc_rdma_component.buffer_gc))) { OBJ_RELEASE(item); } + + OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock); +} + +static inline void osc_rdma_gc_add_request (ompi_request_t *request) +{ + OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock); + opal_list_append (&mca_osc_rdma_component.request_gc, (opal_list_item_t *) request); + OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock); +} + +static inline void osc_rdma_gc_add_buffer (opal_list_item_t *buffer) +{ + OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock); + opal_list_append (&mca_osc_rdma_component.buffer_gc, buffer); + OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock); } #define OSC_RDMA_FRAG_TAG 0x10000 diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c index f44eff6067..fbf2dcba04 100644 --- a/ompi/mca/osc/rdma/osc_rdma_comm.c +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -56,9 +56,19 @@ static int ompi_osc_rdma_req_comm_complete (ompi_request_t *request) OPAL_THREAD_UNLOCK(&ompi_request_lock); /* put this request on the garbage colletion list */ - OPAL_THREAD_LOCK(&module->lock); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); - OPAL_THREAD_UNLOCK(&module->lock); + osc_rdma_gc_add_request (request); + + return OMPI_SUCCESS; +} + +static int ompi_osc_rdma_dt_send_complete (ompi_request_t *request) +{ + ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data; + + 
OBJ_RELEASE(datatype); + + /* put this request on the garbage colletion list */ + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } @@ -234,9 +244,10 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count, ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_header_put_t *header; size_t ddt_len, payload_len, frag_len; + bool is_long_datatype = false; bool is_long_msg = false; const void *packed_ddt; - int tag, ret; + int tag = -1, ret; char *ptr; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -273,9 +284,16 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count, if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { frag_len = sizeof(ompi_osc_rdma_header_put_t) + ddt_len; ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); - if (OMPI_SUCCESS != ret) { - OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_OUT_OF_RESOURCE; + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + /* allocate space for the header plus space to store ddt_len */ + frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8; + ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + is_long_datatype = true; } is_long_msg = true; @@ -285,7 +303,8 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count, OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc rdma: put long protocol: %d", (int) is_long_msg)); + "osc rdma: put long protocol: %d, large datatype: %d", + (int) is_long_msg, (int) is_long_datatype)); header = (ompi_osc_rdma_header_put_t *) ptr; header->base.flags = 0; @@ -294,42 +313,63 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count, header->displacement = target_disp; ptr += sizeof(ompi_osc_rdma_header_put_t); - ret = ompi_datatype_get_pack_description(target_dt, 
&packed_ddt); - memcpy((unsigned char*) ptr, packed_ddt, ddt_len); - ptr += ddt_len; - - if (!is_long_msg) { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT; - - osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count, - origin_dt); - - /* the user's buffer is no longer needed so mark the request as - * complete. */ - if (request) { - ompi_osc_rdma_request_complete (request, MPI_SUCCESS); + do { + ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; } - } else { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG; - header->tag = tag; + if (is_long_datatype) { + /* the datatype does not fit in an eager message. send it seperately */ + header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE; - /* increase the outgoing signal count */ - ompi_osc_signal_outgoing (module, target, 1); + OBJ_RETAIN(target_dt); - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_rdma_req_comm_complete, - request); + ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, + tag, module->comm, ompi_osc_rdma_dt_send_complete, + target_dt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + *((uint64_t *) ptr) = ddt_len; + ptr += 8; } else { - ret = ompi_osc_rdma_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, - module->comm); + memcpy((unsigned char*) ptr, packed_ddt, ddt_len); + ptr += ddt_len; } - if (OMPI_SUCCESS != ret) goto cleanup; - } - cleanup: + if (!is_long_msg) { + header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT; + + osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count, + origin_dt); + + /* the user's buffer is no longer needed so mark the request as + * complete. 
*/ + if (request) { + ompi_osc_rdma_request_complete (request, MPI_SUCCESS); + } + } else { + header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG; + + header->tag = tag; + + /* increase the outgoing signal count */ + ompi_osc_signal_outgoing (module, target, 1); + + if (request) { + request->outstanding_requests = 1; + ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt, + target, tag, module->comm, ompi_osc_rdma_req_comm_complete, + request); + } else { + ret = ompi_osc_rdma_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, + module->comm); + } + } + } while (0); + if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; } @@ -372,11 +412,12 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count, int ret; ompi_osc_rdma_module_t *module = GET_MODULE(win); ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target); + bool is_long_datatype = false; + bool is_long_msg = false; ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_header_acc_t *header; size_t ddt_len, payload_len, frag_len; char *ptr; - bool is_long_msg = false; const void *packed_ddt; int tag; @@ -416,11 +457,16 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count, frag_len = sizeof(ompi_osc_rdma_header_acc_t) + ddt_len; ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); if (OMPI_SUCCESS != ret) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "acc: out of resource error while trying to allocate a fragment")); - OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } + /* allocate space for the header plus space to store ddt_len */ + frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8; + ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + is_long_datatype = true; + } is_long_msg = true; tag = get_tag 
(module); @@ -436,50 +482,71 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count, header->op = op->o_f_to_c_index; ptr += sizeof(ompi_osc_rdma_header_acc_t); - ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); - memcpy((unsigned char*) ptr, packed_ddt, ddt_len); - ptr += ddt_len; - - if (!is_long_msg) { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC; - - osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, - origin_count, origin_dt); - - /* the user's buffer is no longer needed so mark the request as - * complete. */ - if (request) { - ompi_osc_rdma_request_complete (request, MPI_SUCCESS); + do { + ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; } - } else { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG; - header->tag = tag; + if (is_long_datatype) { + /* the datatype does not fit in an eager message. send it seperately */ + header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE; - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "acc: starting long accumulate with tag %d", tag)); + OBJ_RETAIN(target_dt); - /* increment the outgoing send count */ - ompi_osc_signal_outgoing (module, target, 1); + ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, + tag, module->comm, ompi_osc_rdma_dt_send_complete, + target_dt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } - if (request) { - request->outstanding_requests = 1; - ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt, - target, tag, module->comm, ompi_osc_rdma_req_comm_complete, - request); + *((uint64_t *) ptr) = ddt_len; + ptr += 8; } else { - ret = ompi_osc_rdma_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, - module->comm); + memcpy((unsigned char*) ptr, packed_ddt, ddt_len); + ptr += ddt_len; } - if (OMPI_SUCCESS != ret) { + if (!is_long_msg) { + header->base.type = 
OMPI_OSC_RDMA_HDR_TYPE_ACC; + + osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, + origin_count, origin_dt); + + /* the user's buffer is no longer needed so mark the request as + * complete. */ + if (request) { + ompi_osc_rdma_request_complete (request, MPI_SUCCESS); + } + } else { + header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG; + + header->tag = tag; + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "acc: send failed with eror %d", ret)); - } - } + "acc: starting long accumulate with tag %d", tag)); - /* mark the fragment as valid */ - if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { + /* increment the outgoing send count */ + ompi_osc_signal_outgoing (module, target, 1); + + if (request) { + request->outstanding_requests = 1; + ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt, + target, tag, module->comm, ompi_osc_rdma_req_comm_complete, + request); + } else { + ret = ompi_osc_rdma_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, + module->comm); + } + } + } while (0); + + if (OMPI_SUCCESS != ret) { + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, + "acc: failed with eror %d", ret)); + } else { + /* mark the fragment as valid */ header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; } @@ -662,6 +729,7 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou { int ret, tag; ompi_osc_rdma_module_t *module = GET_MODULE(win); + bool is_long_datatype = false; ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_header_get_t *header; size_t ddt_len, frag_len; @@ -713,8 +781,15 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou frag_len = sizeof(ompi_osc_rdma_header_get_t) + ddt_len; ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); if (OMPI_SUCCESS != ret) { - OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_OUT_OF_RESOURCE; + /* allocate space for the header plus space to store ddt_len */ + frag_len = 
sizeof(ompi_osc_rdma_header_put_t) + 8; + ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + is_long_datatype = true; } tag = get_tag (module); @@ -733,20 +808,43 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou header->tag = tag; ptr += sizeof(ompi_osc_rdma_header_get_t); - ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); - memcpy((unsigned char*) ptr, packed_ddt, ddt_len); - ptr += ddt_len; + do { + ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + if (is_long_datatype) { + /* the datatype does not fit in an eager message. send it seperately */ + header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE; + + OBJ_RETAIN(target_dt); + + ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, + tag, module->comm, ompi_osc_rdma_dt_send_complete, + target_dt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + *((uint64_t *) ptr) = ddt_len; + ptr += 8; + } else { + memcpy((unsigned char*) ptr, packed_ddt, ddt_len); + ptr += ddt_len; + } + + /* TODO -- store the request somewhere so we can cancel it on error */ + rdma_request->outstanding_requests = 1; + ret = ompi_osc_rdma_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag, + module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request); + } while (0); if (OMPI_SUCCESS == ret) { header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; *request = &rdma_request->super; } - /* TODO -- store the request somewhere so we can cancel it on error */ - rdma_request->outstanding_requests = 1; - ret = ompi_osc_rdma_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag, - module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request); - OPAL_THREAD_LOCK(&module->lock); ret = 
ompi_osc_rdma_frag_finish(module, frag); @@ -837,11 +935,12 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count, int ret; ompi_osc_rdma_module_t *module = GET_MODULE(win); ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target_rank); + bool is_long_datatype = false; + bool is_long_msg = false; ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_header_get_acc_t *header; size_t ddt_len, payload_len, frag_len; char *ptr; - bool is_long_msg = false; const void *packed_ddt; int tag; ompi_osc_rdma_request_t *rdma_request; @@ -901,9 +1000,17 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count, frag_len = sizeof(*header) + ddt_len; ret = ompi_osc_rdma_frag_alloc(module, target_rank, frag_len, &frag, &ptr); if (OMPI_SUCCESS != ret) { - OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_ERR_OUT_OF_RESOURCE; + /* allocate space for the header plus space to store ddt_len */ + frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8; + ret = ompi_osc_rdma_frag_alloc(module, target_rank, frag_len, &frag, &ptr); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + is_long_datatype = true; } + is_long_msg = true; } @@ -927,32 +1034,53 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count, header->tag = tag; ptr = (char *)(header + 1); - ret = ompi_datatype_get_pack_description(target_datatype, &packed_ddt); - memcpy((unsigned char*) ptr, packed_ddt, ddt_len); - ptr += ddt_len; - - ret = ompi_osc_rdma_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag, - module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto cleanup; - } - - if (!is_long_msg) { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC; - - if (&ompi_mpi_op_no_op.op != op) { - osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count, - origin_datatype); + do { + ret = 
ompi_datatype_get_pack_description(target_datatype, &packed_ddt); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; } - } else { - header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG; - ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank, - tag, module->comm, ompi_osc_rdma_req_comm_complete, rdma_request); - if (OMPI_SUCCESS != ret) goto cleanup; - } + if (is_long_datatype) { + /* the datatype does not fit in an eager message. send it seperately */ + header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE; + + OBJ_RETAIN(target_datatype); + + ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target_rank, + tag, module->comm, ompi_osc_rdma_dt_send_complete, + target_datatype); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + *((uint64_t *) ptr) = ddt_len; + ptr += 8; + } else { + memcpy((unsigned char*) ptr, packed_ddt, ddt_len); + ptr += ddt_len; + } + + ret = ompi_osc_rdma_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag, + module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + if (!is_long_msg) { + header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC; + + if (&ompi_mpi_op_no_op.op != op) { + osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count, + origin_datatype); + } + } else { + header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG; + + ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank, + tag, module->comm, ompi_osc_rdma_req_comm_complete, rdma_request); + } + } while (0); - cleanup: if (OMPI_SUCCESS == ret) { header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; *request = (ompi_request_t *) rdma_request; diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c index 0ee5b66792..1f989813ae 100644 --- a/ompi/mca/osc/rdma/osc_rdma_component.c +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ 
-243,6 +243,8 @@ component_init(bool enable_progress_threads, OBJ_CONSTRUCT(&mca_osc_rdma_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_osc_rdma_component.pending_operations, opal_list_t); + OBJ_CONSTRUCT(&mca_osc_rdma_component.request_gc, opal_list_t); + OBJ_CONSTRUCT(&mca_osc_rdma_component.buffer_gc, opal_list_t); OBJ_CONSTRUCT(&mca_osc_rdma_component.modules, opal_hash_table_t); @@ -300,6 +302,8 @@ component_finalize(void) OBJ_DESTRUCT(&mca_osc_rdma_component.lock); OBJ_DESTRUCT(&mca_osc_rdma_component.requests); OBJ_DESTRUCT(&mca_osc_rdma_component.pending_operations); + OBJ_DESTRUCT(&mca_osc_rdma_component.request_gc); + OBJ_DESTRUCT(&mca_osc_rdma_component.buffer_gc); return OMPI_SUCCESS; } @@ -351,8 +355,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit OBJ_CONSTRUCT(&module->queued_frags, opal_list_t); OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); - OBJ_CONSTRUCT(&module->request_gc, opal_list_t); - OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); /* options */ diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c index 60a1b0c674..b4950cd190 100644 --- a/ompi/mca/osc/rdma/osc_rdma_data_move.c +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -46,7 +46,7 @@ /** * struct osc_rdma_accumulate_data_t: * - * @short Data associated with an in-progress accumulation operation. + * @brief Data associated with an in-progress accumulation operation. */ struct osc_rdma_accumulate_data_t { opal_list_item_t super; @@ -93,10 +93,10 @@ OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t, opal_list_item_t, osc_rdma_accumu /** * osc_rdma_pending_acc_t: * - * @short Keep track of accumulate and cswap operations that are + * @brief Keep track of accumulate and cswap operations that are * waiting on the accumulate lock. 
* - * @long Since accumulate operations may take several steps to + * Since accumulate operations may take several steps to * complete we need to lock the accumulate lock until the operation * is complete. While the lock is held it is possible that additional * accumulate operations will arrive. This structure keep track of @@ -134,10 +134,49 @@ OBJ_CLASS_INSTANCE(osc_rdma_pending_acc_t, opal_list_item_t, osc_rdma_pending_acc_constructor, osc_rdma_pending_acc_destructor); /* end ompi_osc_rdma_pending_acc_t class */ +/** + * @brief Class for large datatype descriptions + * + * This class is used to keep track of buffers for large datatype desctiotions + * (datatypes that do not fit in an eager fragment). The structure is designed + * to take advantage of the small datatype description code path. + */ +struct ompi_osc_rdma_ddt_buffer_t { + /** allows this class to be stored in the buffer garbage collection + * list */ + opal_list_item_t super; + + /** OSC RDMA module */ + ompi_osc_rdma_module_t *module; + /** source of this header */ + int source; + /** header + datatype data */ + ompi_osc_rdma_header_t *header; +}; +typedef struct ompi_osc_rdma_ddt_buffer_t ompi_osc_rdma_ddt_buffer_t; + +static void ompi_osc_rdma_ddt_buffer_constructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer) +{ + ddt_buffer->header = NULL; +} + +static void ompi_osc_rdma_ddt_buffer_destructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer) +{ + if (ddt_buffer->header) { + free (ddt_buffer->header); + ddt_buffer->header = NULL; + } +} + +OBJ_CLASS_DECLARATION(ompi_osc_rdma_ddt_buffer_t); +OBJ_CLASS_INSTANCE(ompi_osc_rdma_ddt_buffer_t, opal_list_item_t, ompi_osc_rdma_ddt_buffer_constructor, + ompi_osc_rdma_ddt_buffer_destructor); +/* end ompi_osc_rdma_ddt_buffer_t class */ + /** * datatype_buffer_length: * - * @short Determine the buffer size needed to hold count elements of datatype. + * @brief Determine the buffer size needed to hold count elements of datatype. 
* * @param[in] datatype - Element type * @param[in] count - Element count @@ -162,7 +201,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count) /** * ompi_osc_rdma_control_send: * - * @short send a control message as part of a fragment + * @brief send a control message as part of a fragment * * @param[in] module - OSC RDMA module * @param[in] target - Target peer's rank @@ -171,7 +210,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count) * * @returns error OMPI error code or OMPI_SUCCESS * - * @long "send" a control messages. Adds it to the active fragment, so the + * "send" a control messages. Adds it to the active fragment, so the * caller will still need to explicitly flush (either to everyone or * to a target) before this is sent. */ @@ -210,10 +249,8 @@ static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request) /* free the temporary buffer */ free (ctx); - /* put this request on the garbage collection list */ - OPAL_THREAD_LOCK(&module->lock); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); - OPAL_THREAD_UNLOCK(&module->lock); + /* put this request on the garbage colletion list */ + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } @@ -221,14 +258,14 @@ static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request) /** * ompi_osc_rdma_control_send_unbuffered: * - * @short Send an unbuffered control message to a peer. + * @brief Send an unbuffered control message to a peer. * * @param[in] module - OSC RDMA module * @param[in] target - Target rank * @param[in] data - Data to send * @param[in] len - Length of data * - * @long Directly send a control message. This does not allocate a + * Directly send a control message. 
This does not allocate a * fragment, so should only be used when sending other messages would * be erroneous (such as complete messages, when there may be queued * transactions from an overlapping post that has already heard back @@ -265,7 +302,7 @@ int ompi_osc_rdma_control_send_unbuffered(ompi_osc_rdma_module_t *module, /** * datatype_create: * - * @short Utility function that creates a new datatype from a packed + * @brief Utility function that creates a new datatype from a packed * description. * * @param[in] module - OSC RDMA module @@ -317,7 +354,7 @@ static inline int datatype_create (ompi_osc_rdma_module_t *module, int peer, omp * @param[in] source - Message source * @param[in] put_header - Message header + data * - * @long Process a put message and copy the message data to the specified + * Process a put message and copy the message data to the specified * memory region. Note, this function does not handle any bounds * checking at the moment. */ @@ -391,12 +428,12 @@ static inline int process_put_long(ompi_osc_rdma_module_t* module, int source, /** * osc_rdma_incoming_req_complete: * - * @short Completion callback for a receive associate with an access + * @brief Completion callback for a receive associate with an access * epoch. * * @param[in] request - PML request with an OSC RMDA module as the callback data. * - * @long This function is called when a send or recieve associated with an + * This function is called when a send or recieve associated with an * access epoch completes. When fired this function will increment the * passive or active incoming count. 
*/ @@ -412,9 +449,7 @@ static int osc_rdma_incoming_req_complete (ompi_request_t *request) mark_incoming_completion (module, rank); /* put this request on the garbage colletion list */ - OPAL_THREAD_LOCK(&module->lock); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); - OPAL_THREAD_UNLOCK(&module->lock); + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } @@ -437,15 +472,13 @@ static int osc_rdma_get_post_send_cb (ompi_request_t *request) mark_incoming_completion (module, rank); /* put this request on the garbage colletion list */ - OPAL_THREAD_LOCK(&module->lock); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); - OPAL_THREAD_UNLOCK(&module->lock); + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } /** - * @short Post a send to match the remote receive for a get operation. + * @brief Post a send to match the remote receive for a get operation. * * @param[in] module - OSC RDMA module * @param[in] source - Source buffer @@ -454,7 +487,7 @@ static int osc_rdma_get_post_send_cb (ompi_request_t *request) * @param[in] peer - Remote process that has the receive posted * @param[in] tag - Tag for the send * - * @long This function posts a send to match the receive posted as part + * This function posts a send to match the receive posted as part * of a get operation. When this send is complete the get is considered * complete at the target (this process). */ @@ -480,7 +513,7 @@ static int osc_rdma_get_post_send (ompi_osc_rdma_module_t *module, void *source, /** * process_get: * - * @short Process a get message from a remote peer + * @brief Process a get message from a remote peer * * @param[in] module - OSC RDMA module * @param[in] target - Peer process @@ -516,7 +549,7 @@ static inline int process_get (ompi_osc_rdma_module_t* module, int target, /** * osc_rdma_accumulate_buffer: * - * @short Accumulate data into the target buffer. + * @brief Accumulate data into the target buffer. 
* * @param[in] target - Target buffer * @param[in] source - Source buffer @@ -575,7 +608,7 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t } /** - * @short Create an accumulate data object. + * @brief Create an accumulate data object. * * @param[in] module - RDMA OSC module * @param[in] target - Target for the accumulation @@ -588,9 +621,9 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t * @param[in] request_count - Number of prerequisite requests * @param[out] acc_data_out - New accumulation data * - * @long This function is used to create a copy of the data needed to perform an accumulation. - * This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb - * as the ctx parameter with accumulate_cb as the cb parameter. + * This function is used to create a copy of the data needed to perform an accumulation. + * This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb + * as the ctx parameter with accumulate_cb as the cb parameter. */ static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op, @@ -622,7 +655,7 @@ static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int pee } /** - * @short Execute the accumulate once the request counter reaches 0. + * @brief Execute the accumulate once the request counter reaches 0. 
* * @param[in] request - request * @@ -658,10 +691,12 @@ static int accumulate_cb (ompi_request_t *request) /* drop the accumulate lock */ ompi_osc_rdma_accumulate_unlock (module); - opal_list_append (&module->buffer_gc, &acc_data->super); + osc_rdma_gc_add_buffer (&acc_data->super); } - opal_list_append (&module->request_gc, (opal_list_item_t *) request); + /* put this request on the garbage colletion list */ + osc_rdma_gc_add_request (request); + OPAL_THREAD_UNLOCK(&module->lock); return ret; @@ -731,7 +766,7 @@ static int replace_cb (ompi_request_t *request) /** * ompi_osc_rdma_acc_start: * - * @short Start an accumulate with data operation. + * @brief Start an accumulate with data operation. * * @param[in] module - OSC RDMA module * @param[in] source - Source rank @@ -768,7 +803,7 @@ static int ompi_osc_rdma_acc_start (ompi_osc_rdma_module_t *module, int source, /** * ompi_osc_rdma_acc_start: * - * @short Start a long accumulate operation. + * @brief Start a long accumulate operation. * * @param[in] module - OSC RDMA module * @param[in] source - Source rank @@ -838,7 +873,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou /** * ompi_osc_rdma_gacc_start: * - * @short Start a accumulate with data + get operation. + * @brief Start a accumulate with data + get operation. * * @param[in] module - OSC RDMA module * @param[in] source - Source rank @@ -889,7 +924,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, /** * ompi_osc_rdma_gacc_long_start: * - * @short Start a long accumulate + get operation. + * @brief Start a long accumulate + get operation. 
* * @param[in] module - OSC RDMA module * @param[in] source - Source rank @@ -960,7 +995,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, /** * ompi_osc_rdma_cswap_start: * - * @short Start a compare and swap operation + * @brief Start a compare and swap operation * * @param[in] module - OSC RDMA module * @param[in] source - Source rank @@ -1015,12 +1050,12 @@ static int ompi_osc_rdma_cswap_start (ompi_osc_rdma_module_t *module, int source /** * ompi_osc_rdma_progress_pending_acc: * - * @short Progress one pending accumulation or compare and swap operation. + * @brief Progress one pending accumulation or compare and swap operation. * * @param[in] module - OSC RDMA module * - * @long If the accumulation lock can be aquired progress one pending - * accumulate or compare and swap operation. + * If the accumulation lock can be aquired progress one pending + * accumulate or compare and swap operation. */ int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module) { @@ -1342,6 +1377,117 @@ static inline int process_unlock (ompi_osc_rdma_module_t *module, int source, return sizeof (*unlock_header); } +static int process_large_datatype_request_cb (ompi_request_t *request) +{ + ompi_osc_rdma_ddt_buffer_t *ddt_buffer = (ompi_osc_rdma_ddt_buffer_t *) request->req_complete_cb_data; + ompi_osc_rdma_module_t *module = ddt_buffer->module; + ompi_osc_rdma_header_t *header = ddt_buffer->header; + int source = ddt_buffer->source; + + /* process the request */ + switch (header->base.type) { + case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG: + (void) process_put_long (module, source, &header->put); + break; + case OMPI_OSC_RDMA_HDR_TYPE_GET: + (void) process_get (module, source, &header->get); + break; + case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG: + (void) process_acc_long (module, source, &header->acc); + break; + case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG: + (void) process_get_acc_long (module, source, &header->get_acc); + break; + default: + /* 
developer error */ + assert (0); + return OMPI_ERROR; + } + + /* put this request on the garbage collection list */ + osc_rdma_gc_add_request (request); + + /* free the datatype buffer */ + osc_rdma_gc_add_buffer (&ddt_buffer->super); + + return OMPI_SUCCESS; +} + +/** + * @short process a request with a large datatype + * + * @param[in] module - OSC RDMA module + * @param[in] source - header source + * @param[in] header - header to process + * + * It is possible to construct datatypes whose description is too large + * to fit in an OSC RDMA fragment. In this case the remote side posts + * a send of the datatype description. This function posts the matching + * receive and processes the header on completion. + */ +static int process_large_datatype_request (ompi_osc_rdma_module_t *module, int source, ompi_osc_rdma_header_t *header) +{ + ompi_osc_rdma_ddt_buffer_t *ddt_buffer; + int header_len, tag, ret; + uint64_t ddt_len; + + /* determine the header size and receive tag */ + switch (header->base.type) { + case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG: + header_len = sizeof (header->put); + tag = header->put.tag; + break; + case OMPI_OSC_RDMA_HDR_TYPE_GET: + header_len = sizeof (header->get); + tag = header->get.tag; + break; + case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG: + header_len = sizeof (header->acc); + tag = header->acc.tag; + break; + case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG: + header_len = sizeof (header->get_acc); + tag = header->get_acc.tag; + break; + default: + /* developer error */ + opal_output (0, "Unsupported header/flag combination"); + return OMPI_ERROR; + } + + ddt_len = *((uint64_t *)((uintptr_t) header + header_len)); + + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, + "process_large_datatype_request: processing fragment with type %d. 
ddt_len %lu", + header->base.type, (unsigned long) ddt_len)); + + ddt_buffer = OBJ_NEW(ompi_osc_rdma_ddt_buffer_t); + if (OPAL_UNLIKELY(NULL == ddt_buffer)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + ddt_buffer->module = module; + ddt_buffer->source = source; + + ddt_buffer->header = malloc (ddt_len + header_len); + if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) { + OBJ_RELEASE(ddt_buffer); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + memcpy (ddt_buffer->header, header, header_len); + + ret = ompi_osc_rdma_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len), + ddt_len, MPI_BYTE, source, tag, module->comm, NULL, + process_large_datatype_request_cb, ddt_buffer); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OBJ_RELEASE(ddt_buffer); + return ret; + } + + return header_len + 8; +} + /* * Do all the data movement associated with a fragment */ @@ -1359,62 +1505,68 @@ static inline int process_frag (ompi_osc_rdma_module_t *module, for (int i = 0 ; i < frag->num_ops ; ++i) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc rdma: process_frag: type 0x%x. offset = %u", header->base.type, - (unsigned) ((uintptr_t)header - (uintptr_t)frag))); + "osc rdma: process_frag: type 0x%x. flag 0x%x. 
offset %u", + header->base.type, header->base.flags, + (unsigned) ((uintptr_t)header - (uintptr_t)frag))); - switch (header->base.type) { - case OMPI_OSC_RDMA_HDR_TYPE_PUT: - ret = process_put(module, frag->source, &header->put); - break; - case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG: - ret = process_put_long(module, frag->source, &header->put); - break; + if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE))) { + switch (header->base.type) { + case OMPI_OSC_RDMA_HDR_TYPE_PUT: + ret = process_put(module, frag->source, &header->put); + break; + case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG: + ret = process_put_long(module, frag->source, &header->put); + break; - case OMPI_OSC_RDMA_HDR_TYPE_ACC: - ret = process_acc(module, frag->source, &header->acc); - break; - case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG: - ret = process_acc_long (module, frag->source, &header->acc); - break; + case OMPI_OSC_RDMA_HDR_TYPE_ACC: + ret = process_acc(module, frag->source, &header->acc); + break; + case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG: + ret = process_acc_long (module, frag->source, &header->acc); + break; - case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ: - ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock); - if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { - ret = sizeof (header->lock); + case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ: + ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock); + if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { + ret = sizeof (header->lock); + } + break; + case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ: + ret = process_unlock(module, frag->source, &header->unlock); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_GET: + ret = process_get (module, frag->source, &header->get); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_CSWAP: + ret = process_cswap (module, frag->source, &header->cswap); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC: + ret = process_get_acc (module, frag->source, &header->get_acc); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG: + ret = 
process_get_acc_long (module, frag->source, &header->get_acc); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ: + ret = process_flush (module, frag->source, &header->flush); + break; + + case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE: + ret = process_complete (module, frag->source, &header->complete); + break; + + default: + opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type); + abort(); /* FIX ME */ } - break; - case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ: - ret = process_unlock(module, frag->source, &header->unlock); - - break; - case OMPI_OSC_RDMA_HDR_TYPE_GET: - ret = process_get (module, frag->source, &header->get); - break; - - case OMPI_OSC_RDMA_HDR_TYPE_CSWAP: - ret = process_cswap (module, frag->source, &header->cswap); - break; - - case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC: - ret = process_get_acc (module, frag->source, &header->get_acc); - break; - - case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG: - ret = process_get_acc_long (module, frag->source, &header->get_acc); - break; - - case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ: - ret = process_flush (module, frag->source, &header->flush); - break; - - case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE: - ret = process_complete (module, frag->source, &header->complete); - break; - - default: - opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type); - abort(); /* FIX ME */ + } else { + ret = process_large_datatype_request (module, frag->source, header); } + if (ret <= 0) { opal_output(0, "Error processing fragment: %d", ret); abort(); /* FIX ME */ @@ -1428,7 +1580,6 @@ static inline int process_frag (ompi_osc_rdma_module_t *module, return OMPI_SUCCESS; } - /* dispatch for callback on message completion */ static int ompi_osc_rdma_callback (ompi_request_t *request) { @@ -1477,9 +1628,10 @@ static int ompi_osc_rdma_callback (ompi_request_t *request) mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? 
source : MPI_PROC_NULL); - osc_rdma_gc_clean (module); + osc_rdma_gc_clean (); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); + /* put this request on the garbage collection list */ + osc_rdma_gc_add_request (request); ompi_osc_rdma_frag_start_receive (module); OPAL_THREAD_UNLOCK(&module->lock); @@ -1520,9 +1672,7 @@ isend_completion_cb(ompi_request_t *request) mark_outgoing_completion(module); /* put this request on the garbage colletion list */ - OPAL_THREAD_LOCK(&module->lock); - opal_list_append (&module->request_gc, (opal_list_item_t *) request); - OPAL_THREAD_UNLOCK(&module->lock); + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/rdma/osc_rdma_frag.c b/ompi/mca/osc/rdma/osc_rdma_frag.c index d46471d009..f24c1bd83d 100644 --- a/ompi/mca/osc/rdma/osc_rdma_frag.c +++ b/ompi/mca/osc/rdma/osc_rdma_frag.c @@ -47,8 +47,9 @@ static int frag_send_cb (ompi_request_t *request) mark_outgoing_completion(module); OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.frags, &frag->super); + /* put this request on the garbage collection list */ - opal_list_append (&module->request_gc, (opal_list_item_t *) request); + osc_rdma_gc_add_request (request); return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/rdma/osc_rdma_header.h b/ompi/mca/osc/rdma/osc_rdma_header.h index 33220f7a60..056b8d4657 100644 --- a/ompi/mca/osc/rdma/osc_rdma_header.h +++ b/ompi/mca/osc/rdma/osc_rdma_header.h @@ -54,6 +54,7 @@ typedef enum ompi_osc_rdma_hdr_type_t ompi_osc_rdma_hdr_type_t; #define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x01 #define OMPI_OSC_RDMA_HDR_FLAG_VALID 0x02 #define OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET 0x04 +#define OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE 0x08 struct ompi_osc_rdma_header_base_t { /** fragment type. 8 bits */