1
1

A complete overhaul of the HAN code.

Among many other things:
- Fix an imbalance bug in MPI_allgather
- Accept more human readable configuration files. We can now specify
  the collective by name instead of a magic number, and the component
  we want to use also by name.
- Add the capability to have optional arguments in the collective
  communication configuration file. Right now the capability exists
  for segment lengths, but is yet to be connected with the algorithms.
- Redo the initialization of all HAN collectives.

Cleanup the fallback collective support.
- In case the module is unable to deliver the expected result, it will fallback
  executing the collective operation on another collective component. This change
  makes the support for this fallback simpler to use.
- Implement a fallback allowing a HAN module to remove itself as
  potential active collective module, and instead fallback to the
  next module in line.
- Completely disable the HAN modules on error. From the moment an error is
  encountered they remove themselves from the communicator, and in case some
  other module calls them, they simply behave as a pass-through.

Communicator: provide ompi_comm_split_with_info to split and provide info at the same time
Add ompi_comm_coll_preference info key to control collective component selection

COLL HAN: use info keys instead of component-level variable to communicate topology level between abstraction layers
- The info value is a comma-separated list of entries, which are chosen with
  decreasing priorities. This overrides the priority of the component,
  unless the component has disqualified itself.
  An entry prefixed with ^ starts the ignore-list. Any entry following this
  character will be ignored during the collective component selection for the
  communicator.
  Example: "sm,libnbc,^han,adapt" gives sm the highest preference, followed
  by libnbc. The components han and adapt are ignored in the selection process.
- Allocate a temporary buffer for all lower-level leaders (length 2 segments)
- Fix the handling of MPI_IN_PLACE for gather and scatter.

COLL HAN: Fix topology handling
 - HAN should not rely on node names to determine the ordering of ranks.
   Instead, use the node leaders as identifiers and short-cut if the
   node-leaders agree that ranks are consecutive. Also, error out if
   the rank distribution is imbalanced for now.

Signed-off-by: Xi Luo <xluo12@vols.utk.edu>
Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
Signed-off-by: George Bosilca <bosilca@icl.utk.edu>

Conflicts:
	ompi/mca/coll/adapt/coll_adapt_ibcast.c
This commit is contained in:
George Bosilca 2020-05-14 00:07:50 -04:00
parent 94c817ceff
commit 6d735ba052
31 changed files with 2668 additions and 2974 deletions

View File

@ -400,11 +400,10 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
ompi_communicator_t **newcomm, bool pass_on_topo )
int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo )
{
int myinfo[2];
int size, my_size;
@ -610,7 +609,11 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT FROM %d",
newcomp->c_contextid, comm->c_contextid );
/* Copy info if there is one */
if (info) {
newcomp->super.s_info = OBJ_NEW(opal_info_t);
opal_info_dup(info, &(newcomp->super.s_info));
}
/* Activate the communicator and init coll-component */
rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
@ -637,6 +640,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
}
/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
                     ompi_communicator_t **newcomm, bool pass_on_topo )
{
    /* Thin wrapper: delegate to the info-aware variant, passing NULL so no
     * info object is attached to the resulting communicator. */
    return ompi_comm_split_with_info(comm, color, key, NULL, newcomm, pass_on_topo);
}
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/

View File

@ -463,6 +463,21 @@ int ompi_topo_dist_graph_create_adjacent(ompi_communicator_t *old_comm,
OMPI_DECLSPEC int ompi_comm_split (ompi_communicator_t *comm, int color, int key,
ompi_communicator_t** newcomm, bool pass_on_topo);
/**
 * Split a communicator based on color and key, and attach the provided
 * info object to the resulting communicator. Parameters are otherwise
 * identical to the MPI counterpart of the function.
 * Similar to \see ompi_comm_split with an additional info parameter.
 *
 * @param comm: input communicator
 * @param color
 * @param key
 * @param info: info object to attach to the new communicator (may be NULL)
 * @param newcomm: output communicator
 * @param pass_on_topo: whether the topology is passed on to the new communicator
 *
 * @return OMPI error code
 */
OMPI_DECLSPEC int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo );
/**
* split a communicator based on type and key. Parameters
* are identical to the MPI-counterpart of the function.

View File

@ -578,3 +578,31 @@ bool ompi_group_have_remote_peers (ompi_group_t *group)
return false;
}
/**
 * Count the number of processes in this group that share the same node as
 * this process.
 *
 * NOTE(review): the count presumably includes the calling process itself,
 * since its own proc flags should report it as local — confirm against
 * OPAL_PROC_ON_LOCAL_NODE semantics.
 */
int ompi_group_count_local_peers (ompi_group_t *group)
{
    int local_peers = 0;
    /* Walk every member of the group and test its locality flag. */
    for (int i = 0 ; i < group->grp_proc_count ; ++i) {
        ompi_proc_t *proc = NULL;
#if OMPI_GROUP_SPARSE
        /* Sparse groups require the full (possibly translating) lookup. */
        proc = ompi_group_peer_lookup (group, i);
#else
        /* Dense groups: fetch the raw pointer, which may be a sentinel
         * for processes we have never resolved. */
        proc = ompi_group_get_proc_ptr_raw (group, i);
        if (ompi_proc_is_sentinel (proc)) {
            /* the proc must be stored in the group or cached in the proc
             * hash table if the process resides in the local node
             * (see ompi_proc_complete_init), so a sentinel cannot be local */
            continue;
        }
#endif
        if (OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)) {
            local_peers++;
        }
    }
    return local_peers;
}

View File

@ -419,8 +419,16 @@ static inline struct ompi_proc_t *ompi_group_peer_lookup_existing (ompi_group_t
return ompi_group_get_proc_ptr (group, peer_id, false);
}
/**
* Return true if all processes in the group are not on the local node.
*/
bool ompi_group_have_remote_peers (ompi_group_t *group);
/**
* Count the number of processes on the local node.
*/
int ompi_group_count_local_peers (ompi_group_t *group);
/**
* Function to print the group info
*/

View File

@ -178,7 +178,7 @@ static int send_cb(ompi_request_t * req)
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in send\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}
@ -304,7 +304,7 @@ static int recv_cb(ompi_request_t * req)
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
&& num_recv_fini == context->con->num_segs) || (context->con->tree->tree_nextsize == 0
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in recv\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}

View File

@ -38,6 +38,7 @@
#include "mpi.h"
#include "ompi/communicator/communicator.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/show_help.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_object.h"
@ -312,6 +313,20 @@ static int avail_coll_compare (opal_list_item_t **a,
return 0;
}
/*
 * Return 1 if component_name appears in the NULL-terminated argv list,
 * 0 otherwise (including when the list itself is NULL).
 */
static inline int
component_in_argv(char **argv, const char* component_name)
{
    if (NULL == argv) {
        return 0;
    }
    for (int i = 0; NULL != argv[i]; i++) {
        if (0 == strcmp(argv[i], component_name)) {
            return 1;
        }
    }
    return 0;
}
/*
* For each module in the list, check and see if it wants to run, and
* do the resulting priority comparison. Make a list of modules to be
@ -321,13 +336,66 @@ static int avail_coll_compare (opal_list_item_t **a,
static opal_list_t *check_components(opal_list_t * components,
ompi_communicator_t * comm)
{
int priority;
int priority, flag;
const mca_base_component_t *component;
mca_base_component_list_item_t *cli;
mca_coll_base_module_2_3_0_t *module;
opal_list_t *selectable;
mca_coll_base_avail_coll_t *avail;
char info_val[OPAL_MAX_INFO_VAL+1];
char **coll_argv = NULL, **coll_exclude = NULL, **coll_include = NULL;
/* Check if this communicator comes with restrictions on the collective modules
* it wants to use. The restrictions are consistent with the MCA parameter
* to limit the collective components loaded, but it applies for each
* communicator and is provided as an info key during the communicator
* creation. Unlike the MCA param, this info key is used not to select
* components but either to prevent components from being used or to
* force a change in the component priority.
*/
if( NULL != comm->super.s_info) {
opal_info_get(comm->super.s_info, "ompi_comm_coll_preference",
sizeof(info_val), info_val, &flag);
if( !flag ) {
goto proceed_to_select;
}
coll_argv = opal_argv_split(info_val, ',');
if(NULL == coll_argv) {
goto proceed_to_select;
}
int idx2, count_include = opal_argv_count(coll_argv);
/* Allocate the coll_include argv */
coll_include = (char**)malloc((count_include + 1) * sizeof(char*));
coll_include[count_include] = NULL; /* NULL terminated array */
/* Dispatch the include/exclude in the corresponding arrays */
for( int idx = 0; NULL != coll_argv[idx]; idx++ ) {
if( '^' == coll_argv[idx][0] ) {
coll_include[idx] = NULL; /* NULL terminated array */
/* Allocate the coll_exclude argv */
coll_exclude = (char**)malloc((count_include - idx + 1) * sizeof(char*));
/* save the exclude components */
for( idx2 = idx; NULL != coll_argv[idx2]; idx2++ ) {
coll_exclude[idx2 - idx] = coll_argv[idx2];
}
coll_exclude[idx2 - idx] = NULL; /* NULL-terminated array */
coll_exclude[0] = coll_exclude[0] + 1; /* get rid of the ^ */
count_include = idx;
break;
}
coll_include[idx] = coll_argv[idx];
}
/* Reverse the order of the coll_include argv to facilitate the reordering of
 * the selected components by decreasing preference.
 */
for( idx2 = 0; idx2 < (count_include - 1); idx2++ ) {
char* temp = coll_include[idx2];
coll_include[idx2] = coll_include[count_include - 1];
coll_include[count_include - 1] = temp;
count_include--;
}
}
proceed_to_select:
/* Make a list of the components that query successfully */
selectable = OBJ_NEW(opal_list_t);
@ -335,6 +403,13 @@ static opal_list_t *check_components(opal_list_t * components,
OPAL_LIST_FOREACH(cli, &ompi_coll_base_framework.framework_components, mca_base_component_list_item_t) {
component = cli->cli_component;
/* don't bother if we have this component in the exclusion list */
if( component_in_argv(coll_exclude, component->mca_component_name) ) {
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"coll:base:comm_select: component disqualified: %s (due to communicator info key)",
component->mca_component_name );
continue;
}
priority = check_one_component(comm, component, &module);
if (priority >= 0) {
/* We have a component that indicated that it wants to run
@ -370,6 +445,27 @@ static opal_list_t *check_components(opal_list_t * components,
/* Put this list in priority order */
opal_list_sort(selectable, avail_coll_compare);
/* For all valid component reorder them not on their provided priorities but on
* the order requested in the info key. As at this point the coll_include is
* already ordered backward we can simply prepend the components.
*/
mca_coll_base_avail_coll_t *item, *item_next;
OPAL_LIST_FOREACH_SAFE(item, item_next,
selectable, mca_coll_base_avail_coll_t) {
if( component_in_argv(coll_include, item->ac_component_name) ) {
opal_list_remove_item(selectable, &item->super);
opal_list_prepend(selectable, &item->super);
}
}
opal_argv_free(coll_argv);
if( NULL != coll_exclude ) {
free(coll_exclude);
}
if( NULL != coll_include ) {
free(coll_include);
}
/* All done */
return selectable;
}
@ -403,7 +499,6 @@ static int check_one_component(ompi_communicator_t * comm,
return priority;
}
/**************************************************************************
* Query functions
**************************************************************************/

View File

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2016 The University of Tennessee and The University
* Copyright (c) 2004-2020 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -29,6 +29,8 @@
#include "ompi/mca/topo/base/base.h"
#include "ompi/mca/pml/pml.h"
#include "coll_base_util.h"
#include "coll_base_functions.h"
#include <ctype.h>
int ompi_coll_base_sendrecv_actual( const void* sendbuf, size_t scount,
ompi_datatype_t* sdatatype,
@ -268,7 +270,7 @@ int ompi_coll_base_retain_datatypes_w( ompi_request_t *req,
} else {
scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm);
}
for (int i=0; i<scount; i++) {
if (NULL != stypes && NULL != stypes[i] && !ompi_datatype_is_predefined(stypes[i])) {
OBJ_RETAIN(stypes[i]);
@ -297,7 +299,8 @@ int ompi_coll_base_retain_datatypes_w( ompi_request_t *req,
return OMPI_SUCCESS;
}
static void nbc_req_cons(ompi_coll_base_nbc_request_t *req) {
static void nbc_req_cons(ompi_coll_base_nbc_request_t *req)
{
req->cb.req_complete_cb = NULL;
req->req_complete_cb_data = NULL;
req->data.objs.objs[0] = NULL;
@ -309,35 +312,249 @@ OBJ_CLASS_INSTANCE(ompi_coll_base_nbc_request_t, ompi_request_t, nbc_req_cons, N
/* File reading functions */
static void skiptonewline (FILE *fptr, int *fileline)
{
do {
char val;
int rc;
char val;
int rc;
do {
rc = fread(&val, 1, 1, fptr);
if (0 == rc) return;
if ((1 == rc)&&('\n' == val)) {
if (0 == rc) {
return;
}
if ('\n' == val) {
(*fileline)++;
return;
}
}
} while (1);
}
long ompi_coll_base_file_getnext (FILE *fptr, int *fileline)
int ompi_coll_base_file_getnext_long(FILE *fptr, int *fileline, long* val)
{
do {
long val;
int rc;
char trash;
char trash;
int rc;
rc = fscanf(fptr, "%li", &val);
if (rc == EOF) return MYEOF;
if (1 == rc) return val;
/* in all other cases, skip to the end */
do {
rc = fscanf(fptr, "%li", val);
if (rc == EOF) {
return -1;
}
if (1 == rc) {
return 0;
}
/* in all other cases, skip to the end of the token */
rc = fread(&trash, sizeof(char), 1, fptr);
if (rc == EOF) return MYEOF;
if (rc == EOF) {
return -1;
}
if ('\n' == trash) (*fileline)++;
if ('#' == trash) {
skiptonewline (fptr, fileline);
}
}
} while (1);
}
/**
 * Read the next whitespace-delimited token from the file, skipping
 * '#'-to-end-of-line comments and keeping *fileline in sync with the
 * number of newlines consumed.
 *
 * On success *val points to a freshly allocated copy of the token
 * (caller must free it) and 0 is returned. On EOF, read error, or
 * allocation failure, *val is NULL and -1 is returned.
 */
int ompi_coll_base_file_getnext_string(FILE *fptr, int *fileline, char** val)
{
    /* "%32s" stores up to 32 characters PLUS the NUL terminator, so the
     * buffer must be 33 bytes (the original 32 could overflow by one). */
    char trash, token[33];
    int rc;

    *val = NULL;  /* security in case we fail */
    do {
        rc = fscanf(fptr, "%32s", token);
        if (rc == EOF) {
            return -1;
        }
        if (1 == rc) {
            if( '#' == token[0] ) {
                /* token starts a comment: discard the rest of the line */
                skiptonewline(fptr, fileline);
                continue;
            }
            *val = (char*)malloc(strlen(token) + 1);
            if (NULL == *val) {
                /* out of memory: report as a read failure */
                return -1;
            }
            strcpy(*val, token);
            return 0;
        }
        /* in all other cases, skip to the end of the token.
         * fread returns the number of items read (0 on EOF/error), it
         * never returns EOF, so test against 0. */
        rc = fread(&trash, sizeof(char), 1, fptr);
        if (0 == rc) {
            return -1;
        }
        if ('\n' == trash) (*fileline)++;
        if ('#' == trash) {
            skiptonewline (fptr, fileline);
        }
    } while (1);
}
}
/**
 * Parse the next size_t value from the file, skipping '#' comments and
 * tracking the current line number in *fileline.
 *
 * @return 0 on success (*val holds the parsed value), -1 on EOF/error.
 */
int ompi_coll_base_file_getnext_size_t(FILE *fptr, int *fileline, size_t* val)
{
    char trash;
    int rc;

    do {
        rc = fscanf(fptr, "%" PRIsize_t, val);
        if (rc == EOF) {
            return -1;
        }
        if (1 == rc) {
            return 0;
        }
        /* in all other cases, skip to the end of the token.
         * fread returns an item count (0 on EOF/error) and never EOF,
         * so the end-of-file test must compare against 0. */
        rc = fread(&trash, sizeof(char), 1, fptr);
        if (0 == rc) {
            return -1;
        }
        if ('\n' == trash) (*fileline)++;
        if ('#' == trash) {
            skiptonewline (fptr, fileline);
        }
    } while (1);
}
/**
 * Peek at the next meaningful character in the file, skipping blanks,
 * newlines (counted into *fileline) and '#' comments.
 *
 * @return 1 if the character matches 'expected' (the character is consumed),
 *         0 if it does not match (the character is pushed back via fseek),
 *        -1 on EOF or on a seek failure.
 */
int ompi_coll_base_file_peek_next_char_is(FILE *fptr, int *fileline, int expected)
{
    char trash;
    int rc;

    do {
        rc = fread(&trash, sizeof(char), 1, fptr);
        if (0 == rc) { /* hit the end of the file */
            return -1;
        }
        if ('\n' == trash) {
            (*fileline)++;
            continue;
        }
        if ('#' == trash) {
            skiptonewline (fptr, fileline);
            continue;
        }
        if( trash == expected )
            return 1; /* return true and eat the char */
        /* cast to unsigned char: passing a (possibly negative) plain char
         * to the <ctype.h> classification functions is undefined behavior */
        if( isblank((unsigned char)trash) ) /* skip all spaces if that's not what we were looking for */
            continue;
        if( 0 != fseek(fptr, -1, SEEK_CUR) )
            return -1;
        return 0;
    } while (1);
}
}
/**
 * There are certainly simpler implementations of this function when performance
 * is not a critical point. But, as this function is used during the collective
 * configuration, and we do this configuration once for each communicator,
 * I would rather have a more complex but faster implementation.
 * The approach here is to search for the largest common denominators, to create
 * something similar to a dichotomic search.
 */
/*
 * Map a lowercase collective name (e.g. "bcast", "reduce_scatter") to its
 * COLLTYPE_T identifier, or -1 when the name is not recognized.
 *
 * The dispatch is a hand-rolled prefix discriminator on the first character
 * (and a few fixed offsets) rather than a table of strcmp calls, trading
 * readability for speed as explained in the comment above.
 *
 * NOTE(review): some branches only check a prefix plus one character, so a
 * name such as "gathervX" would still match GATHERV — presumably acceptable
 * for well-formed configuration files; confirm if strict matching is needed.
 */
int mca_coll_base_name_to_colltype(const char* name)
{
    /* "neighbor_*" family: discriminate on the character after "neighbor_all" */
    if( 'n' == name[0] ) {
        if( 0 == strncmp(name, "neighbor_all", 12) ) {
            if( 't' != name[12] ) {
                /* neighbor_allgather / neighbor_allgatherv */
                if( 0 == strncmp(name+12, "gather", 6) ) {
                    if('\0' == name[18]) return NEIGHBOR_ALLGATHER;
                    if( 'v' == name[18]) return NEIGHBOR_ALLGATHERV;
                }
            } else {
                /* neighbor_alltoall{,v,w} */
                if( 0 == strncmp(name+12, "toall", 5) ) {
                    if( '\0' == name[17] ) return NEIGHBOR_ALLTOALL;
                    if( 'v' == name[17] ) return NEIGHBOR_ALLTOALLV;
                    if( 'w' == name[17] ) return NEIGHBOR_ALLTOALLW;
                }
            }
        }
        return -1;
    }
    /* "all*" family */
    if( 'a' == name[0] ) {
        if( 0 != strncmp(name, "all", 3) ) {
            return -1;
        }
        if( 't' != name[3] ) {
            if( 'r' == name[3] ) {
                if( 0 == strcmp(name+3, "reduce") )
                    return ALLREDUCE;
            } else {
                /* allgather / allgatherv */
                if( 0 == strncmp(name+3, "gather", 6) ) {
                    if( '\0' == name[9] ) return ALLGATHER;
                    if( 'v' == name[9] ) return ALLGATHERV;
                }
            }
        } else {
            /* alltoall{,v,w} */
            if( 0 == strncmp(name+3, "toall", 5) ) {
                if( '\0' == name[8] ) return ALLTOALL;
                if( 'v' == name[8] ) return ALLTOALLV;
                if( 'w' == name[8] ) return ALLTOALLW;
            }
        }
        return -1;
    }
    /* names starting with a letter before 'r': barrier, bcast, gather(v), exscan */
    if( 'r' > name[0] ) {
        if( 'b' == name[0] ) {
            if( 0 == strcmp(name, "barrier") )
                return BARRIER;
            if( 0 == strcmp(name, "bcast") )
                return BCAST;
        } else if( 'g'== name[0] ) {
            if( 0 == strncmp(name, "gather", 6) ) {
                if( '\0' == name[6] ) return GATHER;
                if( 'v' == name[6] ) return GATHERV;
            }
        }
        if( 0 == strcmp(name, "exscan") )
            return EXSCAN;
        return -1;
    }
    /* names starting with 'r': reduce, reduce_scatter, reduce_scatter_block */
    if( 's' > name[0] ) {
        if( 0 == strncmp(name, "reduce", 6) ) {
            if( '\0' == name[6] ) return REDUCE;
            if( '_' == name[6] ) {
                if( 0 == strncmp(name+7, "scatter", 7) ) {
                    if( '\0' == name[14] ) return REDUCESCATTER;
                    if( 0 == strcmp(name+14, "_block") ) return REDUCESCATTERBLOCK;
                }
            }
        }
        return -1;
    }
    /* remaining names ('s' and beyond): scan, scatterv, scatter.
     * "scatterv" must be tested before "scatter" only matters for prefix
     * matches; with strcmp the order is harmless. */
    if( 0 == strcmp(name, "scan") )
        return SCAN;
    if( 0 == strcmp(name, "scatterv") )
        return SCATTERV;
    if( 0 == strcmp(name, "scatter") )
        return SCATTER;
    return -1;
}
/* conversion table for all COLLTYPE_T values defined in ompi/mca/coll/base/coll_base_functions.h.
 * Designated initializers keep each slot tied to its enum value regardless of
 * declaration order; COLLCOUNT acts as a NULL sentinel and must never be
 * passed to mca_coll_base_colltype_to_str. */
static const char* colltype_translation_table[] = {
    [ALLGATHER] = "allgather",
    [ALLGATHERV] = "allgatherv",
    [ALLREDUCE] = "allreduce",
    [ALLTOALL] = "alltoall",
    [ALLTOALLV] = "alltoallv",
    [ALLTOALLW] = "alltoallw",
    [BARRIER] = "barrier",
    [BCAST] = "bcast",
    [EXSCAN] = "exscan",
    [GATHER] = "gather",
    [GATHERV] = "gatherv",
    [REDUCE] = "reduce",
    [REDUCESCATTER] = "reduce_scatter",
    [REDUCESCATTERBLOCK] = "reduce_scatter_block",
    [SCAN] = "scan",
    [SCATTER] = "scatter",
    [SCATTERV] = "scatterv",
    [NEIGHBOR_ALLGATHER] = "neighbor_allgather",
    [NEIGHBOR_ALLGATHERV] = "neighbor_allgatherv",
    [NEIGHBOR_ALLTOALL] = "neighbor_alltoall",
    [NEIGHBOR_ALLTOALLV] = "neighbor_alltoallv",
    [NEIGHBOR_ALLTOALLW] = "neighbor_alltoallw",
    [COLLCOUNT] = NULL
};
/*
 * Return a freshly allocated copy of the textual name of the given
 * COLLTYPE_T identifier (caller frees), or NULL when the identifier is
 * outside the valid [0, COLLCOUNT) range.
 */
char* mca_coll_base_colltype_to_str(int collid)
{
    if( (collid >= 0) && (collid < COLLCOUNT) ) {
        return strdup(colltype_translation_table[collid]);
    }
    return NULL;
}

View File

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2015 The University of Tennessee and The University
* Copyright (c) 2004-2020 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@ -178,8 +178,17 @@ int ompi_coll_base_retain_datatypes_w( ompi_request_t *request,
ompi_datatype_t *rtypes[]);
/* File reading function */
#define MYEOF -999
long ompi_coll_base_file_getnext(FILE *fptr, int *fileline);
int ompi_coll_base_file_getnext_long(FILE *fptr, int *fileline, long* val);
int ompi_coll_base_file_getnext_size_t(FILE *fptr, int *fileline, size_t* val);
int ompi_coll_base_file_getnext_string(FILE *fptr, int *fileline, char** val);
/* peek at the next valid token to see if it begins with the expected value. If yes
* eat the value, otherwise put it back into the file.
*/
int ompi_coll_base_file_peek_next_char_is(FILE *fptr, int *fileline, int expected);
/* Miscellaneous functions */
char* mca_coll_base_colltype_to_str(int collid);
int mca_coll_base_name_to_colltype(const char* name);
END_C_DECLS
#endif /* MCA_COLL_BASE_UTIL_EXPORT_H */

View File

@ -26,8 +26,7 @@ coll_han_trigger.c \
coll_han_dynamic.c \
coll_han_dynamic_file.c \
coll_han_topo.c \
coll_han_subcomms.c \
coll_han_utils.c
coll_han_subcomms.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

View File

@ -20,9 +20,7 @@
#include "opal/util/output.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_han_trigger.h"
#include "ompi/mca/coll/han/coll_han_dynamic.h"
BEGIN_C_DECLS
#include "ompi/mca/coll/han/coll_han_dynamic.h"
/*
* Today;
@ -33,131 +31,125 @@ BEGIN_C_DECLS
#define COLL_HAN_LOW_MODULES 2
#define COLL_HAN_UP_MODULES 2
typedef struct {
uint32_t umod;
uint32_t lmod;
uint32_t fs;
uint32_t ualg;
uint32_t us;
} selection;
struct mca_bcast_argu_s {
struct mca_coll_han_bcast_args_s {
mca_coll_task_t *cur_task;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
void *buff;
ompi_datatype_t *dtype;
int seg_count;
struct ompi_datatype_t *dtype;
int root_low_rank;
int root_up_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int num_segments;
int cur_seg;
int w_rank;
int last_seg_count;
bool noop;
};
typedef struct mca_bcast_argu_s mca_bcast_argu_t;
typedef struct mca_coll_han_bcast_args_s mca_coll_han_bcast_args_t;
struct mca_reduce_argu_s {
struct mca_coll_han_reduce_args_s {
mca_coll_task_t *cur_task;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
void *sbuf;
void *rbuf;
ompi_op_t *op;
ompi_datatype_t *dtype;
int seg_count;
struct ompi_datatype_t *dtype;
struct ompi_op_t *op;
int root_low_rank;
int root_up_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int num_segments;
int cur_seg;
int w_rank;
int last_seg_count;
bool noop;
bool is_tmp_rbuf;
};
typedef struct mca_reduce_argu_s mca_reduce_argu_t;
typedef struct mca_coll_han_reduce_args_s mca_coll_han_reduce_args_t;
struct mca_allreduce_argu_s {
struct mca_coll_han_allreduce_args_s {
mca_coll_task_t *cur_task;
void *sbuf;
void *rbuf;
int seg_count;
struct ompi_datatype_t *dtype;
struct ompi_op_t *op;
int root_up_rank;
int root_low_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int num_segments;
int cur_seg;
int w_rank;
int last_seg_count;
bool noop;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
ompi_request_t *req;
void *sbuf;
void *rbuf;
ompi_op_t *op;
ompi_datatype_t *dtype;
int seg_count;
int root_up_rank;
int root_low_rank;
int num_segments;
int cur_seg;
int w_rank;
int last_seg_count;
bool noop;
int *completed;
};
typedef struct mca_allreduce_argu_s mca_allreduce_argu_t;
typedef struct mca_coll_han_allreduce_args_s mca_coll_han_allreduce_args_t;
struct mca_scatter_argu_s {
struct mca_coll_han_scatter_args_s {
mca_coll_task_t *cur_task;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
ompi_request_t *req;
void *sbuf;
void *sbuf_inter_free;
void *sbuf_reorder_free;
int scount;
struct ompi_datatype_t *sdtype;
void *rbuf;
ompi_datatype_t *sdtype;
ompi_datatype_t *rdtype;
int scount;
int rcount;
struct ompi_datatype_t *rdtype;
int root;
int root_up_rank;
int root_low_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int w_rank;
bool noop;
ompi_request_t *req;
};
typedef struct mca_scatter_argu_s mca_scatter_argu_t;
typedef struct mca_coll_han_scatter_args_s mca_coll_han_scatter_args_t;
struct mca_gather_argu_s {
struct mca_coll_han_gather_args_s {
mca_coll_task_t *cur_task;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
ompi_request_t *req;
void *sbuf;
void *sbuf_inter_free;
int scount;
struct ompi_datatype_t *sdtype;
void *rbuf;
ompi_datatype_t *sdtype;
ompi_datatype_t *rdtype;
int scount;
int rcount;
struct ompi_datatype_t *rdtype;
int root;
int root_up_rank;
int root_low_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int w_rank;
bool noop;
ompi_request_t *req;
bool is_mapbycore;
};
typedef struct mca_gather_argu_s mca_gather_argu_t;
typedef struct mca_coll_han_gather_args_s mca_coll_han_gather_args_t;
struct mca_allgather_argu_s {
struct mca_coll_han_allgather_s {
mca_coll_task_t *cur_task;
ompi_communicator_t *up_comm;
ompi_communicator_t *low_comm;
ompi_request_t *req;
void *sbuf;
void *sbuf_inter_free;
int scount;
struct ompi_datatype_t *sdtype;
void *rbuf;
ompi_datatype_t *sdtype;
ompi_datatype_t *rdtype;
int scount;
int rcount;
struct ompi_datatype_t *rdtype;
int root_low_rank;
struct ompi_communicator_t *up_comm;
struct ompi_communicator_t *low_comm;
int w_rank;
bool noop;
bool is_mapbycore;
int *topo;
ompi_request_t *req;
};
typedef struct mca_allgather_argu_s mca_allgather_argu_t;
typedef struct mca_coll_han_allgather_s mca_coll_han_allgather_t;
/**
* Structure to hold the han coll component. First it holds the
@ -184,7 +176,7 @@ typedef struct mca_coll_han_component_t {
/* up level module for reduce */
uint32_t han_reduce_up_module;
/* low level module for reduce */
uint32_t han_reduce_low_module;
uint32_t han_reduce_low_module;
/* segment size for allreduce */
uint32_t han_allreduce_segsize;
/* up level module for allreduce */
@ -203,21 +195,10 @@ typedef struct mca_coll_han_component_t {
uint32_t han_scatter_up_module;
/* low level module for scatter */
uint32_t han_scatter_low_module;
/* whether enable auto tune */
uint32_t han_auto_tune;
/* whether we need reproducible results
* (but disables topological optimisations)
*/
uint32_t han_reproducible;
/* create a 3D array
* num_processes (n): 2 4 8 16 32 64 (6)
* num_core (c): 2 4 8 12 (4)
* message size (m): 1 - 4194304 (23)
*/
uint32_t han_auto_tune_n;
uint32_t han_auto_tune_c;
uint32_t han_auto_tune_m;
selection *han_auto_tuned;
bool use_simple_algorithm[COLLCOUNT];
/* Dynamic configuration rules */
@ -228,7 +209,6 @@ typedef struct mca_coll_han_component_t {
mca_coll_han_dynamic_rules_t dynamic_rules;
/* Dynamic rules from mca parameter */
COMPONENT_T mca_rules[COLLCOUNT][NB_TOPO_LVL];
int topo_level;
/* Define maximum dynamic errors printed by rank 0 with a 0 verbosity level */
int max_dynamic_errors;
@ -240,7 +220,7 @@ typedef void (*previous_dummy_fn_t) (void);
* Structure used to store what is necessary for the collective operations
* routines in case of fallback.
*/
typedef struct collective_fallback_t {
typedef struct mca_coll_han_single_collective_fallback_s {
union {
mca_coll_base_module_allgather_fn_t allgather;
mca_coll_base_module_allgatherv_fn_t allgatherv;
@ -250,9 +230,24 @@ typedef struct collective_fallback_t {
mca_coll_base_module_reduce_fn_t reduce;
mca_coll_base_module_scatter_fn_t scatter;
previous_dummy_fn_t dummy;
} previous_routine;
mca_coll_base_module_t *previous_module;
} collective_fallback_t;
};
mca_coll_base_module_t* module;
} mca_coll_han_single_collective_fallback_t;
/*
* The structure containing a replacement for all collective supported
* by HAN. This structure is used as a fallback during subcommunicator
* creation.
*/
typedef struct mca_coll_han_collectives_fallback_s {
mca_coll_han_single_collective_fallback_t allgather;
mca_coll_han_single_collective_fallback_t allgatherv;
mca_coll_han_single_collective_fallback_t allreduce;
mca_coll_han_single_collective_fallback_t bcast;
mca_coll_han_single_collective_fallback_t reduce;
mca_coll_han_single_collective_fallback_t gather;
mca_coll_han_single_collective_fallback_t scatter;
} mca_coll_han_collectives_fallback_t;
/** Coll han module */
typedef struct mca_coll_han_module_t {
@ -262,7 +257,6 @@ typedef struct mca_coll_han_module_t {
/* Whether this module has been lazily initialized or not yet */
bool enabled;
struct ompi_communicator_t *cached_comm;
struct ompi_communicator_t **cached_low_comms;
struct ompi_communicator_t **cached_up_comms;
int *cached_vranks;
@ -271,7 +265,7 @@ typedef struct mca_coll_han_module_t {
bool are_ppn_imbalanced;
/* To be able to fallback when the cases are not supported */
struct collective_fallback_t previous_routines[COLLCOUNT];
struct mca_coll_han_collectives_fallback_s fallback;
/* To be able to fallback on reproducible algorithm */
mca_coll_base_module_reduce_fn_t reproducible_reduce;
@ -280,7 +274,7 @@ typedef struct mca_coll_han_module_t {
mca_coll_base_module_t *reproducible_allreduce_module;
/* Topological level of this communicator */
int topologic_level;
TOPO_LVL_T topologic_level;
/* Collective module storage for module choice */
mca_coll_han_collective_modules_storage_t modules_storage;
@ -302,21 +296,53 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t);
* Some defines to stick to the naming used in the other components in terms of
* fallback routines
*/
#define previous_allgather previous_routines[ALLGATHER].previous_routine.allgather
#define previous_allgatherv previous_routines[ALLGATHERV].previous_routine.allgatherv
#define previous_allreduce previous_routines[ALLREDUCE].previous_routine.allreduce
#define previous_bcast previous_routines[BCAST].previous_routine.bcast
#define previous_gather previous_routines[GATHER].previous_routine.gather
#define previous_reduce previous_routines[REDUCE].previous_routine.reduce
#define previous_scatter previous_routines[SCATTER].previous_routine.scatter
#define previous_allgather fallback.allgather.allgather
#define previous_allgather_module fallback.allgather.module
#define previous_allgatherv fallback.allgatherv.allgatherv
#define previous_allgatherv_module fallback.allgatherv.module
#define previous_allreduce fallback.allreduce.allreduce
#define previous_allreduce_module fallback.allreduce.module
#define previous_bcast fallback.bcast.bcast
#define previous_bcast_module fallback.bcast.module
#define previous_reduce fallback.reduce.reduce
#define previous_reduce_module fallback.reduce.module
#define previous_gather fallback.gather.gather
#define previous_gather_module fallback.gather.module
#define previous_scatter fallback.scatter.scatter
#define previous_scatter_module fallback.scatter.module
/* macro to correctly load a fallback collective module */
/*
 * If (and only if) the HAN module HANM is currently installed as the
 * communicator's module for the collective COLL, restore the fallback
 * function/module pair previously saved in HANM->fallback.COLL.
 * Reference counts are handed over safely: the fallback module is
 * retained before the HAN module is released, so the communicator never
 * holds a pointer to an already-destroyed module.
 */
#define HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, COLL)                          \
    do {                                                                        \
        if ( ((COMM)->c_coll->coll_ ## COLL ## _module) == (mca_coll_base_module_t*)(HANM) ) { \
            (COMM)->c_coll->coll_ ## COLL = (HANM)->fallback.COLL.COLL;         \
            mca_coll_base_module_t *coll_module = (COMM)->c_coll->coll_ ## COLL ## _module; \
            (COMM)->c_coll->coll_ ## COLL ## _module = (HANM)->fallback.COLL.module; \
            OBJ_RETAIN((COMM)->c_coll->coll_ ## COLL ## _module);               \
            OBJ_RELEASE(coll_module);                                           \
        }                                                                       \
    } while(0)
/* macro to correctly load /all/ fallback collectives */
/*
 * Restore the saved fallback for every collective HAN supports on this
 * communicator and permanently disable the module.
 *
 * NOTE: the body must use the macro argument (HANM) rather than a
 * hard-coded 'han_module' identifier; the previous version was not
 * hygienic and only compiled when the caller's variable happened to be
 * named 'han_module'.
 */
#define HAN_LOAD_FALLBACK_COLLECTIVES(HANM, COMM)                 \
    do {                                                          \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, bcast);          \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, scatter);        \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, gather);         \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, reduce);         \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allreduce);      \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allgather);      \
        HAN_LOAD_FALLBACK_COLLECTIVE(HANM, COMM, allgatherv);     \
        (HANM)->enabled = false; /* entire module set to pass-through from now on */ \
    } while(0)
#define previous_allgather_module previous_routines[ALLGATHER].previous_module
#define previous_allgatherv_module previous_routines[ALLGATHERV].previous_module
#define previous_allreduce_module previous_routines[ALLREDUCE].previous_module
#define previous_bcast_module previous_routines[BCAST].previous_module
#define previous_gather_module previous_routines[GATHER].previous_module
#define previous_reduce_module previous_routines[REDUCE].previous_module
#define previous_scatter_module previous_routines[SCATTER].previous_module
/**
* Global component instance
@ -333,20 +359,30 @@ mca_coll_base_module_t *mca_coll_han_comm_query(struct ompi_communicator_t *comm
int han_request_free(ompi_request_t ** request);
/* Subcommunicator creation */
void mca_coll_han_comm_create(struct ompi_communicator_t *comm, mca_coll_han_module_t * han_module);
void mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, mca_coll_han_module_t *han_module);
/* Gather topology information */
int mca_coll_han_comm_create(struct ompi_communicator_t *comm, mca_coll_han_module_t * han_module);
int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, mca_coll_han_module_t *han_module);
/**
* Gather topology information
*
* Returns a pointer to the (potentially already cached) topology.
* NOTE: if the rank distribution is imbalanced, no effort will be made to gather
* the topology at all ranks and instead NULL is returned and han_module->is_mapbycore
* is set to false.
* If HAN ever learns to deal with imbalanced topologies, this needs fixing!
*/
int *mca_coll_han_topo_init(struct ompi_communicator_t *comm, mca_coll_han_module_t * han_module,
int num_topo_level);
/* Utils */
void mca_coll_han_get_ranks(int *vranks, int root, int low_size, int *root_low_rank,
int *root_up_rank);
uint32_t han_auto_tuned_get_n(uint32_t n);
uint32_t han_auto_tuned_get_c(uint32_t c);
uint32_t han_auto_tuned_get_m(uint32_t m);
/*
 * Map the root's virtual (reordered) rank onto the two-level topology:
 * which node-leader (up) communicator rank it lives under, and its rank
 * inside the node-local (low) communicator of low_size processes.
 *
 * vranks        - virtual rank table indexed by world rank
 * root          - world rank of the collective's root
 * low_size      - number of processes per low-level communicator
 * root_low_rank - out: root's rank within its low communicator
 * root_up_rank  - out: rank of root's leader in the up communicator
 */
static inline void
mca_coll_han_get_ranks(int *vranks, int root, int low_size,
                       int *root_low_rank, int *root_up_rank)
{
    const int vrank = vranks[root];

    *root_low_rank = vrank % low_size;
    *root_up_rank  = vrank / low_size;
}
const char* mca_coll_han_colltype_to_str(COLLTYPE_T coll);
const char* mca_coll_han_topo_lvl_to_str(TOPO_LVL_T topo_lvl);
/** Dynamic component choice */
@ -356,7 +392,7 @@ const char* mca_coll_han_topo_lvl_to_str(TOPO_LVL_T topo_lvl);
*/
int
mca_coll_han_get_all_coll_modules(struct ompi_communicator_t *comm,
mca_coll_han_module_t *han_module);
mca_coll_han_module_t *han_module);
int
mca_coll_han_allgather_intra_dynamic(ALLGATHER_BASE_ARGS,
@ -382,22 +418,13 @@ mca_coll_han_scatter_intra_dynamic(SCATTER_BASE_ARGS,
/* Bcast */
int mca_coll_han_bcast_intra_simple(void *buff,
int count,
struct ompi_datatype_t *dtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
void mac_coll_han_set_bcast_argu(mca_bcast_argu_t * argu, mca_coll_task_t * cur_task, void *buff,
int seg_count, struct ompi_datatype_t *dtype,
int root_up_rank, int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int num_segments, int cur_seg, int w_rank, int last_seg_count,
bool noop);
int count,
struct ompi_datatype_t *dtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_han_bcast_intra(void *buff, int count, struct ompi_datatype_t *dtype, int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int mca_coll_han_bcast_t0_task(void *task_argu);
int mca_coll_han_bcast_t1_task(void *task_argu);
/* Reduce */
int
@ -422,145 +449,75 @@ mca_coll_han_reduce_reproducible(const void *sbuf,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
void mac_coll_han_set_reduce_argu(mca_reduce_argu_t * argu, mca_coll_task_t * cur_task,
void *sbuf,
void *rbuf, int seg_count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root_up_rank, int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int num_segments, int cur_seg, int w_rank, int last_seg_count,
bool noop);
int mca_coll_han_reduce_intra(const void *sbuf,
int mca_coll_han_reduce_intra(const void *sbuf,
void *rbuf,
int count,
struct ompi_datatype_t *dtype,
ompi_op_t* op,
int root,
struct ompi_communicator_t *comm,
struct ompi_communicator_t *comm,
mca_coll_base_module_t * module);
int mca_coll_han_reduce_t0_task(void *task_argu);
int mca_coll_han_reduce_t1_task(void *task_argu);
/* Allreduce */
int
mca_coll_han_allreduce_intra_simple(const void *sbuf,
void *rbuf,
int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
void *rbuf,
int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int
mca_coll_han_allreduce_reproducible_decision(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int
mca_coll_han_allreduce_reproducible(const void *sbuf,
void *rbuf,
int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
void mac_coll_han_set_allreduce_argu(mca_allreduce_argu_t * argu,
mca_coll_task_t * cur_task,
void *sbuf,
void *rbuf,
int seg_count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root_up_rank,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int num_segments,
int cur_seg,
int w_rank,
int last_seg_count,
bool noop, ompi_request_t * req, int *completed);
int mca_coll_han_allreduce_intra(const void *sbuf,
void *rbuf,
int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int mca_coll_han_allreduce_t0_task(void *task_argu);
int mca_coll_han_allreduce_t1_task(void *task_argu);
int mca_coll_han_allreduce_t2_task(void *task_argu);
int mca_coll_han_allreduce_t3_task(void *task_argu);
/* Scatter */
int
mca_coll_han_scatter_intra(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int mca_coll_han_scatter_us_task(void *task_argu);
int mca_coll_han_scatter_ls_task(void *task_argu);
void mac_coll_han_set_scatter_argu(mca_scatter_argu_t * argu,
mca_coll_task_t * cur_task,
void *sbuf,
void *sbuf_inter_free,
void *sbuf_reorder_free,
int scount,
struct ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
struct ompi_datatype_t *rdtype,
int root,
int root_up_rank,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int w_rank, bool noop, ompi_request_t * req);
/* Gather */
int
mca_coll_han_gather_intra(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int mca_coll_han_gather_lg_task(void *task_argu);
int mca_coll_han_gather_ug_task(void *task_argu);
void mac_coll_han_set_gather_argu(mca_gather_argu_t * argu,
mca_coll_task_t * cur_task,
void *sbuf,
void *sbuf_inter_free,
int scount,
struct ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
struct ompi_datatype_t *rdtype,
int root,
int root_up_rank,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int w_rank, bool noop, ompi_request_t * req);
/* Gather */
int
mca_coll_han_gather_intra(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int
mca_coll_han_gather_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
struct ompi_datatype_t *sdtype,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/* reordering after gather, for unordered ranks */
void
ompi_coll_han_reorder_gather(const void *sbuf,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
int * topo);
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
int * topo);
@ -571,30 +528,12 @@ mca_coll_han_allgather_intra(const void *sbuf, int scount,
void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm, mca_coll_base_module_t * module);
int mca_coll_han_allgather_lg_task(void *task_argu);
int mca_coll_han_allgather_uag_task(void *task_argu);
int mca_coll_han_allgather_lb_task(void *task_argu);
void mac_coll_han_set_allgather_argu(mca_allgather_argu_t * argu,
mca_coll_task_t * cur_task,
void *sbuf,
void *sbuf_inter_free,
int scount,
struct ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
struct ompi_datatype_t *rdtype,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int w_rank,
bool noop, bool is_mapbycore, int *topo, ompi_request_t * req);
int
mca_coll_han_allgather_intra_simple(const void *sbuf, int scount,
struct ompi_datatype_t *sdtype,
void* rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
struct ompi_datatype_t *sdtype,
void* rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
END_C_DECLS
#endif /* MCA_COLL_HAN_EXPORT_H */

View File

@ -16,40 +16,45 @@
#include "ompi/mca/pml/pml.h"
#include "coll_han_trigger.h"
void mac_coll_han_set_allgather_argu(mca_allgather_argu_t * argu,
mca_coll_task_t * cur_task,
void *sbuf,
void *sbuf_inter_free,
int scount,
struct ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
struct ompi_datatype_t *rdtype,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int w_rank,
bool noop,
bool is_mapbycore,
int *topo,
ompi_request_t * req)
static int mca_coll_han_allgather_lb_task(void *task_args);
static int mca_coll_han_allgather_lg_task(void *task_args);
static int mca_coll_han_allgather_uag_task(void *task_args);
static inline void
mca_coll_han_set_allgather_args(mca_coll_han_allgather_t * args,
mca_coll_task_t * cur_task,
void *sbuf,
void *sbuf_inter_free,
int scount,
struct ompi_datatype_t *sdtype,
void *rbuf,
int rcount,
struct ompi_datatype_t *rdtype,
int root_low_rank,
struct ompi_communicator_t *up_comm,
struct ompi_communicator_t *low_comm,
int w_rank,
bool noop,
bool is_mapbycore,
int *topo,
ompi_request_t * req)
{
argu->cur_task = cur_task;
argu->sbuf = sbuf;
argu->sbuf_inter_free = sbuf_inter_free;
argu->scount = scount;
argu->sdtype = sdtype;
argu->rbuf = rbuf;
argu->rcount = rcount;
argu->rdtype = rdtype;
argu->root_low_rank = root_low_rank;
argu->up_comm = up_comm;
argu->low_comm = low_comm;
argu->w_rank = w_rank;