1
1

Ensure the process properly terminates itself if/when it loses its connection to the daemon. There are multiple paths by which a lost daemon can be reported, and so a race condition exists in the pmix support. Our MPI layer wants the ability to determine the response to the failure, and so it will call down to the RTE with any abort request. This reaches the pmix layer as a "pmix_abort" command, which involves communicating the request to the daemon - which is gone. Unfortunately, the pmix component may not know that yet, and so the process hangs waiting for a reply that will never arrive.

So add a brief timer event that releases the caller from the blocked communication if no reply arrives. The precise amount of time we should wait is still to be determined; it is set to a short value (one second) for now and can be adjusted later.
Этот коммит содержится в:
Ralph Castain 2015-06-18 09:45:52 -07:00
родитель 8ab2b11f88
Коммит cc9b416ab3
4 изменённых файлов: 94 добавлений и 47 удалений

Просмотреть файл

@ -102,8 +102,7 @@ const opal_pmix_base_module_t opal_pmix_native_module = {
// local variables
static int init_cntr = 0;
opal_process_name_t native_pname = {0};
static opal_process_name_t native_pname = {0};
/* callback for wait completion */
static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@ -213,7 +212,7 @@ static int native_fini(void)
if (1 != init_cntr) {
--init_cntr;
return OPAL_SUCCESS;
return OPAL_SUCCESS;
}
init_cntr = 0;
@ -275,12 +274,20 @@ static bool native_initialized(void)
return false;
}
/* Timer callback: clears the "active" flag of the pmix_cb_t handed in
 * through cbdata. The sd and args arguments are not used.
 * NOTE(review): cbdata must still reference a live pmix_cb_t when the
 * timer fires - confirm that the code arming the timer either cancels
 * it or keeps the object alive until then. */
static void timeout(int sd, short args, void *cbdata)
{
    ((pmix_cb_t *)cbdata)->active = false;
}
static int native_abort(int flag, const char msg[])
{
opal_buffer_t *bfr;
pmix_cmd_t cmd = PMIX_ABORT_CMD;
int rc;
pmix_cb_t *cb;
opal_event_t ev;
struct timeval tv = {1, 0};
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native abort called",
@ -291,39 +298,48 @@ static int native_abort(int flag, const char msg[])
return OPAL_SUCCESS;
}
/* create a buffer to hold the message */
bfr = OBJ_NEW(opal_buffer_t);
/* pack the cmd */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the status flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the string message - a NULL is okay */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
if (PMIX_USOCK_CONNECTED == mca_pmix_native_component.state) {
/* create a buffer to hold the message */
bfr = OBJ_NEW(opal_buffer_t);
/* pack the cmd */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the status flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the string message - a NULL is okay */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
cb = OBJ_NEW(pmix_cb_t);
cb->active = true;
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
cb = OBJ_NEW(pmix_cb_t);
cb->active = true;
/* push the message into our event base to send to the server */
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
/* push a timeout event to wake us up just in case this
* message cannot get thru - e.g., someone else may have
* detected the failure of the server and ordered an abort */
opal_event_evtimer_set(mca_pmix_native_component.evbase,
&ev, timeout, cb);
opal_event_evtimer_add(&ev, &tv);
/* wait for the release */
PMIX_WAIT_FOR_COMPLETION(cb->active);
OBJ_RELEASE(cb);
/* push the message into our event base to send to the server */
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
/* wait for the release */
PMIX_WAIT_FOR_COMPLETION(cb->active);
OBJ_RELEASE(cb);
}
return OPAL_SUCCESS;
}

Просмотреть файл

@ -41,6 +41,13 @@ typedef enum {
PMIX_USOCK_ACCEPTING
} pmix_usock_state_t;
/* Abnormal-termination helper: set the component state to
 * PMIX_USOCK_FAILED and pass OPAL_ERR_COMM_FAILURE to
 * opal_pmix_base_errhandler.
 *
 * The definition deliberately has NO trailing semicolon after
 * while(0): the caller supplies it, so the macro expands to exactly
 * one statement and remains safe inside an unbraced if/else. */
#define PMIX_NATIVE_ABNORMAL_TERM                                \
    do {                                                         \
        mca_pmix_native_component.state = PMIX_USOCK_FAILED;     \
        opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE);        \
    } while(0)
/* define a command type for communicating to the
* pmix server */
typedef uint8_t pmix_cmd_t;
@ -202,12 +209,13 @@ OPAL_MODULE_DECLSPEC int usock_send_connect_ack(void);
opal_event_active(&ms->ev, OPAL_EV_WRITE, 1); \
} while(0);
#define CLOSE_THE_SOCKET(socket) \
do { \
shutdown(socket, 2); \
close(socket); \
/* notify the error handler */ \
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
/* Shut down and close a socket descriptor, then overwrite it with -1.
 * Nothing happens when the descriptor is already negative, so the
 * macro may be applied repeatedly to the same variable.
 * The argument must be a modifiable lvalue and is evaluated more than
 * once. Return values of shutdown() and close() are ignored. */
#define CLOSE_THE_SOCKET(socket)        \
    do {                                \
        if ((socket) >= 0) {            \
            shutdown((socket), 2);      \
            close((socket));            \
            (socket) = -1;              \
        }                               \
    } while(0)

Просмотреть файл

@ -35,6 +35,12 @@
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "opal_stdint.h"
#include "opal/opal_socket_errno.h"
@ -191,6 +197,7 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata)
/* we get here if no matching recv was found - this is an error */
opal_output(0, "%s UNEXPECTED MESSAGE",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
OBJ_RELEASE(msg);
}
@ -222,9 +229,10 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
"usock_peer_try_connect: attempting to connect to server on socket %d",
mca_pmix_native_component.sd);
/* try to connect */
if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) {
if (connect(mca_pmix_native_component.sd, (struct sockaddr*)&mca_pmix_native_component.address, addrlen) < 0) {
if (opal_socket_errno == ETIMEDOUT) {
/* The server may be too busy to accept new connections */
/* The server may be too busy to accept new connections,
* so cycle around and let it try again */
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"timeout connecting to server");
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
@ -235,7 +243,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
abort a connection that was ECONNREFUSED on the last
attempt, without even trying to establish the
connection. Handle that case in a semi-rational
way by trying twice before giving up */
way by trying again before giving up */
if (ECONNABORTED == opal_socket_errno) {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"connection to server aborted by OS - retrying");
@ -255,6 +263,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
if (0 <= mca_pmix_native_component.sd) {
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
}
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
@ -282,7 +291,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI);
mca_pmix_native_component.send_ev_active = false;
/* setup the socket as non-blocking */
/* setup the socket as non-blocking */
if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) {
opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
strerror(opal_socket_errno),
@ -310,8 +319,8 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
"usock_send_connect_ack to server failed: %s (%d)",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
opal_strerror(rc), rc);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}

Просмотреть файл

@ -176,6 +176,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
mca_pmix_native_component.send_ev_active = false;
OBJ_RELEASE(msg);
mca_pmix_native_component.send_msg = NULL;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -208,6 +210,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
mca_pmix_native_component.send_ev_active = false;
OBJ_RELEASE(msg);
mca_pmix_native_component.send_msg = NULL;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -239,6 +243,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
opal_event_del(&mca_pmix_native_component.send_event);
mca_pmix_native_component.send_ev_active = false;
}
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
break;
}
}
@ -356,6 +362,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
opal_event_del(&mca_pmix_native_component.recv_event);
mca_pmix_native_component.recv_ev_active = false;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
break;
@ -372,6 +380,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
if (NULL == mca_pmix_native_component.recv_msg) {
opal_output(0, "%s usock_recv_handler: unable to allocate recv message\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
/* start by reading the header */
@ -416,6 +426,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
"%s usock:recv:handler error reading bytes - closing connection",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -447,6 +458,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
opal_event_del(&mca_pmix_native_component.recv_event);
mca_pmix_native_component.recv_ev_active = false;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -456,6 +468,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
mca_pmix_native_component.state);
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
break;
}
}
@ -689,6 +702,7 @@ static void usock_complete_connect(void)
opal_socket_errno);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM;
return;
}
@ -703,8 +717,8 @@ static void usock_complete_connect(void)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
strerror(so_error),
so_error);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
} else if (so_error != 0) {
/* No need to worry about the return code here - we return regardless
@ -715,8 +729,8 @@ static void usock_complete_connect(void)
"connection to server failed with error %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
so_error);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
@ -737,8 +751,8 @@ static void usock_complete_connect(void)
} else {
opal_output(0, "%s usock_complete_connect: unable to send connect ack to server",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
}
}