diff --git a/ompi/attribute/attribute_predefined.c b/ompi/attribute/attribute_predefined.c index ca55525d98..6d7d841e8a 100644 --- a/ompi/attribute/attribute_predefined.c +++ b/ompi/attribute/attribute_predefined.c @@ -113,6 +113,7 @@ int ompi_attr_create_predefined(void) int rc, ret; orte_gpr_subscription_t *subs, sub = ORTE_GPR_SUBSCRIPTION_EMPTY; orte_gpr_trigger_t *trigs, trig = ORTE_GPR_TRIGGER_EMPTY; + orte_gpr_value_t *values[1]; char *jobseg; /* Create all the keyvals */ @@ -195,52 +196,31 @@ int ompi_attr_create_predefined(void) return rc; } sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG; - sub.values = (orte_gpr_value_t**)malloc(2 * sizeof(orte_gpr_value_t*)); - sub.cnt = 2; + sub.values = values; + sub.cnt = 1; - if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, - ORTE_NODE_SEGMENT, 1, 0))) { + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) { ORTE_ERROR_LOG(rc); free(sub.name); return rc; } - if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[0]->keyvals[0]), ORTE_NODE_SLOTS_KEY, ORTE_UNDEF, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - free(sub.name); - return rc; - } - if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) { + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + jobseg, 1, 0))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - free(sub.name); - return rc; - } - if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[1]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, - jobseg, 2, 0))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - free(sub.name); free(jobseg); + free(sub.name); return rc; } free(jobseg); - if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[0]), ORTE_PROC_RANK_KEY, ORTE_UNDEF, NULL))) { + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[0]), 
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_UNDEF, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - OBJ_RELEASE(sub.values[1]); - free(sub.name); - return rc; - } - if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[1]), ORTE_PROC_APP_CONTEXT_KEY, ORTE_UNDEF, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - OBJ_RELEASE(sub.values[1]); + OBJ_RELEASE(values[0]); free(sub.name); return rc; } + sub.cbfunc = ompi_attr_create_predefined_callback; /* attach ourselves to the standard stage-1 trigger */ @@ -249,8 +229,7 @@ int ompi_attr_create_predefined(void) (rc = orte_schema.get_std_trigger_name(&trig.name, ORTE_STG1_TRIGGER, ORTE_PROC_MY_NAME->jobid))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(sub.values[0]); - OBJ_RELEASE(sub.values[1]); + OBJ_RELEASE(values[0]); free(sub.name); return rc; } @@ -258,8 +237,7 @@ int ompi_attr_create_predefined(void) if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { ORTE_ERROR_LOG(rc); } - OBJ_RELEASE(sub.values[0]); - OBJ_RELEASE(sub.values[1]); + OBJ_RELEASE(values[0]); free(sub.name); free(trig.name); @@ -299,18 +277,11 @@ void ompi_attr_create_predefined_callback( orte_gpr_notify_data_t *data, void *cbdata) { - orte_std_cntr_t i, j, k; - orte_gpr_keyval_t **keyval; orte_gpr_value_t **value; - orte_jobid_t job; - orte_std_cntr_t *cptr, rank, app_num = 0; - unsigned int universe_size = 0; + orte_std_cntr_t *cptr; + unsigned int universe_size; int rc; - /* Set some default values */ - - job = ORTE_PROC_MY_NAME->jobid; - /* Query the gpr to find out how many CPUs there will be. This will only return a non-empty list in a persistent universe. If we don't have a persistent universe, then just @@ -332,58 +303,33 @@ void ompi_attr_create_predefined_callback( * happens in-between anyway, so this shouldn't cause a problem. 
*/ - if (0 == data->cnt) { /* no data returned */ + if (1 != data->cnt) { /* only one data value should be returned, or else something is wrong - use default */ universe_size = ompi_comm_size(MPI_COMM_WORLD); } else { value = (orte_gpr_value_t**)(data->values)->addr; - for (i=0, k=0; k < data->cnt && - i < (data->values)->size; i++) { - if (NULL != value[i]) { - k++; - rank = ORTE_VPID_INVALID; - if (0 < value[i]->cnt) { /* make sure some data was returned here */ - keyval = value[i]->keyvals; - for (j=0; j < value[i]->cnt; j++) { - if (0 == strcmp(ORTE_NODE_SLOTS_KEY, keyval[j]->key)) { - /* Process slot count */ - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return; - } - universe_size += (unsigned int)(*cptr); - } else if (0 == strcmp(ORTE_PROC_RANK_KEY, keyval[j]->key)) { - /* Process rank */ - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return; - } - rank = *cptr; - } else if (0 == strcmp(ORTE_PROC_APP_CONTEXT_KEY, keyval[j]->key)) { - /* App context number */ - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return; - } - app_num = *cptr; - } - } - /* see if this value is for this process. 
We need to - * perform the check since this subscription is associated - * with a trigger - hence, the data from all the procs - * is included in the message - */ - if (rank == ORTE_PROC_MY_NAME->vpid) { - set_f(MPI_APPNUM, (MPI_Fint) app_num); - } - } + if (NULL == value[0]) { + /* again, got an error - use default */ + universe_size = ompi_comm_size(MPI_COMM_WORLD); + } else { + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, value[0]->keyvals[0]->value, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return; } + universe_size = (unsigned int)(*cptr); } } - /* Same as above -- ignore errors here because there's nothing we + /* ignore errors here because there's nothing we can do if there's any error anyway */ - set_f(MPI_UNIVERSE_SIZE, universe_size); + + + /* the app_context index for this app was passed in via the ODLS framework + * and stored in the orte_process_info structure when that struct was initialized - set + * the corresponding attribute here + */ + set_f(MPI_APPNUM, (MPI_Fint) orte_process_info.app_num); + return; } diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index ea6972e446..f74cecf7a8 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -814,6 +814,13 @@ static int odls_default_fork_local_proc( free(param); free(uri); + /* set the app_context number into the environment */ + param = mca_base_param_environ_variable("orte","app","num"); + asprintf(&param2, "%ld", (long)child->app_idx); + opal_setenv(param, param2, true, &environ_copy); + free(param); + free(param2); + /* use same nodename as the starting daemon (us) */ param = mca_base_param_environ_variable("orte", "base", "nodename"); opal_setenv(param, orte_system_info.nodename, true, &environ_copy); diff --git a/orte/mca/oob/base/oob_base_xcast.c b/orte/mca/oob/base/oob_base_xcast.c index 671b09b9d4..15b9073948 100644 --- a/orte/mca/oob/base/oob_base_xcast.c +++ 
b/orte/mca/oob/base/oob_base_xcast.c @@ -73,7 +73,7 @@ int mca_oob_xcast(orte_jobid_t job, if (NULL != buffer) { opal_output(0, "xcast [%ld,%ld,%ld]: buffer size %lu", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), (unsigned long)buffer->bytes_used); - } + } gettimeofday(&start, NULL); } switch(orte_oob_xcast_mode) { diff --git a/orte/mca/ras/base/ras_base_node.c b/orte/mca/ras/base/ras_base_node.c index 8d39316e65..e44d167861 100644 --- a/orte/mca/ras/base/ras_base_node.c +++ b/orte/mca/ras/base/ras_base_node.c @@ -675,9 +675,9 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid) opal_list_item_t* item; orte_gpr_value_t **values; int rc; - orte_std_cntr_t num_values, i, j; + orte_std_cntr_t num_values, i, j, total_slots; orte_ras_node_t* node; - char* jobid_str, *key=NULL; + char* jobid_str, *key=NULL, *segment; num_values = (orte_std_cntr_t)opal_list_get_size(nodes); if (0 >= num_values) { @@ -685,7 +685,10 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid) return ORTE_ERR_BAD_PARAM; } - values = (orte_gpr_value_t**)malloc(num_values * sizeof(orte_gpr_value_t*)); + /* get one value more than needed for these nodes so we can store the total number of + * slots being assigned to this jobid in the job segment + */ + values = (orte_gpr_value_t**)malloc((1+num_values) * sizeof(orte_gpr_value_t*)); if (NULL == values) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; @@ -711,6 +714,9 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid) asprintf(&key, "%s-%s", ORTE_NODE_SLOTS_ALLOC_KEY, jobid_str); free(jobid_str); + /* initialize the total slots */ + total_slots = 0; + for(i=0, item = opal_list_get_first(nodes); i < num_values && item != opal_list_get_end(nodes); i++, item = opal_list_get_next(item)) { @@ -733,15 +739,42 @@ int orte_ras_base_node_assign(opal_list_t* nodes, orte_jobid_t jobid) free(key); goto cleanup; } + + /* add the slots to our total */ + total_slots += 
node->node_slots_alloc; } - /* try the insert */ - if (ORTE_SUCCESS != (rc = orte_gpr.put(num_values, values))) { + /* setup to store the total number of slots */ + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[num_values]), ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, + segment, 1, 1))) { + ORTE_ERROR_LOG(rc); + free(segment); + goto cleanup; + } + free(segment); + + /* enter the value */ + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[num_values]->keyvals[0]), + ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY, ORTE_STD_CNTR, &total_slots))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* put the value in the JOB_GLOBALS container */ + values[num_values]->tokens[0] = strdup(ORTE_JOB_GLOBALS); + + /* do the insert */ + if (ORTE_SUCCESS != (rc = orte_gpr.put((1+num_values), values))) { ORTE_ERROR_LOG(rc); } cleanup: - for (j=0; j < num_values; j++) { + for (j=0; j < (1+num_values); j++) { OBJ_RELEASE(values[j]); } if (NULL != values) free(values); diff --git a/orte/mca/schema/schema_types.h b/orte/mca/schema/schema_types.h index 392e619d18..3716172a2b 100644 --- a/orte/mca/schema/schema_types.h +++ b/orte/mca/schema/schema_types.h @@ -78,6 +78,7 @@ #define ORTE_JOB_VPID_START_KEY "orte-job-vpid-start" #define ORTE_JOB_VPID_RANGE_KEY "orte-job-vpid-range" #define ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY "orte-job-override-oversubscribe" +#define ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY "orte-job-total-slots" #define ORTE_JOB_IOF_KEY "orte-job-iof" #define ORTE_JOB_STATE_KEY "orte-job-state" #define ORTE_PROC_NAME_KEY "orte-proc-name" diff --git a/orte/test/unit/ns/ns_string_fns.c b/orte/test/unit/ns/ns_string_fns.c index 8e206dd956..5f0cfdf27d 100644 --- a/orte/test/unit/ns/ns_string_fns.c +++ b/orte/test/unit/ns/ns_string_fns.c @@ -51,7 +51,7 @@ int main(int argc, char **argv) free(test_name); /* convert a string to a name */ - tmp = 
strdup("1234.5678.0010"); + tmp = strdup("124.5678.0010"); if (ORTE_SUCCESS != (rc = orte_ns.convert_string_to_process_name(&test_name, tmp))) { /* got error */ fprintf(stderr, "convert string to process name failed with error %s\n", ORTE_ERROR_NAME(rc)); diff --git a/orte/util/proc_info.c b/orte/util/proc_info.c index 97f5a34b1f..bf7cff6d3b 100644 --- a/orte/util/proc_info.c +++ b/orte/util/proc_info.c @@ -35,6 +35,7 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = { /* .my_name = */ NULL, + /* .app_num = */ -1, /* .singleton = */ false, /* .vpid_start = */ 0, /* .num_procs = */ 1, @@ -73,6 +74,10 @@ int orte_proc_info(void) orte_process_info.daemon = false; } + id = mca_base_param_register_int("orte", "app", "num", NULL, -1); + mca_base_param_lookup_int(id, &tmp); + orte_process_info.app_num = tmp; + id = mca_base_param_register_string("gpr", "replica", "uri", NULL, orte_process_info.gpr_replica_uri); mca_base_param_lookup_string(id, &(orte_process_info.gpr_replica_uri)); mca_base_param_set_internal(id, true); diff --git a/orte/util/proc_info.h b/orte/util/proc_info.h index 6e589d90f1..9f8c7bc454 100644 --- a/orte/util/proc_info.h +++ b/orte/util/proc_info.h @@ -48,6 +48,7 @@ extern "C" { */ struct orte_proc_info_t { orte_process_name_t *my_name; /**< My official process name */ + orte_std_cntr_t app_num; /**< our index into the app_context array */ bool singleton; /**< I am a singleton */ orte_vpid_t vpid_start; /**< starting vpid for this job */ orte_std_cntr_t num_procs; /**< number of processes in this job */