diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h index 769addd160..646ce5023c 100644 --- a/ompi/mca/coll/sm2/coll_sm2.h +++ b/ompi/mca/coll/sm2/coll_sm2.h @@ -474,6 +474,31 @@ BEGIN_C_DECLS */ size_t short_message_size; + /* + * flag indicating if have socket layout for the procs + */ + int have_socket_information; + + /* + * socket index + */ + int *socket_index; + + /* + * number of processes per socket + */ + int *n_procs_per_socket; + + /* + * sockets in use + */ + int *sockets_in_use; + + /* + * number of processes per socket + */ + int **list_of_ranks_per_socket; + /* * function table for variants of a given collective * function. diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c index f1092de987..6ae5f121e4 100644 --- a/ompi/mca/coll/sm2/coll_sm2_module.c +++ b/ompi/mca/coll/sm2/coll_sm2_module.c @@ -39,6 +39,9 @@ #include "ompi/mca/dpm/dpm.h" #include "orte/mca/rml/rml.h" #include "orte/util/proc_info.h" +#include "orte/util/name_fns.h" +#include "opal/mca/paffinity/base/base.h" +#include "orte/mca/grpcomm/grpcomm.h" /* * Local functions @@ -672,7 +675,8 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) { /* local variables */ mca_coll_sm2_module_t *sm_module; - int i,j,group_size,ret; + int i,j,group_size,ret,proc; + int my_socket_index,num_procs,socket,socket_tmp,core,n_sockets,cnt; size_t alignment,size; size_t tot_size_mem_banks; size_t ctl_memory_per_proc_per_segment; @@ -1149,6 +1153,215 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) /* set the switch-over parameter */ sm_module->short_message_size=mca_coll_sm2_component.short_message_size; + + /* + ** set up process affinity information + ** */ + { + opal_buffer_t* sbuffer = OBJ_NEW(opal_buffer_t); + opal_buffer_t* rbuffer = OBJ_NEW(opal_buffer_t); + opal_paffinity_base_cpu_set_t my_cpu_set; + opal_list_t peers; + orte_namelist_t *peer; + int my_rank=ompi_comm_rank(comm); + uint32_t dummy; + /* use socket layout based collectives, only if explicitly discovered + ** that we can */ + sm_module->have_socket_information=0; + + /* get the number of processors on this node */ + ret=opal_paffinity_base_get_processor_info(&num_procs); + + /* get process affinity mask */ + OPAL_PAFFINITY_CPU_ZERO(my_cpu_set); + ret=opal_paffinity_base_get(&my_cpu_set); + if( OPAL_ERR_NOT_FOUND == ret ) { + + /* pa affinity not set, so socket index will be set to -1 */ + my_socket_index=-1; + } else { + + my_socket_index=-1; + /* loop over number of processors */ + for ( proc=0 ; proc < num_procs ; proc++ ) { + if (OPAL_PAFFINITY_CPU_ISSET(proc,my_cpu_set)) { + opal_paffinity_base_get_map_to_socket_core(i,&socket_tmp,&core); + if( (-1) == socket ){ + /* socket not set yet */ + my_socket_index=socket_tmp; + } else { + /* the algorithms assume that procs are local to one + ** socket only */ + if( my_socket_index != socket_tmp ) { + my_socket_index=-1; + break; + } + } + } + } + + /* get every one elses information */ + + } + /* prepare list of ranks */ + OBJ_CONSTRUCT(&peers, opal_list_t); + for (i = 0; i < size; i++) { + peer = OBJ_NEW(orte_namelist_t); + peer->name.jobid = comm->c_local_group->grp_proc_pointers[i]->proc_name.jobid; + peer->name.vpid = comm->c_local_group->grp_proc_pointers[i]->proc_name.vpid; + opal_list_append(&peers, &peer->item); + } + /* prepare send data */ + if (NULL == sbuffer || NULL == rbuffer) { + fprintf(stderr," Can't allocte memory for sbuffer or rbuffer \n"); + fflush(stderr); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* Pack my rank , I need it because allgather doesnot work as expected */ + ret = opal_dss.pack(sbuffer, &my_rank, 1, OPAL_UINT32); + if (ORTE_SUCCESS != ret) { + fprintf(stderr," pack returned error %d for my_rank \n",ret); + fflush(stderr); + return ret; + } + + /* Pack socket index */ + ret = opal_dss.pack(sbuffer, my_socket_index, 1, OPAL_UINT32); + if (ORTE_SUCCESS != ret) { + fprintf(stderr," pack returned error %d for my_socket_index \n",ret); + fflush(stderr); + return ret; + } + /* Allgather data over the comunicator */ + if (ORTE_SUCCESS != (ret = orte_grpcomm.allgather_list(&peers, sbuffer, rbuffer))) { + fprintf(stderr," orte_grpcomm.allgather_list returned error %d \n",ret); + fflush(stderr); + return ret; + } + + /* + ** note !!!! - not sure why this is here, but will leave if for now + */ + ret = opal_dss.unpack(rbuffer, &dummy, &cnt, ORTE_STD_CNTR); + OPAL_OUTPUT_VERBOSE((10, mca_coll_base_output,"Get dummy value %d \n", dummy)); + if (ORTE_SUCCESS != ret) { + fprintf(stderr," unpack returned error %d for dummy \n",ret); + fflush(stderr); + return OMPI_ERROR; + } + + sm_module->have_socket_information=1; + /* allocte memory to store socket information per process */ + sm_module->socket_index=(int *)malloc(sizeof(int)*ompi_comm_size(comm)); + if ( NULL == sm_module->socket_index) { + goto DONE_WITH_SOCKET_SETUP; + } + for (proc = 0; proc < ompi_comm_size(comm); proc++) { + uint32_t rem_socket_index; + uint32_t rem_rank; + + /* note !!!! need to store the data for manipulation */ + /* unpack rank*/ + ret = opal_dss.unpack(rbuffer, &rem_rank, &cnt, OPAL_UINT32); + if (ORTE_SUCCESS != ret) { + fprintf(stderr," unpack returned error %d for rem_rank \n",ret); + fflush(stderr); + return OMPI_ERROR; + } + + /* unpack socket index */ + ret = opal_dss.unpack(rbuffer, &rem_socket_index, &cnt, OPAL_UINT32); + if (ORTE_SUCCESS != ret) { + fprintf(stderr," unpack returned error %d for rem_socket_index \n",ret); + fflush(stderr); + return OMPI_ERROR; + } + + sm_module->socket_index[rem_rank]=rem_socket_index; + if( (-1) == rem_socket_index ) { + sm_module->have_socket_information=0; + free(sm_module->socket_index); + sm_module->socket_index=NULL; + goto DONE_WITH_SOCKET_SETUP; + } + + } + /* need to generate the required data for the collective algorithms */ + + /* figure out how many sokcets are used */ + /* allocte memory to store socket information per process */ + sm_module->n_procs_per_socket=(int *)malloc(sizeof(int)*num_procs); + if ( NULL == sm_module->socket_index) { + goto DONE_WITH_SOCKET_SETUP; + } + /* initialize counters */ + for (proc = 0; proc < num_procs; proc++) { + sm_module->n_procs_per_socket[proc]=0; + } + /* count how many procs are associated with a given socket */ + for (proc = 0; proc < ompi_comm_size(comm); proc++) { + sm_module->n_procs_per_socket[sm_module->socket_index[proc]]++; + } + n_sockets=0; + for (proc = 0; proc < num_procs; proc++) { + if( 0 < sm_module->n_procs_per_socket[proc]) { + n_sockets++; + } + } + if( n_sockets == ompi_comm_size(comm) ) { + /* only one proc per socket - no extra level of hierarchy */ + if( NULL != sm_module->socket_index ) { + free(sm_module->socket_index); + sm_module->socket_index=NULL; + } + if( NULL != sm_module->n_procs_per_socket ) { + free(sm_module->n_procs_per_socket); + sm_module->n_procs_per_socket=NULL; + } + sm_module->have_socket_information=0; + goto DONE_WITH_SOCKET_SETUP; + } + /* group procs by socket - for rooted operations want to access the + ** root directly, rather than through the intermediate designated + ** socket "leader" */ + sm_module->sockets_in_use=(int *)malloc(sizeof(int)*n_sockets); + if ( NULL == sm_module->sockets_in_use) { + goto DONE_WITH_SOCKET_SETUP; + } + cnt=0; + for (proc = 0; proc < num_procs; proc++) { + if( 0 < sm_module->n_procs_per_socket[proc] ) { + sm_module->sockets_in_use[cnt]=proc; + cnt++; + } + } + sm_module->list_of_ranks_per_socket=(int **)malloc(sizeof(int *)*n_sockets); + if ( NULL == sm_module->list_of_ranks_per_socket) { + goto DONE_WITH_SOCKET_SETUP; + } + for (j = 0; j < n_sockets; j++) { + socket=sm_module->sockets_in_use[j]; + cnt=sm_module->n_procs_per_socket[socket]; + sm_module->list_of_ranks_per_socket[j]=(int *)malloc(sizeof(int)*cnt); + if ( NULL == sm_module->list_of_ranks_per_socket) { + goto DONE_WITH_SOCKET_SETUP; + } + cnt=0; + for (i = 0; i < ompi_comm_size(comm); i++) { + if( socket == sm_module->socket_index[i] ) { + sm_module->list_of_ranks_per_socket[j][cnt]=i; + cnt++; + } + } + } + +DONE_WITH_SOCKET_SETUP: + /* free resources */ + OBJ_RELEASE(peer); + OBJ_RELEASE(sbuffer); + OBJ_RELEASE(rbuffer); + } + /* touch pages to apply memory affinity - Note: do we really need this or will * the algorithms do this */ @@ -1206,6 +1419,24 @@ CLEANUP: free(sm_module->ctl_blocking_barrier); sm_module->ctl_blocking_barrier=NULL; } + if( NULL != sm_module->socket_index ) { + free(sm_module->socket_index); + sm_module->socket_index=NULL; + } + if( NULL != sm_module->n_procs_per_socket ) { + free(sm_module->n_procs_per_socket); + sm_module->n_procs_per_socket=NULL; + } + if( NULL != sm_module->sockets_in_use ) { + for (j = 0; j < n_sockets; j++) { + if( NULL != sm_module->sockets_in_use[j] ) { + free(sm_module->sockets_in_use[j]); + sm_module->sockets_in_use[j]=NULL; + } + free(sm_module->sockets_in_use); + sm_module->sockets_in_use=NULL; + } + } OBJ_RELEASE(sm_module);