1
1

Continue refinement of the DVM operations. Send the spawn request to the right place (it helps) as it isn't a comm_spawn request and has to be treated a little differently. Ensure IO gets forwarded back to the tool. Ensure the tool outputs show_help locally as there is no place to send it.

Этот коммит содержится в:
Ralph Castain 2015-02-04 06:20:11 -08:00
родитель a3728f09af
Коммит 2b0b012460
8 изменённых файлов: 123 добавлений и 37 удалений

Просмотреть файл

@ -105,13 +105,7 @@ static void proc_errors(int fd, short args, void *cbdata)
return;
}
if (ORTE_PROC_STATE_UNABLE_TO_SEND_MSG == caddy->proc_state) {
/* do nothing - the util/comm library knows how to handle this */
OBJ_RELEASE(caddy);
return;
}
/* all other errors require abort */
/* all errors require abort */
orte_errmgr_base_abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
OBJ_RELEASE(caddy);

Просмотреть файл

@ -147,6 +147,6 @@ static void rte_abort(int status, bool report)
orte_proc_info_finalize();
/* Now just exit */
exit(0);
exit(status);
}

Просмотреть файл

@ -63,7 +63,11 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
opal_list_item_t *item, *next;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s received IOF from proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the stream first as this may be flow control info */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &count, ORTE_IOF_TAG))) {
@ -97,6 +101,12 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
goto CLEAN_RETURN;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s received IOF cmd from sender %s for source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor),
ORTE_NAME_PRINT(&origin)));
/* check to see if a tool has requested something */
if (ORTE_IOF_PULL & stream) {
/* get name of the process wishing to be the sink */
@ -210,6 +220,6 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
}
}
CLEAN_RETURN:
CLEAN_RETURN:
return;
}

Просмотреть файл

@ -243,20 +243,22 @@ void orte_plm_base_setup_job(int fd, short args, void *cbdata)
caddy->jdata->state = caddy->job_state;
/* start by getting a jobid */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(caddy->jdata))) {
ORTE_ERROR_LOG(rc);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
if (ORTE_JOBID_INVALID == caddy->jdata->jobid) {
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(caddy->jdata))) {
ORTE_ERROR_LOG(rc);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
/* store it on the global job data pool - this is the key
* step required before we launch the daemons. It allows
* the orte_rmaps_base_setup_virtual_machine routine to
* search all apps for any hosts to be used by the vm
*/
opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(caddy->jdata->jobid), caddy->jdata);
}
/* store it on the global job data pool - this is the key
* step required before we launch the daemons. It allows
* the orte_rmaps_base_setup_virtual_machine routine to
* search all apps for any hosts to be used by the vm
*/
opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(caddy->jdata->jobid), caddy->jdata);
/* if job recovery is not enabled, set it to default */
if (!ORTE_FLAG_TEST(caddy->jdata, ORTE_JOB_FLAG_RECOVERABLE) &&
orte_enable_recovery) {

Просмотреть файл

@ -151,6 +151,9 @@ BEGIN_C_DECLS
/* notifier support */
#define ORTE_RML_TAG_NOTIFIER_HNP 52
/* confirm spawn by tool */
#define ORTE_RML_TAG_CONFIRM_SPAWN 53
#define ORTE_RML_TAG_MAX 100

Просмотреть файл

@ -60,6 +60,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/iof/iof_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls.h"
@ -505,8 +506,49 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(ret);
goto ANSWER_LAUNCH;
}
/* launch it */
/* point the originator to the sender */
jdata->originator = *sender;
/* assign a jobid to it */
if (ORTE_SUCCESS != (ret = orte_plm_base_create_jobid(jdata))) {
ORTE_ERROR_LOG(ret);
goto ANSWER_LAUNCH;
}
/* store it on the global job data pool */
opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(jdata->jobid), jdata);
/* before we launch it, tell the IOF to forward all output to the requestor */
/* setup the tag to pull from HNP */
{
orte_iof_tag_t ioftag;
opal_buffer_t *iofbuf;
orte_process_name_t source;
ioftag = ORTE_IOF_STDOUTALL | ORTE_IOF_PULL;
iofbuf = OBJ_NEW(opal_buffer_t);
/* pack the tag */
if (ORTE_SUCCESS != (ret = opal_dss.pack(iofbuf, &ioftag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(iofbuf);
goto ANSWER_LAUNCH;
}
/* pack the name of the source */
source.jobid = jdata->jobid;
source.vpid = ORTE_VPID_WILDCARD;
if (ORTE_SUCCESS != (ret = opal_dss.pack(iofbuf, &source, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(iofbuf);
goto ANSWER_LAUNCH;
}
/* pack the sender as the sink */
if (ORTE_SUCCESS != (ret = opal_dss.pack(iofbuf, sender, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(iofbuf);
goto ANSWER_LAUNCH;
}
/* send the buffer to our IOF */
orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, iofbuf, ORTE_RML_TAG_IOF_HNP,
orte_rml_send_callback, NULL);
}
/* now launch the job - this will just push it into our state machine */
if (ORTE_SUCCESS != (ret = orte_plm.spawn(jdata))) {
ORTE_ERROR_LOG(ret);
goto ANSWER_LAUNCH;
@ -521,7 +563,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
goto CLEANUP;
}
/* return response */
if (0 > (ret = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_TOOL,
if (0 > (ret = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_CONFIRM_SPAWN,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);

Просмотреть файл

@ -97,19 +97,14 @@
#include "orte/runtime/orte_quit.h"
#include "orte/util/show_help.h"
/* local functions */
static void orte_timeout_wakeup(int sd, short args, void *cbdata);
static void local_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
/*
* Globals
*/
static char **global_mca_env = NULL;
static orte_std_cntr_t total_num_apps = 0;
static bool want_prefix_by_default = (bool) ORTE_WANT_ORTERUN_PREFIX_BY_DEFAULT;
static volatile bool mywait = true;
volatile bool mywait = true;
volatile bool myspawn = true;
/*
* Globals
@ -330,6 +325,13 @@ static int parse_globals(int argc, char* argv[], opal_cmd_line_t *cmd_line);
static int parse_locals(orte_job_t *jdata, int argc, char* argv[]);
static void set_classpath_jar_file(orte_app_context_t *app, int index, char *jarfile);
static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
static void orte_timeout_wakeup(int sd, short args, void *cbdata);
static void local_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
static void spawn_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
int main(int argc, char *argv[])
@ -339,6 +341,8 @@ int main(int argc, char *argv[])
char *param;
orte_job_t *jdata=NULL;
char *hnpenv;
opal_buffer_t *req;
orte_daemon_cmd_flag_t cmd = ORTE_DAEMON_SPAWN_JOB_CMD;
/* Setup and parse the command line */
memset(&myglobals, 0, sizeof(myglobals));
@ -561,13 +565,29 @@ int main(int argc, char *argv[])
}
/* ask the HNP to spawn the job for us */
rc = orte_plm.spawn(jdata);
// post recv on tag_confirm_spawn, pass jdata as cbdata
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CONFIRM_SPAWN,
ORTE_RML_PERSISTENT, spawn_recv, jdata);
// pack the ORTE_DAEMON_SPAWN_JOB_CMD command and job object and send to HNP at tag ORTE_RML_TAG_DAEMON
req = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(req, &cmd, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
exit(rc);
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(req, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
exit(rc);
}
orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, req, ORTE_RML_TAG_DAEMON, orte_rml_send_callback, NULL);
// wait for response and unpack the status, jobid
ORTE_WAIT_FOR_COMPLETION(myspawn);
opal_output(0, "Job %s has launched", ORTE_JOBID_PRINT(jdata->jobid));
waiting:
ORTE_WAIT_FOR_COMPLETION(mywait);
DONE:
opal_output(0, "FINALIZING");
/* cleanup and leave */
orte_finalize();
@ -1465,3 +1485,17 @@ static void local_recv(int status, orte_process_name_t* sender,
exit(orte_exit_status);
}
static void spawn_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
orte_job_t *jdata = (orte_job_t*)cbdata;
int32_t cnt;
// extract the returned jobid
cnt = 1;
opal_dss.unpack(buffer, &jdata->jobid, &cnt, ORTE_JOBID);
// release the wait
myspawn = false;
}

Просмотреть файл

@ -627,7 +627,8 @@ int orte_show_help_norender(const char *filename, const char *topic,
* or we weren't given an HNP, or we are running in standalone
* mode, then all we can do is process this locally
*/
if (ORTE_PROC_IS_HNP || orte_standalone_operation ||
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_TOOL ||
orte_standalone_operation ||
NULL == orte_rml.send_buffer_nb ||
NULL == orte_routed.get_route ||
NULL == orte_process_info.my_hnp_uri) {