1
1
openmpi/ompi/mca/coll/fca/coll_fca_module.c
Ralph Castain 40a2bfa238 WARNING: Work on the temp branch being merged here encountered problems with bugs in subversion. Considerable effort has gone into validating the branch. However, not all conditions can be checked, so users are cautioned that it may be advisable to not update from the trunk for a few days to allow MTT to identify platform-specific issues.
This merges the branch containing the revamped build system based around converting autogen from a bash script to a Perl program. Jeff has provided emails explaining the features contained in the change.

Please note that configure requirements on components HAVE CHANGED. For example, a configure.params file is no longer required in each component directory. See Jeff's emails for an explanation.

This commit was SVN r23764.
2010-09-17 23:04:06 +00:00

419 строки
15 KiB
C

/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#include "coll_fca.h"
#include "opal/mca/paffinity/paffinity.h"
/*
 * Component-level query run during MPI_INIT, through which the component
 * reports the thread support it offers.  This component performs no checks
 * on the requested threading levels and always reports success.
 */
int mca_coll_fca_init_query(bool enable_progress_threads,
                            bool enable_mpi_threads)
{
    (void)enable_progress_threads; /* not consulted */
    (void)enable_mpi_threads;      /* not consulted */
    return OMPI_SUCCESS;
}
/*
 * Walks the first `size` members of `group` and classifies each with
 * FCA_IS_LOCAL_PROCESS.  The number of local members is written to
 * *local_peers.  Returns 1 if at least one member is not local, 0 otherwise.
 */
static int have_remote_peers(ompi_group_t *group, size_t size, int *local_peers)
{
    int found_remote = 0;
    int n_local = 0;
    size_t idx = 0;

    while (idx < size) {
        ompi_proc_t *peer = ompi_group_peer_lookup(group, idx);

        if (!FCA_IS_LOCAL_PROCESS(peer->proc_flags)) {
            found_remote = 1;
        } else {
            n_local++;
        }
        idx++;
    }

    *local_peers = n_local;
    return found_remote;
}
/*
 * Returns the ompi_proc_t of `rank` as found in the communicator's local group.
 */
static inline ompi_proc_t* __local_rank_lookup(ompi_communicator_t *comm, int rank)
{
    ompi_group_t *local_group = comm->c_local_group;

    return ompi_group_peer_lookup(local_group, rank);
}
/**
 * Fills local rank information in fca_module.
 *
 * Scans every rank of fca_module->comm in ascending order and records those
 * whose proc_flags satisfy FCA_IS_LOCAL_PROCESS:
 *  - num_local_procs: number of such ranks;
 *  - local_ranks:     newly calloc'ed array of those ranks (released by the
 *                     module destructor);
 *  - local_proc_idx:  position of fca_module->rank inside local_ranks.
 *
 * @return OMPI_SUCCESS on success;
 *         OMPI_ERR_OUT_OF_RESOURCE if local_ranks cannot be allocated;
 *         OMPI_ERROR if the calling rank is not classified as local (in that
 *         case local_proc_idx would otherwise be left unset).
 */
static int __get_local_ranks(mca_coll_fca_module_t *fca_module)
{
    ompi_communicator_t *comm = fca_module->comm;
    const int comm_size = ompi_comm_size(comm);
    ompi_proc_t *proc;
    int rank, index;
    int my_idx = -1;

    /* Count local ranks */
    fca_module->num_local_procs = 0;
    for (rank = 0; rank < comm_size; ++rank) {
        proc = __local_rank_lookup(comm, rank);
        if (FCA_IS_LOCAL_PROCESS(proc->proc_flags)) {
            ++fca_module->num_local_procs;
        }
        FCA_MODULE_VERBOSE(fca_module, 4, "rank %d flags 0x%x host %s", rank,
                           proc->proc_flags,
                           proc->proc_hostname);
    }

    /* With no local ranks there is nothing to index: local_ranks[0] below
     * would be out of bounds and calloc(0, ...) may legitimately return NULL. */
    if (fca_module->num_local_procs == 0) {
        FCA_ERROR("No local ranks found on the communicator");
        return OMPI_ERROR;
    }

    fca_module->local_ranks = calloc(fca_module->num_local_procs,
                                     sizeof *fca_module->local_ranks);
    if (!fca_module->local_ranks) {
        FCA_ERROR("Failed to allocate local ranks array");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* Get local ranks, remembering where our own rank lands */
    index = 0;
    for (rank = 0; rank < comm_size; ++rank) {
        proc = __local_rank_lookup(comm, rank);
        if (!FCA_IS_LOCAL_PROCESS(proc->proc_flags)) {
            continue;
        }
        if (rank == fca_module->rank) {
            my_idx = index;
        }
        fca_module->local_ranks[index] = rank;
        ++index;
    }

    /* Every later step (e.g. node-manager selection) relies on local_proc_idx */
    if (my_idx < 0) {
        FCA_ERROR("Rank %d is not among the local ranks", fca_module->rank);
        return OMPI_ERROR;
    }
    fca_module->local_proc_idx = my_idx;

    FCA_MODULE_VERBOSE(fca_module, 3, "num_local_ranks: %d, node_root: %d",
                       fca_module->num_local_procs, fca_module->local_ranks[0]);
    return OMPI_SUCCESS;
}
/**
 * Creates the FCA communicator descriptor and distributes it to all ranks.
 *
 * Every rank of fca_module->comm must call this, since it issues a gather,
 * a gatherv and a bcast on that communicator:
 *  1. Ranks with local_proc_idx == 0 ("node managers") obtain a rank-info
 *     blob from fca_ops.get_rank_info(); all other ranks contribute 0 bytes.
 *  2. The blob sizes are gathered to rank 0, then the blobs themselves.
 *  3. Rank 0 passes the concatenated blobs to fca_ops.comm_new(), which fills
 *     fca_module->fca_comm_desc.
 *  4. fca_comm_desc is broadcast from rank 0 via the saved previous bcast.
 *
 * @param fca_module  module whose comm, rank and local_proc_idx are set.
 * @return OMPI_SUCCESS; the error code of a failed collective;
 *         OMPI_ERR_OUT_OF_RESOURCE if rank 0 cannot allocate its buffers;
 *         OMPI_ERROR if get_rank_info() or comm_new() fails.
 *
 * NOTE(review): failures are not propagated between ranks; a rank that
 * returns early leaves its peers waiting in the next collective.
 */
int __fca_comm_new(mca_coll_fca_module_t *fca_module)
{
    ompi_communicator_t *comm = fca_module->comm;
    const int comm_size = ompi_comm_size(comm);
    const int is_root = (fca_module->rank == 0);
    const int is_node_manager = (fca_module->local_proc_idx == 0);
    fca_comm_new_spec_t spec = {0};
    int info_size = 0;
    int all_info_size = 0;
    void *my_info = NULL;   /* only set on node managers */
    void *all_info = NULL;  /* only allocated on rank 0 */
    int *rcounts = NULL;    /* only allocated on rank 0 */
    int *disps = NULL;      /* only allocated on rank 0 */
    int i, rc, ret;

    /* call fca_get_rank_info() on node managers only */
    if (is_node_manager) {
        my_info = mca_coll_fca_component.fca_ops.get_rank_info(mca_coll_fca_component.fca_context,
                                                               &info_size);
        if (!my_info) {
            FCA_ERROR("fca_get_rank_info returned NULL");
            return OMPI_ERROR;
        }
    }
    FCA_MODULE_VERBOSE(fca_module, 1, "Info size: %d", info_size);

    /* Rank0 needs per-rank counts and displacements for the gatherv */
    if (is_root) {
        rcounts = calloc(comm_size, sizeof *rcounts);
        disps = calloc(comm_size, sizeof *disps);
        if (!rcounts || !disps) {
            FCA_ERROR("Failed to allocate rank_info counts on rank0");
            rc = OMPI_ERR_OUT_OF_RESOURCE;
            goto out;
        }
    }

    /* Get all rank info sizes using MPI_Gather */
    rc = comm->c_coll.coll_gather(&info_size, 1, MPI_INT, rcounts, 1, MPI_INT, 0,
                                  comm, comm->c_coll.coll_gather_module);
    if (rc != OMPI_SUCCESS) {
        FCA_ERROR("Failed to gather rank_info sizes to rank0: %d", rc);
        goto out;
    }

    /* Rank0 computes displacements and allocates the receive buffer */
    if (is_root) {
        for (i = 0; i < comm_size; ++i) {
            disps[i] = all_info_size;
            all_info_size += rcounts[i];
        }
        /* Logged only after the total has actually been computed */
        FCA_MODULE_VERBOSE(fca_module, 1, "Total rank_info size: %d", all_info_size);
        all_info = calloc(all_info_size, 1);
        if (!all_info && all_info_size > 0) {
            FCA_ERROR("Failed to allocate %d bytes of rank_info on rank0", all_info_size);
            rc = OMPI_ERR_OUT_OF_RESOURCE;
            goto out;
        }
    }

    /* Send all node managers information to rank0 using MPI_Gatherv.
     * The gatherv must be paired with its own module, not the gather one. */
    rc = comm->c_coll.coll_gatherv(my_info, info_size, MPI_BYTE,
                                   all_info, rcounts, disps, MPI_BYTE, 0,
                                   comm, comm->c_coll.coll_gatherv_module);
    if (rc != OMPI_SUCCESS) {
        FCA_ERROR("Failed to gather rank information to rank0: %d", rc);
        goto out;
    }

    /* Rank0 calls fca_comm_new() and fills the fca_comm_desc field */
    if (is_root) {
        spec.rank_info = all_info;
        spec.is_comm_world = comm == MPI_COMM_WORLD;
        spec.rank_count = 0;
        for (i = 0; i < comm_size; ++i) {
            FCA_MODULE_VERBOSE(fca_module, 1, "rcounts[%d]=%d disps[%d]=%d",
                               i, rcounts[i], i, disps[i]);
            /* Only node managers contributed rank info */
            if (rcounts[i] > 0) {
                ++spec.rank_count;
            }
        }
        FCA_MODULE_VERBOSE(fca_module, 1, "starting fca_comm_new(), rank_count: %d",
                           spec.rank_count);
        ret = mca_coll_fca_component.fca_ops.comm_new(mca_coll_fca_component.fca_context,
                                                      &spec,
                                                      &fca_module->fca_comm_desc);
        if (ret < 0) {
            FCA_ERROR("COMM_NEW failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
            rc = OMPI_ERROR;
            goto out;
        }
        FCA_MODULE_VERBOSE(fca_module, 1, "rank 0: Received FCA communicator, comm_id %d",
                           fca_module->fca_comm_desc.comm_id);
    }

    /* Pass fca_comm_desc to all ranks using MPI_Bcast */
    rc = fca_module->previous_bcast(&fca_module->fca_comm_desc,
                                    sizeof(fca_module->fca_comm_desc), MPI_BYTE, 0,
                                    comm, fca_module->previous_bcast_module);
    if (rc != OMPI_SUCCESS) {
        FCA_ERROR("Failed to broadcast comm_desc from rank0: %d", rc);
        goto out;
    }
    FCA_MODULE_VERBOSE(fca_module, 1, "Received FCA communicator spec, comm_id %d",
                       fca_module->fca_comm_desc.comm_id);

out:
    /* Single exit so buffers are released on success and on every error */
    free(all_info);
    free(disps);
    free(rcounts);
    /* Release rank_info allocated by get_rank_info() on node managers */
    if (my_info) {
        mca_coll_fca_component.fca_ops.free_rank_info(my_info);
    }
    return rc;
}
/**
 * Creates and initializes this rank's FCA communicator.
 *
 * First obtains fca_module->fca_comm_desc through the collective
 * __fca_comm_new().  It then calls fca_ops.comm_init() with this rank's local
 * index, the local process count and the communicator size.  Finally it
 * queries the communicator's capabilities into fca_module->fca_comm_caps.
 *
 * @return OMPI_SUCCESS; the error returned by __fca_comm_new(); or
 *         OMPI_ERROR when comm_init() or comm_get_caps() reports a negative
 *         status.
 */
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
{
    int rc, ret;

    rc = __fca_comm_new(fca_module);
    if (rc != OMPI_SUCCESS)
        return rc;

    FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
                       fca_module->fca_comm_desc.comm_id, fca_module->local_proc_idx,
                       fca_module->num_local_procs);

    ret = mca_coll_fca_component.fca_ops.comm_init(mca_coll_fca_component.fca_context,
                                                   fca_module->local_proc_idx,
                                                   fca_module->num_local_procs,
                                                   ompi_comm_size(fca_module->comm),
                                                   &fca_module->fca_comm_desc,
                                                   &fca_module->fca_comm);
    if (ret < 0) {
        FCA_ERROR("COMM_INIT failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
        return OMPI_ERROR;
    }

    /* get communicator capabilities */
    ret = mca_coll_fca_component.fca_ops.comm_get_caps(fca_module->fca_comm,
                                                       &fca_module->fca_comm_caps);
    if (ret < 0) {
        FCA_ERROR("GET_COMM_CAPS failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
        return OMPI_ERROR;
    }

    /* This rank's FCA communicator is initialized; other ranks are not
     * re-checked here. */
    FCA_MODULE_VERBOSE(fca_module, 1, "Initialized FCA communicator, comm_id %d",
                       fca_module->fca_comm_desc.comm_id);
    return OMPI_SUCCESS;
}
/*
 * Tears down this rank's FCA communicator.  Every rank calls
 * fca_ops.comm_destroy().  Rank 0 additionally calls fca_ops.comm_end() for
 * the descriptor's comm_id; a negative status from it is logged but not
 * returned.
 */
static void __destroy_fca_comm(mca_coll_fca_module_t *fca_module)
{
    mca_coll_fca_component.fca_ops.comm_destroy(fca_module->fca_comm);

    if (0 == fca_module->rank) {
        int end_status = mca_coll_fca_component.fca_ops.comm_end(
                             mca_coll_fca_component.fca_context,
                             fca_module->fca_comm_desc.comm_id);

        if (end_status < 0) {
            FCA_ERROR("COMM_END failed: %s",
                      mca_coll_fca_component.fca_ops.strerror(end_status));
        }
    }

    FCA_MODULE_VERBOSE(fca_module, 1, "Destroyed FCA communicator, comm_id %d",
                       fca_module->fca_comm_desc.comm_id);
}
/**
 * Records the communicator's current reduce, allreduce, bcast and barrier
 * handlers into the matching previous_* fields, both the function pointer
 * and its owning module.  It takes a reference (OBJ_RETAIN) on each owning
 * module.  The saved handlers can be invoked later; for example,
 * previous_bcast is used by __fca_comm_new.
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR (the module disqualifies itself) when
 *         any of the four handlers or its module is missing.  Nothing is
 *         retained in the error case.
 */
static int __save_coll_handlers(mca_coll_fca_module_t *fca_module)
{
    ompi_communicator_t *comm = fca_module->comm;

    if (!comm->c_coll.coll_reduce || !comm->c_coll.coll_reduce_module ||
        !comm->c_coll.coll_allreduce || !comm->c_coll.coll_allreduce_module ||
        !comm->c_coll.coll_bcast || !comm->c_coll.coll_bcast_module ||
        !comm->c_coll.coll_barrier || !comm->c_coll.coll_barrier_module) {
        /* Any of the four may be the missing one; say so accurately */
        FCA_VERBOSE(1, "(%d/%s): no underlying reduce/allreduce/bcast/barrier; disqualifying myself",
                    comm->c_contextid, comm->c_name);
        return OMPI_ERROR;
    }

    fca_module->previous_allreduce = comm->c_coll.coll_allreduce;
    fca_module->previous_allreduce_module = comm->c_coll.coll_allreduce_module;
    OBJ_RETAIN(fca_module->previous_allreduce_module);
    FCA_VERBOSE(14, "saving fca_module->previous_allreduce_module=%p, fca_module->previous_allreduce=%p, fca_module=%p,fca_module->super.coll_allreduce=%p",
                fca_module->previous_allreduce_module, fca_module->previous_allreduce, fca_module, fca_module->super.coll_allreduce);

    fca_module->previous_reduce = comm->c_coll.coll_reduce;
    fca_module->previous_reduce_module = comm->c_coll.coll_reduce_module;
    OBJ_RETAIN(fca_module->previous_reduce_module);
    FCA_VERBOSE(14, "saving fca_module->previous_reduce_module=%p, fca_module->previous_reduce=%p, fca_module=%p,fca_module->super.coll_reduce=%p",
                fca_module->previous_reduce_module, fca_module->previous_reduce, fca_module, fca_module->super.coll_reduce);

    fca_module->previous_bcast = comm->c_coll.coll_bcast;
    fca_module->previous_bcast_module = comm->c_coll.coll_bcast_module;
    OBJ_RETAIN(fca_module->previous_bcast_module);
    FCA_VERBOSE(14, "saving fca_module->bcast=%p, fca_module->bcast_module=%p, fca_module=%p, fca_module->super.coll_bcast=%p",
                fca_module->previous_bcast, fca_module->previous_bcast_module, fca_module, fca_module->super.coll_bcast);

    fca_module->previous_barrier = comm->c_coll.coll_barrier;
    fca_module->previous_barrier_module = comm->c_coll.coll_barrier_module;
    OBJ_RETAIN(fca_module->previous_barrier_module);
    FCA_VERBOSE(14, "saving fca_module->barrier=%p, fca_module->barrier_module=%p, fca_module=%p, fca_module->super.coll_barrier=%p",
                fca_module->previous_barrier, fca_module->previous_barrier_module, fca_module, fca_module->super.coll_barrier);

    return OMPI_SUCCESS;
}
/*
 * coll_module_enable hook: binds the module to `comm` and prepares FCA on it.
 * The steps run in order and stop at the first non-success code:
 * mca_coll_fca_get_fca_lib(), saving the underlying collective handlers,
 * computing the local ranks, and creating the FCA communicator.
 */
static int mca_coll_fca_module_enable(mca_coll_base_module_t *module,
                                      struct ompi_communicator_t *comm)
{
    mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*) module;
    int status;

    fca_module->comm = comm;
    fca_module->rank = ompi_comm_rank(comm);

    status = mca_coll_fca_get_fca_lib(comm);
    if (OMPI_SUCCESS == status)
        status = __save_coll_handlers(fca_module);
    if (OMPI_SUCCESS == status)
        status = __get_local_ranks(fca_module);
    if (OMPI_SUCCESS == status)
        status = __create_fca_comm(fca_module);
    if (OMPI_SUCCESS != status)
        return status;

    FCA_MODULE_VERBOSE(fca_module, 1, "FCA Module initialized");
    return OMPI_SUCCESS;
}
/*
 * Fault-tolerance event hook.  The module takes no action for any state and
 * always reports success.
 */
static int mca_coll_fca_ft_event(int state)
{
    (void)state; /* unused */
    return OMPI_SUCCESS;
}
/*
 * Resets the module's FCA-specific fields to an "empty" state: no local
 * ranks, no FCA communicator, and no saved previous handlers or modules.
 * Because every saved module pointer is NULLed, the destructor can tell
 * which references were actually taken.
 */
static void mca_coll_fca_module_clear(mca_coll_fca_module_t *fca_module)
{
    fca_module->num_local_procs = 0;
    fca_module->local_ranks = NULL;
    fca_module->fca_comm = NULL;
    fca_module->previous_allreduce = NULL;
    fca_module->previous_allreduce_module = NULL;
    fca_module->previous_reduce = NULL;
    fca_module->previous_reduce_module = NULL;
    fca_module->previous_bcast = NULL;
    fca_module->previous_bcast_module = NULL;
    fca_module->previous_barrier = NULL;
    fca_module->previous_barrier_module = NULL;
}

/* Object constructor: start from a cleared module. */
static void mca_coll_fca_module_construct(mca_coll_fca_module_t *fca_module)
{
    FCA_VERBOSE(5, "==>");
    mca_coll_fca_module_clear(fca_module);
}

/*
 * Object destructor.  It drops the references taken by
 * __save_coll_handlers(), tears down the FCA communicator if one was
 * created, and frees local_ranks.  A module destroyed before (or without) a
 * successful enable has NULL module pointers, so only the references that
 * actually exist are released.
 */
static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
{
    FCA_VERBOSE(5, "==>");

    if (fca_module->previous_allreduce_module) {
        OBJ_RELEASE(fca_module->previous_allreduce_module);
    }
    if (fca_module->previous_reduce_module) {
        OBJ_RELEASE(fca_module->previous_reduce_module);
    }
    if (fca_module->previous_bcast_module) {
        OBJ_RELEASE(fca_module->previous_bcast_module);
    }
    if (fca_module->previous_barrier_module) {
        OBJ_RELEASE(fca_module->previous_barrier_module);
    }

    if (fca_module->fca_comm) {
        __destroy_fca_comm(fca_module);
    }

    free(fca_module->local_ranks);
    mca_coll_fca_module_clear(fca_module);
}
/*
 * Invoked when a new communicator has been created.  It examines the
 * communicator and decides whether to offer an FCA module and at what
 * priority.
 *
 * No module is returned (and *priority stays 0) when:
 *  - mca_coll_fca_component.fca_enable is false;
 *  - the communicator has fewer than fca_np ranks;
 *  - every rank of the local group is local to this node;
 *  - the communicator is an inter-communicator;
 *  - the module object cannot be allocated.
 * Otherwise it returns a module that provides allreduce, barrier, bcast and
 * reduce, with *priority = fca_priority.
 */
mca_coll_base_module_t *
mca_coll_fca_comm_query(struct ompi_communicator_t *comm, int *priority)
{
    mca_coll_base_module_t *module = NULL;
    mca_coll_fca_module_t *fca_module;
    int size = ompi_comm_size(comm);
    int local_peers = 0; /* logged at exit even if never computed */

    *priority = 0;

    if (!mca_coll_fca_component.fca_enable)
        goto exit;

    if (size < mca_coll_fca_component.fca_np)
        goto exit;

    if (!have_remote_peers(comm->c_local_group, size, &local_peers) || OMPI_COMM_IS_INTER(comm))
        goto exit;

    fca_module = OBJ_NEW(mca_coll_fca_module_t);
    if (!fca_module)
        goto exit;

    fca_module->super.coll_module_enable = mca_coll_fca_module_enable;
    fca_module->super.ft_event = mca_coll_fca_ft_event;
    fca_module->super.coll_allgather = NULL;
    fca_module->super.coll_allgatherv = NULL;
    fca_module->super.coll_allreduce = mca_coll_fca_allreduce;
    fca_module->super.coll_alltoall = NULL;
    fca_module->super.coll_alltoallv = NULL;
    fca_module->super.coll_alltoallw = NULL;
    fca_module->super.coll_barrier = mca_coll_fca_barrier;
    fca_module->super.coll_bcast = mca_coll_fca_bcast;
    fca_module->super.coll_exscan = NULL;
    fca_module->super.coll_gather = NULL;
    fca_module->super.coll_gatherv = NULL;
    fca_module->super.coll_reduce = mca_coll_fca_reduce;
    fca_module->super.coll_reduce_scatter = NULL;
    fca_module->super.coll_scan = NULL;
    fca_module->super.coll_scatter = NULL;
    fca_module->super.coll_scatterv = NULL;

    *priority = mca_coll_fca_component.fca_priority;
    module = &fca_module->super;

exit:
    FCA_VERBOSE(4, "Query FCA module for comm %p size %d rank %d local_peers=%d: priority=%d %s",
                comm, size, ompi_comm_rank(comm), local_peers,
                *priority, module ? "enabled" : "disabled");
    return module;
}
/*
 * Class descriptor for mca_coll_fca_module_t.  It derives from
 * mca_coll_base_module_t and uses mca_coll_fca_module_construct /
 * mca_coll_fca_module_destruct as its constructor and destructor.
 */
OBJ_CLASS_INSTANCE(mca_coll_fca_module_t,
mca_coll_base_module_t,
mca_coll_fca_module_construct,
mca_coll_fca_module_destruct);