1
1

Fix singleton comm-spawn, yet again. The new grpcomm collectives require a complete knowledge of every active proc in the system in case they participate in a collective. So ensure we pass the required job info when we spawn new daemons, and construct the necessary connections to allow grpcomm to operate.

This commit is contained in:
Ralph Castain 2014-12-03 18:09:28 -08:00
parent 983bd49f11
commit c88f181efe
6 changed files with 155 additions and 9 deletions

View file

@@ -252,7 +252,6 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
/* now get the daemons involved */
if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(coll);
return NULL;
}
return coll;
@@ -329,6 +328,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
OPAL_LIST_DESTRUCT(&ds);
return ORTE_ERR_NOT_FOUND;
}
opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
"%s sign: GETTING PROC OBJECT FOR %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&sig->signature[n]));
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
@@ -337,6 +340,7 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == proc->node || NULL == proc->node->daemon) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
vpid = proc->node->daemon->name.vpid;

View file

@@ -100,12 +100,12 @@
int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
orte_jobid_t job)
{
int rc;
orte_job_t *jdata=NULL;
int rc, i;
orte_job_t *jdata=NULL, *jptr;
orte_job_map_t *map=NULL;
opal_buffer_t *wireup;
opal_buffer_t *wireup, jobdata;
opal_byte_object_t bo, *boptr;
int32_t numbytes;
int32_t numbytes, numjobs;
int8_t flag;
/* get the job data pointer */
@@ -169,6 +169,57 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
opal_dss.pack(data, &flag, 1, OPAL_INT8);
}
/* check if this job caused daemons to be spawned - if it did,
* then we need to ensure that those daemons get a complete
* copy of all active jobs so the grpcomm collectives can
* properly work should a proc from one of the other jobs
* interact with this one */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS, NULL, OPAL_BOOL)) {
OBJ_CONSTRUCT(&jobdata, opal_buffer_t);
numjobs = 0;
for (i=0; i < orte_job_data->size; i++) {
if (NULL == (jptr = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, i))) {
continue;
}
if (ORTE_JOB_STATE_UNTERMINATED < jptr->state) {
/* job already terminated - ignore it */
continue;
}
if (jptr == jdata) {
/* ignore the job we are looking at - we'll get it separately */
continue;
}
/* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &jptr, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
return rc;
}
++numjobs;
}
if (0 < numjobs) {
/* pack the number of jobs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the jobdata buffer */
wireup = &jobdata;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &wireup, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
return rc;
}
OBJ_DESTRUCT(&jobdata);
}
} else {
numjobs = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
@@ -191,7 +242,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
int rc;
orte_std_cntr_t cnt;
orte_job_t *jdata=NULL, *daemons;
int32_t n, k;
int32_t n, j, k;
orte_proc_t *pptr, *dmn;
opal_buffer_t *bptr;
orte_app_context_t *app;
@@ -203,6 +254,64 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
*job = ORTE_JOBID_INVALID;
/* unpack the flag to see if additional jobs are included in the data */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &n, &cnt, OPAL_INT32))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* get the daemon job object */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
if (0 < n) {
/* unpack the buffer containing the info */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &bptr, &cnt, OPAL_BUFFER))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
for (k=0; k < n; k++) {
/* unpack each job and add it to the local orte_job_data array */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(bptr, &jdata, &cnt, ORTE_JOB))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* check to see if we already have this one */
if (NULL == orte_get_job_data_object(jdata->jobid)) {
/* nope - add it */
opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(jdata->jobid), jdata);
/* connect each proc to its node object */
for (j=0; j < jdata->procs->size; j++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
continue;
}
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
OBJ_RETAIN(dmn->node);
pptr->node = dmn->node;
/* add proc to node - note that num_procs for the
* node was already correctly unpacked, so don't
* increment it here */
OBJ_RETAIN(pptr);
opal_pointer_array_add(dmn->node->procs, pptr);
}
} else {
/* yep - so we can drop this copy */
jdata->jobid = ORTE_JOBID_INVALID;
OBJ_RELEASE(jdata);
}
}
/* release the buffer */
OBJ_RELEASE(bptr);
}
/* unpack the job we are to launch */
cnt=1;
@@ -300,7 +409,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
}
/* check the procs */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
for (n=0; n < jdata->procs->size; n++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) {
continue;
@@ -309,6 +417,16 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* not ready for use yet */
continue;
}
opal_output_verbose(5, orte_odls_base_framework.framework_output,
"%s GETTING DAEMON FOR PROC %s WITH PARENT %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pptr->name),
ORTE_VPID_PRINT(pptr->parent));
if (ORTE_VPID_INVALID == pptr->parent) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
ORTE_FORCED_TERMINATE(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* connect the proc to its node object */
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);

View file

@@ -1326,7 +1326,7 @@ int orte_plm_base_orted_append_basic_args(int *argc, char ***argv,
int orte_plm_base_setup_virtual_machine(orte_job_t *jdata)
{
orte_node_t *node, *nptr;
orte_proc_t *proc;
orte_proc_t *proc, *pptr;
orte_job_map_t *map=NULL;
int rc, i;
orte_job_t *daemons;
@@ -1846,6 +1846,12 @@ int orte_plm_base_setup_virtual_machine(orte_job_t *jdata)
if (ORTE_VPID_INVALID == map->daemon_vpid_start) {
map->daemon_vpid_start = proc->name.vpid;
}
/* loop across all app procs on this node and update their parent */
for (i=0; i < node->procs->size; i++) {
if (NULL != (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
pptr->parent = proc->name.vpid;
}
}
}
if (orte_process_info.num_procs != daemons->num_procs) {
@@ -1869,5 +1875,15 @@ int orte_plm_base_setup_virtual_machine(orte_job_t *jdata)
/* mark that the daemon job changed */
ORTE_FLAG_SET(daemons, ORTE_JOB_FLAG_UPDATED);
/* if new daemons are being launched, mark that this job
* caused it to happen */
if (0 < map->num_new_daemons) {
if (ORTE_SUCCESS != (rc = orte_set_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS,
true, NULL, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}

View file

@@ -558,6 +558,7 @@ int orte_daemon(int argc, char *argv[])
proc = OBJ_NEW(orte_proc_t);
proc->name.jobid = jdata->jobid;
proc->name.vpid = 0;
proc->parent = 0;
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
proc->state = ORTE_PROC_STATE_RUNNING;
proc->app_idx = 0;
@@ -567,6 +568,10 @@ int orte_daemon(int argc, char *argv[])
OBJ_RETAIN(node); /* keep accounting straight */
opal_pointer_array_add(jdata->procs, proc);
jdata->num_procs = 1;
/* add the node to the job map */
OBJ_RETAIN(node);
opal_pointer_array_add(jdata->map->nodes, node);
jdata->map->num_nodes++;
/* and it obviously is on the node */
OBJ_RETAIN(proc);
opal_pointer_array_add(node->procs, proc);

View file

@@ -247,7 +247,9 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
return "JOB-FWD-IO-TO-TOOL";
case ORTE_JOB_PHYSICAL_CPUIDS:
return "JOB-PHYSICAL-CPUIDS";
case ORTE_JOB_LAUNCHED_DAEMONS:
return "JOB-LAUNCHED-DAEMONS";
case ORTE_PROC_NOBARRIER:
return "PROC-NOBARRIER";
case ORTE_PROC_CPU_BITMAP:

View file

@@ -123,6 +123,7 @@ typedef uint16_t orte_job_flags_t;
#define ORTE_JOB_FINI_BAR_ID (ORTE_JOB_START_KEY + 32) // orte_grpcomm_coll_id_t - collective id
#define ORTE_JOB_FWDIO_TO_TOOL (ORTE_JOB_START_KEY + 33) // Forward IO for this job to the tool requesting its spawn
#define ORTE_JOB_PHYSICAL_CPUIDS (ORTE_JOB_START_KEY + 34) // Hostfile contains physical jobids in cpuset
#define ORTE_JOB_LAUNCHED_DAEMONS (ORTE_JOB_START_KEY + 35) // Job caused new daemons to be spawned
#define ORTE_JOB_MAX_KEY 300