Fix comm_spawn (maybe).
Comm_spawn was sticking during spawn_multiple because of a problem in the dpm - the modex there is asking processes to talk to each other in an allgather_list operation, but the procs don't have the required contact info to do so. The solution here was to ensure that all parent procs have full contact info for procs in the child job. Admittedly, this isn't the long-term answer. We would like to have the contact info given to only the parent procs that were involved in the comm_spawn. There is a way to do that, but this will suffice to keep things working until that can be implemented and tested. This commit was SVN r17772.
Этот коммит содержится в:
родитель
57a72c412a
Коммит
b110a247be
@ -479,6 +479,7 @@ static int spawn(int count, char **array_of_commands,
|
||||
|
||||
orte_job_t *jdata;
|
||||
orte_app_context_t *app;
|
||||
bool local_spawn, non_mpi;
|
||||
|
||||
/* parse the info object */
|
||||
/* check potentially for:
|
||||
@ -628,8 +629,18 @@ static int spawn(int count, char **array_of_commands,
|
||||
* the specified app is to be launched by the local orted as a
|
||||
* "slave" process, typically to support an attached co-processor
|
||||
*/
|
||||
ompi_info_get_bool(array_of_info[i], "ompi_local_slave", &jdata->local_spawn, &flag);
|
||||
|
||||
ompi_info_get_bool(array_of_info[i], "ompi_local_slave", &local_spawn, &flag);
|
||||
if ( local_spawn ) {
|
||||
jdata->controls |= ORTE_JOB_CONTROL_LOCAL_SPAWN;
|
||||
}
|
||||
|
||||
/* see if this is a non-mpi job - if so, then set the flag so ORTE
|
||||
* knows what to do
|
||||
*/
|
||||
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
|
||||
if (non_mpi) {
|
||||
jdata->controls |= ORTE_JOB_CONTROL_NON_ORTE_JOB;
|
||||
}
|
||||
}
|
||||
|
||||
/* default value: If the user did not tell us where to look for the
|
||||
|
@ -65,7 +65,7 @@ int orte_plm_proxy_spawn(orte_job_t *jdata)
|
||||
}
|
||||
|
||||
/* identify who gets this command - the HNP or the local orted */
|
||||
if (jdata->local_spawn) {
|
||||
if (jdata->controls & ORTE_JOB_CONTROL_LOCAL_SPAWN) {
|
||||
/* for now, this is unsupported */
|
||||
opal_output(0, "LOCAL DAEMON SPAWN IS CURRENTLY UNSUPPORTED");
|
||||
target = ORTE_PROC_MY_HNP;
|
||||
|
@ -36,6 +36,8 @@
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
#include "orte/mca/grpcomm/grpcomm.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
@ -100,10 +102,11 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
orte_plm_cmd_flag_t command;
|
||||
orte_rml_cmd_flag_t cmd;
|
||||
orte_std_cntr_t count;
|
||||
orte_jobid_t job;
|
||||
orte_job_t *jdata;
|
||||
opal_buffer_t answer;
|
||||
opal_buffer_t answer, xchg;
|
||||
orte_vpid_t vpid;
|
||||
orte_proc_t **procs;
|
||||
orte_proc_state_t state;
|
||||
@ -132,19 +135,42 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
|
||||
goto ANSWER_LAUNCH;
|
||||
}
|
||||
|
||||
/* launch it */
|
||||
if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
|
||||
/* launch it */
|
||||
if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto ANSWER_LAUNCH;
|
||||
}
|
||||
job = jdata->jobid;
|
||||
|
||||
/* if the child is an ORTE job, wait for the procs to report they are alive */
|
||||
if (!(jdata->controls & ORTE_JOB_CONTROL_NON_ORTE_JOB)) {
|
||||
ORTE_PROGRESSED_WAIT(false, jdata->num_reported, jdata->num_procs);
|
||||
/* pack the update command */
|
||||
OBJ_CONSTRUCT(&xchg, opal_buffer_t);
|
||||
cmd = ORTE_RML_UPDATE_CMD;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&xchg, &cmd, 1, ORTE_RML_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto ANSWER_LAUNCH;
|
||||
}
|
||||
job = jdata->jobid;
|
||||
/* get the contact data of the child job */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(job, &xchg))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&xchg);
|
||||
goto ANSWER_LAUNCH;
|
||||
}
|
||||
/* send it to the parents */
|
||||
if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(mev->sender.jobid, &xchg, ORTE_RML_TAG_RML_INFO_UPDATE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
OBJ_DESTRUCT(&xchg);
|
||||
}
|
||||
|
||||
ANSWER_LAUNCH:
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
|
||||
"%s plm:base:receive job %s launched",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(job)));
|
||||
|
||||
ANSWER_LAUNCH:
|
||||
/* pack the jobid to be returned */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
|
@ -107,65 +107,6 @@ int orte_dt_copy_job(orte_job_t **dest, orte_job_t *src, opal_data_type_t type)
|
||||
(*dest) = src;
|
||||
OBJ_RETAIN(src);
|
||||
|
||||
#if 0
|
||||
orte_std_cntr_t i;
|
||||
int rc;
|
||||
|
||||
/* create the new object */
|
||||
*dest = OBJ_NEW(orte_job_t);
|
||||
if (NULL == *dest) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* copy data into it */
|
||||
(*dest)->jobid = src->jobid;
|
||||
|
||||
if (0 < src->num_apps) {
|
||||
(*dest)->apps = (orte_app_context_t**)malloc(src->num_apps * sizeof(orte_app_context_t*));
|
||||
if (NULL == (*dest)->apps) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
for (i=0; i < src->num_apps; i++) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy((void*)&((*dest)->apps[i]), src->apps[i], ORTE_APP_CONTEXT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
(*dest)->num_apps = src->num_apps;
|
||||
}
|
||||
|
||||
(*dest)->total_slots_alloc = src->total_slots_alloc;
|
||||
|
||||
(*dest)->num_procs = src->num_procs;
|
||||
for (i=0; i < src->procs->size; i++) {
|
||||
if (NULL != src->procs->addr[i]) {
|
||||
/* need to use pointer_array_set_item here */
|
||||
(*dest)->procs->addr[i] = src->procs->addr[i];
|
||||
OBJ_RETAIN(src->procs->addr[i]); /* keep the instance count correct */
|
||||
}
|
||||
}
|
||||
|
||||
(*dest)->map = src->map;
|
||||
OBJ_RETAIN(src->map); /* keep the instance count correct */
|
||||
|
||||
(*dest)->bookmark = src->bookmark;
|
||||
(*dest)->oversubscribe_override = src->oversubscribe_override;
|
||||
(*dest)->state = src->state;
|
||||
|
||||
(*dest)->num_terminated = src->num_terminated;
|
||||
(*dest)->abort = src->abort;
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
(*dest)->ckpt_state = src->ckpt_state;
|
||||
if (NULL != src->ckpt_snapshot_ref) {
|
||||
(*dest)->ckpt_snapshot_ref = strdup(src->ckpt_snapshot_ref);
|
||||
}
|
||||
if (NULL != src->ckpt_snapshot_loc) {
|
||||
(*dest)->ckpt_snapshot_loc = strdup(src->ckpt_snapshot_loc);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -176,49 +117,6 @@ int orte_dt_copy_node(orte_node_t **dest, orte_node_t *src, opal_data_type_t typ
|
||||
{
|
||||
(*dest) = src;
|
||||
OBJ_RETAIN(src);
|
||||
#if 0
|
||||
orte_std_cntr_t i;
|
||||
|
||||
/* create the new object */
|
||||
*dest = OBJ_NEW(orte_node_t);
|
||||
if (NULL == *dest) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* copy data into it */
|
||||
if (NULL != src->name) {
|
||||
(*dest)->name = strdup(src->name);
|
||||
}
|
||||
(*dest)->nodeid = src->nodeid;
|
||||
|
||||
(*dest)->index = src->index;
|
||||
if (NULL != src->daemon) {
|
||||
(*dest)->daemon = src->daemon;
|
||||
OBJ_RETAIN(src->daemon); /* keep the instance count correct */
|
||||
}
|
||||
(*dest)->launch_id = src->launch_id;
|
||||
|
||||
(*dest)->num_procs = src->num_procs;
|
||||
for (i=0; i < src->procs->size; i++) {
|
||||
if (NULL != src->procs->addr[i]) {
|
||||
(*dest)->procs->addr[i] = src->procs->addr[i];
|
||||
OBJ_RETAIN(src->procs->addr[i]); /* keep the instance count correct */
|
||||
}
|
||||
}
|
||||
|
||||
(*dest)->oversubscribed = src->oversubscribed;
|
||||
(*dest)->arch = src->arch;
|
||||
(*dest)->state = src->state;
|
||||
(*dest)->slots = src->slots;
|
||||
(*dest)->slots_inuse = src->slots_inuse;
|
||||
(*dest)->slots_alloc = src->slots_alloc;
|
||||
(*dest)->slots_max = src->slots_max;
|
||||
if (NULL != src->username) {
|
||||
(*dest)->username = strdup(src->username);
|
||||
}
|
||||
(*dest)->oversubscribe_override = src->oversubscribe_override;
|
||||
#endif
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -229,39 +127,6 @@ int orte_dt_copy_proc(orte_proc_t **dest, orte_proc_t *src, opal_data_type_t typ
|
||||
{
|
||||
(*dest) = src;
|
||||
OBJ_RETAIN(src);
|
||||
#if 0
|
||||
/* create the new object */
|
||||
*dest = OBJ_NEW(orte_proc_t);
|
||||
if (NULL == *dest) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* copy data into it */
|
||||
(*dest)->name = src->name;
|
||||
(*dest)->pid = src->pid;
|
||||
(*dest)->local_rank = src->local_rank;
|
||||
(*dest)->state = src->state;
|
||||
(*dest)->app_idx = src->app_idx;
|
||||
if (NULL != src->slot_list) {
|
||||
(*dest)->slot_list = strdup(src->slot_list);
|
||||
}
|
||||
|
||||
if (NULL != src->node) {
|
||||
(*dest)->node = src->node;
|
||||
OBJ_RETAIN(src->node); /* keep the instance count correct */
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
(*dest)->ckpt_state = src->ckpt_state;
|
||||
if (NULL != src->ckpt_snapshot_ref) {
|
||||
(*dest)->ckpt_snapshot_ref = strdup(src->ckpt_snapshot_ref);
|
||||
}
|
||||
if (NULL != src->ckpt_snapshot_loc) {
|
||||
(*dest)->ckpt_snapshot_loc = strdup(src->ckpt_snapshot_loc);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -270,6 +135,9 @@ int orte_dt_copy_proc(orte_proc_t **dest, orte_proc_t *src, opal_data_type_t typ
|
||||
*/
|
||||
int orte_dt_copy_app_context(orte_app_context_t **dest, orte_app_context_t *src, opal_data_type_t type)
|
||||
{
|
||||
(*dest) = src;
|
||||
OBJ_RETAIN(src);
|
||||
#if 0
|
||||
/* create the new object */
|
||||
*dest = OBJ_NEW(orte_app_context_t);
|
||||
if (NULL == *dest) {
|
||||
@ -314,7 +182,7 @@ int orte_dt_copy_app_context(orte_app_context_t **dest, orte_app_context_t *src,
|
||||
if (NULL != src->prefix_dir) {
|
||||
(*dest)->prefix_dir = strdup(src->prefix_dir);
|
||||
}
|
||||
|
||||
#endif
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -177,7 +177,12 @@ int orte_dt_pack_job(opal_buffer_t *buffer, const void *src,
|
||||
}
|
||||
}
|
||||
|
||||
/* ignore the local_spawn flag - it never needs to be sent */
|
||||
/* pack the control flags */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack_buffer(buffer,
|
||||
(void*)(&(jobs[i]->controls)), 1, OPAL_UINT16))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* pack the total slots allocated to the job */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack_buffer(buffer,
|
||||
|
@ -204,9 +204,9 @@ int orte_dt_print_job(char **output, char *prefix, orte_job_t *src, opal_data_ty
|
||||
asprintf(&pfx2, "%s", prefix);
|
||||
}
|
||||
|
||||
asprintf(&tmp, "\n%sData for job: %s\tNum apps: %ld\tLocal spawn: %s\tState: %0x\tAbort: %s", pfx2,
|
||||
asprintf(&tmp, "\n%sData for job: %s\tNum apps: %ld\tControls: %0x\tState: %0x\tAbort: %s", pfx2,
|
||||
ORTE_JOBID_PRINT(src->jobid),
|
||||
(long)src->num_apps, src->local_spawn ? "True" : "False",
|
||||
(long)src->num_apps, src->controls,
|
||||
src->state, src->abort ? "True" : "False");
|
||||
|
||||
asprintf(&pfx, "%s\t", pfx2);
|
||||
|
@ -192,8 +192,14 @@ int orte_dt_unpack_job(opal_buffer_t *buffer, void *dest,
|
||||
}
|
||||
}
|
||||
|
||||
/* ignore the local_spawn flag - it never is sent */
|
||||
|
||||
/* unpack control flags */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack_buffer(buffer,
|
||||
(&(jobs[i]->controls)), &n, OPAL_UINT16))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* unpack the total slots allocated to the job */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack_buffer(buffer,
|
||||
|
@ -159,6 +159,10 @@ typedef struct {
|
||||
} orte_node_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_node_t);
|
||||
|
||||
/* define a set of flags to control the launch of a job */
|
||||
#define ORTE_JOB_CONTROL_LOCAL_SPAWN (uint16_t) 0x01
|
||||
#define ORTE_JOB_CONTROL_NON_ORTE_JOB (uint16_t) 0x02
|
||||
|
||||
typedef struct {
|
||||
/** Base object so this can be put on a list */
|
||||
opal_list_item_t super;
|
||||
@ -168,8 +172,10 @@ typedef struct {
|
||||
opal_pointer_array_t *apps;
|
||||
/* number of app_contexts in the array */
|
||||
orte_std_cntr_t num_apps;
|
||||
/* whether or not this job is locally spawned */
|
||||
bool local_spawn;
|
||||
/* flags to control the launch of this job - see above
|
||||
* for description of supported flags
|
||||
*/
|
||||
uint16_t controls;
|
||||
/* total slots allocated to this job */
|
||||
orte_std_cntr_t total_slots_alloc;
|
||||
/* number of procs in this job */
|
||||
|
@ -116,7 +116,7 @@ static void orte_job_construct(orte_job_t* job)
|
||||
ORTE_GLOBAL_ARRAY_MAX_SIZE,
|
||||
2);
|
||||
job->num_apps = 0;
|
||||
job->local_spawn = false;
|
||||
job->controls = 0;
|
||||
job->total_slots_alloc = 0;
|
||||
job->num_procs = 0;
|
||||
job->procs = OBJ_NEW(opal_pointer_array_t);
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user