1
1

Consolidate the daemon wireup message into the launch message. The daemons don't need their contact info prior to the launch message anyway. This not only eliminates a job-wide communication from the startup procedure but also resolves a race condition reported when operating across highly distributed (i.e., cross-country) networks. In such scenarios, it proved possible for a daemon to receive its launch message before it had received the contact info message, even though the latter had been sent first!

This eliminates that problem...

This commit was SVN r18126.
Этот коммит содержится в:
Ralph Castain 2008-04-10 15:35:11 +00:00
родитель 4b798cf29a
Коммит 851279fc9f
4 изменённых файлов: 71 добавлений и 35 удалений

Просмотреть файл

@ -80,6 +80,46 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
orte_std_cntr_t i;
orte_vpid_t j;
orte_vpid_t invalid_vpid=ORTE_VPID_INVALID;
opal_buffer_t *wireup;
opal_byte_object_t bo, *boptr;
int32_t numbytes;
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* if anything was inserted, put it in a byte object for xmission */
if (0 < wireup->bytes_used) {
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the number of bytes required by payload */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* pack the byte object */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* release the data since it has now been copied into our buffer */
free(bo.bytes);
} else {
/* pack numbytes=0 so the unpack routine remains sync'd to us */
numbytes = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
}
OBJ_RELEASE(wireup);
/* get the job data pointer */
if (NULL == (jdata = orte_get_job_data_object(job))) {
@ -238,6 +278,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
char *slot_str;
bool node_oversubscribed;
orte_odls_job_t *jobdat;
opal_buffer_t wireup;
opal_byte_object_t *bo;
int32_t numbytes;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:constructing child list",
@ -257,7 +300,34 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
*node_included = false;
*oversubscribed = false;
*override_oversubscribed = false;
/* unpack the #bytes of daemon wireup info in the message */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &numbytes, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* any bytes there? */
if (0 < numbytes) {
/* unpack the byte object */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* pass it for processing */
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, &wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&wireup);
return rc;
}
/* done with the buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
/* unpack the jobid we are to launch */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, job, &cnt, ORTE_JOBID))) {

Просмотреть файл

@ -301,7 +301,6 @@ CLEANUP:
int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
{
int rc;
opal_buffer_t *wireup;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:daemon_callback",
@ -336,23 +335,6 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
ORTE_ERROR_LOG(rc);
}
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* if anything was inserted, send it out */
if (0 < wireup->bytes_used) {
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, wireup, ORTE_RML_TAG_RML_INFO_UPDATE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
}
OBJ_RELEASE(wireup);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -561,16 +561,8 @@ static int route_lost(const orte_process_name_t *route)
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
{
orte_rml_cmd_flag_t command;
int rc;
/* pack the update-RML command */
command = ORTE_RML_UPDATE_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);

Просмотреть файл

@ -566,16 +566,8 @@ static int route_lost(const orte_process_name_t *route)
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
{
orte_rml_cmd_flag_t command;
int rc;
/* pack the update-RML command */
command = ORTE_RML_UPDATE_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);