1
1

Detect the scenario where one or more procs fail to call orte/ompi_init while others in the job do. This scenario can cause the job to hang, because MPI_Init contains a barrier operation that will never complete. Although ORTE itself does not contain such a barrier, this is still treated as an error scenario so that the MPI case can be detected - ORTE has no knowledge of OMPI and could not otherwise differentiate the two use-cases.

Take advantage of the changes to update the routed_base_receive code to avoid message overlap.

This commit was SVN r22329.
Этот коммит содержится в:
Ralph Castain 2009-12-17 19:39:53 +00:00
родитель 06d1f2cfe2
Коммит 8ab962411c
15 изменённых файлов: 263 добавлений и 314 удалений

Просмотреть файл

@ -2057,8 +2057,21 @@ static bool all_children_registered(orte_jobid_t job)
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
/* if this child has terminated, we consider it as having
* registered for the purposes of this function. If it never
* did register, then we will send a NULL rml_uri back to
* the HNP, which will then know that the proc did not register.
* If other procs did register, then the HNP can declare an
* abnormal termination
*/
if (ORTE_PROC_STATE_UNTERMINATED < child->state) {
/* this proc has terminated somehow - consider it
* as registered for now
*/
continue;
}
/* if this child is *not* registered yet, return false */
if (NULL == child->rml_uri) {
if (!child->init_recvd) {
return false;
}
}
@ -2084,6 +2097,11 @@ static int pack_child_contact_info(orte_jobid_t job, opal_buffer_t *buf)
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
/* pack the child's vpid - must be done in case rml_uri is NULL */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &(child->name->vpid), 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the contact info */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &child->rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
@ -2173,6 +2191,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
int rc;
bool found=false;
int8_t flag;
orte_odls_job_t *jobdat, *jdat;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
@ -2216,13 +2235,15 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
/* if the contact info is already set, then we are "de-registering" the child
* so free the info and set it to NULL
*/
if (NULL != child->rml_uri) {
if (child->init_recvd && NULL != child->rml_uri) {
free(child->rml_uri);
child->rml_uri = NULL;
child->fini_recvd = true;
} else {
/* if the contact info is not set, then we are registering the child so
* unpack the contact info from the buffer and store it
*/
child->init_recvd = true;
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &(child->rml_uri), &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
@ -2233,13 +2254,14 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
/* do they want the nidmap? */
if (drop_nidmap) {
orte_odls_job_t *jobdat = NULL;
/* get the jobdata object */
jobdat = NULL;
for (item = opal_list_get_first(&orte_local_jobdata);
item != opal_list_get_end(&orte_local_jobdata);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
if (jobdat->jobid == child->name->jobid) {
jdat = (orte_odls_job_t*)item;
if (jdat->jobid == child->name->jobid) {
jobdat = jdat;
break;
}
}
@ -2247,6 +2269,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:sync nidmap requested for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -2577,7 +2600,7 @@ GOTCHILD:
void orte_base_default_waitpid_fired(orte_process_name_t *proc, int32_t status)
{
orte_odls_child_t *child;
orte_odls_child_t *child, *chd;
opal_list_item_t *item;
char *job, *vpid, *abort_file;
struct stat buf;
@ -2677,21 +2700,47 @@ GOTCHILD:
/* okay, it terminated normally - check to see if a sync was required and
* if it was received
*/
if (NULL != child->rml_uri) {
/* if this is set, then we required a sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated normally "
"but did not provide a required sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
if (child->init_recvd) {
if (!child->fini_recvd) {
/* we required a finalizing sync and didn't get it, so this
* is considered an abnormal termination and treated accordingly
*/
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated normally "
"but did not provide a required finalize sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
} else {
/* has any child in this job already registered? */
for (item = opal_list_get_first(&orte_local_children);
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
chd = (orte_odls_child_t*)item;
if (chd->init_recvd) {
/* someone has registered, and we didn't before
* terminating - this is an abnormal termination
*/
child->state = ORTE_PROC_STATE_TERM_WO_SYNC;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:waitpid_fired child process %s terminated normally "
"but did not provide a required init sync - it "
"will be treated as an abnormal termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
goto MOVEON;
}
}
/* if no child has registered, then it is possible that
* none of them will. This is considered acceptable
*/
child->state = ORTE_PROC_STATE_TERMINATED;
}
@ -2699,7 +2748,6 @@ GOTCHILD:
"%s odls:waitpid_fired child process %s terminated normally",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
}
} else {
/* the process was terminated with a signal! That's definitely

Просмотреть файл

@ -85,6 +85,8 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
*/
ptr->state = ORTE_PROC_STATE_FAILED_TO_START;
ptr->exit_code = 0;
ptr->init_recvd = false;
ptr->fini_recvd = false;
ptr->rml_uri = NULL;
ptr->slot_list = NULL;
ptr->waitpid_recvd = false;

Просмотреть файл

@ -91,6 +91,8 @@ typedef struct {
bool coll_recvd; /* collective operation recvd */
orte_proc_state_t state; /* the state of the process */
orte_exit_code_t exit_code; /* process exit code */
bool init_recvd; /* process called orte_init */
bool fini_recvd; /* process called orte_finalize */
char *rml_uri; /* contact info for this child */
char *slot_list; /* list of slots for this child */
bool waitpid_recvd; /* waitpid has detected proc termination */

Просмотреть файл

@ -81,6 +81,7 @@ ORTE_DECLSPEC int orte_plm_base_close(void);
*/
ORTE_DECLSPEC void orte_plm_base_app_report_launch(int fd, short event, void *data);
ORTE_DECLSPEC void orte_plm_base_receive_process_msg(int fd, short event, void *data);
ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata);
#endif /* ORTE_DISABLE_FULL_SUPPORT */

Просмотреть файл

@ -1369,9 +1369,7 @@ void orte_plm_base_check_job_completed(orte_job_t *jdata)
* that the user realizes there was an error, so in this -one- case,
* we overwrite the process' exit code with the default error code
*/
if (ORTE_PROC_STATE_TERM_WO_SYNC == proc->state) {
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
break;
} else if (ORTE_PROC_STATE_KILLED_BY_CMD == proc->state) {

Просмотреть файл

@ -34,8 +34,8 @@
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/dss/dss.h"
#include "orte/constants.h"
#include "orte/types.h"
#include "orte/util/proc_info.h"
@ -129,7 +129,7 @@ int orte_plm_base_comm_stop(void)
/* process incoming messages in order of receipt */
void process_msg(int fd, short event, void *data)
static void process_msg(int fd, short event, void *data)
{
orte_msg_packet_t *msgpkt;
orte_plm_cmd_flag_t command;

Просмотреть файл

@ -111,8 +111,6 @@ ORTE_DECLSPEC int orte_plm_base_report_launched(orte_jobid_t job);
ORTE_DECLSPEC int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons);
ORTE_DECLSPEC void orte_plm_base_check_job_completed(orte_job_t *jdata);
ORTE_DECLSPEC int orte_plm_base_set_hnp_name(void);
ORTE_DECLSPEC int orte_plm_base_create_jobid(orte_job_t *jdata);

Просмотреть файл

@ -37,6 +37,8 @@ ORTE_DECLSPEC extern int orte_routed_base_output;
ORTE_DECLSPEC extern opal_list_t orte_routed_base_components;
ORTE_DECLSPEC extern int orte_routed_base_register_sync(bool setup);
ORTE_DECLSPEC extern int orte_routed_base_process_callback(orte_jobid_t job,
opal_buffer_t *buffer);
ORTE_DECLSPEC int orte_routed_base_comm_start(void);
ORTE_DECLSPEC int orte_routed_base_comm_stop(void);

Просмотреть файл

@ -53,6 +53,13 @@
#include "orte/mca/routed/base/base.h"
static bool recv_issued=false;
static opal_mutex_t lock;
static opal_list_t recvs;
static opal_event_t ready;
static int ready_fd[2];
static bool processing;
static void process_msg(int fd, short event, void *data);
int orte_routed_base_comm_start(void)
{
@ -66,6 +73,20 @@ int orte_routed_base_comm_start(void)
"%s routed:base: Receive: Start command recv",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
processing = false;
OBJ_CONSTRUCT(&lock, opal_mutex_t);
OBJ_CONSTRUCT(&recvs, opal_list_t);
#ifndef __WINDOWS__
pipe(ready_fd);
#else
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, ready_fd) == -1) {
return ORTE_ERROR;
}
#endif
opal_event_set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL);
opal_event_add(&ready, 0);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_INIT_ROUTES,
ORTE_RML_NON_PERSISTENT,
@ -92,37 +113,73 @@ int orte_routed_base_comm_stop(void)
"%s routed:base:receive stop comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&recvs);
opal_event_del(&ready);
#ifndef __WINDOWS__
close(ready_fd[0]);
#else
closesocket(ready_fd[0]);
#endif
processing = false;
OBJ_DESTRUCT(&lock);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_INIT_ROUTES);
recv_issued = false;
return rc;
}
void orte_routed_base_process_msg(int fd, short event, void *data)
static void process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_msg_packet_t *msgpkt;
orte_jobid_t job;
int rc;
orte_std_cntr_t cnt;
opal_list_item_t *item;
int dump[128];
/* unpack the jobid this is for */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(mev);
return;
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s routed:base:receive processing msg",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OPAL_THREAD_LOCK(&lock);
/* tag that we are processing the list */
processing = true;
/* clear the file descriptor to stop the event from refiring */
#ifndef __WINDOWS__
read(fd, &dump, sizeof(dump));
#else
recv(fd, (char *) &dump, sizeof(dump), 0);
#endif
while (NULL != (item = opal_list_remove_first(&recvs))) {
msgpkt = (orte_msg_packet_t*)item;
/* unpack the jobid this is for */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msgpkt);
continue;
}
/* pass the remainder of the buffer to the active module's
* init_routes API
*/
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, msgpkt->buffer))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(msgpkt);
}
/* pass the remainder of the buffer to the active module's
* init_routes API
*/
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(job, mev->buffer))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(mev);
return;
/* reset the event */
processing = false;
opal_event_add(&ready, 0);
/* release the thread */
OPAL_THREAD_UNLOCK(&lock);
}
@ -151,7 +208,7 @@ void orte_routed_base_recv(int status, orte_process_name_t* sender,
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_routed_base_process_msg);
ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], true, sender, &buffer);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
@ -163,3 +220,12 @@ void orte_routed_base_recv(int status, orte_process_name_t* sender,
}
return;
}
/* where HNP messages come */
/*
 * Event-style callback (fd, event, data) for messages from the HNP.
 *
 * data is interpreted as an orte_message_event_t. The event's sender and
 * buffer are handed to ORTE_PROCESS_MESSAGE together with this file's
 * recvs list, its lock, the processing flag and ready_fd[1]; the event
 * object is then released. The fd and event arguments are not used here.
 *
 * NOTE(review): the macro definition is not visible from here - presumably
 * it queues the message on recvs and writes to ready_fd[1] so that
 * process_msg gets triggered; confirm against the macro.
 */
void orte_routed_base_process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
/* note the literal false here, where orte_routed_base_recv passes true
 * in the same position - the meaning of that flag is defined by the macro
 */
ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], false, &mev->sender, &mev->buffer);
/* done with the event wrapper itself */
OBJ_RELEASE(mev);
}

Просмотреть файл

@ -27,6 +27,7 @@
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/plm/base/base.h"
#include "orte/mca/routed/base/base.h"
@ -105,3 +106,84 @@ int orte_routed_base_register_sync(bool setup)
return ORTE_SUCCESS;
}
/*
 * Process a contact-info callback for the procs of a job.
 *
 * The buffer is read as a sequence of (vpid, rml_uri) pairs until the
 * unpack of the next vpid fails. For each pair:
 *   - a NULL rml_uri marks the proc as ORTE_PROC_STATE_TERM_WO_SYNC,
 *     bumps the job's num_terminated counter and lets
 *     orte_plm_base_check_job_completed() handle the job;
 *   - otherwise the uri is stored on the proc, the proc is moved to
 *     ORTE_PROC_STATE_RUNNING if it was in an earlier state, and the
 *     job's num_reported counter is incremented.
 * Once num_reported equals num_procs, the job is moved to
 * ORTE_JOB_STATE_RUNNING if it was in an earlier state.
 *
 * @param job     jobid whose procs are being reported
 * @param buffer  buffer holding the packed (vpid, rml_uri) pairs
 *
 * @return ORTE_ERR_NOT_FOUND if no job object exists for job; the unpack
 *         error code if the loop ended for any reason other than reading
 *         past the end of the buffer; ORTE_SUCCESS otherwise.
 */
int orte_routed_base_process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_proc_t *proc;
    orte_job_t *jdata;
    orte_std_cntr_t cnt;
    char *rml_uri;
    orte_vpid_t vpid;
    int rc;

    /* lookup the job object for this process */
    if (NULL == (jdata = orte_get_job_data_object(job))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }

    /* unpack the data for each entry */
    for (;;) {
        /* always ask for exactly one value, regardless of which path
         * the previous iteration took
         */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &vpid, &cnt, ORTE_VPID))) {
            break;
        }

        rml_uri = NULL;
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
            /* log it and let the next vpid unpack decide how the loop ends */
            ORTE_ERROR_LOG(rc);
            continue;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_binomial:callback got uri %s for job %s rank %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == rml_uri) ? "NULL" : rml_uri,
                             ORTE_JOBID_PRINT(job), ORTE_VPID_PRINT(vpid)));

        if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            /* nobody will take the unpacked string - don't leak it */
            free(rml_uri);
            continue;
        }

        if (NULL == rml_uri) {
            /* if the rml_uri is NULL, then that means this process
             * terminated without calling orte_init. However, the only
             * reason we would be getting called here is if other
             * processes local to that daemon -did- call orte_init.
             * This is considered an "abnormal termination" mode per
             * community discussion, and must generate a corresponding
             * response, so declare the proc abnormally terminated
             */
            proc->state = ORTE_PROC_STATE_TERM_WO_SYNC;
            /* increment the number of procs that have terminated */
            jdata->num_terminated++;
            /* let the normal code path declare the job aborted */
            orte_plm_base_check_job_completed(jdata);
            continue;
        }

        /* update the record - the unpacked string is heap memory that we
         * would otherwise free(), so hand it over directly instead of
         * making an unchecked strdup() copy.
         * NOTE(review): any uri already stored on the proc is overwritten
         * here, as before - confirm a proc cannot report twice
         */
        proc->rml_uri = rml_uri;

        /* update the proc state */
        if (proc->state < ORTE_PROC_STATE_RUNNING) {
            proc->state = ORTE_PROC_STATE_RUNNING;
        }
        ++jdata->num_reported;
    }

    /* running off the end of the buffer is the normal way out */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* if all procs have reported, update our job state */
    if (jdata->num_reported == jdata->num_procs) {
        /* update the job state */
        if (jdata->state < ORTE_JOB_STATE_RUNNING) {
            jdata->state = ORTE_JOB_STATE_RUNNING;
        }
    }

    return ORTE_SUCCESS;
}

Просмотреть файл

@ -421,71 +421,6 @@ static orte_process_name_t get_route(orte_process_name_t *target)
return *ret;
}
/*
 * Record the contact info reported back for the procs of a job.
 *
 * Strings are unpacked from the buffer one at a time until an unpack
 * fails. NULL entries are skipped. Every other entry is parsed to get the
 * proc's name, a copy of the uri is stored in that proc's record, the proc
 * is marked running if it was in an earlier state, and the job's count of
 * reported procs is bumped. When that count equals the number of procs in
 * the job, the job itself is marked running if it was in an earlier state.
 *
 * Returns ORTE_ERR_NOT_FOUND when there is no job object for the jobid,
 * the unpack error when the loop stopped for any reason other than
 * reading past the end of the buffer, and ORTE_SUCCESS otherwise.
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_job_t *jdata = orte_get_job_data_object(job);
    orte_proc_t **table;
    orte_proc_t *entry;
    orte_process_name_t who;
    orte_std_cntr_t n = 1;
    char *uri;
    int rc;

    /* no job object means there is nowhere to store anything */
    if (NULL == jdata) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    table = (orte_proc_t**)jdata->procs->addr;

    /* walk the buffer one entry at a time */
    for (;;) {
        rc = opal_dss.unpack(buffer, &uri, &n, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_binomial:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == uri) ? "NULL" : uri));

        /* nothing to record for an empty entry */
        if (NULL == uri) {
            continue;
        }

        /* the contact info is not entered into our rml hash table
         * since we never talk to the proc directly - all we want
         * from the uri is the name of the proc it belongs to
         */
        rc = orte_rml_base_parse_uris(uri, &who, NULL);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            free(uri);
            continue;
        }

        /* records are kept in vpid order, so index straight in */
        entry = table[who.vpid];
        entry->rml_uri = strdup(uri);
        free(uri);

        /* only ever move the proc state forward */
        if (entry->state < ORTE_PROC_STATE_RUNNING) {
            entry->state = ORTE_PROC_STATE_RUNNING;
        }

        jdata->num_reported++;
        n = 1;
    }

    /* hitting the end of the buffer is the expected way out */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* once everyone has reported, the job as a whole is running */
    if (jdata->num_reported == jdata->num_procs &&
        jdata->state < ORTE_JOB_STATE_RUNNING) {
        jdata->state = ORTE_JOB_STATE_RUNNING;
    }

    return ORTE_SUCCESS;
}
/* HANDLE ACK MESSAGES FROM AN HNP */
static void release_ack(int fd, short event, void *data)
{
@ -621,7 +556,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -360,71 +360,6 @@ static orte_process_name_t get_route(orte_process_name_t *target)
return *ret;
}
/*
 * Record the contact info reported back for the procs of a job.
 *
 * Strings are unpacked from the buffer one at a time until an unpack
 * fails. NULL entries are skipped. Every other entry is parsed to get the
 * proc's name, a copy of the uri is stored in that proc's record, the proc
 * is marked running if it was in an earlier state, and the job's count of
 * reported procs is bumped. When that count equals the number of procs in
 * the job, the job itself is marked running if it was in an earlier state.
 *
 * Returns ORTE_ERR_NOT_FOUND when there is no job object for the jobid,
 * the unpack error when the loop stopped for any reason other than
 * reading past the end of the buffer, and ORTE_SUCCESS otherwise.
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_job_t *jdata = orte_get_job_data_object(job);
    orte_proc_t **table;
    orte_proc_t *entry;
    orte_process_name_t who;
    orte_std_cntr_t n = 1;
    char *uri;
    int rc;

    /* no job object means there is nowhere to store anything */
    if (NULL == jdata) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    table = (orte_proc_t**)jdata->procs->addr;

    /* walk the buffer one entry at a time */
    for (;;) {
        rc = opal_dss.unpack(buffer, &uri, &n, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_cm:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == uri) ? "NULL" : uri));

        /* nothing to record for an empty entry */
        if (NULL == uri) {
            continue;
        }

        /* the contact info is not entered into our rml hash table
         * since we never talk to the proc directly - all we want
         * from the uri is the name of the proc it belongs to
         */
        rc = orte_rml_base_parse_uris(uri, &who, NULL);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            free(uri);
            continue;
        }

        /* records are kept in vpid order, so index straight in */
        entry = table[who.vpid];
        entry->rml_uri = strdup(uri);
        free(uri);

        /* only ever move the proc state forward */
        if (entry->state < ORTE_PROC_STATE_RUNNING) {
            entry->state = ORTE_PROC_STATE_RUNNING;
        }

        jdata->num_reported++;
        n = 1;
    }

    /* hitting the end of the buffer is the expected way out */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* once everyone has reported, the job as a whole is running */
    if (jdata->num_reported == jdata->num_procs &&
        jdata->state < ORTE_JOB_STATE_RUNNING) {
        jdata->state = ORTE_JOB_STATE_RUNNING;
    }

    return ORTE_SUCCESS;
}
/* HANDLE ACK MESSAGES FROM AN HNP */
static void release_ack(int fd, short event, void *data)
{
@ -571,7 +506,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -383,71 +383,6 @@ static orte_process_name_t get_route(orte_process_name_t *target)
return *ret;
}
/*
 * Record the contact info reported back for the procs of a job.
 *
 * Strings are unpacked from the buffer one at a time until an unpack
 * fails. NULL entries are skipped. Every other entry is parsed to get the
 * proc's name, a copy of the uri is stored in that proc's record, the proc
 * is marked running if it was in an earlier state, and the job's count of
 * reported procs is bumped. When that count equals the number of procs in
 * the job, the job itself is marked running if it was in an earlier state.
 *
 * Returns ORTE_ERR_NOT_FOUND when there is no job object for the jobid,
 * the unpack error when the loop stopped for any reason other than
 * reading past the end of the buffer, and ORTE_SUCCESS otherwise.
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_job_t *jdata = orte_get_job_data_object(job);
    orte_proc_t **table;
    orte_proc_t *entry;
    orte_process_name_t who;
    orte_std_cntr_t n = 1;
    char *uri;
    int rc;

    /* no job object means there is nowhere to store anything */
    if (NULL == jdata) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    table = (orte_proc_t**)jdata->procs->addr;

    /* walk the buffer one entry at a time */
    for (;;) {
        rc = opal_dss.unpack(buffer, &uri, &n, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_linear:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == uri) ? "NULL" : uri));

        /* nothing to record for an empty entry */
        if (NULL == uri) {
            continue;
        }

        /* the contact info is not entered into our rml hash table
         * since we never talk to the proc directly - all we want
         * from the uri is the name of the proc it belongs to
         */
        rc = orte_rml_base_parse_uris(uri, &who, NULL);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            free(uri);
            continue;
        }

        /* records are kept in vpid order, so index straight in */
        entry = table[who.vpid];
        entry->rml_uri = strdup(uri);
        free(uri);

        /* only ever move the proc state forward */
        if (entry->state < ORTE_PROC_STATE_RUNNING) {
            entry->state = ORTE_PROC_STATE_RUNNING;
        }

        jdata->num_reported++;
        n = 1;
    }

    /* hitting the end of the buffer is the expected way out */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* once everyone has reported, the job as a whole is running */
    if (jdata->num_reported == jdata->num_procs &&
        jdata->state < ORTE_JOB_STATE_RUNNING) {
        jdata->state = ORTE_JOB_STATE_RUNNING;
    }

    return ORTE_SUCCESS;
}
/* HANDLE ACK MESSAGES FROM AN HNP */
static void release_ack(int fd, short event, void *data)
{
@ -578,7 +513,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -415,71 +415,6 @@ found:
return *ret;
}
/*
 * Record the contact info reported back for the procs of a job.
 *
 * Strings are unpacked from the buffer one at a time until an unpack
 * fails. NULL entries are skipped. Every other entry is parsed to get the
 * proc's name, a copy of the uri is stored in that proc's record, the proc
 * is marked running if it was in an earlier state, and the job's count of
 * reported procs is bumped. When that count equals the number of procs in
 * the job, the job itself is marked running if it was in an earlier state.
 *
 * Returns ORTE_ERR_NOT_FOUND when there is no job object for the jobid,
 * the unpack error when the loop stopped for any reason other than
 * reading past the end of the buffer, and ORTE_SUCCESS otherwise.
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_job_t *jdata = orte_get_job_data_object(job);
    orte_proc_t **table;
    orte_proc_t *entry;
    orte_process_name_t who;
    orte_std_cntr_t n = 1;
    char *uri;
    int rc;

    /* no job object means there is nowhere to store anything */
    if (NULL == jdata) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    table = (orte_proc_t**)jdata->procs->addr;

    /* walk the buffer one entry at a time */
    for (;;) {
        rc = opal_dss.unpack(buffer, &uri, &n, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_radix:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == uri) ? "NULL" : uri));

        /* nothing to record for an empty entry */
        if (NULL == uri) {
            continue;
        }

        /* the contact info is not entered into our rml hash table
         * since we never talk to the proc directly - all we want
         * from the uri is the name of the proc it belongs to
         */
        rc = orte_rml_base_parse_uris(uri, &who, NULL);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            free(uri);
            continue;
        }

        /* records are kept in vpid order, so index straight in */
        entry = table[who.vpid];
        entry->rml_uri = strdup(uri);
        free(uri);

        /* only ever move the proc state forward */
        if (entry->state < ORTE_PROC_STATE_RUNNING) {
            entry->state = ORTE_PROC_STATE_RUNNING;
        }

        jdata->num_reported++;
        n = 1;
    }

    /* hitting the end of the buffer is the expected way out */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* once everyone has reported, the job as a whole is running */
    if (jdata->num_reported == jdata->num_procs &&
        jdata->state < ORTE_JOB_STATE_RUNNING) {
        jdata->state = ORTE_JOB_STATE_RUNNING;
    }

    return ORTE_SUCCESS;
}
/* HANDLE ACK MESSAGES FROM AN HNP */
static void release_ack(int fd, short event, void *data)
{
@ -610,7 +545,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
} else {
/* if not, then I need to process the callback */
if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
if (ORTE_SUCCESS != (rc = orte_routed_base_process_callback(job, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -95,8 +95,18 @@ in the application to be terminated by signals sent by %s
#
[orterun:proc-exit-no-sync]
%s has exited due to process rank %lu with PID %lu on
node %s exiting without calling "finalize". This may
have caused other processes in the application to be
node %s exiting improperly. There are two reasons this could occur:
1. this process did not call "init" before exiting, but others in
the job did. This can cause a job to hang indefinitely while it waits
for all processes to call "init". By rule, if one process calls "init",
then ALL processes must call "init" prior to termination.
2. this process called "init", but exited without calling "finalize".
By rule, all processes that call "init" MUST call "finalize" prior to
exiting or it will be considered an "abnormal termination"
This may have caused other processes in the application to be
terminated by signals sent by %s (as reported here).
#
[orterun:proc-exit-no-sync-unknown]