Eliminate virtually all of the attribute_predefined data from the STG1 message. We now compute the total number of slots allocated to us and save that in the registry - the attributed_predefined then retrieves it via the STG1 message. The app_num is passed via the process_info structure, which gets the value from the ODLS in the environment.
Obviously, people like bproc will have to get the app_num via another avenue...but that's a problem for another day. Several options are easily available. This commit was SVN r12788.
Этот коммит содержится в:
родитель
41a70a8f01
Коммит
a1153fdc8f
@ -113,6 +113,7 @@ int ompi_attr_create_predefined(void)
|
||||
int rc, ret;
|
||||
orte_gpr_subscription_t *subs, sub = ORTE_GPR_SUBSCRIPTION_EMPTY;
|
||||
orte_gpr_trigger_t *trigs, trig = ORTE_GPR_TRIGGER_EMPTY;
|
||||
orte_gpr_value_t *values[1];
|
||||
char *jobseg;
|
||||
|
||||
/* Create all the keyvals */
|
||||
@ -195,52 +196,31 @@ int ompi_attr_create_predefined(void)
|
||||
return rc;
|
||||
}
|
||||
sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;
|
||||
sub.values = (orte_gpr_value_t**)malloc(2 * sizeof(orte_gpr_value_t*));
|
||||
sub.cnt = 2;
|
||||
sub.values = values;
|
||||
sub.cnt = 1;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
ORTE_NODE_SEGMENT, 1, 0))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[0]->keyvals[0]), ORTE_NODE_SLOTS_KEY, ORTE_UNDEF, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
jobseg, 1, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[1]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
jobseg, 2, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
free(sub.name);
|
||||
free(jobseg);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
free(jobseg);
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[0]), ORTE_PROC_RANK_KEY, ORTE_UNDEF, NULL))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[0]), ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_UNDEF, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
OBJ_RELEASE(sub.values[1]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[1]), ORTE_PROC_APP_CONTEXT_KEY, ORTE_UNDEF, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
OBJ_RELEASE(sub.values[1]);
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
|
||||
sub.cbfunc = ompi_attr_create_predefined_callback;
|
||||
|
||||
/* attach ourselves to the standard stage-1 trigger */
|
||||
@ -249,8 +229,7 @@ int ompi_attr_create_predefined(void)
|
||||
(rc = orte_schema.get_std_trigger_name(&trig.name,
|
||||
ORTE_STG1_TRIGGER, ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
OBJ_RELEASE(sub.values[1]);
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
return rc;
|
||||
}
|
||||
@ -258,8 +237,7 @@ int ompi_attr_create_predefined(void)
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
OBJ_RELEASE(sub.values[0]);
|
||||
OBJ_RELEASE(sub.values[1]);
|
||||
OBJ_RELEASE(values[0]);
|
||||
free(sub.name);
|
||||
free(trig.name);
|
||||
|
||||
@ -299,18 +277,11 @@ void ompi_attr_create_predefined_callback(
|
||||
orte_gpr_notify_data_t *data,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_std_cntr_t i, j, k;
|
||||
orte_gpr_keyval_t **keyval;
|
||||
orte_gpr_value_t **value;
|
||||
orte_jobid_t job;
|
||||
orte_std_cntr_t *cptr, rank, app_num = 0;
|
||||
unsigned int universe_size = 0;
|
||||
orte_std_cntr_t *cptr;
|
||||
unsigned int universe_size;
|
||||
int rc;
|
||||
|
||||
/* Set some default values */
|
||||
|
||||
job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
/* Query the gpr to find out how many CPUs there will be.
|
||||
This will only return a non-empty list in a persistent
|
||||
universe. If we don't have a persistent universe, then just
|
||||
@ -332,58 +303,33 @@ void ompi_attr_create_predefined_callback(
|
||||
* happens in-between anyway, so this shouldn't cause a problem.
|
||||
*/
|
||||
|
||||
if (0 == data->cnt) { /* no data returned */
|
||||
if (1 != data->cnt) { /* only one data value should be returned, or else something is wrong - use default */
|
||||
universe_size = ompi_comm_size(MPI_COMM_WORLD);
|
||||
} else {
|
||||
value = (orte_gpr_value_t**)(data->values)->addr;
|
||||
for (i=0, k=0; k < data->cnt &&
|
||||
i < (data->values)->size; i++) {
|
||||
if (NULL != value[i]) {
|
||||
k++;
|
||||
rank = ORTE_VPID_INVALID;
|
||||
if (0 < value[i]->cnt) { /* make sure some data was returned here */
|
||||
keyval = value[i]->keyvals;
|
||||
for (j=0; j < value[i]->cnt; j++) {
|
||||
if (0 == strcmp(ORTE_NODE_SLOTS_KEY, keyval[j]->key)) {
|
||||
/* Process slot count */
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
universe_size += (unsigned int)(*cptr);
|
||||
} else if (0 == strcmp(ORTE_PROC_RANK_KEY, keyval[j]->key)) {
|
||||
/* Process rank */
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
rank = *cptr;
|
||||
} else if (0 == strcmp(ORTE_PROC_APP_CONTEXT_KEY, keyval[j]->key)) {
|
||||
/* App context number */
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
app_num = *cptr;
|
||||
}
|
||||
}
|
||||
/* see if this value is for this process. We need to
|
||||
* perform the check since this subscription is associated
|
||||
* with a trigger - hence, the data from all the procs
|
||||
* is included in the message
|
||||
*/
|
||||
if (rank == ORTE_PROC_MY_NAME->vpid) {
|
||||
set_f(MPI_APPNUM, (MPI_Fint) app_num);
|
||||
}
|
||||
}
|
||||
if (NULL == value[0]) {
|
||||
/* again, got an error - use default */
|
||||
universe_size = ompi_comm_size(MPI_COMM_WORLD);
|
||||
} else {
|
||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, value[0]->keyvals[0]->value, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
universe_size = (unsigned int)(*cptr);
|
||||
}
|
||||
}
|
||||
|
||||
/* Same as above -- ignore errors here because there's nothing we
|
||||
/* ignore errors here because there's nothing we
|
||||
can do if there's any error anyway */
|
||||
|
||||
set_f(MPI_UNIVERSE_SIZE, universe_size);
|
||||
|
||||
|
||||
/* the app_context index for this app was passed in via the ODLS framework
|
||||
* and stored in the orte_process_info structure when that struct was initialized - set
|
||||
* the corresponding attribute here
|
||||
*/
|
||||
set_f(MPI_APPNUM, (MPI_Fint) orte_process_info.app_num);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -814,6 +814,13 @@ static int odls_default_fork_local_proc(
|
||||
free(param);
|
||||
free(uri);
|
||||
|
||||
/* set the app_context number into the environment */
|
||||
param = mca_base_param_environ_variable("orte","app","num");
|
||||
asprintf(¶m2, "%ld", (long)child->app_idx);
|
||||
opal_setenv(param, param2, true, &environ_copy);
|
||||
free(param);
|
||||
free(param2);
|
||||
|
||||
/* use same nodename as the starting daemon (us) */
|
||||
param = mca_base_param_environ_variable("orte", "base", "nodename");
|
||||
opal_setenv(param, orte_system_info.nodename, true, &environ_copy);
|
||||
|
@ -73,7 +73,7 @@ int mca_oob_xcast(orte_jobid_t job,
|
||||
if (NULL != buffer) {
|
||||
opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %lu", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME),
|
||||
(unsigned long)buffer->bytes_used);
|
||||
}
|
||||
}
|
||||
gettimeofday(&start, NULL);
|
||||
}
|
||||
switch(orte_oob_xcast_mode) {
|
||||
|
@ -675,9 +675,9 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
|
||||
opal_list_item_t* item;
|
||||
orte_gpr_value_t **values;
|
||||
int rc;
|
||||
orte_std_cntr_t num_values, i, j;
|
||||
orte_std_cntr_t num_values, i, j, total_slots;
|
||||
orte_ras_node_t* node;
|
||||
char* jobid_str, *key=NULL;
|
||||
char* jobid_str, *key=NULL, *segment;
|
||||
|
||||
num_values = (orte_std_cntr_t)opal_list_get_size(nodes);
|
||||
if (0 >= num_values) {
|
||||
@ -685,7 +685,10 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
values = (orte_gpr_value_t**)malloc(num_values * sizeof(orte_gpr_value_t*));
|
||||
/* get one value more than needed for these nodes so we can store the total number of
|
||||
* slots being assigned to this jobid in the job segment
|
||||
*/
|
||||
values = (orte_gpr_value_t**)malloc((1+num_values) * sizeof(orte_gpr_value_t*));
|
||||
if (NULL == values) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
@ -711,6 +714,9 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
|
||||
asprintf(&key, "%s-%s", ORTE_NODE_SLOTS_ALLOC_KEY, jobid_str);
|
||||
free(jobid_str);
|
||||
|
||||
/* initialize the total slots */
|
||||
total_slots = 0;
|
||||
|
||||
for(i=0, item = opal_list_get_first(nodes);
|
||||
i < num_values && item != opal_list_get_end(nodes);
|
||||
i++, item = opal_list_get_next(item)) {
|
||||
@ -733,15 +739,42 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid)
|
||||
free(key);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* add the slots to our total */
|
||||
total_slots += node->node_slots_alloc;
|
||||
}
|
||||
|
||||
/* try the insert */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.put(num_values, values))) {
|
||||
/* setup to store the total number of slots */
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[num_values]), ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
|
||||
segment, 1, 1))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(segment);
|
||||
goto cleanup;
|
||||
}
|
||||
free(segment);
|
||||
|
||||
/* enter the value */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[num_values]->keyvals[0]),
|
||||
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_STD_CNTR, &total_slots))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* put the value in the JOB_GLOBALS container */
|
||||
values[num_values]->tokens[0] = strdup(ORTE_JOB_GLOBALS);
|
||||
|
||||
/* do the insert */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.put((1+num_values), values))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
for (j=0; j < num_values; j++) {
|
||||
for (j=0; j < (1+num_values); j++) {
|
||||
OBJ_RELEASE(values[j]);
|
||||
}
|
||||
if (NULL != values) free(values);
|
||||
|
@ -78,6 +78,7 @@
|
||||
#define ORTE_JOB_VPID_START_KEY "orte-job-vpid-start"
|
||||
#define ORTE_JOB_VPID_RANGE_KEY "orte-job-vpid-range"
|
||||
#define ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY "orte-job-override-oversubscribe"
|
||||
#define ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY "orte-job-total-slots"
|
||||
#define ORTE_JOB_IOF_KEY "orte-job-iof"
|
||||
#define ORTE_JOB_STATE_KEY "orte-job-state"
|
||||
#define ORTE_PROC_NAME_KEY "orte-proc-name"
|
||||
|
@ -51,7 +51,7 @@ int main(int argc, char **argv)
|
||||
free(test_name);
|
||||
|
||||
/* convert a string to a name */
|
||||
tmp = strdup("1234.5678.0010");
|
||||
tmp = strdup("124.5678.0010");
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.convert_string_to_process_name(&test_name, tmp))) { /* got error */
|
||||
fprintf(stderr, "convert string to process name failed with error %s\n",
|
||||
ORTE_ERROR_NAME(rc));
|
||||
|
@ -35,6 +35,7 @@
|
||||
|
||||
ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
|
||||
/* .my_name = */ NULL,
|
||||
/* ,app_num = */ -1,
|
||||
/* .singleton = */ false,
|
||||
/* .vpid_start = */ 0,
|
||||
/* .num_procs = */ 1,
|
||||
@ -73,6 +74,10 @@ int orte_proc_info(void)
|
||||
orte_process_info.daemon = false;
|
||||
}
|
||||
|
||||
id = mca_base_param_register_int("orte", "app", "num", NULL, -1);
|
||||
mca_base_param_lookup_int(id, &tmp);
|
||||
orte_process_info.app_num = tmp;
|
||||
|
||||
id = mca_base_param_register_string("gpr", "replica", "uri", NULL, orte_process_info.gpr_replica_uri);
|
||||
mca_base_param_lookup_string(id, &(orte_process_info.gpr_replica_uri));
|
||||
mca_base_param_set_internal(id, true);
|
||||
|
@ -48,6 +48,7 @@ extern "C" {
|
||||
*/
|
||||
struct orte_proc_info_t {
|
||||
orte_process_name_t *my_name; /**< My official process name */
|
||||
orte_std_cntr_t app_num; /**< our index into the app_context array */
|
||||
bool singleton; /**< I am a singleton */
|
||||
orte_vpid_t vpid_start; /**< starting vpid for this job */
|
||||
orte_std_cntr_t num_procs; /**< number of processes in this job */
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user