1
1

Merge pull request #647 from rhc54/topic/hangs

Ensure we properly commit suicide if/when we lose connection to the daemon.
Этот коммит содержится в:
rhc54 2015-06-18 10:25:07 -07:00
родитель 706884652f cc9b416ab3
Коммит 5d38283c7f
4 изменённых файлов: 94 добавлений и 47 удалений

Просмотреть файл

@ -102,8 +102,7 @@ const opal_pmix_base_module_t opal_pmix_native_module = {
// local variables
static int init_cntr = 0;
opal_process_name_t native_pname = {0};
static opal_process_name_t native_pname = {0};
/* callback for wait completion */
static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@ -213,7 +212,7 @@ static int native_fini(void)
if (1 != init_cntr) {
--init_cntr;
return OPAL_SUCCESS;
return OPAL_SUCCESS;
}
init_cntr = 0;
@ -275,12 +274,20 @@ static bool native_initialized(void)
return false;
}
/* Event callback used as a timer handler: treats cbdata as a pmix_cb_t
 * and clears its "active" flag. The sd and args parameters are unused. */
static void timeout(int sd, short args, void *cbdata)
{
    pmix_cb_t *tracker = (pmix_cb_t*)cbdata;

    tracker->active = false;
}
static int native_abort(int flag, const char msg[])
{
opal_buffer_t *bfr;
pmix_cmd_t cmd = PMIX_ABORT_CMD;
int rc;
pmix_cb_t *cb;
opal_event_t ev;
struct timeval tv = {1, 0};
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:native abort called",
@ -291,39 +298,48 @@ static int native_abort(int flag, const char msg[])
return OPAL_SUCCESS;
}
/* create a buffer to hold the message */
bfr = OBJ_NEW(opal_buffer_t);
/* pack the cmd */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the status flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the string message - a NULL is okay */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
if (PMIX_USOCK_CONNECTED == mca_pmix_native_component.state) {
/* create a buffer to hold the message */
bfr = OBJ_NEW(opal_buffer_t);
/* pack the cmd */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the status flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* pack the string message - a NULL is okay */
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(bfr);
return rc;
}
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
cb = OBJ_NEW(pmix_cb_t);
cb->active = true;
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the return message is recvd */
cb = OBJ_NEW(pmix_cb_t);
cb->active = true;
/* push the message into our event base to send to the server */
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
/* push a timeout event to wake us up just in case this
* message cannot get through - e.g., someone else may have
* detected the failure of the server and ordered an abort */
opal_event_evtimer_set(mca_pmix_native_component.evbase,
&ev, timeout, cb);
opal_event_evtimer_add(&ev, &tv);
/* wait for the release */
PMIX_WAIT_FOR_COMPLETION(cb->active);
OBJ_RELEASE(cb);
/* push the message into our event base to send to the server */
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
/* wait for the release */
PMIX_WAIT_FOR_COMPLETION(cb->active);
OBJ_RELEASE(cb);
}
return OPAL_SUCCESS;
}

Просмотреть файл

@ -41,6 +41,13 @@ typedef enum {
PMIX_USOCK_ACCEPTING
} pmix_usock_state_t;
/* Macro for abnormal termination: sets the native component's state to
 * PMIX_USOCK_FAILED and passes OPAL_ERR_COMM_FAILURE to
 * opal_pmix_base_errhandler.
 *
 * The definition deliberately does NOT end in a semicolon - the caller
 * supplies it - so that "PMIX_NATIVE_ABNORMAL_TERM;" expands to exactly
 * one statement and remains valid inside an unbraced if/else. */
#define PMIX_NATIVE_ABNORMAL_TERM                               \
    do {                                                        \
        mca_pmix_native_component.state = PMIX_USOCK_FAILED;    \
        opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE);       \
    } while(0)
/* define a command type for communicating to the
* pmix server */
typedef uint8_t pmix_cmd_t;
@ -202,12 +209,13 @@ OPAL_MODULE_DECLSPEC int usock_send_connect_ack(void);
opal_event_active(&ms->ev, OPAL_EV_WRITE, 1); \
} while(0);
#define CLOSE_THE_SOCKET(socket) \
do { \
shutdown(socket, 2); \
close(socket); \
/* notify the error handler */ \
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
/* Shut down (how = 2) and close the given socket descriptor, then set it
 * to -1. Does nothing when the descriptor is already negative.
 * The argument must be a modifiable lvalue and is evaluated several times. */
#define CLOSE_THE_SOCKET(socket)            \
    do {                                    \
        if ((socket) >= 0) {                \
            shutdown((socket), 2);          \
            close((socket));                \
            (socket) = -1;                  \
        }                                   \
    } while(0)

Просмотреть файл

@ -35,6 +35,12 @@
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "opal_stdint.h"
#include "opal/opal_socket_errno.h"
@ -191,6 +197,7 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata)
/* we get here if no matching recv was found - this is an error */
opal_output(0, "%s UNEXPECTED MESSAGE",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
OBJ_RELEASE(msg);
}
@ -222,9 +229,10 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
"usock_peer_try_connect: attempting to connect to server on socket %d",
mca_pmix_native_component.sd);
/* try to connect */
if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) {
if (connect(mca_pmix_native_component.sd, (struct sockaddr*)&mca_pmix_native_component.address, addrlen) < 0) {
if (opal_socket_errno == ETIMEDOUT) {
/* The server may be too busy to accept new connections */
/* The server may be too busy to accept new connections,
* so cycle around and let it try again */
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"timeout connecting to server");
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
@ -235,7 +243,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
abort a connection that was ECONNREFUSED on the last
attempt, without even trying to establish the
connection. Handle that case in a semi-rational
way by trying twice before giving up */
way by trying again before giving up */
if (ECONNABORTED == opal_socket_errno) {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"connection to server aborted by OS - retrying");
@ -255,6 +263,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
if (0 <= mca_pmix_native_component.sd) {
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
}
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
@ -282,7 +291,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI);
mca_pmix_native_component.send_ev_active = false;
/* setup the socket as non-blocking */
/* setup the socket as non-blocking */
if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) {
opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
strerror(opal_socket_errno),
@ -310,8 +319,8 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
"usock_send_connect_ack to server failed: %s (%d)",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
opal_strerror(rc), rc);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}

Просмотреть файл

@ -176,6 +176,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
mca_pmix_native_component.send_ev_active = false;
OBJ_RELEASE(msg);
mca_pmix_native_component.send_msg = NULL;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -208,6 +210,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
mca_pmix_native_component.send_ev_active = false;
OBJ_RELEASE(msg);
mca_pmix_native_component.send_msg = NULL;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -239,6 +243,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
opal_event_del(&mca_pmix_native_component.send_event);
mca_pmix_native_component.send_ev_active = false;
}
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
break;
}
}
@ -356,6 +362,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
opal_event_del(&mca_pmix_native_component.recv_event);
mca_pmix_native_component.recv_ev_active = false;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
break;
@ -372,6 +380,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
if (NULL == mca_pmix_native_component.recv_msg) {
opal_output(0, "%s usock_recv_handler: unable to allocate recv message\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
/* start by reading the header */
@ -416,6 +426,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
"%s usock:recv:handler error reading bytes - closing connection",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -447,6 +458,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
opal_event_del(&mca_pmix_native_component.recv_event);
mca_pmix_native_component.recv_ev_active = false;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
}
@ -456,6 +468,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
mca_pmix_native_component.state);
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
break;
}
}
@ -689,6 +702,7 @@ static void usock_complete_connect(void)
opal_socket_errno);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM;
return;
}
@ -703,8 +717,8 @@ static void usock_complete_connect(void)
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
strerror(so_error),
so_error);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
} else if (so_error != 0) {
/* No need to worry about the return code here - we return regardless
@ -715,8 +729,8 @@ static void usock_complete_connect(void)
"connection to server failed with error %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
so_error);
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
return;
}
@ -737,8 +751,8 @@ static void usock_complete_connect(void)
} else {
opal_output(0, "%s usock_complete_connect: unable to send connect ack to server",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
}
}