1
1

initial release of Voltaire FCA (fabric collective accelerator) collective component

- compatible with FCA v1.2

This commit was SVN r23539.
Этот коммит содержится в:
Mike Dubman 2010-08-02 11:25:53 +00:00
родитель 3ef2be67b9
Коммит 7cbe9b43c2
10 изменённых файлов: 1308 добавлений и 0 удалений

65
config/ompi_check_libfca.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,65 @@
# -*- shell-script -*-
#
# Copyright (c) 2010 Voltaire Corporation. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# OMPI_CHECK_FCA(prefix, [action-if-found], [action-if-not-found])
# --------------------------------------------------------
# check if fca support can be found. sets prefix_{CPPFLAGS,
# LDFLAGS, LIBS} as needed and runs action-if-found if there is
# support, otherwise executes action-if-not-found
AC_DEFUN([OMPI_CHECK_FCA],[
AC_ARG_WITH([fca],
[AC_HELP_STRING([--with-fca(=DIR)],
[Build fca (Voltaire Fabric Collective Accelerator) support, searching for libraries in DIR])])
OMPI_CHECK_WITHDIR([fca], [$with_fca], [include/fca_core/fca_api.h])
AS_IF([test "$with_fca" != "no"],
[AS_IF([test ! -z "$with_fca" -a "$with_fca" != "yes"],
[ompi_check_fca_dir="$with_fca"
ompi_check_fca_libdir="$ompi_check_fca_dir/lib"
ompi_check_fca_incdir="$ompi_check_fca_dir/include"
ompi_check_fca_libs="fca"
CPPFLAGS_save="$CPPFLAGS"
LDFLAGS_save="$LDFLAGS"
LIBS_save="$LIBS"
OMPI_LOG_MSG([$1_CPPFLAGS : $$1_CPPFLAGS], 1)
OMPI_LOG_MSG([$1_LDFLAGS : $$1_LDFLAGS], 1)
OMPI_LOG_MSG([$1_LIBS : $$1_LIBS], 1)
OMPI_CHECK_PACKAGE([$1],
[fca_core/fca_api.h],
[$ompi_check_fca_libs],
[fca_get_version],
[],
[$ompi_check_fca_dir],
[$ompi_check_fca_libdir],
[ompi_check_fca_happy="yes"],
[ompi_check_fca_happy="no"])],
[ompi_check_fca_happy="no"])
CPPFLAGS="$CPPFLAGS_save"
LDFLAGS="$LDFLAGS_save"
LIBS="$LIBS_save"
])
AS_IF([test "$ompi_check_fca_happy" = "yes" -a "$enable_progress_threads" = "yes"],
[AC_MSG_WARN([fca driver does not currently support progress threads. Disabling FCA.])
ompi_check_fca_happy="no"])
AS_IF([test "$ompi_check_fca_happy" = "yes"],
[$2],
[AS_IF([test ! -z "$with_fca" -a "$with_fca" != "no"],
[AC_MSG_ERROR([FCA support requested but not found. Aborting])])
$3])
])

12
ompi/mca/coll/fca/.windows Обычный файл
Просмотреть файл

@ -0,0 +1,12 @@
#
# Copyright (c) 2008-2009 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
mca_dependencies=libmpi libopen-rte

45
ompi/mca/coll/fca/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,45 @@
# -*- shell-script -*-
#
#
# Copyright (c) 2010 Voltaire, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
#
EXTRA_DIST = .windows
AM_CPPFLAGS = $(coll_fca_CPPFLAGS) -DCOLL_FCA_HOME=\"$(coll_fca_HOME)\"
#dist_pkgdata_DATA = help-coll-fca.txt
coll_fca_sources = \
coll_fca.h \
coll_fca_debug.h \
coll_fca_module.c \
coll_fca_component.c \
coll_fca_ops.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_coll_fca_DSO
component_noinst =
component_install = mca_coll_fca.la
else
component_noinst = libmca_coll_fca.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_coll_fca_la_SOURCES = $(coll_fca_sources)
mca_coll_fca_la_LIBADD =
mca_coll_fca_la_LDFLAGS = -module -avoid-version $(coll_fca_LDFLAGS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_coll_fca_la_SOURCES =$(coll_fca_sources)
libmca_coll_fca_la_LIBADD =
libmca_coll_fca_la_LDFLAGS = -module -avoid-version $(coll_fca_LDFLAGS)

208
ompi/mca/coll/fca/coll_fca.h Обычный файл
Просмотреть файл

@ -0,0 +1,208 @@
/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#ifndef MCA_COLL_FCA_H
#define MCA_COLL_FCA_H
#include <fca_core/fca_api.h>
#ifdef FCA_PROF
#include <fca_core/fca_prof.h>
#endif
#include "ompi_config.h"
#include "mpi.h"
#include "opal/mca/mca.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/request/request.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/communicator/communicator.h"
#include "ompi/op/op.h"
#include "coll_fca_debug.h"
#ifdef OMPI_DATATYPE_MAX_PREDEFINED
#define FCA_DT_MAX_PREDEFINED OMPI_DATATYPE_MAX_PREDEFINED
#else
#define FCA_DT_MAX_PREDEFINED DT_MAX_PREDEFINED
#endif
BEGIN_C_DECLS
/*
* FCA library functions.
* Used to load the library dynamically.
*/
struct mca_coll_fca_fca_ops_t {
/* Initialization / cleanup */
int (*init)(fca_init_spec_t *spec, fca_t **context);
void (*cleanup)(fca_t *context);
/* Fabric communicator creation */
int (*comm_new)(fca_t *context, fca_comm_new_spec_t *spec, fca_comm_desc_t *comm_desc);
int (*comm_end)(fca_t *context, int comm_id);
void* (*get_rank_info)(fca_t *context, int *size);
void (*free_rank_info)(void *rank_info);
/* Local communicator creation */
int (*comm_init)(fca_t *context, int proc_idx, int num_procs, int comm_size,
fca_comm_desc_t *comm_desc, fca_comm_t** fca_comm);
void (*comm_destroy)(fca_comm_t *comm);
int (*comm_get_caps)(fca_comm_t *comm, fca_comm_caps_t *caps);
/* Collectives supported by FCA */
int (*do_reduce)(fca_comm_t *comm, fca_reduce_spec_t *spec);
int (*do_all_reduce)(fca_comm_t *comm, fca_reduce_spec_t *spec);
int (*do_bcast)(fca_comm_t *comm, fca_bcast_spec_t *spec);
int (*do_barrier)(fca_comm_t *comm);
/* Helper functions */
int (*maddr_ib_pton)(const char *mlid_str, const char *mgid_str, fca_mcast_addr_t *dst);
int (*maddr_inet_pton)(int af, const char *src, fca_mcast_addr_t *dst);
fca_init_spec_t *(*parse_spec_file)(char* spec_ini_file);
void (*free_init_spec)(fca_init_spec_t *fca_init_spec);
int (*translate_mpi_op)(char *mpi_op);
int (*translate_mpi_dtype)(char *mpi_dtype);
int (*get_dtype_size)(int dtype);
const char* (*strerror)(int code);
};
typedef struct mca_coll_fca_fca_ops_t mca_coll_fca_fca_ops_t;
/**
* FCA data type information
*/
struct mca_coll_fca_dtype_info_t {
ompi_datatype_t *mpi_dtype;
size_t mpi_dtype_extent;
int fca_dtype; /* -1 if invalid */
size_t fca_dtype_extent;
};
typedef struct mca_coll_fca_dtype_info_t mca_coll_fca_dtype_info_t;
/**
* FCA operator information
*/
struct mca_coll_fca_op_info_t {
ompi_op_t *mpi_op;
int fca_op; /* -1 if invalid */
};
typedef struct mca_coll_fca_op_info_t mca_coll_fca_op_info_t;
#define FCA_MAX_OPS 32 /* Must be large enough to hold all FCA-supported operators */
/**
* Globally exported structure
*/
struct mca_coll_fca_component_t {
/** Base coll component */
mca_coll_base_component_2_0_0_t super;
/** MCA parameter: Priority of this component */
int fca_priority;
/** MCA parameter: Verbose level of this component */
int fca_verbose;
/** MCA parameter: Comm_mLid */
char *fca_comm_mlid;
/** MCA parameter: Comm_mGid */
char *fca_comm_mgid;
/** MCA parameter: FCA_Mlid */
char *fca_fmm_mlid;
/** MCA parameter: Path to fca spec file */
char* fca_spec_file;
/** MCA parameter: Path to libfca.so */
char* fca_lib_path;
/** MCA parameter: FCA device */
char* fca_dev;
/** MCA parameter: Enable FCA */
int fca_enable;
/** MCA parameter: FCA NP */
int fca_np;
/* FCA global stuff */
void *fca_lib_handle; /* FCA dynamic library */
mca_coll_fca_fca_ops_t fca_ops; /* FCA operations */
fca_t *fca_context; /* FCA context handle */
mca_coll_fca_dtype_info_t fca_dtypes[FCA_DT_MAX_PREDEFINED]; /* FCA dtype translation */
mca_coll_fca_op_info_t fca_reduce_ops[FCA_MAX_OPS]; /* FCA op translation */
};
typedef struct mca_coll_fca_component_t mca_coll_fca_component_t;
OMPI_MODULE_DECLSPEC extern mca_coll_fca_component_t mca_coll_fca_component;
/**
* FCA enabled communicator
*/
struct mca_coll_fca_module_t {
mca_coll_base_module_t super;
MPI_Comm comm;
int rank;
int local_proc_idx;
int num_local_procs;
int *local_ranks;
fca_comm_t *fca_comm;
fca_comm_desc_t fca_comm_desc;
fca_comm_caps_t fca_comm_caps;
/* Saved handlers - for fallback */
mca_coll_base_module_reduce_fn_t previous_reduce;
mca_coll_base_module_t *previous_reduce_module;
mca_coll_base_module_allreduce_fn_t previous_allreduce;
mca_coll_base_module_t *previous_allreduce_module;
mca_coll_base_module_bcast_fn_t previous_bcast;
mca_coll_base_module_t *previous_bcast_module;
mca_coll_base_module_barrier_fn_t previous_barrier;
mca_coll_base_module_t *previous_barrier_module;
};
typedef struct mca_coll_fca_module_t mca_coll_fca_module_t;
OBJ_CLASS_DECLARATION(mca_coll_fca_module_t);
/* API functions */
int mca_coll_fca_init_query(bool enable_progress_threads, bool enable_mpi_threads);
mca_coll_base_module_t *mca_coll_fca_comm_query(struct ompi_communicator_t *comm, int *priority);
/* Collective functions */
int mca_coll_fca_allreduce(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_fca_bcast(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_fca_reduce(void *sbuf, void* rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_fca_barrier(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
END_C_DECLS
#endif

214
ompi/mca/coll/fca/coll_fca_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,214 @@
/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#include <dlfcn.h>
#include <libgen.h>
#include "coll_fca.h"
/*
* Public string showing the coll ompi_fca component version number
*/
const char *mca_coll_fca_component_version_string =
"Open MPI FCA collective MCA component version " OMPI_VERSION;
/*
* Global variable
*/
int mca_coll_fca_output = -1;
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
static int fca_open(void);
//static int fca_query(void);
static int fca_close(void);
mca_coll_fca_component_t mca_coll_fca_component = {
/* First, the mca_component_t struct containing meta information
about the component itfca */
{
{
MCA_COLL_BASE_VERSION_2_0_0,
/* Component name and version */
"fca",
OMPI_MAJOR_VERSION,
OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION,
/* Component open and close functions */
fca_open,
fca_close
},
{
/* The component is not checkpoint ready */
MCA_BASE_METADATA_PARAM_NONE
},
/* Initialization / querying functions */
mca_coll_fca_init_query,
mca_coll_fca_comm_query,
}
};
static int fca_open(void)
{
FCA_VERBOSE(2, "==>");
const mca_base_component_t *c = &mca_coll_fca_component.super.collm_version;
mca_base_param_reg_int(c, "priority",
"Priority of the fca coll component",
false, false,
80,
&mca_coll_fca_component.fca_priority);
mca_base_param_reg_int(c, "verbose",
"Verbose level of the fca coll component",
false, false,
0,
&mca_coll_fca_component.fca_verbose);
mca_base_param_reg_int(c, "enable",
"[1|0|] Enable/Disable Fabric Collective Accelerator",
false, false,
1,
&mca_coll_fca_component.fca_enable);
mca_base_param_reg_string(c, "spec_file",
"Path to the FCA configuration file fca_mpi_spec.ini",
false, false,
""COLL_FCA_HOME"/etc/fca_mpi_spec.ini",
&mca_coll_fca_component.fca_spec_file);
mca_base_param_reg_string(c, "library_path",
"FCA /path/to/libfca.so",
false, false,
""COLL_FCA_HOME"/lib/libfca.so",
&mca_coll_fca_component.fca_lib_path);
mca_base_param_reg_int(c, "np",
"[integer] Minimal allowed job's NP to activate FCA",
false, false,
64,
&mca_coll_fca_component.fca_np);
mca_coll_fca_output = opal_output_open(NULL);
opal_output_set_verbosity(mca_coll_fca_output, mca_coll_fca_component.fca_verbose);
mca_coll_fca_component.fca_lib_handle = NULL;
mca_coll_fca_component.fca_context = NULL;
return OMPI_SUCCESS;
}
static int fca_close(void)
{
FCA_VERBOSE(2, "==>");
if (!mca_coll_fca_component.fca_lib_handle || !mca_coll_fca_component.fca_context)
return OMPI_SUCCESS;
mca_coll_fca_component.fca_ops.cleanup(mca_coll_fca_component.fca_context);
dlclose(mca_coll_fca_component.fca_lib_handle);
return OMPI_SUCCESS;
}
#define GET_FCA_SYM(__name) \
{ \
mca_coll_fca_component.fca_ops.__name = dlsym(mca_coll_fca_component.fca_lib_handle, "fca_" #__name);\
if (!mca_coll_fca_component.fca_ops.__name) { \
FCA_ERROR("Symbol %s not found", "fca_" #__name); \
} \
}
static void mca_coll_fca_progress_cb(void *arg)
{
opal_progress();
}
/**
* Initialize translation tables for FCA datatypes and operations
*/
static void mca_coll_fca_init_fca_translations(void)
{
int i, ret;
for (i = 0; i < FCA_DT_MAX_PREDEFINED; ++i) {
mca_coll_fca_component.fca_dtypes[i].mpi_dtype = MPI_DATATYPE_NULL;
mca_coll_fca_component.fca_dtypes[i].fca_dtype = -1;
mca_coll_fca_component.fca_dtypes[i].fca_dtype_extent = 0;
}
for (i = 0; i < FCA_MAX_OPS; ++i) {
mca_coll_fca_component.fca_reduce_ops[i].mpi_op = MPI_OP_NULL;
mca_coll_fca_component.fca_reduce_ops[i].fca_op = -1;
}
}
int mca_coll_fca_get_fca_lib(struct ompi_communicator_t *comm)
{
struct fca_init_spec *spec;
int ret;
if (mca_coll_fca_component.fca_lib_handle)
return OMPI_SUCCESS;
mca_coll_fca_component.fca_lib_handle = dlopen(mca_coll_fca_component.fca_lib_path, RTLD_LAZY);
if (!mca_coll_fca_component.fca_lib_handle) {
FCA_ERROR("Failed to load FCA from %s: %m", mca_coll_fca_component.fca_lib_path);
return OMPI_ERROR;
}
FCA_VERBOSE(1, "FCA Loaded from: %s", mca_coll_fca_component.fca_lib_path);
GET_FCA_SYM(init);
GET_FCA_SYM(cleanup);
GET_FCA_SYM(comm_new);
GET_FCA_SYM(comm_end);
GET_FCA_SYM(get_rank_info);
GET_FCA_SYM(free_rank_info);
GET_FCA_SYM(comm_init);
GET_FCA_SYM(comm_destroy);
GET_FCA_SYM(comm_get_caps);
GET_FCA_SYM(do_reduce);
GET_FCA_SYM(do_all_reduce);
GET_FCA_SYM(do_bcast);
GET_FCA_SYM(do_barrier);
GET_FCA_SYM(maddr_ib_pton);
GET_FCA_SYM(maddr_inet_pton);
GET_FCA_SYM(parse_spec_file);
GET_FCA_SYM(free_init_spec);
GET_FCA_SYM(translate_mpi_op);
GET_FCA_SYM(translate_mpi_dtype);
GET_FCA_SYM(get_dtype_size);
GET_FCA_SYM(strerror);
spec = mca_coll_fca_component.fca_ops.parse_spec_file(mca_coll_fca_component.fca_spec_file);
if (!spec) {
FCA_ERROR("Failed to parse FCA spec file `%s'", mca_coll_fca_component.fca_spec_file);
return OMPI_ERROR;
}
spec->job_id = ompi_proc_local()->proc_name.jobid;
spec->rank_id = ompi_comm_rank(MPI_COMM_WORLD);
spec->progress.func = mca_coll_fca_progress_cb;
spec->progress.arg = NULL;
ret = mca_coll_fca_component.fca_ops.init(spec, &mca_coll_fca_component.fca_context);
if (ret < 0) {
FCA_ERROR("Failed to initialize FCA: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
mca_coll_fca_component.fca_ops.free_init_spec(spec);
mca_coll_fca_init_fca_translations();
return OMPI_SUCCESS;
}

28
ompi/mca/coll/fca/coll_fca_debug.h Обычный файл
Просмотреть файл

@ -0,0 +1,28 @@
/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#ifndef MCA_COLL_FCA_DEBUG_H
#define MCA_COLL_FCA_DEBUG_H
#define FCA_VERBOSE(level, format, ...) \
opal_output_verbose(level, mca_coll_fca_output, "%s:%d - %s() " format, \
__BASE_FILE__, __LINE__, __FUNCTION__, ## __VA_ARGS__)
#define FCA_ERROR(format, ... ) \
opal_output_verbose(0, mca_coll_fca_output, "Error: %s:%d - %s() " format, \
__BASE_FILE__, __LINE__, __FUNCTION__, ## __VA_ARGS__)
#define FCA_MODULE_VERBOSE(fca_module, level, format, ...) \
FCA_VERBOSE(level, "[%p:%d] " format, (fca_module)->comm, (fca_module)->rank, ## __VA_ARGS__)
extern int mca_coll_fca_output;
#endif

413
ompi/mca/coll/fca/coll_fca_module.c Обычный файл
Просмотреть файл

@ -0,0 +1,413 @@
/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#include "coll_fca.h"
#include "opal/mca/paffinity/paffinity.h"
/*
* Initial query function that is invoked during MPI_INIT, allowing
* this module to indicate what level of thread support it provides.
*/
int mca_coll_fca_init_query(bool enable_progress_threads,
bool enable_mpi_threads)
{
return OMPI_SUCCESS;
}
static int have_remote_peers(ompi_group_t *group, size_t size, int *local_peers)
{
ompi_proc_t *proc;
size_t i;
int ret;
*local_peers = 0;
ret = 0;
for (i = 0; i < size; ++i) {
proc = ompi_group_peer_lookup(group, i);
if (OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
++*local_peers;
} else {
ret = 1;
}
}
return ret;
}
static inline ompi_proc_t* __local_rank_lookup(ompi_communicator_t *comm, int rank)
{
return ompi_group_peer_lookup(comm->c_local_group, rank);
}
/**
* Fills local rank information in fca_module.
*/
static int __get_local_ranks(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
ompi_proc_t* proc;
int rank, index;
/* Count local ranks */
fca_module->num_local_procs = 0;
for (rank = 0; rank < ompi_comm_size(comm); ++rank) {
proc = __local_rank_lookup(comm, rank);
if (OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags))
++fca_module->num_local_procs;
}
fca_module->local_ranks = calloc(fca_module->num_local_procs, sizeof *fca_module->local_ranks);
/* Get local ranks */
index = 0;
for (rank = 0; rank< ompi_comm_size(comm); ++rank) {
proc = __local_rank_lookup(comm, rank);
if (!OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags))
continue;
if (rank == fca_module->rank)
fca_module->local_proc_idx = index;
fca_module->local_ranks[index] = rank;
++index;
}
FCA_MODULE_VERBOSE(fca_module, 3, "num_local_ranks: %d, node_root: %d",
fca_module->num_local_procs, fca_module->local_ranks[0]);
return OMPI_SUCCESS;
}
int __fca_comm_new(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
fca_comm_new_spec_t spec;
int info_size, all_info_size;
void *all_info, *my_info;
int *rcounts, *disps;
int i, rc, ret;
/* call fca_get_rank_info() on node managers only*/
if (fca_module->local_proc_idx == 0) {
my_info = mca_coll_fca_component.fca_ops.get_rank_info(mca_coll_fca_component.fca_context,
&info_size);
if (!my_info) {
FCA_ERROR("fca_get_rank_info returned NULL");
return OMPI_ERROR;
}
} else {
info_size = 0;
}
FCA_MODULE_VERBOSE(fca_module, 1, "Info size: %d", info_size);
/* Get all rank info sizes using MPI_Gather */
if (fca_module->rank == 0)
rcounts = calloc(ompi_comm_size(comm), sizeof *rcounts);
rc = comm->c_coll.coll_gather(&info_size, 1, MPI_INT, rcounts, 1, MPI_INT, 0,
comm, comm->c_coll.coll_gather_module);
if (rc != OMPI_SUCCESS)
return rc;
/* Rank0 allocates buffers */
if (fca_module->rank == 0) {
FCA_MODULE_VERBOSE(fca_module, 1, "Total rank_info size: %d", all_info_size);
all_info_size = 0;
disps = calloc(ompi_comm_size(comm), sizeof *disps);
for (i = 0; i < ompi_comm_size(comm); ++i) {
disps[i] = all_info_size;
all_info_size += rcounts[i];
}
all_info = calloc(all_info_size, 1);
}
/* Send all node managers information to rank0 using MPI_Gatherv*/
rc = comm->c_coll.coll_gatherv(my_info, info_size, MPI_BYTE,
all_info, rcounts, disps, MPI_BYTE, 0,
comm, comm->c_coll.coll_gather_module);
if (rc != OMPI_SUCCESS) {
FCA_ERROR("Failed to gather rank information to rank0: %d", rc);
return rc;
}
/* Rank0 calls fca_comm_new() and fills fca_comm_spec filed */
if (fca_module->rank == 0) {
spec.rank_info = all_info;
spec.is_comm_world = comm == MPI_COMM_WORLD;
spec.rank_count = 0;
for (i = 0; i < ompi_comm_size(comm); ++i) {
FCA_MODULE_VERBOSE(fca_module, 1, "rcounts[%d]=%d disps[%d]=%d",
i, rcounts[i], i, disps[i]);
if (rcounts[i] > 0)
++spec.rank_count;
}
FCA_MODULE_VERBOSE(fca_module, 1, "starting fca_comm_new(), rank_count: %d",
spec.rank_count);
ret = mca_coll_fca_component.fca_ops.comm_new (mca_coll_fca_component.fca_context,
&spec,
&fca_module->fca_comm_desc);
if (ret < 0) {
FCA_ERROR("COMM_NEW failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
FCA_MODULE_VERBOSE(fca_module, 1, "rank 0: Received FCA communicator, comm_id %d",
fca_module->fca_comm_desc.comm_id);
free(disps);
free(rcounts);
free(all_info);
}
/* Release allocate rank_info on node managers */
if (fca_module->local_proc_idx == 0)
mca_coll_fca_component.fca_ops.free_rank_info(my_info);
/* Pass fca_comm_desc to all ranks using MPI_Bcast */
rc = fca_module->previous_bcast(&fca_module->fca_comm_desc,
sizeof(fca_module->fca_comm_desc), MPI_BYTE, 0,
comm, fca_module->previous_bcast_module);
if (rc != OMPI_SUCCESS) {
FCA_ERROR("Failed to broadcast comm_desc from rank0: %d", rc);
return rc;
}
FCA_MODULE_VERBOSE(fca_module, 1, "Received FCA communicator spec, comm_id %d",
fca_module->fca_comm_desc.comm_id);
return OMPI_SUCCESS;
}
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
{
fca_comm_desc_t comm_desc;
int rc, ret;
rc = __fca_comm_new(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
fca_module->fca_comm_desc.comm_id, fca_module->local_proc_idx,
fca_module->num_local_procs);
ret = mca_coll_fca_component.fca_ops.comm_init(mca_coll_fca_component.fca_context,
fca_module->local_proc_idx,
fca_module->num_local_procs,
ompi_comm_size(fca_module->comm),
&fca_module->fca_comm_desc,
&fca_module->fca_comm);
if (ret < 0) {
FCA_ERROR("COMM_INIT failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
/* get communicator capabilities */
ret = mca_coll_fca_component.fca_ops.comm_get_caps(fca_module->fca_comm,
&fca_module->fca_comm_caps);
if (ret < 0) {
FCA_ERROR("GET_COMM_CAPS failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
/* by this point every rank in the communicator is set up */
FCA_MODULE_VERBOSE(fca_module, 1, "Initialized FCA communicator, comm_id %d",
fca_module->fca_comm_desc.comm_id);
return OMPI_SUCCESS;
}
static void __destroy_fca_comm(mca_coll_fca_module_t *fca_module)
{
int ret;
mca_coll_fca_component.fca_ops.comm_destroy(fca_module->fca_comm);
if (fca_module->rank == 0) {
ret = mca_coll_fca_component.fca_ops.comm_end(mca_coll_fca_component.fca_context,
fca_module->fca_comm_desc.comm_id);
if (ret < 0) {
FCA_ERROR("COMM_END failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
}
}
FCA_MODULE_VERBOSE(fca_module, 1, "Destroyed FCA communicator, comm_id %d",
fca_module->fca_comm_desc.comm_id);
}
static int __save_coll_handlers(mca_coll_fca_module_t *fca_module)
{
ompi_communicator_t *comm = fca_module->comm;
if (!comm->c_coll.coll_reduce || !comm->c_coll.coll_reduce_module ||
!comm->c_coll.coll_allreduce || !comm->c_coll.coll_allreduce_module ||
!comm->c_coll.coll_bcast || !comm->c_coll.coll_bcast_module ||
!comm->c_coll.coll_barrier || !comm->c_coll.coll_barrier_module) {
FCA_VERBOSE(1, "(%d/%s): no underlying reduce; disqualifying myself",
comm->c_contextid, comm->c_name);
return OMPI_ERROR;
}
fca_module->previous_allreduce = comm->c_coll.coll_allreduce;
fca_module->previous_allreduce_module = comm->c_coll.coll_allreduce_module;
OBJ_RETAIN(fca_module->previous_allreduce_module);
FCA_VERBOSE(14, "saving fca_module->previous_allreduce_module=%p, fca_module->previous_allreduce=%p, fca_module=%p,fca_module->super.coll_allreduce=%p",
fca_module->previous_allreduce_module, fca_module->previous_allreduce, fca_module, fca_module->super.coll_allreduce);
fca_module->previous_reduce = comm->c_coll.coll_reduce;
fca_module->previous_reduce_module = comm->c_coll.coll_reduce_module;
OBJ_RETAIN(fca_module->previous_reduce_module);
FCA_VERBOSE(14, "saving fca_module->previous_reduce_module=%p, fca_module->previous_reduce=%p, fca_module=%p,fca_module->super.coll_reduce=%p",
fca_module->previous_reduce_module, fca_module->previous_reduce, fca_module, fca_module->super.coll_reduce);
fca_module->previous_bcast = comm->c_coll.coll_bcast;
fca_module->previous_bcast_module = comm->c_coll.coll_bcast_module;
OBJ_RETAIN(fca_module->previous_bcast_module);
FCA_VERBOSE(14, "saving fca_module->bcast=%p, fca_module->bcast_module=%p, fca_module=%p, fca_module->super.coll_bcast=%p",
fca_module->previous_bcast, fca_module->previous_bcast_module, fca_module, fca_module->super.coll_bcast);
fca_module->previous_barrier = comm->c_coll.coll_barrier;
fca_module->previous_barrier_module = comm->c_coll.coll_barrier_module;
OBJ_RETAIN(fca_module->previous_barrier_module);
FCA_VERBOSE(14, "saving fca_module->barrier=%p, fca_module->barrier_module=%p, fca_module=%p, fca_module->super.coll_barrier=%p",
fca_module->previous_barrier, fca_module->previous_barrier_module, fca_module, fca_module->super.coll_barrier);
return OMPI_SUCCESS;
}
/*
* Initialize module on the communicator
*/
static int mca_coll_fca_module_enable(mca_coll_base_module_t *module,
struct ompi_communicator_t *comm)
{
mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*) module;
int rc;
fca_module->comm = comm;
fca_module->rank = ompi_comm_rank(comm);
rc = mca_coll_fca_get_fca_lib(comm);
if (rc != OMPI_SUCCESS)
return rc;
rc = __save_coll_handlers(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
rc = __get_local_ranks(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
rc = __create_fca_comm(fca_module);
if (rc != OMPI_SUCCESS)
return rc;
FCA_MODULE_VERBOSE(fca_module, 1, "FCA Module initialized");
return OMPI_SUCCESS;
}
static int mca_coll_fca_ft_event(int state)
{
return OMPI_SUCCESS;
}
static void mca_coll_fca_module_clear(mca_coll_fca_module_t *fca_module)
{
fca_module->num_local_procs = 0;
fca_module->local_ranks = NULL;
fca_module->fca_comm = NULL;
fca_module->previous_allreduce = NULL;
fca_module->previous_reduce = NULL;
fca_module->previous_bcast = NULL;
fca_module->previous_barrier = NULL;
}
static void mca_coll_fca_module_construct(mca_coll_fca_module_t *fca_module)
{
FCA_VERBOSE(5, "==>");
mca_coll_fca_module_clear(fca_module);
}
static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
{
FCA_VERBOSE(5, "==>");
int rc = OMPI_SUCCESS;
OBJ_RELEASE(fca_module->previous_allreduce_module);
OBJ_RELEASE(fca_module->previous_reduce_module);
OBJ_RELEASE(fca_module->previous_bcast_module);
OBJ_RELEASE(fca_module->previous_barrier_module);
if (fca_module->fca_comm)
__destroy_fca_comm(fca_module);
free(fca_module->local_ranks);
mca_coll_fca_module_clear(fca_module);
}
/*
* Invoked when there's a new communicator that has been created.
* Look at the communicator and decide which set of functions and
* priority we want to return.
*/
mca_coll_base_module_t *
mca_coll_fca_comm_query(struct ompi_communicator_t *comm, int *priority)
{
mca_coll_base_module_t *module;
int size = ompi_comm_size(comm);
int local_peers;
*priority = 0;
module = NULL;
if (!mca_coll_fca_component.fca_enable)
goto exit;
if (size < mca_coll_fca_component.fca_np)
goto exit;
if (!have_remote_peers(comm->c_local_group, size, &local_peers) || OMPI_COMM_IS_INTER(comm))
goto exit;
mca_coll_fca_module_t *fca_module = OBJ_NEW(mca_coll_fca_module_t);
if (!fca_module)
goto exit;
fca_module->super.coll_module_enable = mca_coll_fca_module_enable;
fca_module->super.ft_event = mca_coll_fca_ft_event;
fca_module->super.coll_allgather = NULL;
fca_module->super.coll_allgatherv = NULL;
fca_module->super.coll_allreduce = mca_coll_fca_allreduce;
fca_module->super.coll_alltoall = NULL;
fca_module->super.coll_alltoallv = NULL;
fca_module->super.coll_alltoallw = NULL;
fca_module->super.coll_barrier = mca_coll_fca_barrier;
fca_module->super.coll_bcast = mca_coll_fca_bcast;
fca_module->super.coll_exscan = NULL;
fca_module->super.coll_gather = NULL;
fca_module->super.coll_gatherv = NULL;
fca_module->super.coll_reduce = mca_coll_fca_reduce;
fca_module->super.coll_reduce_scatter = NULL;
fca_module->super.coll_scan = NULL;
fca_module->super.coll_scatter = NULL;
fca_module->super.coll_scatterv = NULL;
*priority = mca_coll_fca_component.fca_priority;
module = &fca_module->super;
exit:
FCA_VERBOSE(4, "Query FCA module for comm %p size %d rank %d local_peers=%d: priority=%d %s",
comm, size, ompi_comm_rank(comm), local_peers,
*priority, module ? "enabled" : "disabled");
return module;
}
OBJ_CLASS_INSTANCE(mca_coll_fca_module_t,
mca_coll_base_module_t,
mca_coll_fca_module_construct,
mca_coll_fca_module_destruct);

273
ompi/mca/coll/fca/coll_fca_ops.c Обычный файл
Просмотреть файл

@ -0,0 +1,273 @@
/**
Copyright (c) 2010 Voltaire, Inc. All rights reserved.
$COPYRIGHT$
Additional copyrights may follow
$HEADER$
*/
#include "ompi_config.h"
#include "ompi/constants.h"
#include "coll_fca.h"
/**
* Returns the index of the rank 'ran' in the local ranks group, or -1 if not exists.
*/
static inline int __find_local_rank(mca_coll_fca_module_t *fca_module, int rank)
{
int i;
for (i = 0; i < fca_module->num_local_procs; ++i) {
if (rank == fca_module->local_ranks[i])
return i;
}
return -1;
}
static mca_coll_fca_dtype_info_t* mca_coll_fca_get_dtype(ompi_datatype_t *dtype)
{
mca_coll_fca_dtype_info_t *dtype_info;
ptrdiff_t lb, extent;
int id = dtype->id;
int fca_dtype;
if (id < 0 || id >= FCA_DT_MAX_PREDEFINED)
return NULL;
dtype_info = &mca_coll_fca_component.fca_dtypes[id];
if (dtype_info->mpi_dtype == dtype)
return dtype_info;
/* assert we don't overwrite another datatype */
assert(dtype_info->mpi_dtype == MPI_DATATYPE_NULL);
fca_dtype = mca_coll_fca_component.fca_ops.translate_mpi_dtype(dtype->name);
if (fca_dtype < 0)
return NULL;
ompi_ddt_get_true_extent(dtype, &lb, &extent);
dtype_info->mpi_dtype = dtype;
dtype_info->mpi_dtype_extent = extent;
dtype_info->fca_dtype = fca_dtype;
dtype_info->fca_dtype_extent = mca_coll_fca_component.fca_ops.get_dtype_size(fca_dtype);
FCA_VERBOSE(2, "Added new dtype[%d]: %s fca id: %d, mpi sizet: %d, fca size: %d",
id, dtype->name, dtype_info->fca_dtype, dtype_info->mpi_dtype_extent,
dtype_info->fca_dtype_extent);
return dtype_info;
}
static mca_coll_fca_op_info_t *mca_coll_fca_get_op(ompi_op_t *op)
{
mca_coll_fca_op_info_t *op_info;
int i, fca_op;
/*
* Find 'op' in the array by exhaustive search. We assume all valid ops are
* in the beginning. If we stumble on a MPI_OP_NULL, we try to resolve the FCA
* operation code and store it in the array.
*/
op_info = mca_coll_fca_component.fca_reduce_ops;
for (i = 0; i < FCA_MAX_OPS; ++i, ++op_info) {
if (op_info->mpi_op == op) {
return op_info;
} else if (op_info->mpi_op == MPI_OP_NULL) {
fca_op = mca_coll_fca_component.fca_ops.translate_mpi_op(op->o_name);
if (fca_op < 0)
return NULL;
op_info->mpi_op = op;
op_info->fca_op = fca_op;
FCA_VERBOSE(2, "Added new op[%d]: %s fca id: %d", i, op->o_name, fca_op);
return op_info;
}
}
/* assert the array does not overflow */
//assert(mca_coll_fca_component.fca_reduce_ops[FCA_MAX_OPS - 1] == MPI_OP_NULL);
return NULL;
}
static int mca_coll_fca_get_buf_size(ompi_datatype_t *dtype, int count)
{
ptrdiff_t true_lb, true_extent;
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
/* If the datatype is the same packed as it is unpacked, we
can save a memory copy and just do the reduction operation
directly. However, if the representation is not the same, then we need to get a
receive convertor and a temporary buffer to receive into. */
if (!ompi_ddt_is_contiguous_memory_layout(dtype, count)) {
FCA_VERBOSE(5, "Unsupported datatype layout, only contiguous is supported now");
return OMPI_ERROR;
}
// TODO add support for non-contiguous layout
return true_extent * count;
}
static int mca_coll_fca_fill_reduce_spec(int count, ompi_datatype_t *dtype,
ompi_op_t *op, fca_reduce_spec_t *spec,
int max_fca_payload)
{
mca_coll_fca_dtype_info_t *dtype_info;
mca_coll_fca_op_info_t *op_info;
/* Check dtype */
dtype_info = mca_coll_fca_get_dtype(dtype);
if (!dtype_info) {
FCA_VERBOSE(10, "Unsupported dtype: %s", dtype->name);
return OMPI_ERROR;
}
/* Check FCA size */
if (dtype_info->fca_dtype_extent * count > max_fca_payload) {
FCA_VERBOSE(10, "Unsupported buffer size: %d", dtype_info->fca_dtype_extent * count);
return OMPI_ERROR;
}
/* Check operation */
op_info = mca_coll_fca_get_op(op);
if (!op) {
FCA_VERBOSE(10, "Unsupported op: %s", op->o_name);
return OMPI_ERROR;
}
/* Fill spec */
spec->dtype = dtype_info->fca_dtype;
spec->op = op_info->fca_op;
spec->length = count;
spec->buf_size = dtype_info->mpi_dtype_extent * count;
if (MPI_IN_PLACE == spec->sbuf) {
FCA_VERBOSE(10, "Using MPI_IN_PLACE for sbuf");
spec->sbuf = spec->rbuf;
} else if (MPI_IN_PLACE == spec->rbuf) {
FCA_VERBOSE(10, "Using MPI_IN_PLACE for rbuf");
spec->rbuf = spec->sbuf;
}
return OMPI_SUCCESS;
}
/*
* Function: - barrier
* Returns: - MPI_SUCCESS or error code
*/
int mca_coll_fca_barrier(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
int ret;
FCA_VERBOSE(5,"Using FCA Barrier");
ret = mca_coll_fca_component.fca_ops.do_barrier(fca_module->fca_comm);
if (ret < 0) {
FCA_ERROR("Barrier failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/*
* Function: - broadcast
* Accepts: - same arguments as MPI_Bcast()
* Returns: - MPI_SUCCESS or error code
*/
int mca_coll_fca_bcast(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
fca_bcast_spec_t spec;
int ret;
FCA_VERBOSE(5,"[%d] Calling mca_coll_fca_bcast, root=%d, count=%d",
ompi_comm_rank(comm), root, count);
spec.size = mca_coll_fca_get_buf_size(datatype, count);
if (spec.size < 0 || spec.size > fca_module->fca_comm_caps.max_payload) {
FCA_VERBOSE(5, "Unsupported bcast operation, dtype=%s[%d] using fallback\n",
datatype->name, count);
return fca_module->previous_bcast(buff, count, datatype, root, comm,
fca_module->previous_bcast_module);
}
FCA_VERBOSE(5,"Using FCA Bcast");
spec.buf = buff;
spec.root_indx = __find_local_rank(fca_module, root);
ret = mca_coll_fca_component.fca_ops.do_bcast(fca_module->fca_comm, &spec);
if (ret < 0) {
FCA_ERROR("Bcast failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/*
* Reduce
*
* Function: - reduce
* Accepts: - same as MPI_Reduce()
* Returns: - MPI_SUCCESS or error code
*/
int mca_coll_fca_reduce(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
fca_reduce_spec_t spec;
int ret;
spec.is_root = fca_module->rank == root;
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec, fca_module->fca_comm_caps.max_payload)
!= OMPI_SUCCESS) {
FCA_VERBOSE(5, "Unsupported reduce operation, using fallback\n");
return fca_module->previous_reduce(sbuf, rbuf, count, dtype, op, root,
comm, fca_module->previous_reduce_module);
}
FCA_VERBOSE(5,"Using FCA Reduce");
ret = mca_coll_fca_component.fca_ops.do_reduce(fca_module->fca_comm, &spec);
if (ret < 0) {
FCA_ERROR("Reduce failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/*
* Allreduce
*
* Function: - allreduce
* Accepts: - same as MPI_Allreduce()
* Returns: - MPI_SUCCESS or error code
*/
int mca_coll_fca_allreduce(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_fca_module_t *fca_module = (mca_coll_fca_module_t*)module;
fca_reduce_spec_t spec;
int ret;
spec.sbuf = sbuf;
spec.rbuf = rbuf;
if (mca_coll_fca_fill_reduce_spec(count, dtype, op, &spec, fca_module->fca_comm_caps.max_payload)
!= OMPI_SUCCESS) {
FCA_VERBOSE(5, "Unsupported allreduce operation, using fallback\n");
return fca_module->previous_allreduce(sbuf, rbuf, count, dtype, op,
comm, fca_module->previous_allreduce_module);
}
FCA_VERBOSE(5,"Using FCA Allreduce");
ret = mca_coll_fca_component.fca_ops.do_all_reduce(fca_module->fca_comm, &spec);
if (ret < 0) {
FCA_ERROR("Allreduce failed: %s", mca_coll_fca_component.fca_ops.strerror(ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}

36
ompi/mca/coll/fca/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,36 @@
# -*- shell-script -*-
#
#
# Copyright (c) 2010 Voltaire, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_coll_fca_CONFIG([action-if-can-compile],
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_coll_fca_CONFIG],[
OMPI_CHECK_FCA([coll_fca],
[coll_fca_happy="yes"],
[coll_fca_happy="no"])
AS_IF([test "$coll_fca_happy" = "yes"],
[coll_fca_WRAPPER_EXTRA_LDFLAGS="$coll_fca_LDFLAGS"
coll_fca_CPPFLAGS="$coll_fca_CPPFLAGS"
coll_fca_WRAPPER_EXTRA_CPPFLAGS="$coll_fca_CPPFLAGS"
coll_fca_WRAPPER_EXTRA_LIBS="$coll_fca_LIBS"
$1],
[$2])
# substitute in the things needed to build fca
AC_SUBST([coll_fca_CFLAGS])
AC_SUBST([coll_fca_CPPFLAGS])
AC_SUBST([coll_fca_LDFLAGS])
AC_SUBST([coll_fca_LIBS])
AC_SUBST(coll_fca_HOME, "$ompi_check_fca_dir")
])dnl

14
ompi/mca/coll/fca/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,14 @@
# -*- shell-script -*-
#
#
# Copyright (c) 2010 Voltaire, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_CONFIG_FILES=Makefile