1
1

Bootproxy daemons should persist after their local children have exited,

waiting instead for the SOH to indicate that the jobid has terminated.

In a scheduled environment, if your program has a section of MPI code
followed by a section of computation that some processes execute while
other proceses terminate normally. This patch keeps the scheduler from 
terminating all of the processes and the allocation if all of the processes 
on an allocated node exit well before other processes on other nodes.

This commit was SVN r7333.
Этот коммит содержится в:
Josh Hursey 2005-09-13 02:37:34 +00:00
родитель 39f25428da
Коммит 8dfcc41efd

Просмотреть файл

@ -59,6 +59,7 @@
#include "mca/rml/rml.h"
#include "mca/soh/soh.h"
#include "mca/rmgr/rmgr.h"
#include "mca/rmgr/base/base.h"
#include "mca/soh/base/base.h"
#include "runtime/runtime.h"
@ -73,7 +74,7 @@ static struct opal_event term_handler;
static struct opal_event int_handler;
static void signal_callback(int fd, short flags, void *arg);
static void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata);
static void orte_daemon_recv(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata);
@ -345,10 +346,34 @@ int main(int argc, char *argv[])
if (ORTE_SUCCESS != (ret = orte_rmgr.launch(orted_globals.bootproxy))) {
ORTE_ERROR_LOG(ret);
return ret;
}
/* setup the thread lock and condition variable */
OBJ_CONSTRUCT(&orted_globals.mutex, opal_mutex_t);
OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t);
/* Setup callback on jobid */
ret = orte_rmgr_base_proc_stage_gate_subscribe(orted_globals.bootproxy, job_state_callback, NULL);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
/* setup and enter the event monitor */
OPAL_THREAD_LOCK(&orted_globals.mutex);
while (false == orted_globals.exit_condition) {
opal_condition_wait(&orted_globals.condition, &orted_globals.mutex);
}
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
/* Finalize and clean up */
if (ORTE_SUCCESS != (ret = orte_finalize())) {
ORTE_ERROR_LOG(ret);
}
exit(ret);
}
@ -370,7 +395,7 @@ int main(int argc, char *argv[])
opal_output(0, "[%lu,%lu,%lu] ompid: issuing callback", ORTE_NAME_ARGS(orte_process_info.my_name));
}
/* register the daemon main callback function */
/* register the daemon main callback function */
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, 0, orte_daemon_recv, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret);
@ -504,3 +529,73 @@ static void orte_daemon_recv(int status, orte_process_name_t* sender,
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
return;
}
/* Function callback on jobid state changes.
* This is closely modeled after orte_rmgr_proxy_callback in rmgr_proxy.c
*/
void job_state_callback(orte_gpr_notify_data_t *data, void *cbdata)
{
orte_gpr_value_t **values, *value;
orte_gpr_keyval_t** keyvals;
orte_jobid_t jobid;
size_t i, j, k;
int rc;
/* we made sure in the subscriptions that at least one
* value is always returned
* get the jobid from the segment name in the first value
*/
values = (orte_gpr_value_t**)(data->values)->addr;
if (ORTE_SUCCESS != (rc =
orte_schema.extract_jobid_from_segment_name(&jobid,
values[0]->segment))) {
ORTE_ERROR_LOG(rc);
return;
}
for (i = 0, k = 0; k < (data->cnt) && i < (data->values)->size; ++i) {
if (NULL != values[i]) {
k++;
value = values[i];
/* determine the state change */
keyvals = value->keyvals;
for (j = 0; j < value->cnt; ++j) {
orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_NUM_TERMINATED) == 0) {
OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_TERMINATED)\n",
jobid);
}
orted_globals.exit_condition = true;
opal_condition_signal(&orted_globals.condition);
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) {
OPAL_THREAD_LOCK(&orted_globals.mutex);
if (orted_globals.debug) {
opal_output(0, "orted: job_state_callback(jobid = %d, state = ORTE_PROC_STATE_ABORTED)\n",
jobid);
}
orted_globals.exit_condition = true;
opal_condition_signal(&orted_globals.condition);
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
continue;
}
}
}
}
return;
}