1
1

Merge pull request #1270 from rhc54/topic/cleanup

Cleanup connection termination
Этот коммит содержится в:
rhc54 2015-12-30 08:37:25 -08:00
родитель a04f1cd643 0a6b8d2c14
Коммит d387725395
4 изменённых файла: 118 добавлений и 68 удалений

Просмотреть файл

@ -1056,30 +1056,55 @@ static bool match_error_registration(pmix_regevents_info_t *reginfoptr, pmix_not
size_t ninfo = reginfoptr->ninfo;
pmix_status_t error = cd->status;
if (NULL == info || ninfo <= 0) {
/* this is a general errhandler, and so it always matches.
* however, here we are looking for an exact match, and
* so we ignore general errhandlers unless the incoming
* one is also general */
if (NULL == cd->info || 0 == cd->ninfo) {
return true;
} else {
return false;
}
}
/* since this errhandler has info keys, it is not a general errhandler.
* If the incoming errhandler *is* a general one, then we must not
* match so we can store the general case */
if (NULL == cd->info || 0 == cd->ninfo) {
return false;
}
/* try to match using error name or error group keys - this indicates
* a request for a specific error state */
pmix_get_errorgroup(error, errgroup);
/* try to match using error name or error group keys */
for (i=0; i < ninfo; i++) {
// if we get a match on any key then we abort the search and return true.
if ((0 == strcmp(info[i].key, PMIX_ERROR_NAME)) &&
if ((0 == strncmp(info[i].key, PMIX_ERROR_NAME, PMIX_MAX_KEYLEN)) &&
(error == info[i].value.data.int32)) {
return true;
} else if ((0 == strcmp(info[i].key, errgroup)) &&
} else if ((0 == strncmp(info[i].key, errgroup, PMIX_MAX_KEYLEN)) &&
(true == info[i].value.data.flag)) {
return true;
}
}
/* search by node (error location) key if it is specified in the notify info list*/
for (i=0; i<cd->ninfo ; i++) {
if (0 == strcmp (cd->info[i].key, PMIX_ERROR_NODE_NAME)) {
for (j=0; j<ninfo; j++) {
if ((0 == strcmp (info[j].key, PMIX_ERROR_NODE_NAME)) &&
(0 == strcmp (info[j].value.data.string, cd->info[i].value.data.string))) {
/* if we get here, then they haven't asked for a specific error state.
* It is possible, however, that they are asking for all errors from a
* specific node, so search by node (error location) key if it is
* specified in the notify info list */
for (i=0; i < cd->ninfo ; i++) {
if (0 == strncmp(cd->info[i].key, PMIX_ERROR_NODE_NAME, PMIX_MAX_KEYLEN)) {
for (j=0; j < ninfo; j++) {
if ((0 == strncmp(info[j].key, PMIX_ERROR_NODE_NAME, PMIX_MAX_KEYLEN)) &&
(0 == strcmp(info[j].value.data.string, cd->info[i].value.data.string))) {
return true;
}
}
}
}
/* end of search return false*/
/* end of search and nothing matched, so return false */
return false;
}
@ -1093,9 +1118,11 @@ static void _notify_error(int sd, short args, void *cbdata)
pmix_peer_t *peer;
pmix_regevents_info_t *reginfoptr;
bool notify, notifyall;
pmix_output_verbose(0, pmix_globals.debug_output,
"pmix_server: _notify_error notifying client of error %d",
cd->status);
/* pack the command */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(cd->buf, &cmd, 1, PMIX_CMD))) {
PMIX_ERROR_LOG(rc);
@ -1157,6 +1184,8 @@ static void _notify_error(int sd, short args, void *cbdata)
}
}
if (!notify) {
/* if we are not notifying everyone, and this proc isn't to
* be notified, then just continue the main loop */
continue;
}
}
@ -1173,8 +1202,9 @@ static void _notify_error(int sd, short args, void *cbdata)
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server _notify_error - match error registration returned notify =%d ", notify);
}
if (notify)
if (notify) {
break;
}
}
if (notify) {
pmix_output_verbose(2, pmix_globals.debug_output,
@ -1212,6 +1242,7 @@ pmix_status_t pmix_server_notify_error(pmix_status_t status,
cd->ninfo = ninfo;
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_notify_error status =%d, nprocs = %lu, ninfo =%lu",
status, nprocs, ninfo);
@ -1227,18 +1258,19 @@ static void reg_errhandler(int sd, short args, void *cbdata)
int index = 0;
pmix_status_t rc;
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
/* check if this handler is already registered if so return error */
if (PMIX_SUCCESS == pmix_lookup_errhandler (cd->err, &index)) {
if (PMIX_SUCCESS == pmix_lookup_errhandler(cd->err, &index)) {
/* complete request with error status and return its original reference */
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler error - hdlr already registered index = %d",
index);
cd->cbfunc.errregcbfn (PMIX_EXISTS, index, cd->cbdata);
cd->cbfunc.errregcbfn(PMIX_EXISTS, index, cd->cbdata);
} else {
rc = pmix_add_errhandler (cd->err, cd->info, cd->ninfo, &index);
rc = pmix_add_errhandler(cd->err, cd->info, cd->ninfo, &index);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler - success index =%d", index);
cd->cbfunc.errregcbfn (rc, index, cd->cbdata);
cd->cbfunc.errregcbfn(rc, index, cd->cbdata);
}
cd->active = false;
PMIX_RELEASE(cd);
@ -1250,6 +1282,7 @@ void pmix_server_register_errhandler(pmix_info_t info[], size_t ninfo,
void *cbdata)
{
pmix_shift_caddy_t *cd;
/* need to thread shift this request */
cd = PMIX_NEW(pmix_shift_caddy_t);
cd->info = info;
@ -1257,8 +1290,10 @@ void pmix_server_register_errhandler(pmix_info_t info[], size_t ninfo,
cd->err = errhandler;
cd->cbfunc.errregcbfn = cbfunc;
cd->cbdata = cbdata;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server_register_errhandler shifting to server thread");
PMIX_THREADSHIFT(cd, reg_errhandler);
}
@ -1266,8 +1301,11 @@ static void dereg_errhandler(int sd, short args, void *cbdata)
{
    /* Event-loop side of error-handler deregistration.
     *
     * cbdata is the pmix_shift_caddy_t that was thread-shifted to this
     * function; sd and args are unused.  The handler referenced by
     * cd->ref is removed exactly once, the optional completion callback
     * is told the result, and cd->active is cleared.
     *
     * The caddy is not released here: pmix_server_deregister_errhandler
     * waits on cd->active and releases the caddy itself. */
    pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
    pmix_status_t rc;

    rc = pmix_remove_errhandler(cd->ref);

    /* the completion callback is optional, so only invoke it when set */
    if (NULL != cd->cbfunc.opcbfn) {
        cd->cbfunc.opcbfn(rc, cd->cbdata);
    }

    /* mark the request as finished */
    cd->active = false;
}
@ -1276,12 +1314,14 @@ void pmix_server_deregister_errhandler(int errhandler_ref,
void *cbdata)
{
/* Deregister the error handler identified by errhandler_ref.
 * The work itself is done by dereg_errhandler; this function only
 * packages the request, hands it off, and cleans up afterwards. */
pmix_shift_caddy_t *cd;
/* need to thread shift this request: bundle the caller's callback,
 * its cbdata and the handler reference into a caddy object */
cd = PMIX_NEW(pmix_shift_caddy_t);
cd->cbfunc.opcbfn = cbfunc;
cd->cbdata = cbdata;
cd->ref = errhandler_ref;
/* hand the caddy to dereg_errhandler */
PMIX_THREADSHIFT(cd, dereg_errhandler);
/* NOTE(review): presumably blocks until dereg_errhandler clears
 * cd->active - confirm against the macro definition */
PMIX_WAIT_FOR_COMPLETION(cd->active);
/* the caddy is released here rather than in dereg_errhandler */
PMIX_RELEASE(cd);
}

Просмотреть файл

@ -165,8 +165,10 @@ void pmix_errhandler_invoke(pmix_status_t status,
PMIX_INFO_CREATE(iptr, ninfo+1);
(void)strncpy(iptr[0].key, PMIX_ERROR_HANDLER_ID, PMIX_MAX_KEYLEN);
iptr[0].value.type = PMIX_INT;
for (j=0; j < ninfo; j++) {
PMIX_INFO_LOAD(&iptr[j+1], info[j].key, &info[j].value.data, info[j].value.type);
if (NULL != info) {
for (j=0; j < ninfo; j++) {
PMIX_INFO_LOAD(&iptr[j+1], info[j].key, &info[j].value.data, info[j].value.type);
}
}
for (i = 0; i < pmix_globals.errregs.size; i++) {
@ -221,9 +223,10 @@ pmix_status_t pmix_lookup_errhandler(pmix_notification_fn_t err,
int i;
pmix_status_t rc = PMIX_ERR_NOT_FOUND;
pmix_error_reg_info_t *errreg = NULL;
for (i = 0; i < pmix_pointer_array_get_size(&pmix_globals.errregs) ; i++) {
errreg = (pmix_error_reg_info_t*) pmix_pointer_array_get_item (&pmix_globals.errregs, i);
if((NULL != errreg) && (err == errreg->errhandler)) {
errreg = (pmix_error_reg_info_t*)pmix_pointer_array_get_item(&pmix_globals.errregs, i);
if ((NULL != errreg) && (err == errreg->errhandler)) {
*index = i;
rc = PMIX_SUCCESS;
break;
@ -238,19 +241,25 @@ pmix_status_t pmix_add_errhandler(pmix_notification_fn_t err,
{
    /* Record a new error-handler registration in pmix_globals.errregs.
     *
     * err is the handler function; info/ninfo are the optional keys that
     * qualify the registration (a NULL info or a zero ninfo means there
     * are no keys to copy).  On return *index holds the slot assigned by
     * the pointer array, or a negative value on failure.
     *
     * Returns PMIX_SUCCESS, PMIX_ERR_NOMEM if the registration object
     * could not be allocated, or PMIX_ERROR if it could not be stored. */
    size_t i;
    pmix_status_t rc = PMIX_SUCCESS;
    pmix_error_reg_info_t *errreg;

    errreg = PMIX_NEW(pmix_error_reg_info_t);
    if (NULL == errreg) {
        *index = -1;
        return PMIX_ERR_NOMEM;
    }
    errreg->errhandler = err;
    errreg->ninfo = ninfo;

    /* take a private copy of the caller's info keys, if any were given */
    if (NULL != info && 0 < ninfo) {
        PMIX_INFO_CREATE(errreg->info, ninfo);
        for (i = 0; i < ninfo; i++) {
            (void)strncpy(errreg->info[i].key, info[i].key, PMIX_MAX_KEYLEN);
            pmix_value_xfer(&errreg->info[i].value, &info[i].value);
        }
    }

    *index = pmix_pointer_array_add(&pmix_globals.errregs, errreg);
    pmix_output_verbose(2, pmix_globals.debug_output,
                        "pmix_add_errhandler index =%d", *index);

    /* a negative index means the array rejected the entry, so drop it */
    if (*index < 0) {
        PMIX_RELEASE(errreg);
        rc = PMIX_ERROR;
    }
    return rc;
}
@ -258,16 +267,19 @@ pmix_status_t pmix_remove_errhandler(int errhandler_ref)
{
    /* Remove the registration stored at slot errhandler_ref of
     * pmix_globals.errregs.
     *
     * Returns PMIX_SUCCESS when a registration was found and released,
     * PMIX_ERR_NOT_FOUND when the slot holds nothing. */
    pmix_error_reg_info_t *errreg;

    errreg = (pmix_error_reg_info_t*)pmix_pointer_array_get_item(&pmix_globals.errregs,
                                                                 errhandler_ref);
    if (NULL == errreg) {
        return PMIX_ERR_NOT_FOUND;
    }

    /* empty the slot first so the array never refers to a released object */
    pmix_pointer_array_set_item(&pmix_globals.errregs, errhandler_ref, NULL);
    PMIX_RELEASE(errreg);
    return PMIX_SUCCESS;
}
void pmix_get_errorgroup ( pmix_status_t status, char *pmix_error_group)
void pmix_get_errorgroup(pmix_status_t status, char *pmix_error_group)
{
switch(status) {
case PMIX_ERR_UNREACH:
@ -276,24 +288,24 @@ void pmix_get_errorgroup ( pmix_status_t status, char *pmix_error_group)
case PMIX_ERR_TIMEOUT:
case PMIX_ERR_PACK_FAILURE:
case PMIX_ERR_UNPACK_FAILURE:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_COMM);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_COMM, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_OUT_OF_RESOURCE:
case PMIX_ERR_RESOURCE_BUSY:
case PMIX_ERR_NOMEM:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_RESOURCE);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_RESOURCE, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_PROC_MIGRATE:
case PMIX_ERR_PROC_CHECKPOINT:
case PMIX_ERR_PROC_RESTART:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_MIGRATE);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_MIGRATE, PMIX_MAX_KEYLEN);
break;
case PMIX_ERR_PROC_ABORTING:
case PMIX_ERR_PROC_REQUESTED_ABORT:
case PMIX_ERR_PROC_ABORTED:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_ABORT);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_ABORT, PMIX_MAX_KEYLEN);
break;
default:
strcpy(pmix_error_group, PMIX_ERROR_GROUP_GENERAL);
(void)strncpy(pmix_error_group, PMIX_ERROR_GROUP_GENERAL, PMIX_MAX_KEYLEN);
}
}

Просмотреть файл

@ -970,8 +970,6 @@ void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));
MCA_OOB_TCP_CHECK_SHUTDOWN(pop);
/* Mark that we no longer support this peer */
memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
@ -979,18 +977,19 @@ void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
bpr = OBJ_NEW(orte_oob_base_peer_t);
}
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
ui64, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* activate the proc state */
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
if (!orte_finalizing) {
/* activate the proc state */
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
}
}
OBJ_RELEASE(pop);
}
@ -1006,8 +1005,6 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop));
MCA_OOB_TCP_CHECK_SHUTDOWN(mop);
/* mark that we cannot reach this hop */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
@ -1020,11 +1017,16 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
}
/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
/* report the error back to the OOB and let it try other components
* or declare a problem
*/
if (!orte_finalizing && !orte_abnormal_term_ordered) {
/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
}
}
OBJ_RELEASE(mop);
@ -1042,7 +1044,11 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop));
MCA_OOB_TCP_CHECK_SHUTDOWN(mop);
if (orte_finalizing || orte_abnormal_term_ordered) {
/* just ignore the problem */
OBJ_RELEASE(mop);
return;
}
/* mark that this component cannot reach this hop */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
@ -1110,7 +1116,11 @@ void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));
MCA_OOB_TCP_CHECK_SHUTDOWN(pop);
/* if we are terminating, then don't attempt to reconnect */
if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
OBJ_RELEASE(pop);
return;
}
/* activate the proc state */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,

Просмотреть файл

@ -92,16 +92,4 @@ ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_failed_to_connect(int fd, short
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata);
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata);
/* Provide a macro for handling errors reported during shutdown.
 *
 * If recovery is disabled, or any of the termination/finalize flags is
 * set, the macro releases the object passed as (a) and returns from the
 * ENCLOSING function.  It may therefore only be used inside functions
 * returning void.
 *
 * The expansion deliberately has no trailing semicolon: the caller
 * supplies it, as in MCA_OOB_TCP_CHECK_SHUTDOWN(pop);  This keeps the
 * macro usable as a single statement in an if/else.
 */
#define MCA_OOB_TCP_CHECK_SHUTDOWN(a)           \
    do {                                        \
        if (!orte_enable_recovery ||            \
            orte_orteds_term_ordered ||         \
            orte_finalizing ||                  \
            orte_abnormal_term_ordered) {       \
            OBJ_RELEASE(a);                     \
            return;                             \
        }                                       \
    } while (0)
#endif /* _MCA_OOB_TCP_COMPONENT_H_ */