3a683419c5
This is a hot-fix patch for the issue reported by Ralph. In the future we plan to restructure the ml data structure layout. Tested by Nathan. cmr=v1.7.5:ticket=trac:4158 This commit was SVN r30619. The following Trac tickets were found above: Ticket 4158 --> https://svn.open-mpi.org/trac/ompi/ticket/4158
1107 lines
40 KiB
C
1107 lines
40 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
|
|
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
|
|
* Copyright (c) 2013-2014 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
#include "ompi/mca/bcol/base/base.h"
|
|
#include "ompi/mca/bcol/basesmuma/bcol_basesmuma.h"
|
|
#include "ompi/constants.h"
|
|
#include "ompi/datatype/ompi_datatype.h"
|
|
#include "ompi/communicator/communicator.h"
|
|
|
|
/* debug
|
|
* #include "opal/sys/timer.h"
|
|
*
|
|
* extern uint64_t timers[7];
|
|
* end debug */
|
|
|
|
/* debug */
|
|
#include <unistd.h>
|
|
/* end debug */
|
|
|
|
/* non-blocking gather routines: init and progress functions */
|
|
/*
 * Register the gather collective of this component with the bcol base.
 *
 * Fills in one communicator-attribute record and one invocation-attribute
 * record and hands them, together with the k-nomial gather init/progress
 * pair, to mca_bcol_base_set_attributes().
 *
 * super - bcol module the functions are registered on
 *
 * Always returns OMPI_SUCCESS; the result of the registration call is not
 * inspected.
 */
int bcol_basesmuma_gather_init(mca_bcol_base_module_t *super)
{
    mca_bcol_base_coll_fn_invoke_attributes_t inv_attribs;
    mca_bcol_base_coll_fn_comm_attributes_t comm_attribs;

    /* message-level selection attributes */
    inv_attribs.bcol_msg_min    = 0;
    inv_attribs.bcol_msg_max    = 20000;
    inv_attribs.datatype_bitmap = 0x11111111;
    inv_attribs.op_types_bitmap = 0x11111111;

    /* communicator-level selection attributes */
    comm_attribs.bcoll_type        = BCOL_GATHER;
    comm_attribs.comm_size_min     = 0;
    comm_attribs.comm_size_max     = 16;
    comm_attribs.data_src          = DATA_SRC_KNOWN;
    comm_attribs.waiting_semantics = BLOCKING;

    /* register the k-nomial gather: init starts it, progress finishes it */
    mca_bcol_base_set_attributes(super, &comm_attribs, &inv_attribs,
                                 bcol_basesmuma_k_nomial_gather_init,
                                 bcol_basesmuma_k_nomial_gather_progress);

    return OMPI_SUCCESS;
}
|
|
|
|
int bcol_basesmuma_k_nomial_gather_init(bcol_function_args_t *input_args,
|
|
mca_bcol_base_function_t *c_input_args)
|
|
{
|
|
/* local variables */
|
|
int leading_dim, buff_idx, idx;
|
|
int src, i, j, k_temp1, k_temp2;
|
|
int pseudo_root, proxy_root, pseudo_base_adj;
|
|
volatile int8_t ready_flag;
|
|
int count=input_args->count;
|
|
struct ompi_datatype_t* dtype=input_args->dtype;
|
|
int root=input_args->root;
|
|
int base_adj, base;
|
|
int total_peers, my_pow_k=0;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
mca_bcol_basesmuma_module_t* bcol_module=
|
|
(mca_bcol_basesmuma_module_t *)c_input_args->bcol_module;
|
|
int bcol_id = (int) bcol_module->super.bcol_id;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
netpatterns_k_exchange_node_t *exchange_node =
|
|
&bcol_module->knomial_allgather_tree;
|
|
uint32_t buffer_index = input_args->buffer_index;
|
|
int *active_requests =
|
|
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
|
|
|
|
int *iteration = &bcol_module->ml_mem.nb_coll_desc[buffer_index].iteration;
|
|
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
|
|
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
|
|
/* "indirectors" */
|
|
int *inv_map = exchange_node->inv_reindex_map;
|
|
int *reindex_map = exchange_node->reindex_map;
|
|
int stray = exchange_node->k_nomial_stray;
|
|
|
|
/* tree radix */
|
|
int tree_order = exchange_node->tree_order;
|
|
/* tree depth */
|
|
int pow_k = exchange_node->log_tree_order;
|
|
/* largest power of k less than or equal to np */
|
|
int cnt = exchange_node->n_largest_pow_tree_order;
|
|
|
|
/* payload structures */
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
|
|
size_t pack_len = 0, dt_size;
|
|
|
|
#if 0
|
|
fprintf(stderr,"Entering sm gather input_args->sbuf_offset %d \n",input_args->sbuf_offset);
|
|
fflush(stderr);
|
|
#endif
|
|
|
|
|
|
/* we will work only on packed data - so compute the length*/
|
|
/* this is the size of my data, this is not gatherv so it's the same
|
|
* for all ranks in the communicator.
|
|
*/
|
|
ompi_datatype_type_size(dtype, &dt_size);
|
|
pack_len=count*dt_size;
|
|
/* now set the "real" offset */
|
|
buff_offset = buff_offset*pack_len;
|
|
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
|
|
/* Get addressing information */
|
|
my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
|
|
leading_dim=bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
|
|
/* init the header */
|
|
BASESMUMA_HEADER_INIT(my_ctl_pointer, ready_flag, sequence_number, bcol_id);
|
|
|
|
/* init active requests, iteration, and status */
|
|
*iteration = 0;
|
|
*active_requests = 0;
|
|
*status = -1;
|
|
/* calculate the number of steps necessary for this collective */
|
|
|
|
/* first thing we do is figure out where the root is in our new indexing */
|
|
/* find root in new indexing */
|
|
pseudo_root = inv_map[root];
|
|
/* see if this is larger than the stray */
|
|
if (pseudo_root >= stray) {
|
|
/* then we need to define the proxy root, everyone can do this */
|
|
proxy_root = pseudo_root - cnt;
|
|
} else {
|
|
proxy_root = pseudo_root;
|
|
}
|
|
|
|
/* do some figuring */
|
|
if (EXCHANGE_NODE == exchange_node->node_type) {
|
|
total_peers = 0;
|
|
my_pow_k = pow_k;
|
|
k_temp1 = tree_order;
|
|
k_temp2 = 1;
|
|
for( i = 0; i < pow_k; i++) {
|
|
/* then find the base */
|
|
FIND_BASE(base,exchange_node->reindex_myid,i+1,tree_order);
|
|
/* now find the adjusted base */
|
|
base_adj = base + (base + proxy_root)%k_temp1;
|
|
/* ok, now find out WHO is occupying this slot */
|
|
pseudo_base_adj = reindex_map[base_adj];
|
|
|
|
if(my_rank == pseudo_base_adj ) {
|
|
/* then go ahead and poll for children's data */
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
/* send phase
|
|
*/
|
|
/* get communication partner */
|
|
|
|
src = exchange_node->rank_exchanges[i][j];
|
|
/* remember, if we have extra ranks, then we won't participate
|
|
* with a least one peer. Make a check
|
|
*/
|
|
if( src < 0 ){
|
|
continue;
|
|
}else{
|
|
|
|
/* flip a bit to represent this request */
|
|
*active_requests ^= (1<<total_peers++);
|
|
}
|
|
|
|
|
|
}
|
|
} else {
|
|
/* announce my arrival */
|
|
my_pow_k = i;
|
|
break;
|
|
}
|
|
|
|
k_temp1 = k_temp1*tree_order;
|
|
k_temp2 = k_temp2*tree_order;
|
|
}
|
|
}
|
|
|
|
*iteration = my_pow_k;
|
|
|
|
if (EXTRA_NODE == exchange_node->node_type || 0 == exchange_node->n_extra_sources) {
|
|
if (0 == my_pow_k || EXTRA_NODE == exchange_node->node_type) {
|
|
opal_atomic_rmb ();
|
|
|
|
my_ctl_pointer->flags[GATHER_FLAG][bcol_id] = ready_flag;
|
|
}
|
|
|
|
if ((EXTRA_NODE == exchange_node->node_type && root != my_rank) || 0 == my_pow_k) {
|
|
/* nothing more to do */
|
|
my_ctl_pointer->starting_flag_value[bcol_id]++;
|
|
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
}
|
|
|
|
return BCOL_FN_STARTED;
|
|
}
|
|
|
|
|
|
int bcol_basesmuma_k_nomial_gather_progress(bcol_function_args_t *input_args,
|
|
mca_bcol_base_function_t *c_input_args)
|
|
{
|
|
/* local variables */
|
|
int group_size;
|
|
int flag_offset;
|
|
int leading_dim, buff_idx, idx;
|
|
int src, knt, i, j, k_temp1, k_temp2;
|
|
volatile int8_t ready_flag;
|
|
int count=input_args->count;
|
|
struct ompi_datatype_t* dtype=input_args->dtype;
|
|
int root=input_args->root;
|
|
int probe;
|
|
int matched;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
mca_bcol_basesmuma_module_t* bcol_module=
|
|
(mca_bcol_basesmuma_module_t *)c_input_args->bcol_module;
|
|
int bcol_id = (int) bcol_module->super.bcol_id;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
mca_bcol_basesmuma_component_t *cm = &mca_bcol_basesmuma_component;
|
|
netpatterns_k_exchange_node_t *exchange_node =
|
|
&bcol_module->knomial_allgather_tree;
|
|
uint32_t buffer_index = input_args->buffer_index;
|
|
int *active_requests =
|
|
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
|
|
int *iteration = &bcol_module->ml_mem.nb_coll_desc[buffer_index].iteration;
|
|
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
/* "indirectors" */
|
|
int *list_connected = bcol_module->super.list_n_connected;
|
|
/* tree radix */
|
|
int tree_order = exchange_node->tree_order;
|
|
/* payload structures */
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
volatile char *child_data_pointer;
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
volatile mca_bcol_basesmuma_header_t *child_ctl_pointer;
|
|
/*volatile mca_bcol_basesmuma_ctl_struct_t* parent_ctl_pointer; */
|
|
|
|
size_t pack_len = 0, dt_size;
|
|
void *data_addr = (void *)((unsigned char *)input_args->src_desc->data_addr);
|
|
|
|
|
|
#if 0
|
|
fprintf(stderr,"Entering sm gather input_args->sbuf_offset %d \n",input_args->sbuf_offset);
|
|
fflush(stderr);
|
|
#endif
|
|
|
|
|
|
/* we will work only on packed data - so compute the length*/
|
|
/* this is the size of my data, this is not gatherv so it's the same
|
|
* for all ranks in the communicator.
|
|
*/
|
|
ompi_datatype_type_size(dtype, &dt_size);
|
|
pack_len=count*dt_size;
|
|
/* now set the "real" offset */
|
|
buff_offset = buff_offset*pack_len;
|
|
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
|
|
/* Get addressing information */
|
|
my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
|
|
group_size = bcol_module->colls_no_user_data.size_of_group;
|
|
leading_dim=bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
/* restart the ready_flag state */
|
|
flag_offset = my_ctl_pointer->starting_flag_value[bcol_id];
|
|
ready_flag = flag_offset + 1;
|
|
|
|
/* calculate the number of steps necessary for this collective */
|
|
|
|
/* first thing we do is figure out where the root is in our new indexing */
|
|
/* find root in new indexing */
|
|
if( EXTRA_NODE == exchange_node->node_type ) {
|
|
|
|
/* poll for data from proxy */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
/* remember to bump your flag */
|
|
ready_flag++;
|
|
|
|
/* in this case, you must block */
|
|
for (i = 0 ; i < cm->num_to_probe ; ++i) {
|
|
if (IS_PEER_READY(child_ctl_pointer,ready_flag,sequence_number, GATHER_FLAG, bcol_id)){
|
|
/* receive the data from the proxy, aka pseudo-root */
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset),
|
|
(void *) ((unsigned char *) child_data_pointer+buff_offset),
|
|
pack_len * group_size);
|
|
|
|
goto FINISHED;
|
|
}
|
|
}
|
|
|
|
return BCOL_FN_STARTED;
|
|
}
|
|
|
|
|
|
if (0 < exchange_node->n_extra_sources && (-1 == (*status))) {
|
|
/* am a proxy, poll for pack_len data from extra */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
knt = 0;
|
|
for( i = 0; i < src; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
/* must block here also */
|
|
matched = 0;
|
|
for (i = 0, matched = 0 ; i < cm->num_to_probe && (0 == matched) ; ++i) {
|
|
if(IS_PEER_READY(child_ctl_pointer,ready_flag,sequence_number, GATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset + pack_len*knt),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
pack_len*knt), pack_len*list_connected[src]);
|
|
*status = 0;
|
|
if( 0 == *active_requests ){
|
|
goto LAST_STEP;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
if( 0 == matched ){
|
|
return BCOL_FN_STARTED;
|
|
}
|
|
}
|
|
|
|
/* start the k-nomial gather phase */
|
|
/* only "active ranks participate, once a rank has forwarded its data, it becomes inactive */
|
|
for (probe = 0 ; probe < cm->num_to_probe ; ++probe) {
|
|
k_temp1 = tree_order;
|
|
k_temp2 = 1;
|
|
for (i = 0 ; i < *(iteration) ; ++i) {
|
|
|
|
/* then go ahead and poll for children's data */
|
|
for (j = 0 ; j < (tree_order - 1) ; ++j) {
|
|
/* send phase
|
|
*/
|
|
/* get communication partner */
|
|
|
|
src = exchange_node->rank_exchanges[i][j];
|
|
/* remember, if we have extra ranks, then we won't participate
|
|
* with a least one peer. Make a check
|
|
*/
|
|
/* if the bit that corresponds to this child has been set to zero,
|
|
* then it has already checked in and data received
|
|
*/
|
|
if (src < 0 || 1 != ((*active_requests >> ((tree_order - 1)*i + j))&1)){
|
|
continue;
|
|
}
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
if(IS_PEER_READY(child_ctl_pointer,ready_flag,sequence_number, GATHER_FLAG, bcol_id)){
|
|
/* copy the data */
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
exchange_node->payload_info[i][j].r_len*pack_len);
|
|
/* flip the bit to zero */
|
|
*active_requests ^= (1<<((tree_order - 1)*i + j));
|
|
if(0 == (*active_requests)) {
|
|
goto LAST_STEP;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
k_temp1 = k_temp1*tree_order;
|
|
k_temp2 = k_temp2*tree_order;
|
|
}
|
|
|
|
|
|
return BCOL_FN_STARTED;
|
|
|
|
LAST_STEP:
|
|
/* last step, proxies send full data back to the extra ranks */
|
|
if( 0 < exchange_node->n_extra_sources &&
|
|
root == exchange_node->rank_extra_sources_array[0]) {
|
|
/* regardless, I will bump the ready flag and set it in case someone is watching */
|
|
/* announce that data is ready */
|
|
ready_flag++;
|
|
}
|
|
|
|
/* signal that data is ready */
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->flags[GATHER_FLAG][bcol_id] = ready_flag;
|
|
|
|
FINISHED:
|
|
|
|
|
|
my_ctl_pointer->starting_flag_value[bcol_id]++;
|
|
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
|
|
|
|
/* Blocking routines, used to prototype and test signaling,
|
|
* as well as debug hierarchical algorithm
|
|
*/
|
|
#if 0
|
|
/*
 * Disabled variant: registers the blocking prototype
 * bcol_basesmuma_k_nomial_gather as both the init and the progress
 * function of the gather collective.
 *
 * Always returns OMPI_SUCCESS.
 */
int bcol_basesmuma_gather_init(mca_bcol_base_module_t *super)
{
    mca_bcol_base_coll_fn_invoke_attributes_t inv_attribs;
    mca_bcol_base_coll_fn_comm_attributes_t comm_attribs;

    /* message-level selection attributes */
    inv_attribs.bcol_msg_min    = 0;
    inv_attribs.bcol_msg_max    = 20000;
    inv_attribs.datatype_bitmap = 0x11111111;
    inv_attribs.op_types_bitmap = 0x11111111;

    /* communicator-level selection attributes */
    comm_attribs.bcoll_type        = BCOL_GATHER;
    comm_attribs.comm_size_min     = 0;
    comm_attribs.comm_size_max     = 16;
    comm_attribs.data_src          = DATA_SRC_KNOWN;
    comm_attribs.waiting_semantics = BLOCKING;

    /* the same blocking routine serves as init and progress */
    mca_bcol_base_set_attributes(super, &comm_attribs, &inv_attribs,
                                 bcol_basesmuma_k_nomial_gather,
                                 bcol_basesmuma_k_nomial_gather);

    return OMPI_SUCCESS;
}
|
|
#endif
|
|
|
|
|
|
/* original, fully blocking, fully synchronous gather - should result in worst performance when used */
|
|
#if 0
|
|
int bcol_basesmuma_k_nomial_gather(bcol_function_args_t *input_args,
|
|
mca_bcol_base_function_t *c_input_args)
|
|
{
|
|
/* local variables */
|
|
int group_size;
|
|
int first_instance=0, flag_offset;
|
|
int rc = OMPI_SUCCESS;
|
|
int leading_dim, buff_idx, idx;
|
|
int *group_list;
|
|
int src, comm_src, knt, i, k, j, k_temp1, k_temp2;
|
|
int pseudo_root, proxy_root, pseudo_base_adj;
|
|
volatile int64_t ready_flag;
|
|
int count=input_args->count;
|
|
struct ompi_datatype_t* dtype=input_args->dtype;
|
|
int root=input_args->root;
|
|
int base_adj, base;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
mca_bcol_basesmuma_module_t* bcol_module=
|
|
(mca_bcol_basesmuma_module_t *)c_input_args->bcol_module;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;
|
|
netpatterns_k_exchange_node_t *exchange_node =
|
|
&bcol_module->knomial_allgather_tree;
|
|
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
|
|
/* "indirectors" */
|
|
int *list_connected = bcol_module->super.list_n_connected;
|
|
int *inv_map = exchange_node->inv_reindex_map;
|
|
int *reindex_map = exchange_node->reindex_map;
|
|
/*int *reindex_map = exchange_node->reindex_map;*/
|
|
/* stray rank == first rank in the extra set */
|
|
int stray = exchange_node->k_nomial_stray;
|
|
|
|
/* tree radix */
|
|
int tree_order = exchange_node->tree_order;
|
|
/* tree depth */
|
|
int pow_k = exchange_node->log_tree_order;
|
|
/* largest power of k less than or equal to np */
|
|
int cnt = exchange_node->n_largest_pow_tree_order;
|
|
|
|
/*fprintf(stderr,"tree order %d pow_k %d stray %d root %d\n",tree_order, pow_k, stray, root);*/
|
|
/* payload structures */
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
volatile char *child_data_pointer;
|
|
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
volatile mca_bcol_basesmuma_header_t *child_ctl_pointer;
|
|
/*volatile mca_bcol_basesmuma_ctl_struct_t* parent_ctl_pointer; */
|
|
|
|
size_t pack_len = 0, dt_size;
|
|
void *data_addr = (void *)((unsigned char *)input_args->src_desc->data_addr);
|
|
|
|
/* active in the algorithm */
|
|
bool active = true;
|
|
|
|
#if 0
|
|
fprintf(stderr,"Entering sm gather input_args->sbuf_offset %d \n",input_args->sbuf_offset);
|
|
fflush(stderr);
|
|
#endif
|
|
|
|
|
|
/* we will work only on packed data - so compute the length*/
|
|
/* this is the size of my data, this is not gatherv so it's the same
|
|
* for all ranks in the communicator.
|
|
*/
|
|
ompi_datatype_type_size(dtype, &dt_size);
|
|
pack_len=count*dt_size;
|
|
/* now set the "real" offset */
|
|
buff_offset = buff_offset*pack_len;
|
|
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
|
|
/* Get addressing information */
|
|
my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
/* I have a feeling that I'll need this */
|
|
group_list = bcol_module->super.sbgp_partner_module->group_list;
|
|
|
|
group_size = bcol_module->colls_no_user_data.size_of_group;
|
|
leading_dim=bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
/*ctl_structs=(mca_bcol_basesmuma_ctl_struct_t **)
|
|
bcol_module->colls_with_user_data.ctl_buffs+idx;
|
|
*/
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
/*my_ctl_pointer = ctl_structs[my_rank]; */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
|
|
/* setup resource recycling */
|
|
if( my_ctl_pointer->sequence_number < sequence_number ) {
|
|
first_instance=1;
|
|
}
|
|
|
|
if( first_instance ) {
|
|
/* Signal arrival */
|
|
my_ctl_pointer->flag = -1;
|
|
my_ctl_pointer->gflag = -1;
|
|
my_ctl_pointer->index=1;
|
|
/* this does not need to use any flag values , so only need to
|
|
* set the value for subsequent values that may need this */
|
|
my_ctl_pointer->starting_flag_value=0;
|
|
flag_offset=0;
|
|
|
|
} else {
|
|
/* only one thread at a time will be making progress on this
|
|
* collective, so no need to make this atomic */
|
|
my_ctl_pointer->index++;
|
|
}
|
|
|
|
|
|
/* increment the starting flag by one and return */
|
|
flag_offset = my_ctl_pointer->starting_flag_value;
|
|
ready_flag = flag_offset + sequence_number + 1;
|
|
my_ctl_pointer->sequence_number = sequence_number;
|
|
|
|
/* debug
|
|
fprintf(stderr," sequence_number %lld flag_offset %d starting flag val %d\n",sequence_number,flag_offset, my_ctl_pointer->starting_flag_value);
|
|
fflush(stderr);
|
|
end debug */
|
|
|
|
|
|
/*
|
|
* Fan out from root
|
|
*/
|
|
/* don't need this either */
|
|
/* root is the local leader */
|
|
/* calculate the number of steps necessary for this collective */
|
|
|
|
/* first thing we do is figure out where the root is in our new indexing */
|
|
/* find root in new indexing */
|
|
pseudo_root = inv_map[root];
|
|
/* see if this is larger than the stray */
|
|
if( pseudo_root >= stray ) {
|
|
/* then we need to define the proxy root, everyone can do this */
|
|
proxy_root = pseudo_root - cnt;
|
|
}else {
|
|
proxy_root = pseudo_root;
|
|
}
|
|
|
|
|
|
|
|
if( EXTRA_NODE == exchange_node->node_type ) {
|
|
|
|
/* signal arrival */
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
|
|
/* send is done */
|
|
|
|
/* poll for data only if I am the root */
|
|
/* bump the ready flag */
|
|
ready_flag++;
|
|
if( root == my_rank ){
|
|
/* poll for data from proxy */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
while(!IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
opal_progress();
|
|
}
|
|
/* receive the data from the proxy, aka pseudo-root */
|
|
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset),(void *) ((unsigned char *) child_data_pointer+buff_offset)
|
|
,pack_len*group_size);
|
|
}
|
|
goto FINISHED;
|
|
|
|
|
|
} else if( 0 < exchange_node->n_extra_sources ) {
|
|
|
|
/* am a proxy, poll for pack_len data from extra */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
knt = 0;
|
|
for( i = 0; i < src; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
while(!IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
opal_progress();
|
|
}
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset + pack_len*knt),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
pack_len*knt), pack_len*list_connected[src]);
|
|
/*fprintf(stderr,"999 proxy received data from %d at offset %d of length %d\n",src,
|
|
buff_offset+pack_len*knt,pack_len*list_connected[src]);
|
|
*/
|
|
}
|
|
|
|
/* start the k-nomial gather phase */
|
|
/* only "active ranks participate, once a rank has forwarded its data, it becomes inactive */
|
|
knt = 0;
|
|
while(active){
|
|
k_temp1 = tree_order;
|
|
k_temp2 = 1;
|
|
for( i = 0; i < pow_k; i++) {
|
|
/* then find the base */
|
|
/*FIND_BASE(base,my_rank,i+1,tree_order);*/
|
|
FIND_BASE(base,exchange_node->reindex_myid,i+1,tree_order);
|
|
/* now find the adjusted base */
|
|
base_adj = base + (base + proxy_root)%k_temp1;
|
|
/* ok, now find out WHO is occupying this slot */
|
|
/*pseudo_base_adj = inv_map[base_adj];*/
|
|
pseudo_base_adj = reindex_map[base_adj];
|
|
|
|
if(my_rank == pseudo_base_adj ) {
|
|
/* then go ahead and poll for children's data */
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
/* send phase
|
|
*/
|
|
/* get communication partner */
|
|
|
|
src = exchange_node->rank_exchanges[i][j];
|
|
/*fprintf(stderr,"comm_src %d\n",comm_src);*/
|
|
/* remember, if we have extra ranks, then we won't participate
|
|
* with a least one peer. Make a check
|
|
*/
|
|
if( src < 0 ){
|
|
continue;
|
|
}
|
|
|
|
/*fprintf(stderr,"src %d\n",src);*/
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
while(!IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
opal_progress();
|
|
}
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
exchange_node->payload_info[i][j].r_len*pack_len);
|
|
/*
|
|
fprintf(stderr,"999 receiving data from %d at offset %d of length %d\n",
|
|
exchange_node->rank_exchanges[i][j], buff_offset + exchange_node->payload_info[i][j].r_offset,
|
|
exchange_node->payload_info[i][j].r_len*pack_len);
|
|
*/
|
|
opal_atomic_wmb ();
|
|
knt++;
|
|
if(knt == exchange_node->n_actual_exchanges) {
|
|
/* this is the trick to break the root out,
|
|
* only the root should be able to satisfy this
|
|
*/
|
|
/*
|
|
fprintf(stderr,"hello n_actual is %d \n",knt);
|
|
fprintf(stderr,"hello n_actual_exch is %d \n",
|
|
exchange_node->n_actual_exchanges);
|
|
*/
|
|
goto LAST_STEP;
|
|
}
|
|
}
|
|
} else {
|
|
/* announce my arrival */
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
active = false;
|
|
break;
|
|
}
|
|
|
|
k_temp1 = k_temp1*tree_order;
|
|
k_temp2 = k_temp2*tree_order;
|
|
}
|
|
}
|
|
LAST_STEP:
|
|
/* last step, proxies send full data back to the extra ranks */
|
|
if( 0 < exchange_node->n_extra_sources &&
|
|
root == exchange_node->rank_extra_sources_array[0]) {
|
|
/* regardless, I will bump the ready flag and set it in case someone is watching */
|
|
/* announce that data is ready */
|
|
ready_flag++;
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
}
|
|
|
|
|
|
FINISHED:
|
|
|
|
/* debug
|
|
fprintf(stderr," my_ctl_pointer->index %d n of this type %d %u \n",
|
|
my_ctl_pointer->index,c_input_args->n_of_this_type_in_collective,getpid());
|
|
fflush(stderr);
|
|
end debug */
|
|
|
|
my_ctl_pointer->starting_flag_value+=1;
|
|
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
#if 0
|
|
/* blocking, asynchronous polling gather routine */
|
|
int bcol_basesmuma_k_nomial_gather(bcol_function_args_t *input_args,
|
|
mca_bcol_base_function_t *c_input_args)
|
|
{
|
|
/* local variables */
|
|
int group_size;
|
|
int first_instance=0, flag_offset;
|
|
int rc = OMPI_SUCCESS;
|
|
int leading_dim, buff_idx, idx;
|
|
int *group_list;
|
|
int src, comm_src, knt, i, k, j, k_temp1, k_temp2;
|
|
int pseudo_root, proxy_root, pseudo_base_adj;
|
|
volatile int64_t ready_flag;
|
|
int count=input_args->count;
|
|
struct ompi_datatype_t* dtype=input_args->dtype;
|
|
int root=input_args->root;
|
|
int base_adj, base;
|
|
int total_peers, my_pow_k;
|
|
int probe;
|
|
int matched;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
mca_bcol_basesmuma_module_t* bcol_module=
|
|
(mca_bcol_basesmuma_module_t *)c_input_args->bcol_module;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
mca_bcol_basesmuma_component_t *cm = &mca_bcol_basesmuma_component;
|
|
netpatterns_k_exchange_node_t *exchange_node =
|
|
&bcol_module->knomial_allgather_tree;
|
|
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
|
|
/* "indirectors" */
|
|
int *list_connected = bcol_module->super.list_n_connected;
|
|
int *inv_map = exchange_node->inv_reindex_map;
|
|
int *reindex_map = exchange_node->reindex_map;
|
|
/*int *reindex_map = exchange_node->reindex_map;*/
|
|
/* stray rank == first rank in the extra set */
|
|
int stray = exchange_node->k_nomial_stray;
|
|
|
|
/* tree radix */
|
|
int tree_order = exchange_node->tree_order;
|
|
/* tree depth */
|
|
int pow_k = exchange_node->log_tree_order;
|
|
/* largest power of k less than or equal to np */
|
|
int cnt = exchange_node->n_largest_pow_tree_order;
|
|
|
|
/*fprintf(stderr,"tree order %d pow_k %d stray %d root %d\n",tree_order, pow_k, stray, root);*/
|
|
/* payload structures */
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
volatile char *child_data_pointer;
|
|
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
volatile mca_bcol_basesmuma_header_t *child_ctl_pointer;
|
|
/*volatile mca_bcol_basesmuma_ctl_struct_t* parent_ctl_pointer; */
|
|
|
|
size_t pack_len = 0, dt_size;
|
|
void *data_addr = (void *)((unsigned char *)input_args->src_desc->data_addr);
|
|
|
|
/* active in the algorithm */
|
|
bool active = true;
|
|
|
|
#if 0
|
|
fprintf(stderr,"Entering sm gather root %d \n",root);
|
|
fflush(stderr);
|
|
#endif
|
|
|
|
|
|
/* we will work only on packed data - so compute the length*/
|
|
/* this is the size of my data, this is not gatherv so it's the same
|
|
* for all ranks in the communicator.
|
|
*/
|
|
ompi_datatype_type_size(dtype, &dt_size);
|
|
pack_len=count*dt_size;
|
|
/* now set the "real" offset */
|
|
buff_offset = buff_offset*pack_len;
|
|
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
|
|
/* Get addressing information */
|
|
my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
/* I have a feeling that I'll need this */
|
|
group_list = bcol_module->super.sbgp_partner_module->group_list;
|
|
|
|
group_size = bcol_module->colls_no_user_data.size_of_group;
|
|
leading_dim=bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
/*ctl_structs=(mca_bcol_basesmuma_ctl_struct_t **)
|
|
bcol_module->colls_with_user_data.ctl_buffs+idx;
|
|
*/
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
/*my_ctl_pointer = ctl_structs[my_rank]; */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
|
|
/* setup resource recycling */
|
|
if( my_ctl_pointer->sequence_number < sequence_number ) {
|
|
first_instance=1;
|
|
}
|
|
|
|
if( first_instance ) {
|
|
/* Signal arrival */
|
|
my_ctl_pointer->flag = -1;
|
|
my_ctl_pointer->gflag = -1;
|
|
my_ctl_pointer->index=1;
|
|
/* this does not need to use any flag values , so only need to
|
|
* set the value for subsequent values that may need this */
|
|
my_ctl_pointer->starting_flag_value=0;
|
|
flag_offset=0;
|
|
|
|
} else {
|
|
/* only one thread at a time will be making progress on this
|
|
* collective, so no need to make this atomic */
|
|
my_ctl_pointer->index++;
|
|
}
|
|
|
|
|
|
/* increment the starting flag by one and return */
|
|
flag_offset = my_ctl_pointer->starting_flag_value;
|
|
ready_flag = flag_offset + sequence_number + 1;
|
|
my_ctl_pointer->sequence_number = sequence_number;
|
|
|
|
/* debug
|
|
fprintf(stderr," sequence_number %lld flag_offset %d starting flag val %d\n",sequence_number,flag_offset, my_ctl_pointer->starting_flag_value);
|
|
fflush(stderr);
|
|
end debug */
|
|
|
|
|
|
/*
|
|
* Fan out from root
|
|
*/
|
|
/* don't need this either */
|
|
/* root is the local leader */
|
|
/* calculate the number of steps necessary for this collective */
|
|
|
|
/* first thing we do is figure out where the root is in our new indexing */
|
|
/* find root in new indexing */
|
|
pseudo_root = inv_map[root];
|
|
/* see if this is larger than the stray */
|
|
if( pseudo_root >= stray ) {
|
|
/* then we need to define the proxy root, everyone can do this */
|
|
proxy_root = pseudo_root - cnt;
|
|
}else {
|
|
proxy_root = pseudo_root;
|
|
}
|
|
if( EXTRA_NODE == exchange_node->node_type ) {
|
|
|
|
/* signal arrival */
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
|
|
/* send is done */
|
|
|
|
/* poll for data only if I am the root */
|
|
/* bump the ready flag */
|
|
ready_flag++;
|
|
if( root == my_rank ){
|
|
/* poll for data from proxy */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
/* in this case, you must block */
|
|
while(!IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
opal_progress();
|
|
}
|
|
/* receive the data from the proxy, aka pseudo-root */
|
|
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset),
|
|
(void *) ((unsigned char *) child_data_pointer+buff_offset)
|
|
,pack_len*group_size);
|
|
}
|
|
goto FINISHED;
|
|
|
|
|
|
} else if( 0 < exchange_node->n_extra_sources ) {
|
|
|
|
/* am a proxy, poll for pack_len data from extra */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
/* get src data buffer */
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
knt = 0;
|
|
for( i = 0; i < src; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
/* must block here also */
|
|
while(!IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
opal_progress();
|
|
}
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset + pack_len*knt),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
pack_len*knt), pack_len*list_connected[src]);
|
|
/*fprintf(stderr,"999 proxy received data from %d at offset %d of length %d\n",src,
|
|
buff_offset+pack_len*knt,pack_len*list_connected[src]);
|
|
*/
|
|
}
|
|
/* do some figuring */
|
|
|
|
total_peers = 0;
|
|
my_pow_k = pow_k;
|
|
k_temp1 = tree_order;
|
|
k_temp2 = 1;
|
|
for( i = 0; i < pow_k; i++) {
|
|
/* then find the base */
|
|
/*FIND_BASE(base,my_rank,i+1,tree_order);*/
|
|
FIND_BASE(base,exchange_node->reindex_myid,i+1,tree_order);
|
|
/* now find the adjusted base */
|
|
base_adj = base + (base + proxy_root)%k_temp1;
|
|
/* ok, now find out WHO is occupying this slot */
|
|
/*pseudo_base_adj = inv_map[base_adj];*/
|
|
pseudo_base_adj = reindex_map[base_adj];
|
|
|
|
if(my_rank == pseudo_base_adj ) {
|
|
/* then go ahead and poll for children's data */
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
/* send phase
|
|
*/
|
|
/* get communication partner */
|
|
|
|
src = exchange_node->rank_exchanges[i][j];
|
|
/*fprintf(stderr,"comm_src %d\n",comm_src);*/
|
|
/* remember, if we have extra ranks, then we won't participate
|
|
* with at least one peer. Make a check
|
|
*/
|
|
if( src < 0 ){
|
|
continue;
|
|
}else{
|
|
total_peers++;
|
|
}
|
|
|
|
|
|
}
|
|
} else {
|
|
/* announce my arrival */
|
|
my_pow_k = i;
|
|
break;
|
|
}
|
|
|
|
k_temp1 = k_temp1*tree_order;
|
|
k_temp2 = k_temp2*tree_order;
|
|
}
|
|
|
|
if( 0 == my_pow_k ){
|
|
/* signal arrival */
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
|
|
goto FINISHED;
|
|
}
|
|
|
|
|
|
|
|
/* start the k-nomial gather phase */
|
|
/* only "active" ranks participate; once a rank has forwarded its data, it becomes inactive */
|
|
knt = 0;
|
|
while(active){
|
|
k_temp1 = tree_order;
|
|
k_temp2 = 1;
|
|
for( i = 0; i < my_pow_k; i++) {
|
|
|
|
/* then go ahead and poll for children's data */
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
matched = 0;
|
|
/* send phase
|
|
*/
|
|
/* get communication partner */
|
|
|
|
src = exchange_node->rank_exchanges[i][j];
|
|
/*fprintf(stderr,"comm_src %d\n",comm_src);*/
|
|
/* remember, if we have extra ranks, then we won't participate
|
|
* with at least one peer. Make a check
|
|
*/
|
|
if( src < 0 ){
|
|
continue;
|
|
}
|
|
|
|
/*fprintf(stderr,"src %d\n",src);*/
|
|
child_data_pointer = data_buffs[src].payload;
|
|
child_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
/* if child has been marked, then skip */
|
|
if( sequence_number == child_ctl_pointer->mark ){
|
|
continue;
|
|
}
|
|
|
|
|
|
for( probe = 0; probe < cm->num_to_probe && (0 == matched); probe++){
|
|
if(IS_GDATA_READY(child_ctl_pointer,ready_flag,sequence_number)){
|
|
/* mark the child's pointer */
|
|
child_ctl_pointer->mark = sequence_number;
|
|
/* copy the data */
|
|
|
|
memcpy((void *) ((unsigned char *) data_addr + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
(void *) ((unsigned char *) child_data_pointer + buff_offset +
|
|
exchange_node->payload_info[i][j].r_offset*pack_len),
|
|
exchange_node->payload_info[i][j].r_len*pack_len);
|
|
/*
|
|
fprintf(stderr,"999 receiving data from %d at offset %d of length %d\n",
|
|
exchange_node->rank_exchanges[i][j], buff_offset + exchange_node->payload_info[i][j].r_offset,
|
|
exchange_node->payload_info[i][j].r_len*pack_len);
|
|
*/
|
|
knt++;
|
|
if(knt == total_peers) {
|
|
/* this is the trick to break the root out,
|
|
* only the root should be able to satisfy this
|
|
*/
|
|
/*
|
|
fprintf(stderr,"hello n_actual is %d \n",knt);
|
|
fprintf(stderr,"hello n_actual_exch is %d \n",
|
|
exchange_node->n_actual_exchanges);
|
|
*/
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
|
|
goto LAST_STEP;
|
|
}
|
|
matched = 1;
|
|
}else{
|
|
opal_progress();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
k_temp1 = k_temp1*tree_order;
|
|
k_temp2 = k_temp2*tree_order;
|
|
}
|
|
LAST_STEP:
|
|
/* last step, proxies send full data back to the extra ranks */
|
|
if( 0 < exchange_node->n_extra_sources &&
|
|
root == exchange_node->rank_extra_sources_array[0]) {
|
|
/* regardless, I will bump the ready flag and set it in case someone is watching */
|
|
/* announce that data is ready */
|
|
ready_flag++;
|
|
my_ctl_pointer->gflag = ready_flag;
|
|
}
|
|
|
|
|
|
FINISHED:
|
|
|
|
/* debug
|
|
fprintf(stderr," my_ctl_pointer->index %d n of this type %d %u \n",
|
|
my_ctl_pointer->index,c_input_args->n_of_this_type_in_collective,getpid());
|
|
fflush(stderr);
|
|
end debug */
|
|
|
|
my_ctl_pointer->starting_flag_value+=1;
|
|
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
#endif
|