openmpi/ompi/mca/coll/han/coll_han_gather.c
George Bosilca 16b49dc5b3 A complete overhaul of the HAN code.
Among many other things:
- Fix an imbalance bug in MPI_Allgather
- Accept more human-readable configuration files. We can now specify
  the collective by name instead of a magic number, and also the
  component we want to use by name.
- Add the capability to have optional arguments in the collective
  communication configuration file. Right now the capability exists
  for segment lengths, but it has yet to be connected with the algorithms.
- Redo the initialization of all HAN collectives.

Clean up the fallback collective support.
- In case the module is unable to deliver the expected result, it will fall back
  to executing the collective operation on another collective component. This
  change makes this fallback support simpler to use.
- Implement a fallback allowing a HAN module to remove itself as a
  potential active collective module and instead fall back to the
  next module in line.
- Completely disable the HAN modules on error. From the moment an error is
  encountered they remove themselves from the communicator, and if some
  other module calls them they simply behave as a pass-through.

Communicator: provide ompi_comm_split_with_info to split and provide info at the same time
Add ompi_comm_coll_preference info key to control collective component selection

COLL HAN: use info keys instead of a component-level variable to communicate the topology level between abstraction layers
- The info value is a comma-separated list of entries, chosen with
  decreasing priority. This overrides the priority of the component,
  unless the component has disqualified itself.
  An entry prefixed with ^ starts the ignore-list; any entry following this
  character will be ignored during the collective component selection for the
  communicator.
  Example: "sm,libnbc,^han,adapt" gives sm the highest preference, followed
  by libnbc. The components han and adapt are ignored in the selection
  process. A usage sketch follows this list.
- Allocate a temporary buffer for all lower-level leaders (two segments in length)
- Fix the handling of MPI_IN_PLACE for gather and scatter.
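
As a minimal sketch of how the preference key might be used from an
application, assuming the ompi_comm_coll_preference key is honored for an
info object attached at communicator creation (the exact attachment point,
and the pref_comm name, are assumptions for illustration):

    #include <mpi.h>

    int main(int argc, char *argv[])
    {
        MPI_Info info;
        MPI_Comm pref_comm;

        MPI_Init(&argc, &argv);

        /* Prefer sm, then libnbc; ignore han and adapt during the
         * collective component selection for the new communicator. */
        MPI_Info_create(&info);
        MPI_Info_set(info, "ompi_comm_coll_preference", "sm,libnbc,^han,adapt");
        MPI_Comm_dup_with_info(MPI_COMM_WORLD, info, &pref_comm);

        /* ... collective calls on pref_comm ... */

        MPI_Comm_free(&pref_comm);
        MPI_Info_free(&info);
        MPI_Finalize();
        return 0;
    }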

COLL HAN: Fix topology handling
 - HAN should not rely on node names to determine the ordering of ranks.
   Instead, use the node leaders as identifiers and short-cut if the
   node leaders agree that the ranks are consecutive. Also, for now,
   error out if the rank distribution is imbalanced.

Signed-off-by: Xi Luo <xluo12@vols.utk.edu>
Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
2020-10-25 18:13:16 -04:00


/*
 * Copyright (c) 2018-2020 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2020      Bull S.A.S. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "coll_han.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "coll_han_trigger.h"

static int mca_coll_han_gather_lg_task(void *task_args);
static int mca_coll_han_gather_ug_task(void *task_args);

/* Only works in the regular case (each node has an equal number of processes) */
static inline void
mca_coll_han_set_gather_args(mca_coll_han_gather_args_t * args,
                             mca_coll_task_t * cur_task,
                             void *sbuf,
                             void *sbuf_inter_free,
                             int scount,
                             struct ompi_datatype_t *sdtype,
                             void *rbuf,
                             int rcount,
                             struct ompi_datatype_t *rdtype,
                             int root,
                             int root_up_rank,
                             int root_low_rank,
                             struct ompi_communicator_t *up_comm,
                             struct ompi_communicator_t *low_comm,
                             int w_rank, bool noop, bool is_mapbycore, ompi_request_t * req)
{
    args->cur_task = cur_task;
    args->sbuf = sbuf;
    args->sbuf_inter_free = sbuf_inter_free;
    args->scount = scount;
    args->sdtype = sdtype;
    args->rbuf = rbuf;
    args->rcount = rcount;
    args->rdtype = rdtype;
    args->root = root;
    args->root_up_rank = root_up_rank;
    args->root_low_rank = root_low_rank;
    args->up_comm = up_comm;
    args->low_comm = low_comm;
    args->w_rank = w_rank;
    args->noop = noop;
    args->is_mapbycore = is_mapbycore;
    args->req = req;
}
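
/*
 * Two-level gather, as implemented below:
 *   lg ("low gather") task:  gather inside each node onto the node leader
 *                            over low_comm (usually shared memory);
 *   ug ("up gather") task:   gather between node leaders onto the root
 *                            over up_comm (inter-node).
 * Finally, if the ranks are not mapped by core, the root reorders the
 * gathered data into rbuf using the topology information.
 */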
int
mca_coll_han_gather_intra(const void *sbuf, int scount,
                          struct ompi_datatype_t *sdtype,
                          void *rbuf, int rcount,
                          struct ompi_datatype_t *rdtype,
                          int root,
                          struct ompi_communicator_t *comm,
                          mca_coll_base_module_t * module)
{
    mca_coll_han_module_t *han_module = (mca_coll_han_module_t *) module;
    int w_rank, w_size;              /* information about the global communicator */
    int root_low_rank, root_up_rank; /* root ranks for both sub-communicators */
    char *reorder_buf = NULL, *reorder_rbuf = NULL;
    int i, err, *vranks, low_rank, low_size, *topo;
    ompi_request_t *temp_request = NULL;

    /* Create the subcommunicators */
    err = mca_coll_han_comm_create(comm, han_module);
    if( OMPI_SUCCESS != err ) {  /* Let's hope the error is consistently returned across the entire communicator */
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "han cannot handle gather with this communicator. Fall back on another component\n"));
        /* HAN cannot work with this communicator, so fall back for all collectives */
        HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm);
        return comm->c_coll->coll_gather(sbuf, scount, sdtype, rbuf,
                                         rcount, rdtype, root,
                                         comm, comm->c_coll->coll_gather_module);
    }
    /* Topo must be initialized to know the rank distribution, which is then
     * used to determine if HAN can be used */
    topo = mca_coll_han_topo_init(comm, han_module, 2);
    if (han_module->are_ppn_imbalanced) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "han cannot handle gather with this communicator (imbalance). Fall back on another component\n"));
        /* Put back the fallback collective support and call it once. All
         * future calls will then be automatically redirected.
         */
        HAN_LOAD_FALLBACK_COLLECTIVE(han_module, comm, gather);
        return comm->c_coll->coll_gather(sbuf, scount, sdtype, rbuf,
                                         rcount, rdtype, root,
                                         comm, comm->c_coll->coll_gather_module);
    }

    w_rank = ompi_comm_rank(comm);
    w_size = ompi_comm_size(comm);

    /* Set up the request */
    temp_request = OBJ_NEW(ompi_request_t);
    temp_request->req_state = OMPI_REQUEST_ACTIVE;
    temp_request->req_type = OMPI_REQUEST_COLL;
    temp_request->req_free = han_request_free;
    temp_request->req_status = (ompi_status_public_t){0};
    temp_request->req_complete = REQUEST_PENDING;
    /* Retrieve the cached sub-communicators */
    ompi_communicator_t *low_comm =
        han_module->cached_low_comms[mca_coll_han_component.han_gather_low_module];
    ompi_communicator_t *up_comm =
        han_module->cached_up_comms[mca_coll_han_component.han_gather_up_module];

    /* Get the 'virtual ranks' mapping corresponding to the communicators */
    vranks = han_module->cached_vranks;
    /* information about the sub-communicators */
    low_rank = ompi_comm_rank(low_comm);
    low_size = ompi_comm_size(low_comm);
    /* Get the root ranks for the low and up comms */
    mca_coll_han_get_ranks(vranks, root, low_size, &root_low_rank, &root_up_rank);

    OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                         "[%d]: Han Gather root %d root_low_rank %d root_up_rank %d\n",
                         w_rank, root, root_low_rank, root_up_rank));
    /* Allocate reorder buffers */
    if (w_rank == root) {
        /* If the processes are mapped by core there is no need to reorder:
         * the ranks are distributed core-first and node-next, in increasing
         * order for both patterns */
        if (han_module->is_mapbycore) {
            OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                                 "[%d]: Han Gather is_bycore: ", w_rank));
            reorder_rbuf = (char *)rbuf;
        } else {
            /* Need a buffer to store the unordered final result */
            ptrdiff_t rsize, rgap;
            rsize = opal_datatype_span(&rdtype->super,
                                       (int64_t)rcount * w_size,
                                       &rgap);
            reorder_buf = (char *)malloc(rsize);  /* freed after the reorder copy below */
            /* rgap is the size of unused space at the start of the datatype */
            reorder_rbuf = reorder_buf - rgap;

            if (MPI_IN_PLACE == sbuf) {
                /* The low-level gather task reads the root's contribution from
                 * its receive buffer, which here is reorder_rbuf, so stage the
                 * in-place data from rbuf into the reorder buffer first. */
                ptrdiff_t rextent;
                ompi_datatype_type_extent(rdtype, &rextent);
                ptrdiff_t block_size = rextent * (ptrdiff_t)rcount;
                ptrdiff_t src_shift = block_size * w_rank;
                ptrdiff_t dest_shift = block_size * w_rank;
                ompi_datatype_copy_content_same_ddt(rdtype,
                                                    (ptrdiff_t)rcount,
                                                    reorder_rbuf + dest_shift,
                                                    (char *)rbuf + src_shift);
            }
        }
    }
    /* Create the lg task */
    mca_coll_task_t *lg = OBJ_NEW(mca_coll_task_t);
    /* Set up the lg task arguments */
    mca_coll_han_gather_args_t *lg_args = malloc(sizeof(mca_coll_han_gather_args_t));
    mca_coll_han_set_gather_args(lg_args, lg, (char *) sbuf, NULL, scount, sdtype, reorder_rbuf,
                                 rcount, rdtype, root, root_up_rank, root_low_rank, up_comm,
                                 low_comm, w_rank, low_rank != root_low_rank, han_module->is_mapbycore, temp_request);
    /* Init the lg task */
    init_task(lg, mca_coll_han_gather_lg_task, (void *) (lg_args));
    /* Issue the lg task */
    issue_task(lg);

    ompi_request_wait(&temp_request, MPI_STATUS_IGNORE);
    /* Suppose the expected message is 0 1 2 3 4 5 6 7, but the processes are
     * mapped on 2 nodes, for example |0 2 4 6| |1 3 5 7|. The messages from
     * the low gather will be 0 2 4 6 and 1 3 5 7.
     * The upper gather result is then 0 2 4 6 1 3 5 7, which must be reordered.
     * The element 4 (at index 2) must be copied to index 4. In general, the
     * i-th element must be copied to the place given by the i-th entry of the
     * topology, which is topo[i * topolevel + 1]
     */
    /* Reorder rbuf based on rank */
    if (w_rank == root && !han_module->is_mapbycore) {
        ptrdiff_t rextent;
        ompi_datatype_type_extent(rdtype, &rextent);
        for (i = 0; i < w_size; i++) {
            OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                                 "[%d]: Han Gather copy from %d to %d\n",
                                 w_rank,
                                 i * 2 + 1,
                                 topo[i * 2 + 1]));
            ptrdiff_t block_size = rextent * (ptrdiff_t)rcount;
            ptrdiff_t src_shift = block_size * i;
            ptrdiff_t dest_shift = block_size * (ptrdiff_t)topo[i * 2 + 1];
            ompi_datatype_copy_content_same_ddt(rdtype,
                                                (ptrdiff_t)rcount,
                                                (char *)rbuf + dest_shift,
                                                reorder_rbuf + src_shift);
        }
        free(reorder_buf);
    }

    return OMPI_SUCCESS;
}

/* Perform an intra-node gather and, when it completes, launch the inter-node gather */
int mca_coll_han_gather_lg_task(void *task_args)
{
    mca_coll_han_gather_args_t *t = (mca_coll_han_gather_args_t *) task_args;
    OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, "[%d] Han Gather: lg\n",
                         t->w_rank));

    ompi_datatype_t *dtype;
    size_t count;
    if (t->w_rank == t->root) {
        dtype = t->rdtype;
        count = t->rcount;
    } else {
        dtype = t->sdtype;
        count = t->scount;
    }

    /* noop is true for ranks that are not the leader of their node */
    char *tmp_buf = NULL;
    char *tmp_rbuf = NULL;
    if (!t->noop) {
        /* The process is a node leader: allocate the intermediary
         * buffer to gather on the low sub-communicator */
        int low_size = ompi_comm_size(t->low_comm);
        int low_rank = ompi_comm_rank(t->low_comm);
        ptrdiff_t rsize, rgap = 0;
        rsize = opal_datatype_span(&dtype->super,
                                   count * low_size,
                                   &rgap);
        tmp_buf = (char *) malloc(rsize);
        tmp_rbuf = tmp_buf - rgap;
        if (t->w_rank == t->root) {
            if (MPI_IN_PLACE == t->sbuf) {
                /* With MPI_IN_PLACE the root's contribution sits in its receive
                 * buffer at the global-rank position; move it to the low-rank
                 * position of the intermediary buffer where the low gather
                 * expects it. */
                ptrdiff_t rextent;
                ompi_datatype_type_extent(dtype, &rextent);
                ptrdiff_t block_size = rextent * (ptrdiff_t)count;
                ptrdiff_t src_shift = block_size * t->w_rank;
                ptrdiff_t dest_shift = block_size * low_rank;
                ompi_datatype_copy_content_same_ddt(dtype,
                                                    (ptrdiff_t)count,
                                                    tmp_rbuf + dest_shift,
                                                    (char *)t->rbuf + src_shift);
            }
        }
    }

    /* Low-level (usually intra-node or shared-memory) gather */
    t->low_comm->c_coll->coll_gather((char *)t->sbuf,
                                     count,
                                     dtype,
                                     tmp_rbuf,
                                     count,
                                     dtype,
                                     t->root_low_rank,
                                     t->low_comm,
                                     t->low_comm->c_coll->coll_gather_module);

    /* Prepare the up_comm gather */
    t->sbuf = tmp_rbuf;
    t->sbuf_inter_free = tmp_buf;

    /* Reuse the current task as the ug (upper-level gather) task */
    mca_coll_task_t *ug = t->cur_task;
    /* Init the ug task */
    init_task(ug, mca_coll_han_gather_ug_task, (void *) t);
    /* Issue the ug task */
    issue_task(ug);

    return OMPI_SUCCESS;
}

/* ug: upper-level (inter-node) gather task */
int mca_coll_han_gather_ug_task(void *task_args)
{
    mca_coll_han_gather_args_t *t = (mca_coll_han_gather_args_t *) task_args;
    OBJ_RELEASE(t->cur_task);
    if (t->noop) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "[%d] Han Gather: ug noop\n", t->w_rank));
    } else {
        ompi_datatype_t *dtype;
        size_t count;
        if (t->w_rank == t->root) {
            dtype = t->rdtype;
            count = t->rcount;
        } else {
            dtype = t->sdtype;
            count = t->scount;
        }

        int low_size = ompi_comm_size(t->low_comm);
        /* Inter-node gather between node leaders */
        t->up_comm->c_coll->coll_gather((char *)t->sbuf,
                                        count * low_size,
                                        dtype,
                                        (char *)t->rbuf,
                                        count * low_size,
                                        dtype,
                                        t->root_up_rank,
                                        t->up_comm,
                                        t->up_comm->c_coll->coll_gather_module);

        if (t->sbuf_inter_free != NULL) {
            free(t->sbuf_inter_free);
            t->sbuf_inter_free = NULL;
        }

        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "[%d] Han Gather: ug gather finish\n", t->w_rank));
    }

    ompi_request_t *temp_req = t->req;
    free(t);
    ompi_request_complete(temp_req, 1);
    return OMPI_SUCCESS;
}
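
/*
 * Simple variant: the same three phases as above (1. low-level gather onto
 * the node leaders, 2. upper-level gather between the leaders, 3. reorder
 * on the root), executed inline without the task machinery.
 */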
/* Only works in the regular case (each node has an equal number of processes) */
int
mca_coll_han_gather_intra_simple(const void *sbuf, int scount,
                                 struct ompi_datatype_t *sdtype,
                                 void *rbuf, int rcount,
                                 struct ompi_datatype_t *rdtype,
                                 int root,
                                 struct ompi_communicator_t *comm,
                                 mca_coll_base_module_t *module)
{
    mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module;
    int *topo, w_rank = ompi_comm_rank(comm);
    int w_size = ompi_comm_size(comm);

    /* Create the subcommunicators */
    if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) {  /* Let's hope the error is consistently returned across the entire communicator */
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "han cannot handle gather with this communicator. Fall back on another component\n"));
        /* HAN cannot work with this communicator, so fall back for all collectives */
        HAN_LOAD_FALLBACK_COLLECTIVES(han_module, comm);
        return comm->c_coll->coll_gather(sbuf, scount, sdtype, rbuf,
                                         rcount, rdtype, root,
                                         comm, comm->c_coll->coll_gather_module);
    }

    /* Topo must be initialized to know the rank distribution, which is then
     * used to determine if HAN can be used */
    topo = mca_coll_han_topo_init(comm, han_module, 2);
    if (han_module->are_ppn_imbalanced) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "han cannot handle gather with this communicator (imbalance). Fall back on another component\n"));
        /* Put back the fallback collective support and call it once. All
         * future calls will then be automatically redirected.
         */
        HAN_LOAD_FALLBACK_COLLECTIVE(han_module, comm, gather);
        return comm->c_coll->coll_gather(sbuf, scount, sdtype, rbuf,
                                         rcount, rdtype, root,
                                         comm, comm->c_coll->coll_gather_module);
    }

    ompi_communicator_t *low_comm = han_module->sub_comm[INTRA_NODE];
    ompi_communicator_t *up_comm = han_module->sub_comm[INTER_NODE];
    ompi_datatype_t *dtype;
    size_t count;
    if (w_rank == root) {
        dtype = rdtype;
        count = rcount;
    } else {
        dtype = sdtype;
        count = scount;
    }
    /* Get the 'virtual ranks' mapping corresponding to the communicators */
    int *vranks = han_module->cached_vranks;
    /* information about the sub-communicators */
    int low_rank = ompi_comm_rank(low_comm);
    int low_size = ompi_comm_size(low_comm);
    /* Get the root ranks for the low and up comms */
    int root_low_rank, root_up_rank; /* root ranks for both sub-communicators */
    mca_coll_han_get_ranks(vranks, root, low_size, &root_low_rank, &root_up_rank);

    /* Allocate a buffer to store the unordered result on the root.
     * If the processes are mapped by core there is no need to reorder:
     * the ranks are distributed core-first and node-next, in increasing
     * order for both patterns */
    char *reorder_buf = NULL;        /* allocated memory */
    char *reorder_buf_start = NULL;  /* start of the data */
    if (w_rank == root) {
        if (han_module->is_mapbycore) {
            reorder_buf_start = (char *)rbuf;
        } else {
            OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                                 "[%d]: Future Gather needs reordering: ", w_rank));
            ptrdiff_t rgap = 0;
            ptrdiff_t rsize = opal_datatype_span(&rdtype->super,
                                                 (int64_t)rcount * w_size,
                                                 &rgap);
            reorder_buf = (char *)malloc(rsize);
            /* rgap is the size of unused space at the start of the datatype */
            reorder_buf_start = reorder_buf - rgap;
        }
    }

    /* Allocate the intermediary buffer
     * to gather on the leaders of the low sub-communicator */
    char *tmp_buf = NULL;        /* allocated memory */
    char *tmp_buf_start = NULL;  /* start of the data */
    if (low_rank == root_low_rank) {
        ptrdiff_t rsize, rgap = 0;
        rsize = opal_datatype_span(&dtype->super,
                                   count * low_size,
                                   &rgap);
        tmp_buf = (char *) malloc(rsize);
        tmp_buf_start = tmp_buf - rgap;
    }
    /* 1. low gather onto the node leaders */
    low_comm->c_coll->coll_gather((char *)sbuf,
                                  count,
                                  dtype,
                                  tmp_buf_start,
                                  count,
                                  dtype,
                                  root_low_rank,
                                  low_comm,
                                  low_comm->c_coll->coll_gather_module);

    /* 2. upper gather (inter-node) between node leaders */
    if (low_rank == root_low_rank) {
        up_comm->c_coll->coll_gather((char *)tmp_buf_start,
                                     count * low_size,
                                     dtype,
                                     (char *)reorder_buf_start,
                                     count * low_size,
                                     dtype,
                                     root_up_rank,
                                     up_comm,
                                     up_comm->c_coll->coll_gather_module);

        if (tmp_buf != NULL) {
            free(tmp_buf);
            tmp_buf = NULL;
            tmp_buf_start = NULL;
        }

        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "[%d] Future Gather: ug gather finish\n", w_rank));
    }

    /* 3. reorder the data into rbuf on the root.
     * If the ranks are not mapped in topological order, the data needs to be
     * reordered (see ompi_coll_han_reorder_gather)
     */
    if (w_rank == root && !han_module->is_mapbycore) {
        ompi_coll_han_reorder_gather(reorder_buf_start,
                                     rbuf, rcount, rdtype,
                                     comm, topo);
        free(reorder_buf);
    }

    return OMPI_SUCCESS;
}

/* Reorder after the gather operation, for unordered ranks
 *
 * Suppose the expected message is 0 1 2 3 4 5 6 7, but the processes are
 * mapped on 2 nodes, for example |0 2 4 6| |1 3 5 7|. The messages from
 * the low gather will be 0 2 4 6 and 1 3 5 7.
 * The upper gather result is then 0 2 4 6 1 3 5 7, which must be reordered.
 * The element 4 (at index 2) must be copied to index 4. In general, the
 * i-th element must be copied to the place given by the i-th entry of the
 * topology, which is topo[i * topolevel + 1]
 */
void
ompi_coll_han_reorder_gather(const void *sbuf,
                             void *rbuf, int rcount,
                             struct ompi_datatype_t *dtype,
                             struct ompi_communicator_t *comm,
                             int * topo)
{
    int i, topolevel = 2;  /* always 2 levels in the topo */
    int w_rank = ompi_comm_rank(comm);
    int w_size = ompi_comm_size(comm);
    ptrdiff_t rextent;
    ompi_datatype_type_extent(dtype, &rextent);
    for ( i = 0; i < w_size; i++ ) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output,
                             "[%d]: Future reorder from %d to %d\n",
                             w_rank,
                             i * topolevel + 1,
                             topo[i * topolevel + 1]));
        ptrdiff_t block_size = rextent * (ptrdiff_t)rcount;
        ptrdiff_t src_shift = block_size * i;
        ptrdiff_t dest_shift = block_size * (ptrdiff_t)topo[i * topolevel + 1];
        ompi_datatype_copy_content_same_ddt(dtype,
                                            (ptrdiff_t)rcount,
                                            (char *)rbuf + dest_shift,
                                            (char *)sbuf + src_shift);
    }
}