1
1
openmpi/ompi/mca/osc/rdma/osc_rdma_active_target.c
Nathan Hjelm 32ab6f850e osc/rdma: fix warning
cmr=v1.8.2:reviewer=rhc

This commit was SVN r32201.
2014-07-10 18:42:55 +00:00

562 строки
18 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
#include "osc_rdma_frag.h"
#include "mpi.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"
/**
* ompi_osc_rdma_pending_post_t:
*
* Describes a post operation that was encountered outside its
* matching start operation.
*/
struct ompi_osc_rdma_pending_post_t {
opal_list_item_t super;
int rank;
};
typedef struct ompi_osc_rdma_pending_post_t ompi_osc_rdma_pending_post_t;
OBJ_CLASS_DECLARATION(ompi_osc_rdma_pending_post_t);
OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_post_t, opal_list_item_t, NULL, NULL);
static bool group_contains_proc (ompi_group_t *group, ompi_proc_t *proc)
{
int group_size = ompi_group_size (group);
for (int i = 0 ; i < group_size ; ++i) {
ompi_proc_t *group_proc = ompi_group_peer_lookup (group, i);
/* it is safe to compare procs by pointer */
if (group_proc == proc) {
return true;
}
}
return false;
}
static int*
get_comm_ranks(ompi_osc_rdma_module_t *module,
ompi_group_t *sub_group)
{
int *ranks1 = NULL, *ranks2 = NULL;
bool success = false;
int i, ret;
ranks1 = malloc(sizeof(int) * ompi_group_size(sub_group));
if (NULL == ranks1) goto cleanup;
ranks2 = malloc(sizeof(int) * ompi_group_size(sub_group));
if (NULL == ranks2) goto cleanup;
for (i = 0 ; i < ompi_group_size(sub_group) ; ++i) {
ranks1[i] = i;
}
ret = ompi_group_translate_ranks(sub_group,
ompi_group_size(sub_group),
ranks1,
module->comm->c_local_group,
ranks2);
if (OMPI_SUCCESS != ret) goto cleanup;
success = true;
cleanup:
if (NULL != ranks1) free(ranks1);
if (!success) {
if (NULL != ranks2) free(ranks2);
ranks2 = NULL;
}
return ranks2;
}
int
ompi_osc_rdma_fence(int assert, ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
uint32_t incoming_reqs;
int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc rdma: fence start"));
/* can't enter an active target epoch when in a passive target epoch */
if (module->passive_target_access_epoch) {
return OMPI_ERR_RMA_SYNC;
}
/* active sends are now active (we will close the epoch if NOSUCCEED is specified) */
if (0 == (assert & MPI_MODE_NOSUCCEED)) {
module->active_eager_send_active = true;
module->all_access_epoch = true;
}
/* short-circuit the noprecede case */
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
ret = module->comm->c_coll.coll_barrier(module->comm,
module->comm->c_coll.coll_barrier_module);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: fence end (short circuit)"));
return ret;
}
/* try to start all the requests. */
ret = ompi_osc_rdma_frag_flush_all(module);
if (OMPI_SUCCESS != ret) goto cleanup;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: fence done sending"));
/* find out how much data everyone is going to send us. */
ret = module->comm->c_coll.coll_reduce_scatter_block (module->epoch_outgoing_frag_count,
&incoming_reqs, 1, MPI_UINT32_T,
MPI_SUM, module->comm,
module->comm->c_coll.coll_reduce_scatter_block_module);
if (OMPI_SUCCESS != ret) goto cleanup;
OPAL_THREAD_LOCK(&module->lock);
bzero(module->epoch_outgoing_frag_count,
sizeof(uint32_t) * ompi_comm_size(module->comm));
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: fence expects %d requests",
incoming_reqs));
/* set our complete condition for incoming requests */
module->active_incoming_frag_signal_count += incoming_reqs;
/* wait for completion */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
opal_condition_wait(&module->cond, &module->lock);
}
ret = OMPI_SUCCESS;
if (assert & MPI_MODE_NOSUCCEED) {
/* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
* stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
module->active_eager_send_active = false;
module->all_access_epoch = false;
}
cleanup:
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc rdma: fence end: %d", ret));
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
int
ompi_osc_rdma_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_pending_post_t *pending_post, *next;
int group_size;
int *ranks;
OPAL_THREAD_LOCK(&module->lock);
/* ensure we're not already in a start or passive target. we can no check for all
* access here due to fence */
if (NULL != module->sc_group || module->passive_target_access_epoch) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
/* save the group */
OBJ_RETAIN(group);
ompi_group_increment_proc_count(group);
module->sc_group = group;
/* mark all procs in this group as being in an access epoch */
group_size = ompi_group_size (module->sc_group);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_start entering with group size %d...",
group_size));
ranks = get_comm_ranks(module, module->sc_group);
if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
for (int i = 0 ; i < group_size ; ++i) {
/* when the post comes in we will be in an access epoch with this proc */
module->peers[ranks[i]].access_epoch = true;
}
free (ranks);
OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_rdma_pending_post_t) {
ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank);
if (group_contains_proc (module->sc_group, pending_proc)) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Consumed unexpected post message from %d",
pending_post->rank));
++module->num_post_msgs;
opal_list_remove_item (&module->pending_posts, &pending_post->super);
OBJ_RELEASE(pending_post);
}
}
/* disable eager sends until we've receved the proper number of
post messages, at which time we know all our peers are ready to
receive messages. */
module->active_eager_send_active = false;
/* possible we've already received a couple in messages, so
add however many we're going to wait for */
module->num_post_msgs -= ompi_group_size(module->sc_group);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"num_post_msgs = %d", module->num_post_msgs));
/* if we've already received all the post messages, we can eager
send. Otherwise, eager send will be enabled when
numb_post_messages reaches 0 */
if (0 == module->num_post_msgs) {
module->active_eager_send_active = true;
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_start complete"));
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_complete(ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_header_complete_t complete_req;
ompi_osc_rdma_peer_t *peer;
int ret = OMPI_SUCCESS;
int i;
int *ranks = NULL;
ompi_group_t *group;
int my_rank = ompi_comm_rank (module->comm);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_complete entering..."));
if (NULL == module->sc_group) {
return OMPI_ERR_RMA_SYNC;
}
ranks = get_comm_ranks(module, module->sc_group);
if (NULL == ranks) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
OPAL_THREAD_LOCK(&module->lock);
/* wait for all the post messages */
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_complete sending complete message"));
/* for each process in group, send a control message with number
of updates coming, then start all the requests. Note that the
control send is processed as another message in a fragment, so
this might get queued until the flush_all (which is fine).
At the same time, clean out the outgoing count for the next
round. */
OPAL_THREAD_UNLOCK(&module->lock);
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
if (my_rank == ranks[i]) {
/* shortcut for self */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_rdma_complete self complete"));
module->num_complete_msgs++;
continue;
}
complete_req.base.type = OMPI_OSC_RDMA_HDR_TYPE_COMPLETE;
complete_req.base.flags = OMPI_OSC_RDMA_HDR_FLAG_VALID;
complete_req.frag_count = module->epoch_outgoing_frag_count[ranks[i]];
peer = module->peers + ranks[i];
peer->access_epoch = false;
ret = ompi_osc_rdma_control_send(module,
ranks[i],
&complete_req,
sizeof(ompi_osc_rdma_header_complete_t));
if (OMPI_SUCCESS != ret) goto cleanup;
}
OPAL_THREAD_LOCK(&module->lock);
/* start all requests */
ret = ompi_osc_rdma_frag_flush_all(module);
if (OMPI_SUCCESS != ret) goto cleanup;
/* zero the fragment counts here to ensure they are zerod */
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
module->epoch_outgoing_frag_count[ranks[i]] = 0;
}
/* wait for outgoing requests to complete. Don't wait for incoming, as
we're only completing the access epoch, not the exposure epoch */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
opal_condition_wait(&module->cond, &module->lock);
}
/* phase 1 cleanup group */
group = module->sc_group;
module->sc_group = NULL;
/* unlock here, as group cleanup can take a while... */
OPAL_THREAD_UNLOCK(&(module->lock));
/* phase 2 cleanup group */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_complete complete"));
free (ranks);
return OMPI_SUCCESS;
cleanup:
if (NULL != ranks) free(ranks);
OPAL_THREAD_UNLOCK(&(module->lock));
return ret;
}
int
ompi_osc_rdma_post(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int *ranks;
int ret = OMPI_SUCCESS;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_osc_rdma_header_post_t post_req;
int my_rank = ompi_comm_rank(module->comm);
/* can't check for all access epoch here due to fence */
if (module->pw_group) {
return OMPI_ERR_RMA_SYNC;
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_post entering with group size %d...",
ompi_group_size (group)));
/* save the group */
OBJ_RETAIN(group);
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(module->lock));
/* ensure we're not already in a post */
if (NULL != module->pw_group) {
OPAL_THREAD_UNLOCK(&(module->lock));
return OMPI_ERR_RMA_SYNC;
}
module->pw_group = group;
/* Update completion counter. Can't have received any completion
messages yet; complete won't send a completion header until
we've sent a post header. */
module->num_complete_msgs = -ompi_group_size(module->pw_group);
OPAL_THREAD_UNLOCK(&(module->lock));
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"sending post messages"));
ranks = get_comm_ranks(module, module->pw_group);
if (NULL == ranks) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* send a hello counter to everyone in group */
for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", ranks[i]));
/* shortcut for self */
if (my_rank == ranks[i]) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_rdma_complete self post"));
osc_rdma_incoming_post (module, my_rank);
continue;
}
post_req.base.type = OMPI_OSC_RDMA_HDR_TYPE_POST;
post_req.base.flags = OMPI_OSC_RDMA_HDR_FLAG_VALID;
post_req.windx = ompi_comm_get_cid(module->comm);
/* we don't want to send any data, since we're the exposure
epoch only, so use an unbuffered send */
ret = ompi_osc_rdma_control_send_unbuffered(module, ranks[i], &post_req,
sizeof(ompi_osc_rdma_header_post_t));
if (OMPI_SUCCESS != ret) {
break;
}
}
free (ranks);
return ret;
}
int
ompi_osc_rdma_wait(ompi_win_t *win)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_group_t *group;
if (NULL == module->pw_group) {
return OMPI_ERR_RMA_SYNC;
}
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_wait entering..."));
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_complete_msgs ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"num_complete_msgs = %d, active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count, module->active_incoming_frag_signal_count));
opal_condition_wait(&module->cond, &module->lock);
}
group = module->pw_group;
module->pw_group = NULL;
OPAL_THREAD_UNLOCK(&module->lock);
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_wait complete"));
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_test(ompi_win_t *win,
int *flag)
{
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_group_t *group;
int ret = OMPI_SUCCESS;
#if !OMPI_ENABLE_PROGRESS_THREADS
opal_progress();
#endif
if (NULL == module->pw_group) {
return OMPI_ERR_RMA_SYNC;
}
OPAL_THREAD_LOCK(&(module->lock));
if (0 != module->num_complete_msgs ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
*flag = 0;
ret = OMPI_SUCCESS;
goto cleanup;
} else {
*flag = 1;
group = module->pw_group;
module->pw_group = NULL;
OPAL_THREAD_UNLOCK(&(module->lock));
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
cleanup:
OPAL_THREAD_UNLOCK(&(module->lock));
return ret;
}
int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module, int source)
{
ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source);
OPAL_THREAD_LOCK(&module->lock);
/* verify that this proc is part of the current start group */
if (!module->sc_group || !group_contains_proc (module->sc_group, source_proc)) {
ompi_osc_rdma_pending_post_t *pending_post = OBJ_NEW(ompi_osc_rdma_pending_post_t);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received unexpected post message from %d. module->sc_group = %p, size = %d",
source, module->sc_group, module->sc_group ? ompi_group_size (module->sc_group) : 0));
pending_post->rank = source;
opal_list_append (&module->pending_posts, &pending_post->super);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
module->num_post_msgs++;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received post message. num_post_msgs = %d", module->num_post_msgs));
if (0 == module->num_post_msgs) {
module->active_eager_send_active = true;
}
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}