1
1

Bring in the code for routing xcast stage gate messages via the local orteds. This code is inactive unless you specifically request it via an mca param oob_xcast_mode (can be set to "linear" or "direct"). Direct mode is the old standard method where we send messages directly to each MPI process. Linear mode sends the xcast message via the orteds, with the HNP sending the message to each orted directly.

There is a binomial algorithm in the code (i.e., the HNP would send to a subset of the orteds, which then relay it on according to the typical log-2 algo), but that has a bug in it so the code won't let you select it even if you tried (and the mca param doesn't show, so you'd *really* have to try).

This also involved a slight change to the oob.xcast API, so propagated that as required.

Note: this has *only* been tested on rsh, SLURM, and Bproc environments (now that it has been transferred to the OMPI trunk, I'll need to re-test it [only done rsh so far]). It should work fine on any environment that uses the ORTE daemons - anywhere else, you are on your own... :-)

Also, correct a mistake where the orte_debug_flag was declared an int, but the mca param was set as a bool. Move the storage for that flag to the orte/runtime/params.c and orte/runtime/params.h files appropriately.

This commit was SVN r14475.
Этот коммит содержится в:
Ralph Castain 2007-04-23 18:41:04 +00:00
родитель 009be1c1b5
Коммит 18b2dca51c
30 изменённых файлов: 654 добавлений и 186 удалений

Просмотреть файл

@ -485,7 +485,7 @@ int mca_pml_ob1_ft_event( int state )
return ret;
}
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
opal_output(0,
"pml:ob1: ft_event(Restart): Stage Gate 1 Failed %d",
@ -510,7 +510,7 @@ int mca_pml_ob1_ft_event( int state )
return ret;
}
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
opal_output(0,"pml:ob1: ft_event(Restart): Stage Gate 1 Failed %d",
ret);

Просмотреть файл

@ -156,7 +156,7 @@ int ompi_mpi_finalize(void)
/*
* Wait for everyone to get here
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
return ret;
@ -308,7 +308,7 @@ int ompi_mpi_finalize(void)
* the RTE while the smr is trying to do the update - which causes
* an ugly race condition
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
return ret;

Просмотреть файл

@ -52,6 +52,7 @@
#include "orte/mca/schema/schema.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/params.h"
#include "ompi/constants.h"
#include "ompi/mpi/f77/constants.h"
@ -555,7 +556,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
* process the STG1 message prior to sending it along the xcast chain
* as this message contains all the oob contact info we need!
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
error = "ompi_mpi_init: failed to see all procs register\n";
@ -665,7 +666,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* Second barrier -- wait for message from
RMGR_PROC_STAGE_GATE_MGR to arrive */
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
error = "ompi_mpi_init: failed to see all procs register\n";

Просмотреть файл

@ -97,7 +97,7 @@ int orte_odls_base_open(void)
mca_base_param_reg_int_name("odls_base", "sigkill_timeout",
"Time to wait for a process to die after issuing a kill signal to it",
false, false, 1, &orte_odls_globals.timeout_before_sigkill);
/* register the daemon cmd data type */
tmp = ORTE_DAEMON_CMD;
if (ORTE_SUCCESS != (rc = orte_dss.register_type(orte_odls_pack_daemon_cmd,

Просмотреть файл

@ -534,6 +534,8 @@ orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_env
bool connect_stdin;
orte_jobid_t jobid;
int cycle = 0;
char *job_str=NULL, *vpid_str, *uri_file, *my_uri=NULL, *session_dir=NULL;
FILE *fp;
/* first, retrieve the job number we are to launch from the
* returned data - we can extract the jobid directly from the
@ -634,6 +636,36 @@ orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_env
goto cleanup;
}
/* record my uri in a file within the session directory so the child can contact me */
/* get the session dir for this proc */
orte_ns.convert_vpid_to_string(&vpid_str, child->name->vpid);
if (ORTE_SUCCESS != (rc = orte_session_dir(true, NULL, NULL, NULL,
NULL, NULL, job_str, vpid_str))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* get the session dir name so we can put the file there */
if (ORTE_SUCCESS != (rc = orte_session_dir_get_name(&session_dir, NULL, NULL, NULL,
NULL, NULL, NULL, job_str, vpid_str))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
free(vpid_str);
/* create the file and put my uri into it */
uri_file = opal_os_path(false, session_dir, "orted-uri.txt", NULL);
fp = fopen(uri_file, "w");
if (NULL == fp) {
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
rc = ORTE_ERR_FILE_OPEN_FAILURE;
goto cleanup;
}
fprintf(fp, "%s\n", my_uri);
fclose(fp);
free(uri_file);
cycle++;
}

Просмотреть файл

@ -59,6 +59,7 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba
int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state);
int orte_odls_default_signal_local_procs(const orte_process_name_t *proc,
int32_t signal);
int orte_odls_default_deliver_message(orte_jobid_t job, orte_buffer_t *buffer, orte_rml_tag_t tag);
/**
* ODLS Default globals

Просмотреть файл

@ -82,6 +82,7 @@
#include "orte/util/univ_info.h"
#include "orte/util/session_dir.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/params.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/mca/iof/iof.h"
@ -113,7 +114,8 @@ orte_odls_base_module_t orte_odls_default_module = {
orte_odls_default_get_add_procs_data,
orte_odls_default_launch_local_procs,
orte_odls_default_kill_local_procs,
orte_odls_default_signal_local_procs
orte_odls_default_signal_local_procs,
orte_odls_default_deliver_message
};
/* this entire function gets called within a GPR compound command,
@ -171,8 +173,8 @@ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb
sub.cnt = 2;
sub.values = values;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR, segment,
num_glob_keys, 1))) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment, num_glob_keys, 1))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(sub.name);
@ -760,6 +762,13 @@ static int odls_default_fork_local_proc(
opal_unsetenv(param, &environ_copy);
free(param);
/* pass my contact info to the local proc so we can talk */
uri = orte_rml.get_uri();
param = mca_base_param_environ_variable("orte","local_daemon","uri");
opal_setenv(param, uri, true, &environ_copy);
free(param);
free(uri);
/* setup yield schedule and processor affinity
* We default here to always setting the affinity processor if we want
* it. The processor affinity system then determines
@ -959,6 +968,8 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba
bool oversubscribed=false, want_processor, *bptr, override_oversubscribed=false;
opal_list_item_t *item, *item2;
orte_filem_base_request_t *filem_request;
char *job_str, *uri_file, *my_uri, *session_dir=NULL;
FILE *fp;
/* parse the returned data to create the required structures
* for a fork launch. Since the data will contain information
@ -1107,6 +1118,41 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba
} /* for j */
}
/* record my uri in a file within the session directory so the local proc
* can contact me
*/
opal_output(orte_odls_globals.output, "odls: dropping local uri file");
/* put the file in the job session dir for the job being launched */
orte_ns.convert_jobid_to_string(&job_str, job);
if (ORTE_SUCCESS != (rc = orte_session_dir(true, NULL, NULL, NULL,
NULL, NULL, job_str, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get the session dir name so we can put the file there */
if (ORTE_SUCCESS != (rc = orte_session_dir_get_name(&session_dir, NULL, NULL, NULL,
NULL, NULL, NULL, job_str, NULL))) {
ORTE_ERROR_LOG(rc);
free(job_str);
return rc;
}
free(job_str);
/* create the file and put my uri into it */
uri_file = opal_os_path(false, session_dir, "orted-uri.txt", NULL);
fp = fopen(uri_file, "w");
if (NULL == fp) {
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
my_uri = orte_rml.get_uri();
fprintf(fp, "%s\n", my_uri);
fclose(fp);
free(uri_file);
free(my_uri);
/* Now we preload any files that are needed. This is done on a per
* app context basis */
for (item = opal_list_get_first(&app_context_list);
@ -1380,6 +1426,43 @@ int orte_odls_default_signal_local_procs(const orte_process_name_t *proc, int32_
}
/*
 * Deliver a message to the local child processes of the specified job.
 *
 * Walks the daemon's global list of children and, for each child whose
 * jobid matches the given job, sends the provided buffer to that child
 * over the RML on the specified tag.
 *
 * @param job    Jobid whose local procs should receive the message. May be
 *               a wildcard value - matching is therefore done via
 *               orte_dss.compare rather than a plain == test.
 * @param buffer The message to deliver (sent as-is to each matching child).
 * @param tag    RML tag on which each child expects to receive the message.
 *
 * @return ORTE_SUCCESS always. Individual send failures are logged via
 *         ORTE_ERROR_LOG but are NOT propagated to the caller - delivery
 *         is best-effort across the set of children.
 */
int orte_odls_default_deliver_message(orte_jobid_t job, orte_buffer_t *buffer, orte_rml_tag_t tag)
{
int rc;
opal_list_item_t *item;
orte_odls_child_t *child;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_default.mutex);
for (item = opal_list_get_first(&orte_odls_default.children);
item != opal_list_get_end(&orte_odls_default.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* do we have a child from the specified job. Because the
* job could be given as a WILDCARD value, we must use
* the dss.compare function to check for equality.
*/
if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
continue;
}
opal_output(orte_odls_globals.output, "odls: sending message to child [%ld, %ld, %ld]",
ORTE_NAME_ARGS(child->name));
/* if so, send the message */
rc = orte_rml.send_buffer(child->name, buffer, tag, 0);
if (rc < 0) {
/* log the failure but keep going - try to reach the remaining children */
ORTE_ERROR_LOG(rc);
}
}
/* wake any thread waiting on the children condition variable before
* releasing the lock (mutex/cond pair guards the children list)
*/
opal_condition_signal(&orte_odls_default.cond);
OPAL_THREAD_UNLOCK(&orte_odls_default.mutex);
return ORTE_SUCCESS;
}
static void set_handler_default(int sig)
{
struct sigaction act;

Просмотреть файл

@ -31,9 +31,11 @@
#include "opal/mca/mca.h"
#include "opal/class/opal_list.h"
#include "orte/dss/dss_types.h"
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls_types.h"
@ -77,6 +79,12 @@ typedef int (*orte_odls_base_module_kill_local_processes_fn_t)(orte_jobid_t job,
typedef int (*orte_odls_base_module_signal_local_process_fn_t)(const orte_process_name_t *proc,
int32_t signal);
/**
* Deliver a message to local processes
*/
typedef int (*orte_odls_base_module_deliver_message_fn_t)(orte_jobid_t job, orte_buffer_t *buffer,
orte_rml_tag_t tag);
/**
* pls module version 1.3.0
*/
@ -86,6 +94,7 @@ struct orte_odls_base_module_1_3_0_t {
orte_odls_base_module_launch_local_processes_fn_t launch_local_procs;
orte_odls_base_module_kill_local_processes_fn_t kill_local_procs;
orte_odls_base_module_signal_local_process_fn_t signal_local_procs;
orte_odls_base_module_deliver_message_fn_t deliver_message;
};
/** shorten orte_odls_base_module_1_3_0_t declaration */

Просмотреть файл

@ -44,8 +44,15 @@ typedef uint8_t orte_daemon_cmd_flag_t;
#define ORTE_DAEMON_HEARTBEAT_CMD (orte_daemon_cmd_flag_t) 7
#define ORTE_DAEMON_EXIT_CMD (orte_daemon_cmd_flag_t) 8
#define ORTE_DAEMON_HALT_VM_CMD (orte_daemon_cmd_flag_t) 9
#define ORTE_DAEMON_MESSAGE_LOCAL_PROCS (orte_daemon_cmd_flag_t) 10
#define ORTE_DAEMON_ROUTE_NONE (orte_daemon_cmd_flag_t) 11
#define ORTE_DAEMON_ROUTE_BINOMIAL (orte_daemon_cmd_flag_t) 12
#define ORTE_DAEMON_WARMUP_LOCAL_CONN (orte_daemon_cmd_flag_t) 13
/* define some useful attributes for dealing with orteds */
#define ORTE_DAEMON_SOFT_KILL "orted-soft-kill"
#define ORTE_DAEMON_HARD_KILL "orted-hard-kill"
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -392,19 +392,16 @@ ORTE_DECLSPEC int mca_oob_recv_packed_nb(
/**
* A "broadcast-like" function over the specified set of peers.
* @param root The process acting as the root of the broadcast.
* @param peers The list of processes receiving the broadcast (excluding root).
* @param buffer The data to broadcast - only significant at root.
* @param cbfunc Callback function on receipt of data - not significant at root.
* @param job The job whose processes are to receive the message.
* @param msg The message to be sent
* @param cbfunc Callback function on receipt of data
*
* Note that the callback function is provided so that the data can be
* received and interpreted by the application prior to the broadcast
* continuing to forward data along the distribution tree.
* received and interpreted by the application
*/
ORTE_DECLSPEC int mca_oob_xcast(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
/*

Просмотреть файл

@ -108,15 +108,22 @@ int mca_oob_base_open(void)
}
param = mca_base_param_reg_string_name("oob_xcast", "mode",
"Select xcast mode (\"linear\" [default] | \"binomial\")",
false, false, "linear", &mode);
if (0 == strcmp(mode, "linear")) {
#if 0
"Select xcast mode (\"linear\" | \"binomial\" | \"direct [default] \")",
#endif
"Select xcast mode (\"linear\" | \"direct [default] \")",
false, false, "direct", &mode);
if (0 == strcmp(mode, "binomial")) {
opal_output(0, "oob_xcast_mode: %s option not supported at this time", mode);
return ORTE_ERROR;
orte_oob_xcast_mode = 0;
} else if (0 == strcmp(mode, "linear")) {
orte_oob_xcast_mode = 1;
} else if (0 != strcmp(mode, "binomial")) {
} else if (0 == strcmp(mode, "direct")) {
orte_oob_xcast_mode = 2;
} else {
opal_output(0, "oob_xcast_mode: unknown option %s", mode);
return ORTE_ERROR;
} else {
orte_oob_xcast_mode = 0;
}
/* All done */

Просмотреть файл

@ -35,6 +35,8 @@
#include "orte/mca/ns/ns.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/oob/base/base.h"
@ -48,43 +50,44 @@
*/
static opal_mutex_t xcastmutex;
static int xcast_bitmap, bitmap_save;
static bool bitmap_init = false;
static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
static int mca_oob_xcast_linear(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
static int mca_oob_xcast_direct(orte_jobid_t job,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
int mca_oob_xcast(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc)
{
int rc = ORTE_SUCCESS;
struct timeval start, stop;
if (orte_oob_xcast_timing) {
if (NULL != buffer) {
opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %lu", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME),
(unsigned long)buffer->bytes_used);
}
gettimeofday(&start, NULL);
}
switch(orte_oob_xcast_mode) {
case 0: /* binomial tree */
rc = mca_oob_xcast_binomial_tree(job, process_first, buffer, cbfunc);
rc = mca_oob_xcast_binomial_tree(job, msg, cbfunc);
break;
case 1: /* linear */
rc = mca_oob_xcast_linear(job, process_first, buffer, cbfunc);
rc = mca_oob_xcast_linear(job, msg, cbfunc);
break;
case 2: /* direct */
rc = mca_oob_xcast_direct(job, msg, cbfunc);
break;
}
if (orte_oob_xcast_timing) {
gettimeofday(&stop, NULL);
opal_output(0, "xcast [%ld,%ld,%ld]: mode %s time %ld usec", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME),
@ -97,130 +100,269 @@ int mca_oob_xcast(orte_jobid_t job,
}
static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc)
{
orte_std_cntr_t i;
int rc;
int tag = ORTE_RML_TAG_XCAST;
int rc, ret;
int peer, size, rank, hibit, mask;
orte_buffer_t rbuf, sbuf;
orte_gpr_notify_message_t *msg;
orte_process_name_t target;
orte_buffer_t buffer;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
orte_daemon_cmd_flag_t mode=ORTE_DAEMON_ROUTE_BINOMIAL;
orte_vpid_t daemon_start=0, num_daemons;
int bitmap;
/* check to see if there is something to send - this is only true on the HNP end.
* However, we cannot just test to see if we are the HNP since, if we are a singleton,
* we are the HNP *and* we still need to handle both ends of the xcast
*/
if (NULL != buffer) {
/* this is the HNP end, so it starts the procedure. Accordingly, it sends its
* message to the first process in the job in the peer list, which takes it from there
if (NULL != msg) {
/* this is the HNP end, so it starts the procedure. Since the HNP is always the
* vpid=0 at this time, we take advantage of that fact to figure out who we
* should send this to on the first step
*/
OBJ_CONSTRUCT(&xcastmutex, opal_mutex_t);
OPAL_THREAD_LOCK(&xcastmutex);
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = job;
target.vpid = 0;
if (0 > (rc = mca_oob_send_packed(&target, buffer, tag, 0))) {
/* need to pack the msg for sending - be sure to include routing info so it
* can properly be sent through the daemons
*/
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
/* tell the daemon this is a message for its local procs */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return rc;
goto CLEANUP;
}
/* tell the daemon the routing algorithm is binomial so it can figure
* out who to forward the message to
*/
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &mode, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* get the number of daemons currently in the system and tell the daemon so
* it can properly route
*/
if (ORTE_SUCCESS != (rc = orte_ns.get_vpid_range(0, &num_daemons))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &daemon_start, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &num_daemons, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the jobid of the procs that are to receive the message */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* pack the message itself */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (orte_oob_xcast_timing) {
opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %ld",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), (long)buffer.bytes_used);
}
/* start setting up the target recipients */
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
/* if there is only one daemon, then we just send it - don't try to
* compute the binomial algorithm as it won't yield a meaningful
* result for just one
*/
if (num_daemons < 2) {
target.vpid = 1;
if (0 > (ret = mca_oob_send_packed(&target, &buffer, ORTE_RML_TAG_PLS_ORTED, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(msg);
rc = ret;
goto CLEANUP;
}
} else {
/* compute the bitmap */
bitmap = opal_cube_dim((int)num_daemons);
rank = 0;
size = (int)num_daemons;
hibit = opal_hibit(rank, bitmap);
--bitmap;
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)(daemon_start+peer);
if (0 > (ret = mca_oob_send_packed(&target, &buffer, ORTE_RML_TAG_PLS_ORTED, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(msg);
rc = ret;
goto CLEANUP;
}
}
}
}
CLEANUP:
OBJ_DESTRUCT(&buffer);
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return ORTE_SUCCESS;
}
/* this process is one of the application procs - accordingly, it will
* receive the message from its "parent" in the broadcast tree, and
* then send it along to some set of children
*/
/* compute the bitmap, if we haven't already done so */
if (!bitmap_init) {
bitmap_save = opal_cube_dim((int)orte_process_info.num_procs);
bitmap_init = true;
}
xcast_bitmap = bitmap_save;
rank = (int)(ORTE_PROC_MY_NAME->vpid);
size = (int)orte_process_info.num_procs;
hibit = opal_hibit(rank, xcast_bitmap);
--xcast_bitmap;
/* regardless of who we are, we first have to receive the message */
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
if (0 > (rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rbuf);
return rc;
}
msg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msg);
return rc;
}
OBJ_DESTRUCT(&rbuf);
/* repack the message so we can send it on */
OBJ_CONSTRUCT(&sbuf, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.pack(&sbuf, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&sbuf);
return rc;
}
/* since the OOB contact info for our peers is in the STG1 message, we have to
* process it BEFORE we can relay the message to any "children"
*/
if (cbfunc != NULL && process_first) {
/* process the message */
cbfunc(msg);
}
/* send data to any children */
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = ORTE_PROC_MY_NAME->jobid;
for (i = hibit + 1, mask = 1 << i; i <= xcast_bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)peer;
if (0 > (rc = mca_oob_send_packed(&target, &sbuf, tag, 0))) {
} else {
/* if we are not the HNP, then we need to just receive the message and process it */
orte_std_cntr_t i;
orte_buffer_t rbuf;
orte_gpr_notify_message_t *mesg;
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, ORTE_RML_TAG_XCAST);
if(rc < 0) {
OBJ_DESTRUCT(&rbuf);
return rc;
}
if (cbfunc != NULL) {
mesg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == mesg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &mesg, &i, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msg);
OBJ_RELEASE(mesg);
return rc;
}
cbfunc(mesg);
OBJ_RELEASE(mesg);
}
OBJ_DESTRUCT(&rbuf);
return ORTE_SUCCESS;
}
OBJ_DESTRUCT(&sbuf);
/* if it wasn't the STG1 message, then process it here */
if (cbfunc != NULL && !process_first) {
cbfunc(msg);
}
OBJ_RELEASE(msg);
return ORTE_SUCCESS;
}
static int mca_oob_xcast_linear(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc)
{
int rc;
orte_buffer_t buffer;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
orte_daemon_cmd_flag_t mode=ORTE_DAEMON_ROUTE_NONE;
orte_vpid_t i, range;
orte_process_name_t dummy;
if (NULL != msg) {
/* if we are the HNP, then we need to send the message out */
OBJ_CONSTRUCT(&xcastmutex, opal_mutex_t);
OPAL_THREAD_LOCK(&xcastmutex);
/* pack the msg for sending - indicate that no further routing is required */
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
/* tell the daemon this is a message for its local procs */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon that no further routing required */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &mode, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the jobid of the procs that are to receive the message */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* pack the message itself */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (orte_oob_xcast_timing) {
opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %ld",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), (long)buffer.bytes_used);
}
/* get the number of daemons out there */
orte_ns.get_vpid_range(0, &range);
/* send the message to each daemon as fast as we can */
dummy.cellid = ORTE_PROC_MY_NAME->cellid;
dummy.jobid = 0;
for (i=0; i < range; i++) {
if (ORTE_PROC_MY_NAME->vpid != i) { /* don't send to myself */
dummy.vpid = i;
if (0 > (rc = orte_rml.send_buffer(&dummy, &buffer, ORTE_RML_TAG_PLS_ORTED, 0))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
rc = ORTE_ERR_COMM_FAILURE;
goto CLEANUP;
}
}
}
}
rc = ORTE_SUCCESS;
/* cleanup */
CLEANUP:
OBJ_DESTRUCT(&buffer);
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return rc;
} else {
/* if we are not the HNP, then we need to just receive the message and process it */
orte_std_cntr_t i;
orte_buffer_t rbuf;
orte_gpr_notify_message_t *mesg;
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, ORTE_RML_TAG_XCAST);
if(rc < 0) {
OBJ_DESTRUCT(&rbuf);
return rc;
}
if (cbfunc != NULL) {
mesg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == mesg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &mesg, &i, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(mesg);
return rc;
}
cbfunc(mesg);
OBJ_RELEASE(mesg);
}
OBJ_DESTRUCT(&rbuf);
return ORTE_SUCCESS;
}
}
static int mca_oob_xcast_direct(orte_jobid_t job,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc)
{
orte_std_cntr_t i;
@ -232,18 +374,19 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
orte_proc_state_t state;
opal_list_t attrs;
opal_list_item_t *item;
orte_buffer_t buffer;
/* check to see if there is something to send - this is only true on the HNP end.
* However, we cannot just test to see if we are the HNP since, if we are a singleton,
* we are the HNP *and* we still need to handle both ends of the xcast
*/
if (NULL != buffer) {
if (NULL != msg) {
OBJ_CONSTRUCT(&xcastmutex, opal_mutex_t);
OPAL_THREAD_LOCK(&xcastmutex);
/* this is the HNP end, so it does all the sends in this algorithm. First, we need
* to get the job peers so we know who to send the message to
*/
*/
OBJ_CONSTRUCT(&attrs, opal_list_t);
orte_rmgr.add_attribute(&attrs, ORTE_NS_USE_JOBID, ORTE_JOBID, &job, ORTE_RMGR_ATTR_OVERRIDE);
if (ORTE_SUCCESS != (rc = orte_ns.get_peers(&peers, &n, &attrs))) {
@ -256,6 +399,22 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
OBJ_RELEASE(item);
OBJ_DESTRUCT(&attrs);
/* need to pack the msg for sending - no routing info here as this message
* goes DIRECTLY to the processes
*/
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
free(peers);
return rc;
}
if (orte_oob_xcast_timing) {
opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %ld",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), (long)buffer.bytes_used);
}
for(i=0; i<n; i++) {
/* check status of peer to ensure they are alive */
if (ORTE_SUCCESS != (rc = orte_smr.get_proc_state(&state, &status, peers+i))) {
@ -266,7 +425,7 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
return rc;
}
if (state != ORTE_PROC_STATE_TERMINATED && state != ORTE_PROC_STATE_ABORTED) {
rc = mca_oob_send_packed(peers+i, buffer, tag, 0);
rc = mca_oob_send_packed(peers+i, &buffer, tag, 0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
free(peers);
@ -277,6 +436,8 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
}
}
free(peers);
OBJ_DESTRUCT(&buffer);
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return ORTE_SUCCESS;
@ -284,7 +445,7 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
/* if we are not the HNP, then we need to just receive the message and process it */
} else {
orte_buffer_t rbuf;
orte_gpr_notify_message_t *msg;
orte_gpr_notify_message_t *mesg;
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag);
@ -293,19 +454,19 @@ static int mca_oob_xcast_linear(orte_jobid_t job,
return rc;
}
if (cbfunc != NULL) {
msg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg) {
mesg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == mesg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) {
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &mesg, &i, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msg);
OBJ_RELEASE(mesg);
return rc;
}
cbfunc(msg);
OBJ_RELEASE(msg);
cbfunc(mesg);
OBJ_RELEASE(mesg);
}
OBJ_DESTRUCT(&rbuf);
}

Просмотреть файл

@ -200,8 +200,7 @@ typedef int (*mca_oob_base_module_fini_fn_t)(void);
* xcast function for sending common messages to all processes
*/
typedef int (*mca_oob_base_module_xcast_fn_t)(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
typedef int (*mca_oob_base_module_ft_event_fn_t)( int state );

Просмотреть файл

@ -26,6 +26,7 @@
#include "orte/util/univ_info.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/params.h"
#include "orte/mca/pls/base/pls_private.h"

Просмотреть файл

@ -57,7 +57,6 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job)
int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
{
orte_buffer_t buffer;
int rc;
orte_jobid_t job;
@ -119,19 +118,10 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
msg->msg_type = ORTE_GPR_SUBSCRIPTION_MSG;
msg->id = ORTE_GPR_TRIGGER_ID_MAX;
/* need to pack the msg for sending */
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
goto CLEANUP;
}
/* send the message */
if (ORTE_SUCCESS != (rc = orte_rml.xcast(job, false, &buffer, NULL))) {
if (ORTE_SUCCESS != (rc = orte_rml.xcast(job, msg, NULL))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&buffer);
CLEANUP:

Просмотреть файл

@ -331,8 +331,7 @@ typedef int (*orte_rml_module_recv_cancel_fn_t)(orte_process_name_t* peer, orte_
*/
typedef int (*orte_rml_module_xcast_fn_t)(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_notify_message_t *msg,
orte_gpr_trigger_cb_fn_t cbfunc);
/*

Просмотреть файл

@ -24,5 +24,6 @@ libmca_sds_la_SOURCES += \
base/sds_base_open.c \
base/sds_base_select.c \
base/sds_base_interface.c \
base/sds_base_orted_contact.c \
base/sds_base_universe.c \
base/sds_base_put.c

Просмотреть файл

@ -65,6 +65,7 @@ extern "C" {
*/
ORTE_DECLSPEC int orte_sds_base_basic_contact_universe(void);
ORTE_DECLSPEC int orte_sds_base_seed_set_name(void);
ORTE_DECLSPEC int orte_sds_base_contact_orted(char *orted_uri);
/*
* Put functions

80
orte/mca/sds/base/sds_base_orted_contact.c Обычный файл
Просмотреть файл

@ -0,0 +1,80 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/orte_constants.h"
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "orte/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/sds/base/base.h"
int orte_sds_base_contact_orted(char *orted_uri)
{
orte_buffer_t buffer;
int rc;
orte_process_name_t orted;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_WARMUP_LOCAL_CONN;
/* set the contact info into the OOB's hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_uri(orted_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the daemon's name from the uri */
if (ORTE_SUCCESS != (rc = orte_rml.parse_uris(orted_uri, &orted, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* we need to send a very small message to get the oob to establish
* the connection - the oob will leave the connection "alive"
* thereafter so we can communicate readily
*/
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
/* tell the daemon this is a message to warmup the connection */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
return rc;
}
/* do the send - it will be ignored on the far end, so don't worry about
* getting a response
*/
if (0 > orte_rml.send_buffer(&orted, &buffer, ORTE_RML_TAG_PLS_ORTED, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_CONNECTION_FAILED);
OBJ_DESTRUCT(&buffer);
return ORTE_ERR_CONNECTION_FAILED;
}
OBJ_DESTRUCT(&buffer);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -23,13 +23,16 @@
#include "orte/orte_constants.h"
#include "orte/util/sys_info.h"
#include "opal/util/output.h"
#include "opal/util/os_path.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/mca/sds/sds.h"
#include "orte/mca/sds/base/base.h"
#include "orte/mca/sds/bproc/sds_bproc.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/ns/base/base.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/util/session_dir.h"
orte_sds_base_module_t orte_sds_bproc_module = {
orte_sds_base_basic_contact_universe,
@ -47,8 +50,15 @@ int orte_sds_bproc_set_name(void)
{
int rc;
int id;
char* name_string = NULL;
char *name_string = NULL;
char *jobid_string;
char *vpid_string;
char orted_uri[1024];
bool cleanup_jobid_string, cleanup_vpid_string;
char *session_dir;
char *uri_file;
FILE *fp;
id = mca_base_param_register_string("ns", "nds", "name", NULL, NULL);
mca_base_param_lookup_string(id, &name_string);
if(name_string != NULL) {
@ -61,6 +71,18 @@ int orte_sds_bproc_set_name(void)
}
free(name_string);
/* get the jobid and vpid strings for use later */
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid_string(&jobid_string, ORTE_PROC_MY_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_ns.get_vpid_string(&vpid_string, ORTE_PROC_MY_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
cleanup_jobid_string = true;
cleanup_vpid_string = true;
} else {
orte_cellid_t cellid;
@ -68,8 +90,6 @@ int orte_sds_bproc_set_name(void)
orte_vpid_t vpid;
orte_vpid_t vpid_start;
char* cellid_string;
char* jobid_string;
char* vpid_string;
int num_procs;
char *bproc_rank_string;
int bproc_rank;
@ -96,6 +116,7 @@ int orte_sds_bproc_set_name(void)
ORTE_ERROR_LOG(rc);
return(rc);
}
cleanup_jobid_string = false;
/* BPROC_RANK is set by bproc when we do a parallel launch */
bproc_rank_string = getenv("BPROC_RANK");
@ -166,7 +187,49 @@ int orte_sds_bproc_set_name(void)
if(NULL != orte_system_info.nodename)
free(orte_system_info.nodename);
asprintf(&orte_system_info.nodename, "%d", bproc_currnode());
/* ensure the vpid is in the vpid_string in case we need it later */
if (ORTE_SUCCESS != (rc = orte_ns.get_vpid_string(&vpid_string, ORTE_PROC_MY_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
cleanup_vpid_string = true;
}
/* if we are NOT a daemon, then lookup our local daemon's contact info
* and setup that link
*/
if (!orte_process_info.daemon) {
/* get the session dir name so we can find the file there */
if (ORTE_SUCCESS != (rc = orte_session_dir_get_name(&session_dir, NULL, NULL, NULL,
NULL, NULL, NULL, jobid_string, vpid_string))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* find the file and get the local orted's uri from it */
uri_file = opal_os_path(false, session_dir, "orted-uri.txt", NULL);
free(session_dir);
fp = fopen(uri_file, "r");
if (NULL == fp) {
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
fgets(orted_uri, 1024, fp);
orted_uri[strlen(orted_uri)-1] = '\0';
fclose(fp);
/* setup the link */
if (ORTE_SUCCESS != (rc = orte_sds_base_contact_orted(orted_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
free(uri_file);
}
if (cleanup_jobid_string) free(jobid_string);
if (cleanup_vpid_string) free(vpid_string);
return ORTE_SUCCESS;
}

19
orte/mca/sds/env/sds_env_module.c поставляемый
Просмотреть файл

@ -46,6 +46,9 @@ orte_sds_env_set_name(void)
int vpid_start;
int num_procs;
char* name_string = NULL;
char *local_daemon_uri = NULL;
id = mca_base_param_register_string("ns", "nds", "name", NULL, NULL);
mca_base_param_lookup_string(id, &name_string);
@ -117,6 +120,7 @@ orte_sds_env_set_name(void)
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
orte_process_info.vpid_start = (orte_vpid_t)vpid_start;
id = mca_base_param_register_int("ns", "nds", "num_procs", NULL, -1);
mca_base_param_lookup_int(id, &num_procs);
@ -124,9 +128,20 @@ orte_sds_env_set_name(void)
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
orte_process_info.vpid_start = (orte_vpid_t)vpid_start;
orte_process_info.num_procs = (orte_std_cntr_t)num_procs;
id = mca_base_param_register_string("orte", "local_daemon", "uri", NULL, NULL);
mca_base_param_lookup_string(id, &local_daemon_uri);
if (NULL != local_daemon_uri) {
/* if we are a daemon, then we won't have this param set, so allow
* it not to be found
*/
if (ORTE_SUCCESS != (rc = orte_sds_base_contact_orted(local_daemon_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -58,6 +58,7 @@ orte_sds_slurm_set_name(void)
int num_procs;
char* name_string = NULL;
int slurm_nodeid;
char *local_daemon_uri = NULL;
/* start by getting our cellid, jobid, and vpid (which is the
starting vpid for the list of daemons) */
@ -140,6 +141,7 @@ orte_sds_slurm_set_name(void)
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
orte_process_info.vpid_start = (orte_vpid_t)vpid_start;
id = mca_base_param_register_int("ns", "nds", "num_procs", NULL, -1);
mca_base_param_lookup_int(id, &num_procs);
@ -147,9 +149,20 @@ orte_sds_slurm_set_name(void)
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
orte_process_info.vpid_start = (orte_vpid_t)vpid_start;
orte_process_info.num_procs = (size_t)num_procs;
orte_process_info.num_procs = (orte_std_cntr_t)num_procs;
id = mca_base_param_register_string("orte", "local_daemon", "uri", NULL, NULL);
mca_base_param_lookup_string(id, &local_daemon_uri);
if (NULL != local_daemon_uri) {
/* if we are a daemon, then we won't have this param set, so allow
* it not to be found
*/
if (ORTE_SUCCESS != (rc = orte_sds_base_contact_orted(local_daemon_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -203,6 +203,17 @@ int orte_init_stage1(bool infrastructure)
goto error;
}
/*
* Initialize the daemon launch system so those types
* are registered (needed by the sds to talk to its
* local daemon)
*/
if (ORTE_SUCCESS != (ret = orte_odls_base_open())) {
ORTE_ERROR_LOG(ret);
error = "orte_odls_base_open";
goto error;
}
/*
* Initialize schema utilities
*/
@ -324,12 +335,6 @@ int orte_init_stage1(bool infrastructure)
goto error;
}
if (ORTE_SUCCESS != (ret = orte_odls_base_open())) {
ORTE_ERROR_LOG(ret);
error = "orte_odls_base_open";
goto error;
}
if (ORTE_SUCCESS != (ret = orte_odls_base_select())) {
ORTE_ERROR_LOG(ret);
error = "orte_odls_base_select";

Просмотреть файл

@ -28,6 +28,7 @@
#include "opal/threads/condition.h"
#include "orte/util/sys_info.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/params.h"
#include "orte/orte_constants.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns_types.h"

Просмотреть файл

@ -30,8 +30,10 @@
#include "orte/runtime/params.h"
/* globals used by RTE */
int orte_debug_flag;
bool orte_debug_flag;
struct timeval orte_abort_timeout;
/*
* Whether we have completed orte_init or not
*/

Просмотреть файл

@ -25,6 +25,7 @@
#include "orte_config.h"
#include "orte/orte_constants.h"
#include <string.h>
#ifdef HAVE_SYS_TIME_H
@ -35,16 +36,16 @@
#include <dirent.h>
#endif /* HAVE_DIRENT_H */
#include "orte/orte_constants.h"
#include "opal/util/output.h"
#include "opal/util/os_path.h"
#include "opal/util/os_dirpath.h"
#include "orte/util/univ_info.h"
#include "orte/util/sys_info.h"
#include "orte/util/proc_info.h"
#include "opal/util/os_path.h"
#include "opal/util/os_dirpath.h"
#include "orte/util/session_dir.h"
#include "orte/util/universe_setup_file_io.h"
#include "orte/runtime/params.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"

Просмотреть файл

@ -39,7 +39,7 @@ extern "C" {
/* globals used by RTE - instanced in orte_params.c */
ORTE_DECLSPEC extern int orte_debug_flag;
ORTE_DECLSPEC extern bool orte_debug_flag;
ORTE_DECLSPEC extern struct timeval orte_abort_timeout;

Просмотреть файл

@ -70,10 +70,6 @@
extern "C" {
#endif
/* globals used by RTE - instanced in orte_init.c */
ORTE_DECLSPEC extern int orte_debug_flag;
/**
* Abort the current application
*

Просмотреть файл

@ -109,7 +109,7 @@ int main(int argc, char* argv[])
}
/* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE */
if (ORTE_SUCCESS != (rc = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true,
if (ORTE_SUCCESS != (rc = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(rc);
error = "failed to see all procs register\n";

Просмотреть файл

@ -64,10 +64,13 @@ extern char **environ;
#include "opal/util/os_path.h"
#include "opal/class/opal_list.h"
#include "opal/mca/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmgr/rmgr_types.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/params.h"
#include "totalview.h"
/* +++ begin MPICH/TotalView interface definitions */