1
1

grpcomm: fixed brks and rcd algorithms - allocated enough space for the receive masks so that they work at large scale.

Этот коммит содержится в:
Nadezhda Kogteva 2015-03-17 17:12:45 +02:00
родитель 50277fec76
Коммит 7c25b4cea6
6 изменённых файлов: 56 добавлений и 26 удалений

Просмотреть файл

@ -84,6 +84,8 @@ ORTE_DECLSPEC int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
void *cbdata); void *cbdata);
ORTE_DECLSPEC orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create); ORTE_DECLSPEC orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create);
/* Set the bit for "distance" in coll->distance_mask_recv. The mask is an
 * array of 32-bit words: word index is distance / 32, bit is distance % 32. */
ORTE_DECLSPEC void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);
/* Query the receive mask for "distance". Returns 0 when the mask has not
 * been allocated, 1 when distance is 0, otherwise 1 if the bit for
 * "distance" is set in coll->distance_mask_recv and 0 if it is not. */
ORTE_DECLSPEC unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll, uint32_t distance);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -119,7 +119,7 @@ static void ccon(orte_grpcomm_coll_t *p)
p->dmns = NULL; p->dmns = NULL;
p->ndmns = 0; p->ndmns = 0;
p->nreported = 0; p->nreported = 0;
p->distance_mask_recv = 0; p->distance_mask_recv = NULL;
p->cbfunc = NULL; p->cbfunc = NULL;
p->cbdata = NULL; p->cbdata = NULL;
p->buffers = NULL; p->buffers = NULL;
@ -134,6 +134,9 @@ static void cdes(orte_grpcomm_coll_t *p)
free(p->dmns); free(p->dmns);
} }
free(p->buffers); free(p->buffers);
if (NULL != p->distance_mask_recv) {
free(p->distance_mask_recv);
}
} }
OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t, OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t,
opal_list_item_t, opal_list_item_t,

Просмотреть файл

@ -422,3 +422,27 @@ CLEANUP:
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
/*
 * Record that the contribution for the given distance has been received.
 *
 * The receive mask is an array of 32-bit words; the bit for "distance"
 * lives in word (distance / 32) at position (distance % 32).
 *
 * coll      collective tracker whose distance_mask_recv is updated
 * distance  distance whose bit is to be set
 *
 * No bounds check is done here: the mask array must already hold at least
 * (distance / 32) + 1 words. If the mask has not been allocated the call
 * is a no-op, mirroring orte_grpcomm_base_check_distance_recv, which
 * reports "not received" for a NULL mask.
 */
void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll,
                                          uint32_t distance)
{
    uint32_t maskNumber = distance / 32;
    uint32_t bitNumber = distance % 32;

    if (NULL == coll->distance_mask_recv) {
        return;
    }

    /* shift an unsigned 32-bit one: "1 << 31" on a signed int is undefined */
    coll->distance_mask_recv[maskNumber] |= ((uint32_t)1 << bitNumber);
}
/*
 * Report whether the contribution for the given distance has been received.
 *
 * coll      collective tracker whose distance_mask_recv is inspected
 * distance  distance to look up
 *
 * Returns 0 if the mask has not been allocated. Returns 1 for distance 0
 * once the mask exists (presumably distance 0 stands for the local
 * contribution - confirm against the allgather callers). Otherwise returns
 * 1 if the bit for "distance" is set, 0 if it is not.
 *
 * No bounds check is done here: for a non-zero distance the mask array
 * must hold at least (distance / 32) + 1 words.
 */
unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll,
                                                   uint32_t distance)
{
    uint32_t maskNumber = distance / 32;
    uint32_t bitNumber = distance % 32;

    if (NULL == coll->distance_mask_recv) {
        return 0;
    }
    if (0 == distance) {
        return 1;
    }

    /* shift an unsigned 32-bit one: "1 << 31" on a signed int is undefined */
    return (0 != (coll->distance_mask_recv[maskNumber] & ((uint32_t)1 << bitNumber))) ? 1 : 0;
}

Просмотреть файл

@ -20,6 +20,7 @@
#include "orte/types.h" #include "orte/types.h"
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include <math.h>
#include <string.h> #include <string.h>
#include "opal/dss/dss.h" #include "opal/dss/dss.h"
@ -85,10 +86,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
/* record that we contributed */ /* record that we contributed */
coll->nreported += 1; coll->nreported = 1;
/* mark local data received */ /* mark local data received */
coll->distance_mask_recv |= 1; coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), (coll->ndmns - 1));
/* start by seeding the collection with our own data */ /* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf); opal_dss.copy_payload(&coll->bucket, sendbuf);
@ -173,8 +174,8 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist
while (distance < coll->ndmns) { while (distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks process distance %u (mask recv: 0x%x)", "%s grpcomm:coll:brks process distance %u)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* first send my current contents */ /* first send my current contents */
nv = (coll->ndmns + rank - distance) % coll->ndmns; nv = (coll->ndmns + rank - distance) % coll->ndmns;
@ -193,7 +194,7 @@ static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dist
return; return;
} }
coll->nreported += distance; coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance); orte_grpcomm_base_mark_distance_recv(coll, distance);
OBJ_RELEASE(coll->buffers[distance - 1]); OBJ_RELEASE(coll->buffers[distance - 1]);
coll->buffers[distance - 1] = NULL; coll->buffers[distance - 1] = NULL;
distance = distance << 1; distance = distance << 1;
@ -249,14 +250,14 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
brks_finalize_coll(coll, rc); brks_finalize_coll(coll, rc);
return; return;
} }
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance))); assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));
/* Check whether we can process next distance */ /* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) { if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, " "%s grpcomm:coll:brks data from %d distance received, "
"Process the next distance (mask recv: 0x%x).", "Process the next distance.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* capture any provided content */ /* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) { if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig); OBJ_RELEASE(sig);
@ -265,13 +266,13 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
return; return;
} }
coll->nreported += distance; coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance); orte_grpcomm_base_mark_distance_recv(coll, distance);
brks_allgather_process_data(coll, (uint32_t)(distance << 1)); brks_allgather_process_data(coll, (uint32_t)(distance << 1));
} else { } else {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, " "%s grpcomm:coll:brks data from %d distance received, "
"still waiting for data (mask recv: 0x%x).", "still waiting for data.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (NULL == coll->buffers) { if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) { if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) {
rc = OPAL_ERR_OUT_OF_RESOURCE; rc = OPAL_ERR_OUT_OF_RESOURCE;

Просмотреть файл

@ -75,7 +75,7 @@ typedef struct {
/* number reported in */ /* number reported in */
size_t nreported; size_t nreported;
/* distance masks for receive */ /* distance masks for receive */
uint32_t distance_mask_recv; uint32_t *distance_mask_recv;
/* received buckets */ /* received buckets */
opal_buffer_t ** buffers; opal_buffer_t ** buffers;
/* callback function */ /* callback function */

Просмотреть файл

@ -93,10 +93,10 @@ static int allgather(orte_grpcomm_coll_t *coll,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
/* record that we contributed */ /* record that we contributed */
coll->nreported += 1; coll->nreported = 1;
/* mark local data received */ /* mark local data received */
coll->distance_mask_recv |= 1; coll->distance_mask_recv = (uint32_t *)calloc(sizeof(uint32_t), log2(coll->ndmns));
/* start by seeding the collection with our own data */ /* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf); opal_dss.copy_payload(&coll->bucket, sendbuf);
@ -178,8 +178,8 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista
while(distance < coll->ndmns) { while(distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub process distance %u (mask recv: 0x%x)", "%s grpcomm:coll:recdub process distance %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* first send my current contents */ /* first send my current contents */
nv = rank ^ distance; nv = rank ^ distance;
@ -199,7 +199,7 @@ static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t dista
return; return;
} }
coll->nreported += distance; coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance); orte_grpcomm_base_mark_distance_recv(coll, distance);
OBJ_RELEASE(coll->buffers[distance_index]); OBJ_RELEASE(coll->buffers[distance_index]);
coll->buffers[distance_index] = NULL; coll->buffers[distance_index] = NULL;
distance = distance << 1; distance = distance << 1;
@ -255,14 +255,14 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
rcd_finalize_coll(coll, rc); rcd_finalize_coll(coll, rc);
return; return;
} }
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance))); assert(0 == orte_grpcomm_base_check_distance_recv(coll, distance));
/* Check whether we can process next distance */ /* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) { if (orte_grpcomm_base_check_distance_recv(coll, (distance >> 1))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, " "%s grpcomm:coll:recdub data from %d distance received, "
"Process the next distance (mask recv: 0x%x).", "Process the next distance.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
/* capture any provided content */ /* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) { if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig); OBJ_RELEASE(sig);
@ -271,13 +271,13 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
return; return;
} }
coll->nreported += distance; coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance); orte_grpcomm_base_mark_distance_recv(coll, distance);
rcd_allgather_process_data(coll, (uint32_t)(distance << 1)); rcd_allgather_process_data(coll, (uint32_t)(distance << 1));
} else { } else {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, " "%s grpcomm:coll:recdub data from %d distance received, "
"still waiting for data (mask recv: 0x%x).", "still waiting for data.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (NULL == coll->buffers) { if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) { if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) {
rc = OPAL_ERR_OUT_OF_RESOURCE; rc = OPAL_ERR_OUT_OF_RESOURCE;