1
1
openmpi/ompi/mca/osc/rdma/osc_rdma_active_target.c
2015-12-17 17:39:15 -08:00

641 строка
22 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_frag.h"
#include "osc_rdma_active_target.h"
#include "mpi.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"
/**
* ompi_osc_rdma_pending_post_t:
*
* Describes a post operation that was encountered outside it's
* matching start operation.
*/
struct ompi_osc_rdma_pending_post_t {
opal_list_item_t super;
int rank;
};
typedef struct ompi_osc_rdma_pending_post_t ompi_osc_rdma_pending_post_t;
static OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_post_t, opal_list_item_t, NULL, NULL);
/**
* Dummy completion function for atomic operations
*/
void ompi_osc_rdma_atomic_complete (mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
void *local_address, mca_btl_base_registration_handle_t *local_handle,
void *context, void *data, int status)
{
volatile bool *atomic_complete = (volatile bool *) context;
if (atomic_complete) {
*atomic_complete = true;
}
}
/**
* compare_ranks:
*
* @param[in] ptra Pointer to integer item
* @param[in] ptrb Pointer to integer item
*
* @returns 0 if *ptra == *ptrb
* @returns -1 if *ptra < *ptrb
* @returns 1 otherwise
*
* This function is used to sort the rank list. It can be removed if
* groups are always in order.
*/
static int compare_ranks (const void *ptra, const void *ptrb)
{
int a = *((int *) ptra);
int b = *((int *) ptrb);
if (a < b) {
return -1;
} else if (a > b) {
return 1;
}
return 0;
}
/**
* ompi_osc_rdma_get_comm_ranks:
*
* @param[in] module - OSC RDMA module
* @param[in] sub_group - Group with ranks to translate
*
* @returns an array of translated ranks on success or NULL on failure
*
* Translate the ranks given in {sub_group} into ranks in the
* communicator used to create {module}.
*/
static ompi_osc_rdma_peer_t **ompi_osc_rdma_get_peers (ompi_osc_rdma_module_t *module, ompi_group_t *sub_group)
{
int size = ompi_group_size(sub_group);
ompi_osc_rdma_peer_t **peers;
int *ranks1, *ranks2;
int ret;
ranks1 = calloc (size, sizeof(int));
ranks2 = calloc (size, sizeof(int));
peers = calloc (size, sizeof (ompi_osc_rdma_peer_t *));
if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
free (ranks1);
free (ranks2);
free (peers);
return NULL;
}
for (int i = 0 ; i < size ; ++i) {
ranks1[i] = i;
}
ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
ranks2);
free (ranks1);
if (OMPI_SUCCESS != ret) {
free (ranks2);
free (peers);
return NULL;
}
qsort (ranks2, size, sizeof (int), compare_ranks);
for (int i = 0 ; i < size ; ++i) {
peers[i] = ompi_osc_rdma_module_peer (module, ranks2[i]);
if (NULL == peers[i]) {
free (peers);
peers = NULL;
break;
}
OBJ_RETAIN(peers[i]);
}
free (ranks2);
return peers;
}
static void ompi_osc_rdma_release_peers (ompi_osc_rdma_peer_t **peers, int npeers)
{
for (int i = 0 ; i < npeers ; ++i) {
OBJ_RELEASE(peers[i]);
}
free (peers);
}
static void ompi_osc_rdma_handle_post (ompi_osc_rdma_module_t *module, int rank, ompi_osc_rdma_peer_t **peers, int npeers) {
ompi_osc_rdma_state_t *state = module->state;
ompi_osc_rdma_pending_post_t *pending_post;
/* look for the posting peer in the group */
for (int j = 0 ; j < npeers ; ++j) {
if (rank == peers[j]->rank) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "got expected post from %d. still expecting posts from %d processes",
rank, (int) (npeers - state->num_post_msgs - 1));
++state->num_post_msgs;
return;
}
}
/* post does not belong to this start epoch. save it for later */
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "got unexpected post from %d . queueing for later", rank);
pending_post = OBJ_NEW(ompi_osc_rdma_pending_post_t);
pending_post->rank = rank;
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_posts, &pending_post->super));
}
int ompi_osc_rdma_post_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_peer_t **peers;
int my_rank = ompi_comm_rank (module->comm);
ompi_osc_rdma_state_t *state = module->state;
volatile bool atomic_complete;
ompi_osc_rdma_frag_t *frag = NULL;
osc_rdma_counter_t *temp = NULL;
int ret;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "post: %p, %d, %s", (void*) group, assert, win->w_name);
/* check if we are already in a post epoch */
if (module->pw_group) {
return OMPI_ERR_RMA_SYNC;
}
/* save the group */
OBJ_RETAIN(group);
OPAL_THREAD_LOCK(&module->lock);
/* ensure we're not already in a post */
if (NULL != module->pw_group) {
OPAL_THREAD_UNLOCK(&(module->lock));
return OMPI_ERR_RMA_SYNC;
}
module->pw_group = group;
/* Update completion counter. Can't have received any completion
messages yet; complete won't send a completion header until
we've sent a post header. */
state->num_complete_msgs = 0;
OPAL_THREAD_UNLOCK(&module->lock);
/* allocate a temporary buffer for atomic response */
ret = ompi_osc_rdma_frag_alloc (module, 8, &frag, (char **) &temp);
if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
return OMPI_SUCCESS;
}
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* translate group ranks into the communicator */
peers = ompi_osc_rdma_get_peers (module, module->pw_group);
if (OPAL_UNLIKELY(NULL == peers)) {
ompi_osc_rdma_frag_complete (frag);
return OMPI_ERR_OUT_OF_RESOURCE;
}
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "sending post messages");
/* send a hello counter to everyone in group */
for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
ompi_osc_rdma_peer_t *peer = peers[i];
uint64_t target = (uint64_t) (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, post_index);
int post_index;
if (peer->rank == my_rank) {
ompi_osc_rdma_handle_post (module, my_rank, NULL, 0);
continue;
}
/* get a post index */
atomic_complete = false;
if (!ompi_osc_rdma_peer_local_state (peer)) {
do {
ret = module->selected_btl->btl_atomic_fop (module->selected_btl, peer->state_endpoint, temp, target, frag->handle,
peer->state_handle, MCA_BTL_ATOMIC_ADD, 1, 0, MCA_BTL_NO_ORDER,
ompi_osc_rdma_atomic_complete, (void *) &atomic_complete, NULL);
assert (OPAL_SUCCESS >= ret);
if (OMPI_SUCCESS == ret) {
while (!atomic_complete) {
ompi_osc_rdma_progress (module);
}
break;
}
ompi_osc_rdma_progress (module);
} while (1);
} else {
*temp = ompi_osc_rdma_counter_add ((osc_rdma_counter_t *) (intptr_t) target, 1) - 1;
}
post_index = (*temp) & (OMPI_OSC_RDMA_POST_PEER_MAX - 1);
target = (uint64_t) (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, post_peers) +
sizeof (osc_rdma_counter_t) * post_index;
do {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "attempting to post to index %d @ rank %d", post_index, peer->rank);
/* try to post. if the value isn't 0 then another rank is occupying this index */
if (!ompi_osc_rdma_peer_local_state (peer)) {
atomic_complete = false;
ret = module->selected_btl->btl_atomic_cswap (module->selected_btl, peer->state_endpoint, temp, target, frag->handle, peer->state_handle,
0, 1 + (int64_t) my_rank, 0, MCA_BTL_NO_ORDER, ompi_osc_rdma_atomic_complete,
(void *) &atomic_complete, NULL);
assert (OPAL_SUCCESS >= ret);
if (OMPI_SUCCESS == ret) {
while (!atomic_complete) {
ompi_osc_rdma_progress (module);
}
} else {
ompi_osc_rdma_progress (module);
continue;
}
} else {
*temp = !ompi_osc_rdma_lock_cmpset ((osc_rdma_counter_t *) target, 0, 1 + (osc_rdma_counter_t) my_rank);
}
if (OPAL_LIKELY(0 == *temp)) {
break;
}
/* prevent circular wait by checking for post messages received */
for (int j = 0 ; j < OMPI_OSC_RDMA_POST_PEER_MAX ; ++j) {
/* no post at this index (yet) */
if (0 == state->post_peers[j]) {
continue;
}
ompi_osc_rdma_handle_post (module, state->post_peers[j] - 1, NULL, 0);
state->post_peers[j] = 0;
}
usleep (100);
} while (1);
}
ompi_osc_rdma_frag_complete (frag);
ompi_osc_rdma_release_peers (peers, ompi_group_size(module->pw_group));
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "post complete");
return OMPI_SUCCESS;
}
int ompi_osc_rdma_start_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_pending_post_t *pending_post, *next;
ompi_osc_rdma_state_t *state = module->state;
ompi_osc_rdma_sync_t *sync = &module->all_sync;
int group_size = ompi_group_size (group);
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start: %p, %d, %s", (void*) group, assert,
win->w_name);
OPAL_THREAD_LOCK(&module->lock);
/* check if we are already in an access epoch */
if (ompi_osc_rdma_access_epoch_active (module)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
/* mark all procs in this group as being in an access epoch */
sync->num_peers = ompi_group_size (group);
sync->sync.pscw.group = group;
/* haven't processed any post messaes yet */
state->num_post_msgs = 0;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start group size %d", sync->num_peers);
if (0 == ompi_group_size (group)) {
/* nothing more to do. this is an empty start epoch */
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
opal_atomic_wmb ();
sync->type = OMPI_OSC_RDMA_SYNC_TYPE_PSCW;
/* prevent us from entering a passive-target, fence, or another pscw access epoch until
* the matching complete is called */
sync->epoch_active = true;
/* translate the group ranks into the communicator */
sync->peer_list.peers = ompi_osc_rdma_get_peers (module, group);
if (NULL == sync->peer_list.peers) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* save the group */
OBJ_RETAIN(group);
if (!(assert & MPI_MODE_NOCHECK)) {
/* look through list of pending posts */
OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_rdma_pending_post_t) {
for (int i = 0 ; i < group_size ; ++i) {
ompi_osc_rdma_peer_t *peer = sync->peer_list.peers[i];
if (pending_post->rank == peer->rank) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "found queued post from %d. still expecting posts "
"from %d processes", peer->rank, (int) (group_size - state->num_post_msgs - 1));
opal_list_remove_item (&module->pending_posts, &pending_post->super);
OBJ_RELEASE(pending_post);
/* only one thread can process post messages so there is no need of atomics here */
++state->num_post_msgs;
break;
}
}
}
/* wait for all post messages to arrive */
while (state->num_post_msgs != group_size) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "waiting for post messages. have %d of %d",
(int) state->num_post_msgs, group_size);
for (int i = 0 ; i < OMPI_OSC_RDMA_POST_PEER_MAX ; ++i) {
/* no post at this index (yet) */
if (0 == state->post_peers[i]) {
continue;
}
ompi_osc_rdma_handle_post (module, state->post_peers[i] - 1, sync->peer_list.peers, group_size);
state->post_peers[i] = 0;
}
ompi_osc_rdma_progress (module);
}
} else {
state->num_post_msgs = group_size;
}
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start complete");
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
int ompi_osc_rdma_complete_atomic (ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_sync_t *sync = &module->all_sync;
ompi_osc_rdma_frag_t *frag = NULL;
ompi_osc_rdma_peer_t **peers;
void *scratch_lock = NULL;
ompi_group_t *group;
int group_size, ret;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "complete: %s", win->w_name);
OPAL_THREAD_LOCK(&module->lock);
if (OMPI_OSC_RDMA_SYNC_TYPE_PSCW != sync->type) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
/* phase 1 cleanup sync object */
group = sync->sync.pscw.group;
group_size = sync->num_peers;
sync->type = OMPI_OSC_RDMA_SYNC_TYPE_NONE;
sync->epoch_active = false;
/* phase 2 cleanup group */
OBJ_RELEASE(group);
peers = sync->peer_list.peers;
if (NULL == peers) {
/* empty peer list */
OPAL_THREAD_UNLOCK(&(module->lock));
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
sync->peer_list.peers = NULL;
OPAL_THREAD_UNLOCK(&(module->lock));
ompi_osc_rdma_sync_rdma_complete (sync);
if (MCA_BTL_FLAGS_ATOMIC_OPS & module->selected_btl->btl_flags) {
ret = ompi_osc_rdma_frag_alloc (module, 8, &frag, (char **) &scratch_lock);
if (OPAL_UNLIKELY(OPAL_SUCCESS != ret)) {
return ret;
}
}
/* for each process in the group increment their number of complete messages */
for (int i = 0 ; i < group_size ; ++i) {
ompi_osc_rdma_peer_t *peer = peers[i];
intptr_t target = (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, num_complete_msgs);
if (!ompi_osc_rdma_peer_local_state (peer)) {
do {
if (MCA_BTL_FLAGS_ATOMIC_OPS & module->selected_btl->btl_flags) {
ret = module->selected_btl->btl_atomic_op (module->selected_btl, peer->state_endpoint, target, peer->state_handle,
MCA_BTL_ATOMIC_ADD, 1, 0, MCA_BTL_NO_ORDER,
ompi_osc_rdma_atomic_complete, NULL, NULL);
} else {
/* don't care about the read value so use the scratch lock */
ret = module->selected_btl->btl_atomic_fop (module->selected_btl, peer->state_endpoint, scratch_lock,
target, frag->handle, peer->state_handle, MCA_BTL_ATOMIC_ADD, 1,
0, MCA_BTL_NO_ORDER, ompi_osc_rdma_atomic_complete, NULL, NULL);
}
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
break;
}
} while (1);
} else {
(void) ompi_osc_rdma_counter_add ((osc_rdma_counter_t *) target, 1);
}
}
if (frag) {
ompi_osc_rdma_frag_complete (frag);
}
/* release our reference to peers in this group */
ompi_osc_rdma_release_peers (peers, group_size);
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "complete complete");
return OMPI_SUCCESS;
}
int ompi_osc_rdma_wait_atomic (ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_state_t *state = module->state;
ompi_group_t *group;
int group_size;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "wait: %s", win->w_name);
OPAL_THREAD_LOCK(&module->lock);
if (NULL == module->pw_group) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "no matching post");
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
group_size = ompi_group_size (module->pw_group);
OPAL_THREAD_UNLOCK(&module->lock);
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "waiting on complete message. have %d of %d",
(int) state->num_complete_msgs, group_size);
while (group_size != state->num_complete_msgs) {
ompi_osc_rdma_progress (module);
opal_atomic_mb ();
}
OPAL_THREAD_LOCK(&module->lock);
state->num_complete_msgs = 0;
group = module->pw_group;
module->pw_group = NULL;
OPAL_THREAD_UNLOCK(&module->lock);
OBJ_RELEASE(group);
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "wait complete");
return OMPI_SUCCESS;
}
int ompi_osc_rdma_test_atomic (ompi_win_t *win, int *flag)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_state_t *state = module->state;
ompi_group_t *group;
int group_size;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "test: %s", win->w_name);
OPAL_THREAD_LOCK(&module->lock);
if (NULL == module->pw_group) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "no matching post");
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
group_size = ompi_group_size (module->pw_group);
*flag = (group_size == state->num_complete_msgs);
OPAL_THREAD_UNLOCK(&module->lock);
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "checking on complete message. have %d of %d",
(int) state->num_complete_msgs, group_size);
if (!*flag) {
ompi_osc_rdma_progress (module);
return OMPI_SUCCESS;
}
state->num_complete_msgs = 0;
OPAL_THREAD_LOCK(&(module->lock));
group = module->pw_group;
module->pw_group = NULL;
OPAL_THREAD_UNLOCK(&(module->lock));
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
int ompi_osc_rdma_fence_atomic (int assert, ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
int ret = OMPI_SUCCESS;
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fence: %d, %s", assert, win->w_name);
/* can't enter an active target epoch while a lock is active */
if (ompi_osc_rdma_in_passive_epoch (module) || module->pw_group) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "can not start fence epoch due to conflicting epoch");
return OMPI_ERR_RMA_SYNC;
}
OPAL_THREAD_LOCK(&module->lock);
/* active sends are now active (we will close the epoch if NOSUCCEED is specified) */
if (0 == (assert & MPI_MODE_NOSUCCEED)) {
module->all_sync.type = OMPI_OSC_RDMA_SYNC_TYPE_FENCE;
module->all_sync.num_peers = ompi_comm_size (module->comm);
/* NTH: should add a fast access array for peers here later. for now just use the
* hash table. */
}
/* technically it is possible to enter a lock epoch (which will close the fence epoch) if
* no communication has occurred. this flag will be set on the next put, get, accumulate, etc. */
module->all_sync.epoch_active = false;
/* short-circuit the noprecede case */
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fence complete (short circuit)");
/* no communication can occur until a peer has entered the same fence epoch. for now
* a barrier is used to ensure this is the case. */
ret = module->comm->c_coll.coll_barrier(module->comm, module->comm->c_coll.coll_barrier_module);
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
ompi_osc_rdma_sync_rdma_complete (&module->all_sync);
/* ensure all writes to my memory are complete */
ret = module->comm->c_coll.coll_barrier(module->comm, module->comm->c_coll.coll_barrier_module);
if (assert & MPI_MODE_NOSUCCEED) {
/* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
* stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
module->all_sync.type = OMPI_OSC_RDMA_SYNC_TYPE_NONE;
}
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fence complete");
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}