diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h
index b270da6a46..b7a9520eea 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h
@@ -171,8 +171,8 @@ struct ompi_osc_pt2pt_module_t {
         received. */
     uint64_t flush_ack_received_count;
 
-    /** True if the access epoch is a passive target access epoch */
-    bool passive_target_access_epoch;
+    /** Number of targets locked/being locked */
+    unsigned int passive_target_access_epoch;
 
     /** start sending data eagerly */
     bool active_eager_send_active;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
index e682b854a5..068edf9d79 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
@@ -1633,6 +1633,9 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
     switch (base_header->type) {
     case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
         process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
+
+        /* only data fragments should be included in the completion counters */
+        mark_incoming_completion (module, (base_header->flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL);
         break;
     case OMPI_OSC_PT2PT_HDR_TYPE_POST:
         (void) osc_pt2pt_incoming_post (module, source);
@@ -1654,12 +1657,6 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "finished processing incoming messages"));
 
-    /* post messages come unbuffered and should NOT increment the incoming completion
-     * counters */
-    if (OMPI_OSC_PT2PT_HDR_TYPE_POST != base_header->type) {
-        mark_incoming_completion (module, (base_header->flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
-                                  source : MPI_PROC_NULL);
-    }
 
     osc_pt2pt_gc_clean (module);
 
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
index 9dddc1a589..6b6e85e708 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
@@ -54,9 +54,9 @@ struct ompi_osc_pt2pt_outstanding_lock_t {
     int target;
     int assert;
     bool flushing;
-    int32_t lock_acks_received;
-    int32_t unlock_acks_received;
-    int32_t flush_acks_received;
+    int32_t lock_acks_expected;
+    int32_t unlock_acks_expected;
+    int32_t flush_acks_expected;
     uint64_t serial_number;
     int32_t type;
 };
@@ -136,7 +136,7 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
         /* If locking local, can't be non-blocking according to the
            standard.  We need to wait for the ack here. */
         OPAL_THREAD_LOCK(&module->lock);
-        while (0 == lock->lock_acks_received) {
+        while (lock->lock_acks_expected) {
            opal_condition_wait(&module->cond, &module->lock);
         }
         OPAL_THREAD_UNLOCK(&module->lock);
@@ -163,7 +163,7 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
     /* need to ensure we make progress */
     opal_progress();
 
-    OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1);
+    OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1);
 }
 
 static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
@@ -255,6 +255,9 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
     int ret;
 
     if (0 == (assert & MPI_MODE_NOCHECK)) {
+        lock->lock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
+        lock->unlock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
+
         if (my_rank != target && target != -1) {
             ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
         } else {
@@ -279,12 +282,6 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
             }
         }
-    } else {
-        if (-1 == target) {
-            lock->lock_acks_received = ompi_comm_size(module->comm);
-        } else {
-            lock->lock_acks_received = 1;
-        }
     }
 
     return OMPI_SUCCESS;
 }
@@ -317,8 +314,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
     }
 
     lock->target = target;
-    lock->lock_acks_received = 0;
-    lock->unlock_acks_received = 0;
+    lock->lock_acks_expected = 0;
+    lock->unlock_acks_expected = 0;
     lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1);
     lock->type = lock_type;
     lock->assert = assert;
@@ -340,7 +337,7 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
         peer->access_epoch = true;
     }
 
-    module->passive_target_access_epoch = true;
+    ++module->passive_target_access_epoch;
 
     opal_list_append(&module->outstanding_locks, &lock->super);
     OPAL_THREAD_UNLOCK(&module->lock);
@@ -361,14 +358,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
     ompi_osc_pt2pt_outstanding_lock_t *lock = NULL;
     int my_rank = ompi_comm_rank (module->comm);
     ompi_osc_pt2pt_peer_t *peer = NULL;
-    int lock_acks_expected;
     int ret = OMPI_SUCCESS;
 
     if (-1 != target) {
-        lock_acks_expected = 1;
         peer = module->peers + target;
-    } else {
-        lock_acks_expected = ompi_comm_size (module->comm);
     }
 
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
@@ -387,11 +380,11 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
     opal_list_remove_item (&module->outstanding_locks, &lock->super);
 
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
-                         "ompi_osc_pt2pt_unlock_internal: lock acks received: %d, expected: %d",
-                         lock->lock_acks_received, lock_acks_expected));
+                         "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
+                         lock->lock_acks_expected));
 
     /* wait until ack has arrived from target */
-    while (lock->lock_acks_received != lock_acks_expected) {
+    while (lock->lock_acks_expected) {
         opal_condition_wait(&module->cond, &module->lock);
     }
     OPAL_THREAD_UNLOCK(&module->lock);
@@ -437,7 +430,7 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
 
     /* wait for unlock acks. this signals remote completion of fragments */
     OPAL_THREAD_LOCK(&module->lock);
-    while (lock->unlock_acks_received != lock_acks_expected) {
+    while (lock->unlock_acks_expected) {
         opal_condition_wait(&module->cond, &module->lock);
     }
     OPAL_THREAD_UNLOCK(&module->lock);
@@ -451,11 +444,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
     OPAL_THREAD_LOCK(&module->lock);
     if (-1 != target) {
         peer->access_epoch = false;
-        module->passive_target_access_epoch = false;
     } else {
-        module->passive_target_access_epoch = false;
         module->all_access_epoch = false;
     }
+    --module->passive_target_access_epoch;
     OPAL_THREAD_UNLOCK(&module->lock);
 
     OBJ_RELEASE(lock);
@@ -508,11 +500,11 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
 
     /* wait until ack has arrived from target, since we need to be able to eager send
        before we can transfer all the data... */
     OPAL_THREAD_LOCK(&module->lock);
-    while (peer_count > lock->lock_acks_received && lock->flushing) {
+    while (lock->lock_acks_expected && lock->flushing) {
         opal_condition_wait(&module->cond, &module->lock);
     }
 
-    lock->flush_acks_received = 0;
+    lock->flush_acks_expected = peer_count;
     lock->flushing = true;
     OPAL_THREAD_UNLOCK(&module->lock);
@@ -541,7 +533,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
 
     /* wait for all the requests and the flush ack (meaning remote completion) */
     OPAL_THREAD_LOCK(&module->lock);
-    while (flush_count != lock->flush_acks_received) {
+    while (lock->flush_acks_expected) {
         opal_condition_wait(&module->cond, &module->lock);
     }
 
@@ -710,8 +702,9 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
                              "lock could not be located"));
     }
 
-    OPAL_THREAD_ADD32(&lock->lock_acks_received, 1);
-    opal_condition_broadcast (&module->cond);
+    if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
+        opal_condition_broadcast (&module->cond);
+    }
 
     return OMPI_SUCCESS;
 }
@@ -835,7 +828,9 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
     /* no need to hold the lock to set this */
     peer->eager_send_active = true;
 
-    OPAL_THREAD_ADD32(&lock->lock_acks_received, 1);
+    if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
+        opal_condition_broadcast(&module->cond);
+    }
 
     opal_condition_broadcast(&module->cond);
 }
@@ -845,14 +840,16 @@ void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int sour
     ompi_osc_pt2pt_outstanding_lock_t *lock;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
-                         "ompi_osc_pt2pt_process_unlock_ack: processing flush ack from %d for lock %" PRIu64,
+                         "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock %" PRIu64,
                          source, flush_ack_header->serial_number));
 
     /* NTH: need to verify that this will work as expected */
     lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number);
     assert (NULL != lock);
 
-    OPAL_THREAD_ADD32(&lock->flush_acks_received, 1);
+    if (0 == OPAL_THREAD_ADD32(&lock->flush_acks_expected, -1)) {
+        opal_condition_broadcast(&module->cond);
+    }
 
     opal_condition_broadcast(&module->cond);
 }
@@ -873,7 +870,7 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou
 
     peer->eager_send_active = false;
 
-    if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1)) {
+    if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1)) {
         opal_condition_broadcast(&module->cond);
     }
 }
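Note on the pattern this patch converts to: instead of counting acks received and comparing against a separately computed expected total, it preloads a countdown of expected acks, decrements it as acks arrive, and wakes waiters only when the count reaches zero. The following is a minimal, self-contained sketch of that idea in plain C. It is illustrative only and is not OMPI code: it uses pthreads rather than the OPAL threading/atomics layer, and the names ack_counter_t, ack_expect, ack_arrived, and ack_wait are hypothetical.

#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             expected;   /* acks still outstanding */
} ack_counter_t;

/* e.g. ack_counter_t acks = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 }; */

/* Called before issuing the lock/unlock/flush requests: record how many acks
 * the operation will generate (1 for a single target, the communicator size
 * for a lock-all style operation). */
static void ack_expect (ack_counter_t *c, int count)
{
    pthread_mutex_lock (&c->lock);
    c->expected = count;
    pthread_mutex_unlock (&c->lock);
}

/* Called from the message-processing path each time an ack arrives.
 * Broadcasting only when the counter hits zero mirrors the patch's
 * "if (0 == OPAL_THREAD_ADD32(..., -1))" change. */
static void ack_arrived (ack_counter_t *c)
{
    pthread_mutex_lock (&c->lock);
    if (0 == --c->expected) {
        pthread_cond_broadcast (&c->cond);
    }
    pthread_mutex_unlock (&c->lock);
}

/* Called by the thread completing the epoch: sleep until no acks remain,
 * the same "while (counter) wait" loop the patch uses in unlock/flush. */
static void ack_wait (ack_counter_t *c)
{
    pthread_mutex_lock (&c->lock);
    while (c->expected) {
        pthread_cond_wait (&c->cond, &c->lock);
    }
    pthread_mutex_unlock (&c->lock);
}

Presetting the expected count at the start of the epoch is what lets the diff drop the local lock_acks_expected bookkeeping in ompi_osc_pt2pt_unlock_internal and collapse the various wait loops to a simple "while (counter)" test.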