From 5352c31914bdb3d61f30b0d0699570a0152774cb Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 6 Oct 2017 10:04:30 -0700 Subject: [PATCH] Enable remote tool connections for the DVM. Fix notifications so we "de-bounce" termination calls Signed-off-by: Ralph Castain --- .../pmix/pmix2x/pmix/src/client/pmix_client.c | 7 + .../pmix/src/event/pmix_event_notification.c | 27 ++- .../pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c | 8 +- .../pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.h | 1 + .../pmix/src/mca/ptl/tcp/ptl_tcp_component.c | 36 +++- .../mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c | 19 ++ opal/mca/pmix/pmix2x/pmix2x.c | 5 +- opal/mca/pmix/pmix2x/pmix2x_server_south.c | 7 +- opal/mca/pmix/pmix_types.h | 1 + orte/mca/state/base/state_base_fns.c | 18 +- orte/mca/state/dvm/state_dvm.c | 120 +++++++++++++ orte/mca/state/hnp/state_hnp.c | 164 ++++++++++++++++++ orte/tools/orte-dvm/orte-dvm.c | 120 ++----------- orte/tools/prun/prun.c | 20 ++- 14 files changed, 411 insertions(+), 142 deletions(-) diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c index 6626b4dea4..8c09f5a2d1 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c @@ -171,6 +171,13 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer, PMIX_RELEASE(chain); goto error; } + /* check for non-default flag */ + for (cnt=0; cnt < ninfo; cnt++) { + if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { + chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]); + break; + } + } } /* now put the callback object tag in the last element */ PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 704ad732b2..b6f0458a3d 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -797,6 +797,8 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_buffer_t *bfr; pmix_cmd_t cmd = PMIX_NOTIFY_CMD; pmix_status_t rc; + pmix_list_t trk; + pmix_namelist_t *nm; /* need to acquire the object from its originating thread */ PMIX_ACQUIRE_OBJECT(cd); @@ -835,6 +837,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) holdcd = false; if (PMIX_RANGE_PROC_LOCAL != cd->range) { + PMIX_CONSTRUCT(&trk, pmix_list_t); /* cycle across our registered events and send the message to * any client who registered for it */ PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) { @@ -848,6 +851,17 @@ static void _notify_client_event(int sd, short args, void *cbdata) cd->source.rank == pr->peer->info->pname.rank) { continue; } + /* if we have already notified this client, then don't do it again */ + matched = false; + PMIX_LIST_FOREACH(nm, &trk, pmix_namelist_t) { + if (nm->pname == &pr->peer->info->pname) { + matched = true; + break; + } + } + if (matched) { + continue; + } /* if we were given specific targets, check if this is one */ if (NULL != cd->targets) { matched = false; @@ -867,8 +881,15 @@ static void _notify_client_event(int sd, short args, void *cbdata) } } pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server: notifying client %s:%u", - pr->peer->info->pname.nspace, pr->peer->info->pname.rank); + "pmix_server: notifying client %s:%u on status %s", + 
pr->peer->info->pname.nspace, pr->peer->info->pname.rank, + PMIx_Error_string(cd->status)); + + /* record that we notified this client */ + nm = PMIX_NEW(pmix_namelist_t); + nm->pname = &pr->peer->info->pname; + pmix_list_append(&trk, &nm->super); + bfr = PMIX_NEW(pmix_buffer_t); if (NULL == bfr) { continue; @@ -896,7 +917,6 @@ static void _notify_client_event(int sd, short args, void *cbdata) PMIX_RELEASE(bfr); continue; } - /* pack any info */ PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->ninfo, 1, PMIX_SIZE); if (PMIX_SUCCESS != rc) { @@ -917,6 +937,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) } } } + PMIX_LIST_DESTRUCT(&trk); if (PMIX_RANGE_LOCAL != cd->range && 0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) && cd->source.rank == pmix_globals.myid.rank) { diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c index 034f06bb45..a1875708ad 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c @@ -235,12 +235,12 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, if (NULL != mca_ptl_tcp_component.super.uri) { /* if the string starts with "file:", then they are pointing * us to a file we need to read to get the URI itself */ - if (0 != strncmp(mca_ptl_tcp_component.super.uri, "file:", 5)) { + if (0 == strncmp(mca_ptl_tcp_component.super.uri, "file:", 5)) { pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, "ptl:tcp:tool getting connection info from %s", mca_ptl_tcp_component.super.uri); nspace = NULL; - rc = parse_uri_file(&mca_ptl_tcp_component.super.uri[6], &suri, &nspace, &rank); + rc = parse_uri_file(&mca_ptl_tcp_component.super.uri[5], &suri, &nspace, &rank); if (PMIX_SUCCESS != rc) { return PMIX_ERR_UNREACH; } @@ -534,8 +534,8 @@ static pmix_status_t parse_uri_file(char *filename, } *p2 = '\0'; ++p2; - /* set the server nspace */ - *nspace = strdup(p); + /* set the server nspace/rank */ + *nspace = strdup(srvr); *rank = strtoull(p2, NULL, 10); /* now parse the uri itself */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.h b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.h index c05be98bd9..6f7db22520 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.h +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.h @@ -50,6 +50,7 @@ typedef struct { int wait_to_connect; int max_retries; char *report_uri; + bool remote_connections; } pmix_ptl_tcp_component_t; extern pmix_ptl_tcp_component_t mca_ptl_tcp_component; diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp_component.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp_component.c index e997a1cac5..572ac0fcd8 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp_component.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp_component.c @@ -115,7 +115,8 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo, .system_filename = NULL, .wait_to_connect = 4, .max_retries = 2, - .report_uri = NULL + .report_uri = NULL, + .remote_connections = false }; static char **split_and_resolve(char **orig_str, char *name); @@ -142,6 +143,13 @@ static int component_register(void) PMIX_MCA_BASE_VAR_SCOPE_LOCAL, &mca_ptl_tcp_component.report_uri); + (void)pmix_mca_base_component_var_register(component, "remote_connections", + "Enable connections from remote tools", + PMIX_MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, + PMIX_INFO_LVL_2, + PMIX_MCA_BASE_VAR_SCOPE_LOCAL, + 
&mca_ptl_tcp_component.remote_connections); + (void)pmix_mca_base_component_var_register(component, "if_include", "Comma-delimited list of devices and/or CIDR notation of TCP networks (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with ptl_tcp_if_exclude.", PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, @@ -212,6 +220,8 @@ static int component_register(void) return PMIX_SUCCESS; } +static char *urifile = NULL; + static pmix_status_t component_open(void) { char *tdir; @@ -241,6 +251,11 @@ static pmix_status_t component_open(void) if (NULL == mca_ptl_tcp_component.system_tmpdir) { mca_ptl_tcp_component.system_tmpdir = strdup(tdir); } + if (NULL != mca_ptl_tcp_component.report_uri && + 0 != strcmp(mca_ptl_tcp_component.report_uri, "-") && + 0 != strcmp(mca_ptl_tcp_component.report_uri, "+")) { + urifile = strdup(mca_ptl_tcp_component.report_uri); + } return PMIX_SUCCESS; } @@ -253,6 +268,12 @@ pmix_status_t component_close(void) if (NULL != mca_ptl_tcp_component.session_filename) { unlink(mca_ptl_tcp_component.session_filename); } + if (NULL != urifile) { + /* remove the file */ + unlink(urifile); + free(urifile); + urifile = NULL; + } return PMIX_SUCCESS; } @@ -283,7 +304,6 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo, struct sockaddr_storage my_ss; int kindex; size_t n; - bool remote_connections = false; bool session_tool = false; bool system_tool = false; pmix_socklen_t addrlen; @@ -317,11 +337,11 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo, } else if (0 == strcmp(info[n].key, PMIX_TCP_IPV6_PORT)) { mca_ptl_tcp_component.ipv6_port = info[n].value.data.integer; } else if (0 == strcmp(info[n].key, PMIX_TCP_DISABLE_IPV4)) { - mca_ptl_tcp_component.disable_ipv4_family = PMIX_INFO_TRUE(&info[n]); + mca_ptl_tcp_component.disable_ipv4_family = PMIX_INFO_TRUE(&info[n]); } else if (0 == strcmp(info[n].key, PMIX_TCP_DISABLE_IPV6)) { - mca_ptl_tcp_component.disable_ipv6_family = PMIX_INFO_TRUE(&info[n]); + mca_ptl_tcp_component.disable_ipv6_family = PMIX_INFO_TRUE(&info[n]); } else if (0 == strcmp(info[n].key, PMIX_SERVER_REMOTE_CONNECTIONS)) { - remote_connections = PMIX_INFO_TRUE(&info[n]); + mca_ptl_tcp_component.remote_connections = PMIX_INFO_TRUE(&info[n]); } else if (0 == strcmp(info[n].key, PMIX_TCP_URI)) { if (NULL != mca_ptl_tcp_component.super.uri) { free(mca_ptl_tcp_component.super.uri); @@ -343,9 +363,9 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo, } mca_ptl_tcp_component.system_tmpdir = strdup(info[n].value.data.string); } else if (0 == strcmp(info[n].key, PMIX_SERVER_TOOL_SUPPORT)) { - session_tool = PMIX_INFO_TRUE(&info[n]); + session_tool = PMIX_INFO_TRUE(&info[n]); } else if (0 == strcmp(info[n].key, PMIX_SERVER_SYSTEM_SUPPORT)) { - system_tool = PMIX_INFO_TRUE(&info[n]); + system_tool = PMIX_INFO_TRUE(&info[n]); } } } @@ -434,7 +454,7 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo, /* if this is the loopback device and they didn't enable * remote connections, then we are done */ if (pmix_ifisloopback(i)) { - if (remote_connections) { + if (mca_ptl_tcp_component.remote_connections) { /* ignore loopback */ continue; } else { diff --git a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c index 1be259f049..24196cec36 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c +++ b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c @@ -110,6 +110,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, buf, &cmd, &cnt, 
PMIX_COMMAND); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); + PMIX_RELEASE(chain); goto error; } /* unpack the status */ @@ -118,6 +119,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, buf, &chain->status, &cnt, PMIX_STATUS); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); + PMIX_RELEASE(chain); goto error; } @@ -127,6 +129,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, buf, &chain->source, &cnt, PMIX_PROC); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); + PMIX_RELEASE(chain); goto error; } @@ -136,19 +139,35 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, buf, &ninfo, &cnt, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); + PMIX_RELEASE(chain); goto error; } + /* we always leave space for a callback object */ chain->ninfo = ninfo + 1; PMIX_INFO_CREATE(chain->info, chain->ninfo); + if (NULL == chain->info) { + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + PMIX_RELEASE(chain); + return; + } + if (0 < ninfo) { cnt = ninfo; PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver, buf, chain->info, &cnt, PMIX_INFO); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); + PMIX_RELEASE(chain); goto error; } + /* check for non-default flag */ + for (cnt=0; cnt < ninfo; cnt++) { + if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { + chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]); + break; + } + } } /* now put the callback object tag in the last element */ PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); diff --git a/opal/mca/pmix/pmix2x/pmix2x.c b/opal/mca/pmix/pmix2x/pmix2x.c index 34bc3d7d0e..ac5866aafa 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.c +++ b/opal/mca/pmix/pmix2x/pmix2x.c @@ -244,8 +244,9 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id, opal_pmix2x_event_t *event; opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s RECEIVED NOTIFICATION OF STATUS %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), status); + "%s RECEIVED NOTIFICATION OF STATUS %d ON HDLR %lu", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), status, + (unsigned long)evhdlr_registration_id); OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock); diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_south.c b/opal/mca/pmix/pmix2x/pmix2x_server_south.c index db76b13dee..7917d60180 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_south.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_south.c @@ -525,7 +525,12 @@ int pmix2x_server_notify_event(int status, n = 0; OPAL_LIST_FOREACH(kv, info, opal_value_t) { (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&pinfo[n].value, kv); + if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) { + pinfo[n].value.type = PMIX_STATUS; + pinfo[n].value.data.status = pmix2x_convert_opalrc(kv->data.integer); + } else { + pmix2x_value_load(&pinfo[n].value, kv); + } ++n; } } else { diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 4b2ebfc478..a5e8384f6b 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -50,6 +50,7 @@ BEGIN_C_DECLS #define OPAL_PMIX_SERVER_TOOL_SUPPORT "pmix.srvr.tool" // (bool) The host RM wants to declare itself as willing to // accept tool connection requests +#define OPAL_PMIX_SERVER_REMOTE_CONNECTIONS "pmix.srvr.remote" // (bool) Allow connections from remote tools (do not use loopback device) #define OPAL_PMIX_SERVER_SYSTEM_SUPPORT "pmix.srvr.sys" // (bool) The host RM wants to declare itself as being the local // system server for PMIx connection requests #define OPAL_PMIX_SERVER_TMPDIR "pmix.srvr.tmpdir" // 
(char*) temp directory where PMIx server will place diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 7f7a73697b..5f8bc3a2c4 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -651,7 +651,7 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata) orte_proc_t *pdata; int i; char *rtmod; - orte_process_name_t parent, target, *npptr; + orte_process_name_t parent, target; ORTE_ACQUIRE_OBJECT(caddy); proc = &caddy->name; @@ -769,22 +769,6 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata) orte_state_base_notify_data_server(&target); } ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - /* if they requested notification upon completion, provide it */ - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) { - /* notify_completion => notify the parent of the termination - * of this child job. So get the parent jobid info */ - npptr = &parent; - if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) { - /* notify everyone who asked for it */ - target.jobid = jdata->jobid; - target.vpid = ORTE_VPID_WILDCARD; - _send_notification(OPAL_ERR_JOB_TERMINATED, pdata->state, &target, ORTE_NAME_WILDCARD); - } else { - target.jobid = jdata->jobid; - target.vpid = ORTE_VPID_WILDCARD; - _send_notification(OPAL_ERR_JOB_TERMINATED, pdata->state, &target, &parent); - } - } } else if (ORTE_PROC_STATE_TERMINATED < pdata->state && !orte_job_term_ordered) { /* if this was an abnormal term, notify the other procs of the termination */ diff --git a/orte/mca/state/dvm/state_dvm.c b/orte/mca/state/dvm/state_dvm.c index 3462df57bd..426ffc813b 100644 --- a/orte/mca/state/dvm/state_dvm.c +++ b/orte/mca/state/dvm/state_dvm.c @@ -70,6 +70,8 @@ orte_state_base_module_t orte_state_dvm_module = { orte_state_base_remove_proc_state }; +static void dvm_notify(int sd, short args, void *cbdata); + /* defined default state machine sequence - individual * plm's must add a state for launching daemons */ @@ -91,6 +93,7 @@ static orte_job_state_t launch_states[] = { /* termination states */ ORTE_JOB_STATE_TERMINATED, ORTE_JOB_STATE_NOTIFY_COMPLETED, + ORTE_JOB_STATE_NOTIFIED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE }; static orte_state_cbfunc_t launch_callbacks[] = { @@ -109,6 +112,7 @@ static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_post_launch, orte_plm_base_registered, check_complete, + dvm_notify, cleanup_job, orte_quit }; @@ -518,3 +522,119 @@ static void cleanup_job(int sd, short args, void *cbdata) OBJ_RELEASE(caddy); } + +typedef struct { + opal_list_t *info; + orte_job_t *jdata; +} mycaddy_t; + +static void notify_complete(int status, void *cbdata) +{ + mycaddy_t *mycaddy = (mycaddy_t*)cbdata; + + OPAL_LIST_RELEASE(mycaddy->info); + ORTE_ACTIVATE_JOB_STATE(mycaddy->jdata, ORTE_JOB_STATE_NOTIFIED); + OBJ_RELEASE(mycaddy->jdata); + free(mycaddy); +} + +static void dvm_notify(int sd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + orte_proc_t *pptr=NULL; + int ret; + opal_buffer_t *reply; + orte_daemon_cmd_flag_t command; + orte_grpcomm_signature_t *sig; + bool notify = true; + opal_list_t *info; + opal_value_t *val; + opal_process_name_t pname, *proc; + mycaddy_t *mycaddy; + + /* see if there was any problem */ + if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) { + ret = 
pptr->exit_code; + /* or whether we got cancelled by the user */ + } else if (orte_get_attribute(&jdata->attributes, ORTE_JOB_CANCELLED, NULL, OPAL_BOOL)) { + ret = ORTE_ERR_JOB_CANCELLED; + } else { + ret = ORTE_SUCCESS; + } + + if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) { + notify = false; + } + /* if the jobid matches that of the requestor, then don't notify */ + proc = &pname; + if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&proc, OPAL_NAME)) { + if (pname.jobid == jdata->jobid) { + notify = false; + } + } + + if (notify) { + /* the source is the job that terminated */ + pname.jobid = jdata->jobid; + pname.vpid = OPAL_VPID_WILDCARD; + + info = OBJ_NEW(opal_list_t); + /* ensure this only goes to the job terminated event handler */ + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); + val->type = OPAL_BOOL; + val->data.flag = true; + opal_list_append(info, &val->super); + /* tell the server not to cache the event as subsequent jobs + * do not need to know about it */ + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE); + val->type = OPAL_BOOL; + val->data.flag = true; + opal_list_append(info, &val->super); + /* provide the status */ + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS); + val->type = OPAL_STATUS; + val->data.status = ret; + opal_list_append(info, &val->super); + /* if there was a problem, we need to send the requestor more info about what happened */ + if (ORTE_SUCCESS != ret) { + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_PROCID); + val->type = OPAL_NAME; + val->data.name.jobid = jdata->jobid; + if (NULL != pptr) { + val->data.name.vpid = pptr->name.vpid; + } else { + val->data.name.vpid = ORTE_VPID_WILDCARD; + } + opal_list_append(info, &val->super); + } + mycaddy = (mycaddy_t*)malloc(sizeof(mycaddy_t)); + mycaddy->info = info; + OBJ_RETAIN(jdata); + mycaddy->jdata = jdata; + opal_pmix.server_notify_event(OPAL_ERR_JOB_TERMINATED, &pname, + info, notify_complete, mycaddy); + } + + /* now ensure that _all_ daemons know that this job has terminated so even + * those that did not participate in it will know to cleanup the resources + * they assigned to the job. 
This is necessary now that the mapping function + * has been moved to the backend daemons - otherwise, non-participating daemons + * retain the slot assignments on the participating daemons, and then incorrectly + * map subsequent jobs thinking those nodes are still "busy" */ + reply = OBJ_NEW(opal_buffer_t); + command = ORTE_DAEMON_DVM_CLEANUP_JOB_CMD; + opal_dss.pack(reply, &command, 1, ORTE_DAEMON_CMD); + opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID); + sig = OBJ_NEW(orte_grpcomm_signature_t); + sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); + sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid; + sig->signature[0].vpid = ORTE_VPID_WILDCARD; + orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); +} diff --git a/orte/mca/state/hnp/state_hnp.c b/orte/mca/state/hnp/state_hnp.c index cfde613539..71135b7a55 100644 --- a/orte/mca/state/hnp/state_hnp.c +++ b/orte/mca/state/hnp/state_hnp.c @@ -20,8 +20,11 @@ #include #include "opal/util/output.h" +#include "opal/mca/pmix/pmix.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/grpcomm/grpcomm.h" +#include "orte/mca/rml/rml.h" #include "orte/mca/iof/iof.h" #include "orte/mca/plm/base/base.h" #include "orte/mca/ras/base/base.h" @@ -62,6 +65,8 @@ orte_state_base_module_t orte_state_hnp_module = { orte_state_base_remove_proc_state }; +static void hnp_notify(int sd, short args, void *cbdata); + /* defined default state machine sequence - individual * plm's must add a state for launching daemons */ @@ -83,6 +88,7 @@ static orte_job_state_t launch_states[] = { /* termination states */ ORTE_JOB_STATE_TERMINATED, ORTE_JOB_STATE_NOTIFY_COMPLETED, + ORTE_JOB_STATE_NOTIFIED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE }; static orte_state_cbfunc_t launch_callbacks[] = { @@ -101,6 +107,7 @@ static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_post_launch, orte_plm_base_registered, orte_state_base_check_all_complete, + hnp_notify, orte_state_base_cleanup_job, orte_quit }; @@ -196,3 +203,160 @@ static int finalize(void) return ORTE_SUCCESS; } + +static void _send_notification(int status, + orte_proc_state_t state, + orte_process_name_t *proc, + orte_process_name_t *target) +{ + opal_buffer_t *buf; + orte_grpcomm_signature_t sig; + int rc; + opal_value_t kv, *kvptr; + orte_process_name_t daemon; + + buf = OBJ_NEW(opal_buffer_t); + + opal_output_verbose(5, orte_state_base_framework.framework_output, + "%s state:hnp:sending notification %s proc %s target %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_ERROR_NAME(status), + ORTE_NAME_PRINT(proc), + ORTE_NAME_PRINT(target)); + + /* pack the status */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return; + } + + /* the source is the proc */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return; + } + + if (OPAL_ERR_PROC_ABORTED == status) { + /* we will pass three opal_value_t's */ + rc = 3; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return; + } + /* pass along the affected proc(s) */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC); + kv.type = OPAL_NAME; + kv.data.name.jobid = proc->jobid; + kv.data.name.vpid = proc->vpid; + kvptr = &kv; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(buf); + return; + } + 
OBJ_DESTRUCT(&kv); + } else { + /* we are going to pass two opal_value_t's */ + rc = 2; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return; + } + } + + /* pass along the affected proc(s) */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC); + kv.type = OPAL_NAME; + kv.data.name.jobid = proc->jobid; + kv.data.name.vpid = proc->vpid; + kvptr = &kv; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(buf); + return; + } + OBJ_DESTRUCT(&kv); + + /* pass along the proc(s) to be notified */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE); + kv.type = OPAL_NAME; + kv.data.name.jobid = target->jobid; + kv.data.name.vpid = target->vpid; + kvptr = &kv; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(buf); + return; + } + OBJ_DESTRUCT(&kv); + + /* if the targets are a wildcard, then xcast it to everyone */ + if (ORTE_VPID_WILDCARD == target->vpid) { + OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t); + sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); + sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid; + sig.signature[0].vpid = ORTE_VPID_WILDCARD; + sig.sz = 1; + + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) { + ORTE_ERROR_LOG(rc); + } + OBJ_DESTRUCT(&sig); + OBJ_RELEASE(buf); + } else { + /* get the daemon hosting the proc to be notified */ + daemon.jobid = ORTE_PROC_MY_NAME->jobid; + daemon.vpid = orte_get_proc_daemon_vpid(target); + /* send the notification to that daemon */ + opal_output_verbose(5, orte_state_base_framework.framework_output, + "%s state:base:sending notification %s to proc %s at daemon %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_ERROR_NAME(status), + ORTE_NAME_PRINT(target), + ORTE_NAME_PRINT(&daemon)); + if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, + &daemon, buf, + ORTE_RML_TAG_NOTIFICATION, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + } + } +} + +static void hnp_notify(int sd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + orte_process_name_t parent, target, *npptr; + + /* if they requested notification upon completion, provide it */ + if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) { + /* notify_completion => notify the parent of the termination + * of this child job. 
So get the parent jobid info */ + npptr = &parent; + if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) { + /* notify everyone who asked for it */ + target.jobid = jdata->jobid; + target.vpid = ORTE_VPID_WILDCARD; + _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, ORTE_NAME_WILDCARD); + } else { + target.jobid = jdata->jobid; + target.vpid = ORTE_VPID_WILDCARD; + _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, &parent); + } + } + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED); + + OBJ_RELEASE(caddy); +} diff --git a/orte/tools/orte-dvm/orte-dvm.c b/orte/tools/orte-dvm/orte-dvm.c index 1d286e1eef..522c539af3 100644 --- a/orte/tools/orte-dvm/orte-dvm.c +++ b/orte/tools/orte-dvm/orte-dvm.c @@ -106,6 +106,8 @@ static struct { bool set_sid; bool daemonize; bool system_server; + char *report_uri; + bool remote_connections; } myglobals; static opal_cmd_line_init_t cmd_line_init[] = { @@ -170,13 +172,20 @@ static opal_cmd_line_init_t cmd_line_init[] = { &myglobals.system_server, OPAL_CMD_LINE_TYPE_BOOL, "Provide a system-level server connection point - only one allowed per node" }, + { NULL, '\0', "report-uri", "report-uri", 1, + &myglobals.report_uri, OPAL_CMD_LINE_TYPE_STRING, + "Printout URI on stdout [-], stderr [+], or a file [anything else]", + OPAL_CMD_LINE_OTYPE_DEBUG }, + + { NULL, '\0', "remote-tools", "remote-tools", 0, + &myglobals.remote_connections, OPAL_CMD_LINE_TYPE_BOOL, + "Enable connections from remote tools" }, + /* End of list */ { NULL, '\0', NULL, NULL, 0, NULL, OPAL_CMD_LINE_TYPE_NULL, NULL } }; -static void notify_requestor(int sd, short args, void *cbdata); - int main(int argc, char *argv[]) { int rc, i, j; @@ -291,6 +300,13 @@ int main(int argc, char *argv[]) } /* always act as session-level PMIx server */ opal_setenv(OPAL_MCA_PREFIX"pmix_session_server", "1", true, &environ); + /* if we were asked to report a uri, set the MCA param to do so */ + if (NULL != myglobals.report_uri) { + opal_setenv("PMIX_MCA_ptl_tcp_report_uri", myglobals.report_uri, true, &environ); + } + if (myglobals.remote_connections) { + opal_setenv("PMIX_MCA_ptl_tcp_remote_connections", "1", true, &environ); + } /* Setup MCA params */ orte_register_params(); @@ -446,15 +462,6 @@ int main(int argc, char *argv[]) orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ORTE_RML_PERSISTENT, orte_daemon_recv, NULL); - /* override the notify_completed state so we can send a message - * back to anyone who submits a job to us telling them the job - * completed */ - if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, notify_requestor))) { - ORTE_ERROR_LOG(rc); - ORTE_UPDATE_EXIT_STATUS(rc); - exit(orte_exit_status); - } - /* spawn the DVM - we skip the initial steps as this * isn't a user-level application */ ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATE); @@ -473,94 +480,3 @@ int main(int argc, char *argv[]) } exit(orte_exit_status); } - -static void notify_complete(int status, void *cbdata) -{ - opal_list_t *info = (opal_list_t*)cbdata; - OPAL_LIST_RELEASE(info); -} - -static void notify_requestor(int sd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = caddy->jdata; - orte_proc_t *pptr=NULL; - int ret; - opal_buffer_t *reply; - orte_daemon_cmd_flag_t command; - orte_grpcomm_signature_t *sig; - bool notify = true; - opal_list_t *info; - opal_value_t *val; - - /* see if there was any problem 
*/ - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) { - ret = pptr->exit_code; - /* or whether we got cancelled by the user */ - } else if (orte_get_attribute(&jdata->attributes, ORTE_JOB_CANCELLED, NULL, OPAL_BOOL)) { - ret = ORTE_ERR_JOB_CANCELLED; - } else { - ret = ORTE_SUCCESS; - } - - if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) { - notify = false; - } - - if (notify) { - info = OBJ_NEW(opal_list_t); - /* ensure this only goes to the job terminated event handler */ - val = OBJ_NEW(opal_value_t); - val->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); - val->type = OPAL_BOOL; - val->data.flag = true; - opal_list_append(info, &val->super); - /* tell the server not to cache the event as subsequent jobs - * do not need to know about it */ - val = OBJ_NEW(opal_value_t); - val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE); - val->type = OPAL_BOOL; - val->data.flag = true; - opal_list_append(info, &val->super); - /* provide the status */ - val = OBJ_NEW(opal_value_t); - val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS); - val->type = OPAL_STATUS; - val->data.status = ret; - opal_list_append(info, &val->super); - /* if there was a problem, we need to send the requestor more info about what happened */ - if (ORTE_SUCCESS != ret) { - val = OBJ_NEW(opal_value_t); - val->key = strdup(OPAL_PMIX_PROCID); - val->type = OPAL_NAME; - val->data.name.jobid = jdata->jobid; - if (NULL != pptr) { - val->data.name.vpid = pptr->name.vpid; - } else { - val->data.name.vpid = ORTE_VPID_WILDCARD; - } - opal_list_append(info, &val->super); - } - opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL, - OPAL_PMIX_RANGE_GLOBAL, info, - notify_complete, info); - } - - /* now ensure that _all_ daemons know that this job has terminated so even - * those that did not participate in it will know to cleanup the resources - * they assigned to the job. 
This is necessary now that the mapping function - * has been moved to the backend daemons - otherwise, non-participating daemons - * retain the slot assignments on the participating daemons, and then incorrectly - * map subsequent jobs thinking those nodes are still "busy" */ - reply = OBJ_NEW(opal_buffer_t); - command = ORTE_DAEMON_DVM_CLEANUP_JOB_CMD; - opal_dss.pack(reply, &command, 1, ORTE_DAEMON_CMD); - opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID); - sig = OBJ_NEW(orte_grpcomm_signature_t); - sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); - sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid; - sig->signature[0].vpid = ORTE_VPID_WILDCARD; - orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply); - OBJ_RELEASE(reply); - OBJ_RELEASE(sig); -} diff --git a/orte/tools/prun/prun.c b/orte/tools/prun/prun.c index 0183848433..5c8852cbae 100644 --- a/orte/tools/prun/prun.c +++ b/orte/tools/prun/prun.c @@ -105,6 +105,7 @@ static int create_app(int argc, char* argv[], bool *made_app, char ***app_env); static int parse_locals(opal_list_t *jdata, int argc, char* argv[]); static void set_classpath_jar_file(opal_pmix_app_t *app, int index, char *jarfile); +static size_t evid = INT_MAX; static opal_cmd_line_init_t cmd_line_init[] = { @@ -154,12 +155,15 @@ static void regcbfunc(int status, size_t ref, void *cbdata) { opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata; OPAL_ACQUIRE_OBJECT(lock); + evid = ref; OPAL_PMIX_WAKEUP_THREAD(lock); } -static void release(int sd, short args, void *cbdata) +static void opcbfunc(int status, void *cbdata) { - active = false; + opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata; + OPAL_ACQUIRE_OBJECT(lock); + OPAL_PMIX_WAKEUP_THREAD(lock); } static bool fired = false; @@ -184,7 +188,7 @@ static void evhandler(int status, } if (!fired) { fired = true; - ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_TERMINATED); + active = false; } } @@ -356,6 +360,10 @@ int prun(int argc, char *argv[]) opal_setenv(OPAL_MCA_PREFIX"ess_tool_server_pid", param, true, &environ); free(param); } + /* if they specified the URI, then pass it along */ + if (NULL != orte_cmd_options.hnp) { + opal_setenv("PMIX_MCA_ptl_tcp_server_uri", orte_cmd_options.hnp, true, &environ); + } /* now initialize ORTE */ if (OPAL_SUCCESS != (rc = orte_init(&argc, &argv, ORTE_PROC_TOOL))) { @@ -381,8 +389,6 @@ int prun(int argc, char *argv[]) goto DONE; } - orte_state.add_proc_state(ORTE_PROC_STATE_TERMINATED, release, ORTE_SYS_PRI); - /* get here if they want to run an application, so let's parse * the cmd line to get it */ @@ -621,6 +627,10 @@ int prun(int argc, char *argv[]) while (active) { nanosleep(&tp, NULL); } + OPAL_PMIX_CONSTRUCT_LOCK(&lock); + opal_pmix.deregister_evhandler(evid, opcbfunc, &lock); + OPAL_PMIX_WAIT_THREAD(&lock); + OPAL_PMIX_DESTRUCT_LOCK(&lock); DONE: /* cleanup and leave */