diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h index 1e27f9c63f..bf316e7e1e 100644 --- a/orte/mca/plm/plm_types.h +++ b/orte/mca/plm/plm_types.h @@ -53,7 +53,6 @@ typedef uint32_t orte_proc_state_t; #define ORTE_PROC_STATE_IOF_COMPLETE 6 /* io forwarding pipes have closed */ #define ORTE_PROC_STATE_WAITPID_FIRED 7 /* waitpid fired on process */ #define ORTE_PROC_STATE_MODEX_READY 8 /* all modex info has been stored */ - /* * Define a "boundary" so we can easily and quickly determine * if a proc is still running or not - any value less than diff --git a/orte/orted/pmix/pmix_server_gen.c b/orte/orted/pmix/pmix_server_gen.c index 65edab73ab..b7bbdbe4eb 100644 --- a/orte/orted/pmix/pmix_server_gen.c +++ b/orte/orted/pmix/pmix_server_gen.c @@ -43,50 +43,141 @@ #include "pmix_server_internal.h" -int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object) -{ - ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_REGISTERED); - return ORTE_SUCCESS; -} - -int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) +static void _client_conn(int sd, short args, void *cbdata) { + orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata; orte_job_t *jdata; orte_proc_t *p, *ptr; int i; - if (NULL != cbdata) { + if (NULL != cd->server_object) { /* we were passed back the orte_proc_t */ - p = (orte_proc_t*)cbdata; + p = (orte_proc_t*)cd->server_object; } else { /* find the named process */ p = NULL; - if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { - return ORTE_ERR_NOT_FOUND; + if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) { + return; } for (i=0; i < jdata->procs->size; i++) { if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { continue; } - if (proc->jobid != ptr->name.jobid) { + if (cd->proc->jobid != ptr->name.jobid) { continue; } - if (proc->vpid == ptr->name.vpid) { + if (cd->proc->vpid == ptr->name.vpid) { p = ptr; break; } } } if (NULL != p) { - p->state = ORTE_PROC_STATE_TERMINATED; - /* release the caller */ - if (NULL != cbfunc) { - cbfunc(ORTE_SUCCESS, cbdata); - } - return ORTE_SUCCESS; + ORTE_FLAG_SET(p, ORTE_PROC_FLAG_REG); + ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_REGISTERED); } - return ORTE_ERR_NOT_FOUND; + OBJ_RELEASE(cd); +} + +int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object) +{ + /* need to thread-shift this request as we are going + * to access our global list of registered events */ + ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL, + NULL, _client_conn, NULL, NULL); + return ORTE_SUCCESS; +} + +static void _client_finalized(int sd, short args, void *cbdata) +{ + orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata; + orte_job_t *jdata; + orte_proc_t *p, *ptr; + int i; + + if (NULL != cd->server_object) { + /* we were passed back the orte_proc_t */ + p = (orte_proc_t*)cd->server_object; + } else { + /* find the named process */ + p = NULL; + if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) { + return; + } + for (i=0; i < jdata->procs->size; i++) { + if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { + continue; + } + if (cd->proc->jobid != ptr->name.jobid) { + continue; + } + if (cd->proc->vpid == ptr->name.vpid) { + p = ptr; + break; + } + } + } + if (NULL != p) { + ORTE_FLAG_SET(p, ORTE_PROC_FLAG_HAS_DEREG); + /* release the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(ORTE_SUCCESS, cd->cbdata); + } + } + OBJ_RELEASE(cd); +} + +int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) +{ + /* need to thread-shift this request as we are going + * to access our global list of registered events */ + ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL, + NULL, _client_finalized, cbfunc, cbdata); + return ORTE_SUCCESS; + +} + +static void _client_abort(int sd, short args, void *cbdata) +{ + orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata; + orte_job_t *jdata; + orte_proc_t *p, *ptr; + int i; + + if (NULL != cd->server_object) { + p = (orte_proc_t*)cd->server_object; + } else { + /* find the named process */ + p = NULL; + if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) { + return; + } + for (i=0; i < jdata->procs->size; i++) { + if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { + continue; + } + if (cd->proc->jobid != ptr->name.jobid) { + continue; + } + if (cd->proc->vpid == ptr->name.vpid) { + p = ptr; + break; + } + } + } + if (NULL != p) { + p->exit_code = cd->status; + ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_CALLED_ABORT); + } + + ORTE_UPDATE_EXIT_STATUS(cd->status); + + /* release the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(OPAL_SUCCESS, cd->cbdata); + } + OBJ_RELEASE(cd); } int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object, @@ -94,21 +185,11 @@ int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object, opal_list_t *procs_to_abort, opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { - orte_proc_t *p; - - if (NULL != server_object) { - p = (orte_proc_t*)server_object; - p->exit_code = status; - } - - ORTE_UPDATE_EXIT_STATUS(status); - ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_CALLED_ABORT); - - /* release the caller */ - if (NULL != cbfunc) { - cbfunc(OPAL_SUCCESS, cbdata); - } - return OPAL_SUCCESS; + /* need to thread-shift this request as we are going + * to access our global list of registered events */ + ORTE_PMIX_THREADSHIFT(proc, server_object, status, msg, + procs_to_abort, _client_abort, cbfunc, cbdata); + return ORTE_SUCCESS; } static void _register_events(int sd, short args, void *cbdata) diff --git a/orte/orted/pmix/pmix_server_internal.h b/orte/orted/pmix/pmix_server_internal.h index c93d3624f4..39cd1d8db8 100644 --- a/orte/orted/pmix/pmix_server_internal.h +++ b/orte/orted/pmix/pmix_server_internal.h @@ -74,6 +74,10 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t); typedef struct { opal_object_t super; opal_event_t ev; + int status; + opal_process_name_t *proc; + const char *msg; + void *server_object; opal_list_t *procs; opal_list_t *eprocs; opal_list_t *info; @@ -90,46 +94,62 @@ typedef struct { } orte_pmix_mdx_caddy_t; OBJ_CLASS_DECLARATION(orte_pmix_mdx_caddy_t); -#define ORTE_DMX_REQ(p, cf, ocf, ocd) \ -do { \ - pmix_server_req_t *_req; \ - _req = OBJ_NEW(pmix_server_req_t); \ - _req->target = (p); \ - _req->mdxcbfunc = (ocf); \ - _req->cbdata = (ocd); \ - opal_event_set(orte_event_base, &(_req->ev), \ - -1, OPAL_EV_WRITE, (cf), _req); \ - opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \ - opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \ -} while(0); +#define ORTE_DMX_REQ(p, cf, ocf, ocd) \ + do { \ + pmix_server_req_t *_req; \ + _req = OBJ_NEW(pmix_server_req_t); \ + _req->target = (p); \ + _req->mdxcbfunc = (ocf); \ + _req->cbdata = (ocd); \ + opal_event_set(orte_event_base, &(_req->ev), \ + -1, OPAL_EV_WRITE, (cf), _req); \ + opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \ + opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \ + } while(0); -#define ORTE_SPN_REQ(j, cf, ocf, ocd) \ -do { \ - pmix_server_req_t *_req; \ - _req = OBJ_NEW(pmix_server_req_t); \ - _req->jdata = (j); \ - _req->spcbfunc = (ocf); \ - _req->cbdata = (ocd); \ - opal_event_set(orte_event_base, &(_req->ev), \ - -1, OPAL_EV_WRITE, (cf), _req); \ - opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \ - opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \ -} while(0); +#define ORTE_SPN_REQ(j, cf, ocf, ocd) \ + do { \ + pmix_server_req_t *_req; \ + _req = OBJ_NEW(pmix_server_req_t); \ + _req->jdata = (j); \ + _req->spcbfunc = (ocf); \ + _req->cbdata = (ocd); \ + opal_event_set(orte_event_base, &(_req->ev), \ + -1, OPAL_EV_WRITE, (cf), _req); \ + opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \ + opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \ + } while(0); -#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \ -do { \ - orte_pmix_server_op_caddy_t *_cd; \ - _cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \ - _cd->procs = (p); \ - _cd->info = (i); \ - _cd->cbfunc = (cf); \ - _cd->cbdata = (cb); \ - opal_event_set(orte_event_base, &(_cd->ev), -1, \ - OPAL_EV_WRITE, (fn), _cd); \ - opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \ - opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \ -} while(0); +#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \ + do { \ + orte_pmix_server_op_caddy_t *_cd; \ + _cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \ + _cd->procs = (p); \ + _cd->info = (i); \ + _cd->cbfunc = (cf); \ + _cd->cbdata = (cb); \ + opal_event_set(orte_event_base, &(_cd->ev), -1, \ + OPAL_EV_WRITE, (fn), _cd); \ + opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \ + opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \ + } while(0); +#define ORTE_PMIX_THREADSHIFT(p, s, st, m, pl, fn, cf, cb) \ + do { \ + orte_pmix_server_op_caddy_t *_cd; \ + _cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \ + _cd->proc = (p); \ + _cd->server_object = (s); \ + _cd->status = (st); \ + _cd->msg = (m); \ + _cd->procs = (pl); \ + _cd->cbfunc = (cf); \ + _cd->cbdata = (cb); \ + opal_event_set(orte_event_base, &(_cd->ev), -1, \ + OPAL_EV_WRITE, (fn), _cd); \ + opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \ + opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \ + } while(0); /* define the server module functions */ extern int pmix_server_client_connected_fn(opal_process_name_t *proc, void* server_object);