1
1

Enable remote tool connections for the DVM. Fix notifications so we "de-bounce" termination calls

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-10-06 10:04:30 -07:00
родитель 3660cedc48
Коммит 5352c31914
14 изменённых файлов: 411 добавлений и 142 удалений

Просмотреть файл

@ -171,6 +171,13 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer,
PMIX_RELEASE(chain);
goto error;
}
/* check for non-default flag */
for (cnt=0; cnt < ninfo; cnt++) {
if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]);
break;
}
}
}
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);

Просмотреть файл

@ -797,6 +797,8 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_buffer_t *bfr;
pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
pmix_status_t rc;
pmix_list_t trk;
pmix_namelist_t *nm;
/* need to acquire the object from its originating thread */
PMIX_ACQUIRE_OBJECT(cd);
@ -835,6 +837,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
holdcd = false;
if (PMIX_RANGE_PROC_LOCAL != cd->range) {
PMIX_CONSTRUCT(&trk, pmix_list_t);
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
@ -848,6 +851,17 @@ static void _notify_client_event(int sd, short args, void *cbdata)
cd->source.rank == pr->peer->info->pname.rank) {
continue;
}
/* if we have already notified this client, then don't do it again */
matched = false;
PMIX_LIST_FOREACH(nm, &trk, pmix_namelist_t) {
if (nm->pname == &pr->peer->info->pname) {
matched = true;
break;
}
}
if (matched) {
continue;
}
/* if we were given specific targets, check if this is one */
if (NULL != cd->targets) {
matched = false;
@ -867,8 +881,15 @@ static void _notify_client_event(int sd, short args, void *cbdata)
}
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%u",
pr->peer->info->pname.nspace, pr->peer->info->pname.rank);
"pmix_server: notifying client %s:%u on status %s",
pr->peer->info->pname.nspace, pr->peer->info->pname.rank,
PMIx_Error_string(cd->status));
/* record that we notified this client */
nm = PMIX_NEW(pmix_namelist_t);
nm->pname = &pr->peer->info->pname;
pmix_list_append(&trk, &nm->super);
bfr = PMIX_NEW(pmix_buffer_t);
if (NULL == bfr) {
continue;
@ -896,7 +917,6 @@ static void _notify_client_event(int sd, short args, void *cbdata)
PMIX_RELEASE(bfr);
continue;
}
/* pack any info */
PMIX_BFROPS_PACK(rc, pr->peer, bfr, &cd->ninfo, 1, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
@ -917,6 +937,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
}
}
}
PMIX_LIST_DESTRUCT(&trk);
if (PMIX_RANGE_LOCAL != cd->range &&
0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pmix_globals.myid.rank) {

Просмотреть файл

@ -235,12 +235,12 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
if (NULL != mca_ptl_tcp_component.super.uri) {
/* if the string starts with "file:", then they are pointing
* us to a file we need to read to get the URI itself */
if (0 != strncmp(mca_ptl_tcp_component.super.uri, "file:", 5)) {
if (0 == strncmp(mca_ptl_tcp_component.super.uri, "file:", 5)) {
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"ptl:tcp:tool getting connection info from %s",
mca_ptl_tcp_component.super.uri);
nspace = NULL;
rc = parse_uri_file(&mca_ptl_tcp_component.super.uri[6], &suri, &nspace, &rank);
rc = parse_uri_file(&mca_ptl_tcp_component.super.uri[5], &suri, &nspace, &rank);
if (PMIX_SUCCESS != rc) {
return PMIX_ERR_UNREACH;
}
@ -534,8 +534,8 @@ static pmix_status_t parse_uri_file(char *filename,
}
*p2 = '\0';
++p2;
/* set the server nspace */
*nspace = strdup(p);
/* set the server nspace/rank */
*nspace = strdup(srvr);
*rank = strtoull(p2, NULL, 10);
/* now parse the uri itself */

Просмотреть файл

@ -50,6 +50,7 @@ typedef struct {
int wait_to_connect;
int max_retries;
char *report_uri;
bool remote_connections;
} pmix_ptl_tcp_component_t;
extern pmix_ptl_tcp_component_t mca_ptl_tcp_component;

Просмотреть файл

@ -115,7 +115,8 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
.system_filename = NULL,
.wait_to_connect = 4,
.max_retries = 2,
.report_uri = NULL
.report_uri = NULL,
.remote_connections = false
};
static char **split_and_resolve(char **orig_str, char *name);
@ -142,6 +143,13 @@ static int component_register(void)
PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
&mca_ptl_tcp_component.report_uri);
(void)pmix_mca_base_component_var_register(component, "remote_connections",
                                           "Enable connections from remote tools",
                                           PMIX_MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                           PMIX_INFO_LVL_2,
                                           PMIX_MCA_BASE_VAR_SCOPE_LOCAL,
                                           &mca_ptl_tcp_component.remote_connections);
(void)pmix_mca_base_component_var_register(component, "if_include",
"Comma-delimited list of devices and/or CIDR notation of TCP networks (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with ptl_tcp_if_exclude.",
PMIX_MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
@ -212,6 +220,8 @@ static int component_register(void)
return PMIX_SUCCESS;
}
static char *urifile = NULL;
static pmix_status_t component_open(void)
{
char *tdir;
@ -241,6 +251,11 @@ static pmix_status_t component_open(void)
if (NULL == mca_ptl_tcp_component.system_tmpdir) {
mca_ptl_tcp_component.system_tmpdir = strdup(tdir);
}
if (NULL != mca_ptl_tcp_component.report_uri &&
0 != strcmp(mca_ptl_tcp_component.report_uri, "-") &&
0 != strcmp(mca_ptl_tcp_component.report_uri, "+")) {
urifile = strdup(mca_ptl_tcp_component.report_uri);
}
return PMIX_SUCCESS;
}
@ -253,6 +268,12 @@ pmix_status_t component_close(void)
if (NULL != mca_ptl_tcp_component.session_filename) {
unlink(mca_ptl_tcp_component.session_filename);
}
if (NULL != urifile) {
/* remove the file */
unlink(urifile);
free(urifile);
urifile = NULL;
}
return PMIX_SUCCESS;
}
@ -283,7 +304,6 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
struct sockaddr_storage my_ss;
int kindex;
size_t n;
bool remote_connections = false;
bool session_tool = false;
bool system_tool = false;
pmix_socklen_t addrlen;
@ -317,11 +337,11 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
} else if (0 == strcmp(info[n].key, PMIX_TCP_IPV6_PORT)) {
mca_ptl_tcp_component.ipv6_port = info[n].value.data.integer;
} else if (0 == strcmp(info[n].key, PMIX_TCP_DISABLE_IPV4)) {
mca_ptl_tcp_component.disable_ipv4_family = PMIX_INFO_TRUE(&info[n]);
mca_ptl_tcp_component.disable_ipv4_family = PMIX_INFO_TRUE(&info[n]);
} else if (0 == strcmp(info[n].key, PMIX_TCP_DISABLE_IPV6)) {
mca_ptl_tcp_component.disable_ipv6_family = PMIX_INFO_TRUE(&info[n]);
mca_ptl_tcp_component.disable_ipv6_family = PMIX_INFO_TRUE(&info[n]);
} else if (0 == strcmp(info[n].key, PMIX_SERVER_REMOTE_CONNECTIONS)) {
remote_connections = PMIX_INFO_TRUE(&info[n]);
mca_ptl_tcp_component.remote_connections = PMIX_INFO_TRUE(&info[n]);
} else if (0 == strcmp(info[n].key, PMIX_TCP_URI)) {
if (NULL != mca_ptl_tcp_component.super.uri) {
free(mca_ptl_tcp_component.super.uri);
@ -343,9 +363,9 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
}
mca_ptl_tcp_component.system_tmpdir = strdup(info[n].value.data.string);
} else if (0 == strcmp(info[n].key, PMIX_SERVER_TOOL_SUPPORT)) {
session_tool = PMIX_INFO_TRUE(&info[n]);
session_tool = PMIX_INFO_TRUE(&info[n]);
} else if (0 == strcmp(info[n].key, PMIX_SERVER_SYSTEM_SUPPORT)) {
system_tool = PMIX_INFO_TRUE(&info[n]);
system_tool = PMIX_INFO_TRUE(&info[n]);
}
}
}
@ -434,7 +454,7 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
/* if this is the loopback device and they didn't enable
* remote connections, then we are done */
if (pmix_ifisloopback(i)) {
if (remote_connections) {
if (mca_ptl_tcp_component.remote_connections) {
/* ignore loopback */
continue;
} else {

Просмотреть файл

@ -110,6 +110,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
buf, &cmd, &cnt, PMIX_COMMAND);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(chain);
goto error;
}
/* unpack the status */
@ -118,6 +119,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
buf, &chain->status, &cnt, PMIX_STATUS);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(chain);
goto error;
}
@ -127,6 +129,7 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
buf, &chain->source, &cnt, PMIX_PROC);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(chain);
goto error;
}
@ -136,19 +139,35 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
buf, &ninfo, &cnt, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(chain);
goto error;
}
/* we always leave space for a callback object */
chain->ninfo = ninfo + 1;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (NULL == chain->info) {
PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
PMIX_RELEASE(chain);
return;
}
if (0 < ninfo) {
cnt = ninfo;
PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
buf, chain->info, &cnt, PMIX_INFO);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(chain);
goto error;
}
/* check for non-default flag */
for (cnt=0; cnt < ninfo; cnt++) {
if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]);
break;
}
}
}
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);

Просмотреть файл

@ -244,8 +244,9 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id,
opal_pmix2x_event_t *event;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s RECEIVED NOTIFICATION OF STATUS %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), status);
"%s RECEIVED NOTIFICATION OF STATUS %d ON HDLR %lu",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), status,
(unsigned long)evhdlr_registration_id);
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);

Просмотреть файл

@ -525,7 +525,12 @@ int pmix2x_server_notify_event(int status,
n = 0;
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
(void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
pmix2x_value_load(&pinfo[n].value, kv);
if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) {
pinfo[n].value.type = PMIX_STATUS;
pinfo[n].value.data.status = pmix2x_convert_opalrc(kv->data.integer);
} else {
pmix2x_value_load(&pinfo[n].value, kv);
}
++n;
}
} else {

Просмотреть файл

@ -50,6 +50,7 @@ BEGIN_C_DECLS
#define OPAL_PMIX_SERVER_TOOL_SUPPORT "pmix.srvr.tool" // (bool) The host RM wants to declare itself as willing to
// accept tool connection requests
#define OPAL_PMIX_SERVER_REMOTE_CONNECTIONS "pmix.srvr.remote" // (bool) Allow connections from remote tools (do not use loopback device)
#define OPAL_PMIX_SERVER_SYSTEM_SUPPORT "pmix.srvr.sys" // (bool) The host RM wants to declare itself as being the local
// system server for PMIx connection requests
#define OPAL_PMIX_SERVER_TMPDIR "pmix.srvr.tmpdir" // (char*) temp directory where PMIx server will place

Просмотреть файл

@ -651,7 +651,7 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
orte_proc_t *pdata;
int i;
char *rtmod;
orte_process_name_t parent, target, *npptr;
orte_process_name_t parent, target;
ORTE_ACQUIRE_OBJECT(caddy);
proc = &caddy->name;
@ -769,22 +769,6 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
orte_state_base_notify_data_server(&target);
}
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
/* if they requested notification upon completion, provide it */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
/* notify_completion => notify the parent of the termination
* of this child job. So get the parent jobid info */
npptr = &parent;
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) {
/* notify everyone who asked for it */
target.jobid = jdata->jobid;
target.vpid = ORTE_VPID_WILDCARD;
_send_notification(OPAL_ERR_JOB_TERMINATED, pdata->state, &target, ORTE_NAME_WILDCARD);
} else {
target.jobid = jdata->jobid;
target.vpid = ORTE_VPID_WILDCARD;
_send_notification(OPAL_ERR_JOB_TERMINATED, pdata->state, &target, &parent);
}
}
} else if (ORTE_PROC_STATE_TERMINATED < pdata->state &&
!orte_job_term_ordered) {
/* if this was an abnormal term, notify the other procs of the termination */

Просмотреть файл

@ -70,6 +70,8 @@ orte_state_base_module_t orte_state_dvm_module = {
orte_state_base_remove_proc_state
};
static void dvm_notify(int sd, short args, void *cbdata);
/* defined default state machine sequence - individual
* plm's must add a state for launching daemons
*/
@ -91,6 +93,7 @@ static orte_job_state_t launch_states[] = {
/* termination states */
ORTE_JOB_STATE_TERMINATED,
ORTE_JOB_STATE_NOTIFY_COMPLETED,
ORTE_JOB_STATE_NOTIFIED,
ORTE_JOB_STATE_ALL_JOBS_COMPLETE
};
static orte_state_cbfunc_t launch_callbacks[] = {
@ -109,6 +112,7 @@ static orte_state_cbfunc_t launch_callbacks[] = {
orte_plm_base_post_launch,
orte_plm_base_registered,
check_complete,
dvm_notify,
cleanup_job,
orte_quit
};
@ -518,3 +522,119 @@ static void cleanup_job(int sd, short args, void *cbdata)
OBJ_RELEASE(caddy);
}
/* Carrier handed to opal_pmix.server_notify_event as cbdata: bundles the
 * info list and a retained reference to the job object so notify_complete
 * can release both and advance the job's state once delivery completes. */
typedef struct {
    opal_list_t *info;   /* event info list - released in notify_complete */
    orte_job_t *jdata;   /* retained job object - released in notify_complete */
} mycaddy_t;
static void notify_complete(int status, void *cbdata)
{
mycaddy_t *mycaddy = (mycaddy_t*)cbdata;
OPAL_LIST_RELEASE(mycaddy->info);
ORTE_ACTIVATE_JOB_STATE(mycaddy->jdata, ORTE_JOB_STATE_NOTIFIED);
OBJ_RELEASE(mycaddy->jdata);
free(mycaddy);
}
/* State callback for ORTE_JOB_STATE_NOTIFY_COMPLETED in the DVM state machine.
 * Notifies the requestor (tool) that the job terminated - unless notification
 * is suppressed - and xcasts a cleanup command so ALL daemons release the
 * resources they assigned to the job.
 *
 * Fixes vs. prior version:
 *  - sig->sz was never set before orte_grpcomm.xcast (the sibling
 *    _send_notification sets sig.sz = 1 for the same pattern);
 *  - when notify was false the job never reached ORTE_JOB_STATE_NOTIFIED,
 *    stalling the state machine before cleanup_job;
 *  - the state caddy was never released (leak - cf. hnp_notify). */
static void dvm_notify(int sd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_job_t *jdata = caddy->jdata;
    orte_proc_t *pptr = NULL;
    int ret;
    opal_buffer_t *reply;
    orte_daemon_cmd_flag_t command;
    orte_grpcomm_signature_t *sig;
    bool notify = true;
    opal_list_t *info;
    opal_value_t *val;
    opal_process_name_t pname, *proc;
    mycaddy_t *mycaddy;

    /* see if there was any problem */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) {
        ret = pptr->exit_code;
        /* or whether we got cancelled by the user */
    } else if (orte_get_attribute(&jdata->attributes, ORTE_JOB_CANCELLED, NULL, OPAL_BOOL)) {
        ret = ORTE_ERR_JOB_CANCELLED;
    } else {
        ret = ORTE_SUCCESS;
    }

    /* a clean termination with silent-termination requested needs no event */
    if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) {
        notify = false;
    }
    /* if the jobid matches that of the requestor, then don't notify */
    proc = &pname;
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&proc, OPAL_NAME)) {
        if (pname.jobid == jdata->jobid) {
            notify = false;
        }
    }

    if (notify) {
        /* the source is the job that terminated */
        pname.jobid = jdata->jobid;
        pname.vpid = OPAL_VPID_WILDCARD;

        info = OBJ_NEW(opal_list_t);
        /* ensure this only goes to the job terminated event handler */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
        val->type = OPAL_BOOL;
        val->data.flag = true;
        opal_list_append(info, &val->super);
        /* tell the server not to cache the event as subsequent jobs
         * do not need to know about it */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE);
        val->type = OPAL_BOOL;
        val->data.flag = true;
        opal_list_append(info, &val->super);
        /* provide the status */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS);
        val->type = OPAL_STATUS;
        val->data.status = ret;
        opal_list_append(info, &val->super);
        /* if there was a problem, we need to send the requestor more info about what happened */
        if (ORTE_SUCCESS != ret) {
            val = OBJ_NEW(opal_value_t);
            val->key = strdup(OPAL_PMIX_PROCID);
            val->type = OPAL_NAME;
            val->data.name.jobid = jdata->jobid;
            if (NULL != pptr) {
                val->data.name.vpid = pptr->name.vpid;
            } else {
                val->data.name.vpid = ORTE_VPID_WILDCARD;
            }
            opal_list_append(info, &val->super);
        }
        /* the caddy carries the list plus a retained job reference so
         * notify_complete can release them and activate NOTIFIED */
        mycaddy = (mycaddy_t*)malloc(sizeof(mycaddy_t));
        mycaddy->info = info;
        OBJ_RETAIN(jdata);
        mycaddy->jdata = jdata;
        opal_pmix.server_notify_event(OPAL_ERR_JOB_TERMINATED, &pname,
                                      info, notify_complete, mycaddy);
    } else {
        /* nobody to notify, but we must still advance the state machine
         * or the job would never reach NOTIFIED and get cleaned up */
        ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED);
    }

    /* now ensure that _all_ daemons know that this job has terminated so even
     * those that did not participate in it will know to cleanup the resources
     * they assigned to the job. This is necessary now that the mapping function
     * has been moved to the backend daemons - otherwise, non-participating daemons
     * retain the slot assignments on the participating daemons, and then incorrectly
     * map subsequent jobs thinking those nodes are still "busy" */
    reply = OBJ_NEW(opal_buffer_t);
    command = ORTE_DAEMON_DVM_CLEANUP_JOB_CMD;
    opal_dss.pack(reply, &command, 1, ORTE_DAEMON_CMD);
    opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID);
    sig = OBJ_NEW(orte_grpcomm_signature_t);
    sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
    sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
    sig->signature[0].vpid = ORTE_VPID_WILDCARD;
    sig->sz = 1;   /* bugfix: the signature length was never set */
    orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply);
    OBJ_RELEASE(reply);
    OBJ_RELEASE(sig);

    /* release the caddy - previously leaked */
    OBJ_RELEASE(caddy);
}

Просмотреть файл

@ -20,8 +20,11 @@
#include <string.h>
#include "opal/util/output.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/plm/base/base.h"
#include "orte/mca/ras/base/base.h"
@ -62,6 +65,8 @@ orte_state_base_module_t orte_state_hnp_module = {
orte_state_base_remove_proc_state
};
static void hnp_notify(int sd, short args, void *cbdata);
/* defined default state machine sequence - individual
* plm's must add a state for launching daemons
*/
@ -83,6 +88,7 @@ static orte_job_state_t launch_states[] = {
/* termination states */
ORTE_JOB_STATE_TERMINATED,
ORTE_JOB_STATE_NOTIFY_COMPLETED,
ORTE_JOB_STATE_NOTIFIED,
ORTE_JOB_STATE_ALL_JOBS_COMPLETE
};
static orte_state_cbfunc_t launch_callbacks[] = {
@ -101,6 +107,7 @@ static orte_state_cbfunc_t launch_callbacks[] = {
orte_plm_base_post_launch,
orte_plm_base_registered,
orte_state_base_check_all_complete,
hnp_notify,
orte_state_base_cleanup_job,
orte_quit
};
@ -196,3 +203,160 @@ static int finalize(void)
return ORTE_SUCCESS;
}
/* Build and deliver a notification message describing a job/proc event.
 *
 * @param status  event code to report (e.g. OPAL_ERR_JOB_TERMINATED)
 * @param state   proc state of the terminating proc
 *                (NOTE(review): currently unused in the body - confirm
 *                whether it was intended to be packed into the message)
 * @param proc    the source/affected process name
 * @param target  recipient; ORTE_VPID_WILDCARD vpid => xcast to all daemons,
 *                otherwise routed to the daemon hosting the target
 *
 * Wire format: status (OPAL_INT), source (ORTE_NAME), value count (OPAL_INT),
 * then that many opal_value_t entries. For OPAL_ERR_PROC_ABORTED the count is
 * 3 and the affected-proc value is packed twice (once in the aborted branch,
 * once in the common path) plus the custom-range value; otherwise the count
 * is 2 (affected proc + custom range).
 *
 * Improvement vs. prior version: the value count is carried in its own
 * local (nvals) instead of reusing rc as both payload and pack status,
 * which depended on subtle evaluation ordering. */
static void _send_notification(int status,
                               orte_proc_state_t state,
                               orte_process_name_t *proc,
                               orte_process_name_t *target)
{
    opal_buffer_t *buf;
    orte_grpcomm_signature_t sig;
    int rc;
    int nvals;               /* number of opal_value_t entries in the message */
    opal_value_t kv, *kvptr;
    orte_process_name_t daemon;

    buf = OBJ_NEW(opal_buffer_t);

    opal_output_verbose(5, orte_state_base_framework.framework_output,
                        "%s state:hnp:sending notification %s proc %s target %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_ERROR_NAME(status),
                        ORTE_NAME_PRINT(proc),
                        ORTE_NAME_PRINT(target));

    /* pack the status */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return;
    }

    /* the source is the proc */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return;
    }

    if (OPAL_ERR_PROC_ABORTED == status) {
        /* we will pass three opal_value_t's */
        nvals = 3;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &nvals, 1, OPAL_INT))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buf);
            return;
        }
        /* pass along the affected proc(s) */
        OBJ_CONSTRUCT(&kv, opal_value_t);
        kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
        kv.type = OPAL_NAME;
        kv.data.name.jobid = proc->jobid;
        kv.data.name.vpid = proc->vpid;
        kvptr = &kv;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
            ORTE_ERROR_LOG(rc);
            OBJ_DESTRUCT(&kv);
            OBJ_RELEASE(buf);
            return;
        }
        OBJ_DESTRUCT(&kv);
    } else {
        /* we are going to pass two opal_value_t's */
        nvals = 2;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &nvals, 1, OPAL_INT))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buf);
            return;
        }
    }

    /* pass along the affected proc(s) */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
    kv.type = OPAL_NAME;
    kv.data.name.jobid = proc->jobid;
    kv.data.name.vpid = proc->vpid;
    kvptr = &kv;
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&kv);
        OBJ_RELEASE(buf);
        return;
    }
    OBJ_DESTRUCT(&kv);

    /* pass along the proc(s) to be notified */
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
    kv.type = OPAL_NAME;
    kv.data.name.jobid = target->jobid;
    kv.data.name.vpid = target->vpid;
    kvptr = &kv;
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&kv);
        OBJ_RELEASE(buf);
        return;
    }
    OBJ_DESTRUCT(&kv);

    /* if the targets are a wildcard, then xcast it to everyone */
    if (ORTE_VPID_WILDCARD == target->vpid) {
        OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
        sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
        sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
        sig.signature[0].vpid = ORTE_VPID_WILDCARD;
        sig.sz = 1;
        if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
            ORTE_ERROR_LOG(rc);
        }
        OBJ_DESTRUCT(&sig);
        OBJ_RELEASE(buf);
    } else {
        /* get the daemon hosting the proc to be notified */
        daemon.jobid = ORTE_PROC_MY_NAME->jobid;
        daemon.vpid = orte_get_proc_daemon_vpid(target);
        /* send the notification to that daemon */
        opal_output_verbose(5, orte_state_base_framework.framework_output,
                            "%s state:base:sending notification %s to proc %s at daemon %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_ERROR_NAME(status),
                            ORTE_NAME_PRINT(target),
                            ORTE_NAME_PRINT(&daemon));
        if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
                                                          &daemon, buf,
                                                          ORTE_RML_TAG_NOTIFICATION,
                                                          orte_rml_send_callback, NULL))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(buf);
        }
    }
}
/* State callback for ORTE_JOB_STATE_NOTIFY_COMPLETED on the HNP: if the
 * job asked for completion notification, send it (to the launch proxy's
 * job when one was recorded, otherwise to everyone), then advance the
 * job to the NOTIFIED state and release the caddy. */
static void hnp_notify(int sd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_job_t *jdata = caddy->jdata;
    orte_process_name_t parent, target, *recipient;

    /* if they requested notification upon completion, provide it */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
        /* the event always refers to the whole terminated job */
        target.jobid = jdata->jobid;
        target.vpid = ORTE_VPID_WILDCARD;
        /* notify_completion => notify the parent of the termination
         * of this child job. So get the parent jobid info */
        recipient = &parent;
        if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&recipient, OPAL_NAME)) {
            /* a launch proxy was recorded - notify only the parent */
            _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, &parent);
        } else {
            /* no proxy recorded - notify everyone who asked for it */
            _send_notification(OPAL_ERR_JOB_TERMINATED, caddy->proc_state, &target, ORTE_NAME_WILDCARD);
        }
    }

    ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFIED);
    OBJ_RELEASE(caddy);
}

Просмотреть файл

@ -106,6 +106,8 @@ static struct {
bool set_sid;
bool daemonize;
bool system_server;
char *report_uri;
bool remote_connections;
} myglobals;
static opal_cmd_line_init_t cmd_line_init[] = {
@ -170,13 +172,20 @@ static opal_cmd_line_init_t cmd_line_init[] = {
&myglobals.system_server, OPAL_CMD_LINE_TYPE_BOOL,
"Provide a system-level server connection point - only one allowed per node" },
{ NULL, '\0', "report-uri", "report-uri", 1,
&myglobals.report_uri, OPAL_CMD_LINE_TYPE_STRING,
"Printout URI on stdout [-], stderr [+], or a file [anything else]",
OPAL_CMD_LINE_OTYPE_DEBUG },
{ NULL, '\0', "remote-tools", "remote-tools", 0,
&myglobals.remote_connections, OPAL_CMD_LINE_TYPE_BOOL,
"Enable connections from remote tools" },
/* End of list */
{ NULL, '\0', NULL, NULL, 0,
NULL, OPAL_CMD_LINE_TYPE_NULL, NULL }
};
static void notify_requestor(int sd, short args, void *cbdata);
int main(int argc, char *argv[])
{
int rc, i, j;
@ -291,6 +300,13 @@ int main(int argc, char *argv[])
}
/* always act as session-level PMIx server */
opal_setenv(OPAL_MCA_PREFIX"pmix_session_server", "1", true, &environ);
/* if we were asked to report a uri, set the MCA param to do so */
if (NULL != myglobals.report_uri) {
opal_setenv("PMIX_MCA_ptl_tcp_report_uri", myglobals.report_uri, true, &environ);
}
if (myglobals.remote_connections) {
opal_setenv("PMIX_MCA_ptl_tcp_remote_connections", "1", true, &environ);
}
/* Setup MCA params */
orte_register_params();
@ -446,15 +462,6 @@ int main(int argc, char *argv[])
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON,
ORTE_RML_PERSISTENT, orte_daemon_recv, NULL);
/* override the notify_completed state so we can send a message
* back to anyone who submits a job to us telling them the job
* completed */
if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_NOTIFY_COMPLETED, notify_requestor))) {
ORTE_ERROR_LOG(rc);
ORTE_UPDATE_EXIT_STATUS(rc);
exit(orte_exit_status);
}
/* spawn the DVM - we skip the initial steps as this
* isn't a user-level application */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATE);
@ -473,94 +480,3 @@ int main(int argc, char *argv[])
}
exit(orte_exit_status);
}
/* Completion callback for opal_pmix.notify_event: the cbdata is the info
 * list that was passed along with the event; release it now that the
 * notification has been delivered. */
static void notify_complete(int status, void *cbdata)
{
    opal_list_t *lst = (opal_list_t*)cbdata;

    OPAL_LIST_RELEASE(lst);
}
/* State callback overriding ORTE_JOB_STATE_NOTIFY_COMPLETED: sends a
 * job-terminated event to anyone who submitted a job to this DVM, then
 * xcasts a cleanup command so all daemons release the job's resources.
 * (This commit removes this function in favor of dvm_notify in the
 * state/dvm component.) */
static void notify_requestor(int sd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_job_t *jdata = caddy->jdata;
    orte_proc_t *pptr=NULL;
    int ret;
    opal_buffer_t *reply;
    orte_daemon_cmd_flag_t command;
    orte_grpcomm_signature_t *sig;
    bool notify = true;
    opal_list_t *info;
    opal_value_t *val;

    /* see if there was any problem */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) {
        /* an aborted proc was recorded - report its exit code */
        ret = pptr->exit_code;
        /* or whether we got cancelled by the user */
    } else if (orte_get_attribute(&jdata->attributes, ORTE_JOB_CANCELLED, NULL, OPAL_BOOL)) {
        ret = ORTE_ERR_JOB_CANCELLED;
    } else {
        ret = ORTE_SUCCESS;
    }
    /* clean termination + silent-termination attribute => skip the event */
    if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) {
        notify = false;
    }

    if (notify) {
        /* build the info list describing the event; ownership passes to
         * notify_complete, which releases it after delivery */
        info = OBJ_NEW(opal_list_t);
        /* ensure this only goes to the job terminated event handler */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
        val->type = OPAL_BOOL;
        val->data.flag = true;
        opal_list_append(info, &val->super);
        /* tell the server not to cache the event as subsequent jobs
         * do not need to know about it */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE);
        val->type = OPAL_BOOL;
        val->data.flag = true;
        opal_list_append(info, &val->super);
        /* provide the status */
        val = OBJ_NEW(opal_value_t);
        val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS);
        val->type = OPAL_STATUS;
        val->data.status = ret;
        opal_list_append(info, &val->super);
        /* if there was a problem, we need to send the requestor more info about what happened */
        if (ORTE_SUCCESS != ret) {
            val = OBJ_NEW(opal_value_t);
            val->key = strdup(OPAL_PMIX_PROCID);
            val->type = OPAL_NAME;
            val->data.name.jobid = jdata->jobid;
            if (NULL != pptr) {
                val->data.name.vpid = pptr->name.vpid;
            } else {
                val->data.name.vpid = ORTE_VPID_WILDCARD;
            }
            opal_list_append(info, &val->super);
        }
        opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL,
                               OPAL_PMIX_RANGE_GLOBAL, info,
                               notify_complete, info);
    }

    /* now ensure that _all_ daemons know that this job has terminated so even
     * those that did not participate in it will know to cleanup the resources
     * they assigned to the job. This is necessary now that the mapping function
     * has been moved to the backend daemons - otherwise, non-participating daemons
     * retain the slot assignments on the participating daemons, and then incorrectly
     * map subsequent jobs thinking those nodes are still "busy" */
    reply = OBJ_NEW(opal_buffer_t);
    command = ORTE_DAEMON_DVM_CLEANUP_JOB_CMD;
    opal_dss.pack(reply, &command, 1, ORTE_DAEMON_CMD);
    opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID);
    sig = OBJ_NEW(orte_grpcomm_signature_t);
    sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
    sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
    sig->signature[0].vpid = ORTE_VPID_WILDCARD;
    /* NOTE(review): sig->sz is never set here (cf. _send_notification,
     * which sets sig.sz = 1) - confirm the signature constructor zeroes
     * it and that xcast tolerates sz == 0, or this is a latent bug */
    orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply);
    OBJ_RELEASE(reply);
    OBJ_RELEASE(sig);
}

Просмотреть файл

@ -105,6 +105,7 @@ static int create_app(int argc, char* argv[],
bool *made_app, char ***app_env);
static int parse_locals(opal_list_t *jdata, int argc, char* argv[]);
static void set_classpath_jar_file(opal_pmix_app_t *app, int index, char *jarfile);
static size_t evid = INT_MAX;
static opal_cmd_line_init_t cmd_line_init[] = {
@ -154,12 +155,15 @@ static void regcbfunc(int status, size_t ref, void *cbdata)
{
opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata;
OPAL_ACQUIRE_OBJECT(lock);
evid = ref;
OPAL_PMIX_WAKEUP_THREAD(lock);
}
static void release(int sd, short args, void *cbdata)
static void opcbfunc(int status, void *cbdata)
{
active = false;
opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata;
OPAL_ACQUIRE_OBJECT(lock);
OPAL_PMIX_WAKEUP_THREAD(lock);
}
static bool fired = false;
@ -184,7 +188,7 @@ static void evhandler(int status,
}
if (!fired) {
fired = true;
ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_TERMINATED);
active = false;
}
}
@ -356,6 +360,10 @@ int prun(int argc, char *argv[])
opal_setenv(OPAL_MCA_PREFIX"ess_tool_server_pid", param, true, &environ);
free(param);
}
/* if they specified the URI, then pass it along */
if (NULL != orte_cmd_options.hnp) {
opal_setenv("PMIX_MCA_ptl_tcp_server_uri", orte_cmd_options.hnp, true, &environ);
}
/* now initialize ORTE */
if (OPAL_SUCCESS != (rc = orte_init(&argc, &argv, ORTE_PROC_TOOL))) {
@ -381,8 +389,6 @@ int prun(int argc, char *argv[])
goto DONE;
}
orte_state.add_proc_state(ORTE_PROC_STATE_TERMINATED, release, ORTE_SYS_PRI);
/* get here if they want to run an application, so let's parse
* the cmd line to get it */
@ -621,6 +627,10 @@ int prun(int argc, char *argv[])
while (active) {
nanosleep(&tp, NULL);
}
OPAL_PMIX_CONSTRUCT_LOCK(&lock);
opal_pmix.deregister_evhandler(evid, opcbfunc, &lock);
OPAL_PMIX_WAIT_THREAD(&lock);
OPAL_PMIX_DESTRUCT_LOCK(&lock);
DONE:
/* cleanup and leave */