1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-09-11 17:38:21 -07:00
родитель 29a53b0269
Коммит 3477079804
26 изменённых файлов: 296 добавлений и 55 удалений

Просмотреть файл

@ -48,7 +48,16 @@ AC_DEFUN([MCA_opal_pmix_ext2x_CONFIG],[
[$1
# need to set the wrapper flags for static builds
pmix_ext2x_WRAPPER_EXTRA_LDFLAGS=$opal_external_pmix_LDFLAGS
pmix_ext2x_WRAPPER_EXTRA_LIBS=$opal_external_pmix_LIBS],
pmix_ext2x_WRAPPER_EXTRA_LIBS=$opal_external_pmix_LIBS
# and the flags for prun
OPAL_PMIX_CPPFLAGS="-I$opal_external_pmix_CPPFLAGS"
AC_SUBST(OPAL_PMIX_CPPFLAGS)
OPAL_PMIX_LDFLAGS=$opal_external_pmix_LDFLAGS
AC_SUBST(OPAL_PMIX_LDFLAGS)
OPAL_PMIX_LDADD=
AC_SUBST(OPAL_PMIX_LDADD)
OPAL_PMIX_LIBS=-lpmix
AC_SUBST(OPAL_PMIX_LIBS)],
[$2])],
[$2])

Просмотреть файл

@ -86,7 +86,16 @@ AC_DEFUN([MCA_opal_pmix_pmix2x_CONFIG],[
opal_pmix_pmix2x_LDFLAGS=
opal_pmix_pmix2x_LIBS="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la"
opal_pmix_pmix2x_CPPFLAGS="-I$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/include -I$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix -I$OPAL_TOP_SRCDIR/$opal_pmix_pmix2x_basedir/pmix/include -I$OPAL_TOP_SRCDIR/$opal_pmix_pmix2x_basedir/pmix"
opal_pmix_pmix2x_DEPENDENCIES="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la"])
opal_pmix_pmix2x_DEPENDENCIES="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la"
# and the flags for prun
OPAL_PMIX_CPPFLAGS="-I$opal_pmix_pmix2x_CPPFLAGS"
AC_SUBST(OPAL_PMIX_CPPFLAGS)
OPAL_PMIX_LDADD=$opal_pmix_pmix2x_LIBS
AC_SUBST(OPAL_PMIX_LDADD)
OPAL_PMIX_LIBS=
AC_SUBST(OPAL_PMIX_LIBS)
OPAL_PMIX_LDFLAGS=
AC_SUBST(OPAL_PMIX_LDFLAGS)])
AC_SUBST([opal_pmix_pmix2x_LIBS])
AC_SUBST([opal_pmix_pmix2x_CPPFLAGS])

Просмотреть файл

@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=gitaffab34
repo_rev=git8684147
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Sep 06, 2017"
date="Sep 11, 2017"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library

Просмотреть файл

@ -153,6 +153,8 @@ typedef uint32_t pmix_rank_t;
#define PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH")
#define PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1")
#define PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads")
#define PMIX_REQUESTOR_IS_TOOL "pmix.req.tool" // (bool) requesting process is a tool
#define PMIX_REQUESTOR_IS_CLIENT "pmix.req.client" // (bool) requesting process is a client process
/* attributes for the USOCK rendezvous socket */
@ -330,6 +332,8 @@ typedef uint32_t pmix_rank_t;
#define PMIX_DEBUGGER_DAEMONS "pmix.debugger" // (bool) spawned app consists of debugger daemons
#define PMIX_COSPAWN_APP "pmix.cospawn" // (bool) designated app is to be spawned as a disconnected
// job - i.e., not part of the "comm_world" of the job
#define PMIX_SET_SESSION_CWD "pmix.ssncwd" // (bool) set the application's current working directory to
// the session working directory assigned by the RM
/* query attributes */
#define PMIX_QUERY_NAMESPACES "pmix.qry.ns" // (char*) request a comma-delimited list of active nspaces

Просмотреть файл

@ -100,6 +100,12 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer,
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client_notify_recv - processing event");
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
return;
}
/* start the local notification chain */
chain = PMIX_NEW(pmix_event_chain_t);
if (NULL == chain) {

Просмотреть файл

@ -352,6 +352,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
goto report;
}
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
ret = PMIX_ERR_UNREACH;
goto report;
}
/* unpack the returned status */
cnt = 1;
PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,

Просмотреть файл

@ -262,7 +262,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
return;
}
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
rc = PMIX_ERR_UNREACH;
} else {
rc = unpack_return(buf);
}
/* if a callback was provided, execute it */
if (NULL != cb->cbfunc.opfn) {

Просмотреть файл

@ -281,6 +281,13 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr,
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
return;
}
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
ret = PMIX_ERR_UNREACH;
goto done;
}
/* cache the proc id */
(void)strncpy(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
proc.rank = cb->pname.rank;

Просмотреть файл

@ -526,6 +526,12 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
rc = PMIX_ERR_BAD_PARAM;
goto report;
}
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
rc = PMIX_ERR_UNREACH;
goto report;
}
/* unpack the returned status */
cnt = 1;
@ -576,6 +582,12 @@ static void wait_lookup_cbfunc(struct pmix_peer_t *pr,
rc = PMIX_ERR_BAD_PARAM;
goto report;
}
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
rc = PMIX_ERR_UNREACH;
goto report;
}
/* set the defaults */
pdata = NULL;

Просмотреть файл

@ -218,6 +218,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
"pmix:client recv callback activated with %d bytes",
(NULL == buf) ? -1 : (int)buf->bytes_used);
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
ret = PMIX_ERR_UNREACH;
goto report;
}
/* init */
memset(nspace, 0, PMIX_MAX_NSLEN+1);

Просмотреть файл

@ -37,7 +37,7 @@ static void relcbfunc(void *cbdata)
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:query release callback");
"pmix:job_ctrl release callback");
if (NULL != cd->info) {
PMIX_INFO_FREE(cd->info, cd->ninfo);
@ -54,7 +54,19 @@ static void query_cbfunc(struct pmix_peer_t *peer,
int cnt;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:query cback from server");
"pmix:job_ctrl cback from server with %d bytes",
(int)buf->bytes_used);
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
/* release the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL);
}
PMIX_RELEASE(cd);
return;
}
results = PMIX_NEW(pmix_shift_caddy_t);
@ -88,7 +100,7 @@ static void query_cbfunc(struct pmix_peer_t *peer,
complete:
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:query cback from server releasing");
"pmix:job_ctrl cback from server releasing");
/* release the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
@ -108,7 +120,7 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: job control called");
"pmix: job control called with %d directives", (int)ndirs);
if (pmix_globals.init_cntr <= 0) {
PMIX_RELEASE_THREAD(&pmix_global_lock);

Просмотреть файл

@ -143,6 +143,10 @@ PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_buffer_t);
(b)->unpack_ptr = NULL; \
} while (0)
/* Convenience macro to check for empty buffer without
* exposing the internals */
#define PMIX_BUFFER_IS_EMPTY(b) \
0 == (b)->bytes_used
END_C_DECLS

Просмотреть файл

@ -147,7 +147,7 @@ pmix_status_t pmix_ptl_base_recv_blocking(int sd, char *data, size_t size)
pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
"blocking_recv received error %d:%s from remote - cycling",
pmix_socket_errno, strerror(pmix_socket_errno));
continue;
return PMIX_ERR_TEMP_UNAVAILABLE;
}
if (pmix_socket_errno != EINTR ) {
/* If we overflow the listen backlog, it's

Просмотреть файл

@ -518,6 +518,7 @@ static pmix_status_t try_connect(int *sd)
struct sockaddr_in6 *in6;
size_t len;
pmix_status_t rc;
bool retried = false;
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"pmix:tcp try connect to %s",
@ -591,6 +592,7 @@ static pmix_status_t try_connect(int *sd)
}
free(p);
retry:
/* establish the connection */
if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_tcp_component.connection, len, sd))) {
PMIX_ERROR_LOG(rc);
@ -606,8 +608,15 @@ static pmix_status_t try_connect(int *sd)
/* do whatever handshake is required */
if (PMIX_SUCCESS != (rc = recv_connect_ack(*sd))) {
PMIX_ERROR_LOG(rc);
CLOSE_THE_SOCKET(*sd);
if (PMIX_ERR_TEMP_UNAVAILABLE == rc) {
/* give it two tries */
if (!retried) {
retried = true;
goto retry;
}
}
PMIX_ERROR_LOG(rc);
return rc;
}
@ -808,7 +817,12 @@ static pmix_status_t recv_connect_ack(int sd)
/* receive the status reply */
rc = pmix_ptl_base_recv_blocking(sd, (char*)&u32, sizeof(uint32_t));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
if (sockopt) {
/* return the socket to normal */
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
return PMIX_ERR_UNREACH;
}
}
return rc;
}
reply = ntohl(u32);
@ -829,7 +843,6 @@ static pmix_status_t recv_connect_ack(int sd)
/* receive our index into the server's client array */
rc = pmix_ptl_base_recv_blocking(sd, (char*)&u32, sizeof(uint32_t));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
pmix_globals.pindex = ntohl(u32);
@ -842,7 +855,6 @@ static pmix_status_t recv_connect_ack(int sd)
/* recv our nspace */
rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.myid.nspace, PMIX_MAX_NSLEN+1);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* our rank is always zero */

Просмотреть файл

@ -101,6 +101,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
pmix_status_t rc;
int sd;
pmix_socklen_t len;
bool retried = false;
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"[%s:%d] connect to server",
@ -164,6 +165,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
}
pmix_argv_free(uri);
retry:
/* establish the connection */
len = sizeof(struct sockaddr_un);
if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_usock_component.connection, len, &sd))) {
@ -181,6 +183,13 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
/* do whatever handshake is required */
if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
CLOSE_THE_SOCKET(sd);
if (PMIX_ERR_TEMP_UNAVAILABLE == rc) {
/* give it two tries */
if (!retried) {
retried = true;
goto retry;
}
}
return rc;
}
@ -360,7 +369,12 @@ static pmix_status_t recv_connect_ack(int sd)
/* receive the status reply */
rc = pmix_ptl_base_recv_blocking(sd, (char*)&reply, sizeof(int));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
if (sockopt) {
/* return the socket to normal */
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
return PMIX_ERR_UNREACH;
}
}
return rc;
}
@ -380,7 +394,6 @@ static pmix_status_t recv_connect_ack(int sd)
/* receive our index into the server's client array */
rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
if (sockopt) {

Просмотреть файл

@ -1012,6 +1012,7 @@ pmix_status_t pmix_server_spawn(pmix_peer_t *peer,
int32_t cnt;
pmix_status_t rc;
pmix_proc_t proc;
size_t ninfo;
pmix_output_verbose(2, pmix_globals.debug_output,
"recvd SPAWN");
@ -1031,26 +1032,36 @@ pmix_status_t pmix_server_spawn(pmix_peer_t *peer,
/* unpack the number of job-level directives */
cnt=1;
PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(cd);
return rc;
}
/* unpack the array of directives */
if (0 < cd->ninfo) {
/* always add one directive that indicates whether the requestor
* is a tool or client */
cd->ninfo = ninfo + 1;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
if (NULL == cd->info) {
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
cnt = cd->ninfo;
/* unpack the array of directives */
if (0 < ninfo) {
cnt = ninfo;
PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
/* add the directive to the end */
if (PMIX_PROC_IS_TOOL(peer)) {
PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_TOOL, NULL, PMIX_BOOL);
} else {
PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_CLIENT, NULL, PMIX_BOOL);
}
/* unpack the number of apps */
cnt=1;

Просмотреть файл

@ -89,10 +89,17 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
int32_t cnt;
pmix_cmd_t cmd;
pmix_event_chain_t *chain;
size_t ninfo;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:tool_notify_recv - processing event");
/* a zero-byte buffer indicates that this recv is being
* completed due to a lost connection */
if (PMIX_BUFFER_IS_EMPTY(buf)) {
return;
}
/* start the local notification chain */
chain = PMIX_NEW(pmix_event_chain_t);
chain->final_cbfunc = _notify_complete;
@ -126,14 +133,16 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
/* unpack the info that might have been provided */
cnt=1;
PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
buf, &chain->ninfo, &cnt, PMIX_SIZE);
buf, &ninfo, &cnt, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto error;
}
if (0 < chain->ninfo) {
/* we always leave space for a callback object */
chain->ninfo = ninfo + 1;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
cnt = chain->ninfo;
if (0 < ninfo) {
cnt = ninfo;
PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
buf, chain->info, &cnt, PMIX_INFO);
if (PMIX_SUCCESS != rc) {
@ -141,6 +150,9 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
goto error;
}
}
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] pmix:tool_notify_recv - processing event %d, calling errhandler",
pmix_globals.myid.nspace, pmix_globals.myid.rank, chain->status);

Просмотреть файл

@ -169,6 +169,8 @@ PMIX_EXPORT const char* PMIx_Error_string(pmix_status_t errnum)
return "PMIX FILE MONITOR ALERT";
case PMIX_MODEL_DECLARED:
return "PMIX MODEL DECLARED";
case PMIX_ERR_TEMP_UNAVAILABLE:
return "PMIX TEMPORARILY UNAVAILABLE";
case PMIX_SUCCESS:
return "SUCCESS";
default:

Просмотреть файл

@ -38,6 +38,7 @@
#define PMIX_ERR_FILE_OPEN_FAILURE (PMIX_INTERNAL_ERR_BASE - 34)
#define PMIX_ERR_FILE_READ_FAILURE (PMIX_INTERNAL_ERR_BASE - 35)
#define PMIX_ERR_TAKE_NEXT_OPTION (PMIX_INTERNAL_ERR_BASE - 36)
#define PMIX_ERR_TEMP_UNAVAILABLE (PMIX_INTERNAL_ERR_BASE - 37)
#define PMIX_ERROR_LOG(r) \
do { \

Просмотреть файл

@ -1560,3 +1560,94 @@ int pmix2x_resolve_nodes(opal_jobid_t jobid, char **nodelist)
return pmix2x_convert_rc(ret);
}
static void relcbfunc(void *cbdata)
{
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
OBJ_RELEASE(op);
}
static void infocbfunc(pmix_status_t status,
pmix_info_t *info, size_t ninfo,
void *cbdata,
pmix_release_cbfunc_t release_fn,
void *release_cbdata)
{
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
int rc;
if (NULL != release_fn) {
release_fn(release_cbdata);
}
rc = pmix2x_convert_rc(status);
if (NULL != op->qcbfunc) {
op->qcbfunc(rc, NULL, op->cbdata, relcbfunc, op);
} else {
OBJ_RELEASE(op);
}
}
int pmix2x_allocate(opal_pmix_alloc_directive_t directive,
opal_list_t *info,
opal_pmix_info_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
int pmix2x_job_control(opal_list_t *targets,
opal_list_t *directives,
opal_pmix_info_cbfunc_t cbfunc, void *cbdata)
{
pmix2x_opcaddy_t *op;
size_t n;
opal_namelist_t *ptr;
opal_value_t *iptr;
pmix_status_t rc;
char *nsptr;
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
if (0 >= opal_pmix_base.initialized) {
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
return OPAL_ERR_NOT_INITIALIZED;
}
/* create the caddy */
op = OBJ_NEW(pmix2x_opcaddy_t);
op->qcbfunc = cbfunc;
op->cbdata = cbdata;
if (NULL != targets) {
op->nprocs = opal_list_get_size(targets);
/* convert the list of procs to an array
* of pmix_proc_t */
PMIX_PROC_CREATE(op->procs, op->nprocs);
n=0;
OPAL_LIST_FOREACH(ptr, targets, opal_namelist_t) {
if (NULL == (nsptr = pmix2x_convert_jobid(ptr->name.jobid))) {
OBJ_RELEASE(op);
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
return OPAL_ERR_NOT_FOUND;
}
(void)strncpy(op->procs[n].nspace, nsptr, PMIX_MAX_NSLEN);
op->procs[n].rank = pmix2x_convert_opalrank(ptr->name.vpid);
++n;
}
}
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
if (NULL != directives && 0 < (op->ninfo = opal_list_get_size(directives))) {
PMIX_INFO_CREATE(op->info, op->ninfo);
n=0;
OPAL_LIST_FOREACH(iptr, directives, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix2x_value_load(&op->info[n].value, iptr);
++n;
}
}
rc = PMIx_Job_control_nb(op->procs,op->nprocs, op->info, op->ninfo, infocbfunc, op);
if (PMIX_SUCCESS != rc) {
OBJ_RELEASE(op);
}
return pmix2x_convert_rc(rc);
}

Просмотреть файл

@ -1240,6 +1240,7 @@ static pmix_status_t server_job_control(const pmix_proc_t *proct,
for (n=0; n < ndirs; n++) {
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(directives[n].key);
if (OPAL_SUCCESS != (rc = pmix2x_value_unload(oinfo, &directives[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix2x_convert_opalrc(rc);

Просмотреть файл

@ -83,6 +83,8 @@ BEGIN_C_DECLS
#define OPAL_PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH")
#define OPAL_PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1")
#define OPAL_PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads")
#define OPAL_PMIX_REQUESTOR_IS_TOOL "pmix.req.tool" // (bool) requesting process is a tool
#define OPAL_PMIX_REQUESTOR_IS_CLIENT "pmix.req.client" // (bool) requesting process is a client process
/* attributes for the rendezvous socket */
#define OPAL_PMIX_USOCK_DISABLE "pmix.usock.disable" // (bool) disable legacy usock support
@ -249,6 +251,8 @@ BEGIN_C_DECLS
#define OPAL_PMIX_DEBUGGER_DAEMONS "pmix.debugger" // (bool) spawned app consists of debugger daemons
#define OPAL_PMIX_COSPAWN_APP "pmix.cospawn" // (bool) designated app is to be spawned as a disconnected
// job - i.e., not part of the "comm_world" of the job
#define OPAL_PMIX_SET_SESSION_CWD "pmix.ssncwd" // (bool) set the application's current working directory to
// the session working directory assigned by the RM
/* query attributes */
#define OPAL_PMIX_QUERY_NAMESPACES "pmix.qry.ns" // (char*) request a comma-delimited list of active nspaces
@ -336,6 +340,8 @@ BEGIN_C_DECLS
#define OPAL_PMIX_JOB_CTRL_PROVISION "pmix.jctrl.pvn" // (char*) regex identifying nodes that are to be provisioned
#define OPAL_PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned
#define OPAL_PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted
#define OPAL_PMIX_JOB_CTRL_TERMINATE "pmix.jctrl.term" // (bool) politely terminate the specified procs
/* monitoring attributes */
#define OPAL_PMIX_MONITOR_HEARTBEAT "pmix.monitor.mbeat" // (void) register to have the server monitor the requestor for heartbeats

Просмотреть файл

@ -50,7 +50,6 @@ static int finalize(void);
static void init_complete(int fd, short args, void *cbdata);
static void vm_ready(int fd, short args, void *cbata);
static void check_complete(int fd, short args, void *cbdata);
static void cleanup_job(int fd, short args, void *cbdata);
/******************
* DVM module - used when mpirun is persistent
@ -90,7 +89,6 @@ static orte_job_state_t launch_states[] = {
ORTE_JOB_STATE_REGISTERED,
/* termination states */
ORTE_JOB_STATE_TERMINATED,
ORTE_JOB_STATE_NOTIFY_COMPLETED,
ORTE_JOB_STATE_ALL_JOBS_COMPLETE
};
static orte_state_cbfunc_t launch_callbacks[] = {
@ -109,7 +107,6 @@ static orte_state_cbfunc_t launch_callbacks[] = {
orte_plm_base_post_launch,
orte_plm_base_registered,
check_complete,
cleanup_job,
orte_quit
};
@ -504,24 +501,11 @@ static void check_complete(int fd, short args, void *cbdata)
OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
"%s state:dvm:check_job_completed state is terminated - activating notify",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED);
/* mark the job as notified */
jdata->state = ORTE_JOB_STATE_NOTIFIED;
}
OBJ_RELEASE(caddy);
}
static void cleanup_job(int sd, short args, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata;
ORTE_ACQUIRE_OBJECT(caddy);
jdata = caddy->jdata;
/* remove this object from the job array */
opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL,
OPAL_PMIX_RANGE_GLOBAL, NULL,
NULL, NULL);
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
}
OBJ_RELEASE(caddy);
}

Просмотреть файл

@ -1657,7 +1657,7 @@ static int create_app(int argc, char* argv[],
orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
/* no harm in setting this attribute twice as the function will simply ignore it */
orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
orte_set_attribute(&app->attributes, ORTE_APP_USER_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
}
}
if (NULL != orte_cmd_options.preload_files) {

Просмотреть файл

@ -237,6 +237,9 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
} else if (0 == strcmp(info->key, OPAL_PMIX_NON_PMI)) {
orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB,
ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
} else if (0 == strcmp(info->key, OPAL_PMIX_REQUESTOR_IS_TOOL)) {
orte_set_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB,
ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
} else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) {
if (0 == strcmp(info->data.string, "all")) {
jdata->stdin_target = ORTE_VPID_WILDCARD;
@ -341,7 +344,7 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
}
}
/* indicate that we are to notify the requestor when we hear back */
/* indicate the requestor so bookmarks can be correctly set */
orte_set_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, ORTE_ATTR_GLOBAL,
requestor, OPAL_NAME);

Просмотреть файл

@ -622,7 +622,6 @@ static void _query(int sd, short args, void *cbdata)
}
} else {
opal_output(0, "NONLOCAL");
/* if they want it for remote procs, see who is hosting them
* and ask directly for the info - if rank=wildcard, then
* we need to xcast the request and collect the results */
@ -894,6 +893,9 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor,
orte_proc_t *proc;
opal_pointer_array_t parray, *ptrarray;
opal_namelist_t *nm;
opal_buffer_t *cmd;
orte_daemon_cmd_flag_t cmmnd = ORTE_DAEMON_HALT_VM_CMD;
orte_grpcomm_signature_t *sig;
opal_output_verbose(2, orte_pmix_server_globals.output,
"%s job control request from %s",
@ -905,10 +907,9 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor,
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
continue;
}
if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_KILL)) {
/* convert the list of targets to a pointer array */
if (NULL == targets) {
if (0 == opal_list_get_size(targets)) {
ptrarray = NULL;
} else {
OBJ_CONSTRUCT(&parray, opal_pointer_array_t);
@ -936,6 +937,27 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor,
OBJ_DESTRUCT(&parray);
}
continue;
} else if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_TERMINATE)) {
if (0 == opal_list_get_size(targets)) {
/* terminate the daemons and all running jobs */
cmd = OBJ_NEW(opal_buffer_t);
/* pack the command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(cmd, &cmmnd, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(cmd);
return rc;
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, cmd))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(cmd);
OBJ_RELEASE(sig);
}
}
}