From 17c71f8d2a95c853254b162add964fb5a4267cbc Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 31 Oct 2006 21:29:07 +0000 Subject: [PATCH] Fix ticket #545 Setup subscriptions to correctly return the MPI_APPNUM attribute. Fix an unreported bug that was found. The universe size was incorrectly defined in the attributes code. As coded, it looked for size_t values and based its size computation on those numbers. Unfortunately, the node_slots value had been changed to an orte_std_cntr_t awhile back! So the universe size was never updated. Update the hello_nodename test to check for MPI_APPNUM. Add a definition to ns_types for ORTE_PROC_MY_NAME - just a shortcut for orte_process_info.my_name. Brought over from ORTE 2.0 as it will be used extensively there. This commit was SVN r12377. --- ompi/attribute/attribute_predefined.c | 135 +++++++++++++++++++------- orte/mca/ns/ns_types.h | 2 + orte/test/mpi/hello_nodename.c | 6 +- 3 files changed, 106 insertions(+), 37 deletions(-) diff --git a/ompi/attribute/attribute_predefined.c b/ompi/attribute/attribute_predefined.c index 94bee2fcb7..e79d2caf4f 100644 --- a/ompi/attribute/attribute_predefined.c +++ b/ompi/attribute/attribute_predefined.c @@ -109,9 +109,9 @@ static int set_f(int keyval, MPI_Fint value); int ompi_attr_create_predefined(void) { int rc, ret; - orte_gpr_subscription_id_t id; - char *sub_name, *trig_name; - orte_jobid_t job; + orte_gpr_subscription_t *subs, sub = ORTE_GPR_SUBSCRIPTION_EMPTY; + orte_gpr_trigger_t *trigs, trig = ORTE_GPR_TRIGGER_EMPTY; + char *jobseg; /* Create all the keyvals */ @@ -173,42 +173,93 @@ int ompi_attr_create_predefined(void) UNIVERSE_SIZE and APPNUM values once everyone has passed stg1. 
*/ - if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&job, orte_process_info.my_name))) { - ORTE_ERROR_LOG(rc); - return rc; - } + /* we have to create two subscriptions - one to retrieve the number of slots on + * each node so we can estimate the universe size, and the other to return our + * app_context index to properly set the appnum attribute. + * + * NOTE: when the 2.0 registry becomes available, this should be consolidated to + * a single subscription + */ /* indicate that this is a standard subscription. This indicates that the subscription will be common to all processes. Thus, the resulting data can be consolidated into a process-independent message and broadcast to all processes */ + subs = &sub; if (ORTE_SUCCESS != - (rc = orte_schema.get_std_subscription_name(&sub_name, - OMPI_ATTRIBUTE_SUBSCRIPTION, job))) { + (rc = orte_schema.get_std_subscription_name(&sub.name, + OMPI_ATTRIBUTE_SUBSCRIPTION, ORTE_PROC_MY_NAME->jobid))) { ORTE_ERROR_LOG(rc); return rc; } + sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG; + sub.values = (orte_gpr_value_t**)malloc(2 * sizeof(orte_gpr_value_t*)); + sub.cnt = 2; + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + ORTE_NODE_SEGMENT, 1, 0))) { + ORTE_ERROR_LOG(rc); + free(sub.name); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[0]->keyvals[0]), ORTE_NODE_SLOTS_KEY, ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(sub.values[0]); + free(sub.name); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(sub.values[0]); + free(sub.name); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[1]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + jobseg, 2, 0))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(sub.values[0]); + free(sub.name); + free(jobseg); + return rc; + } + free(jobseg); + + if (ORTE_SUCCESS != (rc = 
orte_gpr.create_keyval(&(sub.values[1]->keyvals[0]), ORTE_PROC_RANK_KEY, ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(sub.values[0]); + OBJ_RELEASE(sub.values[1]); + free(sub.name); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[1]), ORTE_PROC_APP_CONTEXT_KEY, ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(sub.values[0]); + OBJ_RELEASE(sub.values[1]); + free(sub.name); + return rc; + } + sub.cbfunc = ompi_attr_create_predefined_callback; + /* attach ourselves to the standard stage-1 trigger */ + trigs = &trig; if (ORTE_SUCCESS != - (rc = orte_schema.get_std_trigger_name(&trig_name, - ORTE_STG1_TRIGGER, job))) { + (rc = orte_schema.get_std_trigger_name(&trig.name, + ORTE_STG1_TRIGGER, ORTE_PROC_MY_NAME->jobid))) { ORTE_ERROR_LOG(rc); - free(sub_name); + OBJ_RELEASE(sub.values[0]); + OBJ_RELEASE(sub.values[1]); + free(sub.name); return rc; } - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, trig_name, sub_name, - ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG, - ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, - ORTE_NODE_SEGMENT, - NULL, /* wildcard - look at all containers */ - ORTE_NODE_SLOTS_KEY, - ompi_attr_create_predefined_callback, NULL))) { + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { ORTE_ERROR_LOG(rc); } - free(trig_name); - free(sub_name); + OBJ_RELEASE(sub.values[0]); + OBJ_RELEASE(sub.values[1]); + free(sub.name); + free(trig.name); return rc; } @@ -247,10 +298,10 @@ void ompi_attr_create_predefined_callback( void *cbdata) { orte_std_cntr_t i, j, k; - size_t *sptr; orte_gpr_keyval_t **keyval; orte_gpr_value_t **value; orte_jobid_t job; + orte_std_cntr_t *cptr, rank, app_num; unsigned int universe_size = 0; int rc; @@ -260,13 +311,6 @@ void ompi_attr_create_predefined_callback( return; } - /* Per conversation between Jeff, Edgar, and Ralph - this needs to - * be fixed to properly determine the appnum. 
Ignore errors here; - * there's no way to propagate the error up, so just try to keep - * going. - */ - set_f(MPI_APPNUM, (MPI_Fint) job); - /* Query the gpr to find out how many CPUs there will be. This will only return a non-empty list in a persistent universe. If we don't have a persistent universe, then just @@ -299,18 +343,37 @@ void ompi_attr_create_predefined_callback( if (0 < value[i]->cnt) { /* make sure some data was returned here */ keyval = value[i]->keyvals; for (j=0; j < value[i]->cnt; j++) { - /* make sure we don't get confused - all slot counts - * are in size_t fields - */ - if (ORTE_SIZE == keyval[j]->value->type) { + if (0 == strcmp(ORTE_NODE_SLOTS_KEY, keyval[j]->key)) { /* Process slot count */ - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, keyval[j]->value, ORTE_SIZE))) { + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { ORTE_ERROR_LOG(rc); return; } - universe_size += (unsigned int)(*sptr); + universe_size += (unsigned int)(*cptr); + } else if (0 == strcmp(ORTE_PROC_RANK_KEY, keyval[j]->key)) { + /* Process rank */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return; + } + rank = *cptr; + } else if (0 == strcmp(ORTE_PROC_APP_CONTEXT_KEY, keyval[j]->key)) { + /* App context number */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return; + } + app_num = *cptr; } - } + } + /* see if this value is for this process. 
We need to + * perform the check since this subscription is associated + * with a trigger - hence, the data from all the procs + * is included in the message + */ + if (rank == ORTE_PROC_MY_NAME->vpid) { + set_f(MPI_APPNUM, (MPI_Fint) app_num); + } } } } diff --git a/orte/mca/ns/ns_types.h b/orte/mca/ns/ns_types.h index 7cca8d9435..0042e0ffbf 100644 --- a/orte/mca/ns/ns_types.h +++ b/orte/mca/ns/ns_types.h @@ -108,6 +108,8 @@ typedef struct orte_process_name_t orte_process_name_t; ORTE_DECLSPEC extern orte_process_name_t orte_name_all; #define ORTE_NAME_ALL &orte_name_all +#define ORTE_PROC_MY_NAME orte_process_info.my_name + /** * Convert process name from host to network byte order. * diff --git a/orte/test/mpi/hello_nodename.c b/orte/test/mpi/hello_nodename.c index 33d44cbd4f..c5a8737838 100644 --- a/orte/test/mpi/hello_nodename.c +++ b/orte/test/mpi/hello_nodename.c @@ -12,13 +12,17 @@ int main(int argc, char* argv[]) { int rank, size; char hostname[512]; + void *appnum; + int flag; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_APPNUM, &appnum, &flag); gethostname(hostname, 512); - printf("Hello, World, I am %d of %d on host %s\n", rank, size, hostname); + printf("Hello, World, I am %d of %d on host %s from app number %d\n", + rank, size, hostname, *(int*)appnum); MPI_Finalize(); return 0;