Merge pull request #482 from nkogteva/master
grpcomm: fixed brks and rcd algorithms - added enough space for masks in...
Этот коммит содержится в:
Коммит
c68a0ba99b
@ -84,6 +84,8 @@ ORTE_DECLSPEC int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
|
||||
void *cbdata);
|
||||
|
||||
ORTE_DECLSPEC orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create);
|
||||
ORTE_DECLSPEC void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);
|
||||
ORTE_DECLSPEC unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);
|
||||
|
||||
END_C_DECLS
|
||||
#endif
|
||||
|
@ -119,7 +119,7 @@ static void ccon(orte_grpcomm_coll_t *p)
|
||||
p->dmns = NULL;
|
||||
p->ndmns = 0;
|
||||
p->nreported = 0;
|
||||
p->distance_mask_recv = 0;
|
||||
p->distance_mask_recv = NULL;
|
||||
p->cbfunc = NULL;
|
||||
p->cbdata = NULL;
|
||||
p->buffers = NULL;
|
||||
@ -134,6 +134,9 @@ static void cdes(orte_grpcomm_coll_t *p)
|
||||
free(p->dmns);
|
||||
}
|
||||
free(p->buffers);
|
||||
if (NULL != p->distance_mask_recv) {
|
||||
free(p->distance_mask_recv);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t,
|
||||
opal_list_item_t,
|
||||
|
@ -422,3 +422,27 @@ CLEANUP:
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
 * Record in the collective's receive mask that the given distance was received.
 *
 * The mask is an array of 32-bit words: bit (distance % 32) of word
 * (distance / 32) is set.
 *
 * @param coll      collective tracker whose distance_mask_recv array is updated
 * @param distance  distance to mark as received
 *
 * The array is written without a NULL or bounds check, so
 * coll->distance_mask_recv must already point to at least
 * (distance / 32) + 1 words.
 */
void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll,
                                          uint32_t distance) {
    uint32_t maskNumber = distance / 32;
    uint32_t bitNumber = distance % 32;

    /* Shift an unsigned 32-bit one: "1 << 31" on a signed int is undefined. */
    coll->distance_mask_recv[maskNumber] |= ((uint32_t)1 << bitNumber);
}
|
||||
|
||||
/**
 * Query whether the given distance is marked in the collective's receive mask.
 *
 * @param coll      collective tracker whose distance_mask_recv array is read
 * @param distance  distance to test
 *
 * @return 0 when no mask has been allocated (distance_mask_recv is NULL);
 *         1 when distance is 0 and a mask exists, without consulting the mask;
 *         otherwise 1 if bit (distance % 32) of word (distance / 32) is set,
 *         else 0.
 *
 * The array is read without a bounds check, so it must hold at least
 * (distance / 32) + 1 words.
 */
unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll,
                                                   uint32_t distance) {
    uint32_t maskNumber = distance / 32;
    uint32_t bitNumber = distance % 32;

    if (NULL == coll->distance_mask_recv) {
        return 0;
    }
    if (0 == distance) {
        return 1;
    }

    /* Shift an unsigned 32-bit one: "1 << 31" on a signed int is undefined. */
    return (0 != (coll->distance_mask_recv[maskNumber] & ((uint32_t)1 << bitNumber))) ? 1 : 0;
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "orte/types.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include <math.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "opal/dss/dss.h"
|
||||
@ -85,10 +86,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
|
||||
|
||||
/* record that we contributed */
|
||||
coll->nreported += 1;
|
||||
coll->nreported = 1;
|
||||
|
||||
/* mark local data received */
|
||||
coll->distance_mask_recv |= 1;
|
||||
coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), (coll->ndmns - 1));
|
||||
|
||||
/* start by seeding the collection with our own data */
|
||||
opal_dss.copy_payload(&coll->bucket, sendbuf);
|
||||
@ -173,8 +174,8 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist
|
||||
|
||||
while (distance < coll->ndmns) {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:brks process distance %u (mask recv: 0x%x)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"%s grpcomm:coll:brks process distance %u)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
|
||||
/* first send my current contents */
|
||||
nv = (coll->ndmns + rank - distance) % coll->ndmns;
|
||||
@ -193,7 +194,7 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist
|
||||
return;
|
||||
}
|
||||
coll->nreported += distance;
|
||||
coll->distance_mask_recv |= (uint32_t)(1 << distance);
|
||||
orte_grpcomm_base_mark_distance_recv(coll, distance);
|
||||
OBJ_RELEASE(coll->buffers[distance - 1]);
|
||||
coll->buffers[distance - 1] = NULL;
|
||||
distance = distance << 1;
|
||||
@ -249,14 +250,14 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
|
||||
brks_finalize_coll(coll, rc);
|
||||
return;
|
||||
}
|
||||
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
|
||||
assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));
|
||||
|
||||
/* Check whether we can process next distance */
|
||||
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
|
||||
if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:brks data from %d distance received, "
|
||||
"Process the next distance (mask recv: 0x%x).",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"Process the next distance.",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
/* capture any provided content */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
|
||||
OBJ_RELEASE(sig);
|
||||
@ -265,13 +266,13 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
|
||||
return;
|
||||
}
|
||||
coll->nreported += distance;
|
||||
coll->distance_mask_recv |= (uint32_t)(1 << distance);
|
||||
orte_grpcomm_base_mark_distance_recv(coll, distance);
|
||||
brks_allgather_process_data(coll, (uint32_t)(distance << 1));
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:brks data from %d distance received, "
|
||||
"still waiting for data (mask recv: 0x%x).",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"still waiting for data.",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
if (NULL == coll->buffers) {
|
||||
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) {
|
||||
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||
|
@ -75,7 +75,7 @@ typedef struct {
|
||||
/* number reported in */
|
||||
size_t nreported;
|
||||
/* distance masks for receive */
|
||||
uint32_t distance_mask_recv;
|
||||
uint32_t *distance_mask_recv;
|
||||
/* received buckets */
|
||||
opal_buffer_t ** buffers;
|
||||
/* callback function */
|
||||
|
@ -93,10 +93,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
|
||||
|
||||
/* record that we contributed */
|
||||
coll->nreported += 1;
|
||||
coll->nreported = 1;
|
||||
|
||||
/* mark local data received */
|
||||
coll->distance_mask_recv |= 1;
|
||||
coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), log2(coll->ndmns));
|
||||
|
||||
/* start by seeding the collection with our own data */
|
||||
opal_dss.copy_payload(&coll->bucket, sendbuf);
|
||||
@ -178,8 +178,8 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista
|
||||
|
||||
while(distance < coll->ndmns) {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:recdub process distance %u (mask recv: 0x%x)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"%s grpcomm:coll:recdub process distance %u",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
|
||||
/* first send my current contents */
|
||||
nv = rank ^ distance;
|
||||
@ -199,7 +199,7 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista
|
||||
return;
|
||||
}
|
||||
coll->nreported += distance;
|
||||
coll->distance_mask_recv |= (uint32_t)(1 << distance);
|
||||
orte_grpcomm_base_mark_distance_recv(coll, distance);
|
||||
OBJ_RELEASE(coll->buffers[distance_index]);
|
||||
coll->buffers[distance_index] = NULL;
|
||||
distance = distance << 1;
|
||||
@ -255,14 +255,14 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
|
||||
rcd_finalize_coll(coll, rc);
|
||||
return;
|
||||
}
|
||||
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
|
||||
assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));
|
||||
|
||||
/* Check whether we can process next distance */
|
||||
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
|
||||
if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:recdub data from %d distance received, "
|
||||
"Process the next distance (mask recv: 0x%x).",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"Process the next distance.",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
/* capture any provided content */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
|
||||
OBJ_RELEASE(sig);
|
||||
@ -271,13 +271,13 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
|
||||
return;
|
||||
}
|
||||
coll->nreported += distance;
|
||||
coll->distance_mask_recv |= (uint32_t)(1 << distance);
|
||||
orte_grpcomm_base_mark_distance_recv(coll, distance);
|
||||
rcd_allgather_process_data(coll, (uint32_t)(distance << 1));
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
|
||||
"%s grpcomm:coll:recdub data from %d distance received, "
|
||||
"still waiting for data (mask recv: 0x%x).",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
|
||||
"still waiting for data.",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
|
||||
if (NULL == coll->buffers) {
|
||||
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) {
|
||||
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user