1
1
This commit was SVN r2826.
Этот коммит содержится в:
Ralph Castain 2004-09-23 14:33:28 +00:00
родитель 65fc61e212
Коммит 0cc082780f

Просмотреть файл

@ -28,74 +28,13 @@ static bool ompi_rte_job_started = false;
static bool ompi_rte_job_finished = false;
/*
* Update the registry with an entry for this process.
*/
int ompi_rte_register(void)
{
ompi_buffer_t buffer;
char segment[32];
char *keys[2];
void *addr;
int rc,size;
/* setup keys and segment for this job */
sprintf(segment, "job-%u", mca_oob_name_self.jobid);
keys[0] = ns_base_get_proc_name_string(ompi_process_info.name);
keys[1] = NULL;
if (ompi_rte_debug_flag) {
ompi_output(0, "rte_register: entered for proc %s", keys[0]);
}
/* setup packed buffer of proc info - may expand as needed */
ompi_buffer_init(&buffer, 128);
ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32);
ompi_pack_string(buffer, ompi_system_info.nodename);
/* peek the buffer and resulting size */
ompi_buffer_get(buffer, &addr, &size);
rc = ompi_registry.put(OMPI_REGISTRY_XAND | OMPI_REGISTRY_OVERWRITE,
segment, keys, addr, size);
if (ompi_rte_debug_flag) {
ompi_output(0, "rte_register: %s %d", keys[0], rc);
}
ompi_buffer_free(buffer);
return rc;
}
/*
* Register process info.
*/
int ompi_rte_unregister(void)
{
char segment[32];
char *keys[2];
int rc;
/* setup keys and segment for this job */
sprintf(segment, "job-%u", mca_oob_name_self.jobid);
keys[0] = ns_base_get_proc_name_string(ompi_process_info.name);
keys[1] = NULL;
rc = ompi_registry.delete_object(OMPI_REGISTRY_XAND, segment, keys);
free(keys[0]);
return rc;
}
/*
* Change state as processes register/unregister. Note that we could save
* the list of registrations - and use the host/pid for cleanup later.
*/
static void ompi_rte_registered(ompi_registry_notify_message_t* match, void* cbdata)
void ompi_rte_all_procs_registered(ompi_registry_notify_message_t* match, void* cbdata)
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_started = true;
@ -104,7 +43,7 @@ static void ompi_rte_registered(ompi_registry_notify_message_t* match, void* cbd
}
static void ompi_rte_unregistered(ompi_registry_notify_message_t* match, void* cbdata)
void ompi_rte_all_procs_unregistered(ompi_registry_notify_message_t* match, void* cbdata)
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_finished = true;
@ -113,49 +52,13 @@ static void ompi_rte_unregistered(ompi_registry_notify_message_t* match, void* c
}
int ompi_rte_notify(mca_ns_base_jobid_t jobid, int num_procs)
{
char segment[32];
int rc;
/* setup segment for this job */
sprintf(segment, "job-%d", jobid);
/* register for a callback when all processes on this jobid have
* registered their process info
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
num_procs,
ompi_rte_registered,
NULL);
if(rc != OMPI_SUCCESS)
return rc;
/* register for a callback when all processes on this jobid have
* unregistered their process info
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
0,
ompi_rte_unregistered,
NULL);
return rc;
}
/**
* TSW - This is a temporary solution - that only handles graceful
* shutdown....
*/
int ompi_rte_monitor(void)
int ompi_rte_monitor_procs_registered(void)
{
struct timeval tv;
struct timespec ts;
@ -176,7 +79,13 @@ int ompi_rte_monitor(void)
return OMPI_ERROR;
}
}
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);
return OMPI_SUCCESS;
}
int ompi_rte_monitor_procs_unregistered(void)
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
/* wait for all processes to complete */
while(ompi_rte_job_finished == false) {
ompi_condition_wait(&ompi_rte_condition, &ompi_rte_mutex);