gather socket information - not debugged.`
This commit was SVN r20670.
Этот коммит содержится в:
родитель
c7fda41d2a
Коммит
daf7673aff
@ -474,6 +474,31 @@ BEGIN_C_DECLS
|
||||
*/
|
||||
size_t short_message_size;
|
||||
|
||||
/*
|
||||
* flag indicating if have socket layout for the procs
|
||||
*/
|
||||
int have_socket_information;
|
||||
|
||||
/*
|
||||
* socket index
|
||||
*/
|
||||
int *socket_index;
|
||||
|
||||
/*
|
||||
* number of processes per socket
|
||||
*/
|
||||
int *n_procs_per_socket;
|
||||
|
||||
/*
|
||||
* sockets in use
|
||||
*/
|
||||
int *sockets_in_use;
|
||||
|
||||
/*
|
||||
* number of processes per socket
|
||||
*/
|
||||
int **list_of_ranks_per_socket;
|
||||
|
||||
/*
|
||||
* function table for variants of a given collective
|
||||
* function.
|
||||
|
@ -39,6 +39,9 @@
|
||||
#include "ompi/mca/dpm/dpm.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "opal/mca/paffinity/base/base.h"
|
||||
#include "orte/mca/grpcomm/grpcomm.h"
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
@ -672,7 +675,8 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
{
|
||||
/* local variables */
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
int i,j,group_size,ret;
|
||||
int i,j,group_size,ret,proc;
|
||||
int my_socket_index,num_procs,socket,socket_tmp,core,n_sockets,cnt;
|
||||
size_t alignment,size;
|
||||
size_t tot_size_mem_banks;
|
||||
size_t ctl_memory_per_proc_per_segment;
|
||||
@ -1149,6 +1153,215 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
/* set the switch-over parameter */
|
||||
sm_module->short_message_size=mca_coll_sm2_component.short_message_size;
|
||||
|
||||
|
||||
/*
|
||||
** set up process affinity information
|
||||
** */
|
||||
{
|
||||
opal_buffer_t* sbuffer = OBJ_NEW(opal_buffer_t);
|
||||
opal_buffer_t* rbuffer = OBJ_NEW(opal_buffer_t);
|
||||
opal_paffinity_base_cpu_set_t my_cpu_set;
|
||||
opal_list_t peers;
|
||||
orte_namelist_t *peer;
|
||||
int my_rank=ompi_comm_rank(comm);
|
||||
uint32_t dummy;
|
||||
/* use socket layout based collectives, only if explicitly discovered
|
||||
** that we can */
|
||||
sm_module->have_socket_information=0;
|
||||
|
||||
/* get the number of processors on this node */
|
||||
ret=opal_paffinity_base_get_processor_info(&num_procs);
|
||||
|
||||
/* get process affinity mask */
|
||||
OPAL_PAFFINITY_CPU_ZERO(my_cpu_set);
|
||||
ret=opal_paffinity_base_get(&my_cpu_set);
|
||||
if( OPAL_ERR_NOT_FOUND == ret ) {
|
||||
|
||||
/* pa affinity not set, so socket index will be set to -1 */
|
||||
my_socket_index=-1;
|
||||
} else {
|
||||
|
||||
my_socket_index=-1;
|
||||
/* loop over number of processors */
|
||||
for ( proc=0 ; proc < num_procs ; proc++ ) {
|
||||
if (OPAL_PAFFINITY_CPU_ISSET(proc,my_cpu_set)) {
|
||||
opal_paffinity_base_get_map_to_socket_core(i,&socket_tmp,&core);
|
||||
if( (-1) == socket ){
|
||||
/* socket not set yet */
|
||||
my_socket_index=socket_tmp;
|
||||
} else {
|
||||
/* the algorithms assume that procs are local to one
|
||||
** socket only */
|
||||
if( my_socket_index != socket_tmp ) {
|
||||
my_socket_index=-1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* get every one elses information */
|
||||
|
||||
}
|
||||
/* prepare list of ranks */
|
||||
OBJ_CONSTRUCT(&peers, opal_list_t);
|
||||
for (i = 0; i < size; i++) {
|
||||
peer = OBJ_NEW(orte_namelist_t);
|
||||
peer->name.jobid = comm->c_local_group->grp_proc_pointers[i]->proc_name.jobid;
|
||||
peer->name.vpid = comm->c_local_group->grp_proc_pointers[i]->proc_name.vpid;
|
||||
opal_list_append(&peers, &peer->item);
|
||||
}
|
||||
/* prepare send data */
|
||||
if (NULL == sbuffer || NULL == rbuffer) {
|
||||
fprintf(stderr," Can't allocte memory for sbuffer or rbuffer \n");
|
||||
fflush(stderr);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
/* Pack my rank , I need it because allgather doesnot work as expected */
|
||||
ret = opal_dss.pack(sbuffer, &my_rank, 1, OPAL_UINT32);
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
fprintf(stderr," pack returned error %d for my_rank \n",ret);
|
||||
fflush(stderr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Pack socket index */
|
||||
ret = opal_dss.pack(sbuffer, my_socket_index, 1, OPAL_UINT32);
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
fprintf(stderr," pack returned error %d for my_socket_index \n",ret);
|
||||
fflush(stderr);
|
||||
return ret;
|
||||
}
|
||||
/* Allgather data over the comunicator */
|
||||
if (ORTE_SUCCESS != (ret = orte_grpcomm.allgather_list(&peers, sbuffer, rbuffer))) {
|
||||
fprintf(stderr," orte_grpcomm.allgather_list returned error %d \n",ret);
|
||||
fflush(stderr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
** note !!!! - not sure why this is here, but will leave if for now
|
||||
*/
|
||||
ret = opal_dss.unpack(rbuffer, &dummy, &cnt, ORTE_STD_CNTR);
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_base_output,"Get dummy value %d \n", dummy));
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
fprintf(stderr," unpack returned error %d for dummy \n",ret);
|
||||
fflush(stderr);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
sm_module->have_socket_information=1;
|
||||
/* allocte memory to store socket information per process */
|
||||
sm_module->socket_index=(int *)malloc(sizeof(int)*ompi_comm_size(comm));
|
||||
if ( NULL == sm_module->socket_index) {
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
for (proc = 0; proc < ompi_comm_size(comm); proc++) {
|
||||
uint32_t rem_socket_index;
|
||||
uint32_t rem_rank;
|
||||
|
||||
/* note !!!! need to store the data for manipulation */
|
||||
/* unpack rank*/
|
||||
ret = opal_dss.unpack(rbuffer, &rem_rank, &cnt, OPAL_UINT32);
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
fprintf(stderr," unpack returned error %d for rem_rank \n",ret);
|
||||
fflush(stderr);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* unpack socket index */
|
||||
ret = opal_dss.unpack(rbuffer, &rem_socket_index, &cnt, OPAL_UINT32);
|
||||
if (ORTE_SUCCESS != ret) {
|
||||
fprintf(stderr," unpack returned error %d for rem_socket_index \n",ret);
|
||||
fflush(stderr);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
sm_module->socket_index[rem_rank]=rem_socket_index;
|
||||
if( (-1) == rem_socket_index ) {
|
||||
sm_module->have_socket_information=0;
|
||||
free(sm_module->socket_index);
|
||||
sm_module->socket_index=NULL;
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
|
||||
}
|
||||
/* need to generate the required data for the collective algorithms */
|
||||
|
||||
/* figure out how many sokcets are used */
|
||||
/* allocte memory to store socket information per process */
|
||||
sm_module->n_procs_per_socket=(int *)malloc(sizeof(int)*num_procs);
|
||||
if ( NULL == sm_module->socket_index) {
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
/* initialize counters */
|
||||
for (proc = 0; proc < num_procs; proc++) {
|
||||
sm_module->n_procs_per_socket[proc]=0;
|
||||
}
|
||||
/* count how many procs are associated with a given socket */
|
||||
for (proc = 0; proc < ompi_comm_size(comm); proc++) {
|
||||
sm_module->n_procs_per_socket[sm_module->socket_index[proc]]++;
|
||||
}
|
||||
n_sockets=0;
|
||||
for (proc = 0; proc < num_procs; proc++) {
|
||||
if( 0 < sm_module->n_procs_per_socket[proc]) {
|
||||
n_sockets++;
|
||||
}
|
||||
}
|
||||
if( n_sockets == ompi_comm_size(comm) ) {
|
||||
/* only one proc per socket - no extra level of hierarchy */
|
||||
if( NULL != sm_module->socket_index ) {
|
||||
free(sm_module->socket_index);
|
||||
sm_module->socket_index=NULL;
|
||||
}
|
||||
if( NULL != sm_module->n_procs_per_socket ) {
|
||||
free(sm_module->n_procs_per_socket);
|
||||
sm_module->n_procs_per_socket=NULL;
|
||||
}
|
||||
sm_module->have_socket_information=0;
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
/* group procs by socket - for rooted operations want to access the
|
||||
** root directly, rather than through the intermediate designated
|
||||
** socket "leader" */
|
||||
sm_module->sockets_in_use=(int *)malloc(sizeof(int)*n_sockets);
|
||||
if ( NULL == sm_module->sockets_in_use) {
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
cnt=0;
|
||||
for (proc = 0; proc < num_procs; proc++) {
|
||||
if( 0 < sm_module->n_procs_per_socket[proc] ) {
|
||||
sm_module->sockets_in_use[cnt]=proc;
|
||||
cnt++;
|
||||
}
|
||||
}
|
||||
sm_module->list_of_ranks_per_socket=(int **)malloc(sizeof(int *)*n_sockets);
|
||||
if ( NULL == sm_module->list_of_ranks_per_socket) {
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
for (j = 0; j < n_sockets; j++) {
|
||||
socket=sm_module->sockets_in_use[j];
|
||||
cnt=sm_module->n_procs_per_socket[socket];
|
||||
sm_module->list_of_ranks_per_socket[j]=(int *)malloc(sizeof(int)*cnt);
|
||||
if ( NULL == sm_module->list_of_ranks_per_socket) {
|
||||
goto DONE_WITH_SOCKET_SETUP;
|
||||
}
|
||||
cnt=0;
|
||||
for (i = 0; i < ompi_comm_size(comm); i++) {
|
||||
if( socket == sm_module->socket_index[i] ) {
|
||||
sm_module->list_of_ranks_per_socket[j][cnt]=i;
|
||||
cnt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DONE_WITH_SOCKET_SETUP:
|
||||
/* free resources */
|
||||
OBJ_RELEASE(peer);
|
||||
OBJ_RELEASE(sbuffer);
|
||||
OBJ_RELEASE(rbuffer);
|
||||
}
|
||||
|
||||
/* touch pages to apply memory affinity - Note: do we really need this or will
|
||||
* the algorithms do this */
|
||||
|
||||
@ -1206,6 +1419,24 @@ CLEANUP:
|
||||
free(sm_module->ctl_blocking_barrier);
|
||||
sm_module->ctl_blocking_barrier=NULL;
|
||||
}
|
||||
if( NULL != sm_module->socket_index ) {
|
||||
free(sm_module->socket_index);
|
||||
sm_module->socket_index=NULL;
|
||||
}
|
||||
if( NULL != sm_module->n_procs_per_socket ) {
|
||||
free(sm_module->n_procs_per_socket);
|
||||
sm_module->n_procs_per_socket=NULL;
|
||||
}
|
||||
if( NULL != sm_module->sockets_in_use ) {
|
||||
for (j = 0; j < n_sockets; j++) {
|
||||
if( NULL != sm_module->sockets_in_use[j] ) {
|
||||
free(sm_module->sockets_in_use[j]);
|
||||
sm_module->sockets_in_use[j]=NULL;
|
||||
}
|
||||
free(sm_module->sockets_in_use);
|
||||
sm_module->sockets_in_use=NULL;
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_RELEASE(sm_module);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user