1
1

bcol/basesmuma: fix broken allgather algorithm

The algorithm was failing ibm/collective/allgather and iallgather. I
cleaned up the code to eliminate duplicate code paths and tracked the
issue down to an error in the way extra nodes in the knomial exchange
are handled. The new code is more compact and has been tested with up
to 64 ranks with the ibm test suite.

cmr=v1.8.1:reviewer=manjugv

This commit was SVN r31419.
Этот коммит содержится в:
Nathan Hjelm 2014-04-16 22:43:52 +00:00
родитель e125bbe347
Коммит a03b11c20e

Просмотреть файл

@ -53,15 +53,9 @@
int bcol_basesmuma_k_nomial_allgather_init(bcol_function_args_t *input_args,
struct mca_bcol_base_function_t *const_args)
{
/* local variables */
int8_t flag_offset;
volatile int8_t ready_flag;
mca_bcol_basesmuma_module_t *bcol_module = (mca_bcol_basesmuma_module_t *) const_args->bcol_module;
netpatterns_k_exchange_node_t *exchange_node = &bcol_module->knomial_allgather_tree;
int group_size = bcol_module->colls_no_user_data.size_of_group;
int *list_connected = bcol_module->super.list_n_connected; /* critical for hierarchical colls */
int bcol_id = (int) bcol_module->super.bcol_id;
mca_bcol_basesmuma_component_t *cm = &mca_bcol_basesmuma_component;
uint32_t buffer_index = input_args->buffer_index;
int *active_requests =
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
@ -70,35 +64,16 @@ int bcol_basesmuma_k_nomial_allgather_init(bcol_function_args_t *input_args,
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
int leading_dim, buff_idx, idx;
int i, j, probe;
int knt;
int src;
int recv_offset, recv_len;
int pow_k, tree_order;
int max_requests = 0; /* important to initialize this */
int matched = 0;
int64_t sequence_number=input_args->sequence_num;
int64_t sequence_number = input_args->sequence_num;
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
int buff_offset = bcol_module->super.hier_scather_offset;
int pack_len = input_args->count * input_args->dtype->super.size;
void *data_addr = (void*)(
(unsigned char *) input_args->sbuf +
(size_t) input_args->sbuf_offset);
volatile mca_bcol_basesmuma_payload_t *data_buffs;
volatile char *peer_data_pointer;
/* control structures */
volatile mca_bcol_basesmuma_header_t *my_ctl_pointer;
volatile mca_bcol_basesmuma_header_t *peer_ctl_pointer;
#if 0
fprintf(stderr,"entering p2p allgather pack_len %d\n",pack_len);
#endif
volatile int8_t ready_flag;
/* initialize the iteration counter */
buff_idx = input_args->src_desc->buffer_index;
leading_dim = bcol_module->colls_no_user_data.size_of_group;
@ -108,178 +83,22 @@ int bcol_basesmuma_k_nomial_allgather_init(bcol_function_args_t *input_args,
/* Set pointer to current proc ctrl region */
my_ctl_pointer = data_buffs[my_rank].ctl_struct;
/* NTH: copied from progress */
flag_offset = my_ctl_pointer->starting_flag_value[bcol_id];
/* initialize headers and ready flag */
BASESMUMA_HEADER_INIT(my_ctl_pointer, ready_flag, sequence_number, bcol_id);
/* initialize these */
*iteration = 0;
*iteration = -1;
*active_requests = 0;
*status = 0;
*status = ready_flag;
/* k-nomial parameters */
tree_order = exchange_node->tree_order;
pow_k = exchange_node->log_tree_order;
/* calculate the maximum number of requests
* at each level each rank communicates with
* at most (k - 1) peers
* so if we set k - 1 bit fields in "max_requests", then
* we have max_request == 2^(k - 1) -1
*/
for(i = 0; i < (tree_order - 1); i++){
max_requests ^= (1<<i);
}
/* let's begin the collective, starting with extra ranks and their
* respective proxies
*/
if( EXTRA_NODE == exchange_node->node_type ) {
/* then I will signal to my proxy rank*/
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
ready_flag = flag_offset + 1 + pow_k + 2;
/* now, poll for completion */
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
/* calculate the offset */
knt = 0;
for(i = 0; i < group_size; i++){
knt += list_connected[i];
}
for( i = 0; i < cm->num_to_probe && (0 == matched); i++ ) {
if(IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
/* we receive the entire message */
memcpy((void *)((unsigned char *) data_addr + buff_offset),
(void *) ((unsigned char *) peer_data_pointer + buff_offset),
knt * pack_len);
goto FINISHED;
}
}
/* save state and bail */
*iteration = -1;
return BCOL_FN_STARTED;
}else if ( 0 < exchange_node->n_extra_sources ) {
/* I am a proxy for someone */
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
knt = 0;
for(i = 0; i < src; i++){
knt += list_connected[i];
}
/* probe for extra rank's arrival */
for( i = 0; i < cm->num_to_probe && ( 0 == matched); i++) {
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
/* copy it in */
memcpy((void *)((unsigned char *) data_addr + knt*pack_len),
(void *) ((unsigned char *) peer_data_pointer + knt*pack_len),
pack_len * list_connected[src]);
goto MAIN_PHASE;
}
}
*status = ready_flag;
*iteration = -1;
return BCOL_FN_STARTED;
}
MAIN_PHASE:
/* bump the ready flag */
ready_flag++;
/* we start the recursive k - ing phase */
for( *iteration = 0; *iteration < pow_k; (*iteration)++) {
/* announce my arrival */
if (EXTRA_NODE == exchange_node->node_type) {
/* I am ready at this level */
opal_atomic_wmb ();
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
/* calculate the number of active requests */
CALC_ACTIVE_REQUESTS(active_requests,exchange_node->rank_exchanges[*iteration],tree_order);
/* Now post the recv's */
for( j = 0; j < (tree_order - 1); j++ ) {
/* recv phase */
src = exchange_node->rank_exchanges[*iteration][j];
if( src < 0 ) {
/* then not a valid rank, continue */
continue;
}
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
if( !(*active_requests&(1<<j))) {
/* then the bit hasn't been set, thus this peer
* hasn't been processed at this level
*/
recv_offset = exchange_node->payload_info[*iteration][j].r_offset * pack_len;
recv_len = exchange_node->payload_info[*iteration][j].r_len * pack_len;
/* post the receive */
/* I am putting the probe loop as the inner most loop to achieve
* better temporal locality
*/
matched = 0;
for( probe = 0; probe < cm->num_to_probe && (0 == matched); probe++){
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
/* set this request's bit */
*active_requests ^= (1<<j);
/* get the data */
memcpy((void *)((unsigned char *) data_addr + recv_offset),
(void *)((unsigned char *) peer_data_pointer + recv_offset),
recv_len);
}
}
}
}
if( max_requests == *active_requests ){
/* bump the ready flag */
ready_flag++;
/*reset the active requests */
*active_requests = 0;
} else {
/* save state and hop out
* only the iteration needs to be tracked
*/
*status = my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id];
return BCOL_FN_STARTED;
}
}
/* bump the flag one more time for the extra rank */
ready_flag = flag_offset + 1 + pow_k + 2;
/* finish off the last piece, send the data back to the extra */
if( 0 < exchange_node->n_extra_sources ) {
/* simply announce my arrival */
opal_atomic_wmb ();
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
}
FINISHED:
/* bump this up */
my_ctl_pointer->starting_flag_value[bcol_id]++;
return BCOL_FN_COMPLETE;
return bcol_basesmuma_k_nomial_allgather_progress (input_args, const_args);
}
@ -288,8 +107,6 @@ FINISHED:
int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
struct mca_bcol_base_function_t *const_args)
{
/* local variables */
int8_t flag_offset;
uint32_t buffer_index = input_args->buffer_index;
@ -304,7 +121,6 @@ int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
&(bcol_module->ml_mem.nb_coll_desc[buffer_index].active_requests);
int *iteration = &bcol_module->ml_mem.nb_coll_desc[buffer_index].iteration;
int *iter = iteration; /* double alias */
int *status = &bcol_module->ml_mem.nb_coll_desc[buffer_index].status;
int leading_dim, idx, buff_idx;
@ -315,10 +131,8 @@ int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
int max_requests = 0; /* critical to set this */
int pow_k, tree_order;
int matched = 0;
int64_t sequence_number=input_args->sequence_num;
int my_rank = bcol_module->super.sbgp_partner_module->my_index;
int buff_offset = bcol_module->super.hier_scather_offset;
int pack_len = input_args->count * input_args->dtype->super.size;
@ -333,9 +147,10 @@ int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
volatile mca_bcol_basesmuma_header_t *peer_ctl_pointer;
#if 0
fprintf(stderr,"%d: entering sm allgather progress active requests %d iter %d ready_flag %d\n",my_rank,
*active_requests,*iter,*status);
fprintf(stderr,"%d: entering sm allgather progress active requests %d iter %d ready_flag %d\n", my_rank,
*active_requests, *iteration, *status);
#endif
buff_idx = input_args->src_desc->buffer_index;
leading_dim=bcol_module->colls_no_user_data.size_of_group;
idx=SM_ARRAY_INDEX(leading_dim,buff_idx,0);
@ -368,114 +183,111 @@ int bcol_basesmuma_k_nomial_allgather_progress(bcol_function_args_t *input_args,
* respective proxies
*/
if( EXTRA_NODE == exchange_node->node_type ) {
if (OPAL_UNLIKELY(-1 == *iteration)) {
if (EXTRA_NODE == exchange_node->node_type) {
/* If I'm in here, then I must be looking for data */
ready_flag = flag_offset + 1 + pow_k + 2;
/* If I'm in here, then I must be looking for data */
ready_flag = flag_offset + 1 + pow_k + 2;
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
/* calculate the offset */
knt = 0;
for(i = 0; i < group_size; i++){
knt += list_connected[i];
}
for( i = 0; i < cm->num_to_probe && (0 == matched); i++ ) {
if(IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
/* we receive the entire message */
memcpy((void *)((unsigned char *) data_addr + buff_offset),
(void *) ((unsigned char *) peer_data_pointer + buff_offset),
knt * pack_len);
goto FINISHED;
/* calculate the count */
for (i = 0, knt = 0 ; i < group_size ; ++i){
knt += list_connected[i];
}
}
for (i = 0 ; i < cm->num_to_probe ; ++i) {
if (IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)) {
/* we receive the entire message */
opal_atomic_mb ();
memcpy (data_addr, (void *) peer_data_pointer, knt * pack_len);
/* haven't found it, state is saved, bail out */
return BCOL_FN_STARTED;
goto FINISHED;
}
}
}else if ( ( -1 == *iteration ) && (0 < exchange_node->n_extra_sources) ) {
/* haven't found it, state is saved, bail out */
return BCOL_FN_STARTED;
} else if (0 < exchange_node->n_extra_sources) {
/* I am a proxy for someone */
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
/* I am a proxy for someone */
src = exchange_node->rank_extra_sources_array[0];
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
/* calculate the offset */
for (i = 0, knt = 0 ; i < src ; ++i){
knt += list_connected[i];
}
knt = 0;
for(i = 0; i < src; i++){
knt += list_connected[i];
}
/* probe for extra rank's arrival */
for (i = 0 ; i < cm->num_to_probe ; ++i) {
if (IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)) {
opal_atomic_mb ();
/* copy it in */
memcpy ((void *) ((uintptr_t) data_addr + knt * pack_len),
(void *) ((uintptr_t) peer_data_pointer + knt * pack_len),
pack_len * list_connected[src]);
break;
}
}
/* probe for extra rank's arrival */
for( i = 0; i < cm->num_to_probe && ( 0 == matched); i++) {
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
/* copy it in */
memcpy((void *)((unsigned char *) data_addr + knt*pack_len),
(void *) ((unsigned char *) peer_data_pointer + knt*pack_len),
pack_len * list_connected[src]);
ready_flag++;
*iteration = 0;
goto MAIN_PHASE;
if (i == cm->num_to_probe) {
return BCOL_FN_STARTED;
}
}
return BCOL_FN_STARTED;
/* bump the ready flag to indicate extra node exchange complete */
++ready_flag;
*iteration = 0;
}
MAIN_PHASE:
/* start the recursive k - ing phase */
for( *iter=*iteration; *iter < pow_k; (*iter)++) {
for (i = *iteration ; i < pow_k ; ++i) {
/* I am ready at this level */
opal_atomic_wmb ();
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
if( 0 == *active_requests ) {
if (0 == *active_requests) {
/* flip some bits, if we don't have active requests from a previous visit */
CALC_ACTIVE_REQUESTS(active_requests,exchange_node->rank_exchanges[*iter],tree_order);
CALC_ACTIVE_REQUESTS(active_requests,exchange_node->rank_exchanges[i],tree_order);
}
for( j = 0; j < (tree_order - 1); j++ ) {
for (j = 0; j < (tree_order - 1); ++j) {
/* recv phase */
src = exchange_node->rank_exchanges[*iter][j];
src = exchange_node->rank_exchanges[i][j];
if( src < 0 ) {
/* then not a valid rank, continue
*/
if (src < 0) {
/* then not a valid rank, continue */
continue;
}
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
if( !(*active_requests&(1<<j))){
if (!(*active_requests&(1<<j))) {
/* then this peer hasn't been processed at this level */
recv_offset = exchange_node->payload_info[*iter][j].r_offset * pack_len;
recv_len = exchange_node->payload_info[*iter][j].r_len * pack_len;
peer_data_pointer = data_buffs[src].payload;
peer_ctl_pointer = data_buffs[src].ctl_struct;
recv_offset = exchange_node->payload_info[i][j].r_offset * pack_len;
recv_len = exchange_node->payload_info[i][j].r_len * pack_len;
/* I am putting the probe loop as the inner most loop to achieve
* better temporal locality
*/
matched = 0;
for( probe = 0; probe < cm->num_to_probe && (0 == matched); probe++){
if(IS_PEER_READY(peer_ctl_pointer,ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)){
matched = 1;
for (probe = 0 ; probe < cm->num_to_probe ; ++probe) {
if (IS_PEER_READY(peer_ctl_pointer, ready_flag, sequence_number, ALLGATHER_FLAG, bcol_id)) {
/* flip the request's bit */
*active_requests ^= (1<<j);
/* copy the data */
memcpy((void *)((unsigned char *) data_addr + recv_offset),
(void *)((unsigned char *) peer_data_pointer + recv_offset),
recv_len);
break;
}
}
}
}
if( max_requests == *active_requests ){
/* bump the ready flag */
ready_flag++;
@ -490,9 +302,11 @@ MAIN_PHASE:
/* state is saved hop out
*/
*status = my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id];
*iteration = i;
return BCOL_FN_STARTED;
}
}
/* bump the flag one more time for the extra rank */
ready_flag = flag_offset + 1 + pow_k + 2;
@ -501,7 +315,6 @@ MAIN_PHASE:
/* simply announce my arrival */
opal_atomic_wmb ();
my_ctl_pointer->flags[ALLGATHER_FLAG][bcol_id] = ready_flag;
}
FINISHED: