diff --git a/ompi/mca/osc/pt2pt/Makefile.am b/ompi/mca/osc/pt2pt/Makefile.am index 83bdb33e6c..17d08ff50e 100644 --- a/ompi/mca/osc/pt2pt/Makefile.am +++ b/ompi/mca/osc/pt2pt/Makefile.am @@ -32,7 +32,9 @@ pt2pt_sources = \ osc_pt2pt_request.h \ osc_pt2pt_request.c \ osc_pt2pt_active_target.c \ - osc_pt2pt_passive_target.c + osc_pt2pt_passive_target.c \ + osc_pt2pt_sync.h \ + osc_pt2pt_sync.c # Make the output library in this directory, and name it either # mca__.la (for DSO builds) or libmca__.la diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 04833efe53..51b14b7057 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -29,19 +29,19 @@ #include "opal/class/opal_free_list.h" #include "opal/class/opal_hash_table.h" #include "opal/threads/threads.h" -#include "opal/mca/btl/btl.h" +#include "opal/util/output.h" #include "ompi/win/win.h" +#include "ompi/info/info.h" #include "ompi/communicator/communicator.h" #include "ompi/datatype/ompi_datatype.h" #include "ompi/request/request.h" #include "ompi/mca/osc/osc.h" #include "ompi/mca/osc/base/base.h" -#include "opal/mca/btl/btl.h" -#include "ompi/mca/bml/bml.h" #include "ompi/memchecker.h" #include "osc_pt2pt_header.h" +#include "osc_pt2pt_sync.h" BEGIN_C_DECLS @@ -85,6 +85,9 @@ struct ompi_osc_pt2pt_peer_t { /** make this an opal object */ opal_object_t super; + /** rank of this peer */ + int rank; + /** pointer to the current send fragment for each outgoing target */ struct ompi_osc_pt2pt_frag_t *active_frag; @@ -94,25 +97,16 @@ struct ompi_osc_pt2pt_peer_t { /** fragments queued to this target */ opal_list_t queued_frags; - /** number of acks pending. New requests can not be sent out if there are - * acks pending (to fulfill the ordering constraints of accumulate) */ - uint32_t num_acks_pending; - /** number of fragments incomming (negative - expected, positive - unsynchronized) */ int32_t passive_incoming_frag_count; - /** peer is in an access epoch */ - bool access_epoch; - - /** eager sends are active to this peer */ - bool eager_send_active; + /** unexpected post message arrived */ + bool unexpected_post; }; typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t; OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t); -#define SEQ_INVALID 0xFFFFFFFFFFFFFFFFULL - /** Module structure. Exactly one of these is associated with each PT2PT window */ struct ompi_osc_pt2pt_module_t { @@ -122,6 +116,9 @@ struct ompi_osc_pt2pt_module_t { /** window should have accumulate ordering... */ bool accumulate_ordering; + /** no locks info key value */ + bool no_locks; + /** pointer to free on cleanup (may be NULL) */ void *free_after; @@ -141,11 +138,11 @@ struct ompi_osc_pt2pt_module_t { /** condition variable associated with lock */ opal_condition_t cond; - /** lock for atomic window updates from reductions */ - opal_mutex_t acc_lock; + /** hash table of peer objects */ + opal_hash_table_t peer_hash; - /** peer data */ - ompi_osc_pt2pt_peer_t *peers; + /** lock protecting peer_hash */ + opal_mutex_t peer_lock; /** Nmber of communication fragments started for this epoch, by peer. Not in peer data to make fence more manageable. */ @@ -166,29 +163,14 @@ struct ompi_osc_pt2pt_module_t { /* Next incoming buffer count at which we want a signal on cond */ uint32_t active_incoming_frag_signal_count; - /* Number of flush ack requests send since beginning of time */ - uint64_t flush_ack_requested_count; - /* Number of flush ack replies received since beginning of - time. cond should be signalled on every flush reply - received. */ - uint64_t flush_ack_received_count; - /** Number of targets locked/being locked */ unsigned int passive_target_access_epoch; - /** start sending data eagerly */ - bool active_eager_send_active; - - /** Indicates the window is in an all access epoch (fence, lock_all) */ - bool all_access_epoch; + /** Indicates the window is in a pcsw or all access (fence, lock_all) epoch */ + ompi_osc_pt2pt_sync_t all_sync; /* ********************* PWSC data ************************ */ struct ompi_group_t *pw_group; - struct ompi_group_t *sc_group; - - /** Number of "ping" messages from the remote post group we've - received */ - int32_t num_post_msgs; /** Number of "count" messages from the remote complete group we've received */ @@ -207,9 +189,7 @@ struct ompi_osc_pt2pt_module_t { opal_list_t locks_pending; /** origin side list of locks currently outstanding */ - opal_list_t outstanding_locks; - - uint64_t lock_serial_number; + opal_hash_table_t outstanding_locks; unsigned char *incoming_buffer; ompi_request_t *frag_request; @@ -218,10 +198,6 @@ struct ompi_osc_pt2pt_module_t { opal_atomic_lock_t accumulate_lock; opal_list_t pending_acc; - /* enforce pscw matching */ - /** list of unmatched post messages */ - opal_list_t pending_posts; - /** Lock for garbage collection lists */ opal_mutex_t gc_lock; @@ -234,6 +210,29 @@ struct ompi_osc_pt2pt_module_t { typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t; OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component; +static inline ompi_osc_pt2pt_peer_t *ompi_osc_pt2pt_peer_lookup (ompi_osc_pt2pt_module_t *module, + int rank) +{ + ompi_osc_pt2pt_peer_t *peer = NULL; + (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer); + + if (OPAL_UNLIKELY(NULL == peer)) { + OPAL_THREAD_LOCK(&module->peer_lock); + (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer); + + if (NULL == peer) { + peer = OBJ_NEW(ompi_osc_pt2pt_peer_t); + peer->rank = rank; + + (void) opal_hash_table_set_value_uint32 (&module->peer_hash, rank, (void *) peer); + } + OPAL_THREAD_UNLOCK(&module->peer_lock); + } + + return peer; +} + + struct ompi_osc_pt2pt_pending_t { opal_list_item_t super; ompi_osc_pt2pt_module_t *module; @@ -262,23 +261,23 @@ int ompi_osc_pt2pt_put(const void *origin_addr, struct ompi_win_t *win); int ompi_osc_pt2pt_accumulate(const void *origin_addr, - int origin_count, - struct ompi_datatype_t *origin_dt, - int target, - OPAL_PTRDIFF_TYPE target_disp, - int target_count, - struct ompi_datatype_t *target_dt, - struct ompi_op_t *op, - struct ompi_win_t *win); + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + OPAL_PTRDIFF_TYPE target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win); int ompi_osc_pt2pt_get(void *origin_addr, - int origin_count, - struct ompi_datatype_t *origin_dt, - int target, - OPAL_PTRDIFF_TYPE target_disp, - int target_count, - struct ompi_datatype_t *target_dt, - struct ompi_win_t *win); + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + OPAL_PTRDIFF_TYPE target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win); int ompi_osc_pt2pt_compare_and_swap(const void *origin_addr, const void *compare_addr, @@ -357,7 +356,10 @@ int ompi_osc_pt2pt_rget_accumulate(const void *origin_addr, int ompi_osc_pt2pt_fence(int assert, struct ompi_win_t *win); /* received a post message */ -int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source); +void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source); + +/* received a complete message */ +void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count); int ompi_osc_pt2pt_start(struct ompi_group_t *group, int assert, @@ -451,7 +453,8 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in opal_condition_broadcast(&module->cond); } } else { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "mark_incoming_completion marking passive incoming complete. source = %d, count = %d", source, (int) peer->passive_incoming_frag_count + 1)); @@ -704,6 +707,16 @@ static inline int ompi_osc_pt2pt_accumulate_trylock (ompi_osc_pt2pt_module_t *mo return opal_atomic_trylock (&module->accumulate_lock); } +/** + * @brief check if this process has this process is in a passive target access epoch + * + * @param[in] module osc pt2pt module + */ +static inline bool ompi_osc_pt2pt_in_passive_epoch (ompi_osc_pt2pt_module_t *module) +{ + return 0 != module->passive_target_access_epoch; +} + /** * ompi_osc_pt2pt_accumulate_unlock: * @@ -722,9 +735,134 @@ static inline void ompi_osc_pt2pt_accumulate_unlock (ompi_osc_pt2pt_module_t *mo } } -static inline bool ompi_osc_pt2pt_check_access_epoch (ompi_osc_pt2pt_module_t *module, int rank) +/** + * Find the first outstanding lock of the target. + * + * @param[in] module osc pt2pt module + * @param[in] target target rank + * @param[out] peer peer object associated with the target + * + * @returns an outstanding lock on success + * + * This function looks for an outstanding lock to the target. If a lock exists it is returned. + */ +static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_lock_find (ompi_osc_pt2pt_module_t *module, int target, + ompi_osc_pt2pt_peer_t **peer) { - return module->all_access_epoch || module->peers[rank].access_epoch; + ompi_osc_pt2pt_sync_t *outstanding_lock = NULL; + + (void) opal_hash_table_get_value_uint32 (&module->outstanding_locks, (uint32_t) target, (void **) &outstanding_lock); + if (NULL != outstanding_lock && peer) { + *peer = outstanding_lock->peer_list.peer; + } + + return outstanding_lock; +} + +/** + * Add an outstanding lock + * + * @param[in] module osc pt2pt module + * @param[in] lock lock object + * + * This function inserts a lock object to the list of outstanding locks. The caller must be holding the module + * lock. + */ +static inline void ompi_osc_pt2pt_module_lock_insert (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) +{ + (void) opal_hash_table_set_value_uint32 (&module->outstanding_locks, (uint32_t) lock->sync.lock.target, (void *) lock); +} + + +/** + * Remove an outstanding lock + * + * @param[in] module osc pt2pt module + * @param[in] lock lock object + * + * This function removes a lock object to the list of outstanding locks. The caller must be holding the module + * lock. + */ +static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) +{ + + (void) opal_hash_table_remove_value_uint32 (&module->outstanding_locks, (uint32_t) lock->sync.lock.target); +} + +/** + * Lookup a synchronization object associated with the target + * + * @param[in] module osc pt2pt module + * @param[in] target target rank + * @param[out] peer peer object + * + * @returns NULL if the target is not locked, fenced, or part of a pscw sync + * @returns synchronization object on success + * + * This function returns the synchronization object associated with an access epoch for + * the target. If the target is not part of any current access epoch then NULL is returned. + */ +static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target, + struct ompi_osc_pt2pt_peer_t **peer) +{ + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc/pt2pt: looking for synchronization object for target %d", target)); + + switch (module->all_sync.type) { + case OMPI_OSC_PT2PT_SYNC_TYPE_NONE: + if (!module->no_locks) { + return ompi_osc_pt2pt_module_lock_find (module, target, peer); + } + + return NULL; + case OMPI_OSC_PT2PT_SYNC_TYPE_FENCE: + case OMPI_OSC_PT2PT_SYNC_TYPE_LOCK: + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc/pt2pt: found fence/lock_all access epoch for target %d", target)); + + /* fence epoch is now active */ + module->all_sync.epoch_active = true; + if (peer) { + *peer = ompi_osc_pt2pt_peer_lookup (module, target); + } + + return &module->all_sync; + case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW: + if (ompi_osc_pt2pt_sync_pscw_peer (module, target, peer)) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc/pt2pt: found PSCW access epoch target for %d", target)); + return &module->all_sync; + } + } + + return NULL; +} + +/** + * @brief check if an access epoch is active + * + * @param[in] module osc pt2pt module + * + * @returns true if any type of access epoch is active + * @returns false otherwise + * + * This function is used to check for conflicting access epochs. + */ +static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *module) +{ + return (module->all_sync.epoch_active || ompi_osc_pt2pt_in_passive_epoch (module)); +} + +static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank) +{ + ompi_osc_pt2pt_sync_t *sync; + + sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL); + if (!sync) { + return false; + } + + return sync->eager_send_active; } END_C_DECLS diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c index 5e5e5d414a..8804a8f023 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 IBM Corporation. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -35,74 +35,92 @@ #include "ompi/mca/osc/base/base.h" /** - * ompi_osc_pt2pt_pending_post_t: + * compare_ranks: * - * Describes a post operation that was encountered outside its - * matching start operation. + * @param[in] ptra Pointer to integer item + * @param[in] ptrb Pointer to integer item + * + * @returns 0 if *ptra == *ptrb + * @returns -1 if *ptra < *ptrb + * @returns 1 otherwise + * + * This function is used to sort the rank list. It can be removed if + * groups are always in order. */ -struct ompi_osc_pt2pt_pending_post_t { - opal_list_item_t super; - int rank; -}; -typedef struct ompi_osc_pt2pt_pending_post_t ompi_osc_pt2pt_pending_post_t; -OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_pending_post_t); - -OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_post_t, opal_list_item_t, NULL, NULL); - -static bool group_contains_proc (ompi_group_t *group, ompi_proc_t *proc) +static int compare_ranks (const void *ptra, const void *ptrb) { - int group_size = ompi_group_size (group); + int a = *((int *) ptra); + int b = *((int *) ptrb); - for (int i = 0 ; i < group_size ; ++i) { - ompi_proc_t *group_proc = ompi_group_peer_lookup (group, i); - - /* it is safe to compare procs by pointer */ - if (group_proc == proc) { - return true; - } + if (a < b) { + return -1; + } else if (a > b) { + return 1; } - return false; + return 0; } -static int* -get_comm_ranks(ompi_osc_pt2pt_module_t *module, - ompi_group_t *sub_group) +/** + * ompi_osc_pt2pt_get_comm_ranks: + * + * @param[in] module - OSC PT2PT module + * @param[in] sub_group - Group with ranks to translate + * + * @returns an array of translated ranks on success or NULL on failure + * + * Translate the ranks given in {sub_group} into ranks in the + * communicator used to create {module}. + */ +static ompi_osc_pt2pt_peer_t **ompi_osc_pt2pt_get_peers (ompi_osc_pt2pt_module_t *module, ompi_group_t *sub_group) { - int *ranks1 = NULL, *ranks2 = NULL; - bool success = false; - int i, ret; + int size = ompi_group_size(sub_group); + ompi_osc_pt2pt_peer_t **peers; + int *ranks1, *ranks2; + int ret; - ranks1 = malloc(sizeof(int) * ompi_group_size(sub_group)); - if (NULL == ranks1) goto cleanup; - ranks2 = malloc(sizeof(int) * ompi_group_size(sub_group)); - if (NULL == ranks2) goto cleanup; + ranks1 = malloc (sizeof(int) * size); + ranks2 = malloc (sizeof(int) * size); + peers = malloc (sizeof (ompi_osc_pt2pt_peer_t *) * size); + if (NULL == ranks1 || NULL == ranks2 || NULL == peers) { + free (ranks1); + free (ranks2); + free (peers); + } - for (i = 0 ; i < ompi_group_size(sub_group) ; ++i) { + for (int i = 0 ; i < size ; ++i) { ranks1[i] = i; } - ret = ompi_group_translate_ranks(sub_group, - ompi_group_size(sub_group), - ranks1, - module->comm->c_local_group, - ranks2); - if (OMPI_SUCCESS != ret) goto cleanup; - - success = true; - - cleanup: - if (NULL != ranks1) free(ranks1); - if (!success) { - if (NULL != ranks2) free(ranks2); - ranks2 = NULL; + ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group, + ranks2); + free (ranks1); + if (OMPI_SUCCESS != ret) { + free (ranks2); + free (peers); + return NULL; } - return ranks2; + qsort (ranks2, size, sizeof (int), compare_ranks); + for (int i = 0 ; i < size ; ++i) { + peers[i] = ompi_osc_pt2pt_peer_lookup (module, ranks2[i]); + OBJ_RETAIN(peers[i]); + } + free (ranks2); + + return peers; } -int -ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) +static void ompi_osc_pt2pt_release_peers (ompi_osc_pt2pt_peer_t **peers, int npeers) +{ + for (int i = 0 ; i < npeers ; ++i) { + OBJ_RELEASE(peers[i]); + } + + free (peers); +} + +int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); uint32_t incoming_reqs; @@ -112,14 +130,16 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) "osc pt2pt: fence start")); /* can't enter an active target epoch when in a passive target epoch */ - if (module->passive_target_access_epoch) { + if (ompi_osc_pt2pt_in_passive_epoch (module)) { + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, + "osc pt2pt: could not enter fence. already in an access epoch")); return OMPI_ERR_RMA_SYNC; } /* active sends are now active (we will close the epoch if NOSUCCEED is specified) */ if (0 == (assert & MPI_MODE_NOSUCCEED)) { - module->active_eager_send_active = true; - module->all_access_epoch = true; + module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE; + module->all_sync.eager_send_active = true; } /* short-circuit the noprecede case */ @@ -168,9 +188,11 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) if (assert & MPI_MODE_NOSUCCEED) { /* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly * stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */ - module->active_eager_send_active = false; - module->all_access_epoch = false; + ompi_osc_pt2pt_sync_reset (&module->all_sync); } + + module->all_sync.epoch_active = false; + opal_condition_broadcast (&module->cond); OPAL_THREAD_UNLOCK(&module->lock); @@ -181,124 +203,131 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) } -int -ompi_osc_pt2pt_start(ompi_group_t *group, - int assert, - ompi_win_t *win) +int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_pending_post_t *pending_post, *next; - int group_size; - int *ranks; + ompi_osc_pt2pt_sync_t *sync = &module->all_sync; - OPAL_THREAD_LOCK(&module->lock); + OPAL_THREAD_LOCK(&sync->lock); - /* ensure we're not already in a start or passive target. we can no check for all - * access here due to fence */ - if (NULL != module->sc_group || module->passive_target_access_epoch) { + /* check if we are already in an access epoch */ + if (ompi_osc_pt2pt_access_epoch_active (module)) { OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_RMA_SYNC; } + /* mark all procs in this group as being in an access epoch */ + sync->num_peers = ompi_group_size (group); + sync->sync.pscw.group = group; + + /* haven't processed any post messages yet */ + sync->sync_expected = sync->num_peers; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "ompi_osc_rdma_start entering with group size %d...", + sync->num_peers)); + + if (0 == ompi_group_size (group)) { + /* nothing more to do. this is an empty start epoch */ + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_SUCCESS; + } + + opal_atomic_wmb (); + + sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_PSCW; + + /* prevent us from entering a passive-target, fence, or another pscw access epoch until + * the matching complete is called */ + sync->epoch_active = true; + + /* translate the group ranks into the communicator */ + sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group); + if (NULL == sync->peer_list.peers) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + /* save the group */ OBJ_RETAIN(group); ompi_group_increment_proc_count(group); - module->sc_group = group; + if (!(assert & MPI_MODE_NOCHECK)) { + OPAL_THREAD_LOCK(&sync->lock); + for (int i = 0 ; i < sync->num_peers ; ++i) { + ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i]; - /* mark all procs in this group as being in an access epoch */ - group_size = ompi_group_size (module->sc_group); - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_start entering with group size %d...", - group_size)); - - ranks = get_comm_ranks(module, module->sc_group); - if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - - for (int i = 0 ; i < group_size ; ++i) { - /* when the post comes in we will be in an access epoch with this proc */ - module->peers[ranks[i]].access_epoch = true; - } - - free (ranks); - - OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_pt2pt_pending_post_t) { - ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank); - - if (group_contains_proc (module->sc_group, pending_proc)) { - ompi_osc_pt2pt_peer_t *peer = module->peers + pending_post->rank; - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Consumed unexpected post message from %d", - pending_post->rank)); - ++module->num_post_msgs; - peer->eager_send_active = true; - - opal_list_remove_item (&module->pending_posts, &pending_post->super); - OBJ_RELEASE(pending_post); + if (peer->unexpected_post) { + /* the peer already sent a post message for this pscw access epoch */ + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "found unexpected post from %d", + peer->rank)); + OPAL_THREAD_ADD32 (&sync->sync_expected, -1); + peer->unexpected_post = false; + } } + OPAL_THREAD_UNLOCK(&sync->lock); + } else { + sync->sync_expected = 0; } - /* disable eager sends until we've receved the proper number of - post messages, at which time we know all our peers are ready to - receive messages. */ - module->active_eager_send_active = false; - - /* possible we've already received a couple in messages, so - add however many we're going to wait for */ - module->num_post_msgs -= ompi_group_size(module->sc_group); - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "num_post_msgs = %d", module->num_post_msgs)); + "post messages still needed: %d", sync->sync_expected)); /* if we've already received all the post messages, we can eager send. Otherwise, eager send will be enabled when numb_post_messages reaches 0 */ - if (0 == module->num_post_msgs) { - module->active_eager_send_active = true; + if (0 == sync->sync_expected) { + sync->eager_send_active = true; } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_start complete")); + "ompi_osc_pt2pt_start complete. eager sends active: %d", + sync->eager_send_active)); OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS; } -int -ompi_osc_pt2pt_complete(ompi_win_t *win) +int ompi_osc_pt2pt_complete (ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_header_complete_t complete_req; - ompi_osc_pt2pt_peer_t *peer; + ompi_osc_pt2pt_sync_t *sync = &module->all_sync; + int my_rank = ompi_comm_rank (module->comm); + ompi_osc_pt2pt_peer_t **peers; int ret = OMPI_SUCCESS; - int i; - int *ranks = NULL; ompi_group_t *group; + size_t group_size; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete entering...")); - if (NULL == module->sc_group) { + OPAL_THREAD_LOCK(&module->lock); + if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) { + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_RMA_SYNC; } - ranks = get_comm_ranks(module, module->sc_group); - if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - - OPAL_THREAD_LOCK(&module->lock); - /* wait for all the post messages */ - while (0 != module->num_post_msgs) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); - opal_condition_wait(&module->cond, &module->lock); + ompi_osc_pt2pt_sync_wait (sync); + + /* phase 1 cleanup sync object */ + group = sync->sync.pscw.group; + group_size = sync->num_peers; + + peers = sync->peer_list.peers; + if (NULL == peers) { + /* empty peer list */ + OPAL_THREAD_UNLOCK(&(module->lock)); + OBJ_RELEASE(group); + return OMPI_SUCCESS; } + OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_complete sending complete messages")); + "ompi_osc_pt2pt_complete all posts received. sending complete messages...")); /* for each process in group, send a control message with number of updates coming, then start all the requests. Note that the @@ -307,12 +336,13 @@ ompi_osc_pt2pt_complete(ompi_win_t *win) At the same time, clean out the outgoing count for the next round. */ - for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) { - ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, ranks[i]); - if (ompi_proc_local() == proc) { + for (size_t i = 0 ; i < group_size ; ++i) { + ompi_osc_pt2pt_header_complete_t complete_req; + int rank = peers[i]->rank; + + if (my_rank == rank) { /* shortcut for self */ - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete self complete")); - module->num_complete_msgs++; + osc_pt2pt_incoming_complete (module, rank, 0); continue; } @@ -322,10 +352,10 @@ ompi_osc_pt2pt_complete(ompi_win_t *win) complete_req.padding[0] = 0; complete_req.padding[1] = 0; #endif - complete_req.frag_count = module->epoch_outgoing_frag_count[ranks[i]]; + complete_req.frag_count = module->epoch_outgoing_frag_count[rank]; osc_pt2pt_hton(&complete_req, proc); - peer = module->peers + ranks[i]; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, rank); /* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long * requests. once that is done this can be removed. */ @@ -335,36 +365,38 @@ ompi_osc_pt2pt_complete(ompi_win_t *win) OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u", - ranks[i], complete_req.frag_count)); + rank, complete_req.frag_count)); - - peer->access_epoch = false; - - ret = ompi_osc_pt2pt_control_send (module, ranks[i], &complete_req, + ret = ompi_osc_pt2pt_control_send (module, rank, &complete_req, sizeof(ompi_osc_pt2pt_header_complete_t)); - if (OMPI_SUCCESS != ret) goto cleanup; + if (OMPI_SUCCESS != ret) { + break; + } - ret = ompi_osc_pt2pt_frag_flush_target (module, ranks[i]); - if (OMPI_SUCCESS != ret) goto cleanup; + ret = ompi_osc_pt2pt_frag_flush_target (module, rank); + if (OMPI_SUCCESS != ret) { + break; + } + + /* zero the fragment counts here to ensure they are zerod */ + module->epoch_outgoing_frag_count[rank] = 0; + } + + /* release our reference to peers in this group */ + ompi_osc_pt2pt_release_peers (peers, group_size); + + if (OMPI_SUCCESS != ret) { + return ret; } OPAL_THREAD_LOCK(&module->lock); - /* zero the fragment counts here to ensure they are zerod */ - for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) { - peer = module->peers + ranks[i]; - module->epoch_outgoing_frag_count[ranks[i]] = 0; - peer->eager_send_active = false; - } - /* wait for outgoing requests to complete. Don't wait for incoming, as we're only completing the access epoch, not the exposure epoch */ while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { opal_condition_wait(&module->cond, &module->lock); } - /* phase 1 cleanup group */ - group = module->sc_group; - module->sc_group = NULL; + ompi_osc_pt2pt_sync_reset (sync); /* unlock here, as group cleanup can take a while... */ OPAL_THREAD_UNLOCK(&module->lock); @@ -375,26 +407,17 @@ ompi_osc_pt2pt_complete(ompi_win_t *win) OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete complete")); - free (ranks); return OMPI_SUCCESS; - - cleanup: - if (NULL != ranks) free(ranks); - - return ret; } -int -ompi_osc_pt2pt_post(ompi_group_t *group, - int assert, - ompi_win_t *win) +int ompi_osc_pt2pt_post (ompi_group_t *group, int assert, ompi_win_t *win) { - int *ranks; int ret = OMPI_SUCCESS; ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_header_post_t post_req; + ompi_osc_pt2pt_peer_t **peers; /* can't check for all access epoch here due to fence */ if (module->pw_group) { @@ -405,17 +428,18 @@ ompi_osc_pt2pt_post(ompi_group_t *group, "ompi_osc_pt2pt_post entering with group size %d...", ompi_group_size (group))); - /* save the group */ - OBJ_RETAIN(group); - ompi_group_increment_proc_count(group); - - OPAL_THREAD_LOCK(&(module->lock)); + OPAL_THREAD_LOCK(&module->lock); /* ensure we're not already in a post */ if (NULL != module->pw_group) { OPAL_THREAD_UNLOCK(&(module->lock)); return OMPI_ERR_RMA_SYNC; } + + /* save the group */ + OBJ_RETAIN(group); + ompi_group_increment_proc_count(group); + module->pw_group = group; /* Update completion counter. Can't have received any completion @@ -425,18 +449,26 @@ ompi_osc_pt2pt_post(ompi_group_t *group, OPAL_THREAD_UNLOCK(&(module->lock)); + if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) { + return OMPI_SUCCESS; + } + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "sending post messages")); - ranks = get_comm_ranks(module, module->pw_group); - if (NULL == ranks) { + /* translate group ranks into the communicator */ + peers = ompi_osc_pt2pt_get_peers (module, module->pw_group); + if (OPAL_UNLIKELY(NULL == peers)) { return OMPI_ERR_OUT_OF_RESOURCE; } /* send a hello counter to everyone in group */ for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", ranks[i])); - ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, ranks[i]); + ompi_osc_pt2pt_peer_t *peer = peers[i]; + int rank = peer->rank; + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", rank)); + ompi_proc_t *proc = ompi_comm_peer_lookup (module->comm, rank); /* shortcut for self */ if (ompi_proc_local() == proc) { @@ -447,26 +479,24 @@ ompi_osc_pt2pt_post(ompi_group_t *group, post_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_POST; post_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; - post_req.windx = ompi_comm_get_cid(module->comm); osc_pt2pt_hton(&post_req, proc); /* we don't want to send any data, since we're the exposure epoch only, so use an unbuffered send */ - ret = ompi_osc_pt2pt_control_send_unbuffered(module, ranks[i], &post_req, - sizeof(ompi_osc_pt2pt_header_post_t)); + ret = ompi_osc_pt2pt_control_send_unbuffered(module, rank, &post_req, + sizeof(ompi_osc_pt2pt_header_post_t)); if (OMPI_SUCCESS != ret) { break; } } - free (ranks); + ompi_osc_pt2pt_release_peers (peers, ompi_group_size(module->pw_group)); return ret; } -int -ompi_osc_pt2pt_wait(ompi_win_t *win) +int ompi_osc_pt2pt_wait (ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_group_t *group; @@ -481,9 +511,10 @@ ompi_osc_pt2pt_wait(ompi_win_t *win) OPAL_THREAD_LOCK(&module->lock); while (0 != module->num_complete_msgs || module->active_incoming_frag_count != module->active_incoming_frag_signal_count) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d", - module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count)); + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, " + "active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d", + module->num_complete_msgs, module->active_incoming_frag_count, + module->active_incoming_frag_signal_count)); opal_condition_wait(&module->cond, &module->lock); } @@ -501,9 +532,7 @@ ompi_osc_pt2pt_wait(ompi_win_t *win) } -int -ompi_osc_pt2pt_test(ompi_win_t *win, - int *flag) +int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_group_t *group; @@ -542,41 +571,45 @@ ompi_osc_pt2pt_test(ompi_win_t *win, return ret; } -int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source) +void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count) { - ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source); - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "osc pt2pt: process_complete got complete message from %d. expected fragment count %d. " + "current signal count %d. current incomming count: %d. expected complete msgs: %d", + source, frag_count, module->active_incoming_frag_signal_count, + module->active_incoming_frag_count, module->num_complete_msgs)); - OPAL_THREAD_LOCK(&module->lock); + /* the current fragment is not part of the frag_count so we need to add it here */ + OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, frag_count); + + if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) { + opal_condition_broadcast (&module->cond); + } +} + +void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source) +{ + ompi_osc_pt2pt_sync_t *sync = &module->all_sync; + + OPAL_THREAD_LOCK(&sync->lock); /* verify that this proc is part of the current start group */ - if (!module->sc_group || !group_contains_proc (module->sc_group, source_proc)) { - ompi_osc_pt2pt_pending_post_t *pending_post = OBJ_NEW(ompi_osc_pt2pt_pending_post_t); + if (!ompi_osc_pt2pt_sync_pscw_peer (module, source, NULL)) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "received unexpected post message from %d. module->sc_group = %p, size = %d", - source, (void*)module->sc_group, module->sc_group ? ompi_group_size (module->sc_group) : 0)); + "received unexpected post message from %d for future PSCW synchronization", + source)); - pending_post->rank = source; + peer->unexpected_post = true; + OPAL_THREAD_UNLOCK(&sync->lock); + } else { + OPAL_THREAD_UNLOCK(&sync->lock); - opal_list_append (&module->pending_posts, &pending_post->super); + ompi_osc_pt2pt_sync_expected (sync); - OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_SUCCESS; + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "received post message for PSCW synchronization. post messages still needed: %d", + sync->sync_expected)); } - - assert (!peer->eager_send_active); - peer->eager_send_active = true; - - module->num_post_msgs++; - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "received post message. num_post_msgs = %d", module->num_post_msgs)); - - if (0 == module->num_post_msgs) { - module->active_eager_send_active = true; - } - opal_condition_broadcast (&module->cond); - OPAL_THREAD_UNLOCK(&module->lock); - - return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 46407d1366..2f26b28c50 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -21,12 +21,6 @@ * $HEADER$ */ -#include "ompi_config.h" -#include "mpi.h" - -#include -#include - #include "osc_pt2pt.h" #include "osc_pt2pt_request.h" #include "osc_pt2pt_header.h" @@ -35,9 +29,9 @@ #include "opal_stdint.h" #include "ompi/memchecker.h" -#include "ompi/mca/pml/pml.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" -#include "ompi/mca/osc/base/base.h" + +#include /* progress an OSC request */ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) @@ -82,26 +76,17 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) } /* self communication optimizations */ -static inline int ompi_osc_pt2pt_put_self (const void *source, int source_count, ompi_datatype_t *source_datatype, - OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype, - ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) +static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, + ompi_datatype_t *source_datatype, OPAL_PTRDIFF_TYPE target_disp, int target_count, + ompi_datatype_t *target_datatype, ompi_osc_pt2pt_module_t *module, + ompi_osc_pt2pt_request_t *request) { void *target = (unsigned char*) module->baseptr + ((unsigned long) target_disp * module->disp_unit); int ret; /* if we are in active target mode wait until all post messages arrive */ - if (module->sc_group && !module->active_eager_send_active) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - } - - if (!(module->passive_target_access_epoch || module->active_eager_send_active)) { - return OMPI_ERR_RMA_SYNC; - } + ompi_osc_pt2pt_sync_wait (pt2pt_sync); ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype, target, target_count, target_datatype); @@ -116,26 +101,16 @@ static inline int ompi_osc_pt2pt_put_self (const void *source, int source_count, return OMPI_SUCCESS; } -static inline int ompi_osc_pt2pt_get_self (void *target, int target_count, ompi_datatype_t *target_datatype, - OPAL_PTRDIFF_TYPE source_disp, int source_count, ompi_datatype_t *source_datatype, - ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) +static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, void *target, int target_count, ompi_datatype_t *target_datatype, + OPAL_PTRDIFF_TYPE source_disp, int source_count, ompi_datatype_t *source_datatype, + ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) { void *source = (unsigned char*) module->baseptr + ((unsigned long) source_disp * module->disp_unit); int ret; /* if we are in active target mode wait until all post messages arrive */ - if (module->sc_group && !module->active_eager_send_active) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - } - - if (!(module->passive_target_access_epoch || module->active_eager_send_active)) { - return OMPI_ERR_RMA_SYNC; - } + ompi_osc_pt2pt_sync_wait (pt2pt_sync); ret = ompi_datatype_sndrcv (source, source_count, source_datatype, target, target_count, target_datatype); @@ -150,24 +125,14 @@ static inline int ompi_osc_pt2pt_get_self (void *target, int target_count, ompi_ return OMPI_SUCCESS; } -static inline int ompi_osc_pt2pt_cas_self (const void *source, const void *compare, void *result, ompi_datatype_t *datatype, - OPAL_PTRDIFF_TYPE target_disp, ompi_osc_pt2pt_module_t *module) +static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, const void *compare, void *result, + ompi_datatype_t *datatype, OPAL_PTRDIFF_TYPE target_disp, ompi_osc_pt2pt_module_t *module) { void *target = (unsigned char*) module->baseptr + ((unsigned long) target_disp * module->disp_unit); /* if we are in active target mode wait until all post messages arrive */ - if (module->sc_group && !module->active_eager_send_active) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - } - - if (!(module->passive_target_access_epoch || module->active_eager_send_active)) { - return OMPI_ERR_RMA_SYNC; - } + ompi_osc_pt2pt_sync_wait (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -182,26 +147,16 @@ static inline int ompi_osc_pt2pt_cas_self (const void *source, const void *compa return OMPI_SUCCESS; } -static inline int ompi_osc_pt2pt_acc_self (const void *source, int source_count, ompi_datatype_t *source_datatype, - OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype, - ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) +static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype, + OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype, + ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) { void *target = (unsigned char*) module->baseptr + ((unsigned long) target_disp * module->disp_unit); int ret; /* if we are in active target mode wait until all post messages arrive */ - if (module->sc_group && !module->active_eager_send_active) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - } - - if (!(module->passive_target_access_epoch || module->active_eager_send_active)) { - return OMPI_ERR_RMA_SYNC; - } + ompi_osc_pt2pt_sync_wait (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -226,27 +181,17 @@ static inline int ompi_osc_pt2pt_acc_self (const void *source, int source_count, return OMPI_SUCCESS; } -static inline int ompi_osc_pt2pt_gacc_self (const void *source, int source_count, ompi_datatype_t *source_datatype, - void *result, int result_count, ompi_datatype_t *result_datatype, - OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype, - ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) +static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype, + void *result, int result_count, ompi_datatype_t *result_datatype, + OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype, + ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request) { void *target = (unsigned char*) module->baseptr + ((unsigned long) target_disp * module->disp_unit); int ret; /* if we are in active target mode wait until all post messages arrive */ - if (module->sc_group && !module->active_eager_send_active) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - } - - if (!(module->passive_target_access_epoch || module->active_eager_send_active)) { - return OMPI_ERR_RMA_SYNC; - } + ompi_osc_pt2pt_sync_wait (pt2pt_sync); ompi_osc_pt2pt_accumulate_lock (module); @@ -296,6 +241,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target); ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_header_put_t *header; + ompi_osc_pt2pt_sync_t *pt2pt_sync; size_t ddt_len, payload_len, frag_len; bool is_long_datatype = false; bool is_long_msg = false; @@ -309,7 +255,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ origin_dt->name, target, (int) target_disp, target_count, target_dt->name, win->w_name)); - if (!ompi_osc_pt2pt_check_access_epoch (module, target)) { + pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL); + if (OPAL_UNLIKELY(NULL == pt2pt_sync)) { return OMPI_ERR_RMA_SYNC; } @@ -324,9 +271,9 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ /* optimize self communication. TODO: optimize local communication */ if (ompi_comm_rank (module->comm) == target) { - return ompi_osc_pt2pt_put_self (origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - module, request); + return ompi_osc_pt2pt_put_self (pt2pt_sync, origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + module, request); } /* Compute datatype and payload lengths. Note that the datatype description @@ -354,16 +301,10 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ tag = get_tag(module); } - /* flush will be called at the end of this function. make sure the post message has + /* flush will be called at the end of this function. make sure all post messages have * arrived. */ - if ((is_long_msg || request) && module->sc_group) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); + if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + ompi_osc_pt2pt_sync_wait (pt2pt_sync); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -478,6 +419,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, bool is_long_msg = false; ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_header_acc_t *header; + ompi_osc_pt2pt_sync_t *pt2pt_sync; size_t ddt_len, payload_len, frag_len; char *ptr; const void *packed_ddt; @@ -490,7 +432,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, target_count, target_dt->name, op->o_name, win->w_name)); - if (!ompi_osc_pt2pt_check_access_epoch (module, target)) { + pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL); + if (OPAL_UNLIKELY(NULL == pt2pt_sync)) { return OMPI_ERR_RMA_SYNC; } @@ -505,9 +448,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, /* optimize the self case. TODO: optimize the local case */ if (ompi_comm_rank (module->comm) == target) { - return ompi_osc_pt2pt_acc_self (origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - op, module, request); + return ompi_osc_pt2pt_acc_self (pt2pt_sync, origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + op, module, request); } /* Compute datatype and payload lengths. Note that the datatype description @@ -535,16 +478,10 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, tag = get_tag (module); } - /* flush will be called at the end of this function. make sure the post message has + /* flush will be called at the end of this function. make sure all post messages have * arrived. */ - if ((is_long_msg || request) && module->sc_group) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); + if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + ompi_osc_pt2pt_sync_wait (pt2pt_sync); } header = (ompi_osc_pt2pt_header_acc_t*) ptr; @@ -656,6 +593,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target); ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_header_cswap_t *header; + ompi_osc_pt2pt_sync_t *pt2pt_sync; size_t ddt_len, payload_len, frag_len; ompi_osc_pt2pt_request_t *request; const void *packed_ddt; @@ -668,14 +606,15 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar (unsigned long) result_addr, dt->name, target, (int) target_disp, win->w_name)); - if (!ompi_osc_pt2pt_check_access_epoch (module, target)) { + pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL); + if (OPAL_UNLIKELY(NULL == pt2pt_sync)) { return OMPI_ERR_RMA_SYNC; } /* optimize self case. TODO: optimize local case */ if (ompi_comm_rank (module->comm) == target) { - return ompi_osc_pt2pt_cas_self (origin_addr, compare_addr, result_addr, dt, target_disp, - module); + return ompi_osc_pt2pt_cas_self (pt2pt_sync, origin_addr, compare_addr, result_addr, dt, target_disp, + module); } /* compare-and-swaps are always request based, so that we know where to land the data */ @@ -697,6 +636,11 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar /* we need to send both the origin and compare buffers */ payload_len = dt->super.size * 2; + ret = ompi_datatype_get_pack_description(dt, &packed_ddt); + if (OMPI_SUCCESS != ret) { + return ret; + } + frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len; ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr); if (OMPI_SUCCESS != ret) { @@ -715,7 +659,6 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar osc_pt2pt_hton(header, proc); ptr += sizeof(ompi_osc_pt2pt_header_cswap_t); - ret = ompi_datatype_get_pack_description(dt, &packed_ddt); memcpy((unsigned char*) ptr, packed_ddt, ddt_len); ptr += ddt_len; @@ -802,6 +745,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co bool is_long_datatype = false; ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_header_get_t *header; + ompi_osc_pt2pt_sync_t *pt2pt_sync; size_t ddt_len, frag_len; char *ptr; const void *packed_ddt; @@ -813,7 +757,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co origin_dt->name, target, (int) target_disp, target_count, target_dt->name, win->w_name)); - if (!ompi_osc_pt2pt_check_access_epoch (module, target)) { + pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL); + if (OPAL_UNLIKELY(NULL == pt2pt_sync)) { return OMPI_ERR_RMA_SYNC; } @@ -835,9 +780,9 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* optimize self communication. TODO: optimize local communication */ if (ompi_comm_rank (module->comm) == target) { *request = &pt2pt_request->super; - return ompi_osc_pt2pt_get_self (origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - module, pt2pt_request); + return ompi_osc_pt2pt_get_self (pt2pt_sync, origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + module, pt2pt_request); } pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET; @@ -868,14 +813,10 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* for bookkeeping the get is "outgoing" */ ompi_osc_signal_outgoing (module, target, 1); - /* flush will be called at the end of this function. make sure the post message has + /* flush will be called at the end of this function. make sure all post messages have * arrived. */ - if (!release_req && module->sc_group) { - while (0 != module->num_post_msgs) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); - opal_condition_wait(&module->cond, &module->lock); - } + if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + ompi_osc_pt2pt_sync_wait (pt2pt_sync); } header = (ompi_osc_pt2pt_header_get_t*) ptr; @@ -1017,6 +958,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin bool is_long_msg = false; ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_header_acc_t *header; + ompi_osc_pt2pt_sync_t *pt2pt_sync; size_t ddt_len, payload_len, frag_len; char *ptr; const void *packed_ddt; @@ -1030,7 +972,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin target_rank, (int) target_disp, target_count, target_datatype->name, op->o_name, win->w_name)); - if (!ompi_osc_pt2pt_check_access_epoch (module, target_rank)) { + pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target_rank, NULL); + if (OPAL_UNLIKELY(NULL == pt2pt_sync)) { return OMPI_ERR_RMA_SYNC; } @@ -1052,10 +995,10 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* optimize the self case. TODO: optimize the local case */ if (ompi_comm_rank (module->comm) == target_rank) { *request = &pt2pt_request->super; - return ompi_osc_pt2pt_gacc_self (origin_addr, origin_count, origin_datatype, - result_addr, result_count, result_datatype, - target_disp, target_count, target_datatype, - op, module, pt2pt_request); + return ompi_osc_pt2pt_gacc_self (pt2pt_sync, origin_addr, origin_count, origin_datatype, + result_addr, result_count, result_datatype, + target_disp, target_count, target_datatype, + op, module, pt2pt_request); } pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC; @@ -1102,16 +1045,10 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin /* increment the number of outgoing fragments */ ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests); - /* flush will be called at the end of this function. make sure the post message has + /* flush will be called at the end of this function. make sure all post messages have * arrived. */ - if (!release_req && module->sc_group) { - OPAL_THREAD_LOCK(&module->lock); - while (0 != module->num_post_msgs) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "waiting for post messages. num_post_msgs = %d", module->num_post_msgs)); - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); + if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) { + ompi_osc_pt2pt_sync_wait (pt2pt_sync); } header = (ompi_osc_pt2pt_header_acc_t *) ptr; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index 4c2d06d74f..6a8f53ebc8 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -28,25 +28,12 @@ #include #include "osc_pt2pt.h" -#include "osc_pt2pt_data_move.h" #include "osc_pt2pt_frag.h" #include "osc_pt2pt_request.h" +#include "osc_pt2pt_data_move.h" -#include "opal/threads/condition.h" -#include "opal/threads/mutex.h" -#include "opal/util/arch.h" -#include "opal/align.h" -#include "opal/mca/btl/btl.h" - -#include "ompi/info/info.h" -#include "ompi/communicator/communicator.h" -#include "ompi/mca/osc/osc.h" -#include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" -#include "opal/mca/btl/btl.h" -#include "ompi/mca/pml/pml.h" -static int component_open(void); static int component_register(void); static int component_init(bool enable_progress_threads, bool enable_mpi_threads); static int component_finalize(void); @@ -64,7 +51,6 @@ ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = { .mca_component_name = "pt2pt", MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, OMPI_RELEASE_VERSION), - .mca_open_component = component_open, .mca_register_component_params = component_register, }, .osc_data = { @@ -128,53 +114,15 @@ bool ompi_osc_pt2pt_no_locks = false; /* look up parameters for configuring this window. The code first looks in the info structure passed by the user, then through mca parameters. */ -static bool -check_config_value_bool(char *key, ompi_info_t *info) +static bool check_config_value_bool(char *key, ompi_info_t *info, bool result) { - char *value_string; - int value_len, ret, flag, param; - const bool *flag_value; - bool result; + int flag; - ret = ompi_info_get_valuelen(info, key, &value_len, &flag); - if (OMPI_SUCCESS != ret) goto info_not_found; - if (flag == 0) goto info_not_found; - value_len++; - - value_string = (char*)malloc(sizeof(char) * value_len + 1); /* Should malloc 1 char for NUL-termination */ - if (NULL == value_string) goto info_not_found; - - ret = ompi_info_get(info, key, value_len, value_string, &flag); - if (OMPI_SUCCESS != ret) { - free(value_string); - goto info_not_found; - } - assert(flag != 0); - ret = ompi_info_value_to_bool(value_string, &result); - free(value_string); - if (OMPI_SUCCESS != ret) goto info_not_found; + (void) ompi_info_get_bool (info, key, &result, &flag); return result; - - info_not_found: - param = mca_base_var_find("ompi", "osc", "pt2pt", key); - if (0 > param) return false; - - ret = mca_base_var_get_value(param, &flag_value, NULL, NULL); - if (OMPI_SUCCESS != ret) return false; - - return flag_value[0]; } - -static int -component_open(void) -{ - return OMPI_SUCCESS; -} - - -static int -component_register(void) +static int component_register (void) { ompi_osc_pt2pt_no_locks = false; (void) mca_base_component_var_register(&mca_osc_pt2pt_component.super.osc_version, @@ -346,15 +294,26 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* initialize the objects, so that always free in cleanup */ OBJ_CONSTRUCT(&module->lock, opal_mutex_t); OBJ_CONSTRUCT(&module->cond, opal_condition_t); - OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t); OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t); - OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); + OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); - OBJ_CONSTRUCT(&module->pending_posts, opal_list_t); OBJ_CONSTRUCT(&module->request_gc, opal_list_t); OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t); OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t); + OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t); + OBJ_CONSTRUCT(&module->peer_hash, opal_hash_table_t); + OBJ_CONSTRUCT(&module->peer_lock, opal_mutex_t); + + ret = opal_hash_table_init (&module->outstanding_locks, 64); + if (OPAL_SUCCESS != ret) { + goto cleanup; + } + + ret = opal_hash_table_init (&module->peer_hash, 128); + if (OPAL_SUCCESS != ret) { + goto cleanup; + } /* options */ /* FIX ME: should actually check this value... */ @@ -388,17 +347,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* record my displacement unit. Always resolved at target */ module->disp_unit = disp_unit; - /* peer data */ - module->peers = calloc(ompi_comm_size(comm), sizeof(ompi_osc_pt2pt_peer_t)); - if (NULL == module->peers) { - ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; - goto cleanup; - } - - for (int i = 0 ; i < ompi_comm_size (comm) ; ++i) { - OBJ_CONSTRUCT(module->peers + i, ompi_osc_pt2pt_peer_t); - } - /* peer op count data */ module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t)); if (NULL == module->epoch_outgoing_frag_count) { @@ -408,18 +356,16 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* the statement below (from Brian) does not seem correct so disable active target on the * window. if this end up being incorrect please revert this one change */ - module->active_eager_send_active = false; #if 0 /* initially, we're in that pseudo-fence state, so we allow eager sends (yay for Fence). Other protocols will disable before they start their epochs, so this isn't a problem. */ - module->active_eager_send_active = true; + module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE; + module->all_sync.eager_send_active = true; #endif /* lock data */ - if (check_config_value_bool("no_locks", info)) { - win->w_flags |= OMPI_WIN_NO_LOCKS; - } + module->no_locks = check_config_value_bool ("no_locks", info, ompi_osc_pt2pt_no_locks); /* update component data */ OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock); @@ -460,6 +406,10 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit mca_osc_pt2pt_component.progress_enable = true; } + if (module->no_locks) { + win->w_flags |= OMPI_WIN_NO_LOCKS; + } + OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, "done creating pt2pt window %d", ompi_comm_get_cid(module->comm))); @@ -503,6 +453,9 @@ static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer) { OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t); OBJ_CONSTRUCT(&peer->lock, opal_mutex_t); + peer->active_frag = NULL; + peer->passive_incoming_frag_count = 0; + peer->unexpected_post = false; } static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index c49f0f72ce..4e349518e2 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. @@ -21,26 +21,19 @@ * $HEADER$ */ -#include "ompi_config.h" - #include "osc_pt2pt.h" #include "osc_pt2pt_header.h" #include "osc_pt2pt_data_move.h" #include "osc_pt2pt_frag.h" #include "osc_pt2pt_request.h" -#include "opal/threads/condition.h" -#include "opal/threads/mutex.h" #include "opal/util/arch.h" -#include "opal/util/output.h" #include "opal/sys/atomic.h" #include "opal/align.h" -#include "opal/mca/btl/btl.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/pml/base/pml_base_sendreq.h" #include "opal/mca/btl/btl.h" -#include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" #include "ompi/datatype/ompi_datatype.h" #include "ompi/op/op.h" @@ -218,7 +211,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count) * to a target) before this is sent. */ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target, - void *data, size_t len) + void *data, size_t len) { ompi_osc_pt2pt_frag_t *frag; char *ptr; @@ -494,6 +487,7 @@ static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *sourc ompi_datatype_t *datatype, int peer, int tag) { struct osc_pt2pt_get_post_send_cb_data_t *data; + int ret; data = malloc (sizeof (*data)); if (OPAL_UNLIKELY(NULL == data)) { @@ -505,8 +499,14 @@ static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *sourc * in an active target epoch) */ data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL; - return ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm, + /* data will be freed by the callback */ + ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm, osc_pt2pt_get_post_send_cb, (void *) data); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + free (data); + } + + return ret; } /** @@ -712,7 +712,7 @@ static int accumulate_cb (ompi_request_t *request) static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source, char *data, size_t data_len, ompi_datatype_t *datatype) { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); osc_pt2pt_pending_acc_t *pending_acc; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -877,14 +877,14 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s } ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, - datatype, op, 1, &acc_data); + datatype, op, 1, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { free (buffer); break; } ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source, - acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); + acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); } @@ -985,12 +985,6 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source buflen = datatype_buffer_length (datatype, acc_header->count); do { - buffer = malloc (buflen); - if (OPAL_UNLIKELY(NULL == buffer)) { - ret = OMPI_ERR_OUT_OF_RESOURCE; - break; - } - ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -998,9 +992,16 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source primitive_count *= acc_header->count; + buffer = malloc (buflen); + if (OPAL_UNLIKELY(NULL == buffer)) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + break; + } + ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count, - datatype, op, 2, &acc_data); + datatype, op, 2, &acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + free (buffer); break; } @@ -1337,20 +1338,8 @@ static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source, static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_complete_t *complete_header) { - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: process_complete got complete message from %d. expected fragment count %d. " - "current signal count %d. current incomming count: %d", - source, complete_header->frag_count, module->active_incoming_frag_signal_count, - module->active_incoming_frag_count)); - /* the current fragment is not part of the frag_count so we need to add it here */ - OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, - complete_header->frag_count + 1); - - - if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) { - opal_condition_broadcast (&module->cond); - } + osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1); return sizeof (*complete_header); } @@ -1361,7 +1350,7 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source, static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_flush_t *flush_header) { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -1395,7 +1384,7 @@ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source, static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_unlock_t *unlock_header) { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); int ret; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -1653,10 +1642,11 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request) process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header); /* only data fragments should be included in the completion counters */ - mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL); + mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? + source : MPI_PROC_NULL); break; case OMPI_OSC_PT2PT_HDR_TYPE_POST: - (void) osc_pt2pt_incoming_post (module, source); + osc_pt2pt_incoming_post (module, source); break; case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK: ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c index 0e0c588bef..b926cae157 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c @@ -12,17 +12,12 @@ * $HEADER$ */ -#include "ompi_config.h" - -#include "opal/class/opal_list.h" -#include "ompi/mca/osc/base/base.h" -#include "ompi/mca/pml/pml.h" - #include "osc_pt2pt.h" #include "osc_pt2pt_frag.h" #include "osc_pt2pt_data_move.h" -static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag){ +static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag) +{ frag->buffer = frag->super.ptr; } @@ -68,7 +63,7 @@ static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *fr int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag) { - ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, frag->target); int ret; assert(0 == frag->pending && peer->active_frag != frag); @@ -79,7 +74,7 @@ int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module, /* if eager sends are not active, can't send yet, so buffer and get out... */ - if (!(peer->eager_send_active || module->all_access_epoch) || opal_list_get_size (&peer->queued_frags)) { + if (!ompi_osc_pt2pt_peer_sends_active (module, frag->target) || opal_list_get_size (&peer->queued_frags)) { OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "queuing fragment to peer %d", frag->target)); OPAL_THREAD_SCOPED_LOCK(&peer->lock, @@ -97,9 +92,9 @@ int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module, return ret; } -static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, int target) +static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_peer_t *peer) { - ompi_osc_pt2pt_frag_t *active_frag = module->peers[target].active_frag; + ompi_osc_pt2pt_frag_t *active_frag = peer->active_frag; int ret = OMPI_SUCCESS; if (NULL == active_frag) { @@ -108,16 +103,16 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: flushing active fragment to target %d. pending: %d", target, - active_frag->pending)); + "osc pt2pt: flushing active fragment to target %d. pending: %d", + active_frag->target, active_frag->pending)); - if (opal_atomic_cmpset (&module->peers[target].active_frag, active_frag, NULL)) { + if (opal_atomic_cmpset (&peer->active_frag, active_frag, NULL)) { if (0 != OPAL_THREAD_ADD32(&active_frag->pending, -1)) { /* communication going on while synchronizing; this is an rma usage bug */ return OMPI_ERR_RMA_SYNC; } - ompi_osc_signal_outgoing (module, target, 1); + ompi_osc_signal_outgoing (module, active_frag->target, 1); ret = frag_send (module, active_frag); } @@ -126,7 +121,7 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target) { - ompi_osc_pt2pt_peer_t *peer = module->peers + target; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *frag; int ret = OMPI_SUCCESS; @@ -150,7 +145,7 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe } /* flush the active frag */ - ret = ompi_osc_pt2pt_flush_active_frag (module, target); + ret = ompi_osc_pt2pt_flush_active_frag (module, peer); OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: frag flush target %d finished", target)); @@ -162,41 +157,20 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module) { int ret = OMPI_SUCCESS; - ompi_osc_pt2pt_frag_t *frag; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "osc pt2pt: frag flush all begin")); - /* try to start all the queued frags */ + /* try to start frags queued to all peers */ for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) { - ompi_osc_pt2pt_peer_t *peer = module->peers + i; - - while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) { - ret = frag_send(module, frag); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - break; - } - } - - /* XXX -- TODO -- better error handling */ + ret = ompi_osc_pt2pt_frag_flush_target (module, i); if (OMPI_SUCCESS != ret) { - return ret; + break; } } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: flushing all active fragments")); - - /* flush the active frag */ - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - ret = ompi_osc_pt2pt_flush_active_frag (module, i); - if (OMPI_SUCCESS != ret) { - return ret; - } - } - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "osc pt2pt: frag flush all done")); + "osc pt2pt: frag flush all done. ret: %d", ret)); return ret; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h index 7417f7bc00..515ce82fdf 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h @@ -51,6 +51,7 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in size_t request_len, ompi_osc_pt2pt_frag_t **buffer, char **ptr) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_frag_t *curr; int ret; @@ -64,13 +65,13 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in } OPAL_THREAD_LOCK(&module->lock); - curr = module->peers[target].active_frag; + curr = peer->active_frag; if (NULL == curr || curr->remain_len < request_len) { opal_free_list_item_t *item = NULL; if (NULL != curr) { curr->remain_len = 0; - module->peers[target].active_frag = NULL; + peer->active_frag = NULL; opal_atomic_mb (); /* If there's something pending, the pending finish will @@ -87,8 +88,7 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in if (OPAL_UNLIKELY(NULL == item)) { return OMPI_ERR_OUT_OF_RESOURCE; } - curr = module->peers[target].active_frag = - (ompi_osc_pt2pt_frag_t*) item; + curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item; curr->target = target; @@ -105,7 +105,6 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in } curr->header->source = ompi_comm_rank(module->comm); curr->header->num_ops = 0; - curr->header->windx = ompi_comm_get_cid(module->comm); if (curr->remain_len < request_len) { OPAL_THREAD_UNLOCK(&module->lock); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_header.h b/ompi/mca/osc/pt2pt/osc_pt2pt_header.h index 8d38f71d1e..f979d9bf61 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_header.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_header.h @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved. @@ -118,7 +118,6 @@ typedef struct ompi_osc_pt2pt_header_cswap_t ompi_osc_pt2pt_header_cswap_t; struct ompi_osc_pt2pt_header_post_t { ompi_osc_pt2pt_header_base_t base; - uint16_t windx; }; typedef struct ompi_osc_pt2pt_header_post_t ompi_osc_pt2pt_header_post_t; @@ -134,7 +133,6 @@ typedef struct ompi_osc_pt2pt_header_lock_t ompi_osc_pt2pt_header_lock_t; struct ompi_osc_pt2pt_header_lock_ack_t { ompi_osc_pt2pt_header_base_t base; - uint16_t windx; uint32_t source; uint64_t lock_ptr; }; @@ -166,7 +164,7 @@ struct ompi_osc_pt2pt_header_flush_t { uint8_t padding[2]; #endif uint32_t frag_count; - uint64_t serial_number; + uint64_t lock_ptr; }; typedef struct ompi_osc_pt2pt_header_flush_t ompi_osc_pt2pt_header_flush_t; @@ -175,13 +173,12 @@ struct ompi_osc_pt2pt_header_flush_ack_t { #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT uint8_t padding[6]; #endif - uint64_t serial_number; + uint64_t lock_ptr; }; typedef struct ompi_osc_pt2pt_header_flush_ack_t ompi_osc_pt2pt_header_flush_ack_t; struct ompi_osc_pt2pt_frag_header_t { ompi_osc_pt2pt_header_base_t base; - uint16_t windx; /* cid of communicator backing window (our window id) */ uint32_t source; /* rank in window of source process */ int32_t num_ops; /* number of operations in this buffer */ uint32_t pad; /* ensure the fragment header is a multiple of 8 bytes */ @@ -208,12 +205,10 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t; #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT #define MCA_OSC_PT2PT_FRAG_HDR_NTOH(h) \ - (h).windx = ntohs((h).windx); \ (h).source = ntohl((h).source); \ (h).num_ops = ntohl((h).num_ops); \ (h).pad = ntohl((h).pad); #define MCA_OSC_PT2PT_FRAG_HDR_HTON(h) \ - (h).windx = htons((h).windx); \ (h).source = htonl((h).source); \ (h).num_ops = htonl((h).num_ops); \ (h).pad = htonl((h).pad); @@ -254,34 +249,24 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t; (h).op = htonl((h).op); #define MCA_OSC_PT2PT_LOCK_HDR_NTOH(h) \ - (h).lock_type = ntohl((h).lock_type); \ - (h).lock_ptr = ntoh64((h).lock_ptr) + (h).lock_type = ntohl((h).lock_type) #define MCA_OSC_PT2PT_LOCK_HDR_HTON(h) \ - (h).lock_type = htonl((h).lock_type); \ - (h).lock_ptr = hton64((h).lock_ptr) + (h).lock_type = htonl((h).lock_type) #define MCA_OSC_PT2PT_UNLOCK_HDR_NTOH(h) \ (h).lock_type = ntohl((h).lock_type); \ - (h).lock_ptr = ntoh64((h).lock_ptr); \ (h).frag_count = ntohl((h).frag_count) #define MCA_OSC_PT2PT_UNLOCK_HDR_HTON(h) \ (h).lock_type = htonl((h).lock_type); \ - (h).lock_ptr = hton64((h).lock_ptr); \ (h).frag_count = htonl((h).frag_count) #define MCA_OSC_PT2PT_LOCK_ACK_HDR_NTOH(h) \ - (h).windx = ntohs((h).windx); \ - (h).source = ntohl((h).source); \ - (h).lock_ptr = ntoh64((h).lock_ptr) + (h).source = ntohl((h).source) #define MCA_OSC_PT2PT_LOCK_ACK_HDR_HTON(h) \ - (h).windx = htonl((h).windx); \ - (h).source= htonl((h).source); \ - (h).lock_ptr = hton64((h).lock_ptr) + (h).source= htonl((h).source) -#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_NTOH(h) \ - (h).lock_ptr = ntoh64((h).lock_ptr); -#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_HTON(h) \ - (h).lock_ptr = hton64((h).lock_ptr); +#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_NTOH(h) +#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_HTON(h) #define MCA_OSC_PT2PT_COMPLETE_HDR_NTOH(h) \ (h).frag_count = ntohl((h).frag_count) @@ -289,21 +274,15 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t; (h).frag_count = htonl((h).frag_count) #define MCA_OSC_PT2PT_FLUSH_HDR_NTOH(h) \ - (h).frag_count = ntohl((h).frag_count); \ - (h).serial_number = ntoh64((h).serial_number) + (h).frag_count = ntohl((h).frag_count) #define MCA_OSC_PT2PT_FLUSH_HDR_HTON(h) \ - (h).frag_count = htonl((h).frag_count); \ - (h).serial_number = ntoh64((h).serial_number) + (h).frag_count = htonl((h).frag_count) -#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_NTOH(h) \ - (h).serial_number = ntoh64((h).serial_number) -#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_HTON(h) \ - (h).serial_number = ntoh64((h).serial_number) +#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_NTOH(h) +#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_HTON(h) -#define MCA_OSC_PT2PT_POST_HDR_NTOH(h) \ - (h).windx = ntohs((h).windx) -#define MCA_OSC_PT2PT_POST_HDR_HTON(h) \ - (h).windx = htons((h).windx) +#define MCA_OSC_PT2PT_POST_HDR_NTOH(h) +#define MCA_OSC_PT2PT_POST_HDR_HTON(h) #define MCA_OSC_PT2PT_CSWAP_HDR_NTOH(h) \ (h).tag = ntohs((h).tag); \ diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c index e3bcc34a2d..77ee48b746 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c @@ -8,7 +8,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science @@ -20,20 +20,10 @@ * $HEADER$ */ -#include "ompi_config.h" - #include "osc_pt2pt.h" -#include "opal/threads/mutex.h" -#include "opal/mca/btl/btl.h" -#include "ompi/win/win.h" -#include "ompi/communicator/communicator.h" -#include "ompi/mca/osc/base/base.h" -#include "mpi.h" - -int -ompi_osc_pt2pt_attach(struct ompi_win_t *win, void *base, size_t len) +int ompi_osc_pt2pt_attach(struct ompi_win_t *win, void *base, size_t len) { return OMPI_SUCCESS; } @@ -46,11 +36,13 @@ ompi_osc_pt2pt_detach(struct ompi_win_t *win, const void *base) } -int -ompi_osc_pt2pt_free(ompi_win_t *win) +int ompi_osc_pt2pt_free(ompi_win_t *win) { int ret = OMPI_SUCCESS; ompi_osc_pt2pt_module_t *module = GET_MODULE(win); + ompi_osc_pt2pt_peer_t *peer; + uint32_t key; + void *node; if (NULL == module) { return OMPI_SUCCESS; @@ -78,28 +70,29 @@ ompi_osc_pt2pt_free(ompi_win_t *win) OBJ_DESTRUCT(&module->outstanding_locks); OBJ_DESTRUCT(&module->locks_pending); OBJ_DESTRUCT(&module->locks_pending_lock); - OBJ_DESTRUCT(&module->acc_lock); OBJ_DESTRUCT(&module->cond); OBJ_DESTRUCT(&module->lock); + OBJ_DESTRUCT(&module->all_sync); /* it is erroneous to close a window with active operations on it so we should * probably produce an error here instead of cleaning up */ OPAL_LIST_DESTRUCT(&module->pending_acc); - OPAL_LIST_DESTRUCT(&module->pending_posts); osc_pt2pt_gc_clean (module); OPAL_LIST_DESTRUCT(&module->request_gc); OPAL_LIST_DESTRUCT(&module->buffer_gc); OBJ_DESTRUCT(&module->gc_lock); - if (NULL != module->peers) { - for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) { - OBJ_DESTRUCT(module->peers + i); - } - - free(module->peers); + ret = opal_hash_table_get_first_key_uint32 (&module->peer_hash, &key, (void **) &peer, &node); + while (OPAL_SUCCESS == ret) { + OBJ_RELEASE(peer); + ret = opal_hash_table_get_next_key_uint32 (&module->peer_hash, &key, (void **) &peer, node, + &node); } + OBJ_DESTRUCT(&module->peer_hash); + OBJ_DESTRUCT(&module->peer_lock); + if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count); if (NULL != module->frag_request) { @@ -115,5 +108,5 @@ ompi_osc_pt2pt_free(ompi_win_t *win) free (module); - return ret; + return OMPI_SUCCESS; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c index 523d7da978..b3801c8681 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c @@ -37,7 +37,7 @@ #include "opal/include/opal_stdint.h" static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type, - uint64_t serial_number); + uint64_t lock_ptr); /* target-side tracking of a lock request */ struct ompi_osc_pt2pt_pending_lock_t { @@ -50,99 +50,27 @@ typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t; OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t, NULL, NULL); - -/* origin-side tracking of a lock request */ -struct ompi_osc_pt2pt_outstanding_lock_t { - opal_list_item_t super; - int target; - int assert; - bool flushing; - int32_t lock_acks_expected; - int32_t unlock_acks_expected; - int32_t flush_acks_expected; - uint64_t serial_number; - int32_t type; -}; -typedef struct ompi_osc_pt2pt_outstanding_lock_t ompi_osc_pt2pt_outstanding_lock_t; -OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_outstanding_lock_t, opal_list_item_t, - NULL, NULL); - static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module); static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr); -static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock, +static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock, int target); - -/** - * Find the first outstanding lock to a target. - * - * @param[in] module - OSC PT2PT module - * @param[in] target - Target rank - * - * @returns an outstanding lock on success - * - * This function traverses the outstanding_locks list in the module - * looking for a lock that matches target. The caller must hold the - * module lock. - */ -static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_st (ompi_osc_pt2pt_module_t *module, int target) -{ - ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL; - - OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) { - if (outstanding_lock->target == target) { - lock = outstanding_lock; - break; - } - } - - return lock; -} - -static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock (ompi_osc_pt2pt_module_t *module, int target) -{ - ompi_osc_pt2pt_outstanding_lock_t *lock; - - OPAL_THREAD_LOCK(&module->lock); - lock = find_outstanding_lock_st (module, target); - OPAL_THREAD_UNLOCK(&module->lock); - - return lock; -} - -static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_by_serial (ompi_osc_pt2pt_module_t *module, uint64_t serial_number) -{ - ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL; - - OPAL_THREAD_LOCK(&module->lock); - OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) { - if (outstanding_lock->serial_number == serial_number) { - lock = outstanding_lock; - break; - } - } - OPAL_THREAD_UNLOCK(&module->lock); - - return lock; -} - -static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock) +static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) { const int my_rank = ompi_comm_rank (module->comm); + int lock_type = lock->sync.lock.type; bool acquired = false; - acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock); + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + + acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock); if (!acquired) { /* queue the lock */ - queue_lock (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock); + queue_lock (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock); /* If locking local, can't be non-blocking according to the standard. We need to wait for the ack here. */ - OPAL_THREAD_LOCK(&module->lock); - while (lock->lock_acks_expected) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); + ompi_osc_pt2pt_sync_wait_expected (lock); } OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, @@ -151,12 +79,16 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp return OMPI_SUCCESS; } -static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock) +static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) { + int lock_type = lock->sync.lock.type; + + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_self: unlocking myself. lock state = %d", module->lock_status)); - if (MPI_LOCK_EXCLUSIVE == lock->type) { + if (MPI_LOCK_EXCLUSIVE == lock_type) { OPAL_THREAD_ADD32(&module->lock_status, 1); ompi_osc_activate_next_lock (module); } else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) { @@ -166,15 +98,18 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, /* need to ensure we make progress */ opal_progress(); - OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1); + ompi_osc_pt2pt_sync_expected (lock); } -static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock) +static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock) { + int lock_type = lock->sync.lock.type; ompi_osc_pt2pt_header_lock_t lock_req; int ret; + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + /* generate a lock request */ lock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ; lock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; @@ -182,7 +117,7 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i lock_req.padding[0] = 0; lock_req.padding[1] = 0; #endif - lock_req.lock_type = lock->type; + lock_req.lock_type = lock_type; lock_req.lock_ptr = (uint64_t) (uintptr_t) lock; OSC_PT2PT_HTON(&lock_req, module, target); @@ -197,11 +132,15 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i return ret; } -static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock) +static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock) { - ompi_osc_pt2pt_peer_t *peer = module->peers + target; - ompi_osc_pt2pt_header_unlock_t unlock_req; int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1); + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); + int lock_type = lock->sync.lock.type; + ompi_osc_pt2pt_header_unlock_t unlock_req; + int ret; + + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ; unlock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; @@ -210,7 +149,7 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, unlock_req.padding[1] = 0; #endif unlock_req.frag_count = frag_count; - unlock_req.lock_type = lock->type; + unlock_req.lock_type = lock_type; unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock; OSC_PT2PT_HTON(&unlock_req, module, target); @@ -225,20 +164,27 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, unlock_req.frag_count)); /* send control message with unlock request and count */ - return ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req)); + ret = ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } + + return ompi_osc_pt2pt_frag_flush_target(module, target); } -static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock) +static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock) { - ompi_osc_pt2pt_peer_t *peer = module->peers + target; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_header_flush_t flush_req; int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1); int ret; + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ; flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; flush_req.frag_count = frag_count; - flush_req.serial_number = lock->serial_number; + flush_req.lock_ptr = (uint64_t) (uintptr_t) lock; /* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long * requests. once that is done this can be removed. */ @@ -262,16 +208,17 @@ static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, return ompi_osc_pt2pt_frag_flush_target (module, target); } -static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock) +static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock) { int my_rank = ompi_comm_rank (module->comm); - int target = lock->target; - int assert = lock->assert; + int target = lock->sync.lock.target; + int assert = lock->sync.lock.assert; int ret; + assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK); + if (0 == (assert & MPI_MODE_NOCHECK)) { - lock->lock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1; - lock->unlock_acks_expected = lock->lock_acks_expected; + lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1; if (my_rank != target && target != -1) { ret = ompi_osc_pt2pt_lock_remote (module, target, lock); @@ -305,63 +252,87 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_outstanding_lock_t *lock; - ompi_osc_pt2pt_peer_t *peer = NULL; + ompi_osc_pt2pt_sync_t *lock; int ret = OMPI_SUCCESS; - if (-1 != target) { - peer = module->peers + target; + /* Check if no_locks is set. TODO: we also need to track whether we are in an + * active target epoch. Fence can make this tricky to track. */ + if (-1 == target) { + if (module->all_sync.epoch_active) { + OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "osc/pt2pt: attempted " + "to lock all when active target epoch is %s and lock all epoch is %s. type %d", + (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type && module->all_sync.epoch_active) ? + "active" : "inactive", + (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) ? "active" : "inactive", + module->all_sync.type)); + return OMPI_ERR_RMA_SYNC; + } + } else { + if (module->all_sync.epoch_active && (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type || MPI_LOCK_EXCLUSIVE == lock_type)) { + /* impossible to get an exclusive lock while holding a global shared lock or in a active + * target access epoch */ + return OMPI_ERR_RMA_SYNC; + } } /* Check if no_locks is set. TODO: we also need to track whether we are in an * active target epoch. Fence can make this tricky to track. */ - if (module->sc_group) { + if (module->all_sync.epoch_active || (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && + (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target))) { + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: attempted " + "to acquire a lock on %d with type %d when active sync is %s and lock " + "all epoch is %s", target, lock_type, module->all_sync.epoch_active ? "active" : "inactive", + (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && + (MPI_LOCK_EXCLUSIVE == lock_type || -1 == target)) ? "active" : "inactive")); return OMPI_ERR_RMA_SYNC; } + if (OMPI_OSC_PT2PT_SYNC_TYPE_FENCE == module->all_sync.type) { + /* if not communication has occurred during a fence epoch then we can enter a lock epoch + * just need to clear the all access epoch */ + module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE; + } + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: lock %d %d", target, lock_type)); /* create lock item */ - lock = OBJ_NEW(ompi_osc_pt2pt_outstanding_lock_t); - if (OPAL_UNLIKELY(NULL == lock)) { - return OMPI_ERR_OUT_OF_RESOURCE; + if (-1 != target) { + lock = ompi_osc_pt2pt_sync_allocate (module); + if (OPAL_UNLIKELY(NULL == lock)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + } else { + lock = &module->all_sync; } - lock->target = target; - lock->lock_acks_expected = 0; - lock->unlock_acks_expected = 0; - lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1); - lock->type = lock_type; - lock->assert = assert; + lock->type = OMPI_OSC_PT2PT_SYNC_TYPE_LOCK; + lock->sync.lock.target = target; + lock->sync.lock.type = lock_type; + lock->sync.lock.assert = assert; + + lock->sync_expected = 0; /* delay all eager sends until we've heard back.. */ OPAL_THREAD_LOCK(&module->lock); /* check for conflicting lock */ - if (find_outstanding_lock_st (module, target)) { - OBJ_RELEASE(lock); + if (ompi_osc_pt2pt_module_lock_find (module, target, NULL)) { + ompi_osc_pt2pt_sync_return (lock); OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_RMA_CONFLICT; } - /* when the lock ack returns we will be in an access epoch with this peer/all peers (target = -1) */ - if (-1 == target) { - module->all_access_epoch = true; - } else { - peer->access_epoch = true; - } - ++module->passive_target_access_epoch; - opal_list_append(&module->outstanding_locks, &lock->super); + ompi_osc_pt2pt_module_lock_insert (module, lock); + OPAL_THREAD_UNLOCK(&module->lock); ret = ompi_osc_pt2pt_lock_internal_execute (module, lock); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - OPAL_THREAD_SCOPED_LOCK(&module->lock, - opal_list_remove_item(&module->outstanding_locks, &lock->super)); - OBJ_RELEASE(lock); + OPAL_THREAD_SCOPED_LOCK(&module->lock, ompi_osc_pt2pt_module_lock_remove (module, lock)); + ompi_osc_pt2pt_sync_return (lock); } return ret; @@ -370,20 +341,15 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_outstanding_lock_t *lock = NULL; + ompi_osc_pt2pt_sync_t *lock = NULL; int my_rank = ompi_comm_rank (module->comm); - ompi_osc_pt2pt_peer_t *peer = NULL; int ret = OMPI_SUCCESS; - if (-1 != target) { - peer = module->peers + target; - } - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_internal: unlocking target %d", target)); OPAL_THREAD_LOCK(&module->lock); - lock = find_outstanding_lock_st (module, target); + lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL); if (OPAL_UNLIKELY(NULL == lock)) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock: target %d is not locked in window %s", @@ -394,80 +360,65 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d", - lock->lock_acks_expected)); + lock->sync_expected)); /* wait until ack has arrived from target */ - while (lock->lock_acks_expected) { - opal_condition_wait(&module->cond, &module->lock); - } + ompi_osc_pt2pt_sync_wait_expected (lock); OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock_internal: all lock acks received")); - if (lock->assert & MPI_MODE_NOCHECK) { - /* flush instead */ - ompi_osc_pt2pt_flush_lock (module, lock, target); - } else if (my_rank != target) { - if (-1 == target) { - /* send unlock messages to all of my peers */ - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - if (my_rank == i) { - continue; + if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) { + lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1; + + if (my_rank != target) { + if (-1 == target) { + /* send unlock messages to all of my peers */ + for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { + if (my_rank == i) { + continue; + } + + ret = ompi_osc_pt2pt_unlock_remote (module, i, lock); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } } - ret = ompi_osc_pt2pt_unlock_remote (module, i, lock); + ompi_osc_pt2pt_unlock_self (module, lock); + } else { + ret = ompi_osc_pt2pt_unlock_remote (module, target, lock); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } } + /* wait for unlock acks. this signals remote completion of fragments */ + ompi_osc_pt2pt_sync_wait_expected (lock); + + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, + "ompi_osc_pt2pt_unlock: unlock of %d complete", target)); + } else { ompi_osc_pt2pt_unlock_self (module, lock); - } else { - ret = ompi_osc_pt2pt_unlock_remote (module, target, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } } - - /* start all sendreqs to target */ - if (-1 == target) { - ret = ompi_osc_pt2pt_frag_flush_all (module); - } else { - ret = ompi_osc_pt2pt_frag_flush_target(module, target); - } - - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - return ret; - } - - /* wait for unlock acks. this signals remote completion of fragments */ - OPAL_THREAD_LOCK(&module->lock); - while (lock->unlock_acks_expected) { - opal_condition_wait(&module->cond, &module->lock); - } - OPAL_THREAD_UNLOCK(&module->lock); - - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_unlock: unlock of %d complete", target)); } else { - ompi_osc_pt2pt_unlock_self (module, lock); + /* flush instead */ + ompi_osc_pt2pt_flush_lock (module, lock, target); } OPAL_THREAD_LOCK(&module->lock); - if (-1 != target) { - peer->access_epoch = false; + if (-1 != lock->sync.lock.target) { + ompi_osc_pt2pt_sync_return (lock); } else { - module->all_access_epoch = false; + ompi_osc_pt2pt_sync_reset (lock); } - --module->passive_target_access_epoch; - opal_list_remove_item (&module->outstanding_locks, &lock->super); + --module->passive_target_access_epoch; + ompi_osc_pt2pt_module_lock_remove (module, lock); OPAL_THREAD_UNLOCK(&module->lock); - OBJ_RELEASE(lock); - return ret; } @@ -501,7 +452,7 @@ int ompi_osc_pt2pt_sync (struct ompi_win_t *win) return OMPI_SUCCESS; } -static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock, +static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock, int target) { int ret; @@ -509,21 +460,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ /* wait until ack has arrived from target, since we need to be able to eager send before we can transfer all the data... */ - OPAL_THREAD_LOCK(&module->lock); - while (lock->lock_acks_expected && lock->flushing) { - opal_condition_wait(&module->cond, &module->lock); - } - - lock->flushing = true; + ompi_osc_pt2pt_sync_wait_expected (lock); if (-1 == target) { - lock->flush_acks_expected = ompi_comm_size(module->comm) - 1; + lock->sync_expected = ompi_comm_size(module->comm) - 1; } else { - lock->flush_acks_expected = 1; + lock->sync_expected = 1; } - OPAL_THREAD_UNLOCK(&module->lock); - if (-1 == target) { /* NTH: no local flush */ for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { @@ -545,16 +489,9 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ } } - /* wait for all the requests and the flush ack (meaning remote completion) */ - OPAL_THREAD_LOCK(&module->lock); - while (lock->flush_acks_expected) { - opal_condition_wait(&module->cond, &module->lock); - } - - lock->flushing = false; - opal_condition_broadcast(&module->cond); - - OPAL_THREAD_UNLOCK(&module->lock); + /* wait for all flush acks (meaning remote completion) */ + ompi_osc_pt2pt_sync_wait_expected (lock); + opal_condition_broadcast (&module->cond); return OMPI_SUCCESS; } @@ -562,7 +499,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_outstanding_lock_t *lock; + ompi_osc_pt2pt_sync_t *lock; int ret; assert (0 <= target); @@ -581,10 +518,14 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win) return OMPI_SUCCESS; } - lock = find_outstanding_lock (module, target); + OPAL_THREAD_LOCK(&module->lock); + lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL); if (NULL == lock) { - lock = find_outstanding_lock (module, -1); + if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) { + lock = &module->all_sync; + } } + OPAL_THREAD_UNLOCK(&module->lock); if (OPAL_UNLIKELY(NULL == lock)) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush: target %d is not locked in window %s", @@ -601,12 +542,12 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win) int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - ompi_osc_pt2pt_outstanding_lock_t *lock; - int ret = OMPI_SUCCESS; + ompi_osc_pt2pt_sync_t *lock; + int target, ret; + void *node; /* flush is only allowed from within a passive target epoch */ - if (OPAL_UNLIKELY(!module->passive_target_access_epoch || - 0 == opal_list_get_size (&module->outstanding_locks))) { + if (OPAL_UNLIKELY(!module->passive_target_access_epoch)) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush_all: no targets are locked in window %s", win->w_name)); @@ -617,11 +558,22 @@ int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win) "ompi_osc_pt2pt_flush_all entering...")); /* flush all locks */ - OPAL_LIST_FOREACH(lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) { - ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->target); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - break; - } + ret = opal_hash_table_get_first_key_uint32 (&module->outstanding_locks, (uint32_t *) &target, + (void **) &lock, &node); + if (OPAL_SUCCESS == ret) { + do { + ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->sync.lock.target); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + break; + } + + ret = opal_hash_table_get_next_key_uint32 (&module->outstanding_locks, (uint32_t *) &target, + (void **) lock, node, &node); + if (OPAL_SUCCESS != ret) { + ret = OPAL_SUCCESS; + break; + } + } while (1); } OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -687,7 +639,7 @@ int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win) static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, uint64_t lock_ptr) { - ompi_osc_pt2pt_outstanding_lock_t *lock; + ompi_osc_pt2pt_sync_t *lock; if (ompi_comm_rank (module->comm) != requestor) { ompi_osc_pt2pt_header_lock_ack_t lock_ack; @@ -695,7 +647,6 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, lock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK; lock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; lock_ack.source = ompi_comm_rank(module->comm); - lock_ack.windx = ompi_comm_get_cid(module->comm); lock_ack.lock_ptr = lock_ptr; OSC_PT2PT_HTON(&lock_ack, module, requestor); @@ -711,15 +662,13 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: releasing local lock")); - lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ptr; + lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ptr; if (OPAL_UNLIKELY(NULL == lock)) { OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output, "lock could not be located")); } - if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) { - opal_condition_broadcast (&module->cond); - } + ompi_osc_pt2pt_sync_expected (lock); return OMPI_SUCCESS; } @@ -829,65 +778,48 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source, /* initiator-side function called when the target acks the lock request. */ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module, - ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header) + ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header) { - ompi_osc_pt2pt_peer_t *peer = module->peers + lock_ack_header->source; - ompi_osc_pt2pt_outstanding_lock_t *lock; + ompi_osc_pt2pt_sync_t *lock; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_process_lock_ack: processing lock ack from %d for lock %" PRIu64, lock_ack_header->source, lock_ack_header->lock_ptr)); - lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ack_header->lock_ptr; + lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr; assert (NULL != lock); - /* no need to hold the lock to set this */ - peer->eager_send_active = true; - if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) { - opal_condition_broadcast(&module->cond); - } - - opal_condition_broadcast(&module->cond); + ompi_osc_pt2pt_sync_expected (lock); } void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_flush_ack_t *flush_ack_header) { - ompi_osc_pt2pt_outstanding_lock_t *lock; + ompi_osc_pt2pt_sync_t *lock; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock %" PRIu64, - source, flush_ack_header->serial_number)); + "ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock 0x%" PRIx64, + source, flush_ack_header->lock_ptr)); - /* NTH: need to verify that this will work as expected */ - lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number); + lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) flush_ack_header->lock_ptr; assert (NULL != lock); - if (0 == OPAL_THREAD_ADD32(&lock->flush_acks_expected, -1)) { - opal_condition_broadcast(&module->cond); - } - - opal_condition_broadcast(&module->cond); + ompi_osc_pt2pt_sync_expected (lock); } void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int source, - ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header) + ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header) { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; - ompi_osc_pt2pt_outstanding_lock_t *lock; + ompi_osc_pt2pt_sync_t *lock; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_process_unlock_ack: processing unlock ack from %d", source)); /* NTH: need to verify that this will work as expected */ - lock = (ompi_osc_pt2pt_outstanding_lock_t *) (intptr_t) unlock_ack_header->lock_ptr; + lock = (ompi_osc_pt2pt_sync_t *) (intptr_t) unlock_ack_header->lock_ptr; assert (NULL != lock); - peer->eager_send_active = false; - - if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1)) { - opal_condition_broadcast(&module->cond); - } + ompi_osc_pt2pt_sync_expected (lock); } /** @@ -903,12 +835,14 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou * active a pending lock if the lock becomes free. */ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source, - ompi_osc_pt2pt_header_unlock_t *unlock_header) + ompi_osc_pt2pt_header_unlock_t *unlock_header) { + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); ompi_osc_pt2pt_header_unlock_ack_t unlock_ack; - ompi_osc_pt2pt_peer_t *peer = module->peers + source; int ret; + assert (NULL != peer); + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_process_unlock entering (passive_incoming_frag_count: %d)...", peer->passive_incoming_frag_count)); @@ -950,11 +884,13 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source, } int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source, - ompi_osc_pt2pt_header_flush_t *flush_header) + ompi_osc_pt2pt_header_flush_t *flush_header) { - ompi_osc_pt2pt_peer_t *peer = module->peers + source; + ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source); ompi_osc_pt2pt_header_flush_ack_t flush_ack; + assert (NULL != peer); + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_process_flush entering (passive_incoming_frag_count: %d)...", peer->passive_incoming_frag_count)); @@ -966,7 +902,7 @@ int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source, flush_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK; flush_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; - flush_ack.serial_number = flush_header->serial_number; + flush_ack.lock_ptr = flush_header->lock_ptr; OSC_PT2PT_HTON(&flush_ack, module, source); return ompi_osc_pt2pt_control_send_unbuffered (module, source, &flush_ack, sizeof (flush_ack)); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c new file mode 100644 index 0000000000..74c6be1801 --- /dev/null +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c @@ -0,0 +1,90 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "osc_pt2pt.h" +#include "osc_pt2pt_sync.h" + +static void ompi_osc_pt2pt_sync_constructor (ompi_osc_pt2pt_sync_t *sync) +{ + sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE; + sync->eager_send_active = false; + sync->epoch_active = false; + OBJ_CONSTRUCT(&sync->lock, opal_mutex_t); + OBJ_CONSTRUCT(&sync->cond, opal_condition_t); +} + +static void ompi_osc_pt2pt_sync_destructor (ompi_osc_pt2pt_sync_t *sync) +{ + OBJ_DESTRUCT(&sync->lock); + OBJ_DESTRUCT(&sync->cond); +} + +OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_sync_t, opal_free_list_item_t, + ompi_osc_pt2pt_sync_constructor, + ompi_osc_pt2pt_sync_destructor); + +ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_sync_allocate (struct ompi_osc_pt2pt_module_t *module) +{ + ompi_osc_pt2pt_sync_t *sync; +#pragma unused (module) + sync = OBJ_NEW (ompi_osc_pt2pt_sync_t); + if (OPAL_UNLIKELY(NULL == sync)) { + return NULL; + } + + sync->module = module; + return sync; +} + +void ompi_osc_pt2pt_sync_return (ompi_osc_pt2pt_sync_t *sync) +{ + OBJ_RELEASE(sync); +} + +static inline bool ompi_osc_pt2pt_sync_array_peer (int rank, ompi_osc_pt2pt_peer_t **peers, size_t nranks, + struct ompi_osc_pt2pt_peer_t **peer) +{ + int mid = nranks / 2; + + /* base cases */ + if (0 == nranks || (1 == nranks && peers[0]->rank != rank)) { + if (peer) { + *peer = NULL; + } + return false; + } else if (peers[0]->rank == rank) { + if (peer) { + *peer = peers[0]; + } + return true; + } + + if (peers[mid]->rank > rank) { + return ompi_osc_pt2pt_sync_array_peer (rank, peers, mid, peer); + } + + return ompi_osc_pt2pt_sync_array_peer (rank, peers + mid, nranks - mid, peer); +} + +bool ompi_osc_pt2pt_sync_pscw_peer (ompi_osc_pt2pt_module_t *module, int target, struct ompi_osc_pt2pt_peer_t **peer) +{ + ompi_osc_pt2pt_sync_t *pt2pt_sync = &module->all_sync; + + /* check synchronization type */ + if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != pt2pt_sync->type) { + if (peer) { + *peer = NULL; + } + return false; + } + + return ompi_osc_pt2pt_sync_array_peer (target, pt2pt_sync->peer_list.peers, pt2pt_sync->num_peers, peer); +} diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h new file mode 100644 index 0000000000..4df38884c6 --- /dev/null +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h @@ -0,0 +1,178 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_OSC_PT2PT_SYNC_H +#define OMPI_OSC_PT2PT_SYNC_H + +#include "ompi_config.h" +#include "opal/class/opal_free_list.h" +#include "opal/threads/threads.h" + +enum ompi_osc_pt2pt_sync_type_t { + /** default value */ + OMPI_OSC_PT2PT_SYNC_TYPE_NONE, + /** lock access epoch */ + OMPI_OSC_PT2PT_SYNC_TYPE_LOCK, + /** fence access epoch */ + OMPI_OSC_PT2PT_SYNC_TYPE_FENCE, + /* post-start-complete-wait access epoch */ + OMPI_OSC_PT2PT_SYNC_TYPE_PSCW, +}; +typedef enum ompi_osc_pt2pt_sync_type_t ompi_osc_pt2pt_sync_type_t; + +struct ompi_osc_pt2pt_module_t; +struct ompi_osc_pt2pt_peer_t; + +/** + * @brief synchronization object + * + * This structure holds information about an access epoch. + */ +struct ompi_osc_pt2pt_sync_t { + opal_free_list_item_t super; + + struct ompi_osc_pt2pt_module_t *module; + + /** synchronization type */ + ompi_osc_pt2pt_sync_type_t type; + + /** synchronization data */ + union { + /** lock specific synchronization data */ + struct { + /** lock target rank (-1 for all) */ + int target; + /** lock type: MPI_LOCK_SHARED, MPI_LOCK_EXCLUSIVE */ + int type; + /** assert specified at lock acquire time */ + int assert; + } lock; + /** post/start/complete/wait specific synchronization data */ + struct { + /** group passed to ompi_osc_pt2pt_start */ + ompi_group_t *group; + } pscw; + } sync; + + /** array of peers for this sync */ + union { + /** multiple peers (lock all, pscw, fence) */ + struct ompi_osc_pt2pt_peer_t **peers; + /** single peer (targeted lock) */ + struct ompi_osc_pt2pt_peer_t *peer; + } peer_list; + + /** number of peers */ + int num_peers; + + /** number of synchronization messages expected */ + int32_t sync_expected; + + /** eager sends are active to all peers in this access epoch */ + bool eager_send_active; + + /** communication has started on this epoch */ + bool epoch_active; + + /** lock to protect sync structure members */ + opal_mutex_t lock; + + /** condition variable for changes in the sync object */ + opal_condition_t cond; +}; +typedef struct ompi_osc_pt2pt_sync_t ompi_osc_pt2pt_sync_t; + +OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_sync_t); + +/** + * @brief allocate a new synchronization object + * + * @param[in] module osc pt2pt module + * + * @returns NULL on failure + * @returns a new synchronization object on success + */ +ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_sync_allocate (struct ompi_osc_pt2pt_module_t *module); + +/** + * @brief release a synchronization object + * + * @param[in] pt2pt_sync synchronization object allocated by ompi_osc_pt2pt_sync_allocate() + */ +void ompi_osc_pt2pt_sync_return (ompi_osc_pt2pt_sync_t *pt2pt_sync); + +/** + * Check if the target is part of a PSCW access epoch + * + * @param[in] module osc pt2pt module + * @param[in] target target rank + * @param[out] peer peer object + * + * @returns false if the window is not in a PSCW access epoch or the peer is not + * in the group passed to MPI_Win_start + * @returns true otherwise + * + * This functions verifies the target is part of an active PSCW access epoch. + */ +bool ompi_osc_pt2pt_sync_pscw_peer (struct ompi_osc_pt2pt_module_t *module, int target, struct ompi_osc_pt2pt_peer_t **peer); + +/** + * Wait for all remote peers in the synchronization to respond + */ +static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync) +{ + OPAL_THREAD_LOCK(&sync->lock); + while (!sync->eager_send_active) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for access epoch to start")); + opal_condition_wait(&sync->cond, &sync->lock); + } + OPAL_THREAD_UNLOCK(&sync->lock); + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "access epoch ready")); +} + +/** + * Wait for all remote peers in the synchronization to respond + */ +static inline void ompi_osc_pt2pt_sync_wait_expected (ompi_osc_pt2pt_sync_t *sync) +{ + OPAL_THREAD_LOCK(&sync->lock); + while (sync->sync_expected) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "waiting for %d syncronization messages", + sync->sync_expected)); + opal_condition_wait(&sync->cond, &sync->lock); + } + OPAL_THREAD_UNLOCK(&sync->lock); + + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "all synchronization messages received")); +} + +static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync) +{ + int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); + if (0 == new_value) { + sync->eager_send_active = true; + opal_condition_broadcast (&sync->cond); + } +} + +static inline void ompi_osc_pt2pt_sync_reset (ompi_osc_pt2pt_sync_t *sync) +{ + sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE; + sync->eager_send_active = 0; + sync->epoch_active = 0; +} + +#endif /* OMPI_OSC_PT2PT_SYNC_H */