Well, we are getting closer to resolving the comm_spawn problem. For the benefit of those who haven't been in the midst of this discussion, the problem is that this is the first case where the process starting a set of processes is not mpirun and is not guaranteed to be alive throughout the lifetime of the spawned processes. This sounds simple, but it actually has some profound impacts.
Most of this checkin consists of more debugging stuff. Hopefully, you won't see any printfs that aren't protected by debug flags - if you do, let me know and I'll take them out with my apologies. Outside of debugging, the biggest change was a revamp of the shutdown process. For several reasons, we had chosen to have all processes "wait" for a shutdown message before exiting. This message is typically generated by mpirun, but in the case of comm_spawn we needed to do something else. We have decided that the best way to solve this problem is to: (a) replace the shutdown message (which needed to be generated by somebody - usually mpirun) with an oob_barrier call - this still requires that the rank 0 process be alive, but since we terminate all processes if one abnormally terminates anyway, this isn't a problem (with respect to either the standard or our implementation); and (b) have the state-of-health monitoring subsystem issue the call to clean up the job from the registry. Since the state-of-health subsystem isn't available yet, we have temporarily assigned that responsibility to the rank 0 process. Once the state-of-health subsystem is available, we will have it monitor the job for all-processes-complete, and then it can tell the registry to clean up the job (i.e., remove all data relating to this job). Hope that helps a little. I'll put all this into the design docs soon. This commit was SVN r3754.
Этот коммит содержится в:
родитель
8749d0fe39
Коммит
d21c0027df
@ -269,20 +269,6 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
|
||||
}
|
||||
|
||||
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
void ompi_comm_shutdown_cbfunc(ompi_registry_notify_message_t* match, void* cbdata)
|
||||
{
|
||||
mca_ns_base_jobid_t *jobid;
|
||||
|
||||
jobid = (mca_ns_base_jobid_t*)cbdata;
|
||||
ompi_rte_job_shutdown(*jobid);
|
||||
free(jobid);
|
||||
OBJ_RELEASE(match);
|
||||
}
|
||||
|
||||
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
@ -297,7 +283,6 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
char *tmp, *envvarname, *segment, *my_contact_info;
|
||||
char cwd[MAXPATHLEN];
|
||||
ompi_registry_notify_id_t rc_tag;
|
||||
mca_ns_base_jobid_t *jobid;
|
||||
|
||||
/* parse the info object */
|
||||
/* check potentially for:
|
||||
@ -393,7 +378,7 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
|
||||
|
||||
/*
|
||||
* register to monitor the startup and shutdown processes
|
||||
* register to monitor the startup
|
||||
*/
|
||||
/* setup segment for this job */
|
||||
asprintf(&segment, "%s-%s", OMPI_RTE_JOB_STATUS_SEGMENT,
|
||||
@ -408,19 +393,7 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
NULL,
|
||||
maxprocs,
|
||||
ompi_rte_all_procs_registered, NULL);
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone completes */
|
||||
jobid = (mca_ns_base_jobid_t*)malloc(sizeof(mca_ns_base_jobid_t));
|
||||
*jobid = new_jobid;
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
0,
|
||||
ompi_comm_shutdown_cbfunc, (void*)jobid);
|
||||
|
||||
|
||||
/*
|
||||
* spawn procs
|
||||
*/
|
||||
@ -438,8 +411,6 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
/*
|
||||
* tell processes okay to start by sending startup msg
|
||||
*/
|
||||
ompi_output(0, "[%d,%d,%d] comm_spawn: sending start message",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
ompi_rte_job_startup(new_jobid);
|
||||
|
||||
/*
|
||||
|
@ -468,11 +468,6 @@ struct ompi_communicator_t {
|
||||
ompi_comm_disconnect_obj *ompi_comm_disconnect_init (ompi_communicator_t *comm);
|
||||
void ompi_comm_disconnect_waitall (int count, ompi_comm_disconnect_obj **objs );
|
||||
|
||||
/* this routine provides a callback function for terminating spawned
|
||||
* processes. It should be removed at some point in the future.
|
||||
*/
|
||||
void ompi_comm_shutdown_cbfunc(ompi_registry_notify_message_t* match, void* cbdata);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -230,7 +230,7 @@ static void mca_base_modex_registry_callback(
|
||||
* Lookup the process.
|
||||
*/
|
||||
ompi_unpack(buffer, &proc_name, 1, OMPI_NAME);
|
||||
|
||||
|
||||
proc = ompi_proc_find_and_add(&proc_name, &isnew);
|
||||
|
||||
if(NULL == proc)
|
||||
@ -310,6 +310,7 @@ static void mca_base_modex_registry_callback(
|
||||
mca_pml.pml_add_procs(new_procs, new_proc_count);
|
||||
free(new_procs);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -38,8 +38,8 @@ libmca_gpr_base_la_SOURCES = \
|
||||
gpr_base_unpack_mode_ops.c \
|
||||
gpr_base_pack_put_get.c \
|
||||
gpr_base_unpack_put_get.c \
|
||||
gpr_base_pack_startup_shutdown_msg.c \
|
||||
gpr_base_unpack_startup_shutdown_msg.c \
|
||||
gpr_base_pack_startup_msg.c \
|
||||
gpr_base_unpack_startup_msg.c \
|
||||
gpr_base_pack_subscribe.c \
|
||||
gpr_base_unpack_subscribe.c \
|
||||
gpr_base_pack_synchro.c \
|
||||
|
@ -171,9 +171,6 @@ extern "C" {
|
||||
OMPI_DECLSPEC int mca_gpr_base_pack_get_startup_msg(ompi_buffer_t cmd, mca_ns_base_jobid_t jobid);
|
||||
OMPI_DECLSPEC ompi_buffer_t mca_gpr_base_unpack_get_startup_msg(ompi_buffer_t buffer, ompi_list_t *recipients);
|
||||
|
||||
OMPI_DECLSPEC int mca_gpr_base_pack_get_shutdown_msg(ompi_buffer_t cmd, mca_ns_base_jobid_t jobid);
|
||||
ompi_buffer_t mca_gpr_base_unpack_get_shutdown_msg(ompi_buffer_t buffer, ompi_list_t *recipients);
|
||||
|
||||
OMPI_DECLSPEC int mca_gpr_base_pack_triggers_active_cmd(ompi_buffer_t cmd, mca_ns_base_jobid_t jobid);
|
||||
OMPI_DECLSPEC int mca_gpr_base_unpack_triggers_active_cmd(ompi_buffer_t cmd);
|
||||
|
||||
|
@ -42,22 +42,3 @@ int mca_gpr_base_pack_get_startup_msg(ompi_buffer_t cmd,
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mca_gpr_base_pack_get_shutdown_msg(ompi_buffer_t cmd,
|
||||
mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
mca_gpr_cmd_flag_t command;
|
||||
|
||||
command = MCA_GPR_GET_SHUTDOWN_MSG_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &jobid, 1, OMPI_JOBID)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
@ -66,46 +66,3 @@ mca_gpr_base_unpack_get_startup_msg(ompi_buffer_t buffer,
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
ompi_buffer_t
|
||||
mca_gpr_base_unpack_get_shutdown_msg(ompi_buffer_t buffer,
|
||||
ompi_list_t *recipients)
|
||||
{
|
||||
mca_gpr_cmd_flag_t command;
|
||||
int32_t num_recipients, i;
|
||||
ompi_process_name_t proc;
|
||||
ompi_name_server_namelist_t *peer;
|
||||
ompi_buffer_t msg;
|
||||
void *addr;
|
||||
int size;
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_GET_SHUTDOWN_MSG_CMD != command)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_recipients, 1, OMPI_INT32)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (i=0; i<num_recipients; i++) {
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &proc, 1, OMPI_NAME)) {
|
||||
return NULL;
|
||||
}
|
||||
peer = OBJ_NEW(ompi_name_server_namelist_t);
|
||||
peer->name = ompi_name_server.copy_process_name(&proc);;
|
||||
ompi_list_append(recipients, &peer->item);
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&msg, 0)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ompi_buffer_get(buffer, &addr, &size);
|
||||
if (0 < size) {
|
||||
ompi_pack(msg, addr, size, OMPI_BYTE);
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
@ -31,7 +31,7 @@ int mca_gpr_base_unpack_synchro(ompi_buffer_t buffer, ompi_registry_notify_id_t
|
||||
mca_gpr_cmd_flag_t command;
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_SUBSCRIBE_CMD != command)) {
|
||||
|| (MCA_GPR_SYNCHRO_CMD != command)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
@ -49,7 +49,7 @@ int mca_gpr_base_unpack_cancel_synchro(ompi_buffer_t buffer)
|
||||
int32_t response;
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_UNSUBSCRIBE_CMD != command)) {
|
||||
|| (MCA_GPR_CANCEL_SYNCHRO_CMD != command)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,6 @@ typedef uint32_t ompi_registry_notify_id_t;
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS (uint16_t)0x0020 /**< Notify whenever conditions are met */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT (uint16_t)0x0040 /**< Fire once, then terminate synchro command */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_STARTUP (uint16_t)0x0080 /**< Indicates associated with application startup */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN (uint16_t)0x0100 /**< Indicates associated with application shutdown */
|
||||
|
||||
typedef uint16_t ompi_registry_synchro_mode_t;
|
||||
|
||||
@ -434,28 +433,6 @@ typedef void (*mca_gpr_base_module_triggers_inactive_fn_t)(mca_ns_base_jobid_t j
|
||||
typedef ompi_buffer_t (*mca_gpr_base_module_get_startup_msg_fn_t)(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
/*
|
||||
* Get the job shutdown message.
|
||||
* Upon completing, each process waits for a final synchronizing message to arrive. This ensures
|
||||
* that process all exit together and prevents, for example, "hangs" as one process tries to talk
|
||||
* to another that has completed. Not much data should need to be shared during this operation, but
|
||||
* this function provides an entry point in case something is identified.
|
||||
*
|
||||
* @param jobid The id of the job being shutdown.
|
||||
* @param recipients A list of process names for the recipients - the input parameter
|
||||
* is a pointer to the list; the function returns the list in that location.
|
||||
*
|
||||
* @retval msg A packed buffer containing the required information. At the moment, this will be an
|
||||
* empty buffer as no information has yet been identified.
|
||||
*
|
||||
* @code
|
||||
* msg_buffer = ompi_registry.get_shutdown_msg(jobid, recipients);
|
||||
* @endcode
|
||||
*
|
||||
*/
|
||||
typedef ompi_buffer_t (*mca_gpr_base_module_get_shutdown_msg_fn_t)(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
/* Cleanup a job from the registry
|
||||
* Remove all references to a given job from the registry. This includes removing
|
||||
* all segments "owned" by the job, and removing all process names from dictionaries
|
||||
@ -853,7 +830,6 @@ struct mca_gpr_base_module_1_0_0_t {
|
||||
mca_gpr_base_module_triggers_active_fn_t triggers_active;
|
||||
mca_gpr_base_module_triggers_inactive_fn_t triggers_inactive;
|
||||
mca_gpr_base_module_get_startup_msg_fn_t get_startup_msg;
|
||||
mca_gpr_base_module_get_shutdown_msg_fn_t get_shutdown_msg;
|
||||
mca_gpr_base_module_cleanup_job_fn_t cleanup_job;
|
||||
mca_gpr_base_module_cleanup_proc_fn_t cleanup_process;
|
||||
mca_gpr_base_module_deliver_notify_msg_fn_t deliver_notify_msg;
|
||||
|
@ -174,14 +174,11 @@ ompi_list_t* mca_gpr_proxy_test_internals(int level);
|
||||
|
||||
|
||||
/*
|
||||
* Startup/shutdown functions
|
||||
* Startup functions
|
||||
*/
|
||||
ompi_buffer_t mca_gpr_proxy_get_startup_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
ompi_buffer_t mca_gpr_proxy_get_shutdown_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
|
||||
/*
|
||||
* Functions that interface to the replica
|
||||
|
@ -75,7 +75,6 @@ static mca_gpr_base_module_t mca_gpr_proxy = {
|
||||
mca_gpr_proxy_triggers_active,
|
||||
mca_gpr_proxy_triggers_inactive,
|
||||
mca_gpr_proxy_get_startup_msg,
|
||||
mca_gpr_proxy_get_shutdown_msg,
|
||||
mca_gpr_proxy_cleanup_job,
|
||||
mca_gpr_proxy_cleanup_proc,
|
||||
mca_gpr_proxy_deliver_notify_msg
|
||||
@ -336,22 +335,23 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
|
||||
/* find the request corresponding to this notify */
|
||||
found = false;
|
||||
for (trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr != (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) && !found;
|
||||
trackptr != (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "\tchecking idtag %d for segment %s\n", trackptr->local_idtag, trackptr->segment);
|
||||
}
|
||||
if (trackptr->local_idtag == id_tag) {
|
||||
found = true;
|
||||
}
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
if (!found) { /* didn't find request */
|
||||
ompi_output(0, "[%d,%d,%d] Proxy notification error - received request not found",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
return;
|
||||
ompi_output(0, "[%d,%d,%d] Proxy notification error - received request not found",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
return;
|
||||
}
|
||||
|
||||
/* process request */
|
||||
|
@ -49,6 +49,11 @@ mca_gpr_proxy_enter_notify_request(char *segment,
|
||||
}
|
||||
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
|
||||
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] enter_notify_request: tracker created for segment %s action %X idtag %d",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), segment, action, trackptr->local_idtag);
|
||||
}
|
||||
|
||||
return trackptr->local_idtag;
|
||||
}
|
||||
|
||||
@ -78,6 +83,11 @@ mca_gpr_proxy_remove_notify_request(ompi_registry_notify_id_t local_idtag)
|
||||
ptr_free_id->id_tag = trackptr->local_idtag;
|
||||
ompi_list_append(&mca_gpr_proxy_free_notify_id_tags, &ptr_free_id->item);
|
||||
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] remove_notify_request: tracker removed for segment %s action %X idtag %d",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), trackptr->segment, trackptr->action, local_idtag);
|
||||
}
|
||||
|
||||
/* release tracker item */
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
||||
|
@ -27,22 +27,11 @@ void mca_gpr_proxy_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
int namelen;
|
||||
mca_gpr_proxy_notify_request_tracker_t *trackptr;
|
||||
|
||||
if (mca_gpr_proxy_debug) {
|
||||
if (OMPI_REGISTRY_NOTIFY_ON_STARTUP == state) {
|
||||
ompi_output(0, "[%d,%d,%d] special delivery of startup msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
} else {
|
||||
ompi_output(0, "[%d,%d,%d] special delivery of shutdown msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
}
|
||||
|
||||
/* don't deliver messages with zero data in them */
|
||||
if (0 < ompi_list_get_size(&message->data)) {
|
||||
|
||||
/* protect system from threadlock */
|
||||
if ((OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & state)) {
|
||||
if (OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) {
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
@ -60,6 +49,7 @@ void mca_gpr_proxy_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
return;
|
||||
}
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(message);
|
||||
|
@ -38,37 +38,37 @@ mca_gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
|
||||
|
||||
if (mca_gpr_proxy_compound_cmd_mode) {
|
||||
if (OMPI_SUCCESS != mca_gpr_base_pack_synchro(mca_gpr_proxy_compound_cmd,
|
||||
synchro_mode,
|
||||
mode, segment, tokens, trigger)) {
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
/* store callback function and user_tag in local list for lookup */
|
||||
/* generate id_tag to send to replica to identify lookup entry */
|
||||
|
||||
idtag = mca_gpr_proxy_enter_notify_request(segment, OMPI_REGISTRY_NOTIFY_NONE, cb_func, user_tag);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(mca_gpr_proxy_compound_cmd, &idtag, 1, OMPI_INT32)) {
|
||||
mca_gpr_proxy_remove_notify_request(idtag);
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
return idtag;
|
||||
if (OMPI_SUCCESS != mca_gpr_base_pack_synchro(mca_gpr_proxy_compound_cmd,
|
||||
synchro_mode,
|
||||
mode, segment, tokens, trigger)) {
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
/* store callback function and user_tag in local list for lookup */
|
||||
/* generate id_tag to send to replica to identify lookup entry */
|
||||
|
||||
idtag = mca_gpr_proxy_enter_notify_request(segment, OMPI_REGISTRY_NOTIFY_NONE, cb_func, user_tag);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(mca_gpr_proxy_compound_cmd, &idtag, 1, OMPI_INT32)) {
|
||||
mca_gpr_proxy_remove_notify_request(idtag);
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
return idtag;
|
||||
}
|
||||
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
response = OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
|
||||
if (OMPI_SUCCESS != mca_gpr_base_pack_synchro(cmd, synchro_mode, mode, segment, tokens, trigger)) {
|
||||
goto CLEANUP;
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
@ -80,28 +80,28 @@ mca_gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &idtag, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != mca_gpr_base_unpack_synchro(answer, &remote_idtag)) { /* error on replica */
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
mca_gpr_proxy_remove_notify_request(idtag);
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
response = OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
mca_gpr_proxy_remove_notify_request(idtag);
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
response = OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||
|
||||
} else {
|
||||
response = idtag;
|
||||
mca_gpr_proxy_set_remote_idtag(idtag, remote_idtag);
|
||||
response = idtag;
|
||||
mca_gpr_proxy_set_remote_idtag(idtag, remote_idtag);
|
||||
}
|
||||
|
||||
ompi_buffer_free(answer);
|
||||
|
@ -70,56 +70,3 @@ ompi_buffer_t mca_gpr_proxy_get_startup_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_buffer_free(cmd);
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
ompi_buffer_t mca_gpr_proxy_get_shutdown_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients)
|
||||
{
|
||||
ompi_buffer_t msg, cmd, answer;
|
||||
int recv_tag=MCA_OOB_TAG_GPR;
|
||||
|
||||
if (mca_gpr_proxy_compound_cmd_mode) {
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr_proxy: getting shutdown msg - compound cmd",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
|
||||
mca_gpr_base_pack_get_shutdown_msg(mca_gpr_proxy_compound_cmd, jobid);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
msg = NULL;
|
||||
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr_proxy: getting shutdown msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != mca_gpr_base_pack_get_shutdown_msg(cmd, jobid)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
msg = mca_gpr_base_unpack_get_shutdown_msg(answer, recipients);
|
||||
ompi_buffer_free(answer);
|
||||
|
||||
if (mca_gpr_proxy_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr_proxy: got shutdown msg for %d recipients",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), ompi_list_get_size(recipients));
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
ompi_buffer_free(cmd);
|
||||
return msg;
|
||||
}
|
||||
|
@ -410,18 +410,14 @@ void mca_gpr_replica_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
ompi_list_t* mca_gpr_replica_test_internals(int level);
|
||||
|
||||
/*
|
||||
* Startup/shutdown functions
|
||||
* Startup functions
|
||||
*/
|
||||
ompi_buffer_t mca_gpr_replica_get_startup_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
ompi_buffer_t mca_gpr_replica_get_shutdown_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
ompi_buffer_t
|
||||
mca_gpr_replica_construct_startup_shutdown_msg_nl(int mode,
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
mca_gpr_replica_construct_startup_msg_nl(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients);
|
||||
|
||||
/*
|
||||
* Functions that interface to the proxy, but aren't available outside the gpr subsystem
|
||||
|
@ -97,7 +97,7 @@ void mca_gpr_replica_cleanup_proc_nl(bool purge, ompi_process_name_t *proc)
|
||||
seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) {
|
||||
|
||||
if (jobid == seg->owning_job) {
|
||||
/* adjust any startup synchro and/or shutdown synchros owned
|
||||
/* adjust any startup synchro synchros owned
|
||||
* by the associated jobid by one.
|
||||
*/
|
||||
if (mca_gpr_replica_debug) {
|
||||
@ -108,14 +108,9 @@ void mca_gpr_replica_cleanup_proc_nl(bool purge, ompi_process_name_t *proc)
|
||||
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
|
||||
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers);
|
||||
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN & trig->synch_mode)) {
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) {
|
||||
if (mca_gpr_replica_debug) {
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) {
|
||||
ompi_output(0, "\tadjusting startup synchro");
|
||||
} else {
|
||||
ompi_output(0, "\tadjusting shutdown synchro");
|
||||
}
|
||||
ompi_output(0, "\tadjusting startup synchro");
|
||||
}
|
||||
trig->count--;
|
||||
}
|
||||
|
@ -74,7 +74,6 @@ static mca_gpr_base_module_t mca_gpr_replica = {
|
||||
mca_gpr_replica_triggers_active,
|
||||
mca_gpr_replica_triggers_inactive,
|
||||
mca_gpr_replica_get_startup_msg,
|
||||
mca_gpr_replica_get_shutdown_msg,
|
||||
mca_gpr_replica_cleanup_job,
|
||||
mca_gpr_replica_cleanup_proc,
|
||||
mca_gpr_replica_deliver_notify_msg
|
||||
|
@ -203,9 +203,6 @@ void mca_gpr_replica_dump_nl(ompi_buffer_t buffer)
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) {
|
||||
ompi_pack_string(buffer, "\t\tOMPI_REGISTRY_SYNCHRO_MODE_STARTUP");
|
||||
}
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN & trig->synch_mode) {
|
||||
ompi_pack_string(buffer, "\t\tOMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN");
|
||||
}
|
||||
asprintf(&tmp_out, "\tTrigger level: %d\tCurrent count: %d", trig->trigger, trig->count);
|
||||
mca_gpr_replica_dump_load_string(buffer, tmp_out);
|
||||
asprintf(&tmp_out, "\tTransition status: %d", trig->above_below);
|
||||
|
@ -206,10 +206,10 @@ bool mca_gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
|
||||
for (trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
if (trackptr->local_idtag == trig->local_idtag) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
if (trackptr->local_idtag == trig->local_idtag) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) { /* didn't find request */
|
||||
|
@ -45,9 +45,10 @@ void mca_gpr_replica_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
seg = trackptr->segptr;
|
||||
if ((trackptr->action & state) &&
|
||||
(0 == strncmp(message->segment, seg->name, namelen))) {
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
return;
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,6 @@ static ompi_registry_notify_id_t mca_gpr_replica_recv_synchro_cmd(ompi_process_n
|
||||
static int32_t mca_gpr_replica_recv_cancel_synchro_cmd(ompi_buffer_t buffer);
|
||||
static void mca_gpr_replica_recv_dump_cmd(ompi_buffer_t answer);
|
||||
static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_buffer_t answer);
|
||||
static void mca_gpr_replica_recv_get_shutdown_msg_cmd(ompi_buffer_t buffer, ompi_buffer_t answer);
|
||||
static void mca_gpr_replica_recv_triggers_active_cmd(ompi_buffer_t buffer);
|
||||
static void mca_gpr_replica_recv_triggers_inactive_cmd(ompi_buffer_t buffer);
|
||||
static void mca_gpr_replica_recv_cleanup_job_cmd(ompi_buffer_t buffer);
|
||||
@ -420,17 +419,6 @@ ompi_buffer_t mca_gpr_replica_process_command_buffer(ompi_buffer_t buffer,
|
||||
|
||||
|
||||
|
||||
case MCA_GPR_GET_SHUTDOWN_MSG_CMD: /***** GET SHUTDOWN MSG *****/
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "\tget shutdown msg cmd");
|
||||
}
|
||||
|
||||
mca_gpr_replica_recv_get_shutdown_msg_cmd(buffer, answer);
|
||||
break;
|
||||
|
||||
|
||||
|
||||
case MCA_GPR_TRIGGERS_ACTIVE_CMD: /***** TRIGGERS ACTIVE *****/
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
@ -1163,7 +1151,7 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
msg = mca_gpr_replica_construct_startup_shutdown_msg_nl(OMPI_STARTUP_DETECTED, jobid, recipients);
|
||||
msg = mca_gpr_replica_construct_startup_msg_nl(jobid, recipients);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
@ -1186,46 +1174,6 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
|
||||
}
|
||||
|
||||
|
||||
static void mca_gpr_replica_recv_get_shutdown_msg_cmd(ompi_buffer_t buffer, ompi_buffer_t answer)
|
||||
{
|
||||
mca_ns_base_jobid_t jobid=0;
|
||||
ompi_list_t *recipients=NULL;
|
||||
ompi_buffer_t msg;
|
||||
ompi_name_server_namelist_t *recip=NULL;
|
||||
void *addr=NULL;
|
||||
int32_t size=0, num_recipients=0, i=0;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &jobid, 1, OMPI_JOBID)) {
|
||||
ompi_output(0, "[%d,%d,%d] recv_get_shutdown_msg: failed to unpack jobidstring",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
return;
|
||||
}
|
||||
|
||||
recipients = OBJ_NEW(ompi_list_t);
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
msg = mca_gpr_replica_construct_startup_shutdown_msg_nl(OMPI_SHUTDOWN_DETECTED, jobid, recipients);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
num_recipients = (int32_t)ompi_list_get_size(recipients);
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &num_recipients, 1, OMPI_INT32)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (i=0; i<num_recipients; i++) {
|
||||
recip = (ompi_name_server_namelist_t*)ompi_list_remove_first(recipients);
|
||||
ompi_pack(answer, recip->name, 1, OMPI_NAME);
|
||||
OBJ_RELEASE(recip);
|
||||
}
|
||||
|
||||
ompi_buffer_get(msg, &addr, &size);
|
||||
|
||||
ompi_pack(answer, &size, 1, OMPI_INT32);
|
||||
ompi_pack(answer, &addr, size, OMPI_BYTE);
|
||||
}
|
||||
|
||||
static void mca_gpr_replica_recv_triggers_active_cmd(ompi_buffer_t cmd)
|
||||
{
|
||||
mca_ns_base_jobid_t jobid=0;
|
||||
|
@ -69,7 +69,7 @@ ompi_buffer_t mca_gpr_replica_get_startup_msg(mca_ns_base_jobid_t jobid,
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
msg = mca_gpr_replica_construct_startup_shutdown_msg_nl(OMPI_STARTUP_DETECTED, jobid, recipients);
|
||||
msg = mca_gpr_replica_construct_startup_msg_nl(jobid, recipients);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
@ -78,30 +78,8 @@ ompi_buffer_t mca_gpr_replica_get_startup_msg(mca_ns_base_jobid_t jobid,
|
||||
|
||||
|
||||
ompi_buffer_t
|
||||
mca_gpr_replica_get_shutdown_msg(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients)
|
||||
{
|
||||
ompi_buffer_t msg;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] entered get_shutdown_msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
msg = mca_gpr_replica_construct_startup_shutdown_msg_nl(OMPI_SHUTDOWN_DETECTED, jobid, recipients);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
ompi_buffer_t
|
||||
mca_gpr_replica_construct_startup_shutdown_msg_nl(int mode,
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients)
|
||||
mca_gpr_replica_construct_startup_msg_nl(mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *recipients)
|
||||
{
|
||||
mca_gpr_replica_segment_t *seg=NULL, *proc_stat_seg;
|
||||
mca_gpr_replica_key_t *keys;
|
||||
@ -121,7 +99,7 @@ mca_gpr_replica_construct_startup_shutdown_msg_nl(int mode,
|
||||
size_t bufsize;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] entered construct_startup_shutdown_msg for job %d",
|
||||
ompi_output(0, "[%d,%d,%d] entered construct_startup_msg for job %d",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), (int)jobid);
|
||||
}
|
||||
|
||||
@ -157,13 +135,11 @@ mca_gpr_replica_construct_startup_shutdown_msg_nl(int mode,
|
||||
) {
|
||||
next_trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig);
|
||||
|
||||
if (((OMPI_REGISTRY_NOTIFY_ON_STARTUP & trig->action) && (OMPI_STARTUP_DETECTED == mode)) ||
|
||||
((OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & trig->action) && (OMPI_SHUTDOWN_DETECTED == mode))) {
|
||||
if (OMPI_REGISTRY_NOTIFY_ON_STARTUP & trig->action) {
|
||||
|
||||
/* see if data is requested - only one trig has to ask for it */
|
||||
if (((OMPI_REGISTRY_NOTIFY_INCLUDE_STARTUP_DATA & trig->action) && (OMPI_STARTUP_DETECTED == mode)) ||
|
||||
((OMPI_REGISTRY_NOTIFY_INCLUDE_SHUTDOWN_DATA & trig->action) && (OMPI_SHUTDOWN_DETECTED == mode))) {
|
||||
include_data = true;
|
||||
if (OMPI_REGISTRY_NOTIFY_INCLUDE_STARTUP_DATA & trig->action) {
|
||||
include_data = true;
|
||||
}
|
||||
|
||||
/***** if notify_one_shot is set, need to remove subscription from system */
|
||||
@ -247,7 +223,7 @@ mca_gpr_replica_construct_startup_shutdown_msg_nl(int mode,
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_buffer_size(msg, &bufsize);
|
||||
ompi_output(0, "[%d,%d,%d] built startup_shutdown_msg of length %d with %d recipients",
|
||||
ompi_output(0, "[%d,%d,%d] built startup_msg of length %d with %d recipients",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), bufsize, (int)ompi_list_get_size(recipients));
|
||||
for (peer = (ompi_name_server_namelist_t*)ompi_list_get_first(recipients);
|
||||
peer != (ompi_name_server_namelist_t*)ompi_list_get_end(recipients);
|
||||
|
@ -31,7 +31,6 @@
|
||||
#include "include/types.h"
|
||||
#include "include/constants.h"
|
||||
#include "mca/ns/ns.h"
|
||||
#include "mca/gpr/gpr.h"
|
||||
#include "mca/gpr/base/base.h"
|
||||
#include "runtime/runtime.h"
|
||||
|
||||
@ -62,12 +61,6 @@ proc_registered_cb(ompi_registry_notify_message_t *match,
|
||||
ompi_rte_job_startup(mca_pcmclient_singleton_procs[0].jobid);
|
||||
}
|
||||
|
||||
static void
|
||||
proc_unregistered_cb(ompi_registry_notify_message_t *match,
|
||||
void *cbdata)
|
||||
{
|
||||
ompi_rte_job_shutdown(mca_pcmclient_singleton_procs[0].jobid);
|
||||
}
|
||||
|
||||
int
|
||||
mca_pcmclient_singleton_init_cleanup(void)
|
||||
@ -98,17 +91,6 @@ mca_pcmclient_singleton_init_cleanup(void)
|
||||
1,
|
||||
proc_registered_cb, NULL);
|
||||
|
||||
/* register a synchro on the segment so we get notified on shutdown */
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
0,
|
||||
proc_unregistered_cb, NULL);
|
||||
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -52,6 +52,8 @@ int ompi_mpi_finalize(void)
|
||||
{
|
||||
int ret;
|
||||
ompi_rte_process_status_t my_status;
|
||||
int my_rank;
|
||||
mca_ns_base_jobid_t my_jobid;
|
||||
|
||||
ompi_mpi_finalized = true;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
@ -62,7 +64,8 @@ int ompi_mpi_finalize(void)
|
||||
ompi_registry.begin_compound_cmd();
|
||||
|
||||
/* Set process status to "terminating"*/
|
||||
my_status.rank = mca_ns_base_get_vpid(ompi_rte_get_self());
|
||||
my_rank = ompi_comm_rank(&ompi_mpi_comm_world);
|
||||
my_status.rank = (int32_t)my_rank;
|
||||
my_status.local_pid = (int32_t)ompi_process_info.pid;
|
||||
my_status.nodename = strdup(ompi_system_info.nodename);
|
||||
my_status.status_key = OMPI_PROC_TERMINATING;
|
||||
@ -72,18 +75,22 @@ int ompi_mpi_finalize(void)
|
||||
}
|
||||
|
||||
/* execute the compound command - no return data requested
|
||||
* we'll get it through the shutdown message
|
||||
*/
|
||||
ompi_registry.exec_compound_cmd(OMPI_REGISTRY_NO_RETURN_REQUESTED);
|
||||
|
||||
/* wait for all processes to reach same state */
|
||||
if (OMPI_SUCCESS != (ret = ompi_rte_wait_shutdown_msg())) {
|
||||
if (ompi_rte_debug_flag) {
|
||||
ompi_output(0, "mpi_finalize: gave up waiting for other processes to complete");
|
||||
}
|
||||
}
|
||||
|
||||
mca_oob_barrier();
|
||||
|
||||
/* need the following code to cleanup the job in the registry.
|
||||
* once the state-of-health monitoring system is available, we will
|
||||
* have that system perform this function. until then, we will have the
|
||||
* rank 0 process do it.
|
||||
*/
|
||||
if (0 == my_rank) {
|
||||
my_jobid = ompi_name_server.get_jobid(ompi_rte_get_self());
|
||||
ompi_rte_job_shutdown(my_jobid);
|
||||
}
|
||||
|
||||
/* Shut down any bindings-specific issues: C++, F77, F90 (may or
|
||||
may not be necessary...?) */
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
*/
|
||||
/** @file:
|
||||
*
|
||||
* The Open MPI general purpose registry - support functions.
|
||||
* Shutdown a job and cleanup the registry
|
||||
*
|
||||
*/
|
||||
|
||||
@ -23,37 +23,15 @@
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "mca/oob/oob.h"
|
||||
#include "mca/oob/base/base.h"
|
||||
#include "mca/ns/base/base.h"
|
||||
#include "mca/gpr/base/base.h"
|
||||
|
||||
#include "runtime/runtime.h"
|
||||
|
||||
int ompi_rte_job_shutdown(mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
ompi_list_t *recipients;
|
||||
ompi_buffer_t shutdown_msg;
|
||||
int return_code;
|
||||
|
||||
recipients = OBJ_NEW(ompi_list_t);
|
||||
|
||||
shutdown_msg = ompi_registry.get_shutdown_msg(jobid, recipients);
|
||||
ompi_registry.triggers_inactive(jobid);
|
||||
|
||||
/* check to ensure there are recipients on list - error if not */
|
||||
if (0 < ompi_list_get_size(recipients)) {
|
||||
mca_oob_xcast(ompi_rte_get_self(), recipients, shutdown_msg, NULL);
|
||||
return_code = OMPI_SUCCESS;
|
||||
} else {
|
||||
if (ompi_rte_debug_flag) {
|
||||
ompi_output(0, "[%d,%d,%d] job_shutdown: no recipients for message",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
return_code = OMPI_ERROR;
|
||||
}
|
||||
|
||||
ompi_registry.cleanup_job(jobid);
|
||||
OBJ_RELEASE(recipients);
|
||||
|
||||
return return_code;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -39,8 +39,8 @@ int ompi_rte_job_startup(mca_ns_base_jobid_t jobid)
|
||||
int num_procs;
|
||||
|
||||
if (ompi_rte_debug_flag) {
|
||||
ompi_output(0, "[%d,%d,%d] entered rte_job_startup",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
ompi_output(0, "[%d,%d,%d] entered rte_job_startup",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
|
||||
recipients = OBJ_NEW(ompi_list_t);
|
||||
@ -49,9 +49,9 @@ int ompi_rte_job_startup(mca_ns_base_jobid_t jobid)
|
||||
ompi_registry.triggers_active(jobid);
|
||||
|
||||
if (ompi_rte_debug_flag) {
|
||||
ompi_output(0, "[%d,%d,%d] rte_job_startup: sending startup message to %d recipients",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()),
|
||||
ompi_list_get_size(recipients));
|
||||
ompi_output(0, "[%d,%d,%d] rte_job_startup: sending startup message to %d recipients",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()),
|
||||
ompi_list_get_size(recipients));
|
||||
}
|
||||
|
||||
/* check to ensure there are recipients on list - don't send if not */
|
||||
|
@ -27,14 +27,6 @@ void
|
||||
ompi_rte_decode_startup_msg(int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata);
|
||||
|
||||
void
|
||||
ompi_rte_decode_shutdown_msg(int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata);
|
||||
|
||||
static void
|
||||
ompi_rte_decode_startup_shutdown_msg(ompi_registry_notify_action_t state,
|
||||
int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata);
|
||||
|
||||
/*
|
||||
* Main functions
|
||||
@ -46,33 +38,9 @@ int ompi_rte_wait_startup_msg(void)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ompi_rte_decode_startup_msg(int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata)
|
||||
{
|
||||
ompi_rte_decode_startup_shutdown_msg(OMPI_REGISTRY_NOTIFY_ON_STARTUP,
|
||||
status, peer, msg, tag, cbdata);
|
||||
}
|
||||
|
||||
|
||||
int ompi_rte_wait_shutdown_msg(void)
|
||||
{
|
||||
return mca_oob_xcast(NULL, NULL, NULL, ompi_rte_decode_shutdown_msg);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ompi_rte_decode_shutdown_msg(int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata)
|
||||
{
|
||||
ompi_rte_decode_startup_shutdown_msg(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN,
|
||||
status, peer, msg, tag, cbdata);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Unpack the startup/shutdown message.
|
||||
* When a startup/shutdown message is received, it contains data objects from
|
||||
* Unpack the startup message.
|
||||
* When a startup message is received, it contains data objects from
|
||||
* several pre-defined registry segments. This includes OOB contact info,
|
||||
* PTL contact info, and other things. Each of these subsystems has a
|
||||
* callback function that is used to receive updates from the registry
|
||||
@ -81,10 +49,9 @@ ompi_rte_decode_shutdown_msg(int status, ompi_process_name_t *peer,
|
||||
* callback function as if it came directly from the registry.
|
||||
*/
|
||||
|
||||
static void
|
||||
ompi_rte_decode_startup_shutdown_msg(ompi_registry_notify_action_t state,
|
||||
int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata)
|
||||
void
|
||||
ompi_rte_decode_startup_msg(int status, ompi_process_name_t *peer,
|
||||
ompi_buffer_t msg, int tag, void *cbdata)
|
||||
{
|
||||
char *segment;
|
||||
ompi_registry_notify_message_t *notify_msg;
|
||||
@ -94,13 +61,8 @@ ompi_rte_decode_startup_shutdown_msg(ompi_registry_notify_action_t state,
|
||||
int32_t num_objects, i;
|
||||
|
||||
if (ompi_rte_debug_flag) {
|
||||
if (OMPI_REGISTRY_NOTIFY_ON_STARTUP == state) {
|
||||
ompi_output(0, "[%d,%d,%d] decoding startup msg",
|
||||
ompi_output(0, "[%d,%d,%d] decoding startup msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
} else {
|
||||
ompi_output(0, "[%d,%d,%d] decoding shutdown msg",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
}
|
||||
|
||||
while (0 < ompi_unpack_string(msg, &segment)) {
|
||||
@ -136,7 +98,7 @@ ompi_rte_decode_startup_shutdown_msg(ompi_registry_notify_action_t state,
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), segment, (int)num_objects);
|
||||
}
|
||||
|
||||
ompi_registry.deliver_notify_msg(state, notify_msg);
|
||||
ompi_registry.deliver_notify_msg(OMPI_REGISTRY_NOTIFY_ON_STARTUP, notify_msg);
|
||||
}
|
||||
|
||||
free(segment);
|
||||
|
@ -44,7 +44,7 @@
|
||||
#define OMPI_RTE_VM_STATUS_SEGMENT "ompi-vm-status"
|
||||
#define OMPI_RTE_CELL_STATUS_SEGMENT "ompi-cell-status"
|
||||
#define OMPI_RTE_SCHED_SEGMENT "ompi-sched"
|
||||
#define OMPI_RTE_MODEX_SEGMENT "ompi_modex"
|
||||
#define OMPI_RTE_MODEX_SEGMENT "ompi-modex"
|
||||
|
||||
/* constants for spawn constraints */
|
||||
|
||||
@ -353,12 +353,6 @@ OMPI_DECLSPEC ompi_rte_vm_status_t *ompi_rte_unpack_vm_status(ompi_registry_va
|
||||
|
||||
OMPI_DECLSPEC int ompi_rte_wait_startup_msg(void);
|
||||
|
||||
/**
|
||||
* Hold for shutdown message to arrive, then decode it.
|
||||
*/
|
||||
|
||||
OMPI_DECLSPEC int ompi_rte_wait_shutdown_msg(void);
|
||||
|
||||
/**
|
||||
* Change state as processes complete registration/unregistration
|
||||
*/
|
||||
|
@ -305,8 +305,8 @@ main(int argc, char *argv[])
|
||||
/* register a synchro on the segment so we get notified when everyone is gone
|
||||
*/
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_STARTUP,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
@ -331,7 +331,6 @@ main(int argc, char *argv[])
|
||||
} else {
|
||||
ompi_rte_job_startup(new_jobid);
|
||||
ompi_rte_monitor_procs_unregistered();
|
||||
ompi_rte_job_shutdown(new_jobid);
|
||||
}
|
||||
/*
|
||||
* - ompi_rte_kill_job()
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user