1
1

Modify the base collective algorithms to take an array of arbitrary vpids instead of assuming everything is ordered in a particular way. Modify the hier grpcomm module to support arbitrary mappings

This commit was SVN r20599.
Этот коммит содержится в:
Ralph Castain 2009-02-19 21:35:20 +00:00
родитель 6151f7b60c
Коммит 8359477387
5 изменённых файлов: 96 добавлений и 95 удалений

Просмотреть файл

@ -86,7 +86,7 @@ ORTE_DECLSPEC void orte_grpcomm_base_coll_recv(int status, orte_process_name_t*
opal_buffer_t* buffer, orte_rml_tag_t tag, opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata); void* cbdata);
ORTE_DECLSPEC int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, ORTE_DECLSPEC int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step); orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids);
#endif /* ORTE_DISABLE_FULL_SUPPORT */ #endif /* ORTE_DISABLE_FULL_SUPPORT */

Просмотреть файл

@ -53,11 +53,11 @@
/**** AVAILABLE ALGORITHMS ****/ /**** AVAILABLE ALGORITHMS ****/
static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t step); orte_jobid_t jobid, orte_vpid_t *vpids);
static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step); orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids);
static int recursivedoubling(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int recursivedoubling(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step); orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids);
/**** LOCAL VARIABLES USED IN COLLECTIVES ****/ /**** LOCAL VARIABLES USED IN COLLECTIVES ****/
static int num_recvd; static int num_recvd;
@ -105,15 +105,15 @@ void orte_grpcomm_base_coll_recv(int status, orte_process_name_t* sender,
* Switchyard for selecting the collective algorithm to use * Switchyard for selecting the collective algorithm to use
*/ */
int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step) orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids)
{ {
bool has_one; bool has_one;
orte_vpid_t n; orte_vpid_t n;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:coll:allgather called with %d entries np %d step %d", "%s grpcomm:coll:allgather called with %d entries np %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_entries, (int)np, (int)step)); num_entries, (int)np));
/* if we only have one proc participating, just copy the data across and return */ /* if we only have one proc participating, just copy the data across and return */
if (1 == np) { if (1 == np) {
@ -123,7 +123,7 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf,
if (2 == np) { if (2 == np) {
/* only two procs in collective */ /* only two procs in collective */
return twoproc(sendbuf, recvbuf, num_entries, jobid, step); return twoproc(sendbuf, recvbuf, num_entries, jobid, vpids);
} }
/* if we have power of 2 participants, use recursive doubling - otherwise, /* if we have power of 2 participants, use recursive doubling - otherwise,
@ -134,14 +134,14 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf,
for ( ; n > 0; n >>= 1) { for ( ; n > 0; n >>= 1) {
if (n & 0x1) { if (n & 0x1) {
if (has_one) { if (has_one) {
return bruck(sendbuf, recvbuf, num_entries, jobid, np, step); return bruck(sendbuf, recvbuf, num_entries, jobid, np, vpids);
} }
has_one = true; has_one = true;
} }
} }
/* must be power of two! */ /* must be power of two! */
return recursivedoubling(sendbuf, recvbuf, num_entries, jobid, np, step); return recursivedoubling(sendbuf, recvbuf, num_entries, jobid, np, vpids);
} }
@ -152,7 +152,7 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf,
* Zero adds its data to message, sends result back to one * Zero adds its data to message, sends result back to one
*/ */
static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t step) orte_jobid_t jobid, orte_vpid_t *vpids)
{ {
orte_process_name_t peer; orte_process_name_t peer;
int32_t num_remote, cnt; int32_t num_remote, cnt;
@ -165,9 +165,9 @@ static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_e
"%s grpcomm:coll:two-proc algo employed", "%s grpcomm:coll:two-proc algo employed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (0 == ORTE_PROC_MY_NAME->vpid) { if (vpids[0] == ORTE_PROC_MY_NAME->vpid) {
/* I send first */ /* I send first */
peer.vpid = step; peer.vpid = vpids[1];
/* setup a temp buffer so I can inform the other side as to the /* setup a temp buffer so I can inform the other side as to the
* number of entries in my buffer * number of entries in my buffer
*/ */
@ -222,7 +222,7 @@ static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_e
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &num_entries, 1, OPAL_INT32); opal_dss.pack(&buf, &num_entries, 1, OPAL_INT32);
opal_dss.copy_payload(&buf, sendbuf); opal_dss.copy_payload(&buf, sendbuf);
peer.vpid = 0; peer.vpid = vpids[0];
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
"%s grpcomm:coll:two-proc sending to %s", "%s grpcomm:coll:two-proc sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -264,9 +264,9 @@ static int twoproc(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_e
* ompi/mca/coll/tuned/coll_tuned_allgather.c * ompi/mca/coll/tuned/coll_tuned_allgather.c
*/ */
static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step) orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids)
{ {
orte_vpid_t rank, distance, stp; orte_vpid_t rank, distance, nv;
orte_process_name_t peer; orte_process_name_t peer;
int32_t num_remote, total_entries, cnt; int32_t num_remote, total_entries, cnt;
opal_buffer_t collection, buf; opal_buffer_t collection, buf;
@ -292,16 +292,28 @@ static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_ent
- sends message containing all data collected so far to rank r - distance - sends message containing all data collected so far to rank r - distance
- receives message containing all data collected so far from rank (r + distance) - receives message containing all data collected so far from rank (r + distance)
*/ */
/* find my position in the group of participants - it always starts at rank=0. This /* find my position in the group of participants. This
* value is the "rank" we will use in the algo * value is the "rank" we will use in the algo
*/ */
rank = (ORTE_PROC_MY_NAME->vpid) / step; rank = ORTE_VPID_INVALID;
for (nv=0; nv < np; nv++) {
if (vpids[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
for (distance = 1; distance < np; distance <<= 1) { for (distance = 1; distance < np; distance <<= 1) {
/* first send my current contents */ /* first send my current contents */
stp = (rank - distance + np) % np; nv = (rank - distance + np) % np;
peer.vpid = (stp * step); peer.vpid = vpids[nv];
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &total_entries, 1, OPAL_INT32); opal_dss.pack(&buf, &total_entries, 1, OPAL_INT32);
opal_dss.copy_payload(&buf, &collection); opal_dss.copy_payload(&buf, &collection);
@ -317,8 +329,8 @@ static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_ent
/* now setup to recv from my other partner */ /* now setup to recv from my other partner */
num_recvd = 0; num_recvd = 0;
stp = (rank + distance) % np; nv = (rank + distance) % np;
peer.vpid = (stp * step); peer.vpid = vpids[nv];
OBJ_CONSTRUCT(&bucket, opal_buffer_t); OBJ_CONSTRUCT(&bucket, opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(&peer, if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(&peer,
ORTE_RML_TAG_DAEMON_COLLECTIVE, ORTE_RML_TAG_DAEMON_COLLECTIVE,
@ -367,9 +379,9 @@ static int bruck(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_ent
* ompi/mca/coll/tuned/coll_tuned_allgather.c * ompi/mca/coll/tuned/coll_tuned_allgather.c
*/ */
static int recursivedoubling(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries, static int recursivedoubling(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int32_t num_entries,
orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t step) orte_jobid_t jobid, orte_vpid_t np, orte_vpid_t *vpids)
{ {
orte_vpid_t rank, distance, stp; orte_vpid_t rank, distance, nv;
int32_t num_remote, total_entries, cnt; int32_t num_remote, total_entries, cnt;
opal_buffer_t collection, buf; opal_buffer_t collection, buf;
orte_process_name_t peer; orte_process_name_t peer;
@ -393,16 +405,28 @@ static int recursivedoubling(opal_buffer_t *sendbuf, opal_buffer_t *recvbuf, int
At every step i, rank r: At every step i, rank r:
- exchanges message containing all data collected so far with rank peer = (r ^ 2^i). - exchanges message containing all data collected so far with rank peer = (r ^ 2^i).
*/ */
/* find my position in the group of participants - it always starts at rank=0. This /* find my position in the group of participants. This
* value is the "rank" we will use in the algo * value is the "rank" we will use in the algo
*/ */
rank = (ORTE_PROC_MY_NAME->vpid) / step; rank = ORTE_VPID_INVALID;
for (nv=0; nv < np; nv++) {
if (vpids[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
for (distance = 0x1; distance < np; distance<<=1) { for (distance = 0x1; distance < np; distance<<=1) {
/* first send my current contents */ /* first send my current contents */
stp = rank ^ distance; nv = rank ^ distance;
peer.vpid = (stp * step); peer.vpid = vpids[nv];
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &total_entries, 1, OPAL_INT32); opal_dss.pack(&buf, &total_entries, 1, OPAL_INT32);
opal_dss.copy_payload(&buf, &collection); opal_dss.copy_payload(&buf, &collection);

Просмотреть файл

@ -38,18 +38,11 @@ int orte_grpcomm_hier_open(void);
int orte_grpcomm_hier_close(void); int orte_grpcomm_hier_close(void);
int orte_grpcomm_hier_component_query(mca_base_module_t **module, int *priority); int orte_grpcomm_hier_component_query(mca_base_module_t **module, int *priority);
/* Hier component */
typedef struct {
orte_grpcomm_base_component_t super;
orte_vpid_t num_nodes;
orte_vpid_t step;
} orte_grpcomm_hier_component_t;
/* /*
* Grpcomm interfaces * Grpcomm interfaces
*/ */
ORTE_MODULE_DECLSPEC extern orte_grpcomm_hier_component_t mca_grpcomm_hier_component; ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_hier_component;
extern orte_grpcomm_base_module_t orte_grpcomm_hier_module; extern orte_grpcomm_base_module_t orte_grpcomm_hier_module;
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -41,8 +41,7 @@
/* /*
* Struct of function pointers that need to be initialized * Struct of function pointers that need to be initialized
*/ */
orte_grpcomm_hier_component_t mca_grpcomm_hier_component = { orte_grpcomm_base_component_t mca_grpcomm_hier_component = {
{
{ {
ORTE_GRPCOMM_BASE_VERSION_2_0_0, ORTE_GRPCOMM_BASE_VERSION_2_0_0,
@ -58,7 +57,6 @@ orte_grpcomm_hier_component_t mca_grpcomm_hier_component = {
/* The component is checkpoint ready */ /* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT MCA_BASE_METADATA_PARAM_CHECKPOINT
} }
}
}; };
/* Open the component */ /* Open the component */
@ -74,30 +72,8 @@ int orte_grpcomm_hier_close(void)
int orte_grpcomm_hier_component_query(mca_base_module_t **module, int *priority) int orte_grpcomm_hier_component_query(mca_base_module_t **module, int *priority)
{ {
mca_base_component_t *c = &mca_grpcomm_hier_component.super.base_version; /* only selected upon request */
int tmp; *priority = 0;
/* check for required params */
mca_base_param_reg_int(c, "num_nodes",
"How many nodes are in the job (must be > 0)",
false, false, -1, &tmp);
if (tmp < 0) {
*module = NULL;
return ORTE_ERROR;
}
mca_grpcomm_hier_component.num_nodes = tmp;
mca_base_param_reg_int(c, "step",
"Step in local_rank=0 vpids between nodes (must be > 0)",
false, false, -1, &tmp);
if (tmp < 0) {
*module = NULL;
return ORTE_ERROR;
}
mca_grpcomm_hier_component.step = tmp;
/* we need to be selected */
*priority = 100;
*module = (mca_base_module_t *)&orte_grpcomm_hier_module; *module = (mca_base_module_t *)&orte_grpcomm_hier_module;
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -83,6 +83,8 @@ static opal_list_t my_local_peers;
static orte_process_name_t my_local_rank_zero_proc; static orte_process_name_t my_local_rank_zero_proc;
static int num_local_peers; static int num_local_peers;
static bool coll_initialized = false; static bool coll_initialized = false;
static orte_vpid_t *my_coll_peers=NULL;
static int cpeers=0;
/** /**
* Initialize the module * Initialize the module
@ -114,6 +116,10 @@ static void finalize(void)
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&my_local_peers); OBJ_DESTRUCT(&my_local_peers);
if (NULL != my_coll_peers) {
free(my_coll_peers);
}
} }
/** /**
@ -301,21 +307,31 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
/* have I initialized my local info? */ /* have I initialized my local info? */
if (!coll_initialized) { if (!coll_initialized) {
orte_process_name_t proc; orte_process_name_t proc;
orte_vpid_t v, vmax; orte_vpid_t v;
/* no - cycle through the procs to find those that are local */ /* get my local rank so I can locally cache it */
proc.jobid = ORTE_PROC_MY_NAME->jobid; my_local_rank = orte_ess.get_local_rank(ORTE_PROC_MY_NAME);
vmax = ORTE_VPID_MAX;
my_local_rank = 0;
num_local_peers = 0; /* don't count myself */
for (v=0; v < orte_process_info.num_procs; v++) { /* if I am local_rank=0 for this node and job, then setup
/* ignore if this is me */ * my array of local_rank=0 peers
if (v == ORTE_PROC_MY_NAME->vpid) { */
continue; if (0 == my_local_rank) {
/* we need one entry/node in this job */
my_coll_peers = (orte_vpid_t*)malloc(orte_process_info.num_nodes * sizeof(orte_vpid_t));
cpeers = 0;
} }
/* cycle through the procs to create a list of those that are local to me */
proc.jobid = ORTE_PROC_MY_NAME->jobid;
for (v=0; v < orte_process_info.num_procs; v++) {
proc.vpid = v; proc.vpid = v;
if (!OPAL_PROC_ON_LOCAL_NODE(orte_ess.proc_get_locality(&proc))) { /* is this proc local_rank=0 on its node? */
if (0 == my_local_rank && 0 == orte_ess.get_local_rank(&proc)) {
my_coll_peers[cpeers++] = v;
}
/* if this is me, or this proc isn't on our node, ignore it */
if (v == ORTE_PROC_MY_NAME->vpid ||
!OPAL_PROC_ON_LOCAL_NODE(orte_ess.proc_get_locality(&proc))) {
continue; continue;
} }
/* add this proc to our list of local peers */ /* add this proc to our list of local peers */
@ -323,23 +339,16 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
nm->name.jobid = proc.jobid; nm->name.jobid = proc.jobid;
nm->name.vpid = proc.vpid; nm->name.vpid = proc.vpid;
opal_list_append(&my_local_peers, &nm->item); opal_list_append(&my_local_peers, &nm->item);
/* keep count */ /* if I am not local_rank=0, is this one? */
num_local_peers++; if (0 != my_local_rank &&
/* is this our locally lowest rank? */ 0 == orte_ess.get_local_rank(&proc)) {
if (v < vmax) { my_local_rank_zero_proc.jobid = proc.jobid;
vmax = v; my_local_rank_zero_proc.vpid = proc.vpid;
}
/* is this rank lower than mine? */
if (v < ORTE_PROC_MY_NAME->vpid) {
my_local_rank++;
} }
} }
/* if I am not the local_rank=0 proc, record who is */ /* compute the number of local peers */
if (0 != my_local_rank) { num_local_peers = opal_list_get_size(&my_local_peers);
my_local_rank_zero_proc.jobid = ORTE_PROC_MY_NAME->jobid;
my_local_rank_zero_proc.vpid = vmax;
}
/* flag that I have initialized things */ /* flag that I have initialized things */
coll_initialized = true; coll_initialized = true;
@ -405,8 +414,7 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
OBJ_CONSTRUCT(&final_buf, opal_buffer_t); OBJ_CONSTRUCT(&final_buf, opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_allgather(&allgather_buf, rbuf, num_local_peers + 1, if (ORTE_SUCCESS != (rc = orte_grpcomm_base_allgather(&allgather_buf, rbuf, num_local_peers + 1,
ORTE_PROC_MY_NAME->jobid, ORTE_PROC_MY_NAME->jobid,
mca_grpcomm_hier_component.num_nodes, cpeers, my_coll_peers))) {
mca_grpcomm_hier_component.step))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&allgather_buf); OBJ_DESTRUCT(&allgather_buf);
OBJ_DESTRUCT(&final_buf); OBJ_DESTRUCT(&final_buf);