Clean up the code and add some comments to make it easier to understand. Add a bozo error check.
This commit was SVN r23639.
Этот коммит содержится в:
родитель
2886da5669
Коммит
7608513158
@ -79,7 +79,7 @@ orte_grpcomm_base_module_t orte_grpcomm_hier_module = {
|
||||
static orte_local_rank_t my_local_rank;
|
||||
static opal_list_t my_local_peers;
|
||||
static orte_process_name_t my_local_rank_zero_proc;
|
||||
static size_t num_local_peers;
|
||||
static size_t num_local_peers=0;
|
||||
static bool coll_initialized = false;
|
||||
static orte_vpid_t *my_coll_peers=NULL;
|
||||
static int cpeers=0;
|
||||
@ -96,6 +96,9 @@ static int init(void)
|
||||
OBJ_CONSTRUCT(&barrier, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&allgather, orte_grpcomm_collective_t);
|
||||
|
||||
my_local_rank_zero_proc.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
my_local_rank_zero_proc.vpid = ORTE_VPID_INVALID;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
@ -209,8 +212,8 @@ static int hier_barrier(void)
|
||||
}
|
||||
|
||||
static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
{
|
||||
orte_grpcomm_collective_t *coll = (orte_grpcomm_collective_t*)cbdata;
|
||||
int rc;
|
||||
@ -221,7 +224,17 @@ static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
coll->recvd += 1;
|
||||
if (num_local_peers == coll->recvd) {
|
||||
if (0 == my_local_rank) {
|
||||
/* I have to wait until I get a message from
|
||||
* every local proc other than myself
|
||||
*/
|
||||
if (num_local_peers == coll->recvd) {
|
||||
opal_condition_broadcast(&coll->cond);
|
||||
}
|
||||
} else {
|
||||
/* the rest of the local procs only recv one message back,
|
||||
* coming from the local_rank=0 proc
|
||||
*/
|
||||
opal_condition_broadcast(&coll->cond);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&coll->lock);
|
||||
@ -281,18 +294,26 @@ static int hier_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
}
|
||||
}
|
||||
|
||||
/* compute the number of local peers */
|
||||
/* compute the number of local peers - note that this number
|
||||
* does not include me!!
|
||||
*/
|
||||
num_local_peers = opal_list_get_size(&my_local_peers);
|
||||
|
||||
/* flag that I have initialized things */
|
||||
coll_initialized = true;
|
||||
}
|
||||
|
||||
|
||||
/* if I am not local rank = 0 */
|
||||
if (0 != my_local_rank) {
|
||||
if (ORTE_VPID_INVALID == my_local_rank_zero_proc.vpid) {
|
||||
/* something is broken */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
|
||||
/* setup the collective */
|
||||
OPAL_THREAD_LOCK(&allgather.lock);
|
||||
allgather.recvd = num_local_peers - 1;
|
||||
allgather.recvd = 0;
|
||||
/* reset the collector */
|
||||
OBJ_DESTRUCT(&allgather.results);
|
||||
OBJ_CONSTRUCT(&allgather.results, opal_buffer_t);
|
||||
@ -314,9 +335,11 @@ static int hier_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* wait to complete */
|
||||
/* wait to complete - we will receive a single message
|
||||
* sent from our local_rank=0 peer
|
||||
*/
|
||||
OPAL_THREAD_LOCK(&allgather.lock);
|
||||
while (allgather.recvd < num_local_peers) {
|
||||
while (allgather.recvd < 1) {
|
||||
opal_condition_wait(&allgather.cond, &allgather.lock);
|
||||
}
|
||||
/* copy payload to the caller's buffer */
|
||||
@ -328,10 +351,10 @@ static int hier_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
|
||||
} else {
|
||||
/* I am local_rank = 0 on this node! */
|
||||
|
||||
|
||||
/* setup the collective */
|
||||
OPAL_THREAD_LOCK(&allgather.lock);
|
||||
allgather.recvd = 1;
|
||||
allgather.recvd = 0;
|
||||
/* reset the collector */
|
||||
OBJ_DESTRUCT(&allgather.results);
|
||||
OBJ_CONSTRUCT(&allgather.results, opal_buffer_t);
|
||||
@ -349,7 +372,9 @@ static int hier_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* wait to complete */
|
||||
/* wait to complete - we need to receive input from every
|
||||
* local peer (excluding myself)
|
||||
*/
|
||||
OPAL_THREAD_LOCK(&allgather.lock);
|
||||
while (allgather.recvd < num_local_peers) {
|
||||
opal_condition_wait(&allgather.cond, &allgather.lock);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user