diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 6e87eb898c..5e5d6b3204 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -105,22 +105,22 @@ struct ompi_osc_pt2pt_module_t { Complete. For lock, the number of messages waiting for completion on on the origin side. Not protected by p2p_lock - must use atomic counter operations. */ - volatile int32_t p2p_num_pending_out; + int32_t p2p_num_pending_out; /** For MPI_Fence synchronization, the number of expected incoming messages. For Post/Wait, the number of expected updates from complete. For lock, the number of messages on the passive side we are waiting for. Not protected by p2p_lock - must use atomic counter operations. */ - volatile int32_t p2p_num_pending_in; + int32_t p2p_num_pending_in; /** Number of "ping" messages from the remote post group we've received */ - volatile int32_t p2p_num_post_msgs; + int32_t p2p_num_post_msgs; /** Number of "count" messages from the remote complete group we've received */ - volatile int32_t p2p_num_complete_msgs; + int32_t p2p_num_complete_msgs; /** cyclic counter for a unique tag for long messages. 
Not protected by the p2p_lock - must use create_send_tag() to diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index 74d44762b0..6a83d3b57a 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -357,9 +357,11 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, #else ret = opal_progress_register(ompi_osc_pt2pt_component_progress); #endif + } else { + ret = OMPI_SUCCESS; } - if (OMPI_SUCCESS != ret) goto cleanup; OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock); + if (OMPI_SUCCESS != ret) goto cleanup; /* fill in window information */ win->w_osc_module = (ompi_osc_base_module_t*) module; @@ -572,11 +574,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq) case OMPI_OSC_PT2PT_HDR_POST: { int32_t count; - - count = OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1); - if (count == 0) { - opal_condition_broadcast(&module->p2p_cond); - } + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (module->p2p_num_post_msgs -= 1); + OPAL_THREAD_UNLOCK(&module->p2p_lock); + if (count == 0) opal_condition_broadcast(&module->p2p_cond); } break; @@ -594,10 +595,11 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq) /* we've heard from one more place, and have value reqs to process */ - /* BWB -- need to check to see if we can do this better */ - count = OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1); - if (count == 0) opal_condition_broadcast(&module->p2p_cond); - count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]); + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (module->p2p_num_complete_msgs -= 1); + count += (module->p2p_num_pending_in += header->hdr_value[0]); + OPAL_THREAD_UNLOCK(&module->p2p_lock); + if (count == 0) opal_condition_broadcast(&module->p2p_cond); } break; @@ -618,9 +620,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq) ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0], header->hdr_value[1]); } 
else { - count = OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1); - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output, - "received lock request ack, count %d", count)); + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (module->p2p_lock_received_ack += 1); + OPAL_THREAD_UNLOCK(&module->p2p_lock); + if (count != 0) opal_condition_broadcast(&module->p2p_cond); } } @@ -646,10 +649,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq) { int32_t count; - count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1); - if (count == 0) { - opal_condition_broadcast(&module->p2p_cond); - } + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (module->p2p_num_pending_out -= 1); + OPAL_THREAD_UNLOCK(&module->p2p_lock); + if (count == 0) opal_condition_broadcast(&module->p2p_cond); } break; @@ -724,11 +727,15 @@ ompi_osc_pt2pt_component_progress(void) static void* component_thread_fn(opal_object_t *obj) { + struct timespec waittime; + while (mca_osc_pt2pt_component.p2p_c_thread_run) { /* wake up whenever a request completes, to make sure it's not for us */ + waittime.tv_sec = 1; /* NOTE(review): confirm whether timedwait expects an absolute deadline rather than a 1s interval */ + waittime.tv_nsec = 0; OPAL_THREAD_LOCK(&ompi_request_lock); - opal_condition_wait(&ompi_request_cond, &ompi_request_lock); + opal_condition_timedwait(&ompi_request_cond, &ompi_request_lock, &waittime); OPAL_THREAD_UNLOCK(&ompi_request_lock); ompi_osc_pt2pt_component_progress(); } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index e14a351254..7474f3ae4e 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -56,13 +56,20 @@ create_send_tag(ompi_osc_pt2pt_module_t *module) static inline void inmsg_mark_complete(ompi_osc_pt2pt_module_t *module) { - int32_t count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1); + int32_t count; + bool need_unlock = false; + + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (module->p2p_num_pending_in -= 1); + if ((0 != module->p2p_lock_status) && 
(opal_list_get_size(&module->p2p_unlocks_pending) != 0)) { + need_unlock = true; + } + OPAL_THREAD_UNLOCK(&module->p2p_lock); + if (0 == count) { + if (need_unlock) ompi_osc_pt2pt_passive_unlock_complete(module); opal_condition_broadcast(&module->p2p_cond); - if ((0 != module->p2p_lock_status) && - (opal_list_get_size(&module->p2p_unlocks_pending) != 0)) { - ompi_osc_pt2pt_passive_unlock_complete(module); - } } } @@ -86,7 +93,9 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq) sendreq->req_module->p2p_comm->c_my_rank, sendreq->req_target_rank)); - count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + OPAL_THREAD_LOCK(&sendreq->req_module->p2p_lock); + count = (sendreq->req_module->p2p_num_pending_out -= 1); + OPAL_THREAD_UNLOCK(&sendreq->req_module->p2p_lock); ompi_osc_pt2pt_longreq_free(longreq); ompi_osc_pt2pt_sendreq_free(sendreq); @@ -122,7 +131,9 @@ ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq) /* do we need to post a send? */ if (header->hdr_msg_length != 0) { /* sendreq is done. 
Mark it as so and get out of here */ - count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + OPAL_THREAD_LOCK(&sendreq->req_module->p2p_lock); + count = (sendreq->req_module->p2p_num_pending_out -= 1); + OPAL_THREAD_UNLOCK(&sendreq->req_module->p2p_lock); ompi_osc_pt2pt_sendreq_free(sendreq); if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond); } @@ -717,10 +728,13 @@ ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq) (ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata; int32_t count; - ompi_osc_pt2pt_longreq_free(longreq); + OPAL_THREAD_LOCK(&sendreq->req_module->p2p_lock); + count = (sendreq->req_module->p2p_num_pending_out -= 1); + OPAL_THREAD_UNLOCK(&sendreq->req_module->p2p_lock); - count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ompi_osc_pt2pt_longreq_free(longreq); ompi_osc_pt2pt_sendreq_free(sendreq); + if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond); } @@ -749,8 +763,13 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module, &iov_count, &max_data ); - count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + + OPAL_THREAD_LOCK(&module->p2p_lock); + count = (sendreq->req_module->p2p_num_pending_out -= 1); + OPAL_THREAD_UNLOCK(&module->p2p_lock); + ompi_osc_pt2pt_sendreq_free(sendreq); + if (0 == count) opal_condition_broadcast(&module->p2p_cond); } else { diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c index f97a7927ac..2927702a0b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c @@ -29,13 +29,12 @@ #include "ompi/mca/osc/base/base.h" +/* Must hold module's lock before calling... 
*/ static inline void ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module) { unsigned int *tmp; - OPAL_THREAD_LOCK(&(module->p2p_lock)); - tmp = module->p2p_copy_num_pending_sendreqs; module->p2p_copy_num_pending_sendreqs = module->p2p_num_pending_sendreqs; @@ -47,8 +46,6 @@ ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module) opal_list_join(&module->p2p_copy_pending_sendreqs, opal_list_get_end(&module->p2p_copy_pending_sendreqs), &module->p2p_pending_sendreqs); - - OPAL_THREAD_UNLOCK(&(module->p2p_lock)); } @@ -58,6 +55,7 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) unsigned int incoming_reqs; int ret = OMPI_SUCCESS, i; ompi_osc_pt2pt_module_t *module = P2P_MODULE(win); + int num_outgoing = 0; if (0 != (assert & MPI_MODE_NOPRECEDE)) { /* check that the user didn't lie to us - since NOPRECEDED @@ -76,7 +74,11 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) /* "atomically" copy all the data we're going to be modifying into the copy... */ + OPAL_THREAD_LOCK(&(module->p2p_lock)); ompi_osc_pt2pt_flip_sendreqs(module); + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + + num_outgoing = opal_list_get_size(&(module->p2p_copy_pending_sendreqs)); /* find out how much data everyone is going to send us. */ ret = module->p2p_comm-> @@ -104,12 +106,6 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) return ret; } - /* possible we've already received a couple in messages, so - atomically add however many we're going to wait for */ - OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), incoming_reqs); - OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), - opal_list_get_size(&(module->p2p_copy_pending_sendreqs))); - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output, "fence: waiting on %d in and %d out", module->p2p_num_pending_in, @@ -127,13 +123,19 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) if (OMPI_SUCCESS != ret) { opal_output_verbose(5, ompi_osc_base_output, - "fence: failure in starting sendreq (%d). 
Will try later.", + "fence: failure in starting sendreq (%d). " + "Will try later.", ret); opal_list_append(&(module->p2p_copy_pending_sendreqs), item); } } OPAL_THREAD_LOCK(&module->p2p_lock); + /* possible we've already received a couple in messages, so + add however many we're going to wait for */ + module->p2p_num_pending_in += incoming_reqs; + module->p2p_num_pending_out += num_outgoing; + /* now we know how many things we're waiting for - wait for them... */ while (module->p2p_num_pending_in > 0 || 0 != module->p2p_num_pending_out) { @@ -171,6 +173,10 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group, goto cleanup; } module->p2p_sc_group = group; + + /* possible we've already received a couple in messages, so + add however many we're going to wait for */ + module->p2p_num_post_msgs += ompi_group_size(module->p2p_sc_group); OPAL_THREAD_UNLOCK(&(module->p2p_lock)); memset(module->p2p_sc_remote_active_ranks, 0, @@ -209,11 +215,6 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group, ompi_win_remove_mode(win, OMPI_WIN_FENCE); ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED); - /* possible we've already received a couple in messages, so - atomicall add however many we're going to wait for */ - OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), - ompi_group_size(module->p2p_sc_group)); - return OMPI_SUCCESS; cleanup: @@ -237,7 +238,6 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win) while (0 != module->p2p_num_post_msgs) { opal_condition_wait(&module->p2p_cond, &module->p2p_lock); } - OPAL_THREAD_UNLOCK(&module->p2p_lock); ompi_osc_pt2pt_flip_sendreqs(module); @@ -246,8 +246,13 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win) for (i = 0 ; i < ompi_group_size(module->p2p_sc_group) ; ++i) { int comm_rank = module->p2p_sc_remote_ranks[i]; - OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), - module->p2p_copy_num_pending_sendreqs[comm_rank]); + module->p2p_num_pending_out += + module->p2p_copy_num_pending_sendreqs[comm_rank]; + } + 
OPAL_THREAD_UNLOCK(&module->p2p_lock); + + for (i = 0 ; i < ompi_group_size(module->p2p_sc_group) ; ++i) { + int comm_rank = module->p2p_sc_remote_ranks[i]; ret = ompi_osc_pt2pt_control_send(module, module->p2p_sc_group->grp_proc_pointers[i], OMPI_OSC_PT2PT_HDR_COMPLETE, @@ -309,22 +314,22 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group, OPAL_THREAD_LOCK(&(module->p2p_lock)); assert(NULL == module->p2p_pw_group); module->p2p_pw_group = group; - OPAL_THREAD_UNLOCK(&(module->p2p_lock)); /* Set our mode to expose w/ post */ ompi_win_remove_mode(win, OMPI_WIN_FENCE); ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); /* list how many complete counters we're still waiting on */ - OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), - ompi_group_size(module->p2p_pw_group)); + module->p2p_num_complete_msgs += + ompi_group_size(module->p2p_pw_group); + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); /* send a hello counter to everyone in group */ for (i = 0 ; i < ompi_group_size(module->p2p_pw_group) ; ++i) { ompi_osc_pt2pt_control_send(module, group->grp_proc_pointers[i], OMPI_OSC_PT2PT_HDR_POST, 1, 0); - } + } return OMPI_SUCCESS; } @@ -442,9 +447,8 @@ ompi_osc_pt2pt_module_unlock(int target, while (0 == module->p2p_lock_received_ack) { opal_condition_wait(&module->p2p_cond, &module->p2p_lock); } - OPAL_THREAD_UNLOCK(&module->p2p_lock); - OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), -1); + module->p2p_lock_received_ack -= 1; /* start all the requests */ ompi_osc_pt2pt_flip_sendreqs(module); @@ -456,7 +460,8 @@ ompi_osc_pt2pt_module_unlock(int target, /* we want to send all the requests, plus we wait for one more completion event for the control message ack from the unlocker saying we're done */ - OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), out_count + 1); + module->p2p_num_pending_out += (out_count + 1); + OPAL_THREAD_UNLOCK(&module->p2p_lock); /* send the unlock request */ OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output, @@ -574,7 +579,7 @@ 
ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module, new_pending->proc = proc; new_pending->lock_type = 0; OPAL_THREAD_LOCK(&(module->p2p_lock)); - OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count); + module->p2p_num_pending_in += count; opal_list_append(&module->p2p_unlocks_pending, &(new_pending->super)); OPAL_THREAD_UNLOCK(&(module->p2p_lock));