1
1

Merge pull request #1756 from rhc54/topic/hangs

Fix rare hangs observed on OS-X by properly thread-shifting upcalls from the PMIx server into ORTE
Этот коммит содержится в:
rhc54 2016-06-06 07:41:58 -07:00
родитель db70852d31 dd0f843843
Коммит c2a02ab06c
3 изменённых файлов: 174 добавлений и 74 удалений

Просмотреть файл

@ -53,7 +53,6 @@ typedef uint32_t orte_proc_state_t;
#define ORTE_PROC_STATE_IOF_COMPLETE 6 /* io forwarding pipes have closed */
#define ORTE_PROC_STATE_WAITPID_FIRED 7 /* waitpid fired on process */
#define ORTE_PROC_STATE_MODEX_READY 8 /* all modex info has been stored */
/*
* Define a "boundary" so we can easily and quickly determine
* if a proc is still running or not - any value less than

Просмотреть файл

@ -43,50 +43,141 @@
#include "pmix_server_internal.h"
int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object)
{
ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_REGISTERED);
return ORTE_SUCCESS;
}
int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
static void _client_conn(int sd, short args, void *cbdata)
{
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
orte_job_t *jdata;
orte_proc_t *p, *ptr;
int i;
if (NULL != cbdata) {
if (NULL != cd->server_object) {
/* we were passed back the orte_proc_t */
p = (orte_proc_t*)cbdata;
p = (orte_proc_t*)cd->server_object;
} else {
/* find the named process */
p = NULL;
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
return ORTE_ERR_NOT_FOUND;
if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) {
return;
}
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
if (proc->jobid != ptr->name.jobid) {
if (cd->proc->jobid != ptr->name.jobid) {
continue;
}
if (proc->vpid == ptr->name.vpid) {
if (cd->proc->vpid == ptr->name.vpid) {
p = ptr;
break;
}
}
}
if (NULL != p) {
p->state = ORTE_PROC_STATE_TERMINATED;
/* release the caller */
if (NULL != cbfunc) {
cbfunc(ORTE_SUCCESS, cbdata);
}
return ORTE_SUCCESS;
ORTE_FLAG_SET(p, ORTE_PROC_FLAG_REG);
ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_REGISTERED);
}
return ORTE_ERR_NOT_FOUND;
OBJ_RELEASE(cd);
}
int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object)
{
/* need to thread-shift this request as we are going
* to access our global list of registered events */
ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
NULL, _client_conn, NULL, NULL);
return ORTE_SUCCESS;
}
static void _client_finalized(int sd, short args, void *cbdata)
{
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
orte_job_t *jdata;
orte_proc_t *p, *ptr;
int i;
if (NULL != cd->server_object) {
/* we were passed back the orte_proc_t */
p = (orte_proc_t*)cd->server_object;
} else {
/* find the named process */
p = NULL;
if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) {
return;
}
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
if (cd->proc->jobid != ptr->name.jobid) {
continue;
}
if (cd->proc->vpid == ptr->name.vpid) {
p = ptr;
break;
}
}
}
if (NULL != p) {
ORTE_FLAG_SET(p, ORTE_PROC_FLAG_HAS_DEREG);
/* release the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
}
}
OBJ_RELEASE(cd);
}
int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
/* need to thread-shift this request as we are going
* to access our global list of registered events */
ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
NULL, _client_finalized, cbfunc, cbdata);
return ORTE_SUCCESS;
}
static void _client_abort(int sd, short args, void *cbdata)
{
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
orte_job_t *jdata;
orte_proc_t *p, *ptr;
int i;
if (NULL != cd->server_object) {
p = (orte_proc_t*)cd->server_object;
} else {
/* find the named process */
p = NULL;
if (NULL == (jdata = orte_get_job_data_object(cd->proc->jobid))) {
return;
}
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
if (cd->proc->jobid != ptr->name.jobid) {
continue;
}
if (cd->proc->vpid == ptr->name.vpid) {
p = ptr;
break;
}
}
}
if (NULL != p) {
p->exit_code = cd->status;
ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_CALLED_ABORT);
}
ORTE_UPDATE_EXIT_STATUS(cd->status);
/* release the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(OPAL_SUCCESS, cd->cbdata);
}
OBJ_RELEASE(cd);
}
int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object,
@ -94,21 +185,11 @@ int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object,
opal_list_t *procs_to_abort,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
orte_proc_t *p;
if (NULL != server_object) {
p = (orte_proc_t*)server_object;
p->exit_code = status;
}
ORTE_UPDATE_EXIT_STATUS(status);
ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_CALLED_ABORT);
/* release the caller */
if (NULL != cbfunc) {
cbfunc(OPAL_SUCCESS, cbdata);
}
return OPAL_SUCCESS;
/* need to thread-shift this request as we are going
* to access our global list of registered events */
ORTE_PMIX_THREADSHIFT(proc, server_object, status, msg,
procs_to_abort, _client_abort, cbfunc, cbdata);
return ORTE_SUCCESS;
}
static void _register_events(int sd, short args, void *cbdata)

Просмотреть файл

@ -74,6 +74,10 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t);
typedef struct {
opal_object_t super;
opal_event_t ev;
int status;
opal_process_name_t *proc;
const char *msg;
void *server_object;
opal_list_t *procs;
opal_list_t *eprocs;
opal_list_t *info;
@ -90,46 +94,62 @@ typedef struct {
} orte_pmix_mdx_caddy_t;
OBJ_CLASS_DECLARATION(orte_pmix_mdx_caddy_t);
#define ORTE_DMX_REQ(p, cf, ocf, ocd) \
do { \
pmix_server_req_t *_req; \
_req = OBJ_NEW(pmix_server_req_t); \
_req->target = (p); \
_req->mdxcbfunc = (ocf); \
_req->cbdata = (ocd); \
opal_event_set(orte_event_base, &(_req->ev), \
-1, OPAL_EV_WRITE, (cf), _req); \
opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_DMX_REQ(p, cf, ocf, ocd) \
do { \
pmix_server_req_t *_req; \
_req = OBJ_NEW(pmix_server_req_t); \
_req->target = (p); \
_req->mdxcbfunc = (ocf); \
_req->cbdata = (ocd); \
opal_event_set(orte_event_base, &(_req->ev), \
-1, OPAL_EV_WRITE, (cf), _req); \
opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_SPN_REQ(j, cf, ocf, ocd) \
do { \
pmix_server_req_t *_req; \
_req = OBJ_NEW(pmix_server_req_t); \
_req->jdata = (j); \
_req->spcbfunc = (ocf); \
_req->cbdata = (ocd); \
opal_event_set(orte_event_base, &(_req->ev), \
-1, OPAL_EV_WRITE, (cf), _req); \
opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_SPN_REQ(j, cf, ocf, ocd) \
do { \
pmix_server_req_t *_req; \
_req = OBJ_NEW(pmix_server_req_t); \
_req->jdata = (j); \
_req->spcbfunc = (ocf); \
_req->cbdata = (ocd); \
opal_event_set(orte_event_base, &(_req->ev), \
-1, OPAL_EV_WRITE, (cf), _req); \
opal_event_set_priority(&(_req->ev), ORTE_MSG_PRI); \
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \
do { \
orte_pmix_server_op_caddy_t *_cd; \
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
_cd->procs = (p); \
_cd->info = (i); \
_cd->cbfunc = (cf); \
_cd->cbdata = (cb); \
opal_event_set(orte_event_base, &(_cd->ev), -1, \
OPAL_EV_WRITE, (fn), _cd); \
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \
do { \
orte_pmix_server_op_caddy_t *_cd; \
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
_cd->procs = (p); \
_cd->info = (i); \
_cd->cbfunc = (cf); \
_cd->cbdata = (cb); \
opal_event_set(orte_event_base, &(_cd->ev), -1, \
OPAL_EV_WRITE, (fn), _cd); \
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
} while(0);
#define ORTE_PMIX_THREADSHIFT(p, s, st, m, pl, fn, cf, cb) \
do { \
orte_pmix_server_op_caddy_t *_cd; \
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
_cd->proc = (p); \
_cd->server_object = (s); \
_cd->status = (st); \
_cd->msg = (m); \
_cd->procs = (pl); \
_cd->cbfunc = (cf); \
_cd->cbdata = (cb); \
opal_event_set(orte_event_base, &(_cd->ev), -1, \
OPAL_EV_WRITE, (fn), _cd); \
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
} while(0);
/* define the server module functions */
extern int pmix_server_client_connected_fn(opal_process_name_t *proc, void* server_object);