diff --git a/opal/mca/pmix/ext2x/configure.m4 b/opal/mca/pmix/ext2x/configure.m4 index 171f735f3b..1e27bace65 100644 --- a/opal/mca/pmix/ext2x/configure.m4 +++ b/opal/mca/pmix/ext2x/configure.m4 @@ -48,7 +48,16 @@ AC_DEFUN([MCA_opal_pmix_ext2x_CONFIG],[ [$1 # need to set the wrapper flags for static builds pmix_ext2x_WRAPPER_EXTRA_LDFLAGS=$opal_external_pmix_LDFLAGS - pmix_ext2x_WRAPPER_EXTRA_LIBS=$opal_external_pmix_LIBS], + pmix_ext2x_WRAPPER_EXTRA_LIBS=$opal_external_pmix_LIBS + # and the flags for prun + OPAL_PMIX_CPPFLAGS="-I$opal_external_pmix_CPPFLAGS" + AC_SUBST(OPAL_PMIX_CPPFLAGS) + OPAL_PMIX_LDFLAGS=$opal_external_pmix_LDFLAGS + AC_SUBST(OPAL_PMIX_LDFLAGS) + OPAL_PMIX_LDADD= + AC_SUBST(OPAL_PMIX_LDADD) + OPAL_PMIX_LIBS=-lpmix + AC_SUBST(OPAL_PMIX_LIBS)], [$2])], [$2]) diff --git a/opal/mca/pmix/pmix2x/configure.m4 b/opal/mca/pmix/pmix2x/configure.m4 index 372a2db43f..8a066de2be 100644 --- a/opal/mca/pmix/pmix2x/configure.m4 +++ b/opal/mca/pmix/pmix2x/configure.m4 @@ -86,7 +86,16 @@ AC_DEFUN([MCA_opal_pmix_pmix2x_CONFIG],[ opal_pmix_pmix2x_LDFLAGS= opal_pmix_pmix2x_LIBS="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la" opal_pmix_pmix2x_CPPFLAGS="-I$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/include -I$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix -I$OPAL_TOP_SRCDIR/$opal_pmix_pmix2x_basedir/pmix/include -I$OPAL_TOP_SRCDIR/$opal_pmix_pmix2x_basedir/pmix" - opal_pmix_pmix2x_DEPENDENCIES="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la"]) + opal_pmix_pmix2x_DEPENDENCIES="$OPAL_TOP_BUILDDIR/$opal_pmix_pmix2x_basedir/pmix/src/libpmix.la" + # and the flags for prun + OPAL_PMIX_CPPFLAGS="-I$opal_pmix_pmix2x_CPPFLAGS" + AC_SUBST(OPAL_PMIX_CPPFLAGS) + OPAL_PMIX_LDADD=$opal_pmix_pmix2x_LIBS + AC_SUBST(OPAL_PMIX_LDADD) + OPAL_PMIX_LIBS= + AC_SUBST(OPAL_PMIX_LIBS) + OPAL_PMIX_LDFLAGS= + AC_SUBST(OPAL_PMIX_LDFLAGS)]) AC_SUBST([opal_pmix_pmix2x_LIBS]) AC_SUBST([opal_pmix_pmix2x_CPPFLAGS]) diff --git a/opal/mca/pmix/pmix2x/pmix/VERSION b/opal/mca/pmix/pmix2x/pmix/VERSION index 644854e913..dbbc6e7c2e 100644 --- a/opal/mca/pmix/pmix2x/pmix/VERSION +++ b/opal/mca/pmix/pmix2x/pmix/VERSION @@ -30,7 +30,7 @@ greek= # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=gitaffab34 +repo_rev=git8684147 # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Sep 06, 2017" +date="Sep 11, 2017" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h index 302ea66c21..b170ec2201 100644 --- a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h +++ b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h @@ -153,6 +153,8 @@ typedef uint32_t pmix_rank_t; #define PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH") #define PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1") #define PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads") +#define PMIX_REQUESTOR_IS_TOOL "pmix.req.tool" // (bool) requesting process is a tool +#define PMIX_REQUESTOR_IS_CLIENT "pmix.req.client" // (bool) requesting process is a client process /* attributes for the USOCK rendezvous socket */ @@ -329,7 +331,9 @@ typedef uint32_t pmix_rank_t; #define PMIX_FWD_STDERR "pmix.fwd.stderr" // (bool) forward stderr from spawned procs to me #define PMIX_DEBUGGER_DAEMONS "pmix.debugger" // (bool) spawned app consists of debugger daemons #define PMIX_COSPAWN_APP "pmix.cospawn" // (bool) designated app is to be spawned as a disconnected - // job - i.e., not part of the "comm_world" of the job + // job - i.e., not part of the "comm_world" of the job +#define PMIX_SET_SESSION_CWD "pmix.ssncwd" // (bool) set the application's current working directory to + // the session working directory assigned by the RM /* query attributes */ #define PMIX_QUERY_NAMESPACES "pmix.qry.ns" // (char*) request a comma-delimited list of active nspaces diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c index 0e25ebad5f..a97b7ce679 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c @@ -100,6 +100,12 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer, pmix_output_verbose(2, pmix_globals.debug_output, "pmix:client_notify_recv - processing event"); + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + return; + } + /* start the local notification chain */ chain = PMIX_NEW(pmix_event_chain_t); if (NULL == chain) { diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_connect.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_connect.c index 2bacf2e6a1..544f68fb33 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_connect.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_connect.c @@ -352,6 +352,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr, goto report; } + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + ret = PMIX_ERR_UNREACH; + goto report; + } + /* unpack the returned status */ cnt = 1; PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver, diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_fence.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_fence.c index 5d9ade4616..d3e182d584 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_fence.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_fence.c @@ -262,7 +262,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr, PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); return; } - rc = unpack_return(buf); + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + rc = PMIX_ERR_UNREACH; + } else { + rc = unpack_return(buf); + } /* if a callback was provided, execute it */ if (NULL != cb->cbfunc.opfn) { diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c index 54b1719f52..1ed9381438 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_get.c @@ -281,6 +281,13 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); return; } + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + ret = PMIX_ERR_UNREACH; + goto done; + } + /* cache the proc id */ (void)strncpy(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN); proc.rank = cb->pname.rank; diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_pub.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_pub.c index a0c91a56ff..3f25286541 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_pub.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_pub.c @@ -526,6 +526,12 @@ static void wait_cbfunc(struct pmix_peer_t *pr, rc = PMIX_ERR_BAD_PARAM; goto report; } + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + rc = PMIX_ERR_UNREACH; + goto report; + } /* unpack the returned status */ cnt = 1; @@ -576,6 +582,12 @@ static void wait_lookup_cbfunc(struct pmix_peer_t *pr, rc = PMIX_ERR_BAD_PARAM; goto report; } + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + rc = PMIX_ERR_UNREACH; + goto report; + } /* set the defaults */ pdata = NULL; diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_spawn.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_spawn.c index 489f3e65ff..d3bc87b15e 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_spawn.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client_spawn.c @@ -218,6 +218,13 @@ static void wait_cbfunc(struct pmix_peer_t *pr, "pmix:client recv callback activated with %d bytes", (NULL == buf) ? -1 : (int)buf->bytes_used); + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + ret = PMIX_ERR_UNREACH; + goto report; + } + /* init */ memset(nspace, 0, PMIX_MAX_NSLEN+1); diff --git a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c index c14a69510f..44803eff7a 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c +++ b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c @@ -37,7 +37,7 @@ static void relcbfunc(void *cbdata) pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata; pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:query release callback"); + "pmix:job_ctrl release callback"); if (NULL != cd->info) { PMIX_INFO_FREE(cd->info, cd->ninfo); @@ -54,7 +54,19 @@ static void query_cbfunc(struct pmix_peer_t *peer, int cnt; pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:query cback from server"); + "pmix:job_ctrl cback from server with %d bytes", + (int)buf->bytes_used); + + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + /* release the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL); + } + PMIX_RELEASE(cd); + return; + } results = PMIX_NEW(pmix_shift_caddy_t); @@ -88,7 +100,7 @@ static void query_cbfunc(struct pmix_peer_t *peer, complete: pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:query cback from server releasing"); + "pmix:job_ctrl cback from server releasing"); /* release the caller */ if (NULL != cd->cbfunc) { cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results); @@ -108,7 +120,7 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_ PMIX_ACQUIRE_THREAD(&pmix_global_lock); pmix_output_verbose(2, pmix_globals.debug_output, - "pmix: job control called"); + "pmix: job control called with %d directives", (int)ndirs); if (pmix_globals.init_cntr <= 0) { PMIX_RELEASE_THREAD(&pmix_global_lock); diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/bfrops/bfrops_types.h b/opal/mca/pmix/pmix2x/pmix/src/mca/bfrops/bfrops_types.h index 2eb7b96cc6..b1afcbcfc1 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/bfrops/bfrops_types.h +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/bfrops/bfrops_types.h @@ -143,6 +143,10 @@ PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_buffer_t); (b)->unpack_ptr = NULL; \ } while (0) +/* Convenience macro to check for empty buffer without + * exposing the internals */ +#define PMIX_BUFFER_IS_EMPTY(b) \ + 0 == (b)->bytes_used END_C_DECLS diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_connect.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_connect.c index 18276ecb1b..0963ba2552 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_connect.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_connect.c @@ -147,7 +147,7 @@ pmix_status_t pmix_ptl_base_recv_blocking(int sd, char *data, size_t size) pmix_output_verbose(8, pmix_ptl_base_framework.framework_output, "blocking_recv received error %d:%s from remote - cycling", pmix_socket_errno, strerror(pmix_socket_errno)); - continue; + return PMIX_ERR_TEMP_UNAVAILABLE; } if (pmix_socket_errno != EINTR ) { /* If we overflow the listen backlog, it's diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c index 5152b97185..a546fd5a74 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c @@ -518,6 +518,7 @@ static pmix_status_t try_connect(int *sd) struct sockaddr_in6 *in6; size_t len; pmix_status_t rc; + bool retried = false; pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, "pmix:tcp try connect to %s", @@ -591,6 +592,7 @@ static pmix_status_t try_connect(int *sd) } free(p); + retry: /* establish the connection */ if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_tcp_component.connection, len, sd))) { PMIX_ERROR_LOG(rc); @@ -606,8 +608,15 @@ static pmix_status_t try_connect(int *sd) /* do whatever handshake is required */ if (PMIX_SUCCESS != (rc = recv_connect_ack(*sd))) { - PMIX_ERROR_LOG(rc); CLOSE_THE_SOCKET(*sd); + if (PMIX_ERR_TEMP_UNAVAILABLE == rc) { + /* give it two tries */ + if (!retried) { + retried = true; + goto retry; + } + } + PMIX_ERROR_LOG(rc); return rc; } @@ -808,7 +817,12 @@ static pmix_status_t recv_connect_ack(int sd) /* receive the status reply */ rc = pmix_ptl_base_recv_blocking(sd, (char*)&u32, sizeof(uint32_t)); if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); + if (sockopt) { + /* return the socket to normal */ + if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) { + return PMIX_ERR_UNREACH; + } + } return rc; } reply = ntohl(u32); @@ -829,7 +843,6 @@ static pmix_status_t recv_connect_ack(int sd) /* receive our index into the server's client array */ rc = pmix_ptl_base_recv_blocking(sd, (char*)&u32, sizeof(uint32_t)); if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); return rc; } pmix_globals.pindex = ntohl(u32); @@ -842,7 +855,6 @@ static pmix_status_t recv_connect_ack(int sd) /* recv our nspace */ rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.myid.nspace, PMIX_MAX_NSLEN+1); if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); return rc; } /* our rank is always zero */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c index 29a7aa9b85..ca76b94358 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c @@ -101,6 +101,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, pmix_status_t rc; int sd; pmix_socklen_t len; + bool retried = false; pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, "[%s:%d] connect to server", @@ -164,6 +165,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, } pmix_argv_free(uri); + retry: /* establish the connection */ len = sizeof(struct sockaddr_un); if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_usock_component.connection, len, &sd))) { @@ -181,6 +183,13 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, /* do whatever handshake is required */ if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) { CLOSE_THE_SOCKET(sd); + if (PMIX_ERR_TEMP_UNAVAILABLE == rc) { + /* give it two tries */ + if (!retried) { + retried = true; + goto retry; + } + } return rc; } @@ -360,7 +369,12 @@ static pmix_status_t recv_connect_ack(int sd) /* receive the status reply */ rc = pmix_ptl_base_recv_blocking(sd, (char*)&reply, sizeof(int)); if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); + if (sockopt) { + /* return the socket to normal */ + if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) { + return PMIX_ERR_UNREACH; + } + } return rc; } @@ -380,7 +394,6 @@ static pmix_status_t recv_connect_ack(int sd) /* receive our index into the server's client array */ rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int)); if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); return rc; } if (sockopt) { diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c index e793859ab8..37619acff9 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c @@ -1012,6 +1012,7 @@ pmix_status_t pmix_server_spawn(pmix_peer_t *peer, int32_t cnt; pmix_status_t rc; pmix_proc_t proc; + size_t ninfo; pmix_output_verbose(2, pmix_globals.debug_output, "recvd SPAWN"); @@ -1031,26 +1032,36 @@ pmix_status_t pmix_server_spawn(pmix_peer_t *peer, /* unpack the number of job-level directives */ cnt=1; - PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE); + PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); PMIX_RELEASE(cd); return rc; } + /* always add one directive that indicates whether the requestor + * is a tool or client */ + cd->ninfo = ninfo + 1; + PMIX_INFO_CREATE(cd->info, cd->ninfo); + if (NULL == cd->info) { + rc = PMIX_ERR_NOMEM; + goto cleanup; + } + /* unpack the array of directives */ - if (0 < cd->ninfo) { - PMIX_INFO_CREATE(cd->info, cd->ninfo); - if (NULL == cd->info) { - rc = PMIX_ERR_NOMEM; - goto cleanup; - } - cnt = cd->ninfo; + if (0 < ninfo) { + cnt = ninfo; PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); goto cleanup; } } + /* add the directive to the end */ + if (PMIX_PROC_IS_TOOL(peer)) { + PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_TOOL, NULL, PMIX_BOOL); + } else { + PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_CLIENT, NULL, PMIX_BOOL); + } /* unpack the number of apps */ cnt=1; diff --git a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c index 119d5443ce..5675e5fae3 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c +++ b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c @@ -89,10 +89,17 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, int32_t cnt; pmix_cmd_t cmd; pmix_event_chain_t *chain; + size_t ninfo; pmix_output_verbose(2, pmix_globals.debug_output, "pmix:tool_notify_recv - processing event"); + /* a zero-byte buffer indicates that this recv is being + * completed due to a lost connection */ + if (PMIX_BUFFER_IS_EMPTY(buf)) { + return; + } + /* start the local notification chain */ chain = PMIX_NEW(pmix_event_chain_t); chain->final_cbfunc = _notify_complete; @@ -126,14 +133,16 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, /* unpack the info that might have been provided */ cnt=1; PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver, - buf, &chain->ninfo, &cnt, PMIX_SIZE); + buf, &ninfo, &cnt, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); goto error; } - if (0 < chain->ninfo) { - PMIX_INFO_CREATE(chain->info, chain->ninfo); - cnt = chain->ninfo; + /* we always leave space for a callback object */ + chain->ninfo = ninfo + 1; + PMIX_INFO_CREATE(chain->info, chain->ninfo); + if (0 < ninfo) { + cnt = ninfo; PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver, buf, chain->info, &cnt, PMIX_INFO); if (PMIX_SUCCESS != rc) { @@ -141,6 +150,9 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, goto error; } } + /* now put the callback object tag in the last element */ + PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] pmix:tool_notify_recv - processing event %d, calling errhandler", pmix_globals.myid.nspace, pmix_globals.myid.rank, chain->status); diff --git a/opal/mca/pmix/pmix2x/pmix/src/util/error.c b/opal/mca/pmix/pmix2x/pmix/src/util/error.c index 29ee09f129..ae3851da05 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/util/error.c +++ b/opal/mca/pmix/pmix2x/pmix/src/util/error.c @@ -169,6 +169,8 @@ PMIX_EXPORT const char* PMIx_Error_string(pmix_status_t errnum) return "PMIX FILE MONITOR ALERT"; case PMIX_MODEL_DECLARED: return "PMIX MODEL DECLARED"; + case PMIX_ERR_TEMP_UNAVAILABLE: + return "PMIX TEMPORARILY UNAVAILABLE"; case PMIX_SUCCESS: return "SUCCESS"; default: diff --git a/opal/mca/pmix/pmix2x/pmix/src/util/error.h b/opal/mca/pmix/pmix2x/pmix/src/util/error.h index 1883c442e4..12532b05a0 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/util/error.h +++ b/opal/mca/pmix/pmix2x/pmix/src/util/error.h @@ -38,6 +38,7 @@ #define PMIX_ERR_FILE_OPEN_FAILURE (PMIX_INTERNAL_ERR_BASE - 34) #define PMIX_ERR_FILE_READ_FAILURE (PMIX_INTERNAL_ERR_BASE - 35) #define PMIX_ERR_TAKE_NEXT_OPTION (PMIX_INTERNAL_ERR_BASE - 36) +#define PMIX_ERR_TEMP_UNAVAILABLE (PMIX_INTERNAL_ERR_BASE - 37) #define PMIX_ERROR_LOG(r) \ do { \ diff --git a/opal/mca/pmix/pmix2x/pmix2x_client.c b/opal/mca/pmix/pmix2x/pmix2x_client.c index 77dcf05642..a90e056889 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_client.c +++ b/opal/mca/pmix/pmix2x/pmix2x_client.c @@ -1560,3 +1560,94 @@ int pmix2x_resolve_nodes(opal_jobid_t jobid, char **nodelist) return pmix2x_convert_rc(ret); } + +static void relcbfunc(void *cbdata) +{ + pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + OBJ_RELEASE(op); +} + +static void infocbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + int rc; + + if (NULL != release_fn) { + release_fn(release_cbdata); + } + rc = pmix2x_convert_rc(status); + if (NULL != op->qcbfunc) { + op->qcbfunc(rc, NULL, op->cbdata, relcbfunc, op); + } else { + OBJ_RELEASE(op); + } +} + +int pmix2x_allocate(opal_pmix_alloc_directive_t directive, + opal_list_t *info, + opal_pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + return OPAL_ERR_NOT_SUPPORTED; +} + +int pmix2x_job_control(opal_list_t *targets, + opal_list_t *directives, + opal_pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + pmix2x_opcaddy_t *op; + size_t n; + opal_namelist_t *ptr; + opal_value_t *iptr; + pmix_status_t rc; + char *nsptr; + + OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock); + if (0 >= opal_pmix_base.initialized) { + OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock); + return OPAL_ERR_NOT_INITIALIZED; + } + + /* create the caddy */ + op = OBJ_NEW(pmix2x_opcaddy_t); + op->qcbfunc = cbfunc; + op->cbdata = cbdata; + if (NULL != targets) { + op->nprocs = opal_list_get_size(targets); + + /* convert the list of procs to an array + * of pmix_proc_t */ + PMIX_PROC_CREATE(op->procs, op->nprocs); + n=0; + OPAL_LIST_FOREACH(ptr, targets, opal_namelist_t) { + if (NULL == (nsptr = pmix2x_convert_jobid(ptr->name.jobid))) { + OBJ_RELEASE(op); + OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock); + return OPAL_ERR_NOT_FOUND; + } + (void)strncpy(op->procs[n].nspace, nsptr, PMIX_MAX_NSLEN); + op->procs[n].rank = pmix2x_convert_opalrank(ptr->name.vpid); + ++n; + } + } + OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock); + + if (NULL != directives && 0 < (op->ninfo = opal_list_get_size(directives))) { + PMIX_INFO_CREATE(op->info, op->ninfo); + n=0; + OPAL_LIST_FOREACH(iptr, directives, opal_value_t) { + (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, iptr); + ++n; + } + } + + rc = PMIx_Job_control_nb(op->procs,op->nprocs, op->info, op->ninfo, infocbfunc, op); + if (PMIX_SUCCESS != rc) { + OBJ_RELEASE(op); + } + return pmix2x_convert_rc(rc); +} diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_north.c b/opal/mca/pmix/pmix2x/pmix2x_server_north.c index 4801fddb74..56f0daf6ea 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_north.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_north.c @@ -1240,6 +1240,7 @@ static pmix_status_t server_job_control(const pmix_proc_t *proct, for (n=0; n < ndirs; n++) { oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); + oinfo->key = strdup(directives[n].key); if (OPAL_SUCCESS != (rc = pmix2x_value_unload(oinfo, &directives[n].value))) { OBJ_RELEASE(opalcaddy); return pmix2x_convert_opalrc(rc); diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 3ea98f02a3..fb18fa048d 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -83,6 +83,8 @@ BEGIN_C_DECLS #define OPAL_PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH") #define OPAL_PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1") #define OPAL_PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads") +#define OPAL_PMIX_REQUESTOR_IS_TOOL "pmix.req.tool" // (bool) requesting process is a tool +#define OPAL_PMIX_REQUESTOR_IS_CLIENT "pmix.req.client" // (bool) requesting process is a client process /* attributes for the rendezvous socket */ #define OPAL_PMIX_USOCK_DISABLE "pmix.usock.disable" // (bool) disable legacy usock support @@ -248,7 +250,9 @@ BEGIN_C_DECLS #define OPAL_PMIX_FWD_STDERR "pmix.fwd.stderr" // (bool) forward stderr from spawned procs to me #define OPAL_PMIX_DEBUGGER_DAEMONS "pmix.debugger" // (bool) spawned app consists of debugger daemons #define OPAL_PMIX_COSPAWN_APP "pmix.cospawn" // (bool) designated app is to be spawned as a disconnected - // job - i.e., not part of the "comm_world" of the job + // job - i.e., not part of the "comm_world" of the job +#define OPAL_PMIX_SET_SESSION_CWD "pmix.ssncwd" // (bool) set the application's current working directory to + // the session working directory assigned by the RM /* query attributes */ #define OPAL_PMIX_QUERY_NAMESPACES "pmix.qry.ns" // (char*) request a comma-delimited list of active nspaces @@ -336,6 +340,8 @@ BEGIN_C_DECLS #define OPAL_PMIX_JOB_CTRL_PROVISION "pmix.jctrl.pvn" // (char*) regex identifying nodes that are to be provisioned #define OPAL_PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned #define OPAL_PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted +#define OPAL_PMIX_JOB_CTRL_TERMINATE "pmix.jctrl.term" // (bool) politely terminate the specified procs + /* monitoring attributes */ #define OPAL_PMIX_MONITOR_HEARTBEAT "pmix.monitor.mbeat" // (void) register to have the server monitor the requestor for heartbeats diff --git a/orte/mca/state/dvm/state_dvm.c b/orte/mca/state/dvm/state_dvm.c index fd04e9fe7f..6293d4af2f 100644 --- a/orte/mca/state/dvm/state_dvm.c +++ b/orte/mca/state/dvm/state_dvm.c @@ -50,7 +50,6 @@ static int finalize(void); static void init_complete(int fd, short args, void *cbdata); static void vm_ready(int fd, short args, void *cbata); static void check_complete(int fd, short args, void *cbdata); -static void cleanup_job(int fd, short args, void *cbdata); /****************** * DVM module - used when mpirun is persistent @@ -90,7 +89,6 @@ static orte_job_state_t launch_states[] = { ORTE_JOB_STATE_REGISTERED, /* termination states */ ORTE_JOB_STATE_TERMINATED, - ORTE_JOB_STATE_NOTIFY_COMPLETED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE }; static orte_state_cbfunc_t launch_callbacks[] = { @@ -109,7 +107,6 @@ static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_post_launch, orte_plm_base_registered, check_complete, - cleanup_job, orte_quit }; @@ -504,24 +501,11 @@ static void check_complete(int fd, short args, void *cbdata) OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output, "%s state:dvm:check_job_completed state is terminated - activating notify", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED); - /* mark the job as notified */ - jdata->state = ORTE_JOB_STATE_NOTIFIED; + opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL, + OPAL_PMIX_RANGE_GLOBAL, NULL, + NULL, NULL); + opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); } OBJ_RELEASE(caddy); } - -static void cleanup_job(int sd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata; - - ORTE_ACQUIRE_OBJECT(caddy); - jdata = caddy->jdata; - - /* remove this object from the job array */ - opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); - - OBJ_RELEASE(caddy); -} diff --git a/orte/orted/orted_submit.c b/orte/orted/orted_submit.c index 740686da93..19f7f690bf 100644 --- a/orte/orted/orted_submit.c +++ b/orte/orted/orted_submit.c @@ -1657,7 +1657,7 @@ static int create_app(int argc, char* argv[], orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); /* no harm in setting this attribute twice as the function will simply ignore it */ - orte_set_attribute(&app->attributes, ORTE_APP_SSNDIR_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); + orte_set_attribute(&app->attributes, ORTE_APP_USER_CWD, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); } } if (NULL != orte_cmd_options.preload_files) { diff --git a/orte/orted/pmix/pmix_server_dyn.c b/orte/orted/pmix/pmix_server_dyn.c index 4b908b990a..16680dc19b 100644 --- a/orte/orted/pmix/pmix_server_dyn.c +++ b/orte/orted/pmix/pmix_server_dyn.c @@ -237,6 +237,9 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, } else if (0 == strcmp(info->key, OPAL_PMIX_NON_PMI)) { orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); + } else if (0 == strcmp(info->key, OPAL_PMIX_REQUESTOR_IS_TOOL)) { + orte_set_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB, + ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); } else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) { if (0 == strcmp(info->data.string, "all")) { jdata->stdin_target = ORTE_VPID_WILDCARD; @@ -341,7 +344,7 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, } } - /* indicate that we are to notify the requestor when we hear back */ + /* indicate the requestor so bookmarks can be correctly set */ orte_set_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, ORTE_ATTR_GLOBAL, requestor, OPAL_NAME); diff --git a/orte/orted/pmix/pmix_server_gen.c b/orte/orted/pmix/pmix_server_gen.c index 2c1a09f6ab..f67d0a67b7 100644 --- a/orte/orted/pmix/pmix_server_gen.c +++ b/orte/orted/pmix/pmix_server_gen.c @@ -622,7 +622,6 @@ static void _query(int sd, short args, void *cbdata) } } else { - opal_output(0, "NONLOCAL"); /* if they want it for remote procs, see who is hosting them * and ask directly for the info - if rank=wildcard, then * we need to xcast the request and collect the results */ @@ -894,6 +893,9 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor, orte_proc_t *proc; opal_pointer_array_t parray, *ptrarray; opal_namelist_t *nm; + opal_buffer_t *cmd; + orte_daemon_cmd_flag_t cmmnd = ORTE_DAEMON_HALT_VM_CMD; + orte_grpcomm_signature_t *sig; opal_output_verbose(2, orte_pmix_server_globals.output, "%s job control request from %s", @@ -905,10 +907,9 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor, ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); continue; } - if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_KILL)) { /* convert the list of targets to a pointer array */ - if (NULL == targets) { + if (0 == opal_list_get_size(targets)) { ptrarray = NULL; } else { OBJ_CONSTRUCT(&parray, opal_pointer_array_t); @@ -936,6 +937,27 @@ int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor, OBJ_DESTRUCT(&parray); } continue; + } else if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_TERMINATE)) { + if (0 == opal_list_get_size(targets)) { + /* terminate the daemons and all running jobs */ + cmd = OBJ_NEW(opal_buffer_t); + /* pack the command */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(cmd, &cmmnd, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(cmd); + return rc; + } + /* goes to all daemons */ + sig = OBJ_NEW(orte_grpcomm_signature_t); + sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); + sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid; + sig->signature[0].vpid = ORTE_VPID_WILDCARD; + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, cmd))) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(cmd); + OBJ_RELEASE(sig); + } } }