diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index bbb35f5562..5901aa2e1a 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -118,7 +118,7 @@ struct ompi_osc_pt2pt_peer_t { opal_list_t queued_frags; /** number of fragments incomming (negative - expected, positive - unsynchronized) */ - int32_t passive_incoming_frag_count; + volatile int32_t passive_incoming_frag_count; /** peer flags */ volatile int32_t flags; @@ -198,7 +198,7 @@ struct ompi_osc_pt2pt_module_t { int disp_unit; /** Mutex lock protecting module data */ - opal_mutex_t lock; + opal_recursive_mutex_t lock; /** condition variable associated with lock */ opal_condition_t cond; @@ -214,19 +214,13 @@ struct ompi_osc_pt2pt_module_t { uint32_t *epoch_outgoing_frag_count; /** cyclic counter for a unique tage for long messages. */ - uint32_t tag_counter; + volatile uint32_t tag_counter; - /* Number of outgoing fragments that have completed since the - begining of time */ - volatile uint32_t outgoing_frag_count; - /* Next outgoing fragment count at which we want a signal on cond */ - volatile uint32_t outgoing_frag_signal_count; + /** number of outgoing fragments still to be completed */ + volatile int32_t outgoing_frag_count; - /* Number of incoming fragments that have completed since the - begining of time */ - volatile uint32_t active_incoming_frag_count; - /* Next incoming buffer count at which we want a signal on cond */ - volatile uint32_t active_incoming_frag_signal_count; + /** number of incoming fragments */ + volatile int32_t active_incoming_frag_count; /** Number of targets locked/being locked */ unsigned int passive_target_access_epoch; @@ -239,7 +233,7 @@ struct ompi_osc_pt2pt_module_t { /** Number of "count" messages from the remote complete group we've received */ - int32_t num_complete_msgs; + volatile int32_t num_complete_msgs; /* ********************* LOCK data ************************ */ @@ -264,7 +258,12 @@ struct ompi_osc_pt2pt_module_t { /* enforce accumulate semantics */ opal_atomic_lock_t accumulate_lock; - opal_list_t pending_acc; + + /** accumulate operations pending the accumulation lock */ + opal_list_t pending_acc; + + /** lock for pending_acc */ + opal_mutex_t pending_acc_lock; /** Lock for garbage collection lists */ opal_mutex_t gc_lock; @@ -512,23 +511,29 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module); */ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, int source) { + int32_t new_value; + if (MPI_PROC_NULL == source) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "mark_incoming_completion marking active incoming complete. count = %d. signal = %d", - (int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count)); - OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_count, 1); - if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) { + "mark_incoming_completion marking active incoming complete. module %p, count = %d", + (void *) module, (int) module->active_incoming_frag_count + 1)); + new_value = OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1); + if (new_value >= 0) { + OPAL_THREAD_LOCK(&module->lock); opal_condition_broadcast(&module->cond); + OPAL_THREAD_UNLOCK(&module->lock); } } else { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "mark_incoming_completion marking passive incoming complete. source = %d, count = %d", - source, (int) peer->passive_incoming_frag_count + 1)); - OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1); - if (0 == peer->passive_incoming_frag_count) { + "mark_incoming_completion marking passive incoming complete. module %p, source = %d, count = %d", + (void *) module, source, (int) peer->passive_incoming_frag_count + 1)); + new_value = OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1); + if (0 == new_value) { + OPAL_THREAD_LOCK(&module->lock); opal_condition_broadcast(&module->cond); + OPAL_THREAD_UNLOCK(&module->lock); } } } @@ -548,9 +553,13 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in */ static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module) { - OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1); - if (module->outgoing_frag_count >= module->outgoing_frag_signal_count) { + int32_t new_value = OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1); + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "mark_outgoing_completion: outgoing_frag_count = %d", new_value)); + if (new_value >= 0) { + OPAL_THREAD_LOCK(&module->lock); opal_condition_broadcast(&module->cond); + OPAL_THREAD_UNLOCK(&module->lock); } } @@ -568,7 +577,7 @@ static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module) */ static inline void ompi_osc_signal_outgoing (ompi_osc_pt2pt_module_t *module, int target, int count) { - OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_signal_count, count); + OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, -count); if (MPI_PROC_NULL != target) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_signal_outgoing_passive: target = %d, count = %d, total = %d", target, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index 55917ca65a..d1cb69bc49 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -168,7 +168,6 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) MPI_SUM, module->comm, module->comm->c_coll.coll_reduce_scatter_block_module); if (OMPI_SUCCESS != ret) { - OPAL_THREAD_UNLOCK(&module->lock); return ret; } @@ -181,11 +180,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) incoming_reqs)); /* set our complete condition for incoming requests */ - module->active_incoming_frag_signal_count += incoming_reqs; + OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -incoming_reqs); /* wait for completion */ - while (module->outgoing_frag_count != module->outgoing_frag_signal_count || - module->active_incoming_frag_count < module->active_incoming_frag_signal_count) { + while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) { opal_condition_wait(&module->cond, &module->lock); } @@ -196,10 +194,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) } module->all_sync.epoch_active = false; - - opal_condition_broadcast (&module->cond); OPAL_THREAD_UNLOCK(&module->lock); + module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier_module); + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: fence end: %d", ret)); @@ -212,11 +210,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_sync_t *sync = &module->all_sync; - OPAL_THREAD_LOCK(&module->lock); + OPAL_THREAD_LOCK(&sync->lock); /* check if we are already in an access epoch */ if (ompi_osc_pt2pt_access_epoch_active (module)) { - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); return OMPI_ERR_RMA_SYNC; } @@ -249,7 +247,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) if (0 == ompi_group_size (group)) { /* nothing more to do. this is an empty start epoch */ sync->eager_send_active = true; - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); return OMPI_SUCCESS; } @@ -258,12 +256,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) /* translate the group ranks into the communicator */ sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group); if (NULL == sync->peer_list.peers) { - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); return OMPI_ERR_OUT_OF_RESOURCE; } if (!(assert & MPI_MODE_NOCHECK)) { - OPAL_THREAD_LOCK(&sync->lock); for (int i = 0 ; i < sync->num_peers ; ++i) { ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i]; @@ -276,7 +273,6 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) ompi_osc_pt2pt_peer_set_unex (peer, false); } } - OPAL_THREAD_UNLOCK(&sync->lock); } else { sync->sync_expected = 0; } @@ -295,7 +291,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) "ompi_osc_pt2pt_start complete. eager sends active: %d", sync->eager_send_active)); - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); return OMPI_SUCCESS; } @@ -313,14 +309,14 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win) OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete entering...")); - OPAL_THREAD_LOCK(&module->lock); + OPAL_THREAD_LOCK(&sync->lock); if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) { - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); return OMPI_ERR_RMA_SYNC; } /* wait for all the post messages */ - ompi_osc_pt2pt_sync_wait (sync); + ompi_osc_pt2pt_sync_wait_nolock (sync); /* phase 1 cleanup sync object */ group = sync->sync.pscw.group; @@ -330,8 +326,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win) /* need to reset the sync here to avoid processing incorrect post messages */ ompi_osc_pt2pt_sync_reset (sync); - - OPAL_THREAD_UNLOCK(&module->lock); + OPAL_THREAD_UNLOCK(&sync->lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete all posts received. sending complete messages...")); @@ -403,7 +398,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win) OPAL_THREAD_LOCK(&module->lock); /* wait for outgoing requests to complete. Don't wait for incoming, as we're only completing the access epoch, not the exposure epoch */ - while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { + while (module->outgoing_frag_count < 0) { opal_condition_wait(&module->cond, &module->lock); } @@ -513,15 +508,13 @@ int ompi_osc_pt2pt_wait (ompi_win_t *win) } OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_wait entering...")); + "ompi_osc_pt2pt_wait entering... module %p", (void *) module)); OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_complete_msgs || - module->active_incoming_frag_count != module->active_incoming_frag_signal_count) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, " - "active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d", - module->num_complete_msgs, module->active_incoming_frag_count, - module->active_incoming_frag_signal_count)); + while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) { + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, " + "active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs, + module->active_incoming_frag_count)); opal_condition_wait(&module->cond, &module->lock); } @@ -554,21 +547,15 @@ int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag) OPAL_THREAD_LOCK(&(module->lock)); - if (0 != module->num_complete_msgs || - module->active_incoming_frag_count != module->active_incoming_frag_signal_count) { + if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) { *flag = 0; - ret = OMPI_SUCCESS; } else { *flag = 1; group = module->pw_group; module->pw_group = NULL; - OPAL_THREAD_UNLOCK(&(module->lock)); - OBJ_RELEASE(group); - - return OMPI_SUCCESS; } OPAL_THREAD_UNLOCK(&(module->lock)); @@ -580,15 +567,19 @@ void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, i { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: process_complete got complete message from %d. expected fragment count %d. " - "current signal count %d. current incomming count: %d. expected complete msgs: %d", - source, frag_count, module->active_incoming_frag_signal_count, - module->active_incoming_frag_count, module->num_complete_msgs)); + "current incomming count: %d. expected complete msgs: %d", source, + frag_count, module->active_incoming_frag_count, module->num_complete_msgs)); /* the current fragment is not part of the frag_count so we need to add it here */ - OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, frag_count); + OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -frag_count); - if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) { + /* make sure the signal count is written before changing the complete message count */ + opal_atomic_wmb (); + + if (0 == OPAL_THREAD_ADD32(&module->num_complete_msgs, 1)) { + OPAL_THREAD_LOCK(&module->lock); opal_condition_broadcast (&module->cond); + OPAL_THREAD_UNLOCK(&module->lock); } } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 3973cf88d9..1b77721203 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -501,6 +501,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, is_long_msg = true; tag = get_tag (module); + } else { + /* still need to set the tag for the active/passive logic on the target */ + tag = !!(module->passive_target_access_epoch); } if (is_long_msg) { @@ -523,6 +526,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, header->count = target_count; header->displacement = target_disp; header->op = op->o_f_to_c_index; + header->tag = tag; ptr += sizeof (*header); do { @@ -565,7 +569,6 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, } } else { header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG; - header->tag = tag; osc_pt2pt_hton(header, proc); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index e41a8306b7..9b7cefcbb1 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -314,12 +314,13 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit sizeof(ompi_osc_base_module_t)); /* initialize the objects, so that always free in cleanup */ - OBJ_CONSTRUCT(&module->lock, opal_mutex_t); + OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t); OBJ_CONSTRUCT(&module->cond, opal_condition_t); OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); + OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 04333d2add..65b96955a6 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2017 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -102,6 +102,7 @@ struct osc_pt2pt_pending_acc_t { void *data; size_t data_len; ompi_datatype_t *datatype; + bool active_target; }; typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t; @@ -666,8 +667,6 @@ static int accumulate_cb (ompi_request_t *request) rank = acc_data->peer; } - mark_incoming_completion (module, rank); - if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) { /* no more requests needed before the buffer can be accumulated */ @@ -693,13 +692,15 @@ static int accumulate_cb (ompi_request_t *request) osc_pt2pt_gc_add_buffer (module, &acc_data->super); } + mark_incoming_completion (module, rank); + ompi_request_free (&request); return ret; } static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source, - char *data, size_t data_len, ompi_datatype_t *datatype) + char *data, size_t data_len, ompi_datatype_t *datatype, bool active_target) { ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); osc_pt2pt_pending_acc_t *pending_acc; @@ -714,8 +715,13 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os /* NTH: ensure we don't leave wait/process_flush/etc until this * accumulate operation is complete. */ - OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); + if (active_target) { + OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -1); + } else { + OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); + } + pending_acc->active_target = active_target; pending_acc->source = source; /* save any inline data (eager acc, gacc only) */ @@ -747,7 +753,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os } /* add to the pending acc queue */ - OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super)); + OPAL_THREAD_SCOPED_LOCK(&module->pending_acc_lock, opal_list_append (&module->pending_acc, &pending_acc->super)); return OMPI_SUCCESS; } @@ -1090,7 +1096,9 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module) return OMPI_SUCCESS; } + OPAL_THREAD_LOCK(&module->pending_acc_lock); pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc); + OPAL_THREAD_UNLOCK(&module->pending_acc_lock); if (OPAL_UNLIKELY(NULL == pending_acc)) { /* called without any pending accumulation operations */ ompi_osc_pt2pt_accumulate_unlock (module); @@ -1127,7 +1135,7 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module) } /* signal that an operation is complete */ - mark_incoming_completion (module, pending_acc->source); + mark_incoming_completion (module, pending_acc->active_target ? MPI_PROC_NULL : pending_acc->source); pending_acc->data = NULL; OBJ_RELEASE(pending_acc); @@ -1138,6 +1146,7 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module) static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { + bool active_target = !(acc_header->tag & 0x1); char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; uint64_t data_len; @@ -1162,7 +1171,7 @@ static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source, } else { /* couldn't aquire the accumulate lock so queue up the accumulate operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, - source, data, data_len, datatype); + source, data, data_len, datatype, active_target); } /* Release datatype & op */ @@ -1174,6 +1183,7 @@ static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source, static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source, ompi_osc_pt2pt_header_acc_t* acc_header) { + bool active_target = !(acc_header->tag & 0x1); char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; int ret; @@ -1193,7 +1203,7 @@ static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source, } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source, - NULL, 0, datatype); + NULL, 0, datatype, active_target); } /* Release datatype & op */ @@ -1205,6 +1215,7 @@ static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source, static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { + bool active_target = !(acc_header->tag & 0x1); char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; void *buffer = NULL; @@ -1246,7 +1257,7 @@ static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source, } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, - source, data, data_len, datatype); + source, data, data_len, datatype, active_target); } /* Release datatype & op */ @@ -1258,6 +1269,7 @@ static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source, static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_acc_t *acc_header) { + bool active_target = !(acc_header->tag & 0x1); char *data = (char *) (acc_header + 1); struct ompi_datatype_t *datatype; int ret; @@ -1277,7 +1289,7 @@ static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int sour } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, - source, NULL, 0, datatype); + source, NULL, 0, datatype, active_target); } /* Release datatype & op */ @@ -1290,6 +1302,7 @@ static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int sour static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_cswap_t *cswap_header) { + bool active_target = !(cswap_header->tag & 0x1); char *data = (char*) (cswap_header + 1); struct ompi_datatype_t *datatype; int ret; @@ -1309,7 +1322,7 @@ static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source, } else { /* queue the operation */ ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source, - data, 2 * datatype->super.size, datatype); + data, 2 * datatype->super.size, datatype, active_target); } /* Release datatype */ @@ -1392,7 +1405,7 @@ static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source, osc_pt2pt_add_pending (pending); } - /* signal incomming will increment this counter */ + /* signal incoming will increment this counter */ OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1); return sizeof (*unlock_header); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c index 0280f4738c..b4e5f95221 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c @@ -77,6 +77,7 @@ int ompi_osc_pt2pt_free(ompi_win_t *win) /* it is erroneous to close a window with active operations on it so we should * probably produce an error here instead of cleaning up */ OPAL_LIST_DESTRUCT(&module->pending_acc); + OBJ_DESTRUCT(&module->pending_acc_lock); osc_pt2pt_gc_clean (module); OPAL_LIST_DESTRUCT(&module->buffer_gc); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index e49ba1a384..57fb0b2932 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -50,7 +50,7 @@ typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t; OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t, NULL, NULL); -static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module); +static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module); static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr); static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock, int target); @@ -100,9 +100,9 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, if (MPI_LOCK_EXCLUSIVE == lock_type) { OPAL_THREAD_ADD32(&module->lock_status, 1); - ompi_osc_activate_next_lock (module); + ompi_osc_pt2pt_activate_next_lock (module); } else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) { - ompi_osc_activate_next_lock (module); + ompi_osc_pt2pt_activate_next_lock (module); } /* need to ensure we make progress */ @@ -385,10 +385,10 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d", lock->sync_expected)); + OPAL_THREAD_UNLOCK(&module->lock); /* wait until ack has arrived from target */ ompi_osc_pt2pt_sync_wait_expected (lock); - OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_internal: all lock acks received")); @@ -426,7 +426,7 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) * So make sure to wait for all of the fragments to arrive. */ OPAL_THREAD_LOCK(&module->lock); - while (module->outgoing_frag_count < module->outgoing_frag_signal_count) { + while (module->outgoing_frag_count < 0) { opal_condition_wait(&module->cond, &module->lock); } OPAL_THREAD_UNLOCK(&module->lock); @@ -628,7 +628,7 @@ int ompi_osc_pt2pt_flush_local (int target, struct ompi_win_t *win) /* wait for all the requests */ OPAL_THREAD_LOCK(&module->lock); - while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { + while (module->outgoing_frag_count < 0) { opal_condition_wait(&module->cond, &module->lock); } OPAL_THREAD_UNLOCK(&module->lock); @@ -654,7 +654,7 @@ int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win) /* wait for all the requests */ OPAL_THREAD_LOCK(&module->lock); - while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { + while (module->outgoing_frag_count < 0) { opal_condition_wait(&module->cond, &module->lock); } OPAL_THREAD_UNLOCK(&module->lock); @@ -758,7 +758,7 @@ static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, in return true; } -static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module) { +static int ompi_osc_pt2pt_activate_next_lock (ompi_osc_pt2pt_module_t *module) { /* release any other pending locks we can */ ompi_osc_pt2pt_pending_lock_t *pending_lock, *next; int ret = OMPI_SUCCESS; @@ -903,9 +903,9 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source, if (-1 == module->lock_status) { OPAL_THREAD_ADD32(&module->lock_status, 1); - ompi_osc_activate_next_lock (module); + ompi_osc_pt2pt_activate_next_lock (module); } else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) { - ompi_osc_activate_next_lock (module); + ompi_osc_pt2pt_activate_next_lock (module); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h index 87bd1c45ad..10398926e8 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -127,20 +127,25 @@ bool ompi_osc_pt2pt_sync_pscw_peer (struct ompi_osc_pt2pt_module_t *module, int /** * Wait for all remote peers in the synchronization to respond */ -static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync) +static inline void ompi_osc_pt2pt_sync_wait_nolock (ompi_osc_pt2pt_sync_t *sync) { - OPAL_THREAD_LOCK(&sync->lock); while (!sync->eager_send_active) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "waiting for access epoch to start")); opal_condition_wait(&sync->cond, &sync->lock); } - OPAL_THREAD_UNLOCK(&sync->lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "access epoch ready")); } +static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync) +{ + OPAL_THREAD_LOCK(&sync->lock); + ompi_osc_pt2pt_sync_wait_nolock (sync); + OPAL_THREAD_UNLOCK(&sync->lock); +} + /** * Wait for all remote peers in the synchronization to respond */