1
1

  * Fix race condition with the pending_{in,out} variables -- if we're going
    to do while(...) { } then we can't change the variables in the ...
    atomically, but should do it while holding the module lock.
  * Fix dumb communicator creation error when we don't create the progress
    stuff (because a window already exists), where we would accidentally
    jump to the error case.

This commit was SVN r14715.
Этот коммит содержится в:
Brian Barrett 2007-05-21 20:53:02 +00:00
родитель 677eb5e4bc
Коммит 38eab3613b
4 изменённых файлов: 90 добавлений и 59 удалений

Просмотреть файл

@ -105,22 +105,22 @@ struct ompi_osc_pt2pt_module_t {
Complete. For lock, the number of
messages waiting for completion on on the origin side. Not
protected by p2p_lock - must use atomic counter operations. */
volatile int32_t p2p_num_pending_out;
int32_t p2p_num_pending_out;
/** For MPI_Fence synchronization, the number of expected incoming
messages. For Post/Wait, the number of expected updates from
complete. For lock, the number of messages on the passive side
we are waiting for. Not protected by p2p_lock - must use
atomic counter operations. */
volatile int32_t p2p_num_pending_in;
int32_t p2p_num_pending_in;
/** Number of "ping" messages from the remote post group we've
received */
volatile int32_t p2p_num_post_msgs;
int32_t p2p_num_post_msgs;
/** Number of "count" messages from the remote complete group
we've received */
volatile int32_t p2p_num_complete_msgs;
int32_t p2p_num_complete_msgs;
/** cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to

Просмотреть файл

@ -357,9 +357,11 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
#else
ret = opal_progress_register(ompi_osc_pt2pt_component_progress);
#endif
} else {
ret = OMPI_SUCCESS;
}
if (OMPI_SUCCESS != ret) goto cleanup;
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
if (OMPI_SUCCESS != ret) goto cleanup;
/* fill in window information */
win->w_osc_module = (ompi_osc_base_module_t*) module;
@ -572,11 +574,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
case OMPI_OSC_PT2PT_HDR_POST:
{
int32_t count;
count = OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1);
if (count == 0) {
opal_condition_broadcast(&module->p2p_cond);
}
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (module->p2p_num_post_msgs -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
if (count == 0) opal_condition_broadcast(&module->p2p_cond);
}
break;
@ -594,10 +595,11 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
/* we've heard from one more place, and have value reqs to
process */
/* BWB -- need to check to see if we can do this better */
count = OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1);
if (count == 0) opal_condition_broadcast(&module->p2p_cond);
count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (module->p2p_num_complete_msgs -= 1);
count += (module->p2p_num_pending_in += header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
if (count == 0) opal_condition_broadcast(&module->p2p_cond);
}
break;
@ -618,9 +620,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0],
header->hdr_value[1]);
} else {
count = OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"received lock request ack, count %d", count));
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (module->p2p_lock_received_ack += 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
if (count != 0) opal_condition_broadcast(&module->p2p_cond);
}
}
@ -646,10 +649,10 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
{
int32_t count;
count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);
if (count == 0) {
opal_condition_broadcast(&module->p2p_cond);
}
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (module->p2p_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
if (count == 0) opal_condition_broadcast(&module->p2p_cond);
}
break;
@ -724,11 +727,15 @@ ompi_osc_pt2pt_component_progress(void)
static void*
component_thread_fn(opal_object_t *obj)
{
struct timespec waittime;
while (mca_osc_pt2pt_component.p2p_c_thread_run) {
/* wake up whenever a request completes, to make sure it's not
for us */
waittime.tv_sec = 1;
waittime.tv_usec = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
opal_condition_timedwait(&ompi_request_cond, &ompi_request_lock);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
ompi_osc_pt2pt_component_progress();
}

Просмотреть файл

@ -56,13 +56,20 @@ create_send_tag(ompi_osc_pt2pt_module_t *module)
static inline void
inmsg_mark_complete(ompi_osc_pt2pt_module_t *module)
{
int32_t count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
int32_t count;
bool need_unlock = false;
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (module->p2p_num_pending_in -= 1);
if ((0 != module->p2p_lock_status) &&
(opal_list_get_size(&module->p2p_unlocks_pending) != 0)) {
need_unlock = true;
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);
if (0 == count) {
if (need_unlock) ompi_osc_pt2pt_passive_unlock_complete(module);
opal_condition_broadcast(&module->p2p_cond);
if ((0 != module->p2p_lock_status) &&
(opal_list_get_size(&module->p2p_unlocks_pending) != 0)) {
ompi_osc_pt2pt_passive_unlock_complete(module);
}
}
}
@ -86,7 +93,9 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank));
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (sendreq->req_module->p2p_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
ompi_osc_pt2pt_longreq_free(longreq);
ompi_osc_pt2pt_sendreq_free(sendreq);
@ -122,7 +131,9 @@ ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (sendreq->req_module->p2p_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
}
@ -717,10 +728,13 @@ ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
(ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata;
int32_t count;
ompi_osc_pt2pt_longreq_free(longreq);
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (sendreq->req_module->p2p_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_pt2pt_longreq_free(longreq);
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
}
@ -749,8 +763,13 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
&iov_count,
&max_data );
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
OPAL_THREAD_LOCK(&module->p2p_lock);
count = (sendreq->req_module->p2p_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&module->p2p_cond);
} else {

Просмотреть файл

@ -29,13 +29,12 @@
#include "ompi/mca/osc/base/base.h"
/* Must hold module's lock before calling... */
static inline void
ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module)
{
unsigned int *tmp;
OPAL_THREAD_LOCK(&(module->p2p_lock));
tmp = module->p2p_copy_num_pending_sendreqs;
module->p2p_copy_num_pending_sendreqs =
module->p2p_num_pending_sendreqs;
@ -47,8 +46,6 @@ ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module)
opal_list_join(&module->p2p_copy_pending_sendreqs,
opal_list_get_end(&module->p2p_copy_pending_sendreqs),
&module->p2p_pending_sendreqs);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
@ -58,6 +55,7 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
unsigned int incoming_reqs;
int ret = OMPI_SUCCESS, i;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);
int num_outgoing = 0;
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
/* check that the user didn't lie to us - since NOPRECEDED
@ -76,7 +74,11 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
/* "atomically" copy all the data we're going to be modifying
into the copy... */
OPAL_THREAD_LOCK(&(module->p2p_lock));
ompi_osc_pt2pt_flip_sendreqs(module);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
num_outgoing = opal_list_get_size(&(module->p2p_copy_pending_sendreqs));
/* find out how much data everyone is going to send us. */
ret = module->p2p_comm->
@ -104,12 +106,6 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
return ret;
}
/* possible we've already received a couple in messages, so
atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out),
opal_list_get_size(&(module->p2p_copy_pending_sendreqs)));
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"fence: waiting on %d in and %d out",
module->p2p_num_pending_in,
@ -127,13 +123,19 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"fence: failure in starting sendreq (%d). Will try later.",
"fence: failure in starting sendreq (%d). "
"Will try later.",
ret);
opal_list_append(&(module->p2p_copy_pending_sendreqs), item);
}
}
OPAL_THREAD_LOCK(&module->p2p_lock);
/* possible we've already received a couple in messages, so
add however many we're going to wait for */
module->p2p_num_pending_in += incoming_reqs;
module->p2p_num_pending_out += num_outgoing;
/* now we know how many things we're waiting for - wait for them... */
while (module->p2p_num_pending_in > 0 ||
0 != module->p2p_num_pending_out) {
@ -171,6 +173,10 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,
goto cleanup;
}
module->p2p_sc_group = group;
/* possible we've already received a couple in messages, so
add however many we're going to wait for */
module->p2p_num_post_msgs += ompi_group_size(module->p2p_sc_group);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
memset(module->p2p_sc_remote_active_ranks, 0,
@ -209,11 +215,6 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);
/* possible we've already received a couple in messages, so
atomicall add however many we're going to wait for */
OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs),
ompi_group_size(module->p2p_sc_group));
return OMPI_SUCCESS;
cleanup:
@ -237,7 +238,6 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
while (0 != module->p2p_num_post_msgs) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);
ompi_osc_pt2pt_flip_sendreqs(module);
@ -246,8 +246,13 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
for (i = 0 ; i < ompi_group_size(module->p2p_sc_group) ; ++i) {
int comm_rank = module->p2p_sc_remote_ranks[i];
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out),
module->p2p_copy_num_pending_sendreqs[comm_rank]);
module->p2p_num_pending_out +=
module->p2p_copy_num_pending_sendreqs[comm_rank];
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);
for (i = 0 ; i < ompi_group_size(module->p2p_sc_group) ; ++i) {
int comm_rank = module->p2p_sc_remote_ranks[i];
ret = ompi_osc_pt2pt_control_send(module,
module->p2p_sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE,
@ -309,22 +314,22 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group,
OPAL_THREAD_LOCK(&(module->p2p_lock));
assert(NULL == module->p2p_pw_group);
module->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
/* Set our mode to expose w/ post */
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
/* list how many complete counters we're still waiting on */
OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs),
ompi_group_size(module->p2p_pw_group));
module->p2p_num_complete_msgs +=
ompi_group_size(module->p2p_pw_group);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
/* send a hello counter to everyone in group */
for (i = 0 ; i < ompi_group_size(module->p2p_pw_group) ; ++i) {
ompi_osc_pt2pt_control_send(module,
group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_POST, 1, 0);
}
}
return OMPI_SUCCESS;
}
@ -442,9 +447,8 @@ ompi_osc_pt2pt_module_unlock(int target,
while (0 == module->p2p_lock_received_ack) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);
OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), -1);
module->p2p_lock_received_ack -= 1;
/* start all the requests */
ompi_osc_pt2pt_flip_sendreqs(module);
@ -456,7 +460,8 @@ ompi_osc_pt2pt_module_unlock(int target,
/* we want to send all the requests, plus we wait for one more
completion event for the control message ack from the unlocker
saying we're done */
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), out_count + 1);
module->p2p_num_pending_out += (out_count + 1);
OPAL_THREAD_UNLOCK(&module->p2p_lock);
/* send the unlock request */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
@ -574,7 +579,7 @@ ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
new_pending->proc = proc;
new_pending->lock_type = 0;
OPAL_THREAD_LOCK(&(module->p2p_lock));
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
module->p2p_num_pending_in += count;
opal_list_append(&module->p2p_unlocks_pending, &(new_pending->super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));