Fixed an issue in the grpcomm rcd and brks algorithms that caused performance problems: during a fence, data for only a subset of the processes was unpacked and stored locally, so clients were forced to request data directly from the daemons during get requests.
Этот коммит содержится в:
родитель
010dce307a
Коммит
48eae25b8f
@ -307,6 +307,8 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
|
|||||||
static int brks_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
static int brks_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
||||||
opal_buffer_t *reply;
|
opal_buffer_t *reply;
|
||||||
int rc;
|
int rc;
|
||||||
|
orte_job_t *jdata;
|
||||||
|
uint64_t nprocs;
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
||||||
"%s grpcomm:coll:brks declared collective complete",
|
"%s grpcomm:coll:brks declared collective complete",
|
||||||
@ -314,11 +316,25 @@ static int brks_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
|||||||
|
|
||||||
reply = OBJ_NEW(opal_buffer_t);
|
reply = OBJ_NEW(opal_buffer_t);
|
||||||
|
|
||||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &coll->nreported, 1, OPAL_UINT64))) {
|
/* pack the number of procs involved in the collective
|
||||||
|
* so the recipients can unpack any collected data */
|
||||||
|
if (1 == coll->sig->sz) {
|
||||||
|
/* get the job object for this entry */
|
||||||
|
if (NULL == (jdata = orte_get_job_data_object(coll->sig->signature[0].jobid))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
OBJ_RELEASE(reply);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
nprocs = jdata->num_procs;
|
||||||
|
} else {
|
||||||
|
nprocs = coll->sig->sz;
|
||||||
|
}
|
||||||
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &nprocs, 1, OPAL_UINT64))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
OBJ_RELEASE(reply);
|
OBJ_RELEASE(reply);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* transfer the collected bucket */
|
/* transfer the collected bucket */
|
||||||
opal_dss.copy_payload(reply, &coll->bucket);
|
opal_dss.copy_payload(reply, &coll->bucket);
|
||||||
|
|
||||||
|
@ -311,6 +311,8 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
|
|||||||
static int rcd_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
static int rcd_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
||||||
opal_buffer_t *reply;
|
opal_buffer_t *reply;
|
||||||
int rc;
|
int rc;
|
||||||
|
orte_job_t *jdata;
|
||||||
|
uint64_t nprocs;
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
||||||
"%s grpcomm:coll:recdub declared collective complete",
|
"%s grpcomm:coll:recdub declared collective complete",
|
||||||
@ -318,11 +320,25 @@ static int rcd_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
|
|||||||
|
|
||||||
reply = OBJ_NEW(opal_buffer_t);
|
reply = OBJ_NEW(opal_buffer_t);
|
||||||
|
|
||||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &coll->nreported, 1, OPAL_UINT64))) {
|
/* pack the number of procs involved in the collective
|
||||||
|
* so the recipients can unpack any collected data */
|
||||||
|
if (1 == coll->sig->sz) {
|
||||||
|
/* get the job object for this entry */
|
||||||
|
if (NULL == (jdata = orte_get_job_data_object(coll->sig->signature[0].jobid))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
OBJ_RELEASE(reply);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
nprocs = jdata->num_procs;
|
||||||
|
} else {
|
||||||
|
nprocs = coll->sig->sz;
|
||||||
|
}
|
||||||
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &nprocs, 1, OPAL_UINT64))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
OBJ_RELEASE(reply);
|
OBJ_RELEASE(reply);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* transfer the collected bucket */
|
/* transfer the collected bucket */
|
||||||
opal_dss.copy_payload(reply, &coll->bucket);
|
opal_dss.copy_payload(reply, &coll->bucket);
|
||||||
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user