diff --git a/opal/mca/pmix/pmix1xx/pmix/VERSION b/opal/mca/pmix/pmix1xx/pmix/VERSION index 4b740bca85..3ef5324908 100644 --- a/opal/mca/pmix/pmix1xx/pmix/VERSION +++ b/opal/mca/pmix/pmix1xx/pmix/VERSION @@ -30,7 +30,7 @@ greek=a1 # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=git9208efc +repo_rev=gita1cad92 # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in diff --git a/opal/mca/pmix/pmix1xx/pmix/config/pmix.m4 b/opal/mca/pmix/pmix1xx/pmix/config/pmix.m4 index 6d74702980..b415b71870 100644 --- a/opal/mca/pmix/pmix1xx/pmix/config/pmix.m4 +++ b/opal/mca/pmix/pmix1xx/pmix/config/pmix.m4 @@ -18,6 +18,8 @@ dnl reserved. dnl Copyright (c) 2009-2011 Oak Ridge National Labs. All rights reserved. dnl Copyright (c) 2011-2013 NVIDIA Corporation. All rights reserved. dnl Copyright (c) 2013-2015 Intel, Inc. All rights reserved +dnl Copyright (c) 2015 Research Organization for Information Science +dnl and Technology (RIST). All rights reserved. dnl dnl $COPYRIGHT$ dnl @@ -40,7 +42,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[ # Get pmix's absolute top builddir (which may not be the same as # the real $top_builddir) PMIX_startdir=`pwd` - if test x"pmix_config_prefix" != "x" -a ! -d "pmix_config_prefix"; then + if test x"pmix_config_prefix" != "x" && test ! -d "pmix_config_prefix"; then mkdir -p "pmix_config_prefix" fi if test x"pmix_config_prefix" != "x"; then @@ -83,7 +85,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[ # Debug mode? AC_MSG_CHECKING([if want pmix maintainer support]) pmix_debug= - AS_IF([test "$pmix_debug" = "" -a "$enable_debug" = "yes"], + AS_IF([test "$pmix_debug" = "" && test "$enable_debug" = "yes"], [pmix_debug=1 pmix_debug_msg="enabled"]) AS_IF([test "$pmix_debug" = ""], @@ -138,12 +140,6 @@ AC_DEFUN([PMIX_SETUP_CORE],[ ############################################################################ pmix_show_title "Compiler and preprocessor tests" - ################################## - # C compiler characteristics - ################################## - # Does the compiler support "ident"-like constructs? - PMIX_CHECK_IDENT([CC], [CFLAGS], [c], [C]) - # # Check for some types # @@ -446,7 +442,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[ elif test $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long ; then pmix_ptrdiff_t="long" pmix_ptrdiff_size=$ac_cv_sizeof_long - elif test $ac_cv_type_long_long = yes -a $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long_long ; then + elif test $ac_cv_type_long_long = yes && test $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long_long ; then pmix_ptrdiff_t="long long" pmix_ptrdiff_size=$ac_cv_sizeof_long_long #else @@ -493,7 +489,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[ #endif ])], [ompi_cv_htonl_define=yes], [ompi_cv_htonl_define=no])]) AC_CHECK_FUNC([htonl], [ompi_have_htonl=yes], [ompi_have_htonl=no]) - AS_IF([test "$ompi_cv_htonl_define" = "yes" -o "$ompi_have_htonl" = "yes"], + AS_IF([test "$ompi_cv_htonl_define" = "yes" || test "$ompi_have_htonl" = "yes"], [AC_DEFINE_UNQUOTED([HAVE_UNIX_BYTESWAP], [1], [whether unix byteswap routines -- htonl, htons, nothl, ntohs -- are available])]) @@ -652,7 +648,7 @@ else WANT_PICKY_COMPILER=0 fi #################### Early development override #################### -if test "$WANT_PICKY_COMPILER" = "0" -a -z "$enable_picky" -a "$PMIX_DEVEL" = "1"; then +if test "$WANT_PICKY_COMPILER" = "0" && test -z "$enable_picky" && test "$PMIX_DEVEL" = "1"; then WANT_PICKY_COMPILER=1 echo "--> developer override: enable picky compiler by default" fi @@ -674,7 +670,7 @@ else WANT_DEBUG=0 fi #################### Early development override #################### -if test "$WANT_DEBUG" = "0" -a -z "$enable_debug" -a "$PMIX_DEVEL" = "1"; then +if test "$WANT_DEBUG" = "0" && test -z "$enable_debug" && test "$PMIX_DEVEL" = "1"; then WANT_DEBUG=1 echo "--> developer override: enable debugging code by default" fi @@ -716,7 +712,7 @@ AC_MSG_CHECKING([if want ident string]) AC_ARG_WITH([ident-string], [AC_HELP_STRING([--with-ident-string=STRING], [Embed an ident string into PMIx object files])]) -if test "$with_ident_string" = "" -o "$with_ident_string" = "no"; then +if test "$with_ident_string" = "" || test "$with_ident_string" = "no"; then with_ident_string="%VERSION%" fi # This is complicated, because $PMIX_VERSION may have spaces in it. diff --git a/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_ident.m4 b/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_ident.m4 index 0e2aa4f2cb..de2fa573bc 100644 --- a/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_ident.m4 +++ b/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_ident.m4 @@ -80,18 +80,23 @@ EOF # resulting object file. If the ident is found in "strings" or # the grep succeeds, rule that we have this flavor of ident. - PMIX_LOG_COMMAND([$pmix_compiler $pmix_flags -c conftest.$3 -o conftest.${OBJEXT}], - [AS_IF([test -f conftest.${OBJEXT}], - [pmix_output="`strings -a conftest.${OBJEXT} | grep $pmix_ident`" - grep $pmix_ident conftest.${OBJEXT} 2>&1 1>/dev/null - pmix_status=$? - AS_IF([test "$pmix_output" != "" || test "$pmix_status" = "0"], - [$6], - [$7])], - [PMIX_LOG_MSG([the failed program was:]) - PMIX_LOG_FILE([conftest.$3]) - $7] - [$7])]) + echo "configure:__oline__: $1" >&5 + pmix_output=`$pmix_compiler $pmix_flags -c conftest.$3 -o conftest.${OBJEXT} 2>&1 1>/dev/null` + pmix_status=$? + AS_IF([test $pmix_status = 0], + [test -z "$pmix_output" + pmix_status=$?]) + PMIX_LOG_MSG([\$? = $pmix_status], 1) + AS_IF([test $pmix_status = 0 && test -f conftest.${OBJEXT}], + [pmix_output="`strings -a conftest.${OBJEXT} | grep $pmix_ident`" + grep $pmix_ident conftest.${OBJEXT} 2>&1 1>/dev/null + pmix_status=$? + AS_IF([test "$pmix_output" != "" || test "$pmix_status" = "0"], + [$6], + [$7])], + [PMIX_LOG_MSG([the failed program was:]) + PMIX_LOG_FILE([conftest.$3]) + $7]) unset pmix_compiler pmix_flags pmix_output pmix_status rm -rf conftest.* conftest${EXEEXT} diff --git a/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_sasl.m4 b/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_sasl.m4 index 35c39845b7..7dafb477a0 100644 --- a/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_sasl.m4 +++ b/opal/mca/pmix/pmix1xx/pmix/config/pmix_check_sasl.m4 @@ -2,6 +2,8 @@ # # Copyright (c) 2015 Intel, Inc. All rights reserved # Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2015 Research Organization for Information Science +# and Technology (RIST). All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -27,7 +29,7 @@ AC_DEFUN([PMIX_SASL_CONFIG],[ pmix_sasl_support=0 if test "$with_sasl" != "no"; then AC_MSG_CHECKING([for sasl in]) - if test ! -z "$with_sasl" -a "$with_sasl" != "yes"; then + if test ! -z "$with_sasl" && test "$with_sasl" != "yes"; then pmix_sasl_dir=$with_sasl/include/sasl if test -d $with_sasl/lib; then pmix_sasl_libdir=$with_sasl/lib diff --git a/opal/mca/pmix/pmix1xx/pmix/config/pmix_setup_cc.m4 b/opal/mca/pmix/pmix1xx/pmix/config/pmix_setup_cc.m4 index 64ff1b01b7..5e58631099 100644 --- a/opal/mca/pmix/pmix1xx/pmix/config/pmix_setup_cc.m4 +++ b/opal/mca/pmix/pmix1xx/pmix/config/pmix_setup_cc.m4 @@ -297,6 +297,13 @@ AC_DEFUN([PMIX_SETUP_CC],[ PMIX_ENSURE_CONTAINS_OPTFLAGS(["$CFLAGS"]) AC_MSG_RESULT([$co_result]) CFLAGS="$co_result" + + ################################## + # C compiler characteristics + ################################## + # Does the compiler support "ident"-like constructs? + PMIX_CHECK_IDENT([CC], [CFLAGS], [c], [C]) + ]) diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c index c817b0ce09..e3636f4fdf 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c @@ -149,12 +149,34 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, cb->active = false; } -static int connect_to_server(struct sockaddr_un *address) +static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, + pmix_buffer_t *buf, void *cbdata) +{ + pmix_status_t rc; + char *nspace; + int32_t cnt = 1; + bool *active = (bool*)cbdata; + + /* unpack the nspace - we don't really need it, but have to + * unpack it to maintain sequence */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &nspace, &cnt, PMIX_STRING))) { + PMIX_ERROR_LOG(rc); + return; + } + /* decode it */ + pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf); + *active = false; +} + +static int connect_to_server(struct sockaddr_un *address, bool *active) { int rc; + pmix_cmd_t cmd = PMIX_REQ_CMD; + pmix_buffer_t *req; rc = usock_connect((struct sockaddr *)address); if( rc < 0 ){ + PMIX_ERROR_LOG(rc); return rc; } pmix_client_globals.myserver.sd = rc; @@ -174,6 +196,18 @@ static int connect_to_server(struct sockaddr_un *address) EV_WRITE|EV_PERSIST, pmix_usock_send_handler, &pmix_client_globals.myserver); pmix_client_globals.myserver.send_ev_active = false; + + /* send a request for our job info - we do this as a non-blocking + * transaction because some systems cannot handle very large + * blocking operations and error out if we try them. */ + req = PMIX_NEW(pmix_buffer_t); + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(req, &cmd, 1, PMIX_CMD))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(req); + return rc; + } + PMIX_ACTIVATE_SEND_RECV(&pmix_client_globals.myserver, req, job_data, active); + return PMIX_SUCCESS; } @@ -188,6 +222,7 @@ int PMIx_Init(pmix_proc_t *proc) int rc, debug_level; struct sockaddr_un address; pmix_nspace_t *nsptr; + bool active; if (NULL == proc) { return PMIX_ERR_BAD_PARAM; @@ -286,9 +321,11 @@ int PMIx_Init(pmix_proc_t *proc) } /* connect to the server - returns job info if successful */ - if (PMIX_SUCCESS != (rc = connect_to_server(&address))){ + active = true; + if (PMIX_SUCCESS != (rc = connect_to_server(&address, &active))){ return rc; } + PMIX_WAIT_FOR_COMPLETION(active); return PMIX_SUCCESS; } @@ -724,124 +761,42 @@ static int send_connect_ack(int sd) * then we initiate authentication method */ static int recv_connect_ack(int sd) { - pmix_usock_hdr_t hdr; - int32_t reply; + int reply; int rc; - int32_t cnt; - char *msg = NULL; - pmix_buffer_t buf; - char *nspace; pmix_output_verbose(2, pmix_globals.debug_output, "pmix: RECV CONNECT ACK FROM SERVER"); - /* receive the header */ - rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t)); + + /* receive the status reply */ + rc = pmix_usock_recv_blocking(sd, (char*)&reply, sizeof(int)); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); return rc; } - /* get whatever else was sent */ - msg = (char*)malloc(hdr.nbytes); - if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, msg, hdr.nbytes))) { - free(msg); - return rc; - } - /* load the buffer for unpacking */ - PMIX_CONSTRUCT(&buf, pmix_buffer_t); - PMIX_LOAD_BUFFER(&buf, msg, hdr.nbytes); - - /* unpack the status */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &reply, &cnt, PMIX_INT))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } /* see if they want us to do the handshake */ if (PMIX_ERR_READY_FOR_HANDSHAKE == reply) { - free(msg); - msg = NULL; if (NULL == pmix_sec.client_handshake) { - rc = PMIX_ERR_HANDSHAKE_FAILED; - goto cleanup; + return PMIX_ERR_HANDSHAKE_FAILED; } - if (PMIX_SUCCESS != pmix_sec.client_handshake(sd)) { - goto cleanup; + if (PMIX_SUCCESS != (rc = pmix_sec.client_handshake(sd))) { + return rc; } - /* if we successfully did the handshake, there will be a follow-on - * message that contains any job info */ - rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t)); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* get whatever else was sent */ - msg = (char*)malloc(hdr.nbytes); - if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, msg, hdr.nbytes))) { - goto cleanup; - } - PMIX_DESTRUCT(&buf); - PMIX_CONSTRUCT(&buf, pmix_buffer_t); - PMIX_LOAD_BUFFER(&buf, msg, hdr.nbytes); - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &reply, &cnt, PMIX_INT))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - } - - /* see if we succeeded */ - if (PMIX_SUCCESS != reply) { - rc = reply; - goto cleanup; + } else if (PMIX_SUCCESS != reply) { + return reply; } pmix_output_verbose(2, pmix_globals.debug_output, - "pmix: RECV CONNECT CONFIRMATION AND INITIAL DATA FROM SERVER OF %d BYTES", - (int)hdr.nbytes); + "pmix: RECV CONNECT CONFIRMATION"); - /* unpack our index into the server's client array */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &pmix_globals.pindex, &cnt, PMIX_INT))) { - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - /* this isn't an error - the host must provide us - * the localid */ - rc = PMIX_SUCCESS; - goto cleanup; - } + /* receive our index into the server's client array */ + rc = pmix_usock_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int)); + if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); - goto cleanup; + return rc; } - /* unpack the nspace - we don't need it, but need - * to step over it */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &nspace, &cnt, PMIX_STRING))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* do a sanity check */ - if (NULL == nspace || 0 != strcmp(nspace, pmix_globals.myid.nspace)) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - if (NULL != nspace) { - free(nspace); - } - goto cleanup; - } - if (NULL != nspace) { - free(nspace); - } - - /* unpack any info structs provided */ - pmix_client_process_nspace_blob(pmix_globals.myid.nspace, &buf); - - cleanup: - buf.base_ptr = NULL; // protect data region from double-free - PMIX_DESTRUCT(&buf); - if (NULL != msg) { - free(msg); - } - return rc; + return PMIX_SUCCESS; } void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr) @@ -857,6 +812,9 @@ void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr) pmix_nrec_t *nrec, *nr2; char **procs; + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix: PROCESSING BLOB FOR NSPACE %s", nspace); + /* cycle across our known nspaces */ nsptr = NULL; PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) { @@ -1071,5 +1029,6 @@ static int usock_connect(struct sockaddr *addr) pmix_globals.connected = true; pmix_usock_set_nonblocking(sd); + return sd; } diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c index da13808464..88c1070060 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c @@ -1908,6 +1908,7 @@ static int server_switchyard(pmix_peer_t *peer, uint32_t tag, pmix_cmd_t cmd; pmix_server_caddy_t *cd; pmix_proc_t proc; + pmix_buffer_t *reply; /* retrieve the cmd */ cnt = 1; @@ -1919,6 +1920,13 @@ static int server_switchyard(pmix_peer_t *peer, uint32_t tag, "recvd pmix cmd %d from %s:%d", cmd, peer->info->nptr->nspace, peer->info->rank); + if (PMIX_REQ_CMD == cmd) { + reply = PMIX_NEW(pmix_buffer_t); + pmix_bfrop.copy_payload(reply, &(peer->info->nptr->server->job_info)); + PMIX_SERVER_QUEUE_REPLY(peer, tag, reply); + return PMIX_SUCCESS; + } + if (PMIX_ABORT_CMD == cmd) { PMIX_PEER_CADDY(cd, peer, tag); if (PMIX_SUCCESS != (rc = pmix_server_abort(peer, buf, op_cbfunc, cd))) { diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_listener.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_listener.c index 8c4d106a7d..afd3ed6f26 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_listener.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_listener.c @@ -285,82 +285,11 @@ static void listener_cb(int incoming_sd) event_active(&pending_connection->ev, EV_WRITE, 1); } -static int send_client_response(int sd, int status, pmix_buffer_t *payload) -{ - int rc; - pmix_usock_hdr_t hdr; - pmix_buffer_t buf; - - /* pack the status */ - PMIX_CONSTRUCT(&buf, pmix_buffer_t); - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&buf, &status, 1, PMIX_INT))) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf); - return rc; - } - if (NULL != payload) { - pmix_bfrop.copy_payload(&buf, payload); - } - - hdr.nbytes = buf.bytes_used; - hdr.pindex = 0; - hdr.tag = 0; // tag doesn't matter as we aren't matching to a recv - - if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&hdr, sizeof(hdr)))) { - PMIX_ERROR_LOG(rc); - return rc; - } - - if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)buf.base_ptr, buf.bytes_used))) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf); - return rc; - } - PMIX_DESTRUCT(&buf); - - return PMIX_SUCCESS; -} - -/* - * Handler for accepting connections from the event library - */ -static void connection_handler(int sd, short flags, void* cbdata) -{ - pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata; - pmix_peer_t *peer; - int rank; - - pmix_output_verbose(8, pmix_globals.debug_output, - "connection_handler: new connection: %d", - pnd->sd); - - /* receive identifier info from the client and authenticate it - the - * function will lookup and return the peer object if the connection - * is successfully authenticated */ - if (PMIX_SUCCESS != pmix_server_authenticate(pnd->sd, &rank, &peer, NULL)) { - CLOSE_THE_SOCKET(pnd->sd); - return; - } - pmix_usock_set_nonblocking(pnd->sd); - - /* start the events for this client */ - event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd, - EV_READ|EV_PERSIST, pmix_usock_recv_handler, peer); - event_add(&peer->recv_event, NULL); - peer->recv_ev_active = true; - event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd, - EV_WRITE|EV_PERSIST, pmix_usock_send_handler, peer); - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:server client %s:%d has connected on socket %d", - peer->info->nptr->nspace, peer->info->rank, peer->sd); - PMIX_RELEASE(pnd); -} - /* Receive the peer's identification info from a newly * connected socket and verify the expected response. */ -pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer, - pmix_buffer_t **reply) +static pmix_status_t pmix_server_authenticate(int sd, int *out_rank, + pmix_peer_t **peer) { char *msg, *nspace, *version, *cred; int rc, rank; @@ -369,7 +298,6 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer pmix_rank_info_t *info; pmix_peer_t *psave = NULL; size_t csize; - pmix_buffer_t *bptr; bool found; pmix_proc_t proc; @@ -379,9 +307,6 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer /* ensure all is zero'd */ memset(&hdr, 0, sizeof(pmix_usock_hdr_t)); *peer = NULL; - if (NULL != reply) { - *reply = NULL; - } /* get the header */ if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t)))) { @@ -455,7 +380,9 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer if (NULL == nptr) { /* we don't know this namespace, reject it */ free(msg); - return PMIX_ERR_NOT_FOUND; + /* send an error reply to the client */ + rc = PMIX_ERR_NOT_FOUND; + goto error; } /* see if we have this peer in our list */ @@ -470,7 +397,9 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer if (!found) { /* rank unknown, reject it */ free(msg); - return PMIX_ERR_NOT_FOUND; + /* send an error reply to the client */ + rc = PMIX_ERR_NOT_FOUND; + goto error; } *out_rank = rank; /* a peer can connect on multiple sockets since it can fork/exec @@ -484,6 +413,7 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer if (0 > (psave->index = pmix_pointer_array_add(&pmix_server_globals.clients, psave))) { free(msg); PMIX_RELEASE(psave); + /* probably cannot send an error reply if we are out of memory */ return PMIX_ERR_OUT_OF_RESOURCE; } @@ -497,7 +427,8 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer free(msg); pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); PMIX_RELEASE(psave); - return rc; + /* send an error reply to the client */ + goto error; } pmix_output_verbose(2, pmix_globals.debug_output, "client credential validated"); @@ -509,40 +440,38 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer if (NULL != pmix_sec.server_handshake) { pmix_output_verbose(2, pmix_globals.debug_output, "connect-ack executing handshake"); - if (PMIX_SUCCESS != send_client_response(sd, PMIX_ERR_READY_FOR_HANDSHAKE, NULL)) { - pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); - PMIX_RELEASE(psave); - return PMIX_ERR_UNREACH; - } - if (PMIX_SUCCESS != pmix_sec.server_handshake(psave)) { - pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); - PMIX_RELEASE(psave); - return PMIX_ERR_UNREACH; - } - pmix_output_verbose(2, pmix_globals.debug_output, - "connect-ack handshake complete"); - } - - /* create reply */ - bptr = PMIX_NEW(pmix_buffer_t); - /* send this process its index */ - pmix_bfrop.pack(bptr, (void*)&psave->index, 1, PMIX_INT); - /* copy any data across */ - pmix_bfrop.copy_payload(bptr, &nptr->server->job_info); - - if (NULL == reply) { - /* let the client know we are ready to go */ - pmix_output_verbose(2, pmix_globals.debug_output, - "connect-ack sending client response with %d bytes", - (NULL == bptr) ? 0 : (int)bptr->bytes_used); - if (PMIX_SUCCESS != (rc = send_client_response(sd, PMIX_SUCCESS, bptr))) { + rc = PMIX_ERR_READY_FOR_HANDSHAKE; + if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int)))) { + PMIX_ERROR_LOG(rc); pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); PMIX_RELEASE(psave); return rc; } - PMIX_RELEASE(bptr); + if (PMIX_SUCCESS != (rc = pmix_sec.server_handshake(psave))) { + PMIX_ERROR_LOG(rc); + pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); + PMIX_RELEASE(psave); + return rc; + } + pmix_output_verbose(2, pmix_globals.debug_output, + "connect-ack handshake complete"); } else { - *reply = bptr; + /* send them success */ + rc = PMIX_SUCCESS; + if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int)))) { + PMIX_ERROR_LOG(rc); + pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); + PMIX_RELEASE(psave); + return rc; + } + } + + /* send the client's array index */ + if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&psave->index, sizeof(int)))) { + PMIX_ERROR_LOG(rc); + pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL); + PMIX_RELEASE(psave); + return rc; } pmix_output_verbose(2, pmix_globals.debug_output, @@ -559,4 +488,47 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer } } return rc; + + error: + /* send an error reply to the client */ + if (PMIX_SUCCESS != pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int))) { + PMIX_ERROR_LOG(rc); + } + return rc; } + +/* + * Handler for accepting connections from the event library + */ +static void connection_handler(int sd, short flags, void* cbdata) +{ + pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata; + pmix_peer_t *peer; + int rank; + + pmix_output_verbose(8, pmix_globals.debug_output, + "connection_handler: new connection: %d", + pnd->sd); + + /* receive identifier info from the client and authenticate it - the + * function will lookup and return the peer object if the connection + * is successfully authenticated */ + if (PMIX_SUCCESS != pmix_server_authenticate(pnd->sd, &rank, &peer)) { + CLOSE_THE_SOCKET(pnd->sd); + return; + } + pmix_usock_set_nonblocking(pnd->sd); + + /* start the events for this client */ + event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd, + EV_READ|EV_PERSIST, pmix_usock_recv_handler, peer); + event_add(&peer->recv_event, NULL); + peer->recv_ev_active = true; + event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd, + EV_WRITE|EV_PERSIST, pmix_usock_send_handler, peer); + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix:server client %s:%d has connected on socket %d", + peer->info->nptr->nspace, peer->info->rank, peer->sd); + PMIX_RELEASE(pnd); +} + diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h index 87f3205b6c..b4001fbab2 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h @@ -188,10 +188,6 @@ void pmix_pending_nspace_fix(pmix_nspace_t *nptr); pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_dmdx_local_t *lcd); -pmix_status_t pmix_server_authenticate(int sd, int *out_rank, - pmix_peer_t **peer, - pmix_buffer_t **reply); - pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf, pmix_op_cbfunc_t cbfunc, void *cbdata); diff --git a/opal/mca/pmix/pmix1xx/pmix/src/usock/usock.h b/opal/mca/pmix/pmix1xx/pmix/src/usock/usock.h index 66574f6320..8ed64f5efe 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/usock/usock.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/usock/usock.h @@ -62,6 +62,7 @@ /* define some commands */ typedef enum { + PMIX_REQ_CMD, PMIX_ABORT_CMD, PMIX_COMMIT_CMD, PMIX_FENCENB_CMD,