From 8ab962411cee7ac75543b998bd205d10e9c9eef5 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 17 Dec 2009 19:39:53 +0000 Subject: [PATCH] Detect the scenario where one or more procs fail to call orte/ompi_init while others in the job do. This scenario can cause the job to hang as MPI_Init contains a barrier operation that will not complete. Although ORTE does not contain such a barrier, it still will be considered as an error scenario so that we can detect the MPI case - otherwise, ORTE has no knowledge of OMPI and wouldn't know how to differentiate the use-cases. Take advantage of the changes to update the routed_base_receive code to avoid message overlap. This commit was SVN r22329. --- orte/mca/odls/base/odls_base_default_fns.c | 90 +++++++++++---- orte/mca/odls/base/odls_base_open.c | 2 + orte/mca/odls/odls_types.h | 2 + orte/mca/plm/base/base.h | 1 + orte/mca/plm/base/plm_base_launch_support.c | 4 +- orte/mca/plm/base/plm_base_receive.c | 4 +- orte/mca/plm/base/plm_private.h | 2 - orte/mca/routed/base/base.h | 2 + orte/mca/routed/base/routed_base_receive.c | 106 ++++++++++++++---- .../routed/base/routed_base_register_sync.c | 82 ++++++++++++++ orte/mca/routed/binomial/routed_binomial.c | 67 +---------- orte/mca/routed/cm/routed_cm.c | 67 +---------- orte/mca/routed/linear/routed_linear.c | 67 +---------- orte/mca/routed/radix/routed_radix.c | 67 +---------- orte/tools/orterun/help-orterun.txt | 14 ++- 15 files changed, 263 insertions(+), 314 deletions(-) diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index c2f2f0dc24..b4b0c78733 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -2057,8 +2057,21 @@ static bool all_children_registered(orte_jobid_t job) /* is this child part of the specified job? */ if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) { + /* if this child has terminated, we consider it as having + * registered for the purposes of this function. If it never + * did register, then we will send a NULL rml_uri back to + * the HNP, which will then know that the proc did not register. + * If other procs did register, then the HNP can declare an + * abnormal termination + */ + if (ORTE_PROC_STATE_UNTERMINATED < child->state) { + /* this proc has terminated somehow - consider it + * as registered for now + */ + continue; + } /* if this child is *not* registered yet, return false */ - if (NULL == child->rml_uri) { + if (!child->init_recvd) { return false; } } @@ -2084,6 +2097,11 @@ static int pack_child_contact_info(orte_jobid_t job, opal_buffer_t *buf) /* is this child part of the specified job? */ if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) { + /* pack the child's vpid - must be done in case rml_uri is NULL */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &(child->name->vpid), 1, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + return rc; + } /* pack the contact info */ if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &child->rml_uri, 1, OPAL_STRING))) { ORTE_ERROR_LOG(rc); @@ -2173,6 +2191,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, int rc; bool found=false; int8_t flag; + orte_odls_job_t *jobdat, *jdat; /* protect operations involving the global list of children */ OPAL_THREAD_LOCK(&orte_odls_globals.mutex); @@ -2216,13 +2235,15 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, /* if the contact info is already set, then we are "de-registering" the child * so free the info and set it to NULL */ - if (NULL != child->rml_uri) { + if (child->init_recvd && NULL != child->rml_uri) { free(child->rml_uri); child->rml_uri = NULL; + child->fini_recvd = true; } else { /* if the contact info is not set, then we are registering the child so * unpack the contact info from the buffer and store it */ + child->init_recvd = true; cnt = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &(child->rml_uri), &cnt, OPAL_STRING))) { ORTE_ERROR_LOG(rc); @@ -2233,13 +2254,14 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, OBJ_CONSTRUCT(&buffer, opal_buffer_t); /* do they want the nidmap? */ if (drop_nidmap) { - orte_odls_job_t *jobdat = NULL; /* get the jobdata object */ + jobdat = NULL; for (item = opal_list_get_first(&orte_local_jobdata); item != opal_list_get_end(&orte_local_jobdata); item = opal_list_get_next(item)) { - jobdat = (orte_odls_job_t*)item; - if (jobdat->jobid == child->name->jobid) { + jdat = (orte_odls_job_t*)item; + if (jdat->jobid == child->name->jobid) { + jobdat = jdat; break; } } @@ -2247,6 +2269,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto CLEANUP; } + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls:sync nidmap requested for job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -2577,7 +2600,7 @@ GOTCHILD: void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status) { - orte_odls_child_t *child; + orte_odls_child_t *child, *chd; opal_list_item_t *item; char *job, *vpid, *abort_file; struct stat buf; @@ -2677,21 +2700,47 @@ GOTCHILD: /* okay, it terminated normally - check to see if a sync was required and * if it was received */ - if (NULL != child->rml_uri) { - /* if this is set, then we required a sync and didn't get it, so this - * is considered an abnormal termination and treated accordingly - */ - child->state = ORTE_PROC_STATE_TERM_WO_SYNC; - - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:waitpid_fired child process %s terminated normally " - "but did not provide a required sync - it " - "will be treated as an abnormal termination", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(child->name))); - - goto MOVEON; + if (child->init_recvd) { + if (!child->fini_recvd) { + /* we required a finalizing sync and didn't get it, so this + * is considered an abnormal termination and treated accordingly + */ + child->state = ORTE_PROC_STATE_TERM_WO_SYNC; + + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child process %s terminated normally " + "but did not provide a required finalize sync - it " + "will be treated as an abnormal termination", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + + goto MOVEON; + } } else { + /* has any child in this job already registered? */ + for (item = opal_list_get_first(&orte_local_children); + item != opal_list_get_end(&orte_local_children); + item = opal_list_get_next(item)) { + chd = (orte_odls_child_t*)item; + + if (chd->init_recvd) { + /* someone has registered, and we didn't before + * terminating - this is an abnormal termination + */ + child->state = ORTE_PROC_STATE_TERM_WO_SYNC; + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:waitpid_fired child process %s terminated normally " + "but did not provide a required init sync - it " + "will be treated as an abnormal termination", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(child->name))); + + goto MOVEON; + } + } + /* if no child has registered, then it is possible that + * none of them will. This is considered acceptable + */ child->state = ORTE_PROC_STATE_TERMINATED; } @@ -2699,7 +2748,6 @@ GOTCHILD: "%s odls:waitpid_fired child process %s terminated normally", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(child->name))); - } } else { /* the process was terminated with a signal! That's definitely diff --git a/orte/mca/odls/base/odls_base_open.c b/orte/mca/odls/base/odls_base_open.c index 0ff9522f1d..4151a0b151 100644 --- a/orte/mca/odls/base/odls_base_open.c +++ b/orte/mca/odls/base/odls_base_open.c @@ -85,6 +85,8 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr) */ ptr->state = ORTE_PROC_STATE_FAILED_TO_START; ptr->exit_code = 0; + ptr->init_recvd = false; + ptr->fini_recvd = false; ptr->rml_uri = NULL; ptr->slot_list = NULL; ptr->waitpid_recvd = false; diff --git a/orte/mca/odls/odls_types.h b/orte/mca/odls/odls_types.h index e22385c7b2..6742be1f86 100644 --- a/orte/mca/odls/odls_types.h +++ b/orte/mca/odls/odls_types.h @@ -91,6 +91,8 @@ typedef struct { bool coll_recvd; /* collective operation recvd */ orte_proc_state_t state; /* the state of the process */ orte_exit_code_t exit_code; /* process exit code */ + bool init_recvd; /* process called orte_init */ + bool fini_recvd; /* process called orte_finalize */ char *rml_uri; /* contact info for this child */ char *slot_list; /* list of slots for this child */ bool waitpid_recvd; /* waitpid has detected proc termination */ diff --git a/orte/mca/plm/base/base.h b/orte/mca/plm/base/base.h index 8589ec00fa..b933c99a27 100644 --- a/orte/mca/plm/base/base.h +++ b/orte/mca/plm/base/base.h @@ -81,6 +81,7 @@ ORTE_DECLSPEC int orte_plm_base_close(void); */ ORTE_DECLSPEC void orte_plm_base_app_report_launch(int fd, short event, void *data); ORTE_DECLSPEC void orte_plm_base_receive_process_msg(int fd, short event, void *data); +ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata); #endif /* ORTE_DISABLE_FULL_SUPPORT */ diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index c64302dc1f..a74c342ed1 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -1369,9 +1369,7 @@ void orte_plm_base_check_job_completed(orte_job_t *jdata) * that the user realizes there was an error, so in this -one- case, * we overwrite the process' exit code with the default error code */ - if (ORTE_PROC_STATE_TERM_WO_SYNC == proc->state) { - ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); - } + ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); } break; } else if (ORTE_PROC_STATE_KILLED_BY_CMD == proc->state) { diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c index 8bb0b6d97a..244bd416b9 100644 --- a/orte/mca/plm/base/plm_base_receive.c +++ b/orte/mca/plm/base/plm_base_receive.c @@ -34,8 +34,8 @@ #include "opal/mca/mca.h" #include "opal/mca/base/mca_base_param.h" - #include "opal/dss/dss.h" + #include "orte/constants.h" #include "orte/types.h" #include "orte/util/proc_info.h" @@ -129,7 +129,7 @@ int orte_plm_base_comm_stop(void) /* process incoming messages in order of receipt */ -void process_msg(int fd, short event, void *data) +static void process_msg(int fd, short event, void *data) { orte_msg_packet_t *msgpkt; orte_plm_cmd_flag_t command; diff --git a/orte/mca/plm/base/plm_private.h b/orte/mca/plm/base/plm_private.h index ed19480796..d73433c1a4 100644 --- a/orte/mca/plm/base/plm_private.h +++ b/orte/mca/plm/base/plm_private.h @@ -111,8 +111,6 @@ ORTE_DECLSPEC int orte_plm_base_report_launched(orte_jobid_t job); ORTE_DECLSPEC int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons); -ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata); - ORTE_DECLSPEC int orte_plm_base_set_hnp_name(void); ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_job_t *jdata); diff --git a/orte/mca/routed/base/base.h b/orte/mca/routed/base/base.h index 65f1eae89f..ca8e185ef0 100644 --- a/orte/mca/routed/base/base.h +++ b/orte/mca/routed/base/base.h @@ -37,6 +37,8 @@ ORTE_DECLSPEC extern int orte_routed_base_output; ORTE_DECLSPEC extern opal_list_t orte_routed_base_components; ORTE_DECLSPEC extern int orte_routed_base_register_sync(bool setup); +ORTE_DECLSPEC extern int orte_routed_base_process_callback(orte_jobid_t job, + opal_buffer_t *buffer); ORTE_DECLSPEC int orte_routed_base_comm_start(void); ORTE_DECLSPEC int orte_routed_base_comm_stop(void); diff --git a/orte/mca/routed/base/routed_base_receive.c b/orte/mca/routed/base/routed_base_receive.c index 87a8b4332e..8f30b467aa 100644 --- a/orte/mca/routed/base/routed_base_receive.c +++ b/orte/mca/routed/base/routed_base_receive.c @@ -53,6 +53,13 @@ #include "orte/mca/routed/base/base.h" static bool recv_issued=false; +static opal_mutex_t lock; +static opal_list_t recvs; +static opal_event_t ready; +static int ready_fd[2]; +static bool processing; + +static void process_msg(int fd, short event, void *data); int orte_routed_base_comm_start(void) { @@ -66,6 +73,20 @@ int orte_routed_base_comm_start(void) "%s routed:base: Receive: Start command recv", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + processing = false; + OBJ_CONSTRUCT(&lock, opal_mutex_t); + OBJ_CONSTRUCT(&recvs, opal_list_t); +#ifndef __WINDOWS__ + pipe(ready_fd); +#else + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, ready_fd) == -1) { + return ORTE_ERROR; + } +#endif + + opal_event_set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL); + opal_event_add(&ready, 0); + if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES, ORTE_RML_NON_PERSISTENT, @@ -92,37 +113,73 @@ int orte_routed_base_comm_stop(void) "%s routed:base:receive stop comm", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES))) { - ORTE_ERROR_LOG(rc); - } + OBJ_DESTRUCT(&recvs); + opal_event_del(&ready); +#ifndef __WINDOWS__ + close(ready_fd[0]); +#else + closesocket(ready_fd[0]); +#endif + processing = false; + OBJ_DESTRUCT(&lock); + + orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES); recv_issued = false; return rc; } -void orte_routed_base_process_msg(int fd, short event, void *data) +static void process_msg(int fd, short event, void *data) { - orte_message_event_t *mev = (orte_message_event_t*)data; + orte_msg_packet_t *msgpkt; orte_jobid_t job; int rc; orte_std_cntr_t cnt; + opal_list_item_t *item; + int dump[128]; - /* unpack the jobid this is for */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &job, &cnt, ORTE_JOBID))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(mev); - return; + OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output, + "%s routed:base:receive processing msg", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + OPAL_THREAD_LOCK(&lock); + + /* tag that we are processing the list */ + processing = true; + + /* clear the file descriptor to stop the event from refiring */ +#ifndef __WINDOWS__ + read(fd, &dump, sizeof(dump)); +#else + recv(fd, (char *) &dump, sizeof(dump), 0); +#endif + + while (NULL != (item = opal_list_remove_first(&recvs))) { + msgpkt = (orte_msg_packet_t*)item; + + /* unpack the jobid this is for */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &job, &cnt, ORTE_JOBID))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(msgpkt); + continue; + } + + /* pass the remainder of the buffer to the active module's + * init_routes API + */ + if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, msgpkt->buffer))) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(msgpkt); } - /* pass the remainder of the buffer to the active module's - * init_routes API - */ - if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, mev->buffer))) { - ORTE_ERROR_LOG(rc); - } - OBJ_RELEASE(mev); - return; + /* reset the event */ + processing = false; + opal_event_add(&ready, 0); + + /* release the thread */ + OPAL_THREAD_UNLOCK(&lock); } @@ -151,7 +208,7 @@ void orte_routed_base_recv(int status, orte_process_name_t* sender, * buffer, however, is NOT released here, although its payload IS transferred * to the message buffer for later processing */ - ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_routed_base_process_msg); + ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], true, sender, &buffer); /* reissue the recv */ if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, @@ -163,3 +220,12 @@ void orte_routed_base_recv(int status, orte_process_name_t* sender, } return; } + +/* where HNP messages come */ +void orte_routed_base_process_msg(int fd, short event, void *data) +{ + orte_message_event_t *mev = (orte_message_event_t*)data; + + ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], false, &mev->sender, &mev->buffer); + OBJ_RELEASE(mev); +} diff --git a/orte/mca/routed/base/routed_base_register_sync.c b/orte/mca/routed/base/routed_base_register_sync.c index 03b6961b54..07a2691c4d 100644 --- a/orte/mca/routed/base/routed_base_register_sync.c +++ b/orte/mca/routed/base/routed_base_register_sync.c @@ -27,6 +27,7 @@ #include "orte/mca/rml/rml.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" +#include "orte/mca/plm/base/base.h" #include "orte/mca/routed/base/base.h" @@ -105,3 +106,84 @@ int orte_routed_base_register_sync(bool setup) return ORTE_SUCCESS; } + +int orte_routed_base_process_callback(orte_jobid_t job, opal_buffer_t *buffer) +{ + orte_proc_t *proc; + orte_job_t *jdata; + orte_std_cntr_t cnt; + char *rml_uri; + orte_vpid_t vpid; + int rc; + + /* lookup the job object for this process */ + if (NULL == (jdata = orte_get_job_data_object(job))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return ORTE_ERR_NOT_FOUND; + } + + /* unpack the data for each entry */ + cnt = 1; + while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &cnt, ORTE_VPID))) { + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + continue; + + } + OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, + "%s routed_binomial:callback got uri %s for job %s rank %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == rml_uri) ? "NULL" : rml_uri, + ORTE_JOBID_PRINT(job), ORTE_VPID_PRINT(vpid))); + + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + continue; + } + + if (rml_uri == NULL) { + /* if the rml_uri is NULL, then that means this process + * terminated without calling orte_init. However, the only + * reason we would be getting called here is if other + * processes local to that daemon -did- call orte_init. + * This is considered an "abnormal termination" mode per + * community discussion, and must generate a corresponding + * response, so declare the proc abnormally terminated + */ + proc->state = ORTE_PROC_STATE_TERM_WO_SYNC; + /* increment the number of procs that have terminated */ + jdata->num_terminated++; + /* let the normal code path declare the job aborted */ + orte_plm_base_check_job_completed(jdata); + continue; + } + + /* update the record */ + proc->rml_uri = strdup(rml_uri); + free(rml_uri); + + /* update the proc state */ + if (proc->state < ORTE_PROC_STATE_RUNNING) { + proc->state = ORTE_PROC_STATE_RUNNING; + } + + ++jdata->num_reported; + cnt = 1; + } + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* if all procs have reported, update our job state */ + if (jdata->num_reported == jdata->num_procs) { + /* update the job state */ + if (jdata->state < ORTE_JOB_STATE_RUNNING) { + jdata->state = ORTE_JOB_STATE_RUNNING; + } + } + + return ORTE_SUCCESS; +} diff --git a/orte/mca/routed/binomial/routed_binomial.c b/orte/mca/routed/binomial/routed_binomial.c index d7ee64b0b5..a954f08e70 100644 --- a/orte/mca/routed/binomial/routed_binomial.c +++ b/orte/mca/routed/binomial/routed_binomial.c @@ -421,71 +421,6 @@ static orte_process_name_t get_route(orte_process_name_t *target) return *ret; } -static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) -{ - orte_proc_t **procs; - orte_job_t *jdata; - orte_std_cntr_t cnt; - char *rml_uri; - orte_process_name_t name; - int rc; - - /* lookup the job object for this process */ - if (NULL == (jdata = orte_get_job_data_object(job))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - procs = (orte_proc_t**)jdata->procs->addr; - - /* unpack the data for each entry */ - cnt = 1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { - - OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, - "%s routed_binomial:callback got uri %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == rml_uri) ? "NULL" : rml_uri)); - - if (rml_uri == NULL) continue; - - /* we don't need to set the contact info into our rml - * hash table as we won't talk to the proc directly - */ - - /* extract the proc's name */ - if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) { - ORTE_ERROR_LOG(rc); - free(rml_uri); - continue; - } - /* the procs are stored in vpid order, so update the record */ - procs[name.vpid]->rml_uri = strdup(rml_uri); - free(rml_uri); - - /* update the proc state */ - if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { - procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; - } - - ++jdata->num_reported; - cnt = 1; - } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* if all procs have reported, update our job state */ - if (jdata->num_reported == jdata->num_procs) { - /* update the job state */ - if (jdata->state < ORTE_JOB_STATE_RUNNING) { - jdata->state = ORTE_JOB_STATE_RUNNING; - } - } - - return ORTE_SUCCESS; -} - /* HANDLE ACK MESSAGES FROM AN HNP */ static void release_ack(int fd, short event, void *data) { @@ -621,7 +556,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } else { /* if not, then I need to process the callback */ - if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/mca/routed/cm/routed_cm.c b/orte/mca/routed/cm/routed_cm.c index 8c59368842..d05b33ae39 100644 --- a/orte/mca/routed/cm/routed_cm.c +++ b/orte/mca/routed/cm/routed_cm.c @@ -360,71 +360,6 @@ static orte_process_name_t get_route(orte_process_name_t *target) return *ret; } -static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) -{ - orte_proc_t **procs; - orte_job_t *jdata; - orte_std_cntr_t cnt; - char *rml_uri; - orte_process_name_t name; - int rc; - - /* lookup the job object for this process */ - if (NULL == (jdata = orte_get_job_data_object(job))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - procs = (orte_proc_t**)jdata->procs->addr; - - /* unpack the data for each entry */ - cnt = 1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { - - OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, - "%s routed_cm:callback got uri %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == rml_uri) ? "NULL" : rml_uri)); - - if (rml_uri == NULL) continue; - - /* we don't need to set the contact info into our rml - * hash table as we won't talk to the proc directly - */ - - /* extract the proc's name */ - if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) { - ORTE_ERROR_LOG(rc); - free(rml_uri); - continue; - } - /* the procs are stored in vpid order, so update the record */ - procs[name.vpid]->rml_uri = strdup(rml_uri); - free(rml_uri); - - /* update the proc state */ - if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { - procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; - } - - ++jdata->num_reported; - cnt = 1; - } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* if all procs have reported, update our job state */ - if (jdata->num_reported == jdata->num_procs) { - /* update the job state */ - if (jdata->state < ORTE_JOB_STATE_RUNNING) { - jdata->state = ORTE_JOB_STATE_RUNNING; - } - } - - return ORTE_SUCCESS; -} - /* HANDLE ACK MESSAGES FROM AN HNP */ static void release_ack(int fd, short event, void *data) { @@ -571,7 +506,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } else { /* if not, then I need to process the callback */ - if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/mca/routed/linear/routed_linear.c b/orte/mca/routed/linear/routed_linear.c index 93385fc825..b07c606d30 100644 --- a/orte/mca/routed/linear/routed_linear.c +++ b/orte/mca/routed/linear/routed_linear.c @@ -383,71 +383,6 @@ static orte_process_name_t get_route(orte_process_name_t *target) return *ret; } -static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) -{ - orte_proc_t **procs; - orte_job_t *jdata; - orte_std_cntr_t cnt; - char *rml_uri; - orte_process_name_t name; - int rc; - - /* lookup the job object for this process */ - if (NULL == (jdata = orte_get_job_data_object(job))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - procs = (orte_proc_t**)jdata->procs->addr; - - /* unpack the data for each entry */ - cnt = 1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { - - OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, - "%s routed_linear:callback got uri %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == rml_uri) ? "NULL" : rml_uri)); - - if (rml_uri == NULL) continue; - - /* we don't need to set the contact info into our rml - * hash table as we won't talk to the proc directly - */ - - /* extract the proc's name */ - if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) { - ORTE_ERROR_LOG(rc); - free(rml_uri); - continue; - } - /* the procs are stored in vpid order, so update the record */ - procs[name.vpid]->rml_uri = strdup(rml_uri); - free(rml_uri); - - /* update the proc state */ - if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { - procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; - } - - ++jdata->num_reported; - cnt = 1; - } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* if all procs have reported, update our job state */ - if (jdata->num_reported == jdata->num_procs) { - /* update the job state */ - if (jdata->state < ORTE_JOB_STATE_RUNNING) { - jdata->state = ORTE_JOB_STATE_RUNNING; - } - } - - return ORTE_SUCCESS; -} - /* HANDLE ACK MESSAGES FROM AN HNP */ static void release_ack(int fd, short event, void *data) { @@ -578,7 +513,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } else { /* if not, then I need to process the callback */ - if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/mca/routed/radix/routed_radix.c b/orte/mca/routed/radix/routed_radix.c index baf6a9f11c..2856130638 100644 --- a/orte/mca/routed/radix/routed_radix.c +++ b/orte/mca/routed/radix/routed_radix.c @@ -415,71 +415,6 @@ found: return *ret; } -static int process_callback(orte_jobid_t job, opal_buffer_t *buffer) -{ - orte_proc_t **procs; - orte_job_t *jdata; - orte_std_cntr_t cnt; - char *rml_uri; - orte_process_name_t name; - int rc; - - /* lookup the job object for this process */ - if (NULL == (jdata = orte_get_job_data_object(job))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return ORTE_ERR_NOT_FOUND; - } - procs = (orte_proc_t**)jdata->procs->addr; - - /* unpack the data for each entry */ - cnt = 1; - while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) { - - OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, - "%s routed_radix:callback got uri %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == rml_uri) ? "NULL" : rml_uri)); - - if (rml_uri == NULL) continue; - - /* we don't need to set the contact info into our rml - * hash table as we won't talk to the proc directly - */ - - /* extract the proc's name */ - if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) { - ORTE_ERROR_LOG(rc); - free(rml_uri); - continue; - } - /* the procs are stored in vpid order, so update the record */ - procs[name.vpid]->rml_uri = strdup(rml_uri); - free(rml_uri); - - /* update the proc state */ - if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) { - procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING; - } - - ++jdata->num_reported; - cnt = 1; - } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* if all procs have reported, update our job state */ - if (jdata->num_reported == jdata->num_procs) { - /* update the job state */ - if (jdata->state < ORTE_JOB_STATE_RUNNING) { - jdata->state = ORTE_JOB_STATE_RUNNING; - } - } - - return ORTE_SUCCESS; -} - /* HANDLE ACK MESSAGES FROM AN HNP */ static void release_ack(int fd, short event, void *data) { @@ -610,7 +545,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } else { /* if not, then I need to process the callback */ - if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) { + if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/tools/orterun/help-orterun.txt b/orte/tools/orterun/help-orterun.txt index 6e455b12b7..90adf4be06 100644 --- a/orte/tools/orterun/help-orterun.txt +++ b/orte/tools/orterun/help-orterun.txt @@ -95,8 +95,18 @@ in the application to be terminated by signals sent by %s # [orterun:proc-exit-no-sync] %s has exited due to process rank %lu with PID %lu on -node %s exiting without calling "finalize". This may -have caused other processes in the application to be +node %s exiting improperly. There are two reasons this could occur: + +1. this process did not call "init" before exiting, but others in +the job did. This can cause a job to hang indefinitely while it waits +for all processes to call "init". By rule, if one process calls "init", +then ALL processes must call "init" prior to termination. + +2. this process called "init", but exited without calling "finalize". +By rule, all processes that call "init" MUST call "finalize" prior to +exiting or it will be considered an "abnormal termination" + +This may have caused other processes in the application to be terminated by signals sent by %s (as reported here). # [orterun:proc-exit-no-sync-unknown]