osc/pt2pt: make lock_all locking on-demand
The original lock_all algorithm in osc/pt2pt sent a lock message to each peer in the communicator even if the peer was never the target of an operation. Since this scales very poorly, the implementation has been replaced by one that locks a remote peer on first communication with it after a call to MPI_Win_lock_all.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
Parent: 7589a25377
Commit: 9444df1eb7
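The idea in a minimal, self-contained sketch (the names below are illustrative, not the actual osc/pt2pt symbols): each peer records whether a lock message has been sent, and the communication path checks that flag before the first operation targeting the peer.

    #include <stdbool.h>
    #include <stdio.h>

    /* Illustrative stand-in for queueing a lock request fragment. */
    static int send_lock_request (int rank, int lock_type)
    {
        printf ("sending lock request to rank %d (type %d)\n", rank, lock_type);
        return 0;
    }

    typedef struct peer {
        int  rank;
        bool locked;    /* lock message already sent to this peer? */
    } peer_t;

    /* Called from the communication path of every RMA operation during a
     * lock_all epoch: lock the target on first use instead of up front. */
    static int ensure_locked (peer_t *peer, int lock_type)
    {
        int ret = 0;

        if (!peer->locked) {
            ret = send_lock_request (peer->rank, lock_type);
            if (0 == ret) {
                peer->locked = true;
            }
        }

        return ret;
    }

    int main (void)
    {
        peer_t peer = { .rank = 1, .locked = false };

        ensure_locked (&peer, 1);    /* first use: sends the lock message */
        ensure_locked (&peer, 1);    /* subsequent uses: no lock traffic */
        return 0;
    }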
@@ -91,6 +91,15 @@ struct ompi_osc_pt2pt_component_t {
 };
 typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
 
+enum {
+    /** peer has sent an unexpected post message (no matching start) */
+    OMPI_OSC_PT2PT_PEER_FLAG_UNEX = 1,
+    /** eager sends are active on this peer */
+    OMPI_OSC_PT2PT_PEER_FLAG_EAGER = 2,
+    /** peer has been locked (on-demand locking for lock_all) */
+    OMPI_OSC_PT2PT_PEER_FLAG_LOCK = 4,
+};
+
 struct ompi_osc_pt2pt_peer_t {
     /** make this an opal object */
@@ -111,13 +120,54 @@ struct ompi_osc_pt2pt_peer_t {
     /** number of fragments incomming (negative - expected, positive - unsynchronized) */
     int32_t passive_incoming_frag_count;
 
-    /** unexpected post message arrived */
-    bool unexpected_post;
+    /** peer flags */
+    int32_t flags;
 };
 typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
 
 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
 
+static inline bool ompi_osc_pt2pt_peer_locked (ompi_osc_pt2pt_peer_t *peer)
+{
+    return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_LOCK);
+}
+
+static inline bool ompi_osc_pt2pt_peer_unex (ompi_osc_pt2pt_peer_t *peer)
+{
+    return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_UNEX);
+}
+
+static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer)
+{
+    return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER);
+}
+
+static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value)
+{
+    if (value) {
+        peer->flags |= flag;
+    } else {
+        peer->flags &= ~flag;
+    }
+}
+
+static inline bool ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value)
+{
+    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_LOCK, value);
+}
+
+static inline bool ompi_osc_pt2pt_peer_set_unex (ompi_osc_pt2pt_peer_t *peer, bool value)
+{
+    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_UNEX, value);
+}
+
+static inline bool ompi_osc_pt2pt_peer_set_eager_active (ompi_osc_pt2pt_peer_t *peer, bool value)
+{
+    ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_EAGER, value);
+}
+
+OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
+
 /** Module structure. Exactly one of these is associated with each
     PT2PT window */
 struct ompi_osc_pt2pt_module_t {
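The accessors above are thin wrappers: the predicates collapse a bit test to a strict true/false with !!, and ompi_osc_pt2pt_peer_set_flag is a plain (non-atomic) read-modify-write, so callers are presumably expected to serialize updates to a given peer. A standalone illustration of the same bit-packing, with illustrative names:

    #include <assert.h>
    #include <stdint.h>

    enum { FLAG_UNEX = 1, FLAG_EAGER = 2, FLAG_LOCK = 4 };  /* powers of two */

    int main (void)
    {
        int32_t flags = 0;

        flags |= FLAG_LOCK | FLAG_EAGER;     /* set two independent bits */
        assert (!!(flags & FLAG_LOCK));
        assert (!(flags & FLAG_UNEX));

        flags &= ~FLAG_LOCK;                 /* clear just the lock bit */
        assert (!(flags & FLAG_LOCK) && (flags & FLAG_EAGER));
        return 0;
    }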
@@ -431,6 +481,8 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module,
                                    int tag,
                                    struct ompi_communicator_t *comm);
 
+int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock);
+
 /**
  * ompi_osc_pt2pt_progress_pending_acc:
  *
@@ -845,6 +897,12 @@ static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_modu
 static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target,
                                                                         struct ompi_osc_pt2pt_peer_t **peer)
 {
+    ompi_osc_pt2pt_peer_t *tmp;
+
+    if (NULL == peer) {
+        peer = &tmp;
+    }
+
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc/pt2pt: looking for synchronization object for target %d", target));
 
@@ -862,8 +920,9 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc
 
         /* fence epoch is now active */
         module->all_sync.epoch_active = true;
-        if (peer) {
-            *peer = ompi_osc_pt2pt_peer_lookup (module, target);
+        *peer = ompi_osc_pt2pt_peer_lookup (module, target);
+        if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && !ompi_osc_pt2pt_peer_locked (*peer)) {
+            (void) ompi_osc_pt2pt_lock_remote (module, target, &module->all_sync);
         }
 
         return &module->all_sync;
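The hunk above is the heart of the change: the synchronization-object lookup on the hot path notices an un-locked peer inside a lock_all epoch and issues the lock request on the spot. From the application's point of view nothing changes except when the lock traffic is generated. A sketch in plain MPI (a fragment, assuming a created window and at least two ranks):

    #include <mpi.h>

    /* With on-demand locking, MPI_Win_lock_all itself sends no lock
     * messages; the lock request for rank 1 goes out with the first
     * operation that targets it. */
    void put_to_rank1 (MPI_Win win, int *value)
    {
        MPI_Win_lock_all (0, win);                   /* no per-peer traffic */
        MPI_Put (value, 1, MPI_INT, 1, 0, 1, MPI_INT, win); /* locks rank 1 */
        MPI_Win_flush (1, win);                      /* lock + put complete */
        MPI_Win_unlock_all (win);                    /* unlocks only locked peers */
    }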
@@ -261,13 +261,13 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
     for (int i = 0 ; i < sync->num_peers ; ++i) {
         ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
 
-        if (peer->unexpected_post) {
+        if (ompi_osc_pt2pt_peer_unex (peer)) {
             /* the peer already sent a post message for this pscw access epoch */
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "found unexpected post from %d",
                                  peer->rank));
             OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
-            peer->unexpected_post = false;
+            ompi_osc_pt2pt_peer_set_unex (peer, false);
         }
     }
 
     OPAL_THREAD_UNLOCK(&sync->lock);
@@ -600,7 +600,7 @@ void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
                              "received unexpected post message from %d for future PSCW synchronization",
                              source));
 
-        peer->unexpected_post = true;
+        ompi_osc_pt2pt_peer_set_unex (peer, true);
        OPAL_THREAD_UNLOCK(&sync->lock);
     } else {
         OPAL_THREAD_UNLOCK(&sync->lock);
@@ -492,7 +492,7 @@ static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
     OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
     peer->active_frag = NULL;
     peer->passive_incoming_frag_count = 0;
-    peer->unexpected_post = false;
+    peer->flags = 0;
 }
 
 static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
@@ -58,11 +58,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
 static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
 {
     const int my_rank = ompi_comm_rank (module->comm);
+    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
     int lock_type = lock->sync.lock.type;
     bool acquired = false;
 
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
+    (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
+
     acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
     if (!acquired) {
         /* queue the lock */
@@ -73,6 +76,9 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
         ompi_osc_pt2pt_sync_wait_expected (lock);
     }
 
+    ompi_osc_pt2pt_peer_set_locked (peer, true);
+    ompi_osc_pt2pt_peer_set_eager_active (peer, true);
+
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                          "local lock aquired"));
 
@@ -81,8 +87,12 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
 
 static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
 {
     const int my_rank = ompi_comm_rank (module->comm);
+    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, my_rank);
     int lock_type = lock->sync.lock.type;
 
+    (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
+
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
@@ -98,16 +108,22 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
     /* need to ensure we make progress */
     opal_progress();
 
+    ompi_osc_pt2pt_peer_set_locked (peer, false);
+    ompi_osc_pt2pt_peer_set_eager_active (peer, false);
+
     ompi_osc_pt2pt_sync_expected (lock);
 }
 
-static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
+int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
 {
+    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
     int lock_type = lock->sync.lock.type;
     ompi_osc_pt2pt_header_lock_t lock_req;
 
     int ret;
 
+    (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
+
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
     /* generate a lock request */
@@ -128,6 +144,9 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
 
     /* make sure the request gets sent, so we can start eager sending... */
     ret = ompi_osc_pt2pt_frag_flush_target (module, target);
+    if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
+        ompi_osc_pt2pt_peer_set_locked (peer, true);
+    }
 
     return ret;
 }
@@ -140,6 +159,8 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
     ompi_osc_pt2pt_header_unlock_t unlock_req;
     int ret;
 
+    (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
+
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
     unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
@@ -169,6 +190,9 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
         return ret;
     }
 
+    ompi_osc_pt2pt_peer_set_locked (peer, false);
+    ompi_osc_pt2pt_peer_set_eager_active (peer, false);
+
     return ompi_osc_pt2pt_frag_flush_target(module, target);
 }
 
@@ -179,6 +203,8 @@ static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module,
     int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
     int ret;
 
+    (void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
+
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
     flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
@@ -218,8 +244,6 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
     assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
 
     if (0 == (assert & MPI_MODE_NOCHECK)) {
-        lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
-
         if (my_rank != target && target != -1) {
             ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
         } else {
@@ -231,19 +255,7 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
             return ret;
         }
 
-        if (-1 == target) {
-            for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
-                if (my_rank == i) {
-                    continue;
-                }
-
-                ret = ompi_osc_pt2pt_lock_remote (module, i, lock);
-                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-                    return ret;
-                }
-            }
-
-        }
+        /* for lock_all there is nothing more to do. we will lock peer's on demand */
     } else {
         lock->eager_send_active = true;
    }
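A consequence visible throughout these hunks: sync_expected is no longer preset to the communicator size before a lock, unlock, or flush round. Each *_remote helper now increments it just before sending its control message, and each ack decrements it, so the counter only ever covers peers that were actually contacted. A toy model of that accounting (not the real API):

    #include <stdatomic.h>

    static atomic_int sync_expected;   /* acks still outstanding */

    static void control_message_sent (void) { atomic_fetch_add (&sync_expected, 1); }
    static void ack_received (void)         { atomic_fetch_sub (&sync_expected, 1); }
    /* waiters block until sync_expected drops to zero
     * (cf. ompi_osc_pt2pt_sync_wait_expected) */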
@@ -312,7 +324,7 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
     lock->sync.lock.target = target;
     lock->sync.lock.type = lock_type;
     lock->sync.lock.assert = assert;
-
     lock->num_peers = (-1 == target) ? ompi_comm_size (module->comm) : 1;
+    lock->sync_expected = 0;
 
     /* delay all eager sends until we've heard back.. */
@@ -376,13 +388,13 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
                          "ompi_osc_pt2pt_unlock_internal: all lock acks received"));
 
     if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) {
-        lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
-
         if (my_rank != target) {
             if (-1 == target) {
                 /* send unlock messages to all of my peers */
                 for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
-                    if (my_rank == i) {
+                    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, i);
+
+                    if (my_rank == i || !ompi_osc_pt2pt_peer_locked (peer)) {
                         continue;
                     }
 
@@ -469,12 +481,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
        able to eager send before we can transfer all the data... */
     ompi_osc_pt2pt_sync_wait_expected (lock);
 
-    if (-1 == target) {
-        lock->sync_expected = ompi_comm_size(module->comm) - 1;
-    } else {
-        lock->sync_expected = 1;
-    }
-
     if (-1 == target) {
         /* NTH: no local flush */
         for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
@@ -488,7 +494,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
             }
         }
     } else {
-
         /* send control message with flush request and count */
         ret = ompi_osc_pt2pt_flush_remote (module, target, lock);
         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
@@ -787,6 +792,7 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source,
 void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
                                       ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
 {
+    ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, lock_ack_header->source);
     ompi_osc_pt2pt_sync_t *lock;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -796,6 +802,8 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
     lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr;
     assert (NULL != lock);
 
+    ompi_osc_pt2pt_peer_set_eager_active (peer, true);
+
     ompi_osc_pt2pt_sync_expected (lock);
 }
 
@@ -164,7 +164,9 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync)
     int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
     if (0 == new_value) {
         OPAL_THREAD_LOCK(&sync->lock);
-        sync->eager_send_active = true;
+        if (!(sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK && sync->num_peers > 1)) {
+            sync->eager_send_active = true;
+        }
         opal_condition_broadcast (&sync->cond);
         OPAL_THREAD_UNLOCK(&sync->lock);
     }
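The guard added here keeps the epoch-wide eager_send_active flag from ever being set for a multi-peer lock (i.e. lock_all): with on-demand locking there is no single point at which all peers are known to be locked, so eager-send eligibility is instead tracked per peer via OMPI_OSC_PT2PT_PEER_FLAG_EAGER, set when that peer's lock ack arrives (see ompi_osc_pt2pt_process_lock_ack above). A hypothetical condensation of the resulting send-path test, illustrative only:

    #include <stdbool.h>

    /* Eager data may flow to a peer if the whole epoch allows it (fence,
     * pscw, or a single-target lock) or if that particular peer's lock
     * ack has already enabled eager sends for it individually. */
    static bool can_send_eager (bool sync_eager_active, bool peer_eager_active)
    {
        return sync_eager_active || peer_eager_active;
    }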