diff --git a/ompi/mca/osc/rdma/osc_rdma.c b/ompi/mca/osc/rdma/osc_rdma.c index 7fd313028a..f8ea6eaf87 100644 --- a/ompi/mca/osc/rdma/osc_rdma.c +++ b/ompi/mca/osc/rdma/osc_rdma.c @@ -89,6 +89,12 @@ ompi_osc_rdma_free(ompi_win_t *win) OBJ_DESTRUCT(&module->pending_acc); + while (NULL != (item = opal_list_remove_first (&module->pending_posts))) { + OBJ_RELEASE(item); + } + + OBJ_DESTRUCT(&module->pending_posts); + osc_rdma_gc_clean (); if (NULL != module->peers) { diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index 472c70426c..0f91b4141d 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -201,6 +201,10 @@ struct ompi_osc_rdma_module_t { /* enforce accumulate semantics */ opal_atomic_lock_t accumulate_lock; opal_list_t pending_acc; + + /* enforce pscw matching */ + /** list of unmatched post messages */ + opal_list_t pending_posts; }; typedef struct ompi_osc_rdma_module_t ompi_osc_rdma_module_t; OMPI_MODULE_DECLSPEC extern ompi_osc_rdma_component_t mca_osc_rdma_component; @@ -328,7 +332,7 @@ int ompi_osc_rdma_rget_accumulate(void *origin_addr, int ompi_osc_rdma_fence(int assert, struct ompi_win_t *win); /* received a post message */ -int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module); +int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module, int source); int ompi_osc_rdma_start(struct ompi_group_t *group, int assert, diff --git a/ompi/mca/osc/rdma/osc_rdma_active_target.c b/ompi/mca/osc/rdma/osc_rdma_active_target.c index b31141563d..2b0f34121f 100644 --- a/ompi/mca/osc/rdma/osc_rdma_active_target.c +++ b/ompi/mca/osc/rdma/osc_rdma_active_target.c @@ -32,6 +32,21 @@ #include "ompi/communicator/communicator.h" #include "ompi/mca/osc/base/base.h" +/** + * ompi_osc_rdma_pending_post_t: + * + * Describes a post operation that was encountered outside it's + * matching start operation. + */ +struct ompi_osc_rdma_pending_post_t { + opal_list_item_t super; + int rank; +}; +typedef struct ompi_osc_rdma_pending_post_t ompi_osc_rdma_pending_post_t; +OBJ_CLASS_DECLARATION(ompi_osc_rdma_pending_post_t); + +OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_post_t, opal_list_item_t, NULL, NULL); + static int* get_comm_ranks(ompi_osc_rdma_module_t *module, ompi_group_t *sub_group) @@ -153,6 +168,7 @@ ompi_osc_rdma_start(ompi_group_t *group, ompi_win_t *win) { ompi_osc_rdma_module_t *module = GET_MODULE(win); + ompi_osc_rdma_pending_post_t *pending_post, *next; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_rdma_start entering...")); @@ -171,6 +187,22 @@ ompi_osc_rdma_start(ompi_group_t *group, module->sc_group = group; + OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_rdma_pending_post_t) { + ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank); + int group_size = ompi_group_size (module->sc_group); + + for (int i = 0 ; i < group_size ; ++i) { + ompi_proc_t *group_proc = ompi_group_peer_lookup (module->sc_group, i); + + if (group_proc == pending_proc) { + ++module->num_post_msgs; + opal_list_remove_item (&module->pending_posts, &pending_post->super); + OBJ_RELEASE(pending_post); + break; + } + } + } + /* disable eager sends until we've receved the proper number of post messages, at which time we know all our peers are ready to receive messages. */ @@ -434,3 +466,46 @@ ompi_osc_rdma_test(ompi_win_t *win, return ret; } +int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module, int source) +{ + ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source); + bool matched = false; + + OPAL_THREAD_LOCK(&module->lock); + + if (module->sc_group) { + const int group_size = ompi_group_size (module->sc_group); + + for (int i = 0 ; i < group_size ; ++i) { + ompi_proc_t *group_proc = ompi_group_peer_lookup (module->sc_group, i); + + if (group_proc == source_proc) { + matched = true; + break; + } + } + } + + if (!matched) { + ompi_osc_rdma_pending_post_t *pending_post = OBJ_NEW(ompi_osc_rdma_pending_post_t); + + pending_post->rank = source; + + opal_list_append (&module->pending_posts, &pending_post->super); + + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_SUCCESS; + } + + module->num_post_msgs++; + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "received post message. num_post_msgs = %d", module->num_post_msgs)); + + if (0 == module->num_post_msgs) { + module->active_eager_send_active = true; + } + opal_condition_broadcast (&module->cond); + OPAL_THREAD_UNLOCK(&module->lock); + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c index 42995338a5..a817cb67f4 100644 --- a/ompi/mca/osc/rdma/osc_rdma_component.c +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ -356,6 +356,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); + OBJ_CONSTRUCT(&module->pending_posts, opal_list_t); /* options */ /* FIX ME: should actually check this value... */ diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c index 5123558905..f4e8d21399 100644 --- a/ompi/mca/osc/rdma/osc_rdma_data_move.c +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -1654,7 +1654,7 @@ static int ompi_osc_rdma_callback (ompi_request_t *request) process_frag(module, (ompi_osc_rdma_frag_header_t *) base_header); break; case OMPI_OSC_RDMA_HDR_TYPE_POST: - (void) osc_rdma_incoming_post (module); + (void) osc_rdma_incoming_post (module, source); break; case OMPI_OSC_RDMA_HDR_TYPE_LOCK_ACK: ompi_osc_rdma_process_lock_ack(module, (ompi_osc_rdma_header_lock_ack_t *) base_header); diff --git a/ompi/mca/osc/rdma/osc_rdma_frag.c b/ompi/mca/osc/rdma/osc_rdma_frag.c index f24c1bd83d..09ef067d09 100644 --- a/ompi/mca/osc/rdma/osc_rdma_frag.c +++ b/ompi/mca/osc/rdma/osc_rdma_frag.c @@ -196,19 +196,3 @@ ompi_osc_rdma_frag_flush_all(ompi_osc_rdma_module_t *module) return OMPI_SUCCESS; } - -int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module) -{ - OPAL_THREAD_LOCK(&module->lock); - module->num_post_msgs++; - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "received post message. num_post_msgs = %d", module->num_post_msgs)); - - if (0 == module->num_post_msgs) { - module->active_eager_send_active = true; - } - opal_condition_broadcast (&module->cond); - OPAL_THREAD_UNLOCK(&module->lock); - - return OMPI_SUCCESS; -}