1
1
This commit was SVN r27988.
Этот коммит содержится в:
Igor Usarov 2013-01-31 11:14:27 +00:00
родитель 2735658d81
Коммит 8d80af6c10
3 изменённых файлов: 117 добавлений и 42 удалений

Просмотреть файл

@ -217,6 +217,10 @@ struct mca_coll_fca_module_t {
mca_coll_base_module_t *previous_gatherv_module;
mca_coll_base_module_reduce_scatter_fn_t previous_reduce_scatter;
mca_coll_base_module_t *previous_reduce_scatter_module;
#if OMPI_FCA_VERSION >= 30
mca_coll_base_module_scatter_fn_t previous_scatter;
mca_coll_base_module_t *previous_scatter_module;
#endif
};
typedef struct mca_coll_fca_module_t mca_coll_fca_module_t;

Просмотреть файл

@ -67,7 +67,7 @@ static inline void mca_coll_fca_get_reduce_root(int root_rank, int my_rank,
spec->is_root = root_rank == my_rank;
}
#elif OMPI_FCA_VERSION >= 20
#elif OMPI_FCA_VERSION >= 20
#define OMPI_FCA_ALLGATHER 1
#define OMPI_FCA_ALLGATHERV 1
@ -76,7 +76,11 @@ static inline void mca_coll_fca_get_reduce_root(int root_rank, int my_rank,
static inline int mca_coll_fca_comm_init(fca_t *fca_context, int rank, int comm_size,
int local_proc_idx, int num_local_procs,
fca_comm_desc_t *comm_desc,
fca_comm_t **fca_comm)
fca_comm_t **fca_comm
#if OMPI_FCA_VERSION >= 30
, void *comm_init_data
#endif
)
{
fca_comm_init_spec_t spec;
@ -85,6 +89,9 @@ static inline int mca_coll_fca_comm_init(fca_t *fca_context, int rank, int comm_
spec.desc = *comm_desc;
spec.proc_idx = local_proc_idx;
spec.num_procs = num_local_procs;
#if OMPI_FCA_VERSION >= 30
spec.comm_init_data = comm_init_data;
#endif
return fca_comm_init(fca_context, &spec, fca_comm);
}

Просмотреть файл

@ -1,6 +1,5 @@
/**
Copyright (c) 2011 Mellanox Technologies. All rights reserved.
Copyright (c) 2012 Cisco Systems, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
@ -8,6 +7,7 @@
$HEADER$
*/
#include "coll_fca.h"
//#include "opal/mca/paffinity/paffinity.h"
/*
* Initial query function that is invoked during MPI_INIT, allowing
@ -89,21 +89,25 @@ static int __get_local_ranks(mca_coll_fca_module_t *fca_module)
static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
fca_comm_new_spec_t spec;
fca_comm_new_spec_t spec = {0,};
int info_size, all_info_size;
void *all_info, *my_info;
int *rcounts, *disps;
int i, rc, ret;
int *rcounts, *displs;
int i, rc, ret, comm_size = ompi_comm_size(fca_module->comm);
/* call fca_get_rank_info() on node managers only*/
if (fca_module->local_proc_idx == 0) {
#if OMPI_FCA_VERSION >= 30
my_info = fca_get_rank_info(mca_coll_fca_component.fca_context,
&info_size);
fca_module->rank, &info_size);
#else
my_info = fca_get_rank_info(mca_coll_fca_component.fca_context,
&info_size);
#endif
if (!my_info) {
FCA_ERROR("fca_get_rank_info returned NULL");
return OMPI_ERROR;
}
} else {
info_size = 0;
}
@ -111,7 +115,7 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
/* Allocate gather buffer on the root rank */
if (fca_module->rank == 0) {
rcounts = calloc(ompi_comm_size(comm), sizeof *rcounts);
rcounts = calloc(comm_size, sizeof *rcounts);
}
/* Get all rank info sizes using MPI_Gather */
@ -123,18 +127,26 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
/* Allocate buffer for gathering rank information on rank0 */
if (fca_module->rank == 0) {
all_info_size = 0;
FCA_MODULE_VERBOSE(fca_module, 1, "Total rank_info size: %d", all_info_size);
disps = calloc(ompi_comm_size(comm), sizeof *disps);
for (i = 0; i < ompi_comm_size(comm); ++i) {
disps[i] = all_info_size;
displs = calloc(comm_size, sizeof *displs);
for (i = 0; i < comm_size; ++i) {
displs[i] = all_info_size;
all_info_size += rcounts[i];
if (rcounts[i] > 0)
++spec.rank_count;
FCA_MODULE_VERBOSE(fca_module, 1, "gatherv: rcounts[%d]=%d displs[%d]=%d",
i, rcounts[i], i, displs[i]);
}
FCA_MODULE_VERBOSE(fca_module, 1, "Total rank_info size: %d", all_info_size);
all_info = calloc(all_info_size, 1);
}
/* Send all node managers information to rank0 using MPI_Gatherv*/
/* Send all node managers information to rank0 using MPI_Gatherv */
rc = comm->c_coll.coll_gatherv(my_info, info_size, MPI_BYTE,
all_info, rcounts, disps, MPI_BYTE, 0,
all_info, rcounts, displs, MPI_BYTE, 0,
comm, comm->c_coll.coll_gather_module);
if (rc != OMPI_SUCCESS) {
FCA_ERROR("Failed to gather rank information to rank0: %d", rc);
@ -145,21 +157,21 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
if (fca_module->rank == 0) {
spec.rank_info = all_info;
spec.is_comm_world = comm == MPI_COMM_WORLD;
spec.rank_count = 0;
for (i = 0; i < ompi_comm_size(comm); ++i) {
FCA_MODULE_VERBOSE(fca_module, 1, "rcounts[%d]=%d disps[%d]=%d",
i, rcounts[i], i, disps[i]);
if (rcounts[i] > 0)
++spec.rank_count;
}
free(disps);
free(rcounts);
free(displs);
free(rcounts);
#if OMPI_FCA_VERSION >= 30
spec.comm_size = comm_size;
spec.comm_init_data = NULL;
spec.comm_init_data_size = 0;
#endif
FCA_MODULE_VERBOSE(fca_module, 1, "starting fca_comm_new(), rank_count: %d",
spec.rank_count);
ret = fca_comm_new(mca_coll_fca_component.fca_context,
&spec, &fca_module->fca_comm_desc);
&spec, &fca_module->fca_comm_desc);
free(all_info);
}
@ -176,7 +188,30 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
FCA_ERROR("COMM_NEW failed: %s", fca_strerror(ret));
return OMPI_ERROR;
}
#if OMPI_FCA_VERSION >= 30
/* Send comm_ini_data_size to all ranks in comm */
rc = fca_module->previous_bcast(&spec.comm_init_data_size, 1, MPI_INT,
0, comm, fca_module->previous_bcast_module);
if (OMPI_SUCCESS != rc) {
FCA_ERROR("Failed to broadcast comm init data size value from rank0: %d", rc);
return rc;
}
if (0 != fca_module->rank &&
NULL == (spec.comm_init_data = calloc(1, spec.comm_init_data_size))) {
FCA_ERROR("Failed to allocate memory for comm init data.");
return OMPI_ERROR;
}
/* Send comm_ini_data to all ranks in comm */
rc = fca_module->previous_scatter(spec.comm_init_data, spec.comm_init_data_size, MPI_BYTE,
spec.comm_init_data, spec.comm_init_data_size, MPI_BYTE,
0, comm, fca_module->previous_scatter_module);
if (OMPI_SUCCESS != rc) {
FCA_ERROR("Failed to scatter comm_init sizes return value from rank0: %d", rc);
return rc;
}
#endif
/* Release allocate rank_info on node managers */
if (fca_module->local_proc_idx == 0) {
fca_free_rank_info(my_info);
@ -193,36 +228,55 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
FCA_MODULE_VERBOSE(fca_module, 1, "Received FCA communicator spec, comm_id %d",
fca_module->fca_comm_desc.comm_id);
return OMPI_SUCCESS;
}
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
{
int comm_size;
int rc, ret;
rc = __fca_comm_new(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
#if OMPI_FCA_VERSION >= 30
/* allocate comm_init_spec */
FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
fca_module->fca_comm_desc.comm_id, fca_module->local_proc_idx,
fca_module->num_local_procs);
comm_size = ompi_comm_size(fca_module->comm);
ret = mca_coll_fca_comm_init(mca_coll_fca_component.fca_context,
fca_module->rank, comm_size,
fca_module->local_proc_idx, fca_module->num_local_procs,
&fca_module->fca_comm_desc, &fca_module->fca_comm);
&fca_module->fca_comm_desc, &fca_module->fca_comm,
spec.comm_init_data);
if (ret < 0) {
FCA_ERROR("COMM_INIT failed: %s", fca_strerror(ret));
return OMPI_ERROR;
}
if (0 != fca_module->rank) {
free(spec.comm_init_data);
}
#endif
return OMPI_SUCCESS;
}
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
{
int rc, ret;
rc = __fca_comm_new(fca_module);
if (OMPI_SUCCESS != rc) {
return rc;
}
#if OMPI_FCA_VERSION < 30
/* allocate comm_init_spec */
FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
fca_module->fca_comm_desc.comm_id, fca_module->local_proc_idx,
fca_module->num_local_procs);
ret = mca_coll_fca_comm_init(mca_coll_fca_component.fca_context,
fca_module->rank, ompi_comm_size(fca_module->comm),
fca_module->local_proc_idx, fca_module->num_local_procs,
&fca_module->fca_comm_desc, &fca_module->fca_comm);
if (ret < 0) {
FCA_ERROR("COMM_INIT failed: %s", fca_strerror(ret));
return OMPI_ERROR;
}
#endif
/* get communicator capabilities */
ret = fca_comm_get_caps(fca_module->fca_comm,
&fca_module->fca_comm_caps);
ret = fca_comm_get_caps(fca_module->fca_comm, &fca_module->fca_comm_caps);
if (ret < 0) {
FCA_ERROR("GET_COMM_CAPS failed: %s", fca_strerror(ret));
return OMPI_ERROR;
@ -278,6 +332,9 @@ static int __save_coll_handlers(mca_coll_fca_module_t *fca_module)
FCA_SAVE_PREV_COLL_API(alltoallv);
FCA_SAVE_PREV_COLL_API(alltoallw);
FCA_SAVE_PREV_COLL_API(reduce_scatter);
#if OMPI_FCA_VERSION >= 30
FCA_SAVE_PREV_COLL_API(scatter);
#endif
return OMPI_SUCCESS;
}
@ -350,6 +407,11 @@ static void mca_coll_fca_module_construct(mca_coll_fca_module_t *fca_module)
static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
{
FCA_VERBOSE(5, "==>");
if (fca_module->fca_comm) {
__destroy_fca_comm(fca_module);
}
OBJ_RELEASE(fca_module->previous_barrier_module);
OBJ_RELEASE(fca_module->previous_bcast_module);
OBJ_RELEASE(fca_module->previous_reduce_module);
@ -362,8 +424,10 @@ static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
OBJ_RELEASE(fca_module->previous_alltoallv_module);
OBJ_RELEASE(fca_module->previous_alltoallw_module);
OBJ_RELEASE(fca_module->previous_reduce_scatter_module);
if (fca_module->fca_comm)
__destroy_fca_comm(fca_module);
#if OMPI_FCA_VERSION >= 30
OBJ_RELEASE(fca_module->previous_scatter_module);
#endif
free(fca_module->local_ranks);
mca_coll_fca_module_clear(fca_module);
}