Fix tree-spawn to work within the new modex system
This commit was SVN r18349.
This commit is contained in:
parent dcd21d7d07
commit b2c73f6e11
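Summary of the change: instead of packing the old add-and-spawn payload into a local launch_cmd, the rsh PLM on the HNP now builds a global orte_tree_launch_cmd buffer containing the TREE_SPAWN command, the prefix directory, and an encoded nodemap (the modex-era replacement for get_add_procs_data), and plm_base_launch_support.c relays that buffer to each daemon as it reports in. A condensed sketch of that flow, paraphrased from the hunks below with error handling and cleanup omitted (not the literal diff):

    /* HNP side (plm_rsh_module.c): build the reusable tree-launch buffer */
    orte_daemon_cmd_flag_t command = ORTE_DAEMON_TREE_SPAWN;
    opal_byte_object_t bo, *boptr;

    orte_tree_launch_cmd = OBJ_NEW(opal_buffer_t);
    opal_dss.pack(orte_tree_launch_cmd, &command, 1, ORTE_DAEMON_CMD);   /* the command itself */
    opal_dss.pack(orte_tree_launch_cmd, &prefix_dir, 1, OPAL_STRING);    /* prefix for the next wave */
    orte_util_encode_nodemap(&bo);                                       /* nodemap replaces get_add_procs_data */
    boptr = &bo;
    opal_dss.pack(orte_tree_launch_cmd, &boptr, 1, OPAL_BYTE_OBJECT);
    free(bo.bytes);

    /* HNP side (plm_base_launch_support.c): relay it to each daemon as it reports in */
    if (NULL != orte_tree_launch_cmd) {
        orte_rml.send_buffer(&mev->sender, orte_tree_launch_cmd, ORTE_RML_TAG_DAEMON, 0);
    }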
@@ -36,7 +36,7 @@ typedef uint8_t orte_daemon_cmd_flag_t;
#define ORTE_DAEMON_KILL_LOCAL_PROCS (orte_daemon_cmd_flag_t) 2
#define ORTE_DAEMON_SIGNAL_LOCAL_PROCS (orte_daemon_cmd_flag_t) 3
#define ORTE_DAEMON_ADD_LOCAL_PROCS (orte_daemon_cmd_flag_t) 4
#define ORTE_DAEMON_ADD_AND_SPAWN (orte_daemon_cmd_flag_t) 5
#define ORTE_DAEMON_TREE_SPAWN (orte_daemon_cmd_flag_t) 5
#define ORTE_DAEMON_HEARTBEAT_CMD (orte_daemon_cmd_flag_t) 6
#define ORTE_DAEMON_EXIT_CMD (orte_daemon_cmd_flag_t) 7
#define ORTE_DAEMON_PROCESS_AND_RELAY_CMD (orte_daemon_cmd_flag_t) 8
@@ -283,6 +283,9 @@ static void process_orted_launch_report(int fd, short event, void *data)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mev->sender)));

/* update state */
pdatorted[mev->sender.vpid]->state = ORTE_PROC_STATE_RUNNING;

/* unpack its contact info */
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) {
@@ -326,6 +329,11 @@ static void process_orted_launch_report(int fd, short event, void *data)
/* store the arch */
nodes[mev->sender.vpid]->arch = arch;

/* if a tree-launch is underway, send the cmd back */
if (NULL != orte_tree_launch_cmd) {
orte_rml.send_buffer(&mev->sender, orte_tree_launch_cmd, ORTE_RML_TAG_DAEMON, 0);
}

CLEANUP:

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
@@ -412,6 +420,11 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
ORTE_ERROR_LOG(rc);
}

/* if a tree-launch was underway, clear out the cmd */
if (NULL != orte_tree_launch_cmd) {
OBJ_RELEASE(orte_tree_launch_cmd);
}

return ORTE_SUCCESS;
}
@@ -143,16 +143,12 @@ static const char * orte_plm_rsh_shell_name[] = {
static void set_handler_default(int sig);
static orte_plm_rsh_shell_t find_shell(char *shell);
static int find_children(int rank, int parent, int me, int num_procs);
static int daemon_callback(orte_std_cntr_t num_children);

/* local global storage of timing variables */
static struct timeval joblaunchstart, joblaunchstop;

/* local global storage */
static orte_jobid_t active_job=ORTE_JOBID_INVALID;
static orte_job_t *jdatorted;
static orte_proc_t **pdatorted;
static opal_buffer_t *launch_cmd;

/**
* Init the module
@@ -281,107 +277,6 @@ static int orte_plm_rsh_probe(char *nodename,
return rc;
}

static int total_num_daemons_calledback;
static bool total_callback_failed;


static void process_remote_launch_report(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
opal_buffer_t *buffer = mev->buffer;
orte_vpid_t vpid=ORTE_VPID_INVALID;
orte_std_cntr_t cnt, numd, i;
int rc;
uint8_t flag;
char *rml_uri;
orte_process_name_t daemon;

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:report_remote_launch from daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mev->sender)));

/* unpack number of daemons being reported */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &numd, &cnt, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
total_callback_failed = true;
return;
}

/* unpack flag that indicates if any failed */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &cnt, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
total_callback_failed = true;
return;
}

/* did any fail? */
if (0 != flag) {
/* unpack the failed vpid */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &vpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
total_callback_failed = true;
}
if (ORTE_VPID_INVALID != vpid) {
/* note that this daemon failed */
pdatorted[vpid]->state = ORTE_PROC_STATE_FAILED_TO_START;
}
/* report that the daemon has failed so we can exit */
orte_plm_base_launch_failed(active_job, true, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
return;
}

/* get their uri info */
for (i=0; i < numd; i++) {
cnt=1;
opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING);
orte_rml.set_contact_info(rml_uri);
orte_rml_base_parse_uris(rml_uri, &daemon, NULL);
pdatorted[daemon.vpid]->rml_uri = strdup(rml_uri);
orte_routed.update_route(&daemon, &daemon);
}

/* update num recvd */
total_num_daemons_calledback += numd;

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:report_remote_launch reported %d for total of %d daemons reported",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)numd, total_num_daemons_calledback));

}

/*
* Need a callback function to report failure of a remote daemon's launch
*/
static void report_remote_launch(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;

/* don't process this right away - we need to get out of the recv before
* we process the message as it may ask us to do something that involves
* more messaging! Instead, setup an event so that the message gets processed
* as soon as we leave the recv.
*
* The macro makes a copy of the buffer, which we release when processed - the incoming
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_remote_launch_report);

/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_REPORT_REMOTE_LAUNCH,
ORTE_RML_NON_PERSISTENT, report_remote_launch, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
total_callback_failed = true;
}
}


/**
* Callback on daemon exit.
@@ -787,7 +682,6 @@ static int remote_spawn(opal_buffer_t *launch)
int proc_vpid_index;
int local_exec_index;
char **argv = NULL;
char *param, *rml_uri;
char *prefix;
int argc;
int rc;
@@ -795,7 +689,6 @@ static int remote_spawn(opal_buffer_t *launch)
char *lib_base = NULL, *bin_base = NULL;
bool failed_launch = true;
pid_t pid;
int num_children;
orte_std_cntr_t n;

OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
@@ -819,13 +712,6 @@ static int remote_spawn(opal_buffer_t *launch)
nodes = (orte_nid_t**)orte_daemonmap.addr;
vpid=ORTE_PROC_MY_NAME->vpid;

/* rewind the buffer for use by our children */
launch->unpack_ptr = launch->base_ptr;

/* setup the launch cmd */
launch_cmd = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(launch_cmd, launch);

/* clear out any previous child info */
while (NULL != (item = opal_list_remove_first(&mca_plm_rsh_component.children))) {
OBJ_RELEASE(item);
@@ -851,20 +737,11 @@ static int remote_spawn(opal_buffer_t *launch)
goto cleanup;
}

/* tell the child that I am its parent so it reports back to me */
rml_uri = orte_rml.get_contact_info();
asprintf(&param, "\"%s\"", rml_uri);
opal_argv_append(&argc, &argv, "--parent");
opal_argv_append(&argc, &argv, param);
free(param);
free(rml_uri);

/* setup the collection buffer so I can report all the URI's back
* to the HNP when the launch completes
*/
OBJ_CONSTRUCT(&collected_uris, opal_buffer_t);

num_children = 0;
for (item = opal_list_get_first(&mca_plm_rsh_component.children);
item != opal_list_get_end(&mca_plm_rsh_component.children);
item = opal_list_get_next(item)) {
@@ -905,7 +782,6 @@ static int remote_spawn(opal_buffer_t *launch)
lib_base, remote_sh, remote_csh);

} else { /* father */
++num_children;
OPAL_THREAD_LOCK(&mca_plm_rsh_component.lock);
/* This situation can lead to a deadlock if '--debug-daemons' is set.
* However, the deadlock condition is tested at the begining of this
@@ -923,15 +799,7 @@ static int remote_spawn(opal_buffer_t *launch)
orte_wait_cb(pid, orte_plm_rsh_wait_daemon, (void*)&vpid);
}
}

/* wait for daemons to callback */
if (ORTE_SUCCESS != (rc = daemon_callback(num_children))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: daemon launch failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(active_job), ORTE_ERROR_NAME(rc)));
goto cleanup;
}

failed_launch = false;

cleanup:
@@ -990,7 +858,6 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
orte_app_context_t **apps;
orte_node_t **nodes;
orte_std_cntr_t nnode;
orte_std_cntr_t num_children;

if (orte_timing) {
if (0 != gettimeofday(&joblaunchstart, NULL)) {
@@ -1029,13 +896,6 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
apps = (orte_app_context_t**)jdata->apps->addr;
nodes = (orte_node_t**)map->nodes->addr;

/* get the orted job data object */
if (NULL == (jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
pdatorted = (orte_proc_t**)jdatorted->procs->addr;

if (0 == map->num_new_daemons) {
/* have all the daemons we need - launch app */
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
@@ -1096,41 +956,45 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
* and setup the recv to hear of any remote failures
*/
if (mca_plm_rsh_component.tree_spawn) {
orte_daemon_cmd_flag_t command = ORTE_DAEMON_ADD_AND_SPAWN;

launch_cmd= OBJ_NEW(opal_buffer_t);
/* insert the add_and_spawn cmd */
if (ORTE_SUCCESS != (rc = opal_dss.pack(launch_cmd, &command, 1, ORTE_DAEMON_CMD))) {
orte_daemon_cmd_flag_t command = ORTE_DAEMON_TREE_SPAWN;
opal_byte_object_t bo, *boptr;
orte_job_t *jdatorted;

orte_tree_launch_cmd= OBJ_NEW(opal_buffer_t);
/* insert the tree_spawn cmd */
if (ORTE_SUCCESS != (rc = opal_dss.pack(orte_tree_launch_cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(launch_cmd);
OBJ_RELEASE(orte_tree_launch_cmd);
goto cleanup;
}
/* pack the prefix since this will be needed by the next wave */
if (ORTE_SUCCESS != (rc = opal_dss.pack(launch_cmd, &prefix_dir, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(orte_tree_launch_cmd, &prefix_dir, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(launch_cmd);
OBJ_RELEASE(orte_tree_launch_cmd);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(launch_cmd, active_job))) {
/* construct a nodemap */
if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&bo))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(launch_cmd);
OBJ_RELEASE(orte_tree_launch_cmd);
goto cleanup;
}
/* store it */
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(orte_tree_launch_cmd, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(orte_tree_launch_cmd);
free(bo.bytes);
goto cleanup;
}
/* release the data since it has now been copied into our buffer */
free(bo.bytes);
/* get the orted job data object */
if (NULL == (jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
find_children(0, 0, 0, jdatorted->num_procs);
total_num_daemons_calledback = 1; /* need to account for myself */
total_callback_failed = false;
num_children = opal_list_get_size(&mca_plm_rsh_component.children);
/* we don't really need the collection buffer, but it needs to
* be setup to avoid problems in the callback
*/
OBJ_CONSTRUCT(&collected_uris, opal_buffer_t);
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_REPORT_REMOTE_LAUNCH,
ORTE_RML_NON_PERSISTENT, report_remote_launch, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(launch_cmd);
goto cleanup;
}
}

/*
@@ -1252,24 +1116,12 @@ next_node:
}

/* wait for daemons to callback */
if (mca_plm_rsh_component.tree_spawn) {
if (ORTE_SUCCESS != (rc = daemon_callback(num_children))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: daemon launch failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(active_job), ORTE_ERROR_NAME(rc)));
goto cleanup;
}
failed_launch = false;
goto cleanup; /* apps are launched via callback */
} else {
if (ORTE_SUCCESS != (rc = orte_plm_base_daemon_callback(map->num_new_daemons))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: daemon launch failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(active_job), ORTE_ERROR_NAME(rc)));
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_plm_base_daemon_callback(map->num_new_daemons))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: daemon launch failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(active_job), ORTE_ERROR_NAME(rc)));
goto cleanup;
}

launch_apps:
@@ -1352,213 +1204,6 @@ static int find_children(int rank, int parent, int me, int num_procs)
return -1;
}

static orte_std_cntr_t num_callback;
static bool failed_launch;

static void process_launch_report(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
opal_buffer_t *buffer = mev->buffer;
char *rml_uri;
int rc, idx;

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:report_launch from daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mev->sender)));

/* add this data to the collection buffer */
opal_dss.copy_payload(&collected_uris, buffer);

/* unpack its contact info */
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
failed_launch = true;
goto CLEANUP;
}

/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
failed_launch = true;
goto CLEANUP;
}
/* if I'm the HNP, lookup and record this daemon's contact info */
if (orte_process_info.hnp) {
/* this counts towards my total callback count */
++total_num_daemons_calledback;
pdatorted[mev->sender.vpid]->rml_uri = strdup(rml_uri);
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:report_launch now at %d total called back",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
total_num_daemons_calledback));

}
free(rml_uri);

/* set the route to be direct */
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&mev->sender, &mev->sender))) {
ORTE_ERROR_LOG(rc);
failed_launch = true;
goto CLEANUP;
}

/* send out the add-and-spawn command */
if (0 > (rc = orte_rml.send_buffer(&mev->sender, launch_cmd, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
failed_launch = true;
goto CLEANUP;

}

CLEANUP:

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:report_launch %s for daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
failed_launch ? "failed" : "completed",
ORTE_NAME_PRINT(&mev->sender)));

if (failed_launch && orte_process_info.hnp) {
orte_errmgr.incomplete_start(ORTE_PROC_MY_NAME->jobid, jdatorted->aborted_proc->exit_code);
} else {
num_callback++;
}

}

static void report_launch(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;

ORTE_MESSAGE_EVENT(sender, buffer, tag, process_launch_report);
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK,
ORTE_RML_NON_PERSISTENT, report_launch, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
failed_launch = true;
}
}

static int daemon_callback(orte_std_cntr_t num_children)
{
int rc;
opal_buffer_t wireup, buf;
orte_rml_cmd_flag_t cmd = ORTE_RML_TAG_RML_INFO_UPDATE;
uint8_t flag;

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:daemon_callback",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

/* if I have no children, just return ok */
if (0 == num_children) {
return ORTE_SUCCESS;
}

num_callback = 0;
failed_launch = false;

rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK,
ORTE_RML_NON_PERSISTENT, report_launch, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}

ORTE_PROGRESSED_WAIT(failed_launch, num_callback, num_children);

/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK))) {
ORTE_ERROR_LOG(rc);
return rc;
}

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:daemon_callback completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

/* all done launching my children - if I am NOT the HNP, then
* send my number of children and their URI's to the HNP so it can know
* when everyone is done and how to talk to them directly, if necessary
*/
if (!orte_process_info.hnp) {
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &num_children, 1, ORTE_STD_CNTR);
flag=0;
opal_dss.pack(&buf, &flag, 1, OPAL_UINT8);
/* copy over the collection buffer data */
opal_dss.copy_payload(&buf, &collected_uris);
orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buf, ORTE_RML_TAG_REPORT_REMOTE_LAUNCH, 0);
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}


/*
* If I am the HNP, then I need to update the #procs in my process_info
*/
orte_process_info.num_procs = jdatorted->num_procs;
/* update the grpcomm xcast tree(s) */
if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) {
ORTE_ERROR_LOG(rc);
return rc;
}

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:daemon_callback trees updated\n\twaiting for all %d daemons to launch, currently at %d, %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)orte_process_info.num_procs, total_num_daemons_calledback,
total_callback_failed ? "Callback has failed" : "Callback continues"));

/* wait for all daemons to complete launching - setup a recv
* so that each daemon can tell me how many they launched.
* When that number == total num daemons to be launched, then
* we are done
*/
ORTE_PROGRESSED_WAIT(total_callback_failed, total_num_daemons_calledback, (int)orte_process_info.num_procs);

OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:daemon_callback - all daemons have launched!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

/* get our wireup info so we can tell the entire daemon tree how
* to communicate with each other
*/
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, &buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* if anything was inserted, send it out */
if (0 < buf.bytes_used) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:ssh:daemon_callback updating contact info",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

/* setup a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
/* pack the update_rml_info command */
opal_dss.pack(&wireup, &cmd, 1, ORTE_RML_CMD);
/* copy the data */
opal_dss.copy_payload(&wireup, &buf);
/* xcast it */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &wireup, ORTE_RML_TAG_RML_INFO_UPDATE);
OBJ_DESTRUCT(&wireup);
}
OBJ_DESTRUCT(&buf);

/* now we are good to go! */
return ORTE_SUCCESS;
}


/**
* Terminate all processes for a given job
@@ -301,11 +301,10 @@ static int process_commands(orte_process_name_t* sender,
int32_t signal;
orte_jobid_t job;
orte_rml_tag_t target_tag;
char *contact_info, *prefix;
char *contact_info;
opal_buffer_t *answer;
orte_rml_cmd_flag_t rml_cmd;
orte_job_t *jdata;
char *save_buf;

/* unpack the command */
n = 1;
@@ -371,14 +370,12 @@ static int process_commands(orte_process_name_t* sender,
}
break;

/**** ADD_AND_SPAWN ****/
case ORTE_DAEMON_ADD_AND_SPAWN:
/**** TREE_SPAWN ****/
case ORTE_DAEMON_TREE_SPAWN:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received add_and_spawn",
opal_output(0, "%s orted_cmd: received tree_spawn",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* save our current buffer location */
save_buf = buffer->unpack_ptr;
/* if the PLM supports remote spawn, pass it all along */
if (NULL != orte_plm.remote_spawn) {
if (ORTE_SUCCESS != (ret = orte_plm.remote_spawn(buffer))) {
@@ -387,20 +384,6 @@ static int process_commands(orte_process_name_t* sender,
} else {
opal_output(0, "%s remote spawn is NULL!", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* rewind the buffer so we can reuse it */
buffer->unpack_ptr = save_buf;
/* unpack the prefix and throw it away - we don't need it here */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &prefix, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* launch the local processes */
if (ORTE_SUCCESS != (ret = orte_odls.launch_local_procs(buffer))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orted:comm:add_procs failed to launch on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_ERROR_NAME(ret)));
}
break;

/**** DELIVER A MESSAGE TO THE LOCAL PROCS ****/
@@ -107,7 +107,6 @@ static struct {
char* num_procs;
int uri_pipe;
int singleton_died_pipe;
char* parent;
} orted_globals;

/*
@@ -167,10 +166,6 @@ opal_cmd_line_init_t orte_cmd_line_opts[] = {
&orted_globals.singleton_died_pipe, OPAL_CMD_LINE_TYPE_INT,
"Watch on indicated pipe for singleton termination"},

{ NULL, NULL, NULL, '\0', NULL, "parent", 1,
&orted_globals.parent, OPAL_CMD_LINE_TYPE_STRING,
"Parent vpid for tree-based spawns"},

/* End of list */
{ NULL, NULL, NULL, '\0', NULL, NULL, 0,
NULL, OPAL_CMD_LINE_TYPE_NULL, NULL }
@@ -524,7 +519,6 @@ int orte_daemon(int argc, char *argv[])
* We need to do this at the last possible second as the HNP
* can turn right around and begin issuing orders to us
*/
orte_process_name_t parent;

buffer = OBJ_NEW(opal_buffer_t);
/* if we are using static ports, there is no need to send our
@@ -549,19 +543,7 @@ int orte_daemon(int argc, char *argv[])
OBJ_RELEASE(buffer);
return ret;
}
/* if no parent was specified, send to my hnp */
if (NULL == orted_globals.parent) {
parent.vpid = ORTE_PROC_MY_HNP->vpid;
parent.jobid = ORTE_PROC_MY_HNP->jobid;
} else {
/* set the parent's contact info into our hash tables */
orte_rml.set_contact_info(orted_globals.parent);
/* extract the parent's name */
orte_rml_base_parse_uris(orted_globals.parent, &parent, NULL);
/* set the route to be direct */
orte_routed.update_route(&parent, &parent);
}
if (0 > (ret = orte_rml.send_buffer(&parent, buffer,
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer,
ORTE_RML_TAG_ORTED_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
@@ -65,6 +65,8 @@ int orte_timeout_usec_per_proc;
float orte_max_timeout;
char *orte_default_hostfile;

opal_buffer_t *orte_tree_launch_cmd = NULL;

orte_process_name_t orte_globals_name_wildcard = {ORTE_JOBID_WILDCARD, ORTE_VPID_WILDCARD};
orte_process_name_t orte_globals_name_invalid = {ORTE_JOBID_INVALID, ORTE_VPID_INVALID};
@@ -333,6 +333,8 @@ ORTE_DECLSPEC extern float orte_max_timeout;

ORTE_DECLSPEC extern char *orte_default_hostfile;

ORTE_DECLSPEC extern opal_buffer_t *orte_tree_launch_cmd;

/* global arrays for data storage */
ORTE_DECLSPEC extern opal_pointer_array_t *orte_job_data;
ORTE_DECLSPEC extern opal_pointer_array_t *orte_node_pool;
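On the receiving side, the orted's command processor gains a TREE_SPAWN case. Reading the orted_comm.c hunks above (this diff view strips the add/remove markers, so the following is a best-effort reconstruction rather than the literal post-commit code), the daemon now simply hands the buffer to the PLM's remote_spawn entry point, which unpacks the prefix and nodemap and launches the next wave of daemons; local processes are no longer launched from this buffer:

    /* daemon side (orted_comm.c), sketched: relay the tree-spawn buffer onward */
    case ORTE_DAEMON_TREE_SPAWN:
        if (orte_debug_daemons_flag) {
            opal_output(0, "%s orted_cmd: received tree_spawn",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        }
        /* if the PLM supports remote spawn, pass it all along */
        if (NULL != orte_plm.remote_spawn) {
            if (ORTE_SUCCESS != (ret = orte_plm.remote_spawn(buffer))) {
                ORTE_ERROR_LOG(ret);
            }
        } else {
            opal_output(0, "%s remote spawn is NULL!", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        }
        break;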