1
1

Merge pull request #553 from hjelmn/osc_pt2pt_fixes

osc/pt2pt: fix bugs that caused incorrect fragment counting
Этот коммит содержится в:
Nathan Hjelm 2015-04-24 08:37:59 -06:00
родитель 0afda878a2 f1d09e55ec
Коммит efdc1c37af
5 изменённых файлов: 42 добавлений и 48 удалений

Просмотреть файл

@@ -171,8 +171,8 @@ struct ompi_osc_pt2pt_module_t {
received. */ received. */
uint64_t flush_ack_received_count; uint64_t flush_ack_received_count;
/** True if the access epoch is a passive target access epoch */ /** Number of targets locked/being locked */
bool passive_target_access_epoch; unsigned int passive_target_access_epoch;
/** start sending data eagerly */ /** start sending data eagerly */
bool active_eager_send_active; bool active_eager_send_active;

Просмотреть файл

@@ -497,13 +497,13 @@ ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct ompi_info_t **info_used)
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL); OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer) static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
{ {
OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t); OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
OBJ_CONSTRUCT(&peer->lock, opal_mutex_t); OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
} }
void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer) static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
{ {
OBJ_DESTRUCT(&peer->queued_frags); OBJ_DESTRUCT(&peer->queued_frags);
OBJ_DESTRUCT(&peer->lock); OBJ_DESTRUCT(&peer->lock);

Просмотреть файл

@@ -1633,6 +1633,9 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
switch (base_header->type) { switch (base_header->type) {
case OMPI_OSC_PT2PT_HDR_TYPE_FRAG: case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header); process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
/* only data fragments should be included in the completion counters */
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL);
break; break;
case OMPI_OSC_PT2PT_HDR_TYPE_POST: case OMPI_OSC_PT2PT_HDR_TYPE_POST:
(void) osc_pt2pt_incoming_post (module, source); (void) osc_pt2pt_incoming_post (module, source);
@@ -1654,12 +1657,6 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished processing incoming messages")); "finished processing incoming messages"));
/* post messages come unbuffered and should NOT increment the incoming completion
* counters */
if (OMPI_OSC_PT2PT_HDR_TYPE_POST != base_header->type) {
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL);
}
osc_pt2pt_gc_clean (module); osc_pt2pt_gc_clean (module);

Просмотреть файл

@@ -107,7 +107,7 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in
} }
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: flushing active fragment to target. pending: %d", target, "osc pt2pt: flushing active fragment to target %d. pending: %d", target,
active_frag->pending)); active_frag->pending));
if (opal_atomic_cmpset (&module->peers[target].active_frag, active_frag, NULL)) { if (opal_atomic_cmpset (&module->peers[target].active_frag, active_frag, NULL)) {
@@ -126,12 +126,12 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target) int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
{ {
ompi_osc_pt2pt_peer_t *peer = module->peers + target; ompi_osc_pt2pt_peer_t *peer = module->peers + target;
ompi_osc_pt2pt_frag_t *next, *frag; ompi_osc_pt2pt_frag_t *frag;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush to target target %d. queue fragments: %u", "osc pt2pt: frag flush to target target %d. queue fragments: %lu",
target, opal_list_get_size (&peer->queued_frags))); target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
/* walk through the pending list and send */ /* walk through the pending list and send */
OPAL_THREAD_LOCK(&peer->lock); OPAL_THREAD_LOCK(&peer->lock);
@@ -161,7 +161,7 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module) int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
{ {
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_frag_t *frag, *next; ompi_osc_pt2pt_frag_t *frag;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush all begin")); "osc pt2pt: frag flush all begin"));

Просмотреть файл

@@ -54,9 +54,9 @@ struct ompi_osc_pt2pt_outstanding_lock_t {
int target; int target;
int assert; int assert;
bool flushing; bool flushing;
int32_t lock_acks_received; int32_t lock_acks_expected;
int32_t unlock_acks_received; int32_t unlock_acks_expected;
int32_t flush_acks_received; int32_t flush_acks_expected;
uint64_t serial_number; uint64_t serial_number;
int32_t type; int32_t type;
}; };
@@ -136,7 +136,7 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
/* If locking local, can't be non-blocking according to the /* If locking local, can't be non-blocking according to the
standard. We need to wait for the ack here. */ standard. We need to wait for the ack here. */
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
while (0 == lock->lock_acks_received) { while (lock->lock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock); opal_condition_wait(&module->cond, &module->lock);
} }
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@@ -163,7 +163,7 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
/* need to ensure we make progress */ /* need to ensure we make progress */
opal_progress(); opal_progress();
OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1); OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1);
} }
static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock) static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
@@ -255,6 +255,9 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
int ret; int ret;
if (0 == (assert & MPI_MODE_NOCHECK)) { if (0 == (assert & MPI_MODE_NOCHECK)) {
lock->lock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
lock->unlock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
if (my_rank != target && target != -1) { if (my_rank != target && target != -1) {
ret = ompi_osc_pt2pt_lock_remote (module, target, lock); ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
} else { } else {
@@ -279,12 +282,6 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
} }
} }
} else {
if (-1 == target) {
lock->lock_acks_received = ompi_comm_size(module->comm);
} else {
lock->lock_acks_received = 1;
}
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@@ -317,8 +314,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
} }
lock->target = target; lock->target = target;
lock->lock_acks_received = 0; lock->lock_acks_expected = 0;
lock->unlock_acks_received = 0; lock->unlock_acks_expected = 0;
lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1); lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1);
lock->type = lock_type; lock->type = lock_type;
lock->assert = assert; lock->assert = assert;
@@ -340,7 +337,7 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
peer->access_epoch = true; peer->access_epoch = true;
} }
module->passive_target_access_epoch = true; ++module->passive_target_access_epoch;
opal_list_append(&module->outstanding_locks, &lock->super); opal_list_append(&module->outstanding_locks, &lock->super);
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@@ -361,14 +358,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
ompi_osc_pt2pt_outstanding_lock_t *lock = NULL; ompi_osc_pt2pt_outstanding_lock_t *lock = NULL;
int my_rank = ompi_comm_rank (module->comm); int my_rank = ompi_comm_rank (module->comm);
ompi_osc_pt2pt_peer_t *peer = NULL; ompi_osc_pt2pt_peer_t *peer = NULL;
int lock_acks_expected;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
if (-1 != target) { if (-1 != target) {
lock_acks_expected = 1;
peer = module->peers + target; peer = module->peers + target;
} else {
lock_acks_expected = ompi_comm_size (module->comm);
} }
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
@@ -387,11 +380,11 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
opal_list_remove_item (&module->outstanding_locks, &lock->super); opal_list_remove_item (&module->outstanding_locks, &lock->super);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: lock acks received: %d, expected: %d", "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
lock->lock_acks_received, lock_acks_expected)); lock->lock_acks_expected));
/* wait until ack has arrived from target */ /* wait until ack has arrived from target */
while (lock->lock_acks_received != lock_acks_expected) { while (lock->lock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock); opal_condition_wait(&module->cond, &module->lock);
} }
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@@ -437,7 +430,7 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
/* wait for unlock acks. this signals remote completion of fragments */ /* wait for unlock acks. this signals remote completion of fragments */
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
while (lock->unlock_acks_received != lock_acks_expected) { while (lock->unlock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock); opal_condition_wait(&module->cond, &module->lock);
} }
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@@ -451,11 +444,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
if (-1 != target) { if (-1 != target) {
peer->access_epoch = false; peer->access_epoch = false;
module->passive_target_access_epoch = false;
} else { } else {
module->passive_target_access_epoch = false;
module->all_access_epoch = false; module->all_access_epoch = false;
} }
--module->passive_target_access_epoch;
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
OBJ_RELEASE(lock); OBJ_RELEASE(lock);
@@ -508,11 +500,11 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
/* wait until ack has arrived from target, since we need to be /* wait until ack has arrived from target, since we need to be
able to eager send before we can transfer all the data... */ able to eager send before we can transfer all the data... */
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
while (peer_count > lock->lock_acks_received && lock->flushing) { while (lock->lock_acks_expected && lock->flushing) {
opal_condition_wait(&module->cond, &module->lock); opal_condition_wait(&module->cond, &module->lock);
} }
lock->flush_acks_received = 0; lock->flush_acks_expected = peer_count;
lock->flushing = true; lock->flushing = true;
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@@ -541,7 +533,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
/* wait for all the requests and the flush ack (meaning remote completion) */ /* wait for all the requests and the flush ack (meaning remote completion) */
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
while (flush_count != lock->flush_acks_received) { while (lock->flush_acks_expected) {
opal_condition_wait(&module->cond, &module->lock); opal_condition_wait(&module->cond, &module->lock);
} }
@@ -710,8 +702,9 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
"lock could not be located")); "lock could not be located"));
} }
OPAL_THREAD_ADD32(&lock->lock_acks_received, 1); if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
opal_condition_broadcast (&module->cond); opal_condition_broadcast (&module->cond);
}
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@@ -835,7 +828,9 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
/* no need to hold the lock to set this */ /* no need to hold the lock to set this */
peer->eager_send_active = true; peer->eager_send_active = true;
OPAL_THREAD_ADD32(&lock->lock_acks_received, 1); if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
opal_condition_broadcast(&module->cond);
}
opal_condition_broadcast(&module->cond); opal_condition_broadcast(&module->cond);
} }
@@ -845,14 +840,16 @@ void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int sour
ompi_osc_pt2pt_outstanding_lock_t *lock; ompi_osc_pt2pt_outstanding_lock_t *lock;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_unlock_ack: processing flush ack from %d for lock %" PRIu64, "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock %" PRIu64,
source, flush_ack_header->serial_number)); source, flush_ack_header->serial_number));
/* NTH: need to verify that this will work as expected */ /* NTH: need to verify that this will work as expected */
lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number); lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number);
assert (NULL != lock); assert (NULL != lock);
OPAL_THREAD_ADD32(&lock->flush_acks_received, 1); if (0 == OPAL_THREAD_ADD32(&lock->flush_acks_expected, -1)) {
opal_condition_broadcast(&module->cond);
}
opal_condition_broadcast(&module->cond); opal_condition_broadcast(&module->cond);
} }
@@ -873,7 +870,7 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou
peer->eager_send_active = false; peer->eager_send_active = false;
if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1)) { if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1)) {
opal_condition_broadcast(&module->cond); opal_condition_broadcast(&module->cond);
} }
} }