1
1

Continue resolving add_host behavior

Fix a problem in packing/unpacking job updates. There remains a race condition in which messages are attempted to be sent to the second new daemon before it is completely ready. The exact origin of the race is not yet clear.

Refs #4665

Rebase to master. Reset orte_nidmap_communicated if hosts are added. Check for duplicate hostnames in an add_host command. Turn off tree_spawn for dynamic launch of additional daemons.

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2018-01-05 11:28:56 -08:00
родитель cb5dfbe5b1
Коммит 75eb56522c
4 изменённых файлов: 121 добавлений и 77 удалений

Просмотреть файл

@ -109,9 +109,9 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
int rc, v;
orte_job_t *jdata=NULL, *jptr;
orte_job_map_t *map=NULL;
opal_buffer_t *wireup, jobdata;
opal_buffer_t *wireup, jobdata, priorjob;
opal_byte_object_t bo, *boptr;
int32_t numbytes, numjobs;
int32_t numbytes;
int8_t flag;
void *nptr;
uint32_t key;
@ -270,16 +270,17 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
flag = 1;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
OBJ_CONSTRUCT(&jobdata, opal_buffer_t);
numjobs = 0;
rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jptr, &nptr);
while (OPAL_SUCCESS == rc) {
/* skip the one we are launching now */
if (NULL != jptr && jptr != jdata &&
ORTE_PROC_MY_NAME->jobid != jptr->jobid) {
OBJ_CONSTRUCT(&priorjob, opal_buffer_t);
/* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &jptr, 1, ORTE_JOB))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&priorjob, &jptr, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
OBJ_DESTRUCT(&priorjob);
return rc;
}
/* pack the location of each proc */
@ -287,32 +288,33 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, n))) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &proc->parent, 1, ORTE_VPID))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&priorjob, &proc->parent, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
OBJ_DESTRUCT(&priorjob);
return rc;
}
}
++numjobs;
/* pack the jobdata buffer */
wireup = &priorjob;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &wireup, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
OBJ_DESTRUCT(&priorjob);
return rc;
}
OBJ_DESTRUCT(&priorjob);
}
rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jptr, nptr, &nptr);
}
/* pack the number of jobs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
/* pack the jobdata buffer */
wireup = &jobdata;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &wireup, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
return rc;
}
if (0 < numjobs) {
/* pack the jobdata buffer */
wireup = &jobdata;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &wireup, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
return rc;
}
OBJ_DESTRUCT(&jobdata);
}
OBJ_DESTRUCT(&jobdata);
} else {
flag = 0;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
@ -367,8 +369,8 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
orte_job_t *jdata=NULL, *daemons;
orte_node_t *node;
orte_vpid_t dmnvpid, v;
int32_t n, k;
opal_buffer_t *bptr;
int32_t n;
opal_buffer_t *bptr, *jptr;
orte_proc_t *pptr, *dmn;
orte_app_context_t *app;
int8_t flag;
@ -391,68 +393,69 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
}
if (0 != flag) {
/* see if additional jobs are included in the data */
/* unpack the buffer containing the info */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &n, &cnt, OPAL_INT32))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(bptr);
goto REPORT_ERROR;
}
if (0 < n) {
/* unpack the buffer containing the info */
cnt=1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(bptr, &jptr, &cnt, OPAL_BUFFER))) {
/* unpack each job and add it to the local orte_job_data array */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(jptr, &jdata, &cnt, ORTE_JOB))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(bptr);
OBJ_RELEASE(jptr);
goto REPORT_ERROR;
}
for (k=0; k < n; k++) {
/* unpack each job and add it to the local orte_job_data array */
/* check to see if we already have this one */
if (NULL == orte_get_job_data_object(jdata->jobid)) {
/* nope - add it */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
} else {
/* yep - so we can drop this copy */
jdata->jobid = ORTE_JOBID_INVALID;
OBJ_RELEASE(jdata);
OBJ_RELEASE(jptr);
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(bptr, &jdata, &cnt, ORTE_JOB))) {
*job = ORTE_JOBID_INVALID;
continue;
}
/* unpack the location of each proc in this job */
for (v=0; v < jdata->num_procs; v++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, v))) {
pptr = OBJ_NEW(orte_proc_t);
pptr->name.jobid = jdata->jobid;
pptr->name.vpid = v;
opal_pointer_array_set_item(jdata->procs, v, pptr);
}
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(jptr, &dmnvpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(jptr);
OBJ_RELEASE(bptr);
goto REPORT_ERROR;
}
/* check to see if we already have this one */
if (NULL == orte_get_job_data_object(jdata->jobid)) {
/* nope - add it */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
} else {
/* yep - so we can drop this copy */
jdata->jobid = ORTE_JOBID_INVALID;
OBJ_RELEASE(jdata);
continue;
}
/* unpack the location of each proc in this job */
for (v=0; v < jdata->num_procs; v++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, v))) {
pptr = OBJ_NEW(orte_proc_t);
pptr->name.jobid = jdata->jobid;
pptr->name.vpid = v;
opal_pointer_array_set_item(jdata->procs, v, pptr);
}
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(bptr, &dmnvpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(jdata);
goto REPORT_ERROR;
}
/* lookup the daemon */
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, dmnvpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
}
/* connect the two */
OBJ_RETAIN(dmn->node);
pptr->node = dmn->node;
/* lookup the daemon */
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, dmnvpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
OBJ_RELEASE(jptr);
OBJ_RELEASE(bptr);
goto REPORT_ERROR;
}
/* connect the two */
OBJ_RETAIN(dmn->node);
pptr->node = dmn->node;
}
/* release the buffer */
OBJ_RELEASE(bptr);
OBJ_RELEASE(jptr);
cnt = 1;
}
OBJ_RELEASE(bptr);
}
/* unpack the job we are to launch */

Просмотреть файл

@ -15,7 +15,7 @@
* Copyright (c) 2008-2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2011-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2017 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -887,6 +887,10 @@ static int remote_spawn(void)
opal_list_append(&launch_list, &caddy->super);
}
OPAL_LIST_DESTRUCT(&coll);
/* we NEVER use tree-spawn for secondary launches - e.g.,
* due to a dynamic launch requesting add_hosts - so be
* sure to turn it off here */
mca_plm_rsh_component.no_tree_spawn = true;
/* trigger the event to start processing the launch list */
OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
@ -1280,6 +1284,10 @@ static void launch_daemons(int fd, short args, void *cbdata)
OBJ_RETAIN(caddy->daemon);
opal_list_append(&launch_list, &caddy->super);
}
/* we NEVER use tree-spawn for secondary launches - e.g.,
* due to a dynamic launch requesting add_hosts - so be
* sure to turn it off here */
mca_plm_rsh_component.no_tree_spawn = true;
/* set the job state to indicate the daemons are launched */
state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;

Просмотреть файл

@ -11,7 +11,9 @@
* All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -456,9 +458,9 @@ int orte_ras_base_add_hosts(orte_job_t *jdata)
{
int rc;
opal_list_t nodes;
int i;
int i, n;
orte_app_context_t *app;
orte_node_t *node;
orte_node_t *node, *next, *nptr;
char *hosts;
/* construct a list to hold the results */
@ -532,19 +534,35 @@ int orte_ras_base_add_hosts(orte_job_t *jdata)
/* if something was found, we add that to our global pool */
if (!opal_list_is_empty(&nodes)) {
/* mark all the nodes as "added" */
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
/* the node insert code doesn't check for uniqueness, so we will
* do so here - yes, this is an ugly, non-scalable loop, but this
* is the exception case and so we can do it here */
OPAL_LIST_FOREACH_SAFE(node, next, &nodes, orte_node_t) {
node->state = ORTE_NODE_STATE_ADDED;
for (n=0; n < orte_node_pool->size; n++) {
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
continue;
}
if (0 == strcmp(node->name, nptr->name)) {
opal_list_remove_item(&nodes, &node->super);
OBJ_RELEASE(node);
break;
}
}
}
/* store the results in the global resource pool - this removes the
* list items
*/
if (ORTE_SUCCESS != (rc = orte_ras_base_node_insert(&nodes, jdata))) {
ORTE_ERROR_LOG(rc);
if (!opal_list_is_empty(&nodes)) {
/* store the results in the global resource pool - this removes the
* list items
*/
if (ORTE_SUCCESS != (rc = orte_ras_base_node_insert(&nodes, jdata))) {
ORTE_ERROR_LOG(rc);
}
/* mark that an updated nidmap must be communicated to existing daemons */
orte_nidmap_communicated = false;
}
/* cleanup */
OBJ_DESTRUCT(&nodes);
}
/* cleanup */
OPAL_LIST_DESTRUCT(&nodes);
/* shall we display the results? */
if (0 < opal_output_get_verbosity(orte_ras_base_framework.framework_output)) {

Просмотреть файл

@ -47,6 +47,21 @@ int main(int argc, char* argv[])
}
MPI_Comm_disconnect(&child);
printf("Parent disconnected\n");
/* do it again */
MPI_Info_set(info, "add-host", "rhc003:24");
if (MPI_SUCCESS != (rc = MPI_Comm_spawn(argv[0], MPI_ARGV_NULL, 3, info,
0, MPI_COMM_WORLD, &child, MPI_ERRCODES_IGNORE))) {
printf("Child failed to spawn\n");
return rc;
}
printf("Parent done with second spawn\n");
if (0 == rank) {
msg = 38;
printf("Parent sending message to second children\n");
MPI_Send(&msg, 1, MPI_INT, 0, 1, child);
}
MPI_Comm_disconnect(&child);
printf("Parent disconnected again\n");
}
/* Otherwise, we're the child */
else {