64321acc22
opal does not always define MB. It is recommended that opal_atomic_[rw]mb is called instead. We will need to address the cases where these functions are no-ops on weak-memory ordered cpus. cmr=v1.7.5:ticket=trac:4158 This commit was SVN r30534. The following Trac tickets were found above: Ticket 4158 --> https://svn.open-mpi.org/trac/ompi/ticket/4158
540 строки
19 KiB
C
540 строки
19 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2009-2013 Oak Ridge National Laboratory. All rights reserved.
|
|
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
|
|
* Copyright (c) 2013-2014 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
|
|
#include "ompi/include/ompi/constants.h"
|
|
#include "ompi/mca/coll/ml/coll_ml.h"
|
|
#include "ompi/mca/bcol/bcol.h"
|
|
#include "ompi/mca/bcol/basesmuma/bcol_basesmuma.h"
|
|
/*
|
|
#define IS_AGDATA_READY(peer, my_flag, my_sequence_number)\
|
|
(((peer)->sequence_number == (my_sequence_number) && \
|
|
(peer)->flags[ALLGATHER_FLAG][bcol_id] >= (my_flag) \
|
|
)? true : false )
|
|
*/
|
|
|
|
#define CALC_ACTIVE_REQUESTS(active_requests,peers, tree_order) \
|
|
do{ \
|
|
for( j = 0; j < (tree_order - 1); j++){ \
|
|
if( 0 > peers[j] ) { \
|
|
/* set the bit */ \
|
|
*active_requests ^= (1<<j); \
|
|
} \
|
|
} \
|
|
}while(0)
|
|
|
|
|
|
|
|
/*
|
|
* Recursive K-ing allgather
|
|
*/
|
|
|
|
/*
|
|
*
|
|
* Recurssive k-ing algorithm
|
|
* Example k=3 n=9
|
|
*
|
|
*
|
|
* Number of Exchange steps = log (basek) n
|
|
* Number of steps in exchange step = k (radix)
|
|
*
|
|
*/
|
|
int bcol_basesmuma_k_nomial_allgather_init(bcol_function_args_t *input_args,
|
|
struct coll_ml_function_t *const_args)
|
|
{
|
|
/* local variables */
|
|
int8_t flag_offset;
|
|
volatile int8_t ready_flag;
|
|
mca_bcol_basesmuma_module_t *bcol_module = (mca_bcol_basesmuma_module_t *) const_args->bcol_module;
|
|
netpatterns_k_exchange_node_t *exchange_node = &bcol_module->knomial_allgather_tree;
|
|
int group_size = bcol_module->colls_no_user_data.size_of_group;
|
|
int *list_connected = bcol_module->super.list_n_connected; /* critical for hierarchical colls */
|
|
int bcol_id = (int) bcol_module->super.bcol_id;
|
|
mca_bcol_basesmuma_component_t *cm = &mca_bcol_basesmuma_component;
|
|
uint32_t buffer_index = input_args->buffer_index;
|
|
int *active_requests =
|
|
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
|
|
|
|
int *iteration = &bcol_module->ml_mem.nb_coll_desc[buffer_index].iteration;
|
|
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
|
|
int leading_dim, buff_idx, idx;
|
|
|
|
int i, j, probe;
|
|
int knt;
|
|
int src;
|
|
int recv_offset, recv_len;
|
|
|
|
int pow_k, tree_order;
|
|
int max_requests = 0; /* important to initialize this */
|
|
|
|
int matched = 0;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
|
|
|
|
int pack_len = input_args->count * input_args->dtype->super.size;
|
|
|
|
void *data_addr = (void*)(
|
|
(unsigned char *) input_args->sbuf +
|
|
(size_t) input_args->sbuf_offset);
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
volatile char *peer_data_pointer;
|
|
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
volatile mca_bcol_basesmuma_header_t *peer_ctl_pointer;
|
|
|
|
#if 0
|
|
fprintf(stderr,"entering p2p allgather pack_len %d\n",pack_len);
|
|
#endif
|
|
/* initialize the iteration counter */
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
leading_dim = bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
/* NTH: copied from progress */
|
|
flag_offset = my_ctl_pointer->starting_flag_value[bcol_id];
|
|
|
|
/* initialize headers and ready flag */
|
|
BASESMUMA_HEADER_INIT(my_ctl_pointer, ready_flag, sequence_number, bcol_id);
|
|
|
|
/* initialize these */
|
|
*iteration = 0;
|
|
*active_requests = 0;
|
|
*status = 0;
|
|
|
|
/* k-nomial parameters */
|
|
tree_order = exchange_node->tree_order;
|
|
pow_k = exchange_node->log_tree_order;
|
|
|
|
/* calculate the maximum number of requests
|
|
* at each level each rank communicates with
|
|
* at most (k - 1) peers
|
|
* so if we set k - 1 bit fields in "max_requests", then
|
|
* we have max_request == 2^(k - 1) -1
|
|
*/
|
|
for(i = 0; i < (tree_order - 1); i++){
|
|
max_requests ^= (1<<i);
|
|
}
|
|
/* let's begin the collective, starting with extra ranks and their
|
|
* respective proxies
|
|
*/
|
|
|
|
if( EXTRA_NODE == exchange_node->node_type ) {
|
|
|
|
/* then I will signal to my proxy rank*/
|
|
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
|
|
ready_flag = flag_offset + 1 + pow_k + 2;
|
|
/* now, poll for completion */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
/* calculate the offset */
|
|
knt = 0;
|
|
for(i = 0; i < group_size; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
for( i = 0; i < cm->num_to_probe && (0 == matched); i++ ) {
|
|
if(IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* we receive the entire message */
|
|
memcpy((void *)((unsigned char *) data_addr + buff_offset),
|
|
(void *) ((unsigned char *) peer_data_pointer + buff_offset),
|
|
knt * pack_len);
|
|
|
|
goto FINISHED;
|
|
}
|
|
|
|
}
|
|
|
|
/* save state and bail */
|
|
*iteration = -1;
|
|
return BCOL_FN_STARTED;
|
|
|
|
}else if ( 0 < exchange_node->n_extra_sources ) {
|
|
|
|
/* I am a proxy for someone */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
|
|
knt = 0;
|
|
for(i = 0; i < src; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
|
|
/* probe for extra rank's arrival */
|
|
for( i = 0; i < cm->num_to_probe && ( 0 == matched); i++) {
|
|
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* copy it in */
|
|
memcpy((void *)((unsigned char *) data_addr + knt*pack_len),
|
|
(void *) ((unsigned char *) peer_data_pointer + knt*pack_len),
|
|
pack_len * list_connected[src]);
|
|
goto MAIN_PHASE;
|
|
}
|
|
}
|
|
*status = ready_flag;
|
|
*iteration = -1;
|
|
return BCOL_FN_STARTED;
|
|
|
|
|
|
}
|
|
|
|
MAIN_PHASE:
|
|
/* bump the ready flag */
|
|
ready_flag++;
|
|
|
|
|
|
/* we start the recursive k - ing phase */
|
|
for( *iteration = 0; *iteration < pow_k; (*iteration)++) {
|
|
/* announce my arrival */
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
|
|
/* calculate the number of active requests */
|
|
CALC_ACTIVE_REQUESTS(active_requests,exchange_node->rank_exchanges[*iteration],tree_order);
|
|
/* Now post the recv's */
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
|
|
/* recv phase */
|
|
src = exchange_node->rank_exchanges[*iteration][j];
|
|
|
|
if( src < 0 ) {
|
|
/* then not a valid rank, continue */
|
|
|
|
continue;
|
|
}
|
|
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
if( !(*active_requests&(1<<j))) {
|
|
/* then the bit hasn't been set, thus this peer
|
|
* hasn't been processed at this level
|
|
*/
|
|
recv_offset = exchange_node->payload_info[*iteration][j].r_offset * pack_len;
|
|
recv_len = exchange_node->payload_info[*iteration][j].r_len * pack_len;
|
|
/* post the receive */
|
|
/* I am putting the probe loop as the inner most loop to achieve
|
|
* better temporal locality
|
|
*/
|
|
matched = 0;
|
|
for( probe = 0; probe < cm->num_to_probe && (0 == matched); probe++){
|
|
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* set this request's bit */
|
|
*active_requests ^= (1<<j);
|
|
/* get the data */
|
|
memcpy((void *)((unsigned char *) data_addr + recv_offset),
|
|
(void *)((unsigned char *) peer_data_pointer + recv_offset),
|
|
recv_len);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
if( max_requests == *active_requests ){
|
|
/* bump the ready flag */
|
|
ready_flag++;
|
|
/*reset the active requests */
|
|
*active_requests = 0;
|
|
} else {
|
|
/* save state and hop out
|
|
* only the iteration needs to be tracked
|
|
*/
|
|
*status = my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id];
|
|
return BCOL_FN_STARTED;
|
|
}
|
|
}
|
|
|
|
/* bump the flag one more time for the extra rank */
|
|
ready_flag = flag_offset + 1 + pow_k + 2;
|
|
|
|
/* finish off the last piece, send the data back to the extra */
|
|
if( 0 < exchange_node->n_extra_sources ) {
|
|
/* simply announce my arrival */
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
|
|
|
|
}
|
|
|
|
FINISHED:
|
|
/* bump this up */
|
|
my_ctl_pointer->starting_flag_value[bcol_id]++;
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
|
|
|
|
/* allgather progress function */
|
|
|
|
int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
|
|
struct coll_ml_function_t *const_args)
|
|
{
|
|
|
|
|
|
/* local variables */
|
|
int8_t flag_offset;
|
|
uint32_t buffer_index = input_args->buffer_index;
|
|
volatile int8_t ready_flag;
|
|
mca_bcol_basesmuma_module_t *bcol_module = (mca_bcol_basesmuma_module_t *) const_args->bcol_module;
|
|
netpatterns_k_exchange_node_t *exchange_node = &bcol_module->knomial_allgather_tree;
|
|
int group_size = bcol_module->colls_no_user_data.size_of_group;
|
|
int *list_connected = bcol_module->super.list_n_connected; /* critical for hierarchical colls */
|
|
int bcol_id = (int) bcol_module->super.bcol_id;
|
|
mca_bcol_basesmuma_component_t *cm = &mca_bcol_basesmuma_component;
|
|
int *active_requests =
|
|
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
|
|
|
|
int *iteration = &bcol_module->ml_mem.nb_coll_desc[buffer_index].iteration;
|
|
int *iter = iteration; /* double alias */
|
|
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
|
|
int leading_dim, idx, buff_idx;
|
|
|
|
int i, j, probe;
|
|
int knt;
|
|
int src;
|
|
int recv_offset, recv_len;
|
|
int max_requests = 0; /* critical to set this */
|
|
int pow_k, tree_order;
|
|
|
|
int matched = 0;
|
|
int64_t sequence_number=input_args->sequence_num;
|
|
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
|
|
int buff_offset = bcol_module->super.hier_scather_offset;
|
|
|
|
int pack_len = input_args->count * input_args->dtype->super.size;
|
|
|
|
void *data_addr = (void*)(
|
|
(unsigned char *) input_args->sbuf +
|
|
(size_t) input_args->sbuf_offset);
|
|
volatile mca_bcol_basesmuma_payload_t *data_buffs;
|
|
volatile char *peer_data_pointer;
|
|
|
|
/* control structures */
|
|
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
|
|
volatile mca_bcol_basesmuma_header_t *peer_ctl_pointer;
|
|
|
|
#if 0
|
|
fprintf(stderr,"%d: entering sm allgather progress active requests %d iter %d ready_flag %d\n",my_rank,
|
|
*active_requests,*iter,*status);
|
|
#endif
|
|
buff_idx = input_args->src_desc->buffer_index;
|
|
leading_dim=bcol_module->colls_no_user_data.size_of_group;
|
|
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
|
|
data_buffs=(volatile mca_bcol_basesmuma_payload_t *)
|
|
bcol_module->colls_with_user_data.data_buffs+idx;
|
|
|
|
/* Set pointer to current proc ctrl region */
|
|
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
|
|
|
|
/* increment the starting flag by one and return */
|
|
/* flag offset seems unnecessary here */
|
|
flag_offset = my_ctl_pointer->starting_flag_value[bcol_id];
|
|
ready_flag = *status;
|
|
my_ctl_pointer->sequence_number = sequence_number;
|
|
/* k-nomial parameters */
|
|
tree_order = exchange_node->tree_order;
|
|
pow_k = exchange_node->log_tree_order;
|
|
|
|
/* calculate the maximum number of requests
|
|
* at each level each rank communicates with
|
|
* at most (k - 1) peers
|
|
* so if we set k - 1 bit fields in "max_requests", then
|
|
* we have max_request == 2^(k - 1) -1
|
|
*/
|
|
for(i = 0; i < (tree_order - 1); i++){
|
|
max_requests ^= (1<<i);
|
|
}
|
|
|
|
/* let's begin the collective, starting with extra ranks and their
|
|
* respective proxies
|
|
*/
|
|
|
|
if( EXTRA_NODE == exchange_node->node_type ) {
|
|
|
|
/* If I'm in here, then I must be looking for data */
|
|
ready_flag = flag_offset + 1 + pow_k + 2;
|
|
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
/* calculate the offset */
|
|
knt = 0;
|
|
for(i = 0; i < group_size; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
for( i = 0; i < cm->num_to_probe && (0 == matched); i++ ) {
|
|
if(IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* we receive the entire message */
|
|
memcpy((void *)((unsigned char *) data_addr + buff_offset),
|
|
(void *) ((unsigned char *) peer_data_pointer + buff_offset),
|
|
knt * pack_len);
|
|
|
|
goto FINISHED;
|
|
}
|
|
|
|
}
|
|
|
|
/* haven't found it, state is saved, bail out */
|
|
return BCOL_FN_STARTED;
|
|
|
|
}else if ( ( -1 == *iteration ) && (0 < exchange_node->n_extra_sources) ) {
|
|
|
|
/* I am a proxy for someone */
|
|
src = exchange_node->rank_extra_sources_array[0];
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
|
|
knt = 0;
|
|
for(i = 0; i < src; i++){
|
|
knt += list_connected[i];
|
|
}
|
|
|
|
/* probe for extra rank's arrival */
|
|
for( i = 0; i < cm->num_to_probe && ( 0 == matched); i++) {
|
|
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* copy it in */
|
|
memcpy((void *)((unsigned char *) data_addr + knt*pack_len),
|
|
(void *) ((unsigned char *) peer_data_pointer + knt*pack_len),
|
|
pack_len * list_connected[src]);
|
|
|
|
ready_flag++;
|
|
*iteration = 0;
|
|
goto MAIN_PHASE;
|
|
}
|
|
}
|
|
return BCOL_FN_STARTED;
|
|
|
|
}
|
|
|
|
MAIN_PHASE:
|
|
|
|
/* start the recursive k - ing phase */
|
|
for( *iter=*iteration; *iter < pow_k; (*iter)++) {
|
|
/* I am ready at this level */
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
|
|
if( 0 == *active_requests ) {
|
|
/* flip some bits, if we don't have active requests from a previous visit */
|
|
CALC_ACTIVE_REQUESTS(active_requests,exchange_node->rank_exchanges[*iter],tree_order);
|
|
}
|
|
for( j = 0; j < (tree_order - 1); j++ ) {
|
|
|
|
/* recv phase */
|
|
src = exchange_node->rank_exchanges[*iter][j];
|
|
|
|
if( src < 0 ) {
|
|
/* then not a valid rank, continue
|
|
*/
|
|
continue;
|
|
}
|
|
|
|
peer_data_pointer = data_buffs[src].payload;
|
|
peer_ctl_pointer = data_buffs[src].ctl_struct;
|
|
if( !(*active_requests&(1<<j))){
|
|
|
|
/* then this peer hasn't been processed at this level */
|
|
recv_offset = exchange_node->payload_info[*iter][j].r_offset * pack_len;
|
|
recv_len = exchange_node->payload_info[*iter][j].r_len * pack_len;
|
|
/* I am putting the probe loop as the inner most loop to achieve
|
|
* better temporal locality
|
|
*/
|
|
matched = 0;
|
|
for( probe = 0; probe < cm->num_to_probe && (0 == matched); probe++){
|
|
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
|
|
matched = 1;
|
|
/* flip the request's bit */
|
|
*active_requests ^= (1<<j);
|
|
/* copy the data */
|
|
memcpy((void *)((unsigned char *) data_addr + recv_offset),
|
|
(void *)((unsigned char *) peer_data_pointer + recv_offset),
|
|
recv_len);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
if( max_requests == *active_requests ){
|
|
/* bump the ready flag */
|
|
ready_flag++;
|
|
/* reset the active requests for the next level */
|
|
*active_requests = 0;
|
|
/* calculate the number of active requests
|
|
* logically makes sense to do it here. We don't
|
|
* want to inadvertantly flip a bit to zero that we
|
|
* set previously
|
|
*/
|
|
} else {
|
|
/* state is saved hop out
|
|
*/
|
|
*status = my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id];
|
|
return BCOL_FN_STARTED;
|
|
}
|
|
}
|
|
/* bump the flag one more time for the extra rank */
|
|
ready_flag = flag_offset + 1 + pow_k + 2;
|
|
|
|
/* finish off the last piece, send the data back to the extra */
|
|
if( 0 < exchange_node->n_extra_sources ) {
|
|
/* simply announce my arrival */
|
|
opal_atomic_wmb ();
|
|
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
|
|
|
|
}
|
|
|
|
FINISHED:
|
|
/* bump this up for others to see */
|
|
my_ctl_pointer->starting_flag_value[bcol_id]++;
|
|
return BCOL_FN_COMPLETE;
|
|
}
|
|
|
|
/* Register allreduce functions to the BCOL function table,
|
|
* so they can be selected
|
|
*/
|
|
int bcol_basesmuma_allgather_init(mca_bcol_base_module_t *super)
|
|
{
|
|
mca_bcol_base_coll_fn_comm_attributes_t comm_attribs;
|
|
mca_bcol_base_coll_fn_invoke_attributes_t inv_attribs;
|
|
|
|
comm_attribs.bcoll_type = BCOL_ALLGATHER;
|
|
comm_attribs.comm_size_min = 0;
|
|
comm_attribs.comm_size_max = 1024 * 1024;
|
|
comm_attribs.waiting_semantics = NON_BLOCKING;
|
|
|
|
inv_attribs.bcol_msg_min = 0;
|
|
inv_attribs.bcol_msg_max = 20000; /* range 1 */
|
|
|
|
inv_attribs.datatype_bitmap = 0xffffffff;
|
|
inv_attribs.op_types_bitmap = 0xffffffff;
|
|
|
|
comm_attribs.data_src = DATA_SRC_KNOWN;
|
|
|
|
mca_bcol_base_set_attributes(super, &comm_attribs, &inv_attribs,
|
|
bcol_basesmuma_k_nomial_allgather_init,
|
|
bcol_basesmuma_k_nomial_allgather_progress);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|