diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index e96c3f8b8d..229343afa6 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -91,6 +91,15 @@ struct ompi_osc_pt2pt_component_t { }; typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t; +enum { + /** peer has sent an unexpected post message (no matching start) */ + OMPI_OSC_PT2PT_PEER_FLAG_UNEX = 1, + /** eager sends are active on this peer */ + OMPI_OSC_PT2PT_PEER_FLAG_EAGER = 2, + /** peer has been locked (on-demand locking for lock_all) */ + OMPI_OSC_PT2PT_PEER_FLAG_LOCK = 4, +}; + struct ompi_osc_pt2pt_peer_t { /** make this an opal object */ @@ -111,13 +120,59 @@ struct ompi_osc_pt2pt_peer_t { /** number of fragments incomming (negative - expected, positive - unsynchronized) */ int32_t passive_incoming_frag_count; - /** unexpected post message arrived */ - bool unexpected_post; + /** peer flags (bitwise OR of OMPI_OSC_PT2PT_PEER_FLAG_* values) */ + int32_t flags; }; typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t; OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t); +/** return true if OMPI_OSC_PT2PT_PEER_FLAG_LOCK is set in peer->flags */ +static inline bool ompi_osc_pt2pt_peer_locked (ompi_osc_pt2pt_peer_t *peer) +{ + return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_LOCK); +} + +/** return true if OMPI_OSC_PT2PT_PEER_FLAG_UNEX is set in peer->flags */ +static inline bool ompi_osc_pt2pt_peer_unex (ompi_osc_pt2pt_peer_t *peer) +{ + return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_UNEX); +} + +/** return true if OMPI_OSC_PT2PT_PEER_FLAG_EAGER is set in peer->flags */ +static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer) +{ + return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER); +} + +/** set (value true) or clear (value false) the bit(s) of flag in peer->flags using a plain, non-atomic update */ +static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value) +{ + if (value) { + peer->flags |= flag; + } else { + peer->flags &= ~flag; + } +} + +/** set or clear OMPI_OSC_PT2PT_PEER_FLAG_LOCK on the peer (no return value) */ +static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value) +{ + ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_LOCK, value); +} + +/** set or clear OMPI_OSC_PT2PT_PEER_FLAG_UNEX on the peer (no return value) */ +static inline void ompi_osc_pt2pt_peer_set_unex (ompi_osc_pt2pt_peer_t *peer, bool value) +{ + ompi_osc_pt2pt_peer_set_flag 
(peer, OMPI_OSC_PT2PT_PEER_FLAG_UNEX, value); +} + +/** set or clear OMPI_OSC_PT2PT_PEER_FLAG_EAGER on the peer (no return value) */ +static inline void ompi_osc_pt2pt_peer_set_eager_active (ompi_osc_pt2pt_peer_t *peer, bool value) +{ + ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_EAGER, value); +} + /** Module structure. Exactly one of these is associated with each PT2PT window */ struct ompi_osc_pt2pt_module_t { @@ -431,6 +486,8 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module, int tag, struct ompi_communicator_t *comm); +int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock); + /** * ompi_osc_pt2pt_progress_pending_acc: * @@ -845,6 +902,12 @@ static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_modu static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target, struct ompi_osc_pt2pt_peer_t **peer) { + ompi_osc_pt2pt_peer_t *tmp; + + if (NULL == peer) { + peer = &tmp; + } + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc/pt2pt: looking for synchronization object for target %d", target)); @@ -862,8 +925,9 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc /* fence epoch is now active */ module->all_sync.epoch_active = true; - if (peer) { - *peer = ompi_osc_pt2pt_peer_lookup (module, target); + *peer = ompi_osc_pt2pt_peer_lookup (module, target); + if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && !ompi_osc_pt2pt_peer_locked (*peer)) { + (void) ompi_osc_pt2pt_lock_remote (module, target, &module->all_sync); } return &module->all_sync; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index b8b04796e1..3c086a42f2 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -261,13 +261,13 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t 
*win) for (int i = 0 ; i < sync->num_peers ; ++i) { ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i]; - if (peer->unexpected_post) { + if (ompi_osc_pt2pt_peer_unex (peer)) { /* the peer already sent a post message for this pscw access epoch */ OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "found unexpected post from %d", peer->rank)); OPAL_THREAD_ADD32 (&sync->sync_expected, -1); - peer->unexpected_post = false; + ompi_osc_pt2pt_peer_set_unex (peer, false); } } OPAL_THREAD_UNLOCK(&sync->lock); @@ -600,7 +600,7 @@ void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source) "received unexpected post message from %d for future PSCW synchronization", source)); - peer->unexpected_post = true; + ompi_osc_pt2pt_peer_set_unex (peer, true); OPAL_THREAD_UNLOCK(&sync->lock); } else { OPAL_THREAD_UNLOCK(&sync->lock); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index cc3629315c..54c7eb6aeb 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -492,7 +492,7 @@ static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer) OBJ_CONSTRUCT(&peer->lock, opal_mutex_t); peer->active_frag = NULL; peer->passive_incoming_frag_count = 0; - peer->unexpected_post = false; + peer->flags = 0; } static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 8f14677cee..926e4958b5 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -58,11 +58,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) { const int my_rank = ompi_comm_rank (module->comm); + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank); int 
lock_type = lock->sync.lock.type; bool acquired = false; assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); + acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock); if (!acquired) { /* queue the lock */ @@ -73,6 +76,9 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp ompi_osc_pt2pt_sync_wait_expected (lock); } + ompi_osc_pt2pt_peer_set_locked (peer, true); + ompi_osc_pt2pt_peer_set_eager_active (peer, true); + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "local lock aquired")); @@ -81,8 +87,12 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) { + const int my_rank = ompi_comm_rank (module->comm); + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank); int lock_type = lock->sync.lock.type; + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, @@ -98,16 +108,22 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, /* need to ensure we make progress */ opal_progress(); + ompi_osc_pt2pt_peer_set_locked (peer, false); + ompi_osc_pt2pt_peer_set_eager_active (peer, false); + ompi_osc_pt2pt_sync_expected (lock); } -static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock) +int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); int lock_type = lock->sync.lock.type; ompi_osc_pt2pt_header_lock_t lock_req; int ret; + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); 
/* generate a lock request */ @@ -128,6 +144,9 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i /* make sure the request gets sent, so we can start eager sending... */ ret = ompi_osc_pt2pt_frag_flush_target (module, target); + if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { + ompi_osc_pt2pt_peer_set_locked (peer, true); + } return ret; } @@ -140,6 +159,8 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_unlock_t unlock_req; int ret; + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ; @@ -169,6 +190,9 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, return ret; } + ompi_osc_pt2pt_peer_set_locked (peer, false); + ompi_osc_pt2pt_peer_set_eager_active (peer, false); + return ompi_osc_pt2pt_frag_flush_target(module, target); } @@ -179,6 +203,8 @@ static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1); int ret; + (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1); + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ; @@ -218,8 +244,6 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); if (0 == (assert & MPI_MODE_NOCHECK)) { - lock->sync_expected = (-1 == target) ? 
ompi_comm_size (module->comm) : 1; - if (my_rank != target && target != -1) { ret = ompi_osc_pt2pt_lock_remote (module, target, lock); } else { @@ -231,19 +255,7 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module return ret; } - if (-1 == target) { - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - if (my_rank == i) { - continue; - } - - ret = ompi_osc_pt2pt_lock_remote (module, i, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } - } - - } + /* for lock_all there is nothing more to do. we will lock peers on demand */ } else { lock->eager_send_active = true; } @@ -312,7 +324,7 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, lock->sync.lock.target = target; lock->sync.lock.type = lock_type; lock->sync.lock.assert = assert; - + lock->num_peers = (-1 == target) ? ompi_comm_size (module->comm) : 1; /* target -1: one peer per rank in module->comm, otherwise a single peer */ lock->sync_expected = 0; /* delay all eager sends until we've heard back.. */ @@ -376,13 +388,13 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) "ompi_osc_pt2pt_unlock_internal: all lock acks received")); if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) { - lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1; - if (my_rank != target) { if (-1 == target) { /* send unlock messages to all of my peers */ for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - if (my_rank == i) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, i); + + if (my_rank == i || !ompi_osc_pt2pt_peer_locked (peer)) { continue; } @@ -469,12 +481,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ able to eager send before we can transfer all the data... 
*/ ompi_osc_pt2pt_sync_wait_expected (lock); - if (-1 == target) { - lock->sync_expected = ompi_comm_size(module->comm) - 1; - } else { - lock->sync_expected = 1; - } - if (-1 == target) { /* NTH: no local flush */ for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { @@ -488,7 +494,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ } } } else { - /* send control message with flush request and count */ ret = ompi_osc_pt2pt_flush_remote (module, target, lock); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { @@ -787,6 +792,7 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source, void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, lock_ack_header->source); ompi_osc_pt2pt_sync_t *lock; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -796,6 +802,8 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module, lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr; assert (NULL != lock); + ompi_osc_pt2pt_peer_set_eager_active (peer, true); + ompi_osc_pt2pt_sync_expected (lock); } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index f4e4adcae0..5e1d990515 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -164,7 +164,9 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); if (0 == new_value) { OPAL_THREAD_LOCK(&sync->lock); - sync->eager_send_active = true; + if (!(sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK && sync->num_peers > 1)) { + sync->eager_send_active = true; + } opal_condition_broadcast (&sync->cond); OPAL_THREAD_UNLOCK(&sync->lock); }