openmpi/ompi/mca/bcol/ptpcoll/bcol_ptpcoll_allgather.c
Pavel Shamis 3a683419c5 Fixing broken dependency between ML/BCOLS
This is a hot-fix patch for the issue reported by Ralph.
In the future we plan to restructure the ML data structure layout.

Tested by Nathan.

cmr=v1.7.5:ticket=trac:4158

This commit was SVN r30619.

The following Trac tickets were found above:
  Ticket 4158 --> https://svn.open-mpi.org/trac/ompi/ticket/4158
2014-02-07 19:15:45 +00:00


/*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/include/ompi/constants.h"
#include "ompi/mca/bcol/bcol.h"
#include "bcol_ptpcoll_allreduce.h"
/*
* Recursive k-ing allgather
*
* Example: k = 3, n = 9
*
* Number of exchange steps = log_k(n)
* Each exchange step involves k - 1 sends and k - 1 receives (radix k)
*/
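/*
* Illustrative sketch (not part of the original source): with k = 3 and
* n = 9 ranks, the exchange takes log_3(9) = 2 steps. In step 0 each rank
* exchanges with its two peers at distance 1 and 2 inside its group of
* three; in step 1 it exchanges with the peers at distance 3 and 6. After
* the two steps every rank holds all nine contributions. In this
* implementation the peer lists and payload offsets are precomputed in
* exchange_node->rank_exchanges[][] and exchange_node->payload_info[][].
*/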
int bcol_ptpcoll_k_nomial_allgather_init(bcol_function_args_t *input_args,
struct mca_bcol_base_function_t *const_args)
{
/* local variables */
mca_bcol_ptpcoll_module_t *ptpcoll_module = (mca_bcol_ptpcoll_module_t *) const_args->bcol_module;
int *group_list = ptpcoll_module->super.sbgp_partner_module->group_list;
netpatterns_k_exchange_node_t *exchange_node = &ptpcoll_module->knomial_allgather_tree;
int my_group_index = ptpcoll_module->super.sbgp_partner_module->my_index;
int group_size = ptpcoll_module->group_size;
int *list_connected = ptpcoll_module->super.list_n_connected; /* critical for hierarchical colls */
int tag;
int i, j;
int knt;
int comm_src, comm_dst, src, dst;
int recv_offset, recv_len;
int send_offset, send_len;
uint32_t buffer_index = input_args->buffer_index;
int pow_k, tree_order;
int rc = OMPI_SUCCESS;
ompi_communicator_t* comm = ptpcoll_module->super.sbgp_partner_module->group_comm;
ompi_request_t **requests =
ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].requests;
int *active_requests =
&(ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].active_requests);
int completed = 0; /* initialized */
void *data_buffer = (void*)(
(unsigned char *) input_args->sbuf +
(size_t) input_args->sbuf_offset);
int pack_len = input_args->count * input_args->dtype->super.size;
#if 0
fprintf(stderr,"entering p2p allgather pack_len %d. exchange node: %p\n",pack_len, exchange_node);
#endif
/* initialize the iteration counter */
int *iteration = &ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].iteration;
*iteration = 0;
/* reset active request counter */
*active_requests = 0;
/* keep tag within the limit supported by the pml */
tag = (PTPCOLL_TAG_OFFSET + input_args->sequence_num * PTPCOLL_TAG_FACTOR) & (ptpcoll_module->tag_mask);
/* mark this as a collective tag, to avoid conflict with user-level tags */
tag = -tag;
/* k-nomial parameters */
tree_order = exchange_node->tree_order;
pow_k = exchange_node->log_tree_order;
/* let's begin the collective, starting with extra ranks and their
* respective proxies
*/
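/* When the group size is not a power of the radix, leftover ranks are
* designated EXTRA_NODE: they ship their contribution to a proxy rank,
* sit out the recursive exchange, and receive the fully gathered buffer
* from that proxy at the end. A proxy (n_extra_sources > 0) absorbs the
* extra rank's data before entering the exchange and sends the complete
* result back once the exchange finishes. */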
if( EXTRA_NODE == exchange_node->node_type ) {
/* then I will send to my proxy rank*/
dst = exchange_node->rank_extra_sources_array[0];
/* find rank in the communicator */
comm_dst = group_list[dst];
/* now I need to calculate my own offset */
knt = 0;
for (i = 0 ; i < my_group_index; i++){
knt += list_connected[i];
}
/* send the data to my proxy */
rc = MCA_PML_CALL(isend((void *) ( (unsigned char *) data_buffer +
knt*pack_len),
pack_len * list_connected[my_group_index],
MPI_BYTE,
comm_dst, tag,
MCA_PML_BASE_SEND_STANDARD, comm,
&(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10,("Failed to isend data"));
return OMPI_ERROR;
}
++(*active_requests);
/* now I go ahead and post the receive from my proxy */
comm_src = comm_dst;
knt = 0;
for( i =0; i < group_size; i++){
knt += list_connected[i];
}
rc = MCA_PML_CALL(irecv(data_buffer,
knt * pack_len,
MPI_BYTE,
comm_src,
tag , comm, &(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10, ("Failed to post ireceive "));
return OMPI_ERROR;
}
++(*active_requests);
/* poll for completion */
/* this polls internally */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(completed){
/* go to buffer release */
goto FINISHED;
}else{
/* save state and hop out
* nothing to save here
*/
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
}else if ( 0 < exchange_node->n_extra_sources ) {
/* I am a proxy for someone */
src = exchange_node->rank_extra_sources_array[0];
/* find the rank in the communicator */
comm_src = group_list[src];
knt = 0;
for(i = 0; i < src; i++){
knt += list_connected[i];
}
/* post the receive */
rc = MCA_PML_CALL(irecv((void *) ( (unsigned char *) data_buffer
+ knt*pack_len),
pack_len * list_connected[src],
MPI_BYTE,
comm_src,
tag , comm, &(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10, ("Failed to post ireceive "));
return OMPI_ERROR;
}
++(*active_requests);
/* poll for completion */
/* this routine polls internally */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* We really do need to block here so set
* the iteration to -1 indicating we need to
* finish this part first
*/
*iteration = -1;
return ((OMPI_SUCCESS != rc )? OMPI_ERROR : BCOL_FN_STARTED);
}
}
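/* a proxy now holds its extra rank's contribution at that rank's offset,
* so the extra data is forwarded through the exchange steps along with
* the proxy's own data via the precomputed payload offsets */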
/* we start the recursive k - ing phase */
/* fprintf(stderr,"tree order %d pow_k %d \n",tree_order,pow_k);*/
for( i = 0; i < pow_k; i++) {
for(j = 0; j < (tree_order - 1); j++) {
/* send phase */
dst = exchange_node->rank_exchanges[i][j];
if( dst < 0 ){
continue;
}
comm_dst = group_list[dst];
send_offset = exchange_node->payload_info[i][j].s_offset * pack_len;
send_len = exchange_node->payload_info[i][j].s_len * pack_len;
/* debug print */
/* fprintf(stderr,"sending %d bytes to rank %d at offset %d\n",send_len, */
/* comm_dst,send_offset); */
rc = MCA_PML_CALL(isend((void*)((unsigned char *) data_buffer +
send_offset),
send_len,
MPI_BYTE,
comm_dst, tag,
MCA_PML_BASE_SEND_STANDARD, comm,
&(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10,("Failed to isend data"));
return OMPI_ERROR;
}
++(*active_requests);
/* sends are posted */
}
/* Now post the recv's */
for( j = 0; j < (tree_order - 1); j++ ) {
/* recv phase */
src = exchange_node->rank_exchanges[i][j];
if( src < 0 ) {
continue;
}
comm_src = group_list[src];
recv_offset = exchange_node->payload_info[i][j].r_offset * pack_len;
recv_len = exchange_node->payload_info[i][j].r_len * pack_len;
/* debug print */
/* fprintf(stderr,"recving %d bytes to rank %d at offset %d\n",recv_len, */
/* comm_src,recv_offset); */
/* post the receive */
rc = MCA_PML_CALL(irecv((void *) ((unsigned char *) data_buffer +
recv_offset),
recv_len,
MPI_BYTE,
comm_src,
tag, comm, &(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10, ("Failed to post ireceive "));
return OMPI_ERROR;
}
++(*active_requests);
}
/* finished all send/recv's now poll for completion before
* continuing to next iteration
*/
completed = 0;
/* polling internally on 2*(k - 1) requests */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* only the iteration needs to be tracked
*/
*iteration = i; /* need to pick up here */
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
}
/* finish off the last piece, send the data back to the extra */
if( 0 < exchange_node->n_extra_sources ) {
dst = exchange_node->rank_extra_sources_array[0];
comm_dst = group_list[dst];
knt = 0;
for( i = 0; i < group_size; i++){
knt += list_connected[i];
}
/* debug print */
/*
fprintf(stderr,"sending %d bytes to extra %d \n",pack_len*knt,comm_dst);
*/
rc = MCA_PML_CALL(isend(data_buffer,
pack_len * knt,
MPI_BYTE,
comm_dst, tag,
MCA_PML_BASE_SEND_STANDARD, comm,
&(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10,("Failed to isend data"));
return OMPI_ERROR;
}
++(*active_requests);
/* probe for send completion */
completed = 0;
/* polling internally */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* We really do need to block here so set
* the iteration to pow_k +1 indicating we need to
* finish progressing the last part
*/
*iteration = pow_k + 1;
return (OMPI_SUCCESS != rc ? OMPI_ERROR : BCOL_FN_STARTED);
}
}
FINISHED:
/* recycle buffer if need be */
return BCOL_FN_COMPLETE;
}
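/* The progress function below resumes a collective that did not complete
* in the init call. The per-buffer iteration counter records where to
* pick up: -1 means a proxy is still waiting on its extra rank's data,
* a value in [0, pow_k) is the exchange step whose requests are still
* outstanding, and pow_k + 1 means only the final send back to the extra
* rank remains to be drained. */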
/* allgather progress function */
int bcol_ptpcoll_k_nomial_allgather_progress(bcol_function_args_t *input_args,
struct mca_bcol_base_function_t *const_args)
{
/* local variables */
mca_bcol_ptpcoll_module_t *ptpcoll_module = (mca_bcol_ptpcoll_module_t *) const_args->bcol_module;
int *group_list = ptpcoll_module->super.sbgp_partner_module->group_list;
netpatterns_k_exchange_node_t *exchange_node = &ptpcoll_module->knomial_allgather_tree;
int group_size = ptpcoll_module->group_size;
int *list_connected = ptpcoll_module->super.list_n_connected; /* critical for hierarchical colls */
int tag;
int i, j;
int knt;
int comm_src, comm_dst, src, dst;
int recv_offset, recv_len;
int send_offset, send_len;
uint32_t buffer_index = input_args->buffer_index;
int pow_k, tree_order;
int rc = OMPI_SUCCESS;
ompi_communicator_t* comm = ptpcoll_module->super.sbgp_partner_module->group_comm;
ompi_request_t **requests =
ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].requests;
int *active_requests =
&(ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].active_requests);
int completed = 0; /* initialized */
void *data_buffer = (void*)(
(unsigned char *) input_args->sbuf +
(size_t) input_args->sbuf_offset);
int pack_len = input_args->count * input_args->dtype->super.size;
/* initialize the counter */
int *iteration = &ptpcoll_module->ml_mem.ml_buf_desc[buffer_index].iteration;
#if 0
fprintf(stderr,"%d: entering p2p allgather progress AR: %d iter: %d\n",my_group_index,*active_requests,
*iteration);
#endif
/* keep tag within the limit supported by the pml */
tag = (PTPCOLL_TAG_OFFSET + input_args->sequence_num * PTPCOLL_TAG_FACTOR) & (ptpcoll_module->tag_mask);
/* mark this as a collective tag, to avoid conflict with user-level tags */
tag = -tag;
/* k-nomial tree parameters */
tree_order = exchange_node->tree_order;
pow_k = exchange_node->log_tree_order;
/* let's begin the collective, starting with extra ranks and their
* respective proxies
*/
if( EXTRA_NODE == exchange_node->node_type ) {
/* debug print */
/*fprintf(stderr,"666 \n");*/
/* simply poll for completion */
completed = 0;
/* polling internally */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(completed){
/* go to buffer release */
goto FINISHED;
}else{
/* save state and hop out
* nothing to save here
*/
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
}else if ( 0 < exchange_node->n_extra_sources && (-1 == *iteration)) {
/* I am a proxy for someone */
/* Simply poll for completion */
completed = 0;
/* polling internally */
assert( 1 == *active_requests);
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* We really do need to block here so set
* the iteration to -1 indicating we need to
* finish this part first
*/
(*iteration) = -1;
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
/* I may now proceed to the recursive k - ing phase */
*iteration = 0;
}
/* the ordering here between the extra rank and progress active requests
* is critical
*/
/* extra rank */
if( (pow_k + 1) == *iteration ){
/* finish off the last one */
goto PROGRESS_EXTRA;
}
/* active requests must be completed before continuing on to the
* recursive k-ing step
* CAREFUL HERE, IS THIS REALLY WHAT YOU WANT??
*/
if( 0 < (*active_requests) ) {
/* then we have something to progress from last step */
/* debug print */
/*
fprintf(stderr,"%d: entering progress AR: %d iter: %d\n",my_group_index,*active_requests,
*iteration);
*/
completed = 0;
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* state hasn't changed
*/
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
++(*iteration);
}
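/* if requests from an interrupted exchange step were still pending, they
* have now drained and the iteration counter was advanced above, so the
* loop below resumes at the next exchange step; otherwise it starts at
* the saved iteration */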
/* we start the recursive k - ing phase */
for( i = *iteration; i < pow_k; i++) {
/* nothing changes here */
for(j = 0; j < (tree_order - 1); j++) {
/* send phase */
dst = exchange_node->rank_exchanges[i][j];
if( dst < 0 ){
continue;
}
comm_dst = group_list[dst];
send_offset = exchange_node->payload_info[i][j].s_offset * pack_len;
send_len = exchange_node->payload_info[i][j].s_len * pack_len;
rc = MCA_PML_CALL(isend((void*)((unsigned char *) data_buffer +
send_offset),
send_len,
MPI_BYTE,
comm_dst, tag,
MCA_PML_BASE_SEND_STANDARD, comm,
&(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10,("Failed to isend data"));
return OMPI_ERROR;
}
++(*active_requests);
/* sends are posted */
}
/* Now post the recv's */
for( j = 0; j < (tree_order - 1); j++ ) {
/* recv phase */
src = exchange_node->rank_exchanges[i][j];
if( src < 0 ) {
continue;
}
comm_src = group_list[src];
recv_offset = exchange_node->payload_info[i][j].r_offset * pack_len;
recv_len = exchange_node->payload_info[i][j].r_len * pack_len;
/* post the receive */
rc = MCA_PML_CALL(irecv((void *) ((unsigned char *) data_buffer +
recv_offset),
recv_len,
MPI_BYTE,
comm_src,
tag, comm, &(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10, ("Failed to post ireceive "));
return OMPI_ERROR;
}
++(*active_requests);
}
/* finished all send/recv's now poll for completion before
* continuing to next iteration
*/
completed = 0;
/* make this non-blocking */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* only the iteration needs to be tracked
*/
*iteration = i; /* need to pick up here */
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
}
/* finish off the last piece, send the data back to the extra */
if( 0 < exchange_node->n_extra_sources ) {
dst = exchange_node->rank_extra_sources_array[0];
comm_dst = group_list[dst];
knt = 0;
for( i = 0; i < group_size; i++){
knt += list_connected[i];
}
rc = MCA_PML_CALL(isend(data_buffer,
pack_len * knt,
MPI_BYTE,
comm_dst, tag,
MCA_PML_BASE_SEND_STANDARD, comm,
&(requests[*active_requests])));
if( OMPI_SUCCESS != rc ) {
PTPCOLL_VERBOSE(10,("Failed to isend data"));
return OMPI_ERROR;
}
++(*active_requests);
/* probe for send completion */
completed = 0;
/* make this non-blocking */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* We really do need to block here so set
* the iteration to pow_k +1 indicating we need to
* finish progressing the last part
*/
*iteration = pow_k + 1;
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
}
/* folks need to skip this unless they really are the proxy
* reentering with the intent of progressing the final send
*/
goto FINISHED;
PROGRESS_EXTRA:
/* probe for send completion */
completed = 0;
/* make this non-blocking */
completed = mca_bcol_ptpcoll_test_all_for_match(active_requests, requests, &rc);
if(!completed){
/* save state and hop out
* We really do need to block here so set
* the iteration to pow_k +1 indicating we need to
* finish progressing the last part
*/
return ((OMPI_SUCCESS != rc) ? OMPI_ERROR : BCOL_FN_STARTED);
}
FINISHED:
/* recycle buffer if need be */
return BCOL_FN_COMPLETE;
}
/*
* Register allgather functions to the BCOL function table,
* so they can be selected
*/
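/* Note: the same init/progress pair is registered twice, once for the
* small-message range (0 - 20000 bytes, "range 1") and once for the
* large-message range (10000000 - 10485760 bytes, "range 4") */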
int bcol_ptpcoll_allgather_init(mca_bcol_base_module_t *super)
{
mca_bcol_base_coll_fn_comm_attributes_t comm_attribs;
mca_bcol_base_coll_fn_invoke_attributes_t inv_attribs;
comm_attribs.bcoll_type = BCOL_ALLGATHER;
comm_attribs.comm_size_min = 0;
comm_attribs.comm_size_max = 1024 * 1024;
comm_attribs.waiting_semantics = NON_BLOCKING;
inv_attribs.bcol_msg_min = 0;
inv_attribs.bcol_msg_max = 20000; /* range 1 */
inv_attribs.datatype_bitmap = 0xffffffff;
inv_attribs.op_types_bitmap = 0xffffffff;
comm_attribs.data_src = DATA_SRC_KNOWN;
mca_bcol_base_set_attributes(super, &comm_attribs, &inv_attribs,
bcol_ptpcoll_k_nomial_allgather_init,
bcol_ptpcoll_k_nomial_allgather_progress);
comm_attribs.data_src = DATA_SRC_KNOWN;
inv_attribs.bcol_msg_min = 10000000;
inv_attribs.bcol_msg_max = 10485760; /* range 4 */
mca_bcol_base_set_attributes(super, &comm_attribs, &inv_attribs,
bcol_ptpcoll_k_nomial_allgather_init,
bcol_ptpcoll_k_nomial_allgather_progress);
return OMPI_SUCCESS;
}