Clean up a few things - mostly adding a lot of diagnostics to chase down the comm_spawn issue. I believe this may fix that problem, but I am still verifying it. At least it now starts processes correctly again!
This commit was SVN r3738.
Этот коммит содержится в:
родитель
03b9ed1ff5
Коммит
e259750296
@ -269,6 +269,20 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
|
||||
}
|
||||
|
||||
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/*
 * Registry notification callback used for the shutdown synchro of a
 * spawned job.
 *
 * match   notify message handed to the callback by the registry; this
 *         function releases it with OBJ_RELEASE before returning.
 * cbdata  pointer to a mca_ns_base_jobid_t identifying the job. The
 *         pointer is passed to free() here, so the caller gives up
 *         ownership (ompi_comm_start_processes registers this callback
 *         with a malloc'd copy of the new job id).
 *
 * NOTE(review): cbdata is dereferenced without a NULL check - confirm
 * that every registration supplies a valid heap-allocated job id.
 */
void ompi_comm_shutdown_cbfunc(ompi_registry_notify_message_t* match, void* cbdata)
|
||||
{
|
||||
mca_ns_base_jobid_t *jobid;
|
||||
|
||||
/* recover the job id that was supplied when the callback was registered */
jobid = (mca_ns_base_jobid_t*)cbdata;
|
||||
/* presumably tears down the processes of that job - confirm against ompi_rte_job_shutdown */
ompi_rte_job_shutdown(*jobid);
|
||||
/* the job id storage is owned by this callback once it fires */
free(jobid);
|
||||
/* the callback is responsible for releasing the notify message */
OBJ_RELEASE(match);
|
||||
}
|
||||
|
||||
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
/**********************************************************************/
|
||||
@ -283,7 +297,8 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
char *tmp, *envvarname, *segment, *my_contact_info;
|
||||
char cwd[MAXPATHLEN];
|
||||
ompi_registry_notify_id_t rc_tag;
|
||||
|
||||
mca_ns_base_jobid_t *jobid;
|
||||
|
||||
/* parse the info object */
|
||||
/* check potentially for:
|
||||
- "host": desired host where to spawn the processes
|
||||
@ -386,14 +401,26 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone registers */
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_STARTUP,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
maxprocs,
|
||||
ompi_rte_all_procs_registered, NULL);
|
||||
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone completes */
|
||||
jobid = (mca_ns_base_jobid_t*)malloc(sizeof(mca_ns_base_jobid_t));
|
||||
*jobid = new_jobid;
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
0,
|
||||
ompi_comm_shutdown_cbfunc, (void*)jobid);
|
||||
|
||||
/*
|
||||
* spawn procs
|
||||
*/
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "group/group.h"
|
||||
#include "mca/coll/coll.h"
|
||||
#include "mca/topo/topo.h"
|
||||
#include "mca/gpr/base/base.h"
|
||||
#include "request/request.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
@ -467,6 +468,11 @@ struct ompi_communicator_t {
|
||||
ompi_comm_disconnect_obj *ompi_comm_disconnect_init (ompi_communicator_t *comm);
|
||||
void ompi_comm_disconnect_waitall (int count, ompi_comm_disconnect_obj **objs );
|
||||
|
||||
/* this routine provides a callback function for terminating spawned
|
||||
* processes. It should be removed at some point in the future.
|
||||
*/
|
||||
void ompi_comm_shutdown_cbfunc(ompi_registry_notify_message_t* match, void* cbdata);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -29,17 +29,15 @@ int mca_gpr_base_pack_get_startup_msg(ompi_buffer_t cmd,
|
||||
mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char *jobidstring;
|
||||
|
||||
command = MCA_GPR_GET_STARTUP_MSG_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
return OMPI_ERROR;
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
jobidstring = ompi_name_server.convert_jobid_to_string(jobid);
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, jobidstring)) {
|
||||
return OMPI_ERROR;
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &jobid, 1, OMPI_JOBID)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -50,17 +48,15 @@ int mca_gpr_base_pack_get_shutdown_msg(ompi_buffer_t cmd,
|
||||
mca_ns_base_jobid_t jobid)
|
||||
{
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char *jobidstring;
|
||||
|
||||
command = MCA_GPR_GET_SHUTDOWN_MSG_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
return OMPI_ERROR;
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
jobidstring = ompi_name_server.convert_jobid_to_string(jobid);
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, jobidstring)) {
|
||||
return OMPI_ERROR;
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &jobid, 1, OMPI_JOBID)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -71,18 +71,18 @@ typedef uint32_t ompi_registry_notify_id_t;
|
||||
/*
|
||||
* Define synchro mode flags
|
||||
*/
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_NONE (uint8_t)0x00 /**< No synchronization */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING (uint8_t)0x01 /**< Notify when trigger is reached, ascending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING (uint8_t)0x02 /**< Notify when trigger is reached, descending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL (uint8_t)0x04 /**< Notify when trigger is reached, regardless of direction */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL (uint8_t)0x08 /**< Notify if level greater than or equal */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LT_EQUAL (uint8_t)0x10 /**< Notify if level less than or equal */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS (uint8_t)0x80 /**< Notify whenever conditions are met */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT (uint8_t)0x81 /**< Fire once, then terminate synchro command */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_STARTUP (uint8_t)0x82 /**< Indicates associated with application startup */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN (uint8_t)0x84 /**< Indicates associated with application shutdown */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_NONE (uint16_t)0x0000 /**< No synchronization */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING (uint16_t)0x0001 /**< Notify when trigger is reached, ascending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING (uint16_t)0x0002 /**< Notify when trigger is reached, descending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL (uint16_t)0x0004 /**< Notify when trigger is reached, regardless of direction */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL (uint16_t)0x0008 /**< Notify if level greater than or equal */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LT_EQUAL (uint16_t)0x0010 /**< Notify if level less than or equal */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS (uint16_t)0x0020 /**< Notify whenever conditions are met */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT (uint16_t)0x0040 /**< Fire once, then terminate synchro command */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_STARTUP (uint16_t)0x0080 /**< Indicates associated with application startup */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN (uint16_t)0x0100 /**< Indicates associated with application shutdown */
|
||||
|
||||
typedef uint8_t ompi_registry_synchro_mode_t;
|
||||
typedef uint16_t ompi_registry_synchro_mode_t;
|
||||
|
||||
/** Return value for notify requests
|
||||
*/
|
||||
@ -136,11 +136,11 @@ typedef uint16_t ompi_registry_mode_t;
|
||||
#define MCA_GPR_GET_CMD (uint16_t)0x0100
|
||||
#define MCA_GPR_TEST_INTERNALS_CMD (uint16_t)0x0200
|
||||
#define MCA_GPR_NOTIFY_CMD (uint16_t)0x0400 /**< Indicates a notify message */
|
||||
#define MCA_GPR_DUMP_CMD (uint16_t)0x2000
|
||||
#define MCA_GPR_ASSUME_OWNERSHIP_CMD (uint16_t)0x4000
|
||||
#define MCA_GPR_NOTIFY_ON_CMD (uint16_t)0x8000
|
||||
#define MCA_GPR_NOTIFY_OFF_CMD (uint16_t)0x8001
|
||||
#define MCA_GPR_COMPOUND_CMD (uint16_t)0x8010
|
||||
#define MCA_GPR_DUMP_CMD (uint16_t)0x0800
|
||||
#define MCA_GPR_ASSUME_OWNERSHIP_CMD (uint16_t)0x1000
|
||||
#define MCA_GPR_NOTIFY_ON_CMD (uint16_t)0x2000
|
||||
#define MCA_GPR_NOTIFY_OFF_CMD (uint16_t)0x4000
|
||||
#define MCA_GPR_COMPOUND_CMD (uint16_t)0x8000
|
||||
#define MCA_GPR_GET_STARTUP_MSG_CMD (uint16_t)0x8020
|
||||
#define MCA_GPR_GET_SHUTDOWN_MSG_CMD (uint16_t)0x8040
|
||||
#define MCA_GPR_TRIGGERS_ACTIVE_CMD (uint16_t)0x8080
|
||||
@ -161,7 +161,7 @@ typedef uint16_t mca_gpr_cmd_flag_t;
|
||||
#define MCA_GPR_OOB_PACK_ACTION OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_MODE OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_OBJECT_SIZE OMPI_INT32
|
||||
#define MCA_GPR_OOB_PACK_SYNCHRO_MODE OMPI_INT8
|
||||
#define MCA_GPR_OOB_PACK_SYNCHRO_MODE OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_NOTIFY_ID OMPI_INT32
|
||||
#define MCA_GPR_OOB_PACK_BOOL OMPI_INT8
|
||||
#define MCA_GPR_OOB_PACK_STATUS_KEY OMPI_INT8
|
||||
|
@ -37,26 +37,30 @@ void mca_gpr_proxy_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
}
|
||||
}
|
||||
|
||||
/* protect system from threadlock */
|
||||
if ((OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & state)) {
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
namelen = strlen(message->segment);
|
||||
|
||||
/* find the request corresponding to this notify */
|
||||
for (trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr != (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
if ((trackptr->action & state) &&
|
||||
(0 == strcmp(message->segment, trackptr->segment))) {
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
return;
|
||||
}
|
||||
/* don't deliver messages with zero data in them */
|
||||
if (0 < ompi_list_get_size(&message->data)) {
|
||||
|
||||
/* protect system from threadlock */
|
||||
if ((OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & state)) {
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
|
||||
|
||||
namelen = strlen(message->segment);
|
||||
|
||||
/* find the request corresponding to this notify */
|
||||
for (trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr != (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr = (mca_gpr_proxy_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
if ((trackptr->action & state) &&
|
||||
(0 == strcmp(message->segment, trackptr->segment))) {
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(message);
|
||||
}
|
||||
|
@ -82,6 +82,11 @@ void mca_gpr_replica_cleanup_proc_nl(bool purge, ompi_process_name_t *proc)
|
||||
char *procname;
|
||||
mca_ns_base_jobid_t jobid;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr_replica_cleanup_proc: function entered",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
}
|
||||
|
||||
procname = ompi_name_server.get_proc_name_string(proc);
|
||||
jobid = ompi_name_server.get_jobid(proc);
|
||||
|
||||
@ -95,13 +100,25 @@ void mca_gpr_replica_cleanup_proc_nl(bool purge, ompi_process_name_t *proc)
|
||||
/* adjust any startup synchro and/or shutdown synchros owned
|
||||
* by the associated jobid by one.
|
||||
*/
|
||||
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
|
||||
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers);
|
||||
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN & trig->synch_mode)) {
|
||||
trig->count--;
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr_replica_cleanup_proc: adjusting synchros for segment %s",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), seg->name);
|
||||
}
|
||||
|
||||
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
|
||||
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers);
|
||||
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN & trig->synch_mode)) {
|
||||
if (mca_gpr_replica_debug) {
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_STARTUP & trig->synch_mode) {
|
||||
ompi_output(0, "\tadjusting startup synchro");
|
||||
} else {
|
||||
ompi_output(0, "\tadjusting shutdown synchro");
|
||||
}
|
||||
}
|
||||
trig->count--;
|
||||
}
|
||||
}
|
||||
mca_gpr_replica_check_synchros(seg);
|
||||
}
|
||||
|
@ -28,27 +28,32 @@ void mca_gpr_replica_deliver_notify_msg(ompi_registry_notify_action_t state,
|
||||
mca_gpr_replica_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_replica_segment_t *seg;
|
||||
|
||||
/* protect system from threadlock */
|
||||
if ((OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & state)) {
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
namelen = strlen(message->segment);
|
||||
|
||||
/* find the request corresponding to this notify */
|
||||
for (trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
seg = trackptr->segptr;
|
||||
if ((trackptr->action & state) &&
|
||||
(0 == strncmp(message->segment, seg->name, namelen))) {
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
/* don't deliver messages with zero data */
|
||||
if (0 < ompi_list_get_size(&message->data)) {
|
||||
/* protect system from threadlock */
|
||||
if ((OMPI_REGISTRY_NOTIFY_ON_STARTUP & state) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ON_SHUTDOWN & state)) {
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
namelen = strlen(message->segment);
|
||||
|
||||
/* find the request corresponding to this notify */
|
||||
for (trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr = (mca_gpr_replica_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
|
||||
seg = trackptr->segptr;
|
||||
if ((trackptr->action & state) &&
|
||||
(0 == strncmp(message->segment, seg->name, namelen))) {
|
||||
/* process request - callback function responsible for releasing memory */
|
||||
trackptr->callback(message, trackptr->user_tag);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
}
|
||||
OBJ_RELEASE(message);
|
||||
|
||||
}
|
||||
|
@ -1148,7 +1148,6 @@ static void mca_gpr_replica_recv_dump_cmd(ompi_buffer_t answer)
|
||||
|
||||
static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_buffer_t answer)
|
||||
{
|
||||
char *jobidstring=NULL;
|
||||
mca_ns_base_jobid_t jobid=0;
|
||||
ompi_list_t *recipients=NULL;
|
||||
ompi_buffer_t msg;
|
||||
@ -1156,12 +1155,10 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
|
||||
void *addr=NULL;
|
||||
int32_t size=0, num_recipients=0, i=0;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &jobidstring)) {
|
||||
return;
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &jobid, 1, OMPI_JOBID)) {
|
||||
return;
|
||||
}
|
||||
|
||||
jobid = ompi_name_server.convert_string_to_jobid(jobidstring);
|
||||
|
||||
recipients = OBJ_NEW(ompi_list_t);
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
@ -1191,7 +1188,6 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
|
||||
|
||||
static void mca_gpr_replica_recv_get_shutdown_msg_cmd(ompi_buffer_t buffer, ompi_buffer_t answer)
|
||||
{
|
||||
char *jobidstring=NULL;
|
||||
mca_ns_base_jobid_t jobid=0;
|
||||
ompi_list_t *recipients=NULL;
|
||||
ompi_buffer_t msg;
|
||||
@ -1199,14 +1195,12 @@ static void mca_gpr_replica_recv_get_shutdown_msg_cmd(ompi_buffer_t buffer, ompi
|
||||
void *addr=NULL;
|
||||
int32_t size=0, num_recipients=0, i=0;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &jobidstring)) {
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &jobid, 1, OMPI_JOBID)) {
|
||||
ompi_output(0, "[%d,%d,%d] recv_get_shutdown_msg: failed to unpack jobidstring",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
return;
|
||||
}
|
||||
|
||||
jobid = ompi_name_server.convert_string_to_jobid(jobidstring);
|
||||
|
||||
recipients = OBJ_NEW(ompi_list_t);
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
@ -46,16 +46,12 @@ int mca_oob_xcast(
|
||||
int rc;
|
||||
int tag = MCA_OOB_TAG_XCAST;
|
||||
|
||||
ompi_output(0, "[%d,%d,%d] xcast: entered function", OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
|
||||
/* check to see if I am the root process name */
|
||||
if(NULL != root &&
|
||||
0 == ompi_name_server.compare(OMPI_NS_CMP_ALL, root, ompi_rte_get_self())) {
|
||||
for (ptr = (ompi_name_server_namelist_t*)ompi_list_get_first(peers);
|
||||
ptr != (ompi_name_server_namelist_t*)ompi_list_get_end(peers);
|
||||
ptr = (ompi_name_server_namelist_t*)ompi_list_get_next(ptr)) {
|
||||
ompi_output(0, "[%d,%d,%d] xcast: sending message to [%d,%d,%d]",
|
||||
OMPI_NAME_ARGS(*ompi_rte_get_self()), *(ptr->name));
|
||||
rc = mca_oob_send_packed(ptr->name, buffer, tag, 0);
|
||||
if(rc < 0) {
|
||||
return rc;
|
||||
@ -67,7 +63,6 @@ int mca_oob_xcast(
|
||||
if(rc < 0) {
|
||||
return rc;
|
||||
}
|
||||
ompi_output(0, "[%d,%d,%d] xcast: got message", OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||
if(cbfunc != NULL)
|
||||
cbfunc(rc, root, rbuf, tag, NULL);
|
||||
ompi_buffer_free(rbuf);
|
||||
|
@ -90,7 +90,8 @@ mca_pcmclient_singleton_init_cleanup(void)
|
||||
|
||||
/* register a synchro on the segment so we get notified for startup */
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_STARTUP,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
@ -99,7 +100,8 @@ mca_pcmclient_singleton_init_cleanup(void)
|
||||
|
||||
/* register a synchro on the segment so we get notified on shutdown */
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
|
@ -87,11 +87,11 @@ int ompi_rte_set_process_status(ompi_rte_process_status_t *status,
|
||||
segment, tokens, addr, size);
|
||||
|
||||
if ((OMPI_PROC_STOPPED == status->status_key) ||
|
||||
(OMPI_PROC_KILLED == status->status_key) ||
|
||||
(OMPI_PROC_EXITED == status->status_key)) {
|
||||
ompi_registry.cleanup_process(true, proc); /* purge subscriptions */
|
||||
(OMPI_PROC_KILLED == status->status_key) ||
|
||||
(OMPI_PROC_EXITED == status->status_key)) {
|
||||
ompi_registry.cleanup_process(true, proc); /* purge subscriptions */
|
||||
} else if (OMPI_PROC_TERMINATING == status->status_key) {
|
||||
ompi_registry.cleanup_process(false, proc); /* just cleanup - don't purge subs */
|
||||
ompi_registry.cleanup_process(false, proc); /* just cleanup - don't purge subs */
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
|
@ -295,7 +295,8 @@ main(int argc, char *argv[])
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone registers */
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_STARTUP,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
@ -304,7 +305,8 @@ main(int argc, char *argv[])
|
||||
/* register a synchro on the segment so we get notified when everyone is gone
|
||||
*/
|
||||
rc_tag = ompi_registry.synchro(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT|
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_SHUTDOWN,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user