1
1

revert r23764 in ompi/mca/coll/fca

This commit was SVN r23771.

The following SVN revision numbers were found above:
  r23764 --> open-mpi/ompi@40a2bfa238
Этот коммит содержится в:
Mike Dubman 2010-09-20 06:06:45 +00:00
родитель b61cefc8c7
Коммит bd9a1f28a3
7 изменённых файлов: 487 добавлений и 135 удалений

Просмотреть файл

@ -24,7 +24,7 @@ coll_fca_sources = \
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_ompi_coll_fca_DSO
if OMPI_BUILD_coll_fca_DSO
component_noinst =
component_install = mca_coll_fca.la
else

Просмотреть файл

@ -60,8 +60,7 @@ struct mca_coll_fca_fca_ops_t {
void (*free_rank_info)(void *rank_info);
/* Local communicator creation */
int (*comm_init)(fca_t *context, int proc_idx, int num_procs, int comm_size,
fca_comm_desc_t *comm_desc, fca_comm_t** fca_comm);
int (*comm_init)(fca_t *context, fca_comm_init_spec_t *spec, fca_comm_t** fca_comm);
void (*comm_destroy)(fca_comm_t *comm);
int (*comm_get_caps)(fca_comm_t *comm, fca_comm_caps_t *caps);
@ -69,10 +68,13 @@ struct mca_coll_fca_fca_ops_t {
int (*do_reduce)(fca_comm_t *comm, fca_reduce_spec_t *spec);
int (*do_all_reduce)(fca_comm_t *comm, fca_reduce_spec_t *spec);
int (*do_bcast)(fca_comm_t *comm, fca_bcast_spec_t *spec);
int (*do_allgather)(fca_comm_t *comm, fca_gather_spec_t *spec);
int (*do_allgatherv)(fca_comm_t *comm, fca_gatherv_spec_t *spec);
int (*do_barrier)(fca_comm_t *comm);
/* Helper functions */
unsigned long (*get_version)(void);
char * (*get_version_string)(void);
int (*maddr_ib_pton)(const char *mlid_str, const char *mgid_str, fca_mcast_addr_t *dst);
int (*maddr_inet_pton)(int af, const char *src, fca_mcast_addr_t *dst);
fca_init_spec_t *(*parse_spec_file)(char* spec_ini_file);
@ -141,6 +143,42 @@ struct mca_coll_fca_component_t {
/** MCA parameter: Enable FCA */
int fca_enable;
/** MCA parameter: Enable FCA Barrier */
int fca_enable_barrier;
/** MCA parameter: Enable FCA Bcast */
int fca_enable_bcast;
/** MCA parameter: Enable FCA Reduce */
int fca_enable_reduce;
/** MCA parameter: Enable FCA Reduce_Scatter */
int fca_enable_reduce_scatter;
/** MCA parameter: Enable FCA Allreduce */
int fca_enable_allreduce;
/** MCA parameter: Enable FCA Allgather */
int fca_enable_allgather;
/** MCA parameter: Enable FCA Allgatherv */
int fca_enable_allgatherv;
/** MCA parameter: Enable FCA Gather */
int fca_enable_gather;
/** MCA parameter: Enable FCA Gatherv */
int fca_enable_gatherv;
/** MCA parameter: Enable FCA AlltoAll */
int fca_enable_alltoall;
/** MCA parameter: Enable FCA AlltoAllv */
int fca_enable_alltoallv;
/** MCA parameter: Enable FCA AlltoAllw */
int fca_enable_alltoallw;
/** MCA parameter: FCA NP */
int fca_np;
@ -180,7 +218,22 @@ struct mca_coll_fca_module_t {
mca_coll_base_module_t *previous_bcast_module;
mca_coll_base_module_barrier_fn_t previous_barrier;
mca_coll_base_module_t *previous_barrier_module;
mca_coll_base_module_allgather_fn_t previous_allgather;
mca_coll_base_module_t *previous_allgather_module;
mca_coll_base_module_allgatherv_fn_t previous_allgatherv;
mca_coll_base_module_t *previous_allgatherv_module;
mca_coll_base_module_alltoall_fn_t previous_alltoall;
mca_coll_base_module_t *previous_alltoall_module;
mca_coll_base_module_alltoallv_fn_t previous_alltoallv;
mca_coll_base_module_t *previous_alltoallv_module;
mca_coll_base_module_alltoallw_fn_t previous_alltoallw;
mca_coll_base_module_t *previous_alltoallw_module;
mca_coll_base_module_gather_fn_t previous_gather;
mca_coll_base_module_t *previous_gather_module;
mca_coll_base_module_gatherv_fn_t previous_gatherv;
mca_coll_base_module_t *previous_gatherv_module;
mca_coll_base_module_reduce_scatter_fn_t previous_reduce_scatter;
mca_coll_base_module_t *previous_reduce_scatter_module;
};
typedef struct mca_coll_fca_module_t mca_coll_fca_module_t;
@ -209,7 +262,58 @@ int mca_coll_fca_reduce(void *sbuf, void* rbuf, int count,
/* FCA barrier handler; takes the communicator and this component's module. */
int mca_coll_fca_barrier(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/*
 * FCA allgather handler. Arguments mirror MPI_Allgather() plus the
 * collective module; returns OMPI_SUCCESS or an error code.
 */
int mca_coll_fca_allgather(void *sbuf, int scount, struct ompi_datatype_t *sdtype,
void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/*
 * FCA allgatherv handler. rcounts/disps carry one entry per rank of the
 * communicator.
 */
int mca_coll_fca_allgatherv(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int *rcounts, int *disps,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Alltoall handler installed when the enable_alltoall parameter is set. */
int mca_coll_fca_alltoall(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Alltoallv handler installed when the enable_alltoallv parameter is set. */
int mca_coll_fca_alltoallv(void *sbuf, int *scounts, int *sdisps,
struct ompi_datatype_t *sdtype,
void *rbuf, int *rcounts, int *rdisps,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/*
 * Alltoallw handler installed when the enable_alltoallw parameter is set.
 * Unlike alltoallv it takes one datatype per peer (sdtypes/rdtypes arrays).
 */
int mca_coll_fca_alltoallw(void *sbuf, int *scounts, int *sdisps,
struct ompi_datatype_t **sdtypes,
void *rbuf, int *rcounts, int *rdisps,
struct ompi_datatype_t **rdtypes,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Rooted gather handler installed when the enable_gather parameter is set. */
int mca_coll_fca_gather(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Rooted gatherv handler installed when the enable_gatherv parameter is set. */
int mca_coll_fca_gatherv(void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int *rcounts, int *disps,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* Reduce_scatter handler installed when enable_reduce_scatter is set. */
int mca_coll_fca_reduce_scatter(void *sbuf, void *rbuf, int *rcounts,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
END_C_DECLS

Просмотреть файл

@ -65,7 +65,8 @@ mca_coll_fca_component_t mca_coll_fca_component = {
/*
 * Layout of the FCA version word: the major number starts at bit
 * FCA_MAJOR_BIT and the minor number at bit FCA_MINOR_BIT.
 */
#define FCA_MINOR_BIT (16UL)
#define FCA_MAJOR_BIT (24UL)
/* Oldest FCA release (major.minor) this component accepts at load time. */
#define FCA_API_ABI_MAJOR (2)
#define FCA_API_ABI_MINOR (0)
/*
 * Zero every bit below FCA_MINOR_BIT (the "micro" part of a version word).
 * The argument is parenthesized so that expressions such as (a | b) are
 * shifted as a whole.
 */
#define FCA_API_CLEAR_MICRO(ver) ((((ver)) >> FCA_MINOR_BIT) << FCA_MINOR_BIT)
/*
 * Build a version word from a major and a minor number.  Both arguments are
 * parenthesized and widened to unsigned long (the type returned by the
 * library's get_version) before shifting, so compound arguments and large
 * major numbers do not misbehave.
 */
#define FCA_API_VER(vmajor, vminor) \
    ((((unsigned long)(vmajor)) << FCA_MAJOR_BIT) | \
     (((unsigned long)(vminor)) << FCA_MINOR_BIT))
@ -94,7 +95,8 @@ static int mca_coll_fca_mpi_progress_cb(void)
if (!mca_coll_fca_component.fca_context)
return 0;
mca_coll_fca_component.fca_ops.progress(mca_coll_fca_component.fca_context);
if (mca_coll_fca_component.fca_ops.progress)
mca_coll_fca_component.fca_ops.progress(mca_coll_fca_component.fca_context);
return 0;
}
@ -103,7 +105,7 @@ static int mca_coll_fca_mpi_progress_cb(void)
*/
static void mca_coll_fca_init_fca_translations(void)
{
int i, ret;
int i;
for (i = 0; i < FCA_DT_MAX_PREDEFINED; ++i) {
mca_coll_fca_component.fca_dtypes[i].mpi_dtype = MPI_DATATYPE_NULL;
@ -136,10 +138,20 @@ int mca_coll_fca_get_fca_lib(struct ompi_communicator_t *comm)
FCA_VERBOSE(1, "FCA Loaded from: %s", mca_coll_fca_component.fca_lib_path);
GET_FCA_SYM(get_version);
GET_FCA_SYM(get_version_string);
fca_ver = FCA_API_CLEAR_MICRO(mca_coll_fca_component.fca_ops.get_version());
if (fca_ver < FCA_API_VER(FCA_API_ABI_MAJOR,FCA_API_ABI_MINOR)) {
FCA_ERROR("Unsupported FCA version: %s Please upgrade FCA to at least v%d.%d",
mca_coll_fca_component.fca_ops.get_version_string(),
FCA_API_ABI_MAJOR,
FCA_API_ABI_MINOR);
return OMPI_ERROR;
}
GET_FCA_SYM(init);
GET_FCA_SYM(cleanup);
GET_FCA_SYM(progress);
GET_FCA_SYM(comm_new);
GET_FCA_SYM(comm_end);
GET_FCA_SYM(get_rank_info);
@ -151,6 +163,8 @@ int mca_coll_fca_get_fca_lib(struct ompi_communicator_t *comm)
GET_FCA_SYM(do_all_reduce);
GET_FCA_SYM(do_bcast);
GET_FCA_SYM(do_barrier);
GET_FCA_SYM(do_allgather);
GET_FCA_SYM(do_allgatherv);
GET_FCA_SYM(maddr_ib_pton);
GET_FCA_SYM(maddr_inet_pton);
GET_FCA_SYM(parse_spec_file);
@ -179,18 +193,13 @@ int mca_coll_fca_get_fca_lib(struct ompi_communicator_t *comm)
mca_coll_fca_component.fca_ops.free_init_spec(spec);
mca_coll_fca_init_fca_translations();
if (fca_ver > FCA_API_VER(1,2)) {
GET_FCA_SYM(progress);
opal_progress_register(mca_coll_fca_mpi_progress_cb);
}
opal_progress_register(mca_coll_fca_mpi_progress_cb);
return OMPI_SUCCESS;
}
static void mca_coll_fca_close_fca_lib(void)
{
if (NULL != mca_coll_fca_component.fca_ops.progress) {
opal_progress_unregister(mca_coll_fca_mpi_progress_cb);
}
opal_progress_unregister(mca_coll_fca_mpi_progress_cb);
mca_coll_fca_component.fca_ops.cleanup(mca_coll_fca_component.fca_context);
mca_coll_fca_component.fca_context = NULL;
dlclose(mca_coll_fca_component.fca_lib_handle);
@ -239,6 +248,80 @@ static int fca_register(void)
64,
&mca_coll_fca_component.fca_np);
mca_base_param_reg_int(c, "enable_barrier",
"[1|0|] Enable/Disable FCA Barrier support",
false, false,
1,
&mca_coll_fca_component.fca_enable_barrier);
mca_base_param_reg_int(c, "enable_bcast",
"[1|0|] Enable/Disable FCA Bcast support",
false, false,
1,
&mca_coll_fca_component.fca_enable_bcast);
mca_base_param_reg_int(c, "enable_reduce",
"[1|0|] Enable/Disable FCA Reduce support",
false, false,
1,
&mca_coll_fca_component.fca_enable_reduce);
mca_base_param_reg_int(c, "enable_reduce_scatter",
"[1|0|] Enable/Disable FCA Reduce support",
false, false,
0,
&mca_coll_fca_component.fca_enable_reduce_scatter);
mca_base_param_reg_int(c, "enable_allreduce",
"[1|0|] Enable/Disable FCA Allreduce support",
false, false,
1,
&mca_coll_fca_component.fca_enable_allreduce);
mca_base_param_reg_int(c, "enable_allgather",
"[1|0|] Enable/Disable FCA Allgather support",
false, false,
0,
&mca_coll_fca_component.fca_enable_allgather);
mca_base_param_reg_int(c, "enable_allgatherv",
"[1|0|] Enable/Disable FCA Allgatherv support",
false, false,
0,
&mca_coll_fca_component.fca_enable_allgatherv);
mca_base_param_reg_int(c, "enable_gather",
"[1|0|] Enable/Disable FCA Gather support",
false, false,
0,
&mca_coll_fca_component.fca_enable_gather);
mca_base_param_reg_int(c, "enable_gatherv",
"[1|0|] Enable/Disable FCA Gatherv support",
false, false,
0,
&mca_coll_fca_component.fca_enable_gatherv);
mca_base_param_reg_int(c, "enable_alltoall",
"[1|0|] Enable/Disable FCA AlltoAll support",
false, false,
0,
&mca_coll_fca_component.fca_enable_alltoall);
mca_base_param_reg_int(c, "enable_alltoallv",
"[1|0|] Enable/Disable FCA AlltoAllv support",
false, false,
0,
&mca_coll_fca_component.fca_enable_alltoallv);
mca_base_param_reg_int(c, "enable_alltoallw",
"[1|0|] Enable/Disable FCA AlltoAllw support",
false, false,
0,
&mca_coll_fca_component.fca_enable_alltoallw);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -50,37 +50,23 @@ static int __get_local_ranks(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
ompi_proc_t* proc;
int rank, index;
int rank;
/* Count local ranks */
fca_module->num_local_procs = 0;
for (rank = 0; rank < ompi_comm_size(comm); ++rank) {
proc = __local_rank_lookup(comm, rank);
if (FCA_IS_LOCAL_PROCESS(proc->proc_flags))
++fca_module->num_local_procs;
FCA_MODULE_VERBOSE(fca_module, 4, "rank %d flags 0x%x host %s", rank,
proc->proc_flags,
proc->proc_hostname);
}
fca_module->local_ranks = calloc(fca_module->num_local_procs, sizeof *fca_module->local_ranks);
/* Get local ranks */
index = 0;
for (rank = 0; rank< ompi_comm_size(comm); ++rank) {
proc = __local_rank_lookup(comm, rank);
if (!FCA_IS_LOCAL_PROCESS(proc->proc_flags))
continue;
if (rank == fca_module->rank)
fca_module->local_proc_idx = index;
fca_module->local_ranks[index] = rank;
++index;
if (rank == fca_module->rank) {
fca_module->local_proc_idx = fca_module->num_local_procs;
}
++fca_module->num_local_procs;
}
FCA_MODULE_VERBOSE(fca_module, 3, "num_local_ranks: %d, node_root: %d",
fca_module->num_local_procs, fca_module->local_ranks[0]);
FCA_MODULE_VERBOSE(fca_module, 3, "i am %d/%d", fca_module->local_proc_idx,
fca_module->num_local_procs);
return OMPI_SUCCESS;
}
@ -187,23 +173,28 @@ int __fca_comm_new(mca_coll_fca_module_t *fca_module)
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
{
fca_comm_desc_t comm_desc;
int rc, ret;
fca_comm_init_spec_t spec;
int rc, ret, node_root;
int comm_size;
rc = __fca_comm_new(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
/* allocate comm_init_spec */
comm_size = ompi_comm_size(fca_module->comm);
spec.rank = fca_module->rank;
spec.size = comm_size;
spec.desc = fca_module->fca_comm_desc;
spec.proc_idx = fca_module->local_proc_idx;
spec.num_procs = fca_module->num_local_procs;
FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
fca_module->fca_comm_desc.comm_id, fca_module->local_proc_idx,
fca_module->num_local_procs);
ret = mca_coll_fca_component.fca_ops.comm_init(mca_coll_fca_component.fca_context,
fca_module->local_proc_idx,
fca_module->num_local_procs,
ompi_comm_size(fca_module->comm),
&fca_module->fca_comm_desc,
&fca_module->fca_comm);
&spec, &fca_module->fca_comm);
if (ret < 0) {
FCA_ERROR("COMM_INIT failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
@ -240,43 +231,33 @@ static void __destroy_fca_comm(mca_coll_fca_module_t *fca_module)
fca_module->fca_comm_desc.comm_id);
}
/*
 * Remember the communicator's current handler (function and module) for the
 * collective named by __api, so this component can fall back to it later.
 *
 * Must be expanded inside a function returning int that has `comm` and
 * `fca_module` in scope.  If the communicator has no function or no module
 * for that collective, a verbose message is logged and the enclosing function
 * returns OMPI_ERROR.
 *
 * The availability check runs first: the previous module is only stored and
 * retained once it is known to be non-NULL, so OBJ_RETAIN is never applied
 * to a NULL module.
 */
#define FCA_SAVE_PREV_COLL_API(__api) do {\
    if (!comm->c_coll.coll_ ## __api || !comm->c_coll.coll_ ## __api ## _module) {\
        FCA_VERBOSE(1, "(%d/%s): no underlying " # __api"; disqualifying myself",\
                    comm->c_contextid, comm->c_name);\
        return OMPI_ERROR;\
    }\
    fca_module->previous_ ## __api = comm->c_coll.coll_ ## __api;\
    fca_module->previous_ ## __api ## _module = comm->c_coll.coll_ ## __api ## _module;\
    OBJ_RETAIN(fca_module->previous_ ## __api ## _module);\
} while(0)
static int __save_coll_handlers(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
if (!comm->c_coll.coll_reduce || !comm->c_coll.coll_reduce_module ||
!comm->c_coll.coll_allreduce || !comm->c_coll.coll_allreduce_module ||
!comm->c_coll.coll_bcast || !comm->c_coll.coll_bcast_module ||
!comm->c_coll.coll_barrier || !comm->c_coll.coll_barrier_module) {
FCA_VERBOSE(1, "(%d/%s): no underlying reduce; disqualifying myself",
comm->c_contextid, comm->c_name);
return OMPI_ERROR;
}
fca_module->previous_allreduce = comm->c_coll.coll_allreduce;
fca_module->previous_allreduce_module = comm->c_coll.coll_allreduce_module;
OBJ_RETAIN(fca_module->previous_allreduce_module);
FCA_VERBOSE(14, "saving fca_module->previous_allreduce_module=%p, fca_module->previous_allreduce=%p, fca_module=%p,fca_module->super.coll_allreduce=%p",
fca_module->previous_allreduce_module, fca_module->previous_allreduce, fca_module, fca_module->super.coll_allreduce);
fca_module->previous_reduce = comm->c_coll.coll_reduce;
fca_module->previous_reduce_module = comm->c_coll.coll_reduce_module;
OBJ_RETAIN(fca_module->previous_reduce_module);
FCA_VERBOSE(14, "saving fca_module->previous_reduce_module=%p, fca_module->previous_reduce=%p, fca_module=%p,fca_module->super.coll_reduce=%p",
fca_module->previous_reduce_module, fca_module->previous_reduce, fca_module, fca_module->super.coll_reduce);
fca_module->previous_bcast = comm->c_coll.coll_bcast;
fca_module->previous_bcast_module = comm->c_coll.coll_bcast_module;
OBJ_RETAIN(fca_module->previous_bcast_module);
FCA_VERBOSE(14, "saving fca_module->bcast=%p, fca_module->bcast_module=%p, fca_module=%p, fca_module->super.coll_bcast=%p",
fca_module->previous_bcast, fca_module->previous_bcast_module, fca_module, fca_module->super.coll_bcast);
fca_module->previous_barrier = comm->c_coll.coll_barrier;
fca_module->previous_barrier_module = comm->c_coll.coll_barrier_module;
OBJ_RETAIN(fca_module->previous_barrier_module);
FCA_VERBOSE(14, "saving fca_module->barrier=%p, fca_module->barrier_module=%p, fca_module=%p, fca_module->super.coll_barrier=%p",
fca_module->previous_barrier, fca_module->previous_barrier_module, fca_module, fca_module->super.coll_barrier);
FCA_SAVE_PREV_COLL_API(barrier);
FCA_SAVE_PREV_COLL_API(bcast);
FCA_SAVE_PREV_COLL_API(reduce);
FCA_SAVE_PREV_COLL_API(allreduce);
FCA_SAVE_PREV_COLL_API(allgather);
FCA_SAVE_PREV_COLL_API(allgatherv);
FCA_SAVE_PREV_COLL_API(gather);
FCA_SAVE_PREV_COLL_API(gatherv);
FCA_SAVE_PREV_COLL_API(alltoall);
FCA_SAVE_PREV_COLL_API(alltoallv);
FCA_SAVE_PREV_COLL_API(alltoallw);
FCA_SAVE_PREV_COLL_API(reduce_scatter);
return OMPI_SUCCESS;
}
@ -323,12 +304,20 @@ static int mca_coll_fca_ft_event(int state)
static void mca_coll_fca_module_clear(mca_coll_fca_module_t *fca_module)
{
fca_module->num_local_procs = 0;
fca_module->local_ranks = NULL;
fca_module->fca_comm = NULL;
fca_module->previous_allreduce = NULL;
fca_module->previous_reduce = NULL;
fca_module->previous_bcast = NULL;
fca_module->previous_barrier = NULL;
fca_module->previous_barrier = NULL;
fca_module->previous_bcast = NULL;
fca_module->previous_reduce = NULL;
fca_module->previous_allreduce = NULL;
fca_module->previous_allgather = NULL;
fca_module->previous_allgatherv = NULL;
fca_module->previous_gather = NULL;
fca_module->previous_gatherv = NULL;
fca_module->previous_alltoall = NULL;
fca_module->previous_alltoallv = NULL;
fca_module->previous_alltoallw = NULL;
fca_module->previous_reduce_scatter = NULL;
}
static void mca_coll_fca_module_construct(mca_coll_fca_module_t *fca_module)
@ -341,16 +330,21 @@ static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
{
FCA_VERBOSE(5, "==>");
int rc = OMPI_SUCCESS;
OBJ_RELEASE(fca_module->previous_allreduce_module);
OBJ_RELEASE(fca_module->previous_reduce_module);
OBJ_RELEASE(fca_module->previous_bcast_module);
OBJ_RELEASE(fca_module->previous_barrier_module);
OBJ_RELEASE(fca_module->previous_bcast_module);
OBJ_RELEASE(fca_module->previous_reduce_module);
OBJ_RELEASE(fca_module->previous_allreduce_module);
OBJ_RELEASE(fca_module->previous_allgather_module);
OBJ_RELEASE(fca_module->previous_allgatherv_module);
OBJ_RELEASE(fca_module->previous_gather_module);
OBJ_RELEASE(fca_module->previous_gatherv_module);
OBJ_RELEASE(fca_module->previous_alltoall_module);
OBJ_RELEASE(fca_module->previous_alltoallv_module);
OBJ_RELEASE(fca_module->previous_alltoallw_module);
OBJ_RELEASE(fca_module->previous_reduce_scatter_module);
if (fca_module->fca_comm)
__destroy_fca_comm(fca_module);
free(fca_module->local_ranks);
mca_coll_fca_module_clear(fca_module);
}
@ -385,19 +379,19 @@ mca_coll_fca_comm_query(struct ompi_communicator_t *comm, int *priority)
fca_module->super.coll_module_enable = mca_coll_fca_module_enable;
fca_module->super.ft_event = mca_coll_fca_ft_event;
fca_module->super.coll_allgather = NULL;
fca_module->super.coll_allgatherv = NULL;
fca_module->super.coll_allreduce = mca_coll_fca_allreduce;
fca_module->super.coll_alltoall = NULL;
fca_module->super.coll_alltoallv = NULL;
fca_module->super.coll_alltoallw = NULL;
fca_module->super.coll_barrier = mca_coll_fca_barrier;
fca_module->super.coll_bcast = mca_coll_fca_bcast;
fca_module->super.coll_allgather = mca_coll_fca_component.fca_enable_allgather? mca_coll_fca_allgather : NULL;
fca_module->super.coll_allgatherv = mca_coll_fca_component.fca_enable_allgatherv? mca_coll_fca_allgatherv : NULL;
fca_module->super.coll_allreduce = mca_coll_fca_component.fca_enable_allreduce? mca_coll_fca_allreduce : NULL;
fca_module->super.coll_alltoall = mca_coll_fca_component.fca_enable_alltoall? mca_coll_fca_alltoall : NULL;
fca_module->super.coll_alltoallv = mca_coll_fca_component.fca_enable_alltoallv? mca_coll_fca_alltoallv : NULL;
fca_module->super.coll_alltoallw = mca_coll_fca_component.fca_enable_alltoallw? mca_coll_fca_alltoallw : NULL;
fca_module->super.coll_barrier = mca_coll_fca_component.fca_enable_barrier? mca_coll_fca_barrier : NULL;
fca_module->super.coll_bcast = mca_coll_fca_component.fca_enable_bcast? mca_coll_fca_bcast : NULL;
fca_module->super.coll_exscan = NULL;
fca_module->super.coll_gather = NULL;
fca_module->super.coll_gatherv = NULL;
fca_module->super.coll_reduce = mca_coll_fca_reduce;
fca_module->super.coll_reduce_scatter = NULL;
fca_module->super.coll_gather = mca_coll_fca_component.fca_enable_gather? mca_coll_fca_gather : NULL;
fca_module->super.coll_gatherv = mca_coll_fca_component.fca_enable_gatherv? mca_coll_fca_gatherv : NULL;
fca_module->super.coll_reduce = mca_coll_fca_component.fca_enable_reduce? mca_coll_fca_reduce : NULL;
fca_module->super.coll_reduce_scatter = mca_coll_fca_component.fca_enable_reduce_scatter? mca_coll_fca_reduce_scatter : NULL;
fca_module->super.coll_scan = NULL;
fca_module->super.coll_scatter = NULL;
fca_module->super.coll_scatterv = NULL;

Просмотреть файл

@ -12,20 +12,6 @@
#include "coll_fca.h"
/**
 * Searches the module's local_ranks array (num_local_procs entries) for 'rank'.
 * Returns the position of the first match, or -1 when the rank is not listed.
 */
static inline int __find_local_rank(mca_coll_fca_module_t *fca_module, int rank)
{
    int pos = 0;

    while (pos < fca_module->num_local_procs) {
        if (fca_module->local_ranks[pos] == rank) {
            return pos;
        }
        ++pos;
    }
    return -1;
}
static mca_coll_fca_dtype_info_t* mca_coll_fca_get_dtype(ompi_datatype_t *dtype)
{
mca_coll_fca_dtype_info_t *dtype_info;
@ -86,22 +72,18 @@ static mca_coll_fca_op_info_t *mca_coll_fca_get_op(ompi_op_t *op)
return NULL;
}
static int mca_coll_fca_get_buf_size(ompi_datatype_t *dtype, int count)
static int mca_coll_fca_get_buf_size(ompi_datatype_t *dtype, int count,
int contiguous_count)
{
ptrdiff_t true_lb, true_extent;
FCA_DT_GET_TRUE_EXTENT(dtype, &true_lb, &true_extent);
/* If the datatype is the same packed as it is unpacked, we
can save a memory copy and just do the reduction operation
directly. However, if the representation is not the same, then we need to get a
receive convertor and a temporary buffer to receive into. */
if (!FCA_DT_IS_CONTIGUOUS_MEMORY_LAYOUT(dtype, count)) {
/* Check that the type in contiguous */
if (!FCA_DT_IS_CONTIGUOUS_MEMORY_LAYOUT(dtype, contiguous_count)) {
FCA_VERBOSE(5, "Unsupported datatype layout, only contiguous is supported now");
return OMPI_ERROR;
}
/* TODO add support for non-contiguous layout */
FCA_DT_GET_TRUE_EXTENT(dtype, &true_lb, &true_extent);
return true_extent * count;
}
@ -182,7 +164,7 @@ int mca_coll_fca_bcast(void *buff, int count, struct ompi_datatype_t *datatype,
FCA_VERBOSE(5,"[%d] Calling mca_coll_fca_bcast, root=%d, count=%d",
ompi_comm_rank(comm), root, count);
spec.size = mca_coll_fca_get_buf_size(datatype, count);
spec.size = mca_coll_fca_get_buf_size(datatype, count, count);
if (spec.size < 0 || spec.size > fca_module->fca_comm_caps.max_payload) {
FCA_VERBOSE(5, "Unsupported bcast operation, dtype=%s[%d] using fallback\n",
datatype->name, count);
@ -191,8 +173,8 @@ int mca_coll_fca_bcast(void *buff, int count, struct ompi_datatype_t *datatype,
}
FCA_VERBOSE(5,"Using FCA Bcast");
spec.buf = buff;
spec.root_indx = __find_local_rank(fca_module, root);
spec.buf = buff;
spec.root = root;
ret = mca_coll_fca_component.fca_ops.do_bcast(fca_module->fca_comm, &spec);
if (ret < 0) {
FCA_ERROR("Bcast failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
@ -218,10 +200,11 @@ int mca_coll_fca_reduce(void *sbuf, void *rbuf, int count,
fca_reduce_spec_t spec;
int ret;
spec.is_root = fca_module->rank == root;
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec, fca_module->fca_comm_caps.max_payload)
spec.root = root;
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec,
fca_module->fca_comm_caps.max_payload)
!= OMPI_SUCCESS) {
FCA_VERBOSE(5, "Unsupported reduce operation %s, using fallback\n", op->o_name);
return fca_module->previous_reduce(sbuf, rbuf, count, dtype, op, root,
@ -253,9 +236,10 @@ int mca_coll_fca_allreduce(void *sbuf, void *rbuf, int count,
fca_reduce_spec_t spec;
int ret;
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec, fca_module->fca_comm_caps.max_payload)
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec,
fca_module->fca_comm_caps.max_payload)
!= OMPI_SUCCESS) {
FCA_VERBOSE(5, "Unsupported allreduce operation %s, using fallback\n", op->o_name);
return fca_module->previous_allreduce(sbuf, rbuf, count, dtype, op,
@ -271,3 +255,178 @@ int mca_coll_fca_allreduce(void *sbuf, void *rbuf, int count,
return OMPI_SUCCESS;
}
/*
 *  Allgather
 *
 *  Function:   - allgather
 *  Accepts:    - same as MPI_Allgather()
 *  Returns:    - MPI_SUCCESS or error code
 *
 *  The FCA path is used only when the send buffer size is valid, does not
 *  exceed the communicator's max_payload, and equals the receive buffer
 *  size.  Otherwise the call is forwarded to the handler stored in
 *  previous_allgather.
 */
int mca_coll_fca_allgather(void *sbuf, int scount, struct ompi_datatype_t *sdtype,
                           void *rbuf, int rcount, struct ompi_datatype_t *rdtype,
                           struct ompi_communicator_t *comm,
                           mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
    fca_gather_spec_t spec = {0,};
    int offload = 1;
    int ret;

    spec.sbuf = sbuf;
    spec.rbuf = rbuf;
    spec.size = mca_coll_fca_get_buf_size(sdtype, scount, scount);

    if (spec.size < 0 || spec.size > fca_module->fca_comm_caps.max_payload) {
        FCA_VERBOSE(5, "Unsupported allgather operation size %d, using fallback\n",
                    spec.size);
        offload = 0;
    } else if (spec.size != mca_coll_fca_get_buf_size(rdtype, rcount, rcount)) {
        FCA_VERBOSE(5, "Unsupported allgather: send_size != recv_size\n");
        offload = 0;
    }

    if (!offload) {
        /* Hand the operation to the previously selected allgather. */
        return fca_module->previous_allgather(sbuf, scount, sdtype, rbuf, rcount, rdtype,
                                              comm, fca_module->previous_allgather_module);
    }

    FCA_VERBOSE(5,"Using FCA Allgather");
    ret = mca_coll_fca_component.fca_ops.do_allgather(fca_module->fca_comm, &spec);
    if (ret < 0) {
        FCA_ERROR("Allgather failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}
/*
 *  Allgatherv
 *
 *  Function:   - allgatherv
 *  Accepts:    - same as MPI_Allgatherv()
 *  Returns:    - OMPI_SUCCESS or OMPI_ERROR
 *
 *  The FCA path is used when the send buffer size is valid and within the
 *  communicator's max_payload, and the size of one receive element can be
 *  determined.  Otherwise the call is forwarded to the handler stored in
 *  previous_allgatherv.
 */
int mca_coll_fca_allgatherv(void *sbuf, int scount,
                            struct ompi_datatype_t *sdtype,
                            void *rbuf, int *rcounts, int *disps,
                            struct ompi_datatype_t *rdtype,
                            struct ompi_communicator_t *comm,
                            mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
    fca_gatherv_spec_t spec = {0,};
    int relemsize;
    int comm_size;
    int i, ret;

    comm_size = ompi_comm_size(fca_module->comm);

    spec.sbuf = sbuf;
    spec.rbuf = rbuf;
    spec.sendsize = mca_coll_fca_get_buf_size(sdtype, scount, scount);
    if (spec.sendsize < 0 || spec.sendsize > fca_module->fca_comm_caps.max_payload) {
        FCA_VERBOSE(5, "Unsupported allgatherv operation size %d, using fallback\n",
                    spec.sendsize);
        goto orig_allgatherv;
    }

    /* Size in bytes of a single receive element; negative means the helper
       rejected the receive datatype, so the byte sizes cannot be computed. */
    relemsize = mca_coll_fca_get_buf_size(rdtype, 1, comm_size);
    if (relemsize < 0) {
        FCA_VERBOSE(5, "Unsupported allgatherv receive datatype, using fallback\n");
        goto orig_allgatherv;
    }

    spec.recvsizes = alloca(sizeof *spec.recvsizes * comm_size);
    spec.displs = alloca(sizeof *spec.displs * comm_size);

    /* convert MPI counts (which depend on dtype) to FCA sizes (which are in
       bytes); the arrays are freshly allocated, so every entry must be
       written from the caller's rcounts/disps rather than scaled in place */
    for (i = 0; i < comm_size; ++i) {
        spec.recvsizes[i] = rcounts[i] * relemsize;
        spec.displs[i] = disps[i] * relemsize;
    }

    FCA_VERBOSE(5,"Using FCA Allgatherv");
    ret = mca_coll_fca_component.fca_ops.do_allgatherv(fca_module->fca_comm, &spec);
    if (ret < 0) {
        FCA_ERROR("Allgatherv failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;

orig_allgatherv:
    return fca_module->previous_allgatherv(sbuf, scount, sdtype, rbuf, rcounts,
                                           disps, rdtype, comm,
                                           fca_module->previous_allgatherv_module);
}
/*
 *  Alltoall
 *
 *  Accepts:    - same as MPI_Alltoall(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_alltoall.
 */
int mca_coll_fca_alltoall(void *sbuf, int scount,
                          struct ompi_datatype_t *sdtype,
                          void *rbuf, int rcount,
                          struct ompi_datatype_t *rdtype,
                          struct ompi_communicator_t *comm,
                          mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_alltoall_module;

    /* not implemented yet */
    return self->previous_alltoall(sbuf, scount, sdtype,
                                   rbuf, rcount, rdtype,
                                   comm, fallback_module);
}
/*
 *  Alltoallv
 *
 *  Accepts:    - same as MPI_Alltoallv(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_alltoallv.
 */
int mca_coll_fca_alltoallv(void *sbuf, int *scounts, int *sdisps,
                           struct ompi_datatype_t *sdtype,
                           void *rbuf, int *rcounts, int *rdisps,
                           struct ompi_datatype_t *rdtype,
                           struct ompi_communicator_t *comm,
                           mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_alltoallv_module;

    /* not implemented yet */
    return self->previous_alltoallv(sbuf, scounts, sdisps, sdtype,
                                    rbuf, rcounts, rdisps, rdtype,
                                    comm, fallback_module);
}
/*
 *  Alltoallw
 *
 *  Accepts:    - same as MPI_Alltoallw(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_alltoallw.
 */
int mca_coll_fca_alltoallw(void *sbuf, int *scounts, int *sdisps,
                           struct ompi_datatype_t **sdtypes,
                           void *rbuf, int *rcounts, int *rdisps,
                           struct ompi_datatype_t **rdtypes,
                           struct ompi_communicator_t *comm,
                           mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_alltoallw_module;

    /* not implemented yet */
    return self->previous_alltoallw(sbuf, scounts, sdisps, sdtypes,
                                    rbuf, rcounts, rdisps, rdtypes,
                                    comm, fallback_module);
}
/*
 *  Gather
 *
 *  Accepts:    - same as MPI_Gather(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_gather.
 */
int mca_coll_fca_gather(void *sbuf, int scount,
                        struct ompi_datatype_t *sdtype,
                        void *rbuf, int rcount,
                        struct ompi_datatype_t *rdtype,
                        int root, struct ompi_communicator_t *comm,
                        mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_gather_module;

    /* not implemented yet */
    return self->previous_gather(sbuf, scount, sdtype,
                                 rbuf, rcount, rdtype, root,
                                 comm, fallback_module);
}
/*
 *  Gatherv
 *
 *  Accepts:    - same as MPI_Gatherv(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_gatherv.
 */
int mca_coll_fca_gatherv(void *sbuf, int scount,
                         struct ompi_datatype_t *sdtype,
                         void *rbuf, int *rcounts, int *disps,
                         struct ompi_datatype_t *rdtype, int root,
                         struct ompi_communicator_t *comm,
                         mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_gatherv_module;

    /* not implemented yet */
    return self->previous_gatherv(sbuf, scount, sdtype,
                                  rbuf, rcounts, disps, rdtype, root,
                                  comm, fallback_module);
}
/*
 *  Reduce_scatter
 *
 *  Accepts:    - same as MPI_Reduce_scatter(), plus the collective module
 *  Returns:    - whatever the forwarded handler returns
 *
 *  No FCA implementation exists yet; the call goes straight to the handler
 *  stored in previous_reduce_scatter.
 */
int mca_coll_fca_reduce_scatter(void *sbuf, void *rbuf, int *rcounts,
                                struct ompi_datatype_t *dtype,
                                struct ompi_op_t *op,
                                struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    mca_coll_fca_module_t *self = (mca_coll_fca_module_t*)module;
    mca_coll_base_module_t *fallback_module = self->previous_reduce_scatter_module;

    /* not implemented yet */
    return self->previous_reduce_scatter(sbuf, rbuf, rcounts, dtype, op,
                                         comm, fallback_module);
}

Просмотреть файл

@ -13,9 +13,7 @@
# MCA_coll_fca_CONFIG([action-if-can-compile],
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_ompi_coll_fca_CONFIG],[
AC_CONFIG_FILES([ompi/mca/coll/fca/Makefile])
AC_DEFUN([MCA_coll_fca_CONFIG],[
OMPI_CHECK_FCA([coll_fca],
[coll_fca_happy="yes"],
[coll_fca_happy="no"])

14
ompi/mca/coll/fca/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,14 @@
# -*- shell-script -*-
#
#
# Copyright (c) 2010 Voltaire, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_CONFIG_FILES=Makefile