Make it so the universe size is passed through the ODLS instead of through a gpr trigger during MPI init. This matches what is currently being done with the app number.
The default odls has been updated and works fine. The process odls has been updated, but I could not verify its operation. The bproc ODLS has not been updated yet. Ralph will look at it soon. This commit was SVN r15257.
Этот коммит содержится в:
родитель
0bf7463c6f
Коммит
c46ed1d5d4
@ -33,7 +33,6 @@
|
||||
#include "ompi/constants.h"
|
||||
#include "opal/class/opal_object.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "orte/mca/gpr/gpr_types.h"
|
||||
|
||||
#define ATTR_HASH_SIZE 10
|
||||
|
||||
@ -45,9 +44,8 @@
|
||||
#define OMPI_KEYVAL_F77_MPI1 0x0004
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
BEGIN_C_DECLS
|
||||
|
||||
enum ompi_attribute_type_t {
|
||||
UNUSED_ATTR = 0, /**< Make the compilers happy when we have to construct
|
||||
* an attribute */
|
||||
@ -524,20 +522,7 @@ int ompi_attr_create_predefined(void);
|
||||
*/
|
||||
int ompi_attr_free_predefined(void);
|
||||
|
||||
/**
|
||||
* \internal
|
||||
* Callback function to get data from registry and create predefined attributes
|
||||
*
|
||||
* @returns Nothing
|
||||
*/
|
||||
void ompi_attr_create_predefined_callback(
|
||||
orte_gpr_notify_data_t *data,
|
||||
void *cbdata);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* OMPI_ATTRIBUTE_H */
|
||||
|
@ -87,14 +87,8 @@
|
||||
|
||||
#include "ompi/errhandler/errcode.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "orte/dss/dss.h"
|
||||
#include "orte/mca/ns/ns.h"
|
||||
#include "orte/mca/gpr/gpr.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/schema/schema.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
|
||||
/*
|
||||
* Private functions
|
||||
@ -111,11 +105,7 @@ static int set_f(int keyval, MPI_Fint value);
|
||||
|
||||
int ompi_attr_create_predefined(void)
|
||||
{
|
||||
int rc, ret;
|
||||
orte_gpr_subscription_t *subs, sub = ORTE_GPR_SUBSCRIPTION_EMPTY;
|
||||
orte_gpr_trigger_t *trigs, trig = ORTE_GPR_TRIGGER_EMPTY;
|
||||
orte_gpr_value_t *values[1];
|
||||
char *jobseg;
|
||||
int ret;
|
||||
|
||||
/* Create all the keyvals */
|
||||
|
||||
@ -144,11 +134,7 @@ int ompi_attr_create_predefined(void)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Set default values for everything except APPNUM. Set UNIVERSE
|
||||
size to comm_world size. It might grow later, it might not
|
||||
(tiggers are not fired in all environments. In environments
|
||||
where triggers aren't set, there won't be COMM_SPAWN, so APPNUM
|
||||
probably isn't a big deal. */
|
||||
/* Set default values for everything except MPI_UNIVERSE_SIZE */
|
||||
|
||||
if (OMPI_SUCCESS != (ret = set_f(MPI_TAG_UB, mca_pml.pml_max_tag)) ||
|
||||
OMPI_SUCCESS != (ret = set_f(MPI_HOST, MPI_PROC_NULL)) ||
|
||||
@ -156,8 +142,7 @@ int ompi_attr_create_predefined(void)
|
||||
OMPI_SUCCESS != (ret = set_f(MPI_WTIME_IS_GLOBAL, 0)) ||
|
||||
OMPI_SUCCESS != (ret = set_f(MPI_LASTUSEDCODE,
|
||||
ompi_mpi_errcode_lastused)) ||
|
||||
OMPI_SUCCESS != (ret = set_f(MPI_UNIVERSE_SIZE,
|
||||
ompi_comm_size(MPI_COMM_WORLD))) ||
|
||||
OMPI_SUCCESS != (ret = set_f(MPI_APPNUM, orte_process_info.app_num)) ||
|
||||
#if 0
|
||||
/* JMS For when we implement IMPI */
|
||||
OMPI_SUCCESS != (ret = set(IMPI_CLIENT_SIZE,
|
||||
@ -173,76 +158,15 @@ int ompi_attr_create_predefined(void)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Now that those are all created, setup the trigger to get the
|
||||
UNIVERSE_SIZE and APPNUM values once everyone has passed
|
||||
stg1. */
|
||||
|
||||
/* we have to create two subscriptions - one to retrieve the number of slots on
|
||||
* each node so we can estimate the universe size, and the other to return our
|
||||
* app_context index to properly set the appnum attribute.
|
||||
*
|
||||
* NOTE: when the 2.0 registry becomes available, this should be consolidated to
|
||||
* a single subscription
|
||||
*/
|
||||
|
||||
/* indicate that this is a standard subscription. This indicates
|
||||
that the subscription will be common to all processes. Thus,
|
||||
the resulting data can be consolidated into a
|
||||
process-independent message and broadcast to all processes */
|
||||
subs = ⊂
|
||||
if (ORTE_SUCCESS !=
|
||||
(rc = orte_schema.get_std_subscription_name(&sub.name,
|
||||
OMPI_ATTRIBUTE_SUBSCRIPTION, ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;
|
||||
sub.values = values;
|
||||
sub.cnt = 1;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
/* If the universe size is set, then use it. Otherwise default
|
||||
* to the size of MPI_COMM_WORLD */
|
||||
if(orte_process_info.universe_size > 0) {
|
||||
ret = set_f(MPI_UNIVERSE_SIZE, orte_process_info.universe_size);
|
||||
} else {
|
||||
ret = set_f(MPI_UNIVERSE_SIZE, ompi_comm_size(MPI_COMM_WORLD));
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR | ORTE_GPR_STRIPPED,
|
||||
jobseg, 1, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(jobseg);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
free(jobseg);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[0]), ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_UNDEF, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
|
||||
sub.cbfunc = ompi_attr_create_predefined_callback;
|
||||
|
||||
/* attach ourselves to the standard stage-1 trigger */
|
||||
trigs = &trig;
|
||||
if (ORTE_SUCCESS !=
|
||||
(rc = orte_schema.get_std_trigger_name(&trig.name,
|
||||
ORTE_STG1_TRIGGER, ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
free(trig.name);
|
||||
|
||||
return rc;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -274,67 +198,6 @@ int ompi_attr_free_predefined(void)
|
||||
}
|
||||
|
||||
|
||||
void ompi_attr_create_predefined_callback(
|
||||
orte_gpr_notify_data_t *data,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_gpr_value_t **value;
|
||||
orte_std_cntr_t *cptr;
|
||||
unsigned int universe_size;
|
||||
int rc;
|
||||
|
||||
/* Query the gpr to find out how many CPUs there will be.
|
||||
This will only return a non-empty list in a persistent
|
||||
universe. If we don't have a persistent universe, then just
|
||||
default to the size of MPI_COMM_WORLD.
|
||||
|
||||
JMS: I think we need more here -- there are cases where you
|
||||
wouldn't have a persistent universe but still may have a
|
||||
comm_size(COMM_WORLD) != UNIVERSE_SIZE. For example, say you
|
||||
reserve 8 CPUs in a batch environment and then run ./master,
|
||||
where the master is supposed to SPAWN the other processes.
|
||||
Perhaps need some integration with the LLM here...? [shrug] */
|
||||
|
||||
/* RHC: Needed to change this code so it wouldn't issue a gpr.get
|
||||
* during the compound command phase of mpi_init. Since all you need
|
||||
* is to have the data prior to dtypes etc., and since this function
|
||||
* is called right before we send the compound command, I've changed
|
||||
* it to a subscription and a callback function. This allows you to
|
||||
* get the data AFTER the compound command executes. Nothing else
|
||||
* happens in-between anyway, so this shouldn't cause a problem.
|
||||
*/
|
||||
|
||||
if (1 != data->cnt) { /* only one data value should be returned, or else something is wrong - use default */
|
||||
universe_size = ompi_comm_size(MPI_COMM_WORLD);
|
||||
} else {
|
||||
value = (orte_gpr_value_t**)(data->values)->addr;
|
||||
if (NULL == value[0]) {
|
||||
/* again, got an error - use default */
|
||||
universe_size = ompi_comm_size(MPI_COMM_WORLD);
|
||||
} else {
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, value[0]->keyvals[0]->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
universe_size = (unsigned int)(*cptr);
|
||||
}
|
||||
}
|
||||
|
||||
/* ignore errors here because there's nothing we
|
||||
can do if there's any error anyway */
|
||||
set_f(MPI_UNIVERSE_SIZE, universe_size);
|
||||
|
||||
|
||||
/* the app_context index for this app was passed in via the ODLS framework
|
||||
* and stored in the orte_process_info structure when that struct was initialized - set
|
||||
* the corresponding attribute here
|
||||
*/
|
||||
set_f(MPI_APPNUM, (MPI_Fint) orte_process_info.app_num);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
static int create_comm(int target_keyval, bool want_inherit)
|
||||
{
|
||||
int err;
|
||||
|
@ -81,7 +81,6 @@ enum {
|
||||
/*
|
||||
* OMPI-specific names for triggers and subscriptions used across processes
|
||||
*/
|
||||
#define OMPI_ATTRIBUTE_SUBSCRIPTION "ompi-attribute-sub"
|
||||
#define OMPI_PROC_SUBSCRIPTION "ompi-proc-sub"
|
||||
#define OMPI_OOB_SUBSCRIPTION "ompi-oob-sub"
|
||||
#define OMPI_MODEX_SUBSCRIPTION "ompi-modex-sub"
|
||||
|
@ -139,6 +139,7 @@ int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
|
||||
ORTE_JOB_APP_CONTEXT_KEY,
|
||||
ORTE_JOB_VPID_START_KEY,
|
||||
ORTE_JOB_VPID_RANGE_KEY,
|
||||
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY,
|
||||
NULL
|
||||
};
|
||||
opal_list_item_t *item, *m_item;
|
||||
@ -593,6 +594,7 @@ static int odls_default_fork_local_proc(
|
||||
orte_odls_child_t *child,
|
||||
orte_vpid_t vpid_start,
|
||||
orte_vpid_t vpid_range,
|
||||
orte_std_cntr_t total_slots_alloc,
|
||||
bool want_processor,
|
||||
size_t processor,
|
||||
bool oversubscribed,
|
||||
@ -825,6 +827,13 @@ static int odls_default_fork_local_proc(
|
||||
free(param);
|
||||
free(param2);
|
||||
|
||||
/* set the universe size in the environment */
|
||||
param = mca_base_param_environ_variable("orte","universe","size");
|
||||
asprintf(¶m2, "%ld", (long)total_slots_alloc);
|
||||
opal_setenv(param, param2, true, &environ_copy);
|
||||
free(param);
|
||||
free(param2);
|
||||
|
||||
/* use same nodename as the starting daemon (us) */
|
||||
param = mca_base_param_environ_variable("orte", "base", "nodename");
|
||||
opal_setenv(param, orte_system_info.nodename, true, &environ_copy);
|
||||
@ -931,7 +940,7 @@ static int odls_default_fork_local_proc(
|
||||
int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ)
|
||||
{
|
||||
int rc;
|
||||
orte_std_cntr_t i, j, kv, kv2, *sptr;
|
||||
orte_std_cntr_t i, j, kv, kv2, *sptr, total_slots_alloc;
|
||||
orte_gpr_value_t *value, **values;
|
||||
orte_gpr_keyval_t *kval;
|
||||
orte_app_context_t *app;
|
||||
@ -1040,6 +1049,15 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba
|
||||
override_oversubscribed = *bptr;
|
||||
continue;
|
||||
}
|
||||
if (strcmp(kval->key, ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY) == 0) {
|
||||
/* this can only occur once, so just store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
total_slots_alloc = *sptr;
|
||||
continue;
|
||||
}
|
||||
} /* end for loop to process global data */
|
||||
} else {
|
||||
/* this must have come from one of the process containers, so it must
|
||||
@ -1320,7 +1338,8 @@ DOFORK:
|
||||
OPAL_THREAD_UNLOCK(&orte_odls_default.mutex);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = odls_default_fork_local_proc(app, child, start,
|
||||
range, want_processor,
|
||||
range, total_slots_alloc,
|
||||
want_processor,
|
||||
i, oversubscribed,
|
||||
base_environ))) {
|
||||
/* do NOT ERROR_LOG this error - it generates
|
||||
|
@ -84,6 +84,7 @@ static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data,
|
||||
ORTE_JOB_APP_CONTEXT_KEY,
|
||||
ORTE_JOB_VPID_START_KEY,
|
||||
ORTE_JOB_VPID_RANGE_KEY,
|
||||
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY,
|
||||
NULL
|
||||
};
|
||||
opal_list_item_t *item, *m_item;
|
||||
@ -514,6 +515,7 @@ static int orte_odls_process_fork_local_proc(
|
||||
orte_odls_child_t *child,
|
||||
orte_vpid_t vpid_start,
|
||||
orte_vpid_t vpid_range,
|
||||
orte_std_cntr_t total_slots_alloc,
|
||||
bool want_processor,
|
||||
size_t processor,
|
||||
bool oversubscribed,
|
||||
@ -694,6 +696,14 @@ static int orte_odls_process_fork_local_proc(
|
||||
free(param);
|
||||
free(param2);
|
||||
|
||||
/* set the universe size in the environment */
|
||||
param = mca_base_param_environ_variable("orte","universe","size");
|
||||
asprintf(¶m2, "%ld", (long)total_slots_alloc);
|
||||
opal_setenv(param, param2, true, &environ_copy);
|
||||
free(param);
|
||||
free(param2);
|
||||
|
||||
|
||||
/* use same nodename as the starting daemon (us) */
|
||||
param = mca_base_param_environ_variable("orte", "base", "nodename");
|
||||
opal_setenv(param, orte_system_info.nodename, true, &environ_copy);
|
||||
@ -750,7 +760,7 @@ static int orte_odls_process_fork_local_proc(
|
||||
static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ)
|
||||
{
|
||||
int rc;
|
||||
orte_std_cntr_t i, j, kv, kv2, *sptr;
|
||||
orte_std_cntr_t i, j, kv, kv2, *sptr, total_slots_alloc;
|
||||
orte_gpr_value_t *value, **values;
|
||||
orte_gpr_keyval_t *kval;
|
||||
orte_app_context_t *app;
|
||||
@ -859,6 +869,16 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
|
||||
override_oversubscribed = *bptr;
|
||||
continue;
|
||||
}
|
||||
if (strcmp(kval->key, ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY) == 0) {
|
||||
/* this can only occur once, so just store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))
|
||||
) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
total_slots_alloc = *sptr;
|
||||
continue;
|
||||
}
|
||||
} /* end for loop to process global data */
|
||||
} else {
|
||||
/* this must have come from one of the process containers, so it must
|
||||
@ -1129,7 +1149,8 @@ DOFORK:
|
||||
OPAL_THREAD_UNLOCK(&orte_odls_process.mutex);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_odls_process_fork_local_proc(app, child, start,
|
||||
range, want_processor,
|
||||
range, total_slots_alloc,
|
||||
want_processor,
|
||||
i, oversubscribed,
|
||||
base_environ))) {
|
||||
/* do NOT ERROR_LOG this error - it generates
|
||||
|
@ -36,6 +36,7 @@
|
||||
ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
|
||||
/* .my_name = */ NULL,
|
||||
/* ,app_num = */ -1,
|
||||
/* ,universe_size = */ -1,
|
||||
/* .singleton = */ false,
|
||||
/* .vpid_start = */ 0,
|
||||
/* .num_procs = */ 1,
|
||||
@ -80,6 +81,10 @@ int orte_proc_info(void)
|
||||
mca_base_param_lookup_int(id, &tmp);
|
||||
orte_process_info.app_num = tmp;
|
||||
|
||||
id = mca_base_param_register_int("orte", "universe", "size", NULL, -1);
|
||||
mca_base_param_lookup_int(id, &tmp);
|
||||
orte_process_info.universe_size = tmp;
|
||||
|
||||
id = mca_base_param_register_string("gpr", "replica", "uri", NULL, orte_process_info.gpr_replica_uri);
|
||||
mca_base_param_lookup_string(id, &(orte_process_info.gpr_replica_uri));
|
||||
mca_base_param_set_internal(id, true);
|
||||
|
@ -33,9 +33,7 @@
|
||||
#endif
|
||||
#include "orte/mca/ns/ns_types.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/**
|
||||
* Process information structure
|
||||
@ -49,6 +47,7 @@ extern "C" {
|
||||
struct orte_proc_info_t {
|
||||
orte_process_name_t *my_name; /**< My official process name */
|
||||
orte_std_cntr_t app_num; /**< our index into the app_context array */
|
||||
orte_std_cntr_t universe_size; /**< the size of the universe we are in */
|
||||
bool singleton; /**< I am a singleton */
|
||||
orte_vpid_t vpid_start; /**< starting vpid for this job */
|
||||
orte_std_cntr_t num_procs; /**< number of processes in this job */
|
||||
@ -113,8 +112,7 @@ ORTE_DECLSPEC extern orte_proc_info_t orte_process_info;
|
||||
ORTE_DECLSPEC int orte_proc_info(void);
|
||||
|
||||
ORTE_DECLSPEC int orte_proc_info_finalize(void);
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user