1
1

Since universe size now is an orte thing, we may as well give it some direct support. Create rmgr set/get functions so it becomes more obvious where this value is being defined and how to retrieve it. Modify the bproc pls to pass it to the app procs when launched. Modify one of the test programs to verify it has been correctly set.

This commit was SVN r15266.
Этот коммит содержится в:
Ralph Castain 2007-07-02 16:45:40 +00:00
родитель c796d84d61
Коммит 684aa1bc9f
10 изменённых файлов: 173 добавлений и 42 удалений

Просмотреть файл

@ -647,10 +647,10 @@ orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_env
}
} /* kv2 */
/* protect operation on the global list of children */
OPAL_THREAD_LOCK(&mca_odls_bproc_component.mutex);
OPAL_THREAD_LOCK(&mca_odls_bproc_component.lock);
opal_list_append(&mca_odls_bproc_component.children, &child->super);
opal_condition_signal(&mca_odls_bproc_component.cond);
OPAL_THREAD_UNLOCK(&mca_odls_bproc_component.mutex);
OPAL_THREAD_UNLOCK(&mca_odls_bproc_component.lock);
}
}

Просмотреть файл

@ -797,7 +797,7 @@ static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
orte_vpid_t vpid_start, int app_context) {
int *node_array=NULL, num_nodes, cycle;
int rc, i, j, stride;
orte_std_cntr_t num_processes, num_cycles;
orte_std_cntr_t num_processes, num_cycles, univ_size;
int *pids = NULL;
char *var, *param;
orte_process_name_t * proc_name;
@ -825,6 +825,17 @@ static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
free(param);
free(var);
/* set the universe size in the environment */
if (ORTE_SUCCESS != (rc = orte_rmgr.get_universe_size(map->job, &univ_size))) {
ORTE_ERROR_LOG(rc);
return rc;
}
var = mca_base_param_environ_variable("orte","universe","size");
asprintf(&param, "%ld", (long)univ_size);
opal_setenv(var, param, true, &env);
free(param);
free(var);
/* set the vpid-to-vpid stride based on the mapping mode */
if (bynode) {
/* we are mapping by node, so we want to set the stride

Просмотреть файл

@ -28,6 +28,8 @@
#include "orte/mca/smr/smr_types.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/ras/base/ras_private.h"
static void orte_ras_base_node_construct(orte_ras_node_t* node)
@ -747,7 +749,7 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
int rc;
orte_std_cntr_t num_values, i, j, total_slots;
orte_ras_node_t* node;
char* jobid_str, *key=NULL, *segment;
char* jobid_str, *key=NULL;
num_values = (orte_std_cntr_t)opal_list_get_size(nodes);
if (0 >= num_values) {
@ -755,10 +757,7 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
return ORTE_ERR_BAD_PARAM;
}
/* get one value more than needed for these nodes so we can store the total number of
* slots being assigned to this jobid in the job segment
*/
values = (orte_gpr_value_t**)malloc((1+num_values) * sizeof(orte_gpr_value_t*));
values = (orte_gpr_value_t**)malloc(num_values * sizeof(orte_gpr_value_t*));
if (NULL == values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
@ -814,37 +813,19 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
total_slots += node->node_slots;
}
/* setup to store the total number of slots */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[num_values]), ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
segment, 1, 1))) {
ORTE_ERROR_LOG(rc);
free(segment);
goto cleanup;
}
free(segment);
/* enter the value */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[num_values]->keyvals[0]),
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_STD_CNTR, &total_slots))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* put the value in the JOB_GLOBALS container */
values[num_values]->tokens[0] = strdup(ORTE_JOB_GLOBALS);
/* do the insert */
if (ORTE_SUCCESS != (rc = orte_gpr.put((1+num_values), values))) {
if (ORTE_SUCCESS != (rc = orte_gpr.put(num_values, values))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* store the total number of slots */
if (ORTE_SUCCESS != (rc = orte_rmgr.set_universe_size(jobid, total_slots))) {
ORTE_ERROR_LOG(rc);
}
cleanup:
for (j=0; j < (1+num_values); j++) {
for (j=0; j < num_values; j++) {
OBJ_RELEASE(values[j]);
}
if (NULL != values) free(values);

Просмотреть файл

@ -188,3 +188,106 @@ cleanup:
return rc;
}
int orte_rmgr_base_set_universe_size(orte_jobid_t job, orte_std_cntr_t univ_size)
{
orte_gpr_value_t *value;
char *segment;
int rc;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
segment, 1, 1))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
free(segment);
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_STD_CNTR, &univ_size))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
/* put the value in the JOB_GLOBALS container */
value->tokens[0] = strdup(ORTE_JOB_GLOBALS);
/* do the insert */
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &value))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(value);
return rc;
}
int orte_rmgr_base_get_universe_size(orte_jobid_t job, orte_std_cntr_t *univ_size)
{
char *segment;
orte_gpr_value_t **values;
orte_std_cntr_t i, cnt, *sptr;
char *glob_tokens[] = {
ORTE_JOB_GLOBALS,
NULL
};
char *glob_keys[] = {
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY,
NULL
};
int rc;
/* set default error response */
*univ_size = -1;
/* get the segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get the info from the job globals container first */
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, glob_tokens, glob_keys, &cnt, &values))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
free(segment);
/* it is okay for this value not to be defined yet */
if (0 == cnt) {
return ORTE_SUCCESS;
}
/* can only be one value */
if (1 != cnt || values[0]->cnt != 1) {
ORTE_ERROR_LOG(ORTE_ERR_GPR_DATA_CORRUPT);
rc = ORTE_ERR_GPR_DATA_CORRUPT;
goto cleanup;
}
if (strcmp(values[0]->keyvals[0]->key, ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, values[0]->keyvals[0]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
*univ_size = *sptr;
rc = ORTE_SUCCESS;
goto cleanup;
}
cleanup:
for (i=0; i < cnt; i++) {
OBJ_RELEASE(values[i]);
}
if (NULL != values) free(values);
return rc;
}

Просмотреть файл

@ -86,6 +86,9 @@ ORTE_DECLSPEC int orte_rmgr_base_check_context_cwd(orte_app_context_t *context,
ORTE_DECLSPEC int orte_rmgr_base_set_proc_info(const orte_process_name_t* name, pid_t pid, char * nodename);
ORTE_DECLSPEC int orte_rmgr_base_get_proc_info(const orte_process_name_t* name, pid_t* pid, char **nodename);
ORTE_DECLSPEC int orte_rmgr_base_set_universe_size(orte_jobid_t job, orte_std_cntr_t univ_size);
ORTE_DECLSPEC int orte_rmgr_base_get_universe_size(orte_jobid_t job, orte_std_cntr_t *univ_size);
ORTE_DECLSPEC int orte_rmgr_base_connect(orte_std_cntr_t num_connect,
orte_process_name_t *connect);

Просмотреть файл

@ -90,6 +90,10 @@ static int orte_rmgr_cnos_set_proc_info(const orte_process_name_t* name,
int orte_rmgr_cnos_get_proc_info(const orte_process_name_t* name,
pid_t* pid, char **nodename);
static int orte_rmgr_cnos_set_universe_size(orte_jobid_t job, orte_std_cntr_t univ_size);
static int orte_rmgr_cnos_get_universe_size(orte_jobid_t job, orte_std_cntr_t *univ_size);
orte_rmgr_base_module_t orte_rmgr_cnos_module = {
NULL, /* don't need special init */
orte_rmgr_cnos_setup_job,
@ -107,7 +111,9 @@ orte_rmgr_base_module_t orte_rmgr_cnos_module = {
orte_rmgr_cnos_check_context_cwd,
orte_rmgr_cnos_check_context_app,
orte_rmgr_cnos_set_proc_info,
orte_rmgr_cnos_get_proc_info
orte_rmgr_cnos_get_proc_info,
orte_rmgr_cnos_set_universe_size,
orte_rmgr_cnos_get_universe_size
};
@ -219,3 +225,13 @@ int orte_rmgr_cnos_get_proc_info(const orte_process_name_t* name,
{
return ORTE_ERR_NOT_SUPPORTED;
}
static int orte_rmgr_cnos_set_universe_size(orte_jobid_t job, orte_std_cntr_t univ_size)
{
return ORTE_ERR_NOT_SUPPORTED;
}
static int orte_rmgr_cnos_get_universe_size(orte_jobid_t job, orte_std_cntr_t *univ_size)
{
return ORTE_ERR_NOT_SUPPORTED;
}

Просмотреть файл

@ -78,7 +78,9 @@ orte_rmgr_base_module_t orte_rmgr_proxy_module = {
orte_rmgr_base_check_context_cwd,
orte_rmgr_base_check_context_app,
orte_rmgr_base_set_proc_info,
orte_rmgr_base_get_proc_info
orte_rmgr_base_get_proc_info,
orte_rmgr_base_set_universe_size,
orte_rmgr_base_get_universe_size
};

Просмотреть файл

@ -252,7 +252,17 @@ typedef int (*orte_rmgr_base_module_check_context_app_fn_t)(orte_app_context_t *
*/
typedef int (*orte_rmgr_base_module_get_process_info_fn_t)(const orte_process_name_t *name, pid_t *pid, char ** nodename);
/*
/**
* Set the universe size for the job
*/
typedef int (*orte_rmgr_base_module_set_universe_size_fn_t)(orte_jobid_t job, orte_std_cntr_t univ_size);
/**
* Get the universe size for the job
*/
typedef int (*orte_rmgr_base_module_get_universe_size_fn_t)(orte_jobid_t job, orte_std_cntr_t *univ_size);
/*
* Ver 2.0
*/
struct orte_rmgr_base_module_2_0_0_t {
@ -273,6 +283,8 @@ struct orte_rmgr_base_module_2_0_0_t {
orte_rmgr_base_module_check_context_app_fn_t check_context_app;
orte_rmgr_base_module_set_process_info_fn_t set_process_info;
orte_rmgr_base_module_get_process_info_fn_t get_process_info;
orte_rmgr_base_module_set_universe_size_fn_t set_universe_size;
orte_rmgr_base_module_get_universe_size_fn_t get_universe_size;
};
typedef struct orte_rmgr_base_module_2_0_0_t orte_rmgr_base_module_2_0_0_t;

Просмотреть файл

@ -91,7 +91,9 @@ orte_rmgr_base_module_t orte_rmgr_urm_module = {
orte_rmgr_base_check_context_cwd,
orte_rmgr_base_check_context_app,
orte_rmgr_base_set_proc_info,
orte_rmgr_base_get_proc_info
orte_rmgr_base_get_proc_info,
orte_rmgr_base_set_universe_size,
orte_rmgr_base_get_universe_size
};

Просмотреть файл

@ -10,7 +10,7 @@
int main(int argc, char* argv[])
{
int rank, size;
int rank, size, univ_size;
char hostname[512];
void *appnum;
int flag;
@ -19,10 +19,11 @@ int main(int argc, char* argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_APPNUM, &appnum, &flag);
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &univ_size, &flag);
gethostname(hostname, 512);
printf("Hello, World, I am %d of %d on host %s from app number %d\n",
rank, size, hostname, *(int*)appnum);
printf("Hello, World, I am %d of %d on host %s from app number %d universe size %d\n",
rank, size, hostname, *(int*)appnum, *(int*)univ_size);
MPI_Finalize();
return 0;