diff --git a/src/runtime/ompi_rte_monitor.c b/src/runtime/ompi_rte_monitor.c index 7b65b476a7..a7ff5f3708 100644 --- a/src/runtime/ompi_rte_monitor.c +++ b/src/runtime/ompi_rte_monitor.c @@ -28,74 +28,13 @@ static bool ompi_rte_job_started = false; static bool ompi_rte_job_finished = false; -/* - * Update the registry with an entry for this process. - */ - -int ompi_rte_register(void) -{ - ompi_buffer_t buffer; - char segment[32]; - char *keys[2]; - void *addr; - int rc,size; - - /* setup keys and segment for this job */ - sprintf(segment, "job-%u", mca_oob_name_self.jobid); - keys[0] = ns_base_get_proc_name_string(ompi_process_info.name); - keys[1] = NULL; - - if (ompi_rte_debug_flag) { - ompi_output(0, "rte_register: entered for proc %s", keys[0]); - } - - /* setup packed buffer of proc info - may expand as needed */ - ompi_buffer_init(&buffer, 128); - ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32); - ompi_pack_string(buffer, ompi_system_info.nodename); - - /* peek the buffer and resulting size */ - ompi_buffer_get(buffer, &addr, &size); - - rc = ompi_registry.put(OMPI_REGISTRY_XAND | OMPI_REGISTRY_OVERWRITE, - segment, keys, addr, size); - - if (ompi_rte_debug_flag) { - ompi_output(0, "rte_register: %s %d", keys[0], rc); - } - ompi_buffer_free(buffer); - return rc; -} - - -/* - * Register process info. - */ - -int ompi_rte_unregister(void) -{ - char segment[32]; - char *keys[2]; - int rc; - - /* setup keys and segment for this job */ - sprintf(segment, "job-%u", mca_oob_name_self.jobid); - keys[0] = ns_base_get_proc_name_string(ompi_process_info.name); - keys[1] = NULL; - - rc = ompi_registry.delete_object(OMPI_REGISTRY_XAND, segment, keys); - free(keys[0]); - return rc; -} - - /* * Change state as processes register/unregister. Note that we could save * the list of registrations - and use the host/pid for cleanup later. */ -static void ompi_rte_registered(ompi_registry_notify_message_t* match, void* cbdata) +void ompi_rte_all_procs_registered(ompi_registry_notify_message_t* match, void* cbdata) { OMPI_THREAD_LOCK(&ompi_rte_mutex); ompi_rte_job_started = true; @@ -104,7 +43,7 @@ static void ompi_rte_registered(ompi_registry_notify_message_t* match, void* cbd } -static void ompi_rte_unregistered(ompi_registry_notify_message_t* match, void* cbdata) +void ompi_rte_all_procs_unregistered(ompi_registry_notify_message_t* match, void* cbdata) { OMPI_THREAD_LOCK(&ompi_rte_mutex); ompi_rte_job_finished = true; @@ -113,49 +52,13 @@ static void ompi_rte_unregistered(ompi_registry_notify_message_t* match, void* c } -int ompi_rte_notify(mca_ns_base_jobid_t jobid, int num_procs) -{ - char segment[32]; - int rc; - - /* setup segment for this job */ - sprintf(segment, "job-%d", jobid); - - /* register for a callback when all processes on this jobid have - * registered their process info - */ - rc = ompi_registry.synchro( - OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT, - OMPI_REGISTRY_OR, - segment, - NULL, - num_procs, - ompi_rte_registered, - NULL); - if(rc != OMPI_SUCCESS) - return rc; - - /* register for a callback when all processes on this jobid have - * unregistered their process info - */ - rc = ompi_registry.synchro( - OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT, - OMPI_REGISTRY_OR, - segment, - NULL, - 0, - ompi_rte_unregistered, - NULL); - return rc; -} - /** * TSW - This is a temporary solution - that only handles graceful * shutdown.... */ -int ompi_rte_monitor(void) +int ompi_rte_monitor_procs_registered(void) { struct timeval tv; struct timespec ts; @@ -176,7 +79,13 @@ int ompi_rte_monitor(void) return OMPI_ERROR; } } + OMPI_THREAD_UNLOCK(&ompi_rte_mutex); + return OMPI_SUCCESS; +} +int ompi_rte_monitor_procs_unregistered(void) +{ + OMPI_THREAD_LOCK(&ompi_rte_mutex); /* wait for all processes to complete */ while(ompi_rte_job_finished == false) { ompi_condition_wait(&ompi_rte_condition, &ompi_rte_mutex);