
osc/pt2pt: fix threading issues

This commit fixes a number of threading issues discovered in
osc/pt2pt. This includes:

 - Lock the synchronization object, not the module, in
   osc_pt2pt_start. This fixes a race between the start function and
   the processing of post messages.

 - Always lock before calling cond_broadcast. This fixes a race
   between the waiting thread and the signaling thread (see the
   pthreads sketch below).

 - Make all atomically updated values volatile.

 - Make the module lock recursive to protect against some deadlock
   conditions. This will be rolled back once the locks have been
   redesigned.

 - Mark incoming complete *after* completing an accumulate, not
   before. The previous ordering produced incorrect results under
   certain conditions.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
This commit is contained in:
Nathan Hjelm 2017-01-12 11:41:38 -07:00 committed by Nathan Hjelm
parent a9d836bae3
commit 362ac8b87e
8 changed files with 115 additions and 92 deletions
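
The cond_broadcast bullet above describes the classic lost-wakeup race:
if the signaling thread does not hold the mutex, its broadcast can fire
between the waiting thread's predicate check and its call into
cond_wait, and the wakeup is lost. A minimal plain-pthreads sketch of
the fixed pattern (illustrative names, not the opal wrappers used in
the diffs below):

#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static bool done = false;

static void *waiter (void *arg)
{
    (void) arg;
    pthread_mutex_lock (&lock);
    while (!done) {                    /* predicate re-checked under the lock */
        pthread_cond_wait (&cond, &lock);
    }
    pthread_mutex_unlock (&lock);
    return NULL;
}

static void signal_completion (void)
{
    pthread_mutex_lock (&lock);        /* the fix: lock before broadcasting */
    done = true;
    pthread_cond_broadcast (&cond);
    pthread_mutex_unlock (&lock);
}

int main (void)
{
    pthread_t thread;
    pthread_create (&thread, NULL, waiter, NULL);
    signal_completion ();
    pthread_join (thread, NULL);
    return 0;
}

This is the pattern behind every OPAL_THREAD_LOCK /
opal_condition_broadcast / OPAL_THREAD_UNLOCK triple added in the hunks
below.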

View file

@@ -118,7 +118,7 @@ struct ompi_osc_pt2pt_peer_t {
opal_list_t queued_frags;
/** number of fragments incoming (negative - expected, positive - unsynchronized) */
int32_t passive_incoming_frag_count;
volatile int32_t passive_incoming_frag_count;
/** peer flags */
volatile int32_t flags;
@@ -198,7 +198,7 @@ struct ompi_osc_pt2pt_module_t {
int disp_unit;
/** Mutex lock protecting module data */
opal_mutex_t lock;
opal_recursive_mutex_t lock;
/** condition variable associated with lock */
opal_condition_t cond;
@@ -214,19 +214,13 @@ struct ompi_osc_pt2pt_module_t {
uint32_t *epoch_outgoing_frag_count;
/** cyclic counter for a unique tag for long messages. */
uint32_t tag_counter;
volatile uint32_t tag_counter;
/* Number of outgoing fragments that have completed since the
beginning of time */
volatile uint32_t outgoing_frag_count;
/* Next outgoing fragment count at which we want a signal on cond */
volatile uint32_t outgoing_frag_signal_count;
/** number of outgoing fragments still to be completed */
volatile int32_t outgoing_frag_count;
/* Number of incoming fragments that have completed since the
beginning of time */
volatile uint32_t active_incoming_frag_count;
/* Next incoming buffer count at which we want a signal on cond */
volatile uint32_t active_incoming_frag_signal_count;
/** number of incoming fragments */
volatile int32_t active_incoming_frag_count;
/** Number of targets locked/being locked */
unsigned int passive_target_access_epoch;
@@ -239,7 +233,7 @@ struct ompi_osc_pt2pt_module_t {
/** Number of "count" messages from the remote complete group
we've received */
int32_t num_complete_msgs;
volatile int32_t num_complete_msgs;
/* ********************* LOCK data ************************ */
@@ -264,7 +258,12 @@ struct ompi_osc_pt2pt_module_t {
/* enforce accumulate semantics */
opal_atomic_lock_t accumulate_lock;
opal_list_t pending_acc;
/** accumulate operations pending the accumulation lock */
opal_list_t pending_acc;
/** lock for pending_acc */
opal_mutex_t pending_acc_lock;
/** Lock for garbage collection lists */
opal_mutex_t gc_lock;
@@ -512,23 +511,29 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module);
*/
static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, int source)
{
int32_t new_value;
if (MPI_PROC_NULL == source) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_incoming_completion marking active incoming complete. count = %d. signal = %d",
(int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count));
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_count, 1);
if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) {
"mark_incoming_completion marking active incoming complete. module %p, count = %d",
(void *) module, (int) module->active_incoming_frag_count + 1));
new_value = OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1);
if (new_value >= 0) {
OPAL_THREAD_LOCK(&module->lock);
opal_condition_broadcast(&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
}
} else {
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
source, (int) peer->passive_incoming_frag_count + 1));
OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
if (0 == peer->passive_incoming_frag_count) {
"mark_incoming_completion marking passive incoming complete. module %p, source = %d, count = %d",
(void *) module, source, (int) peer->passive_incoming_frag_count + 1));
new_value = OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
if (0 == new_value) {
OPAL_THREAD_LOCK(&module->lock);
opal_condition_broadcast(&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
}
}
}
@@ -548,9 +553,13 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
*/
static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module)
{
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1);
if (module->outgoing_frag_count >= module->outgoing_frag_signal_count) {
int32_t new_value = OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_outgoing_completion: outgoing_frag_count = %d", new_value));
if (new_value >= 0) {
OPAL_THREAD_LOCK(&module->lock);
opal_condition_broadcast(&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
}
}
@@ -568,7 +577,7 @@ static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module)
*/
static inline void ompi_osc_signal_outgoing (ompi_osc_pt2pt_module_t *module, int target, int count)
{
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_signal_count, count);
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, -count);
if (MPI_PROC_NULL != target) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_signal_outgoing_passive: target = %d, count = %d, total = %d", target,

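The header changes above replace each pair of counters (completions
vs. the signal threshold) with a single counter: the number of expected
fragments is subtracted up front, every completion adds one, and a
non-negative value means the epoch is complete. A compressed sketch of
the scheme in C11 atomics (hypothetical names, not the opal API):

#include <pthread.h>
#include <stdatomic.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static atomic_int frag_count;            /* negative: completions outstanding */

/* cf. ompi_osc_signal_outgoing(): register count expected completions */
static void signal_outgoing (int count)
{
    atomic_fetch_sub (&frag_count, count);
}

/* cf. mark_outgoing_completion(): one fragment finished */
static void mark_outgoing_completion (void)
{
    int new_value = atomic_fetch_add (&frag_count, 1) + 1;
    if (new_value >= 0) {                /* everything signalled has completed */
        pthread_mutex_lock (&lock);
        pthread_cond_broadcast (&cond);
        pthread_mutex_unlock (&lock);
    }
}

/* cf. the fence/complete/flush wait loops */
static void wait_for_completion (void)
{
    pthread_mutex_lock (&lock);
    while (atomic_load (&frag_count) < 0) {
        pthread_cond_wait (&cond, &lock);
    }
    pthread_mutex_unlock (&lock);
}

One atomic add now both records the completion and tells the caller
whether a wakeup is due, where the old scheme read two counters that
could be updated independently.
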
View file

@@ -168,7 +168,6 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
MPI_SUM, module->comm,
module->comm->c_coll.coll_reduce_scatter_block_module);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@@ -181,11 +180,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
incoming_reqs));
/* set our complete condition for incoming requests */
module->active_incoming_frag_signal_count += incoming_reqs;
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -incoming_reqs);
/* wait for completion */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) {
opal_condition_wait(&module->cond, &module->lock);
}
@@ -196,10 +194,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
}
module->all_sync.epoch_active = false;
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier_module);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: fence end: %d", ret));
@@ -212,11 +210,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
OPAL_THREAD_LOCK(&module->lock);
OPAL_THREAD_LOCK(&sync->lock);
/* check if we are already in an access epoch */
if (ompi_osc_pt2pt_access_epoch_active (module)) {
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
return OMPI_ERR_RMA_SYNC;
}
@@ -249,7 +247,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
if (0 == ompi_group_size (group)) {
/* nothing more to do. this is an empty start epoch */
sync->eager_send_active = true;
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
return OMPI_SUCCESS;
}
@@ -258,12 +256,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
/* translate the group ranks into the communicator */
sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
if (NULL == sync->peer_list.peers) {
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
if (!(assert & MPI_MODE_NOCHECK)) {
OPAL_THREAD_LOCK(&sync->lock);
for (int i = 0 ; i < sync->num_peers ; ++i) {
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
@@ -276,7 +273,6 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
ompi_osc_pt2pt_peer_set_unex (peer, false);
}
}
OPAL_THREAD_UNLOCK(&sync->lock);
} else {
sync->sync_expected = 0;
}
@@ -295,7 +291,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
"ompi_osc_pt2pt_start complete. eager sends active: %d",
sync->eager_send_active));
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
return OMPI_SUCCESS;
}
@@ -313,14 +309,14 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete entering..."));
OPAL_THREAD_LOCK(&module->lock);
OPAL_THREAD_LOCK(&sync->lock);
if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
return OMPI_ERR_RMA_SYNC;
}
/* wait for all the post messages */
ompi_osc_pt2pt_sync_wait (sync);
ompi_osc_pt2pt_sync_wait_nolock (sync);
/* phase 1 cleanup sync object */
group = sync->sync.pscw.group;
@@ -330,8 +326,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
/* need to reset the sync here to avoid processing incorrect post messages */
ompi_osc_pt2pt_sync_reset (sync);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete all posts received. sending complete messages..."));
@@ -403,7 +398,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
OPAL_THREAD_LOCK(&module->lock);
/* wait for outgoing requests to complete. Don't wait for incoming, as
we're only completing the access epoch, not the exposure epoch */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
while (module->outgoing_frag_count < 0) {
opal_condition_wait(&module->cond, &module->lock);
}
@@ -513,15 +508,13 @@ int ompi_osc_pt2pt_wait (ompi_win_t *win)
}
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_wait entering..."));
"ompi_osc_pt2pt_wait entering... module %p", (void *) module));
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_complete_msgs ||
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, "
"active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count,
module->active_incoming_frag_signal_count));
while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, "
"active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs,
module->active_incoming_frag_count));
opal_condition_wait(&module->cond, &module->lock);
}
@@ -554,21 +547,15 @@ int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
OPAL_THREAD_LOCK(&(module->lock));
if (0 != module->num_complete_msgs ||
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
*flag = 0;
ret = OMPI_SUCCESS;
} else {
*flag = 1;
group = module->pw_group;
module->pw_group = NULL;
OPAL_THREAD_UNLOCK(&(module->lock));
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&(module->lock));
@@ -580,15 +567,19 @@ void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, i
{
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
"current signal count %d. current incomming count: %d. expected complete msgs: %d",
source, frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count, module->num_complete_msgs));
"current incomming count: %d. expected complete msgs: %d", source,
frag_count, module->active_incoming_frag_count, module->num_complete_msgs));
/* the current fragment is not part of the frag_count so we need to add it here */
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, frag_count);
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -frag_count);
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
/* make sure the signal count is written before changing the complete message count */
opal_atomic_wmb ();
if (0 == OPAL_THREAD_ADD32(&module->num_complete_msgs, 1)) {
OPAL_THREAD_LOCK(&module->lock);
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
}
}

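The hunk above also inserts an opal_atomic_wmb() in
osc_pt2pt_incoming_complete so the fragment-count update is published
before num_complete_msgs changes. The same ordering expressed with
portable C11 fences (hypothetical names; a sketch, not the opal
implementation):

#include <stdatomic.h>

static atomic_int frag_count;     /* negative while fragments are expected */
static atomic_int complete_msgs;

void incoming_complete (int frags)
{
    atomic_fetch_sub_explicit (&frag_count, frags, memory_order_relaxed);
    /* publish the frag-count update before the increment below can be
     * observed (cf. opal_atomic_wmb) */
    atomic_thread_fence (memory_order_release);
    atomic_fetch_add_explicit (&complete_msgs, 1, memory_order_relaxed);
}

A thread that observes the new complete_msgs value (with a matching
acquire fence or load on its side) is then guaranteed not to see a
stale frag_count.
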
View file

@@ -501,6 +501,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
is_long_msg = true;
tag = get_tag (module);
} else {
/* still need to set the tag for the active/passive logic on the target */
tag = !!(module->passive_target_access_epoch);
}
if (is_long_msg) {
@@ -523,6 +526,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
header->count = target_count;
header->displacement = target_disp;
header->op = op->o_f_to_c_index;
header->tag = tag;
ptr += sizeof (*header);
do {
@@ -565,7 +569,6 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
}
} else {
header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG;
header->tag = tag;
osc_pt2pt_hton(header, proc);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,

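A detail worth noting in the hunks above: header->tag is now set on
every accumulate path, and in the short-message branch the tag is just
the boolean passive_target_access_epoch. The target can therefore
recover active vs. passive target from the tag's low bit, which is what
the added active_target = !(tag & 0x1) checks in the process_acc and
process_cswap hunks further down rely on. A sketch of the convention
with hypothetical helpers (the real get_tag() layout is not shown
here):

#include <stdbool.h>
#include <stdint.h>

/* keep the low bit of every tag as a passive-target flag; long
 * messages would draw the remaining bits from a counter */
static inline uint32_t encode_tag (uint32_t counter, bool passive_target)
{
    return (counter << 1) | (uint32_t) passive_target;
}

static inline bool tag_is_active_target (uint32_t tag)
{
    return !(tag & 0x1);    /* cf. the added active_target checks */
}
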
View file

@@ -314,12 +314,13 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
sizeof(ompi_osc_base_module_t));
/* initialize the objects, so that they can always be freed in cleanup */
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);

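component_select above now constructs module->lock as an
opal_recursive_mutex_t, matching the type change in the module struct.
For readers outside the opal object system, a plain-pthreads sketch of
the equivalent initialization (illustrative only; per the commit
message this is a stopgap until the locks are redesigned):

#include <pthread.h>

int init_module_lock (pthread_mutex_t *lock)
{
    pthread_mutexattr_t attr;
    int rc;

    pthread_mutexattr_init (&attr);
    /* recursive: the owning thread may re-lock, e.g. when a progress
     * callback runs while the module lock is already held */
    pthread_mutexattr_settype (&attr, PTHREAD_MUTEX_RECURSIVE);
    rc = pthread_mutex_init (lock, &attr);
    pthread_mutexattr_destroy (&attr);
    return rc;
}
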
View file

@@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2017 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@@ -102,6 +102,7 @@ struct osc_pt2pt_pending_acc_t {
void *data;
size_t data_len;
ompi_datatype_t *datatype;
bool active_target;
};
typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t;
@@ -666,8 +667,6 @@ static int accumulate_cb (ompi_request_t *request)
rank = acc_data->peer;
}
mark_incoming_completion (module, rank);
if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) {
/* no more requests needed before the buffer can be accumulated */
@@ -693,13 +692,15 @@ static int accumulate_cb (ompi_request_t *request)
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
}
mark_incoming_completion (module, rank);
ompi_request_free (&request);
return ret;
}
static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
char *data, size_t data_len, ompi_datatype_t *datatype)
char *data, size_t data_len, ompi_datatype_t *datatype, bool active_target)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
osc_pt2pt_pending_acc_t *pending_acc;
@@ -714,8 +715,13 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
/* NTH: ensure we don't leave wait/process_flush/etc until this
* accumulate operation is complete. */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
if (active_target) {
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -1);
} else {
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
}
pending_acc->active_target = active_target;
pending_acc->source = source;
/* save any inline data (eager acc, gacc only) */
@@ -747,7 +753,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
}
/* add to the pending acc queue */
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
OPAL_THREAD_SCOPED_LOCK(&module->pending_acc_lock, opal_list_append (&module->pending_acc, &pending_acc->super));
return OMPI_SUCCESS;
}
@@ -1090,7 +1096,9 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
return OMPI_SUCCESS;
}
OPAL_THREAD_LOCK(&module->pending_acc_lock);
pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
OPAL_THREAD_UNLOCK(&module->pending_acc_lock);
if (OPAL_UNLIKELY(NULL == pending_acc)) {
/* called without any pending accumulation operations */
ompi_osc_pt2pt_accumulate_unlock (module);
@@ -1127,7 +1135,7 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
}
/* signal that an operation is complete */
mark_incoming_completion (module, pending_acc->source);
mark_incoming_completion (module, pending_acc->active_target ? MPI_PROC_NULL : pending_acc->source);
pending_acc->data = NULL;
OBJ_RELEASE(pending_acc);
@@ -1138,6 +1146,7 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
bool active_target = !(acc_header->tag & 0x1);
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
uint64_t data_len;
@@ -1162,7 +1171,7 @@ static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
} else {
/* couldn't aquire the accumulate lock so queue up the accumulate operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, data, data_len, datatype);
source, data, data_len, datatype, active_target);
}
/* Release datatype & op */
@@ -1174,6 +1183,7 @@ static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
ompi_osc_pt2pt_header_acc_t* acc_header)
{
bool active_target = !(acc_header->tag & 0x1);
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
@@ -1193,7 +1203,7 @@ static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source,
NULL, 0, datatype);
NULL, 0, datatype, active_target);
}
/* Release datatype & op */
@@ -1205,6 +1215,7 @@ static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
bool active_target = !(acc_header->tag & 0x1);
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
void *buffer = NULL;
@@ -1246,7 +1257,7 @@ static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, data, data_len, datatype);
source, data, data_len, datatype, active_target);
}
/* Release datatype & op */
@@ -1258,6 +1269,7 @@ static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
bool active_target = !(acc_header->tag & 0x1);
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
@@ -1277,7 +1289,7 @@ static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int sour
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, NULL, 0, datatype);
source, NULL, 0, datatype, active_target);
}
/* Release datatype & op */
@@ -1290,6 +1302,7 @@ static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int sour
static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_cswap_t *cswap_header)
{
bool active_target = !(cswap_header->tag & 0x1);
char *data = (char*) (cswap_header + 1);
struct ompi_datatype_t *datatype;
int ret;
@@ -1309,7 +1322,7 @@ static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source,
data, 2 * datatype->super.size, datatype);
data, 2 * datatype->super.size, datatype, active_target);
}
/* Release datatype */
@@ -1392,7 +1405,7 @@ static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
osc_pt2pt_add_pending (pending);
}
/* signal incomming will increment this counter */
/* signal incoming will increment this counter */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
return sizeof (*unlock_header);

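The hunks above also give the pending_acc list its own mutex
(pending_acc_lock) in place of the module lock, both for the append in
ompi_osc_pt2pt_acc_op_queue and for the remove_first in
ompi_osc_pt2pt_progress_pending_acc. A sketch of the idiom, a dedicated
lock that keeps the queue's critical section small and independent of
module->lock (hypothetical types and names):

#include <pthread.h>
#include <stddef.h>

struct pending_op {
    struct pending_op *next;
};

static struct pending_op *pending_head, *pending_tail;
static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;

static void pending_append (struct pending_op *op)   /* cf. opal_list_append */
{
    op->next = NULL;
    pthread_mutex_lock (&pending_lock);
    if (NULL == pending_tail) {
        pending_head = op;
    } else {
        pending_tail->next = op;
    }
    pending_tail = op;
    pthread_mutex_unlock (&pending_lock);
}

static struct pending_op *pending_remove_first (void) /* cf. opal_list_remove_first */
{
    struct pending_op *op;

    pthread_mutex_lock (&pending_lock);
    op = pending_head;
    if (NULL != op) {
        pending_head = op->next;
        if (NULL == pending_head) {
            pending_tail = NULL;
        }
    }
    pthread_mutex_unlock (&pending_lock);
    return op;                                        /* NULL: queue empty */
}
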
View file

@@ -77,6 +77,7 @@ int ompi_osc_pt2pt_free(ompi_win_t *win)
/* it is erroneous to close a window with active operations on it so we should
* probably produce an error here instead of cleaning up */
OPAL_LIST_DESTRUCT(&module->pending_acc);
OBJ_DESTRUCT(&module->pending_acc_lock);
osc_pt2pt_gc_clean (module);
OPAL_LIST_DESTRUCT(&module->buffer_gc);

View file

@@ -50,7 +50,7 @@ typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
NULL, NULL);
static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module);
static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module);
static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr);
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
int target);
@@ -100,9 +100,9 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
if (MPI_LOCK_EXCLUSIVE == lock_type) {
OPAL_THREAD_ADD32(&module->lock_status, 1);
ompi_osc_activate_next_lock (module);
ompi_osc_pt2pt_activate_next_lock (module);
} else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) {
ompi_osc_activate_next_lock (module);
ompi_osc_pt2pt_activate_next_lock (module);
}
/* need to ensure we make progress */
@@ -385,10 +385,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
lock->sync_expected));
OPAL_THREAD_UNLOCK(&module->lock);
/* wait until ack has arrived from target */
ompi_osc_pt2pt_sync_wait_expected (lock);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: all lock acks received"));
@@ -426,7 +426,7 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
* So make sure to wait for all of the fragments to arrive.
*/
OPAL_THREAD_LOCK(&module->lock);
while (module->outgoing_frag_count < module->outgoing_frag_signal_count) {
while (module->outgoing_frag_count < 0) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
@@ -628,7 +628,7 @@ int ompi_osc_pt2pt_flush_local (int target, struct ompi_win_t *win)
/* wait for all the requests */
OPAL_THREAD_LOCK(&module->lock);
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
while (module->outgoing_frag_count < 0) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
@@ -654,7 +654,7 @@ int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win)
/* wait for all the requests */
OPAL_THREAD_LOCK(&module->lock);
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
while (module->outgoing_frag_count < 0) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
@@ -758,7 +758,7 @@ static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, in
return true;
}
static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module) {
static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module) {
/* release any other pending locks we can */
ompi_osc_pt2pt_pending_lock_t *pending_lock, *next;
int ret = OMPI_SUCCESS;
@@ -903,9 +903,9 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
if (-1 == module->lock_status) {
OPAL_THREAD_ADD32(&module->lock_status, 1);
ompi_osc_activate_next_lock (module);
ompi_osc_pt2pt_activate_next_lock (module);
} else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) {
ompi_osc_activate_next_lock (module);
ompi_osc_pt2pt_activate_next_lock (module);
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,

View file

@@ -127,20 +127,25 @@ bool ompi_osc_pt2pt_sync_pscw_peer (struct ompi_osc_pt2pt_module_t *module, int
/**
* Wait for all remote peers in the synchronization to respond
*/
static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync)
static inline void ompi_osc_pt2pt_sync_wait_nolock (ompi_osc_pt2pt_sync_t *sync)
{
OPAL_THREAD_LOCK(&sync->lock);
while (!sync->eager_send_active) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for access epoch to start"));
opal_condition_wait(&sync->cond, &sync->lock);
}
OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"access epoch ready"));
}
static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync)
{
OPAL_THREAD_LOCK(&sync->lock);
ompi_osc_pt2pt_sync_wait_nolock (sync);
OPAL_THREAD_UNLOCK(&sync->lock);
}
/**
* Wait for all remote peers in the synchronization to respond
*/