1
1

Merge pull request #902 from hjelmn/new_osc

osc/pt2pt: reduce memory footprint of windows
Этот коммит содержится в:
Nathan Hjelm 2015-09-21 10:28:41 -06:00
родитель ee17d73bd3 fd42343ff0
Коммит 88100ad670
13 изменённых файлов: 1084 добавлений и 882 удалений

Просмотреть файл

@ -32,7 +32,9 @@ pt2pt_sources = \
osc_pt2pt_request.h \
osc_pt2pt_request.c \
osc_pt2pt_active_target.c \
osc_pt2pt_passive_target.c
osc_pt2pt_passive_target.c \
osc_pt2pt_sync.h \
osc_pt2pt_sync.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

Просмотреть файл

@ -29,19 +29,19 @@
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_hash_table.h"
#include "opal/threads/threads.h"
#include "opal/mca/btl/btl.h"
#include "opal/util/output.h"
#include "ompi/win/win.h"
#include "ompi/info/info.h"
#include "ompi/communicator/communicator.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/request/request.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/memchecker.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_sync.h"
BEGIN_C_DECLS
@ -85,6 +85,9 @@ struct ompi_osc_pt2pt_peer_t {
/** make this an opal object */
opal_object_t super;
/** rank of this peer */
int rank;
/** pointer to the current send fragment for each outgoing target */
struct ompi_osc_pt2pt_frag_t *active_frag;
@ -94,25 +97,16 @@ struct ompi_osc_pt2pt_peer_t {
/** fragments queued to this target */
opal_list_t queued_frags;
/** number of acks pending. New requests can not be sent out if there are
* acks pending (to fulfill the ordering constraints of accumulate) */
uint32_t num_acks_pending;
/** number of fragments incoming (negative - expected, positive - unsynchronized) */
int32_t passive_incoming_frag_count;
/** peer is in an access epoch */
bool access_epoch;
/** eager sends are active to this peer */
bool eager_send_active;
/** unexpected post message arrived */
bool unexpected_post;
};
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
#define SEQ_INVALID 0xFFFFFFFFFFFFFFFFULL
/** Module structure. Exactly one of these is associated with each
PT2PT window */
struct ompi_osc_pt2pt_module_t {
@ -122,6 +116,9 @@ struct ompi_osc_pt2pt_module_t {
/** window should have accumulate ordering... */
bool accumulate_ordering;
/** no locks info key value */
bool no_locks;
/** pointer to free on cleanup (may be NULL) */
void *free_after;
@ -141,11 +138,11 @@ struct ompi_osc_pt2pt_module_t {
/** condition variable associated with lock */
opal_condition_t cond;
/** lock for atomic window updates from reductions */
opal_mutex_t acc_lock;
/** hash table of peer objects */
opal_hash_table_t peer_hash;
/** peer data */
ompi_osc_pt2pt_peer_t *peers;
/** lock protecting peer_hash */
opal_mutex_t peer_lock;
/** Number of communication fragments started for this epoch, by
peer. Not in peer data to make fence more manageable. */
@ -166,29 +163,14 @@ struct ompi_osc_pt2pt_module_t {
/* Next incoming buffer count at which we want a signal on cond */
uint32_t active_incoming_frag_signal_count;
/* Number of flush ack requests sent since beginning of time */
uint64_t flush_ack_requested_count;
/* Number of flush ack replies received since beginning of
time. cond should be signalled on every flush reply
received. */
uint64_t flush_ack_received_count;
/** Number of targets locked/being locked */
unsigned int passive_target_access_epoch;
/** start sending data eagerly */
bool active_eager_send_active;
/** Indicates the window is in an all access epoch (fence, lock_all) */
bool all_access_epoch;
/** Indicates the window is in a pcsw or all access (fence, lock_all) epoch */
ompi_osc_pt2pt_sync_t all_sync;
/* ********************* PWSC data ************************ */
struct ompi_group_t *pw_group;
struct ompi_group_t *sc_group;
/** Number of "ping" messages from the remote post group we've
received */
int32_t num_post_msgs;
/** Number of "count" messages from the remote complete group
we've received */
@ -207,9 +189,7 @@ struct ompi_osc_pt2pt_module_t {
opal_list_t locks_pending;
/** origin side list of locks currently outstanding */
opal_list_t outstanding_locks;
uint64_t lock_serial_number;
opal_hash_table_t outstanding_locks;
unsigned char *incoming_buffer;
ompi_request_t *frag_request;
@ -218,10 +198,6 @@ struct ompi_osc_pt2pt_module_t {
opal_atomic_lock_t accumulate_lock;
opal_list_t pending_acc;
/* enforce pscw matching */
/** list of unmatched post messages */
opal_list_t pending_posts;
/** Lock for garbage collection lists */
opal_mutex_t gc_lock;
@ -234,6 +210,29 @@ struct ompi_osc_pt2pt_module_t {
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;
/**
 * Look up the peer object for a rank, creating it on first use.
 *
 * @param[in] module   osc pt2pt module
 * @param[in] rank     rank of the peer; used as the key into module->peer_hash
 *
 * @returns the peer object stored in module->peer_hash for {rank}
 * @returns NULL if no peer object existed yet and a new one could not be allocated
 *
 * The hash table is consulted first without taking module->peer_lock. Only when
 * that lookup misses is the lock taken, the table checked a second time (another
 * thread may have inserted the peer in the meantime) and, if still missing, a new
 * peer object created and inserted.
 */
static inline ompi_osc_pt2pt_peer_t *ompi_osc_pt2pt_peer_lookup (ompi_osc_pt2pt_module_t *module,
                                                                 int rank)
{
    ompi_osc_pt2pt_peer_t *peer = NULL;

    (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer);
    if (OPAL_UNLIKELY(NULL == peer)) {
        OPAL_THREAD_LOCK(&module->peer_lock);

        /* re-check under the lock before creating a new peer object */
        (void) opal_hash_table_get_value_uint32 (&module->peer_hash, rank, (void **) &peer);
        if (NULL == peer) {
            peer = OBJ_NEW(ompi_osc_pt2pt_peer_t);
            /* NOTE(review): assumes OBJ_NEW yields NULL on allocation failure. Do not
             * dereference or publish a NULL peer; hand the failure back to the caller. */
            if (NULL != peer) {
                peer->rank = rank;
                (void) opal_hash_table_set_value_uint32 (&module->peer_hash, rank, (void *) peer);
            }
        }

        OPAL_THREAD_UNLOCK(&module->peer_lock);
    }

    return peer;
}
struct ompi_osc_pt2pt_pending_t {
opal_list_item_t super;
ompi_osc_pt2pt_module_t *module;
@ -262,23 +261,23 @@ int ompi_osc_pt2pt_put(const void *origin_addr,
struct ompi_win_t *win);
int ompi_osc_pt2pt_accumulate(const void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
OPAL_PTRDIFF_TYPE target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_op_t *op,
struct ompi_win_t *win);
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
OPAL_PTRDIFF_TYPE target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_op_t *op,
struct ompi_win_t *win);
int ompi_osc_pt2pt_get(void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
OPAL_PTRDIFF_TYPE target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
OPAL_PTRDIFF_TYPE target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);
int ompi_osc_pt2pt_compare_and_swap(const void *origin_addr,
const void *compare_addr,
@ -357,7 +356,10 @@ int ompi_osc_pt2pt_rget_accumulate(const void *origin_addr,
int ompi_osc_pt2pt_fence(int assert, struct ompi_win_t *win);
/* received a post message */
int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source);
void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source);
/* received a complete message */
void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count);
int ompi_osc_pt2pt_start(struct ompi_group_t *group,
int assert,
@ -451,7 +453,8 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
opal_condition_broadcast(&module->cond);
}
} else {
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
source, (int) peer->passive_incoming_frag_count + 1));
@ -704,6 +707,16 @@ static inline int ompi_osc_pt2pt_accumulate_trylock (ompi_osc_pt2pt_module_t *mo
return opal_atomic_trylock (&module->accumulate_lock);
}
/**
 * @brief check if this process is in a passive target access epoch
 *
 * @param[in] module   osc pt2pt module
 *
 * @returns true if module->passive_target_access_epoch is non-zero
 * @returns false otherwise
 */
static inline bool ompi_osc_pt2pt_in_passive_epoch (ompi_osc_pt2pt_module_t *module)
{
    if (0 == module->passive_target_access_epoch) {
        return false;
    }

    return true;
}
/**
* ompi_osc_pt2pt_accumulate_unlock:
*
@ -722,9 +735,134 @@ static inline void ompi_osc_pt2pt_accumulate_unlock (ompi_osc_pt2pt_module_t *mo
}
}
static inline bool ompi_osc_pt2pt_check_access_epoch (ompi_osc_pt2pt_module_t *module, int rank)
/**
 * Find the outstanding lock for a target.
 *
 * @param[in]  module   osc pt2pt module
 * @param[in]  target   target rank (key into module->outstanding_locks)
 * @param[out] peer     if non-NULL and a lock is found, set to the peer object
 *                      recorded in the lock; left untouched otherwise
 *
 * @returns the outstanding lock for {target} if one exists
 * @returns NULL if there is no outstanding lock for {target}
 *
 * This function looks for an outstanding lock to the target in the module's
 * outstanding_locks hash table. If a lock exists it is returned.
 */
static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_lock_find (ompi_osc_pt2pt_module_t *module, int target,
                                                                      ompi_osc_pt2pt_peer_t **peer)
{
    ompi_osc_pt2pt_sync_t *outstanding_lock = NULL;

    /* a miss leaves outstanding_lock at NULL, so the return code is not needed */
    (void) opal_hash_table_get_value_uint32 (&module->outstanding_locks, (uint32_t) target, (void **) &outstanding_lock);
    if (NULL != outstanding_lock && peer) {
        *peer = outstanding_lock->peer_list.peer;
    }

    return outstanding_lock;
}
/**
 * Record an outstanding lock
 *
 * @param[in] module   osc pt2pt module
 * @param[in] lock     lock object to record
 *
 * Stores {lock} in the module's outstanding_locks hash table, keyed by the
 * lock's target rank. The result of the insertion is ignored. The caller
 * must be holding the module lock.
 */
static inline void ompi_osc_pt2pt_module_lock_insert (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
    const uint32_t key = (uint32_t) lock->sync.lock.target;

    (void) opal_hash_table_set_value_uint32 (&module->outstanding_locks, key, (void *) lock);
}
/**
 * Forget an outstanding lock
 *
 * @param[in] module   osc pt2pt module
 * @param[in] lock     lock object to forget
 *
 * Removes the entry keyed by the lock's target rank from the module's
 * outstanding_locks hash table. The result of the removal is ignored. The
 * caller must be holding the module lock.
 */
static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
    const uint32_t key = (uint32_t) lock->sync.lock.target;

    (void) opal_hash_table_remove_value_uint32 (&module->outstanding_locks, key);
}
/**
 * Find the synchronization object that covers a target
 *
 * @param[in]  module   osc pt2pt module
 * @param[in]  target   target rank
 * @param[out] peer     peer object for the target (may be NULL if not wanted)
 *
 * @returns the synchronization object of the access epoch covering {target}
 * @returns NULL if the target is not locked, fenced, or part of a pscw sync
 *
 * The lookup is driven by the type of module->all_sync:
 *  - none:            fall back to the per-target outstanding locks, unless
 *                     the window was created with no_locks
 *  - fence/lock_all:  every target is covered; as a side effect the epoch is
 *                     marked active
 *  - pscw:            covered only if the target is a peer of the pscw sync
 */
static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target,
                                                                        struct ompi_osc_pt2pt_peer_t **peer)
{
    ompi_osc_pt2pt_sync_t *found = NULL;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc/pt2pt: looking for synchronization object for target %d", target));

    switch (module->all_sync.type) {
    case OMPI_OSC_PT2PT_SYNC_TYPE_NONE:
        /* no window-wide epoch: only a per-target lock can cover this target */
        if (!module->no_locks) {
            found = ompi_osc_pt2pt_module_lock_find (module, target, peer);
        }
        break;

    case OMPI_OSC_PT2PT_SYNC_TYPE_FENCE:
    case OMPI_OSC_PT2PT_SYNC_TYPE_LOCK:
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "osc/pt2pt: found fence/lock_all access epoch for target %d", target));

        /* fence epoch is now active */
        module->all_sync.epoch_active = true;
        if (peer) {
            *peer = ompi_osc_pt2pt_peer_lookup (module, target);
        }

        found = &module->all_sync;
        break;

    case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW:
        if (ompi_osc_pt2pt_sync_pscw_peer (module, target, peer)) {
            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                 "osc/pt2pt: found PSCW access epoch target for %d", target));
            found = &module->all_sync;
        }
        break;
    }

    return found;
}
/**
 * @brief tell whether any access epoch is currently active
 *
 * @param[in] module   osc pt2pt module
 *
 * @returns true if module->all_sync is marked epoch_active or the module is
 *          in a passive target access epoch
 * @returns false otherwise
 *
 * Used to detect conflicting access epochs.
 */
static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *module)
{
    if (module->all_sync.epoch_active) {
        return true;
    }

    return ompi_osc_pt2pt_in_passive_epoch (module);
}
/**
 * @brief check whether eager sends are enabled towards a rank
 *
 * @param[in] module   osc pt2pt module
 * @param[in] rank     rank of the peer
 *
 * @returns the eager_send_active flag of the synchronization object covering {rank}
 * @returns false if no synchronization object covers {rank}
 */
static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
{
    ompi_osc_pt2pt_sync_t *covering = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL);

    if (NULL != covering) {
        return covering->eager_send_active;
    }

    return false;
}
END_C_DECLS

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -35,74 +35,92 @@
#include "ompi/mca/osc/base/base.h"
/**
* ompi_osc_pt2pt_pending_post_t:
* compare_ranks:
*
* Describes a post operation that was encountered outside its
* matching start operation.
* @param[in] ptra Pointer to integer item
* @param[in] ptrb Pointer to integer item
*
* @returns 0 if *ptra == *ptrb
* @returns -1 if *ptra < *ptrb
* @returns 1 otherwise
*
* This function is used to sort the rank list. It can be removed if
* groups are always in order.
*/
struct ompi_osc_pt2pt_pending_post_t {
opal_list_item_t super;
int rank;
};
typedef struct ompi_osc_pt2pt_pending_post_t ompi_osc_pt2pt_pending_post_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_pending_post_t);
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_post_t, opal_list_item_t, NULL, NULL);
static bool group_contains_proc (ompi_group_t *group, ompi_proc_t *proc)
/**
 * compare_ranks:
 *
 * @param[in] ptra    Pointer to integer item
 * @param[in] ptrb    Pointer to integer item
 *
 * @returns 0 if *ptra == *ptrb
 * @returns -1 if *ptra < *ptrb
 * @returns 1 otherwise
 *
 * qsort()-compatible comparator that orders int ranks ascending. The values
 * are compared rather than subtracted so extreme values cannot overflow.
 */
static int compare_ranks (const void *ptra, const void *ptrb)
{
    /* keep the const qualifier: the comparator only reads the items */
    int a = *((const int *) ptra);
    int b = *((const int *) ptrb);

    if (a < b) {
        return -1;
    } else if (a > b) {
        return 1;
    }

    return 0;
}
static int*
get_comm_ranks(ompi_osc_pt2pt_module_t *module,
ompi_group_t *sub_group)
/**
 * ompi_osc_pt2pt_get_peers:
 *
 * @param[in] module    - OSC PT2PT module
 * @param[in] sub_group - Group with ranks to translate
 *
 * @returns a malloc'ed array of ompi_group_size(sub_group) peer objects on success
 * @returns NULL on allocation failure or if the ranks could not be translated
 *
 * Translate the ranks given in {sub_group} into ranks in the
 * communicator used to create {module}, sort them in ascending order and
 * look up the peer object for each translated rank. Every peer placed in
 * the array is retained with OBJ_RETAIN; the caller owns the array and the
 * references (see ompi_osc_pt2pt_release_peers).
 */
static ompi_osc_pt2pt_peer_t **ompi_osc_pt2pt_get_peers (ompi_osc_pt2pt_module_t *module, ompi_group_t *sub_group)
{
    int size = ompi_group_size(sub_group);
    ompi_osc_pt2pt_peer_t **peers;
    int *ranks1, *ranks2;
    int ret;

    ranks1 = malloc (sizeof(int) * size);
    ranks2 = malloc (sizeof(int) * size);
    peers = malloc (sizeof (ompi_osc_pt2pt_peer_t *) * size);
    if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
        free (ranks1);
        free (ranks2);
        free (peers);
        /* bail out here: falling through would use and re-free the buffers
         * that were just released */
        return NULL;
    }

    /* identity list of ranks within sub_group */
    for (int i = 0 ; i < size ; ++i) {
        ranks1[i] = i;
    }

    ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
                                      ranks2);
    free (ranks1);
    if (OMPI_SUCCESS != ret) {
        free (ranks2);
        free (peers);
        return NULL;
    }

    /* the peer array is kept ordered by communicator rank */
    qsort (ranks2, size, sizeof (int), compare_ranks);
    for (int i = 0 ; i < size ; ++i) {
        peers[i] = ompi_osc_pt2pt_peer_lookup (module, ranks2[i]);
        OBJ_RETAIN(peers[i]);
    }
    free (ranks2);

    return peers;
}
int
ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
/**
 * ompi_osc_pt2pt_release_peers:
 *
 * @param[in] peers  - array of peer objects
 * @param[in] npeers - number of entries in {peers}
 *
 * Drop one reference (OBJ_RELEASE) on each of the first {npeers} entries of
 * {peers}, in order, then free the array itself.
 */
static void ompi_osc_pt2pt_release_peers (ompi_osc_pt2pt_peer_t **peers, int npeers)
{
    int idx = 0;

    while (idx < npeers) {
        OBJ_RELEASE(peers[idx]);
        ++idx;
    }

    free (peers);
}
int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
uint32_t incoming_reqs;
@ -112,14 +130,16 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
"osc pt2pt: fence start"));
/* can't enter an active target epoch when in a passive target epoch */
if (module->passive_target_access_epoch) {
if (ompi_osc_pt2pt_in_passive_epoch (module)) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: could not enter fence. already in an access epoch"));
return OMPI_ERR_RMA_SYNC;
}
/* active sends are now active (we will close the epoch if NOSUCCEED is specified) */
if (0 == (assert & MPI_MODE_NOSUCCEED)) {
module->active_eager_send_active = true;
module->all_access_epoch = true;
module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
module->all_sync.eager_send_active = true;
}
/* short-circuit the noprecede case */
@ -168,9 +188,11 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
if (assert & MPI_MODE_NOSUCCEED) {
/* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
* stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
module->active_eager_send_active = false;
module->all_access_epoch = false;
ompi_osc_pt2pt_sync_reset (&module->all_sync);
}
module->all_sync.epoch_active = false;
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
@ -181,124 +203,131 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
}
int
ompi_osc_pt2pt_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_pending_post_t *pending_post, *next;
int group_size;
int *ranks;
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
OPAL_THREAD_LOCK(&module->lock);
OPAL_THREAD_LOCK(&sync->lock);
/* ensure we're not already in a start or passive target. we can no check for all
* access here due to fence */
if (NULL != module->sc_group || module->passive_target_access_epoch) {
/* check if we are already in an access epoch */
if (ompi_osc_pt2pt_access_epoch_active (module)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
/* mark all procs in this group as being in an access epoch */
sync->num_peers = ompi_group_size (group);
sync->sync.pscw.group = group;
/* haven't processed any post messages yet */
sync->sync_expected = sync->num_peers;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_start entering with group size %d...",
sync->num_peers));
if (0 == ompi_group_size (group)) {
/* nothing more to do. this is an empty start epoch */
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
opal_atomic_wmb ();
sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_PSCW;
/* prevent us from entering a passive-target, fence, or another pscw access epoch until
* the matching complete is called */
sync->epoch_active = true;
/* translate the group ranks into the communicator */
sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
if (NULL == sync->peer_list.peers) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* save the group */
OBJ_RETAIN(group);
ompi_group_increment_proc_count(group);
module->sc_group = group;
if (!(assert & MPI_MODE_NOCHECK)) {
OPAL_THREAD_LOCK(&sync->lock);
for (int i = 0 ; i < sync->num_peers ; ++i) {
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
/* mark all procs in this group as being in an access epoch */
group_size = ompi_group_size (module->sc_group);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_start entering with group size %d...",
group_size));
ranks = get_comm_ranks(module, module->sc_group);
if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
for (int i = 0 ; i < group_size ; ++i) {
/* when the post comes in we will be in an access epoch with this proc */
module->peers[ranks[i]].access_epoch = true;
}
free (ranks);
OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_pt2pt_pending_post_t) {
ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank);
if (group_contains_proc (module->sc_group, pending_proc)) {
ompi_osc_pt2pt_peer_t *peer = module->peers + pending_post->rank;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Consumed unexpected post message from %d",
pending_post->rank));
++module->num_post_msgs;
peer->eager_send_active = true;
opal_list_remove_item (&module->pending_posts, &pending_post->super);
OBJ_RELEASE(pending_post);
if (peer->unexpected_post) {
/* the peer already sent a post message for this pscw access epoch */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"found unexpected post from %d",
peer->rank));
OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
peer->unexpected_post = false;
}
}
OPAL_THREAD_UNLOCK(&sync->lock);
} else {
sync->sync_expected = 0;
}
/* disable eager sends until we've received the proper number of
post messages, at which time we know all our peers are ready to
receive messages. */
module->active_eager_send_active = false;
/* possible we've already received a couple in messages, so
add however many we're going to wait for */
module->num_post_msgs -= ompi_group_size(module->sc_group);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"num_post_msgs = %d", module->num_post_msgs));
"post messages still needed: %d", sync->sync_expected));
/* if we've already received all the post messages, we can eager
send. Otherwise, eager send will be enabled when
numb_post_messages reaches 0 */
if (0 == module->num_post_msgs) {
module->active_eager_send_active = true;
if (0 == sync->sync_expected) {
sync->eager_send_active = true;
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_start complete"));
"ompi_osc_pt2pt_start complete. eager sends active: %d",
sync->eager_send_active));
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
int
ompi_osc_pt2pt_complete(ompi_win_t *win)
int ompi_osc_pt2pt_complete (ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_header_complete_t complete_req;
ompi_osc_pt2pt_peer_t *peer;
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
int my_rank = ompi_comm_rank (module->comm);
ompi_osc_pt2pt_peer_t **peers;
int ret = OMPI_SUCCESS;
int i;
int *ranks = NULL;
ompi_group_t *group;
size_t group_size;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete entering..."));
if (NULL == module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
ranks = get_comm_ranks(module, module->sc_group);
if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
OPAL_THREAD_LOCK(&module->lock);
/* wait for all the post messages */
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
ompi_osc_pt2pt_sync_wait (sync);
/* phase 1 cleanup sync object */
group = sync->sync.pscw.group;
group_size = sync->num_peers;
peers = sync->peer_list.peers;
if (NULL == peers) {
/* empty peer list */
OPAL_THREAD_UNLOCK(&(module->lock));
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete sending complete messages"));
"ompi_osc_pt2pt_complete all posts received. sending complete messages..."));
/* for each process in group, send a control message with number
of updates coming, then start all the requests. Note that the
@ -307,12 +336,13 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
At the same time, clean out the outgoing count for the next
round. */
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, ranks[i]);
if (ompi_proc_local() == proc) {
for (size_t i = 0 ; i < group_size ; ++i) {
ompi_osc_pt2pt_header_complete_t complete_req;
int rank = peers[i]->rank;
if (my_rank == rank) {
/* shortcut for self */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete self complete"));
module->num_complete_msgs++;
osc_pt2pt_incoming_complete (module, rank, 0);
continue;
}
@ -322,10 +352,10 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
complete_req.padding[0] = 0;
complete_req.padding[1] = 0;
#endif
complete_req.frag_count = module->epoch_outgoing_frag_count[ranks[i]];
complete_req.frag_count = module->epoch_outgoing_frag_count[rank];
osc_pt2pt_hton(&complete_req, proc);
peer = module->peers + ranks[i];
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, rank);
/* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long
* requests. once that is done this can be removed. */
@ -335,36 +365,38 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u",
ranks[i], complete_req.frag_count));
rank, complete_req.frag_count));
peer->access_epoch = false;
ret = ompi_osc_pt2pt_control_send (module, ranks[i], &complete_req,
ret = ompi_osc_pt2pt_control_send (module, rank, &complete_req,
sizeof(ompi_osc_pt2pt_header_complete_t));
if (OMPI_SUCCESS != ret) goto cleanup;
if (OMPI_SUCCESS != ret) {
break;
}
ret = ompi_osc_pt2pt_frag_flush_target (module, ranks[i]);
if (OMPI_SUCCESS != ret) goto cleanup;
ret = ompi_osc_pt2pt_frag_flush_target (module, rank);
if (OMPI_SUCCESS != ret) {
break;
}
/* zero the fragment counts here to ensure they are zerod */
module->epoch_outgoing_frag_count[rank] = 0;
}
/* release our reference to peers in this group */
ompi_osc_pt2pt_release_peers (peers, group_size);
if (OMPI_SUCCESS != ret) {
return ret;
}
OPAL_THREAD_LOCK(&module->lock);
/* zero the fragment counts here to ensure they are zerod */
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
peer = module->peers + ranks[i];
module->epoch_outgoing_frag_count[ranks[i]] = 0;
peer->eager_send_active = false;
}
/* wait for outgoing requests to complete. Don't wait for incoming, as
we're only completing the access epoch, not the exposure epoch */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
opal_condition_wait(&module->cond, &module->lock);
}
/* phase 1 cleanup group */
group = module->sc_group;
module->sc_group = NULL;
ompi_osc_pt2pt_sync_reset (sync);
/* unlock here, as group cleanup can take a while... */
OPAL_THREAD_UNLOCK(&module->lock);
@ -375,26 +407,17 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete complete"));
free (ranks);
return OMPI_SUCCESS;
cleanup:
if (NULL != ranks) free(ranks);
return ret;
}
int
ompi_osc_pt2pt_post(ompi_group_t *group,
int assert,
ompi_win_t *win)
int ompi_osc_pt2pt_post (ompi_group_t *group, int assert, ompi_win_t *win)
{
int *ranks;
int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_header_post_t post_req;
ompi_osc_pt2pt_peer_t **peers;
/* can't check for all access epoch here due to fence */
if (module->pw_group) {
@ -405,17 +428,18 @@ ompi_osc_pt2pt_post(ompi_group_t *group,
"ompi_osc_pt2pt_post entering with group size %d...",
ompi_group_size (group)));
/* save the group */
OBJ_RETAIN(group);
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(module->lock));
OPAL_THREAD_LOCK(&module->lock);
/* ensure we're not already in a post */
if (NULL != module->pw_group) {
OPAL_THREAD_UNLOCK(&(module->lock));
return OMPI_ERR_RMA_SYNC;
}
/* save the group */
OBJ_RETAIN(group);
ompi_group_increment_proc_count(group);
module->pw_group = group;
/* Update completion counter. Can't have received any completion
@ -425,18 +449,26 @@ ompi_osc_pt2pt_post(ompi_group_t *group,
OPAL_THREAD_UNLOCK(&(module->lock));
if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
return OMPI_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"sending post messages"));
ranks = get_comm_ranks(module, module->pw_group);
if (NULL == ranks) {
/* translate group ranks into the communicator */
peers = ompi_osc_pt2pt_get_peers (module, module->pw_group);
if (OPAL_UNLIKELY(NULL == peers)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* send a hello counter to everyone in group */
for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", ranks[i]));
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, ranks[i]);
ompi_osc_pt2pt_peer_t *peer = peers[i];
int rank = peer->rank;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", rank));
ompi_proc_t *proc = ompi_comm_peer_lookup (module->comm, rank);
/* shortcut for self */
if (ompi_proc_local() == proc) {
@ -447,26 +479,24 @@ ompi_osc_pt2pt_post(ompi_group_t *group,
post_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_POST;
post_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
post_req.windx = ompi_comm_get_cid(module->comm);
osc_pt2pt_hton(&post_req, proc);
/* we don't want to send any data, since we're the exposure
epoch only, so use an unbuffered send */
ret = ompi_osc_pt2pt_control_send_unbuffered(module, ranks[i], &post_req,
sizeof(ompi_osc_pt2pt_header_post_t));
ret = ompi_osc_pt2pt_control_send_unbuffered(module, rank, &post_req,
sizeof(ompi_osc_pt2pt_header_post_t));
if (OMPI_SUCCESS != ret) {
break;
}
}
free (ranks);
ompi_osc_pt2pt_release_peers (peers, ompi_group_size(module->pw_group));
return ret;
}
int
ompi_osc_pt2pt_wait(ompi_win_t *win)
int ompi_osc_pt2pt_wait (ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_group_t *group;
@ -481,9 +511,10 @@ ompi_osc_pt2pt_wait(ompi_win_t *win)
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_complete_msgs ||
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count));
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, "
"active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count,
module->active_incoming_frag_signal_count));
opal_condition_wait(&module->cond, &module->lock);
}
@ -501,9 +532,7 @@ ompi_osc_pt2pt_wait(ompi_win_t *win)
}
int
ompi_osc_pt2pt_test(ompi_win_t *win,
int *flag)
int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_group_t *group;
@ -542,41 +571,45 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
return ret;
}
int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count)
{
ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source);
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
"current signal count %d. current incomming count: %d. expected complete msgs: %d",
source, frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count, module->num_complete_msgs));
OPAL_THREAD_LOCK(&module->lock);
/* the current fragment is not part of the frag_count so we need to add it here */
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, frag_count);
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
opal_condition_broadcast (&module->cond);
}
}
void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
{
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
OPAL_THREAD_LOCK(&sync->lock);
/* verify that this proc is part of the current start group */
if (!module->sc_group || !group_contains_proc (module->sc_group, source_proc)) {
ompi_osc_pt2pt_pending_post_t *pending_post = OBJ_NEW(ompi_osc_pt2pt_pending_post_t);
if (!ompi_osc_pt2pt_sync_pscw_peer (module, source, NULL)) {
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received unexpected post message from %d. module->sc_group = %p, size = %d",
source, (void*)module->sc_group, module->sc_group ? ompi_group_size (module->sc_group) : 0));
"received unexpected post message from %d for future PSCW synchronization",
source));
pending_post->rank = source;
peer->unexpected_post = true;
OPAL_THREAD_UNLOCK(&sync->lock);
} else {
OPAL_THREAD_UNLOCK(&sync->lock);
opal_list_append (&module->pending_posts, &pending_post->super);
ompi_osc_pt2pt_sync_expected (sync);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received post message for PSCW synchronization. post messages still needed: %d",
sync->sync_expected));
}
assert (!peer->eager_send_active);
peer->eager_send_active = true;
module->num_post_msgs++;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received post message. num_post_msgs = %d", module->num_post_msgs));
if (0 == module->num_post_msgs) {
module->active_eager_send_active = true;
}
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -21,12 +21,6 @@
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include <stdio.h>
#include <string.h>
#include "osc_pt2pt.h"
#include "osc_pt2pt_request.h"
#include "osc_pt2pt_header.h"
@ -35,9 +29,9 @@
#include "opal_stdint.h"
#include "ompi/memchecker.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "ompi/mca/osc/base/base.h"
#include <stdio.h>
/* progress an OSC request */
static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
@ -82,26 +76,17 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
}
/* self communication optimizations */
static inline int ompi_osc_pt2pt_put_self (const void *source, int source_count, ompi_datatype_t *source_datatype,
OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype,
ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count,
ompi_datatype_t *source_datatype, OPAL_PTRDIFF_TYPE target_disp, int target_count,
ompi_datatype_t *target_datatype, ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_request_t *request)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) target_disp * module->disp_unit);
int ret;
/* if we are in active target mode wait until all post messages arrive */
if (module->sc_group && !module->active_eager_send_active) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
if (!(module->passive_target_access_epoch || module->active_eager_send_active)) {
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
target, target_count, target_datatype);
@ -116,26 +101,16 @@ static inline int ompi_osc_pt2pt_put_self (const void *source, int source_count,
return OMPI_SUCCESS;
}
static inline int ompi_osc_pt2pt_get_self (void *target, int target_count, ompi_datatype_t *target_datatype,
OPAL_PTRDIFF_TYPE source_disp, int source_count, ompi_datatype_t *source_datatype,
ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, void *target, int target_count, ompi_datatype_t *target_datatype,
OPAL_PTRDIFF_TYPE source_disp, int source_count, ompi_datatype_t *source_datatype,
ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
{
void *source = (unsigned char*) module->baseptr +
((unsigned long) source_disp * module->disp_unit);
int ret;
/* if we are in active target mode wait until all post messages arrive */
if (module->sc_group && !module->active_eager_send_active) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
if (!(module->passive_target_access_epoch || module->active_eager_send_active)) {
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
target, target_count, target_datatype);
@ -150,24 +125,14 @@ static inline int ompi_osc_pt2pt_get_self (void *target, int target_count, ompi_
return OMPI_SUCCESS;
}
static inline int ompi_osc_pt2pt_cas_self (const void *source, const void *compare, void *result, ompi_datatype_t *datatype,
OPAL_PTRDIFF_TYPE target_disp, ompi_osc_pt2pt_module_t *module)
static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, const void *compare, void *result,
ompi_datatype_t *datatype, OPAL_PTRDIFF_TYPE target_disp, ompi_osc_pt2pt_module_t *module)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) target_disp * module->disp_unit);
/* if we are in active target mode wait until all post messages arrive */
if (module->sc_group && !module->active_eager_send_active) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
if (!(module->passive_target_access_epoch || module->active_eager_send_active)) {
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
ompi_osc_pt2pt_accumulate_lock (module);
@ -182,26 +147,16 @@ static inline int ompi_osc_pt2pt_cas_self (const void *source, const void *compa
return OMPI_SUCCESS;
}
static inline int ompi_osc_pt2pt_acc_self (const void *source, int source_count, ompi_datatype_t *source_datatype,
OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype,
ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype,
OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype,
ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) target_disp * module->disp_unit);
int ret;
/* if we are in active target mode wait until all post messages arrive */
if (module->sc_group && !module->active_eager_send_active) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
if (!(module->passive_target_access_epoch || module->active_eager_send_active)) {
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
ompi_osc_pt2pt_accumulate_lock (module);
@ -226,27 +181,17 @@ static inline int ompi_osc_pt2pt_acc_self (const void *source, int source_count,
return OMPI_SUCCESS;
}
static inline int ompi_osc_pt2pt_gacc_self (const void *source, int source_count, ompi_datatype_t *source_datatype,
void *result, int result_count, ompi_datatype_t *result_datatype,
OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype,
ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype,
void *result, int result_count, ompi_datatype_t *result_datatype,
OPAL_PTRDIFF_TYPE target_disp, int target_count, ompi_datatype_t *target_datatype,
ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) target_disp * module->disp_unit);
int ret;
/* if we are in active target mode wait until all post messages arrive */
if (module->sc_group && !module->active_eager_send_active) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
if (!(module->passive_target_access_epoch || module->active_eager_send_active)) {
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
ompi_osc_pt2pt_accumulate_lock (module);
@ -296,6 +241,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
ompi_osc_pt2pt_frag_t *frag;
ompi_osc_pt2pt_header_put_t *header;
ompi_osc_pt2pt_sync_t *pt2pt_sync;
size_t ddt_len, payload_len, frag_len;
bool is_long_datatype = false;
bool is_long_msg = false;
@ -309,7 +255,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
origin_dt->name, target, (int) target_disp,
target_count, target_dt->name, win->w_name));
if (!ompi_osc_pt2pt_check_access_epoch (module, target)) {
pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
return OMPI_ERR_RMA_SYNC;
}
@ -324,9 +271,9 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
/* optimize self communication. TODO: optimize local communication */
if (ompi_comm_rank (module->comm) == target) {
return ompi_osc_pt2pt_put_self (origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
module, request);
return ompi_osc_pt2pt_put_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
module, request);
}
/* Compute datatype and payload lengths. Note that the datatype description
@ -354,16 +301,10 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
tag = get_tag(module);
}
/* flush will be called at the end of this function. make sure the post message has
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -478,6 +419,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
bool is_long_msg = false;
ompi_osc_pt2pt_frag_t *frag;
ompi_osc_pt2pt_header_acc_t *header;
ompi_osc_pt2pt_sync_t *pt2pt_sync;
size_t ddt_len, payload_len, frag_len;
char *ptr;
const void *packed_ddt;
@ -490,7 +432,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
target_count, target_dt->name, op->o_name,
win->w_name));
if (!ompi_osc_pt2pt_check_access_epoch (module, target)) {
pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
return OMPI_ERR_RMA_SYNC;
}
@ -505,9 +448,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
/* optimize the self case. TODO: optimize the local case */
if (ompi_comm_rank (module->comm) == target) {
return ompi_osc_pt2pt_acc_self (origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
op, module, request);
return ompi_osc_pt2pt_acc_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
op, module, request);
}
/* Compute datatype and payload lengths. Note that the datatype description
@ -535,16 +478,10 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
tag = get_tag (module);
}
/* flush will be called at the end of this function. make sure the post message has
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
@ -656,6 +593,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
ompi_osc_pt2pt_frag_t *frag;
ompi_osc_pt2pt_header_cswap_t *header;
ompi_osc_pt2pt_sync_t *pt2pt_sync;
size_t ddt_len, payload_len, frag_len;
ompi_osc_pt2pt_request_t *request;
const void *packed_ddt;
@ -668,14 +606,15 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
(unsigned long) result_addr, dt->name, target, (int) target_disp,
win->w_name));
if (!ompi_osc_pt2pt_check_access_epoch (module, target)) {
pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
return OMPI_ERR_RMA_SYNC;
}
/* optimize self case. TODO: optimize local case */
if (ompi_comm_rank (module->comm) == target) {
return ompi_osc_pt2pt_cas_self (origin_addr, compare_addr, result_addr, dt, target_disp,
module);
return ompi_osc_pt2pt_cas_self (pt2pt_sync, origin_addr, compare_addr, result_addr, dt, target_disp,
module);
}
/* compare-and-swaps are always request based, so that we know where to land the data */
@ -697,6 +636,11 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
/* we need to send both the origin and compare buffers */
payload_len = dt->super.size * 2;
ret = ompi_datatype_get_pack_description(dt, &packed_ddt);
if (OMPI_SUCCESS != ret) {
return ret;
}
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) {
@ -715,7 +659,6 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
osc_pt2pt_hton(header, proc);
ptr += sizeof(ompi_osc_pt2pt_header_cswap_t);
ret = ompi_datatype_get_pack_description(dt, &packed_ddt);
memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
ptr += ddt_len;
@ -802,6 +745,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
bool is_long_datatype = false;
ompi_osc_pt2pt_frag_t *frag;
ompi_osc_pt2pt_header_get_t *header;
ompi_osc_pt2pt_sync_t *pt2pt_sync;
size_t ddt_len, frag_len;
char *ptr;
const void *packed_ddt;
@ -813,7 +757,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
origin_dt->name, target, (int) target_disp,
target_count, target_dt->name, win->w_name));
if (!ompi_osc_pt2pt_check_access_epoch (module, target)) {
pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
return OMPI_ERR_RMA_SYNC;
}
@ -835,9 +780,9 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
/* optimize self communication. TODO: optimize local communication */
if (ompi_comm_rank (module->comm) == target) {
*request = &pt2pt_request->super;
return ompi_osc_pt2pt_get_self (origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
module, pt2pt_request);
return ompi_osc_pt2pt_get_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
target_disp, target_count, target_dt,
module, pt2pt_request);
}
pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
@ -868,14 +813,10 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
/* for bookkeeping the get is "outgoing" */
ompi_osc_signal_outgoing (module, target, 1);
/* flush will be called at the end of this function. make sure the post message has
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if (!release_req && module->sc_group) {
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
header = (ompi_osc_pt2pt_header_get_t*) ptr;
@ -1017,6 +958,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
bool is_long_msg = false;
ompi_osc_pt2pt_frag_t *frag;
ompi_osc_pt2pt_header_acc_t *header;
ompi_osc_pt2pt_sync_t *pt2pt_sync;
size_t ddt_len, payload_len, frag_len;
char *ptr;
const void *packed_ddt;
@ -1030,7 +972,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
target_rank, (int) target_disp, target_count, target_datatype->name,
op->o_name, win->w_name));
if (!ompi_osc_pt2pt_check_access_epoch (module, target_rank)) {
pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target_rank, NULL);
if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
return OMPI_ERR_RMA_SYNC;
}
@ -1052,10 +995,10 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
/* optimize the self case. TODO: optimize the local case */
if (ompi_comm_rank (module->comm) == target_rank) {
*request = &pt2pt_request->super;
return ompi_osc_pt2pt_gacc_self (origin_addr, origin_count, origin_datatype,
result_addr, result_count, result_datatype,
target_disp, target_count, target_datatype,
op, module, pt2pt_request);
return ompi_osc_pt2pt_gacc_self (pt2pt_sync, origin_addr, origin_count, origin_datatype,
result_addr, result_count, result_datatype,
target_disp, target_count, target_datatype,
op, module, pt2pt_request);
}
pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC;
@ -1102,16 +1045,10 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
/* increment the number of outgoing fragments */
ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests);
/* flush will be called at the end of this function. make sure the post message has
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if (!release_req && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
header = (ompi_osc_pt2pt_header_acc_t *) ptr;

Просмотреть файл

@ -28,25 +28,12 @@
#include <string.h>
#include "osc_pt2pt.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_frag.h"
#include "osc_pt2pt_request.h"
#include "osc_pt2pt_data_move.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "opal/util/arch.h"
#include "opal/align.h"
#include "opal/mca/btl/btl.h"
#include "ompi/info/info.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/pml/pml.h"
static int component_open(void);
static int component_register(void);
static int component_init(bool enable_progress_threads, bool enable_mpi_threads);
static int component_finalize(void);
@ -64,7 +51,6 @@ ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
.mca_component_name = "pt2pt",
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION),
.mca_open_component = component_open,
.mca_register_component_params = component_register,
},
.osc_data = {
@ -128,53 +114,15 @@ bool ompi_osc_pt2pt_no_locks = false;
/* look up parameters for configuring this window. The code first
looks in the info structure passed by the user, then through mca
parameters. */
static bool
check_config_value_bool(char *key, ompi_info_t *info)
static bool check_config_value_bool(char *key, ompi_info_t *info, bool result)
{
char *value_string;
int value_len, ret, flag, param;
const bool *flag_value;
bool result;
int flag;
ret = ompi_info_get_valuelen(info, key, &value_len, &flag);
if (OMPI_SUCCESS != ret) goto info_not_found;
if (flag == 0) goto info_not_found;
value_len++;
value_string = (char*)malloc(sizeof(char) * value_len + 1); /* Should malloc 1 char for NUL-termination */
if (NULL == value_string) goto info_not_found;
ret = ompi_info_get(info, key, value_len, value_string, &flag);
if (OMPI_SUCCESS != ret) {
free(value_string);
goto info_not_found;
}
assert(flag != 0);
ret = ompi_info_value_to_bool(value_string, &result);
free(value_string);
if (OMPI_SUCCESS != ret) goto info_not_found;
(void) ompi_info_get_bool (info, key, &result, &flag);
return result;
info_not_found:
param = mca_base_var_find("ompi", "osc", "pt2pt", key);
if (0 > param) return false;
ret = mca_base_var_get_value(param, &flag_value, NULL, NULL);
if (OMPI_SUCCESS != ret) return false;
return flag_value[0];
}
static int
component_open(void)
{
return OMPI_SUCCESS;
}
static int
component_register(void)
static int component_register (void)
{
ompi_osc_pt2pt_no_locks = false;
(void) mca_base_component_var_register(&mca_osc_pt2pt_component.super.osc_version,
@ -346,15 +294,26 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
/* initialize the objects, so that always free in cleanup */
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
OBJ_CONSTRUCT(&module->peer_hash, opal_hash_table_t);
OBJ_CONSTRUCT(&module->peer_lock, opal_mutex_t);
ret = opal_hash_table_init (&module->outstanding_locks, 64);
if (OPAL_SUCCESS != ret) {
goto cleanup;
}
ret = opal_hash_table_init (&module->peer_hash, 128);
if (OPAL_SUCCESS != ret) {
goto cleanup;
}
/* options */
/* FIX ME: should actually check this value... */
@ -388,17 +347,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
/* record my displacement unit. Always resolved at target */
module->disp_unit = disp_unit;
/* peer data */
module->peers = calloc(ompi_comm_size(comm), sizeof(ompi_osc_pt2pt_peer_t));
if (NULL == module->peers) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
for (int i = 0 ; i < ompi_comm_size (comm) ; ++i) {
OBJ_CONSTRUCT(module->peers + i, ompi_osc_pt2pt_peer_t);
}
/* peer op count data */
module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t));
if (NULL == module->epoch_outgoing_frag_count) {
@ -408,18 +356,16 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
/* the statement below (from Brian) does not seem correct so disable active target on the
* window. if this ends up being incorrect please revert this one change */
module->active_eager_send_active = false;
#if 0
/* initially, we're in that pseudo-fence state, so we allow eager
sends (yay for Fence). Other protocols will disable before
they start their epochs, so this isn't a problem. */
module->active_eager_send_active = true;
module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
module->all_sync.eager_send_active = true;
#endif
/* lock data */
if (check_config_value_bool("no_locks", info)) {
win->w_flags |= OMPI_WIN_NO_LOCKS;
}
module->no_locks = check_config_value_bool ("no_locks", info, ompi_osc_pt2pt_no_locks);
/* update component data */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
@ -460,6 +406,10 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
mca_osc_pt2pt_component.progress_enable = true;
}
if (module->no_locks) {
win->w_flags |= OMPI_WIN_NO_LOCKS;
}
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
"done creating pt2pt window %d", ompi_comm_get_cid(module->comm)));
@ -503,6 +453,9 @@ static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
{
OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
peer->active_frag = NULL;
peer->passive_incoming_frag_count = 0;
peer->unexpected_post = false;
}
static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -21,26 +21,19 @@
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_frag.h"
#include "osc_pt2pt_request.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "opal/util/arch.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "opal/align.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/op/op.h"
@ -218,7 +211,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
* to a target) before this is sent.
*/
int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
void *data, size_t len)
void *data, size_t len)
{
ompi_osc_pt2pt_frag_t *frag;
char *ptr;
@ -494,6 +487,7 @@ static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *sourc
ompi_datatype_t *datatype, int peer, int tag)
{
struct osc_pt2pt_get_post_send_cb_data_t *data;
int ret;
data = malloc (sizeof (*data));
if (OPAL_UNLIKELY(NULL == data)) {
@ -505,8 +499,14 @@ static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *sourc
* in an active target epoch) */
data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
return ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
/* data will be freed by the callback */
ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
osc_pt2pt_get_post_send_cb, (void *) data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
free (data);
}
return ret;
}
/**
@ -712,7 +712,7 @@ static int accumulate_cb (ompi_request_t *request)
static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
char *data, size_t data_len, ompi_datatype_t *datatype)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
osc_pt2pt_pending_acc_t *pending_acc;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -877,14 +877,14 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s
}
ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
datatype, op, 1, &acc_data);
datatype, op, 1, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
free (buffer);
break;
}
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source,
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
}
@ -985,12 +985,6 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source
buflen = datatype_buffer_length (datatype, acc_header->count);
do {
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -998,9 +992,16 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source
primitive_count *= acc_header->count;
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
datatype, op, 2, &acc_data);
datatype, op, 2, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
free (buffer);
break;
}
@ -1337,20 +1338,8 @@ static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_complete_t *complete_header)
{
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
"current signal count %d. current incomming count: %d",
source, complete_header->frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count));
/* the current fragment is not part of the frag_count so we need to add it here */
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count,
complete_header->frag_count + 1);
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
opal_condition_broadcast (&module->cond);
}
osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1);
return sizeof (*complete_header);
}
@ -1361,7 +1350,7 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_flush_t *flush_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -1395,7 +1384,7 @@ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_unlock_t *unlock_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -1653,10 +1642,11 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
/* only data fragments should be included in the completion counters */
mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL);
mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_POST:
(void) osc_pt2pt_incoming_post (module, source);
osc_pt2pt_incoming_post (module, source);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);

Просмотреть файл

@ -12,17 +12,12 @@
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/class/opal_list.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/pml/pml.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_frag.h"
#include "osc_pt2pt_data_move.h"
static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag){
static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag)
{
frag->buffer = frag->super.ptr;
}
@ -68,7 +63,7 @@ static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *fr
int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t *frag)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, frag->target);
int ret;
assert(0 == frag->pending && peer->active_frag != frag);
@ -79,7 +74,7 @@ int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
/* if eager sends are not active, can't send yet, so buffer and
get out... */
if (!(peer->eager_send_active || module->all_access_epoch) || opal_list_get_size (&peer->queued_frags)) {
if (!ompi_osc_pt2pt_peer_sends_active (module, frag->target) || opal_list_get_size (&peer->queued_frags)) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "queuing fragment to peer %d",
frag->target));
OPAL_THREAD_SCOPED_LOCK(&peer->lock,
@ -97,9 +92,9 @@ int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
return ret;
}
static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, int target)
static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_peer_t *peer)
{
ompi_osc_pt2pt_frag_t *active_frag = module->peers[target].active_frag;
ompi_osc_pt2pt_frag_t *active_frag = peer->active_frag;
int ret = OMPI_SUCCESS;
if (NULL == active_frag) {
@ -108,16 +103,16 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: flushing active fragment to target %d. pending: %d", target,
active_frag->pending));
"osc pt2pt: flushing active fragment to target %d. pending: %d",
active_frag->target, active_frag->pending));
if (opal_atomic_cmpset (&module->peers[target].active_frag, active_frag, NULL)) {
if (opal_atomic_cmpset (&peer->active_frag, active_frag, NULL)) {
if (0 != OPAL_THREAD_ADD32(&active_frag->pending, -1)) {
/* communication going on while synchronizing; this is an rma usage bug */
return OMPI_ERR_RMA_SYNC;
}
ompi_osc_signal_outgoing (module, target, 1);
ompi_osc_signal_outgoing (module, active_frag->target, 1);
ret = frag_send (module, active_frag);
}
@ -126,7 +121,7 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, in
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *frag;
int ret = OMPI_SUCCESS;
@ -150,7 +145,7 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
}
/* flush the active frag */
ret = ompi_osc_pt2pt_flush_active_frag (module, target);
ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush target %d finished", target));
@ -162,41 +157,20 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
{
int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_frag_t *frag;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush all begin"));
/* try to start all the queued frags */
/* try to start frags queued to all peers */
for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
ompi_osc_pt2pt_peer_t *peer = module->peers + i;
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
ret = frag_send(module, frag);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
}
/* XXX -- TODO -- better error handling */
ret = ompi_osc_pt2pt_frag_flush_target (module, i);
if (OMPI_SUCCESS != ret) {
return ret;
break;
}
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: flushing all active fragments"));
/* flush the active frag */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
ret = ompi_osc_pt2pt_flush_active_frag (module, i);
if (OMPI_SUCCESS != ret) {
return ret;
}
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush all done"));
"osc pt2pt: frag flush all done. ret: %d", ret));
return ret;
}

Просмотреть файл

@ -51,6 +51,7 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
char **ptr)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *curr;
int ret;
@ -64,13 +65,13 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
}
OPAL_THREAD_LOCK(&module->lock);
curr = module->peers[target].active_frag;
curr = peer->active_frag;
if (NULL == curr || curr->remain_len < request_len) {
opal_free_list_item_t *item = NULL;
if (NULL != curr) {
curr->remain_len = 0;
module->peers[target].active_frag = NULL;
peer->active_frag = NULL;
opal_atomic_mb ();
/* If there's something pending, the pending finish will
@ -87,8 +88,7 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
if (OPAL_UNLIKELY(NULL == item)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
curr = module->peers[target].active_frag =
(ompi_osc_pt2pt_frag_t*) item;
curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item;
curr->target = target;
@ -105,7 +105,6 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
}
curr->header->source = ompi_comm_rank(module->comm);
curr->header->num_ops = 0;
curr->header->windx = ompi_comm_get_cid(module->comm);
if (curr->remain_len < request_len) {
OPAL_THREAD_UNLOCK(&module->lock);

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
@ -118,7 +118,6 @@ typedef struct ompi_osc_pt2pt_header_cswap_t ompi_osc_pt2pt_header_cswap_t;
struct ompi_osc_pt2pt_header_post_t {
ompi_osc_pt2pt_header_base_t base;
uint16_t windx;
};
typedef struct ompi_osc_pt2pt_header_post_t ompi_osc_pt2pt_header_post_t;
@ -134,7 +133,6 @@ typedef struct ompi_osc_pt2pt_header_lock_t ompi_osc_pt2pt_header_lock_t;
struct ompi_osc_pt2pt_header_lock_ack_t {
ompi_osc_pt2pt_header_base_t base;
uint16_t windx;
uint32_t source;
uint64_t lock_ptr;
};
@ -166,7 +164,7 @@ struct ompi_osc_pt2pt_header_flush_t {
uint8_t padding[2];
#endif
uint32_t frag_count;
uint64_t serial_number;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_flush_t ompi_osc_pt2pt_header_flush_t;
@ -175,13 +173,12 @@ struct ompi_osc_pt2pt_header_flush_ack_t {
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
uint8_t padding[6];
#endif
uint64_t serial_number;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_flush_ack_t ompi_osc_pt2pt_header_flush_ack_t;
struct ompi_osc_pt2pt_frag_header_t {
ompi_osc_pt2pt_header_base_t base;
uint16_t windx; /* cid of communicator backing window (our window id) */
uint32_t source; /* rank in window of source process */
int32_t num_ops; /* number of operations in this buffer */
uint32_t pad; /* ensure the fragment header is a multiple of 8 bytes */
@ -208,12 +205,10 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
#define MCA_OSC_PT2PT_FRAG_HDR_NTOH(h) \
(h).windx = ntohs((h).windx); \
(h).source = ntohl((h).source); \
(h).num_ops = ntohl((h).num_ops); \
(h).pad = ntohl((h).pad);
#define MCA_OSC_PT2PT_FRAG_HDR_HTON(h) \
(h).windx = htons((h).windx); \
(h).source = htonl((h).source); \
(h).num_ops = htonl((h).num_ops); \
(h).pad = htonl((h).pad);
@ -254,34 +249,24 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t;
(h).op = htonl((h).op);
#define MCA_OSC_PT2PT_LOCK_HDR_NTOH(h) \
(h).lock_type = ntohl((h).lock_type); \
(h).lock_ptr = ntoh64((h).lock_ptr)
(h).lock_type = ntohl((h).lock_type)
#define MCA_OSC_PT2PT_LOCK_HDR_HTON(h) \
(h).lock_type = htonl((h).lock_type); \
(h).lock_ptr = hton64((h).lock_ptr)
(h).lock_type = htonl((h).lock_type)
#define MCA_OSC_PT2PT_UNLOCK_HDR_NTOH(h) \
(h).lock_type = ntohl((h).lock_type); \
(h).lock_ptr = ntoh64((h).lock_ptr); \
(h).frag_count = ntohl((h).frag_count)
#define MCA_OSC_PT2PT_UNLOCK_HDR_HTON(h) \
(h).lock_type = htonl((h).lock_type); \
(h).lock_ptr = hton64((h).lock_ptr); \
(h).frag_count = htonl((h).frag_count)
#define MCA_OSC_PT2PT_LOCK_ACK_HDR_NTOH(h) \
(h).windx = ntohs((h).windx); \
(h).source = ntohl((h).source); \
(h).lock_ptr = ntoh64((h).lock_ptr)
(h).source = ntohl((h).source)
#define MCA_OSC_PT2PT_LOCK_ACK_HDR_HTON(h) \
(h).windx = htonl((h).windx); \
(h).source= htonl((h).source); \
(h).lock_ptr = hton64((h).lock_ptr)
(h).source= htonl((h).source)
#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_NTOH(h) \
(h).lock_ptr = ntoh64((h).lock_ptr);
#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_HTON(h) \
(h).lock_ptr = hton64((h).lock_ptr);
#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_NTOH(h)
#define MCA_OSC_PT2PT_UNLOCK_ACK_HDR_HTON(h)
#define MCA_OSC_PT2PT_COMPLETE_HDR_NTOH(h) \
(h).frag_count = ntohl((h).frag_count)
@ -289,21 +274,15 @@ typedef union ompi_osc_pt2pt_header_t ompi_osc_pt2pt_header_t;
(h).frag_count = htonl((h).frag_count)
#define MCA_OSC_PT2PT_FLUSH_HDR_NTOH(h) \
(h).frag_count = ntohl((h).frag_count); \
(h).serial_number = ntoh64((h).serial_number)
(h).frag_count = ntohl((h).frag_count)
#define MCA_OSC_PT2PT_FLUSH_HDR_HTON(h) \
(h).frag_count = htonl((h).frag_count); \
(h).serial_number = ntoh64((h).serial_number)
(h).frag_count = htonl((h).frag_count)
#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_NTOH(h) \
(h).serial_number = ntoh64((h).serial_number)
#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_HTON(h) \
(h).serial_number = ntoh64((h).serial_number)
#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_NTOH(h)
#define MCA_OSC_PT2PT_FLUSH_ACK_HDR_HTON(h)
#define MCA_OSC_PT2PT_POST_HDR_NTOH(h) \
(h).windx = ntohs((h).windx)
#define MCA_OSC_PT2PT_POST_HDR_HTON(h) \
(h).windx = htons((h).windx)
#define MCA_OSC_PT2PT_POST_HDR_NTOH(h)
#define MCA_OSC_PT2PT_POST_HDR_HTON(h)
#define MCA_OSC_PT2PT_CSWAP_HDR_NTOH(h) \
(h).tag = ntohs((h).tag); \

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
@ -20,20 +20,10 @@
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "opal/threads/mutex.h"
#include "opal/mca/btl/btl.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"
#include "mpi.h"
int
ompi_osc_pt2pt_attach(struct ompi_win_t *win, void *base, size_t len)
int ompi_osc_pt2pt_attach(struct ompi_win_t *win, void *base, size_t len)
{
return OMPI_SUCCESS;
}
@ -46,11 +36,13 @@ ompi_osc_pt2pt_detach(struct ompi_win_t *win, const void *base)
}
int
ompi_osc_pt2pt_free(ompi_win_t *win)
int ompi_osc_pt2pt_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_peer_t *peer;
uint32_t key;
void *node;
if (NULL == module) {
return OMPI_SUCCESS;
@ -78,28 +70,29 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
OBJ_DESTRUCT(&module->outstanding_locks);
OBJ_DESTRUCT(&module->locks_pending);
OBJ_DESTRUCT(&module->locks_pending_lock);
OBJ_DESTRUCT(&module->acc_lock);
OBJ_DESTRUCT(&module->cond);
OBJ_DESTRUCT(&module->lock);
OBJ_DESTRUCT(&module->all_sync);
/* it is erroneous to close a window with active operations on it so we should
* probably produce an error here instead of cleaning up */
OPAL_LIST_DESTRUCT(&module->pending_acc);
OPAL_LIST_DESTRUCT(&module->pending_posts);
osc_pt2pt_gc_clean (module);
OPAL_LIST_DESTRUCT(&module->request_gc);
OPAL_LIST_DESTRUCT(&module->buffer_gc);
OBJ_DESTRUCT(&module->gc_lock);
if (NULL != module->peers) {
for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
OBJ_DESTRUCT(module->peers + i);
}
free(module->peers);
ret = opal_hash_table_get_first_key_uint32 (&module->peer_hash, &key, (void **) &peer, &node);
while (OPAL_SUCCESS == ret) {
OBJ_RELEASE(peer);
ret = opal_hash_table_get_next_key_uint32 (&module->peer_hash, &key, (void **) &peer, node,
&node);
}
OBJ_DESTRUCT(&module->peer_hash);
OBJ_DESTRUCT(&module->peer_lock);
if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
if (NULL != module->frag_request) {
@ -115,5 +108,5 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
free (module);
return ret;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -37,7 +37,7 @@
#include "opal/include/opal_stdint.h"
static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type,
uint64_t serial_number);
uint64_t lock_ptr);
/* target-side tracking of a lock request */
struct ompi_osc_pt2pt_pending_lock_t {
@ -50,99 +50,27 @@ typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
NULL, NULL);
/* origin-side tracking of a lock request */
struct ompi_osc_pt2pt_outstanding_lock_t {
opal_list_item_t super;
int target;
int assert;
bool flushing;
int32_t lock_acks_expected;
int32_t unlock_acks_expected;
int32_t flush_acks_expected;
uint64_t serial_number;
int32_t type;
};
typedef struct ompi_osc_pt2pt_outstanding_lock_t ompi_osc_pt2pt_outstanding_lock_t;
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_outstanding_lock_t, opal_list_item_t,
NULL, NULL);
static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module);
static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr);
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock,
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
int target);
/**
* Find the first outstanding lock to a target.
*
* @param[in] module - OSC PT2PT module
* @param[in] target - Target rank
*
* @returns an outstanding lock on success
*
* This function traverses the outstanding_locks list in the module
* looking for a lock that matches target. The caller must hold the
* module lock.
*/
static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_st (ompi_osc_pt2pt_module_t *module, int target)
{
ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL;
OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
if (outstanding_lock->target == target) {
lock = outstanding_lock;
break;
}
}
return lock;
}
static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock (ompi_osc_pt2pt_module_t *module, int target)
{
ompi_osc_pt2pt_outstanding_lock_t *lock;
OPAL_THREAD_LOCK(&module->lock);
lock = find_outstanding_lock_st (module, target);
OPAL_THREAD_UNLOCK(&module->lock);
return lock;
}
static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_by_serial (ompi_osc_pt2pt_module_t *module, uint64_t serial_number)
{
ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL;
OPAL_THREAD_LOCK(&module->lock);
OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
if (outstanding_lock->serial_number == serial_number) {
lock = outstanding_lock;
break;
}
}
OPAL_THREAD_UNLOCK(&module->lock);
return lock;
}
static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
const int my_rank = ompi_comm_rank (module->comm);
int lock_type = lock->sync.lock.type;
bool acquired = false;
acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock);
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
if (!acquired) {
/* queue the lock */
queue_lock (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock);
queue_lock (module, my_rank, lock_type, (uint64_t) (uintptr_t) lock);
/* If locking local, can't be non-blocking according to the
standard. We need to wait for the ack here. */
OPAL_THREAD_LOCK(&module->lock);
while (lock->lock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
ompi_osc_pt2pt_sync_wait_expected (lock);
}
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
@ -151,12 +79,16 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
return OMPI_SUCCESS;
}
static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
int lock_type = lock->sync.lock.type;
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_self: unlocking myself. lock state = %d", module->lock_status));
if (MPI_LOCK_EXCLUSIVE == lock->type) {
if (MPI_LOCK_EXCLUSIVE == lock_type) {
OPAL_THREAD_ADD32(&module->lock_status, 1);
ompi_osc_activate_next_lock (module);
} else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) {
@ -166,15 +98,18 @@ static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module,
/* need to ensure we make progress */
opal_progress();
OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1);
ompi_osc_pt2pt_sync_expected (lock);
}
static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
int lock_type = lock->sync.lock.type;
ompi_osc_pt2pt_header_lock_t lock_req;
int ret;
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
/* generate a lock request */
lock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ;
lock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
@ -182,7 +117,7 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
lock_req.padding[0] = 0;
lock_req.padding[1] = 0;
#endif
lock_req.lock_type = lock->type;
lock_req.lock_type = lock_type;
lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
OSC_PT2PT_HTON(&lock_req, module, target);
@ -197,11 +132,15 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
return ret;
}
static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
ompi_osc_pt2pt_header_unlock_t unlock_req;
int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
int lock_type = lock->sync.lock.type;
ompi_osc_pt2pt_header_unlock_t unlock_req;
int ret;
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
unlock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
@ -210,7 +149,7 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
unlock_req.padding[1] = 0;
#endif
unlock_req.frag_count = frag_count;
unlock_req.lock_type = lock->type;
unlock_req.lock_type = lock_type;
unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
OSC_PT2PT_HTON(&unlock_req, module, target);
@ -225,20 +164,27 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
unlock_req.frag_count));
/* send control message with unlock request and count */
return ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
ret = ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
return ompi_osc_pt2pt_frag_flush_target(module, target);
}
static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_header_flush_t flush_req;
int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
int ret;
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
flush_req.frag_count = frag_count;
flush_req.serial_number = lock->serial_number;
flush_req.lock_ptr = (uint64_t) (uintptr_t) lock;
/* XXX -- TODO -- since fragments are always delivered in order we do not need to count anything but long
* requests. once that is done this can be removed. */
@ -262,16 +208,17 @@ static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module,
return ompi_osc_pt2pt_frag_flush_target (module, target);
}
static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock)
{
int my_rank = ompi_comm_rank (module->comm);
int target = lock->target;
int assert = lock->assert;
int target = lock->sync.lock.target;
int assert = lock->sync.lock.assert;
int ret;
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
if (0 == (assert & MPI_MODE_NOCHECK)) {
lock->lock_acks_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
lock->unlock_acks_expected = lock->lock_acks_expected;
lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
if (my_rank != target && target != -1) {
ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
@ -305,63 +252,87 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_peer_t *peer = NULL;
ompi_osc_pt2pt_sync_t *lock;
int ret = OMPI_SUCCESS;
if (-1 != target) {
peer = module->peers + target;
/* Check if no_locks is set. TODO: we also need to track whether we are in an
* active target epoch. Fence can make this tricky to track. */
if (-1 == target) {
if (module->all_sync.epoch_active) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "osc/pt2pt: attempted "
"to lock all when active target epoch is %s and lock all epoch is %s. type %d",
(OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type && module->all_sync.epoch_active) ?
"active" : "inactive",
(OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) ? "active" : "inactive",
module->all_sync.type));
return OMPI_ERR_RMA_SYNC;
}
} else {
if (module->all_sync.epoch_active && (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK != module->all_sync.type || MPI_LOCK_EXCLUSIVE == lock_type)) {
/* impossible to get an exclusive lock while holding a global shared lock or in an active
* target access epoch */
return OMPI_ERR_RMA_SYNC;
}
}
/* Check if no_locks is set. TODO: we also need to track whether we are in an
* active target epoch. Fence can make this tricky to track. */
if (module->sc_group) {
if (module->all_sync.epoch_active || (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
(MPI_LOCK_EXCLUSIVE == lock_type || -1 == target))) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: attempted "
"to acquire a lock on %d with type %d when active sync is %s and lock "
"all epoch is %s", target, lock_type, module->all_sync.epoch_active ? "active" : "inactive",
(OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type &&
(MPI_LOCK_EXCLUSIVE == lock_type || -1 == target)) ? "active" : "inactive"));
return OMPI_ERR_RMA_SYNC;
}
if (OMPI_OSC_PT2PT_SYNC_TYPE_FENCE == module->all_sync.type) {
/* if no communication has occurred during a fence epoch then we can enter a lock epoch;
* we just need to clear the all access epoch */
module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
}
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: lock %d %d", target, lock_type));
/* create lock item */
lock = OBJ_NEW(ompi_osc_pt2pt_outstanding_lock_t);
if (OPAL_UNLIKELY(NULL == lock)) {
return OMPI_ERR_OUT_OF_RESOURCE;
if (-1 != target) {
lock = ompi_osc_pt2pt_sync_allocate (module);
if (OPAL_UNLIKELY(NULL == lock)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
} else {
lock = &module->all_sync;
}
lock->target = target;
lock->lock_acks_expected = 0;
lock->unlock_acks_expected = 0;
lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1);
lock->type = lock_type;
lock->assert = assert;
lock->type = OMPI_OSC_PT2PT_SYNC_TYPE_LOCK;
lock->sync.lock.target = target;
lock->sync.lock.type = lock_type;
lock->sync.lock.assert = assert;
lock->sync_expected = 0;
/* delay all eager sends until we've heard back.. */
OPAL_THREAD_LOCK(&module->lock);
/* check for conflicting lock */
if (find_outstanding_lock_st (module, target)) {
OBJ_RELEASE(lock);
if (ompi_osc_pt2pt_module_lock_find (module, target, NULL)) {
ompi_osc_pt2pt_sync_return (lock);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_CONFLICT;
}
/* when the lock ack returns we will be in an access epoch with this peer/all peers (target = -1) */
if (-1 == target) {
module->all_access_epoch = true;
} else {
peer->access_epoch = true;
}
++module->passive_target_access_epoch;
opal_list_append(&module->outstanding_locks, &lock->super);
ompi_osc_pt2pt_module_lock_insert (module, lock);
OPAL_THREAD_UNLOCK(&module->lock);
ret = ompi_osc_pt2pt_lock_internal_execute (module, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_SCOPED_LOCK(&module->lock,
opal_list_remove_item(&module->outstanding_locks, &lock->super));
OBJ_RELEASE(lock);
OPAL_THREAD_SCOPED_LOCK(&module->lock, ompi_osc_pt2pt_module_lock_remove (module, lock));
ompi_osc_pt2pt_sync_return (lock);
}
return ret;
@ -370,20 +341,15 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_outstanding_lock_t *lock = NULL;
ompi_osc_pt2pt_sync_t *lock = NULL;
int my_rank = ompi_comm_rank (module->comm);
ompi_osc_pt2pt_peer_t *peer = NULL;
int ret = OMPI_SUCCESS;
if (-1 != target) {
peer = module->peers + target;
}
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: unlocking target %d", target));
OPAL_THREAD_LOCK(&module->lock);
lock = find_outstanding_lock_st (module, target);
lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
if (OPAL_UNLIKELY(NULL == lock)) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock: target %d is not locked in window %s",
@ -394,80 +360,65 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: lock acks still expected: %d",
lock->lock_acks_expected));
lock->sync_expected));
/* wait until ack has arrived from target */
while (lock->lock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock);
}
ompi_osc_pt2pt_sync_wait_expected (lock);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock_internal: all lock acks received"));
if (lock->assert & MPI_MODE_NOCHECK) {
/* flush instead */
ompi_osc_pt2pt_flush_lock (module, lock, target);
} else if (my_rank != target) {
if (-1 == target) {
/* send unlock messages to all of my peers */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
if (my_rank == i) {
continue;
if (!(lock->sync.lock.assert & MPI_MODE_NOCHECK)) {
lock->sync_expected = (-1 == target) ? ompi_comm_size (module->comm) : 1;
if (my_rank != target) {
if (-1 == target) {
/* send unlock messages to all of my peers */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
if (my_rank == i) {
continue;
}
ret = ompi_osc_pt2pt_unlock_remote (module, i, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
ret = ompi_osc_pt2pt_unlock_remote (module, i, lock);
ompi_osc_pt2pt_unlock_self (module, lock);
} else {
ret = ompi_osc_pt2pt_unlock_remote (module, target, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
/* wait for unlock acks. this signals remote completion of fragments */
ompi_osc_pt2pt_sync_wait_expected (lock);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock: unlock of %d complete", target));
} else {
ompi_osc_pt2pt_unlock_self (module, lock);
} else {
ret = ompi_osc_pt2pt_unlock_remote (module, target, lock);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
/* start all sendreqs to target */
if (-1 == target) {
ret = ompi_osc_pt2pt_frag_flush_all (module);
} else {
ret = ompi_osc_pt2pt_frag_flush_target(module, target);
}
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
/* wait for unlock acks. this signals remote completion of fragments */
OPAL_THREAD_LOCK(&module->lock);
while (lock->unlock_acks_expected) {
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock: unlock of %d complete", target));
} else {
ompi_osc_pt2pt_unlock_self (module, lock);
/* flush instead */
ompi_osc_pt2pt_flush_lock (module, lock, target);
}
OPAL_THREAD_LOCK(&module->lock);
if (-1 != target) {
peer->access_epoch = false;
if (-1 != lock->sync.lock.target) {
ompi_osc_pt2pt_sync_return (lock);
} else {
module->all_access_epoch = false;
ompi_osc_pt2pt_sync_reset (lock);
}
--module->passive_target_access_epoch;
opal_list_remove_item (&module->outstanding_locks, &lock->super);
--module->passive_target_access_epoch;
ompi_osc_pt2pt_module_lock_remove (module, lock);
OPAL_THREAD_UNLOCK(&module->lock);
OBJ_RELEASE(lock);
return ret;
}
@ -501,7 +452,7 @@ int ompi_osc_pt2pt_sync (struct ompi_win_t *win)
return OMPI_SUCCESS;
}
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock,
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sync_t *lock,
int target)
{
int ret;
@ -509,21 +460,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
/* wait until ack has arrived from target, since we need to be
able to eager send before we can transfer all the data... */
OPAL_THREAD_LOCK(&module->lock);
while (lock->lock_acks_expected && lock->flushing) {
opal_condition_wait(&module->cond, &module->lock);
}
lock->flushing = true;
ompi_osc_pt2pt_sync_wait_expected (lock);
if (-1 == target) {
lock->flush_acks_expected = ompi_comm_size(module->comm) - 1;
lock->sync_expected = ompi_comm_size(module->comm) - 1;
} else {
lock->flush_acks_expected = 1;
lock->sync_expected = 1;
}
OPAL_THREAD_UNLOCK(&module->lock);
if (-1 == target) {
/* NTH: no local flush */
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
@ -545,16 +489,9 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
}
}
/* wait for all the requests and the flush ack (meaning remote completion) */
OPAL_THREAD_LOCK(&module->lock);
while (lock->flush_acks_expected) {
opal_condition_wait(&module->cond, &module->lock);
}
lock->flushing = false;
opal_condition_broadcast(&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
/* wait for all flush acks (meaning remote completion) */
ompi_osc_pt2pt_sync_wait_expected (lock);
opal_condition_broadcast (&module->cond);
return OMPI_SUCCESS;
}
@ -562,7 +499,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_sync_t *lock;
int ret;
assert (0 <= target);
@ -581,10 +518,14 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win)
return OMPI_SUCCESS;
}
lock = find_outstanding_lock (module, target);
OPAL_THREAD_LOCK(&module->lock);
lock = ompi_osc_pt2pt_module_lock_find (module, target, NULL);
if (NULL == lock) {
lock = find_outstanding_lock (module, -1);
if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type) {
lock = &module->all_sync;
}
}
OPAL_THREAD_UNLOCK(&module->lock);
if (OPAL_UNLIKELY(NULL == lock)) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_flush: target %d is not locked in window %s",
@ -601,12 +542,12 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win)
int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win)
{
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_outstanding_lock_t *lock;
int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_sync_t *lock;
int target, ret;
void *node;
/* flush is only allowed from within a passive target epoch */
if (OPAL_UNLIKELY(!module->passive_target_access_epoch ||
0 == opal_list_get_size (&module->outstanding_locks))) {
if (OPAL_UNLIKELY(!module->passive_target_access_epoch)) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_flush_all: no targets are locked in window %s",
win->w_name));
@ -617,11 +558,22 @@ int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win)
"ompi_osc_pt2pt_flush_all entering..."));
/* flush all locks */
OPAL_LIST_FOREACH(lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->target);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = opal_hash_table_get_first_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
(void **) &lock, &node);
if (OPAL_SUCCESS == ret) {
do {
ret = ompi_osc_pt2pt_flush_lock (module, lock, lock->sync.lock.target);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = opal_hash_table_get_next_key_uint32 (&module->outstanding_locks, (uint32_t *) &target,
(void **) lock, node, &node);
if (OPAL_SUCCESS != ret) {
ret = OPAL_SUCCESS;
break;
}
} while (1);
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -687,7 +639,7 @@ int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win)
static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
uint64_t lock_ptr)
{
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_sync_t *lock;
if (ompi_comm_rank (module->comm) != requestor) {
ompi_osc_pt2pt_header_lock_ack_t lock_ack;
@ -695,7 +647,6 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
lock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK;
lock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
lock_ack.source = ompi_comm_rank(module->comm);
lock_ack.windx = ompi_comm_get_cid(module->comm);
lock_ack.lock_ptr = lock_ptr;
OSC_PT2PT_HTON(&lock_ack, module, requestor);
@ -711,15 +662,13 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor,
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: releasing local lock"));
lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ptr;
lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ptr;
if (OPAL_UNLIKELY(NULL == lock)) {
OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
"lock could not be located"));
}
if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
opal_condition_broadcast (&module->cond);
}
ompi_osc_pt2pt_sync_expected (lock);
return OMPI_SUCCESS;
}
@ -829,65 +778,48 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source,
/* initiator-side function called when the target acks the lock
request. */
void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + lock_ack_header->source;
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_sync_t *lock;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_lock_ack: processing lock ack from %d for lock %" PRIu64,
lock_ack_header->source, lock_ack_header->lock_ptr));
lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ack_header->lock_ptr;
lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) lock_ack_header->lock_ptr;
assert (NULL != lock);
/* no need to hold the lock to set this */
peer->eager_send_active = true;
if (0 == OPAL_THREAD_ADD32(&lock->lock_acks_expected, -1)) {
opal_condition_broadcast(&module->cond);
}
opal_condition_broadcast(&module->cond);
ompi_osc_pt2pt_sync_expected (lock);
}
void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_flush_ack_t *flush_ack_header) {
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_sync_t *lock;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock %" PRIu64,
source, flush_ack_header->serial_number));
"ompi_osc_pt2pt_process_flush_ack: processing flush ack from %d for lock 0x%" PRIx64,
source, flush_ack_header->lock_ptr));
/* NTH: need to verify that this will work as expected */
lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number);
lock = (ompi_osc_pt2pt_sync_t *) (uintptr_t) flush_ack_header->lock_ptr;
assert (NULL != lock);
if (0 == OPAL_THREAD_ADD32(&lock->flush_acks_expected, -1)) {
opal_condition_broadcast(&module->cond);
}
opal_condition_broadcast(&module->cond);
ompi_osc_pt2pt_sync_expected (lock);
}
void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header)
ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_outstanding_lock_t *lock;
ompi_osc_pt2pt_sync_t *lock;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_unlock_ack: processing unlock ack from %d",
source));
/* NTH: need to verify that this will work as expected */
lock = (ompi_osc_pt2pt_outstanding_lock_t *) (intptr_t) unlock_ack_header->lock_ptr;
lock = (ompi_osc_pt2pt_sync_t *) (intptr_t) unlock_ack_header->lock_ptr;
assert (NULL != lock);
peer->eager_send_active = false;
if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_expected, -1)) {
opal_condition_broadcast(&module->cond);
}
ompi_osc_pt2pt_sync_expected (lock);
}
/**
@ -903,12 +835,14 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou
* active a pending lock if the lock becomes free.
*/
int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_unlock_t *unlock_header)
ompi_osc_pt2pt_header_unlock_t *unlock_header)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
ompi_osc_pt2pt_header_unlock_ack_t unlock_ack;
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
int ret;
assert (NULL != peer);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_unlock entering (passive_incoming_frag_count: %d)...",
peer->passive_incoming_frag_count));
@ -950,11 +884,13 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
}
int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_flush_t *flush_header)
ompi_osc_pt2pt_header_flush_t *flush_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
ompi_osc_pt2pt_header_flush_ack_t flush_ack;
assert (NULL != peer);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_process_flush entering (passive_incoming_frag_count: %d)...",
peer->passive_incoming_frag_count));
@ -966,7 +902,7 @@ int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source,
flush_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK;
flush_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
flush_ack.serial_number = flush_header->serial_number;
flush_ack.lock_ptr = flush_header->lock_ptr;
OSC_PT2PT_HTON(&flush_ack, module, source);
return ompi_osc_pt2pt_control_send_unbuffered (module, source, &flush_ack, sizeof (flush_ack));

90
ompi/mca/osc/pt2pt/osc_pt2pt_sync.c Обычный файл
Просмотреть файл

@ -0,0 +1,90 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "osc_pt2pt.h"
#include "osc_pt2pt_sync.h"
/**
 * @brief Constructor for synchronization objects.
 *
 * Puts the object into the idle state (no synchronization type, no epoch in
 * progress, no peers, no synchronization messages expected) and constructs
 * the embedded mutex and condition variable.
 *
 * @param[in,out] sync  object being constructed
 */
static void ompi_osc_pt2pt_sync_constructor (ompi_osc_pt2pt_sync_t *sync)
{
    sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
    sync->eager_send_active = false;
    sync->epoch_active = false;

    /* give the peer list and the counters a defined starting value so that
     * code inspecting a freshly constructed object (for example a wait on
     * sync_expected) never reads an indeterminate value */
    sync->peer_list.peers = NULL;
    sync->num_peers = 0;
    sync->sync_expected = 0;

    OBJ_CONSTRUCT(&sync->lock, opal_mutex_t);
    OBJ_CONSTRUCT(&sync->cond, opal_condition_t);
}
/**
 * @brief Destructor for synchronization objects.
 *
 * Destructs the mutex and condition variable embedded in the object.
 *
 * @param[in,out] sync  object being destroyed
 */
static void ompi_osc_pt2pt_sync_destructor (ompi_osc_pt2pt_sync_t *sync)
{
OBJ_DESTRUCT(&sync->lock);
OBJ_DESTRUCT(&sync->cond);
}
/* Class definition for ompi_osc_pt2pt_sync_t: the parent class is
 * opal_free_list_item_t and the constructor/destructor are the two static
 * functions defined above. */
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_sync_t, opal_free_list_item_t,
ompi_osc_pt2pt_sync_constructor,
ompi_osc_pt2pt_sync_destructor);
/**
 * @brief Allocate a new synchronization object and bind it to a module.
 *
 * @param[in] module  osc pt2pt module recorded in the new object
 *
 * @returns NULL if the object could not be created
 * @returns the newly created synchronization object otherwise
 */
ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_sync_allocate (struct ompi_osc_pt2pt_module_t *module)
{
    /* NOTE: module is used below, so it must not be flagged as unused */
    ompi_osc_pt2pt_sync_t *sync = OBJ_NEW (ompi_osc_pt2pt_sync_t);

    if (OPAL_UNLIKELY(NULL == sync)) {
        return NULL;
    }

    sync->module = module;

    return sync;
}
/**
 * @brief Give back a synchronization object.
 *
 * Drops one reference on the object with OBJ_RELEASE.
 *
 * @param[in] sync  synchronization object to release
 */
void ompi_osc_pt2pt_sync_return (ompi_osc_pt2pt_sync_t *sync)
{
OBJ_RELEASE(sync);
}
/**
 * @brief Look up a rank in an array of peers by repeated bisection.
 *
 * The search compares against the rank stored in each peer and halves the
 * window on every step, so it relies on the array being ordered by
 * ascending rank.
 *
 * @param[in]  rank    rank to search for
 * @param[in]  peers   array of peer pointers
 * @param[in]  nranks  number of entries in peers
 * @param[out] peer    if non-NULL, receives the matching peer or NULL
 *
 * @returns true if a peer with the requested rank was found
 * @returns false otherwise
 */
static inline bool ompi_osc_pt2pt_sync_array_peer (int rank, ompi_osc_pt2pt_peer_t **peers, size_t nranks,
                                                   struct ompi_osc_pt2pt_peer_t **peer)
{
    ompi_osc_pt2pt_peer_t **window = peers;
    size_t remaining = nranks;

    while (remaining > 0) {
        /* the first entry of the current window is the only match candidate */
        if (window[0]->rank == rank) {
            if (peer) {
                *peer = window[0];
            }

            return true;
        }

        if (1 == remaining) {
            /* a single non-matching entry is left: nothing more to search */
            break;
        }

        int half = remaining / 2;

        if (window[half]->rank > rank) {
            /* keep the lower part of the window */
            remaining = half;
        } else {
            /* keep the upper part, starting at the middle entry */
            window += half;
            remaining -= half;
        }
    }

    if (peer) {
        *peer = NULL;
    }

    return false;
}
/**
 * @brief Check whether a target belongs to the module's PSCW access epoch.
 *
 * Only the module-wide synchronization object (all_sync) is consulted. When
 * its type is not PSCW the lookup fails immediately; otherwise the target is
 * searched for in that object's peer array.
 *
 * @param[in]  module  osc pt2pt module
 * @param[in]  target  target rank
 * @param[out] peer    if non-NULL, receives the peer object or NULL
 *
 * @returns true if the target was found in the PSCW peer array
 * @returns false otherwise
 */
bool ompi_osc_pt2pt_sync_pscw_peer (ompi_osc_pt2pt_module_t *module, int target, struct ompi_osc_pt2pt_peer_t **peer)
{
    ompi_osc_pt2pt_sync_t *epoch = &module->all_sync;

    if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == epoch->type) {
        return ompi_osc_pt2pt_sync_array_peer (target, epoch->peer_list.peers, epoch->num_peers, peer);
    }

    /* not in a PSCW access epoch: report "no peer" */
    if (peer) {
        *peer = NULL;
    }

    return false;
}

178
ompi/mca/osc/pt2pt/osc_pt2pt_sync.h Обычный файл
Просмотреть файл

@ -0,0 +1,178 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_SYNC_H
#define OMPI_OSC_PT2PT_SYNC_H
#include "ompi_config.h"
#include "opal/class/opal_free_list.h"
#include "opal/threads/threads.h"
/**
 * @brief kind of access epoch described by a synchronization object
 */
enum ompi_osc_pt2pt_sync_type_t {
/** default value */
OMPI_OSC_PT2PT_SYNC_TYPE_NONE,
/** lock access epoch */
OMPI_OSC_PT2PT_SYNC_TYPE_LOCK,
/** fence access epoch */
OMPI_OSC_PT2PT_SYNC_TYPE_FENCE,
/** post-start-complete-wait access epoch */
OMPI_OSC_PT2PT_SYNC_TYPE_PSCW,
};
typedef enum ompi_osc_pt2pt_sync_type_t ompi_osc_pt2pt_sync_type_t;
struct ompi_osc_pt2pt_module_t;
struct ompi_osc_pt2pt_peer_t;
/**
* @brief synchronization object
*
* This structure holds information about an access epoch: its type, the
* type-specific data, the peer(s) involved, the number of synchronization
* messages still outstanding, and the mutex/condition pair used to wait for
* changes.
*/
struct ompi_osc_pt2pt_sync_t {
/** base class (this object is an opal free list item) */
opal_free_list_item_t super;
/** osc pt2pt module this object was allocated for */
struct ompi_osc_pt2pt_module_t *module;
/** synchronization type */
ompi_osc_pt2pt_sync_type_t type;
/** synchronization data */
union {
/** lock specific synchronization data */
struct {
/** lock target rank (-1 for all) */
int target;
/** lock type: MPI_LOCK_SHARED, MPI_LOCK_EXCLUSIVE */
int type;
/** assert specified at lock acquire time */
int assert;
} lock;
/** post/start/complete/wait specific synchronization data */
struct {
/** group passed to ompi_osc_pt2pt_start */
ompi_group_t *group;
} pscw;
} sync;
/** array of peers for this sync */
union {
/** multiple peers (lock all, pscw, fence) */
struct ompi_osc_pt2pt_peer_t **peers;
/** single peer (targeted lock) */
struct ompi_osc_pt2pt_peer_t *peer;
} peer_list;
/** number of peers */
int num_peers;
/** number of synchronization messages expected (counted down as they arrive) */
int32_t sync_expected;
/** eager sends are active to all peers in this access epoch */
bool eager_send_active;
/** communication has started on this epoch */
bool epoch_active;
/** lock to protect sync structure members */
opal_mutex_t lock;
/** condition variable for changes in the sync object */
opal_condition_t cond;
};
typedef struct ompi_osc_pt2pt_sync_t ompi_osc_pt2pt_sync_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_sync_t);
/**
* @brief allocate a new synchronization object
*
* @param[in] module osc pt2pt module
*
* @returns NULL on failure
* @returns a new synchronization object on success
*/
ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_sync_allocate (struct ompi_osc_pt2pt_module_t *module);
/**
* @brief release a synchronization object
*
* @param[in] pt2pt_sync synchronization object allocated by ompi_osc_pt2pt_sync_allocate()
*/
void ompi_osc_pt2pt_sync_return (ompi_osc_pt2pt_sync_t *pt2pt_sync);
/**
* Check if the target is part of a PSCW access epoch
*
* @param[in] module osc pt2pt module
* @param[in] target target rank
* @param[out] peer peer object
*
* @returns false if the window is not in a PSCW access epoch or the peer is not
* in the group passed to MPI_Win_start
* @returns true otherwise
*
* This function verifies the target is part of an active PSCW access epoch.
*/
bool ompi_osc_pt2pt_sync_pscw_peer (struct ompi_osc_pt2pt_module_t *module, int target, struct ompi_osc_pt2pt_peer_t **peer);
/**
 * @brief Wait until eager sends are active for a synchronization object
 *
 * @param[in] sync  synchronization object to wait on
 *
 * Blocks on the object's condition variable, holding the object's lock while
 * the flag is tested, until eager_send_active becomes true.
 */
static inline void ompi_osc_pt2pt_sync_wait (ompi_osc_pt2pt_sync_t *sync)
{
    OPAL_THREAD_LOCK(&sync->lock);

    for (;;) {
        if (sync->eager_send_active) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "waiting for access epoch to start"));
        opal_condition_wait(&sync->cond, &sync->lock);
    }

    OPAL_THREAD_UNLOCK(&sync->lock);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "access epoch ready"));
}
/**
 * @brief Wait for all expected synchronization messages to arrive
 *
 * @param[in] sync  synchronization object to wait on
 *
 * Blocks on the object's condition variable, holding the object's lock while
 * the counter is tested, until sync_expected has dropped to zero.
 */
static inline void ompi_osc_pt2pt_sync_wait_expected (ompi_osc_pt2pt_sync_t *sync)
{
    OPAL_THREAD_LOCK(&sync->lock);
    while (sync->sync_expected) {
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "waiting for %d synchronization messages",
                             sync->sync_expected));
        opal_condition_wait(&sync->cond, &sync->lock);
    }
    OPAL_THREAD_UNLOCK(&sync->lock);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "all synchronization messages received"));
}
/**
 * @brief Record the arrival of one expected synchronization message
 *
 * @param[in] sync  synchronization object the message belongs to
 *
 * Atomically decrements sync_expected. When the counter reaches zero, eager
 * sends are marked active and every thread waiting on the object's condition
 * variable is woken.
 *
 * The flag update and the broadcast are done with the object's lock held:
 * the waiters test their predicate under that lock, and signalling outside
 * of it leaves a window in which a waiter can test the predicate, miss the
 * broadcast, and then block. The caller must therefore not hold sync->lock.
 */
static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync)
{
    int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1);

    if (0 == new_value) {
        OPAL_THREAD_LOCK(&sync->lock);
        sync->eager_send_active = true;
        opal_condition_broadcast (&sync->cond);
        OPAL_THREAD_UNLOCK(&sync->lock);
    }
}
/**
 * @brief Put a synchronization object back into the idle state
 *
 * @param[in,out] sync  synchronization object to reset
 *
 * Clears the synchronization type and both epoch flags. No other member is
 * modified.
 */
static inline void ompi_osc_pt2pt_sync_reset (ompi_osc_pt2pt_sync_t *sync)
{
    sync->epoch_active = false;
    sync->eager_send_active = false;
    sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
}
#endif /* OMPI_OSC_PT2PT_SYNC_H */