1
1

Merge pull request #852 from rhc54/topic/pmix

Sync to PMIx tarball - includes:
Этот коммит содержится в:
rhc54 2015-09-01 06:54:34 -07:00
родитель 6dfa996760 c1bbd7bc78
Коммит d8cb3fe705
10 изменённых файлов: 185 добавлений и 239 удалений

Просмотреть файл

@ -30,7 +30,7 @@ greek=a1
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=git9208efc
repo_rev=gita1cad92
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in

Просмотреть файл

@ -18,6 +18,8 @@ dnl reserved.
dnl Copyright (c) 2009-2011 Oak Ridge National Labs. All rights reserved.
dnl Copyright (c) 2011-2013 NVIDIA Corporation. All rights reserved.
dnl Copyright (c) 2013-2015 Intel, Inc. All rights reserved
dnl Copyright (c) 2015 Research Organization for Information Science
dnl and Technology (RIST). All rights reserved.
dnl
dnl $COPYRIGHT$
dnl
@ -40,7 +42,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[
# Get pmix's absolute top builddir (which may not be the same as
# the real $top_builddir)
PMIX_startdir=`pwd`
if test x"pmix_config_prefix" != "x" -a ! -d "pmix_config_prefix"; then
if test x"pmix_config_prefix" != "x" && test ! -d "pmix_config_prefix"; then
mkdir -p "pmix_config_prefix"
fi
if test x"pmix_config_prefix" != "x"; then
@ -83,7 +85,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[
# Debug mode?
AC_MSG_CHECKING([if want pmix maintainer support])
pmix_debug=
AS_IF([test "$pmix_debug" = "" -a "$enable_debug" = "yes"],
AS_IF([test "$pmix_debug" = "" && test "$enable_debug" = "yes"],
[pmix_debug=1
pmix_debug_msg="enabled"])
AS_IF([test "$pmix_debug" = ""],
@ -138,12 +140,6 @@ AC_DEFUN([PMIX_SETUP_CORE],[
############################################################################
pmix_show_title "Compiler and preprocessor tests"
##################################
# C compiler characteristics
##################################
# Does the compiler support "ident"-like constructs?
PMIX_CHECK_IDENT([CC], [CFLAGS], [c], [C])
#
# Check for some types
#
@ -446,7 +442,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[
elif test $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long ; then
pmix_ptrdiff_t="long"
pmix_ptrdiff_size=$ac_cv_sizeof_long
elif test $ac_cv_type_long_long = yes -a $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long_long ; then
elif test $ac_cv_type_long_long = yes && test $ac_cv_sizeof_void_p -eq $ac_cv_sizeof_long_long ; then
pmix_ptrdiff_t="long long"
pmix_ptrdiff_size=$ac_cv_sizeof_long_long
#else
@ -493,7 +489,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[
#endif
])], [ompi_cv_htonl_define=yes], [ompi_cv_htonl_define=no])])
AC_CHECK_FUNC([htonl], [ompi_have_htonl=yes], [ompi_have_htonl=no])
AS_IF([test "$ompi_cv_htonl_define" = "yes" -o "$ompi_have_htonl" = "yes"],
AS_IF([test "$ompi_cv_htonl_define" = "yes" || test "$ompi_have_htonl" = "yes"],
[AC_DEFINE_UNQUOTED([HAVE_UNIX_BYTESWAP], [1],
[whether unix byteswap routines -- htonl, htons, nothl, ntohs -- are available])])
@ -652,7 +648,7 @@ else
WANT_PICKY_COMPILER=0
fi
#################### Early development override ####################
if test "$WANT_PICKY_COMPILER" = "0" -a -z "$enable_picky" -a "$PMIX_DEVEL" = "1"; then
if test "$WANT_PICKY_COMPILER" = "0" && test -z "$enable_picky" && test "$PMIX_DEVEL" = "1"; then
WANT_PICKY_COMPILER=1
echo "--> developer override: enable picky compiler by default"
fi
@ -674,7 +670,7 @@ else
WANT_DEBUG=0
fi
#################### Early development override ####################
if test "$WANT_DEBUG" = "0" -a -z "$enable_debug" -a "$PMIX_DEVEL" = "1"; then
if test "$WANT_DEBUG" = "0" && test -z "$enable_debug" && test "$PMIX_DEVEL" = "1"; then
WANT_DEBUG=1
echo "--> developer override: enable debugging code by default"
fi
@ -716,7 +712,7 @@ AC_MSG_CHECKING([if want ident string])
AC_ARG_WITH([ident-string],
[AC_HELP_STRING([--with-ident-string=STRING],
[Embed an ident string into PMIx object files])])
if test "$with_ident_string" = "" -o "$with_ident_string" = "no"; then
if test "$with_ident_string" = "" || test "$with_ident_string" = "no"; then
with_ident_string="%VERSION%"
fi
# This is complicated, because $PMIX_VERSION may have spaces in it.

Просмотреть файл

@ -80,18 +80,23 @@ EOF
# resulting object file. If the ident is found in "strings" or
# the grep succeeds, rule that we have this flavor of ident.
PMIX_LOG_COMMAND([$pmix_compiler $pmix_flags -c conftest.$3 -o conftest.${OBJEXT}],
[AS_IF([test -f conftest.${OBJEXT}],
[pmix_output="`strings -a conftest.${OBJEXT} | grep $pmix_ident`"
grep $pmix_ident conftest.${OBJEXT} 2>&1 1>/dev/null
pmix_status=$?
AS_IF([test "$pmix_output" != "" || test "$pmix_status" = "0"],
[$6],
[$7])],
[PMIX_LOG_MSG([the failed program was:])
PMIX_LOG_FILE([conftest.$3])
$7]
[$7])])
echo "configure:__oline__: $1" >&5
pmix_output=`$pmix_compiler $pmix_flags -c conftest.$3 -o conftest.${OBJEXT} 2>&1 1>/dev/null`
pmix_status=$?
AS_IF([test $pmix_status = 0],
[test -z "$pmix_output"
pmix_status=$?])
PMIX_LOG_MSG([\$? = $pmix_status], 1)
AS_IF([test $pmix_status = 0 && test -f conftest.${OBJEXT}],
[pmix_output="`strings -a conftest.${OBJEXT} | grep $pmix_ident`"
grep $pmix_ident conftest.${OBJEXT} 2>&1 1>/dev/null
pmix_status=$?
AS_IF([test "$pmix_output" != "" || test "$pmix_status" = "0"],
[$6],
[$7])],
[PMIX_LOG_MSG([the failed program was:])
PMIX_LOG_FILE([conftest.$3])
$7])
unset pmix_compiler pmix_flags pmix_output pmix_status
rm -rf conftest.* conftest${EXEEXT}

Просмотреть файл

@ -2,6 +2,8 @@
#
# Copyright (c) 2015 Intel, Inc. All rights reserved
# Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2015 Research Organization for Information Science
# and Technology (RIST). All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -27,7 +29,7 @@ AC_DEFUN([PMIX_SASL_CONFIG],[
pmix_sasl_support=0
if test "$with_sasl" != "no"; then
AC_MSG_CHECKING([for sasl in])
if test ! -z "$with_sasl" -a "$with_sasl" != "yes"; then
if test ! -z "$with_sasl" && test "$with_sasl" != "yes"; then
pmix_sasl_dir=$with_sasl/include/sasl
if test -d $with_sasl/lib; then
pmix_sasl_libdir=$with_sasl/lib

Просмотреть файл

@ -297,6 +297,13 @@ AC_DEFUN([PMIX_SETUP_CC],[
PMIX_ENSURE_CONTAINS_OPTFLAGS(["$CFLAGS"])
AC_MSG_RESULT([$co_result])
CFLAGS="$co_result"
##################################
# C compiler characteristics
##################################
# Does the compiler support "ident"-like constructs?
PMIX_CHECK_IDENT([CC], [CFLAGS], [c], [C])
])

Просмотреть файл

@ -149,12 +149,34 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
cb->active = false;
}
static int connect_to_server(struct sockaddr_un *address)
static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
pmix_buffer_t *buf, void *cbdata)
{
pmix_status_t rc;
char *nspace;
int32_t cnt = 1;
bool *active = (bool*)cbdata;
/* unpack the nspace - we don't really need it, but have to
* unpack it to maintain sequence */
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &nspace, &cnt, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
return;
}
/* decode it */
pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf);
*active = false;
}
static int connect_to_server(struct sockaddr_un *address, bool *active)
{
int rc;
pmix_cmd_t cmd = PMIX_REQ_CMD;
pmix_buffer_t *req;
rc = usock_connect((struct sockaddr *)address);
if( rc < 0 ){
PMIX_ERROR_LOG(rc);
return rc;
}
pmix_client_globals.myserver.sd = rc;
@ -174,6 +196,18 @@ static int connect_to_server(struct sockaddr_un *address)
EV_WRITE|EV_PERSIST,
pmix_usock_send_handler, &pmix_client_globals.myserver);
pmix_client_globals.myserver.send_ev_active = false;
/* send a request for our job info - we do this as a non-blocking
* transaction because some systems cannot handle very large
* blocking operations and error out if we try them. */
req = PMIX_NEW(pmix_buffer_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(req, &cmd, 1, PMIX_CMD))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(req);
return rc;
}
PMIX_ACTIVATE_SEND_RECV(&pmix_client_globals.myserver, req, job_data, active);
return PMIX_SUCCESS;
}
@ -188,6 +222,7 @@ int PMIx_Init(pmix_proc_t *proc)
int rc, debug_level;
struct sockaddr_un address;
pmix_nspace_t *nsptr;
bool active;
if (NULL == proc) {
return PMIX_ERR_BAD_PARAM;
@ -286,9 +321,11 @@ int PMIx_Init(pmix_proc_t *proc)
}
/* connect to the server - returns job info if successful */
if (PMIX_SUCCESS != (rc = connect_to_server(&address))){
active = true;
if (PMIX_SUCCESS != (rc = connect_to_server(&address, &active))){
return rc;
}
PMIX_WAIT_FOR_COMPLETION(active);
return PMIX_SUCCESS;
}
@ -724,124 +761,42 @@ static int send_connect_ack(int sd)
* then we initiate authentication method */
static int recv_connect_ack(int sd)
{
pmix_usock_hdr_t hdr;
int32_t reply;
int reply;
int rc;
int32_t cnt;
char *msg = NULL;
pmix_buffer_t buf;
char *nspace;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: RECV CONNECT ACK FROM SERVER");
/* receive the header */
rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t));
/* receive the status reply */
rc = pmix_usock_recv_blocking(sd, (char*)&reply, sizeof(int));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* get whatever else was sent */
msg = (char*)malloc(hdr.nbytes);
if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, msg, hdr.nbytes))) {
free(msg);
return rc;
}
/* load the buffer for unpacking */
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf, msg, hdr.nbytes);
/* unpack the status */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &reply, &cnt, PMIX_INT))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* see if they want us to do the handshake */
if (PMIX_ERR_READY_FOR_HANDSHAKE == reply) {
free(msg);
msg = NULL;
if (NULL == pmix_sec.client_handshake) {
rc = PMIX_ERR_HANDSHAKE_FAILED;
goto cleanup;
return PMIX_ERR_HANDSHAKE_FAILED;
}
if (PMIX_SUCCESS != pmix_sec.client_handshake(sd)) {
goto cleanup;
if (PMIX_SUCCESS != (rc = pmix_sec.client_handshake(sd))) {
return rc;
}
/* if we successfully did the handshake, there will be a follow-on
* message that contains any job info */
rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* get whatever else was sent */
msg = (char*)malloc(hdr.nbytes);
if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, msg, hdr.nbytes))) {
goto cleanup;
}
PMIX_DESTRUCT(&buf);
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
PMIX_LOAD_BUFFER(&buf, msg, hdr.nbytes);
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &reply, &cnt, PMIX_INT))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
/* see if we succeeded */
if (PMIX_SUCCESS != reply) {
rc = reply;
goto cleanup;
} else if (PMIX_SUCCESS != reply) {
return reply;
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: RECV CONNECT CONFIRMATION AND INITIAL DATA FROM SERVER OF %d BYTES",
(int)hdr.nbytes);
"pmix: RECV CONNECT CONFIRMATION");
/* unpack our index into the server's client array */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &pmix_globals.pindex, &cnt, PMIX_INT))) {
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
/* this isn't an error - the host must provide us
* the localid */
rc = PMIX_SUCCESS;
goto cleanup;
}
/* receive our index into the server's client array */
rc = pmix_usock_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int));
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto cleanup;
return rc;
}
/* unpack the nspace - we don't need it, but need
* to step over it */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf, &nspace, &cnt, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* do a sanity check */
if (NULL == nspace || 0 != strcmp(nspace, pmix_globals.myid.nspace)) {
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
if (NULL != nspace) {
free(nspace);
}
goto cleanup;
}
if (NULL != nspace) {
free(nspace);
}
/* unpack any info structs provided */
pmix_client_process_nspace_blob(pmix_globals.myid.nspace, &buf);
cleanup:
buf.base_ptr = NULL; // protect data region from double-free
PMIX_DESTRUCT(&buf);
if (NULL != msg) {
free(msg);
}
return rc;
return PMIX_SUCCESS;
}
void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr)
@ -857,6 +812,9 @@ void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr)
pmix_nrec_t *nrec, *nr2;
char **procs;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: PROCESSING BLOB FOR NSPACE %s", nspace);
/* cycle across our known nspaces */
nsptr = NULL;
PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) {
@ -1071,5 +1029,6 @@ static int usock_connect(struct sockaddr *addr)
pmix_globals.connected = true;
pmix_usock_set_nonblocking(sd);
return sd;
}

Просмотреть файл

@ -1908,6 +1908,7 @@ static int server_switchyard(pmix_peer_t *peer, uint32_t tag,
pmix_cmd_t cmd;
pmix_server_caddy_t *cd;
pmix_proc_t proc;
pmix_buffer_t *reply;
/* retrieve the cmd */
cnt = 1;
@ -1919,6 +1920,13 @@ static int server_switchyard(pmix_peer_t *peer, uint32_t tag,
"recvd pmix cmd %d from %s:%d",
cmd, peer->info->nptr->nspace, peer->info->rank);
if (PMIX_REQ_CMD == cmd) {
reply = PMIX_NEW(pmix_buffer_t);
pmix_bfrop.copy_payload(reply, &(peer->info->nptr->server->job_info));
PMIX_SERVER_QUEUE_REPLY(peer, tag, reply);
return PMIX_SUCCESS;
}
if (PMIX_ABORT_CMD == cmd) {
PMIX_PEER_CADDY(cd, peer, tag);
if (PMIX_SUCCESS != (rc = pmix_server_abort(peer, buf, op_cbfunc, cd))) {

Просмотреть файл

@ -285,82 +285,11 @@ static void listener_cb(int incoming_sd)
event_active(&pending_connection->ev, EV_WRITE, 1);
}
static int send_client_response(int sd, int status, pmix_buffer_t *payload)
{
int rc;
pmix_usock_hdr_t hdr;
pmix_buffer_t buf;
/* pack the status */
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&buf, &status, 1, PMIX_INT))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf);
return rc;
}
if (NULL != payload) {
pmix_bfrop.copy_payload(&buf, payload);
}
hdr.nbytes = buf.bytes_used;
hdr.pindex = 0;
hdr.tag = 0; // tag doesn't matter as we aren't matching to a recv
if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&hdr, sizeof(hdr)))) {
PMIX_ERROR_LOG(rc);
return rc;
}
if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)buf.base_ptr, buf.bytes_used))) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf);
return rc;
}
PMIX_DESTRUCT(&buf);
return PMIX_SUCCESS;
}
/*
* Handler for accepting connections from the event library
*/
static void connection_handler(int sd, short flags, void* cbdata)
{
pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata;
pmix_peer_t *peer;
int rank;
pmix_output_verbose(8, pmix_globals.debug_output,
"connection_handler: new connection: %d",
pnd->sd);
/* receive identifier info from the client and authenticate it - the
* function will lookup and return the peer object if the connection
* is successfully authenticated */
if (PMIX_SUCCESS != pmix_server_authenticate(pnd->sd, &rank, &peer, NULL)) {
CLOSE_THE_SOCKET(pnd->sd);
return;
}
pmix_usock_set_nonblocking(pnd->sd);
/* start the events for this client */
event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd,
EV_READ|EV_PERSIST, pmix_usock_recv_handler, peer);
event_add(&peer->recv_event, NULL);
peer->recv_ev_active = true;
event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd,
EV_WRITE|EV_PERSIST, pmix_usock_send_handler, peer);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server client %s:%d has connected on socket %d",
peer->info->nptr->nspace, peer->info->rank, peer->sd);
PMIX_RELEASE(pnd);
}
/* Receive the peer's identification info from a newly
* connected socket and verify the expected response.
*/
pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer,
pmix_buffer_t **reply)
static pmix_status_t pmix_server_authenticate(int sd, int *out_rank,
pmix_peer_t **peer)
{
char *msg, *nspace, *version, *cred;
int rc, rank;
@ -369,7 +298,6 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
pmix_rank_info_t *info;
pmix_peer_t *psave = NULL;
size_t csize;
pmix_buffer_t *bptr;
bool found;
pmix_proc_t proc;
@ -379,9 +307,6 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
/* ensure all is zero'd */
memset(&hdr, 0, sizeof(pmix_usock_hdr_t));
*peer = NULL;
if (NULL != reply) {
*reply = NULL;
}
/* get the header */
if (PMIX_SUCCESS != (rc = pmix_usock_recv_blocking(sd, (char*)&hdr, sizeof(pmix_usock_hdr_t)))) {
@ -455,7 +380,9 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
if (NULL == nptr) {
/* we don't know this namespace, reject it */
free(msg);
return PMIX_ERR_NOT_FOUND;
/* send an error reply to the client */
rc = PMIX_ERR_NOT_FOUND;
goto error;
}
/* see if we have this peer in our list */
@ -470,7 +397,9 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
if (!found) {
/* rank unknown, reject it */
free(msg);
return PMIX_ERR_NOT_FOUND;
/* send an error reply to the client */
rc = PMIX_ERR_NOT_FOUND;
goto error;
}
*out_rank = rank;
/* a peer can connect on multiple sockets since it can fork/exec
@ -484,6 +413,7 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
if (0 > (psave->index = pmix_pointer_array_add(&pmix_server_globals.clients, psave))) {
free(msg);
PMIX_RELEASE(psave);
/* probably cannot send an error reply if we are out of memory */
return PMIX_ERR_OUT_OF_RESOURCE;
}
@ -497,7 +427,8 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
free(msg);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return rc;
/* send an error reply to the client */
goto error;
}
pmix_output_verbose(2, pmix_globals.debug_output,
"client credential validated");
@ -509,40 +440,38 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
if (NULL != pmix_sec.server_handshake) {
pmix_output_verbose(2, pmix_globals.debug_output,
"connect-ack executing handshake");
if (PMIX_SUCCESS != send_client_response(sd, PMIX_ERR_READY_FOR_HANDSHAKE, NULL)) {
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return PMIX_ERR_UNREACH;
}
if (PMIX_SUCCESS != pmix_sec.server_handshake(psave)) {
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return PMIX_ERR_UNREACH;
}
pmix_output_verbose(2, pmix_globals.debug_output,
"connect-ack handshake complete");
}
/* create reply */
bptr = PMIX_NEW(pmix_buffer_t);
/* send this process its index */
pmix_bfrop.pack(bptr, (void*)&psave->index, 1, PMIX_INT);
/* copy any data across */
pmix_bfrop.copy_payload(bptr, &nptr->server->job_info);
if (NULL == reply) {
/* let the client know we are ready to go */
pmix_output_verbose(2, pmix_globals.debug_output,
"connect-ack sending client response with %d bytes",
(NULL == bptr) ? 0 : (int)bptr->bytes_used);
if (PMIX_SUCCESS != (rc = send_client_response(sd, PMIX_SUCCESS, bptr))) {
rc = PMIX_ERR_READY_FOR_HANDSHAKE;
if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int)))) {
PMIX_ERROR_LOG(rc);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return rc;
}
PMIX_RELEASE(bptr);
if (PMIX_SUCCESS != (rc = pmix_sec.server_handshake(psave))) {
PMIX_ERROR_LOG(rc);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return rc;
}
pmix_output_verbose(2, pmix_globals.debug_output,
"connect-ack handshake complete");
} else {
*reply = bptr;
/* send them success */
rc = PMIX_SUCCESS;
if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int)))) {
PMIX_ERROR_LOG(rc);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return rc;
}
}
/* send the client's array index */
if (PMIX_SUCCESS != (rc = pmix_usock_send_blocking(sd, (char*)&psave->index, sizeof(int)))) {
PMIX_ERROR_LOG(rc);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
return rc;
}
pmix_output_verbose(2, pmix_globals.debug_output,
@ -559,4 +488,47 @@ pmix_status_t pmix_server_authenticate(int sd, int *out_rank, pmix_peer_t **peer
}
}
return rc;
error:
/* send an error reply to the client */
if (PMIX_SUCCESS != pmix_usock_send_blocking(sd, (char*)&rc, sizeof(int))) {
PMIX_ERROR_LOG(rc);
}
return rc;
}
/*
* Handler for accepting connections from the event library
*/
static void connection_handler(int sd, short flags, void* cbdata)
{
pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata;
pmix_peer_t *peer;
int rank;
pmix_output_verbose(8, pmix_globals.debug_output,
"connection_handler: new connection: %d",
pnd->sd);
/* receive identifier info from the client and authenticate it - the
* function will lookup and return the peer object if the connection
* is successfully authenticated */
if (PMIX_SUCCESS != pmix_server_authenticate(pnd->sd, &rank, &peer)) {
CLOSE_THE_SOCKET(pnd->sd);
return;
}
pmix_usock_set_nonblocking(pnd->sd);
/* start the events for this client */
event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd,
EV_READ|EV_PERSIST, pmix_usock_recv_handler, peer);
event_add(&peer->recv_event, NULL);
peer->recv_ev_active = true;
event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd,
EV_WRITE|EV_PERSIST, pmix_usock_send_handler, peer);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server client %s:%d has connected on socket %d",
peer->info->nptr->nspace, peer->info->rank, peer->sd);
PMIX_RELEASE(pnd);
}

Просмотреть файл

@ -188,10 +188,6 @@ void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_dmdx_local_t *lcd);
pmix_status_t pmix_server_authenticate(int sd, int *out_rank,
pmix_peer_t **peer,
pmix_buffer_t **reply);
pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc, void *cbdata);

Просмотреть файл

@ -62,6 +62,7 @@
/* define some commands */
typedef enum {
PMIX_REQ_CMD,
PMIX_ABORT_CMD,
PMIX_COMMIT_CMD,
PMIX_FENCENB_CMD,