Merge pull request #225 from hjelmn/master
osc/rdma: fix issue identified by Berk Hess
This commit is contained in:
Commit
23cb00d7d5
@ -424,8 +424,8 @@ static inline void mark_incoming_completion (ompi_osc_rdma_module_t *module, int
|
||||
{
|
||||
if (MPI_PROC_NULL == source) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
- "mark_incoming_completion marking active incoming complete. count = %d",
|
||||
- (int) module->active_incoming_frag_count + 1));
|
||||
+ "mark_incoming_completion marking active incoming complete. count = %d. signal = %d",
|
||||
+ (int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count));
|
||||
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1);
|
||||
if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) {
|
||||
opal_condition_broadcast(&module->cond);
|
||||
|
@ -461,7 +461,7 @@ ompi_osc_rdma_wait(ompi_win_t *win)
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
while (0 != module->num_complete_msgs ||
|
||||
- module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
|
||||
+ module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
|
||||
module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count));
|
||||
@ -501,7 +501,7 @@ ompi_osc_rdma_test(ompi_win_t *win,
|
||||
OPAL_THREAD_LOCK(&(module->lock));
|
||||
|
||||
if (0 != module->num_complete_msgs ||
|
||||
- module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
|
||||
+ module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
|
||||
*flag = 0;
|
||||
ret = OMPI_SUCCESS;
|
||||
goto cleanup;
|
||||
|
@ -349,6 +349,16 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
|
||||
tag = get_tag(module);
|
||||
}
|
||||
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if ((is_long_msg || request) && module->sc_group) {
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
@ -525,6 +535,16 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
tag = get_tag (module);
|
||||
}
|
||||
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if ((is_long_msg || request) && module->sc_group) {
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_rdma_header_acc_t*) ptr;
|
||||
@ -607,7 +627,7 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
|
||||
ret = ompi_osc_rdma_frag_finish(module, frag);
|
||||
|
||||
- if (request) {
|
||||
+ if (is_long_msg || request) {
|
||||
/* need to flush now in case the caller decides to wait on the request */
|
||||
ompi_osc_rdma_frag_flush_target (module, target);
|
||||
}
|
||||
@ -858,6 +878,16 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou
|
||||
/* for bookkeeping the get is "outgoing" */
|
||||
ompi_osc_signal_outgoing (module, target, 1);
|
||||
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if (!release_req && module->sc_group) {
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_rdma_header_get_t*) ptr;
|
||||
@ -1088,6 +1118,16 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count,
|
||||
/* increment the number of outgoing fragments */
|
||||
ompi_osc_signal_outgoing (module, target_rank, rdma_request->outstanding_requests);
|
||||
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if (!release_req && module->sc_group) {
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_rdma_header_acc_t *) ptr;
|
||||
|
@ -1335,8 +1335,10 @@ static inline int process_complete (ompi_osc_rdma_module_t *module, int source,
|
||||
ompi_osc_rdma_header_complete_t *complete_header)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
- "osc rdma: process_complete got complete message from %d. expected fragment count %d",
|
||||
- source, complete_header->frag_count));
|
||||
+ "osc rdma: process_complete got complete message from %d. expected fragment count %d. "
|
||||
+ "current signal count %d. current incomming count: %d",
|
||||
+ source, complete_header->frag_count, module->active_incoming_frag_signal_count,
|
||||
+ module->active_incoming_frag_count));
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
@ -1681,8 +1683,12 @@ static int ompi_osc_rdma_callback (ompi_request_t *request)
|
||||
/* restart the receive request */
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
- mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
|
||||
- source : MPI_PROC_NULL);
|
||||
+ /* post messages come unbuffered and should NOT increment the incoming completion
|
||||
+ * counters */
|
||||
+ if (OMPI_OSC_RDMA_HDR_TYPE_POST != base_header->type) {
|
||||
+ mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
|
||||
+ source : MPI_PROC_NULL);
|
||||
+ }
|
||||
|
||||
osc_rdma_gc_clean ();
|
||||
|
||||
|
@ -54,7 +54,6 @@ static int frag_send_cb (ompi_request_t *request)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
frag_send(ompi_osc_rdma_module_t *module,
|
||||
ompi_osc_rdma_frag_t *frag)
|
||||
@ -67,6 +66,10 @@ frag_send(ompi_osc_rdma_module_t *module,
|
||||
"osc rdma: frag_send called to %d, frag = %p, count = %d",
|
||||
frag->target, (void *) frag, count));
|
||||
|
||||
+ /* we need to signal now that a frag is outgoing to ensure the count sent
|
||||
+ * with the unlock message is correct */
|
||||
+ ompi_osc_signal_outgoing (module, frag->target, 1);
|
||||
|
||||
return ompi_osc_rdma_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_RDMA_FRAG_TAG,
|
||||
module->comm, frag_send_cb, frag);
|
||||
}
|
||||
@ -81,10 +84,6 @@ ompi_osc_rdma_frag_start(ompi_osc_rdma_module_t *module,
|
||||
assert(0 == frag->pending);
|
||||
assert(module->peers[frag->target].active_frag != frag);
|
||||
|
||||
- /* we need to signal now that a frag is outgoing to ensure the count sent
|
||||
- * with the unlock message is correct */
|
||||
- ompi_osc_signal_outgoing (module, frag->target, 1);
|
||||
|
||||
/* if eager sends are not active, can't send yet, so buffer and
|
||||
get out... */
|
||||
if (module->passive_target_access_epoch) {
|
||||
|
Loading…
x
Reference in new issue
Block a user