1
1
Setup subscriptions to correctly return the MPI_APPNUM attribute.

Fix an unreported bug that was found. The universe size was incorrectly defined in the attributes code. As coded, it looked for size_t values and based its size computation on those numbers. Unfortunately, the node_slots value had been changed to an orte_std_cntr_t awhile back! So the universe size was never updated.

Update the hello_nodename test to check for MPI_APPNUM.

Add a definition to ns_types for ORTE_PROC_MY_NAME - just a shortcut for orte_process_info.my_name. Brought over from ORTE 2.0 as it will be used extensively there.

This commit was SVN r12377.
Этот коммит содержится в:
Ralph Castain 2006-10-31 21:29:07 +00:00
родитель 48c5117476
Коммит 17c71f8d2a
3 изменённых файлов: 106 добавлений и 37 удалений

Просмотреть файл

@ -109,9 +109,9 @@ static int set_f(int keyval, MPI_Fint value);
int ompi_attr_create_predefined(void) int ompi_attr_create_predefined(void)
{ {
int rc, ret; int rc, ret;
orte_gpr_subscription_id_t id; orte_gpr_subscription_t *subs, sub = ORTE_GPR_SUBSCRIPTION_EMPTY;
char *sub_name, *trig_name; orte_gpr_trigger_t *trigs, trig = ORTE_GPR_TRIGGER_EMPTY;
orte_jobid_t job; char *jobseg;
/* Create all the keyvals */ /* Create all the keyvals */
@ -173,42 +173,93 @@ int ompi_attr_create_predefined(void)
UNIVERSE_SIZE and APPNUM values once everyone has passed UNIVERSE_SIZE and APPNUM values once everyone has passed
stg1. */ stg1. */
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&job, orte_process_info.my_name))) { /* we have to create two subscriptions - one to retrieve the number of slots on
ORTE_ERROR_LOG(rc); * each node so we can estimate the universe size, and the other to return our
return rc; * app_context index to properly set the appnum attribute.
} *
* NOTE: when the 2.0 registry becomes available, this should be consolidated to
* a single subscription
*/
/* indicate that this is a standard subscription. This indicates /* indicate that this is a standard subscription. This indicates
that the subscription will be common to all processes. Thus, that the subscription will be common to all processes. Thus,
the resulting data can be consolidated into a the resulting data can be consolidated into a
process-independent message and broadcast to all processes */ process-independent message and broadcast to all processes */
subs = ⊂
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_schema.get_std_subscription_name(&sub_name, (rc = orte_schema.get_std_subscription_name(&sub.name,
OMPI_ATTRIBUTE_SUBSCRIPTION, job))) { OMPI_ATTRIBUTE_SUBSCRIPTION, ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;
sub.values = (orte_gpr_value_t**)malloc(2 * sizeof(orte_gpr_value_t*));
sub.cnt = 2;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[0]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_NODE_SEGMENT, 1, 0))) {
ORTE_ERROR_LOG(rc);
free(sub.name);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[0]->keyvals[0]), ORTE_NODE_SLOTS_KEY, ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sub.values[0]);
free(sub.name);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&jobseg, ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sub.values[0]);
free(sub.name);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(sub.values[1]), ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
jobseg, 2, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sub.values[0]);
free(sub.name);
free(jobseg);
return rc;
}
free(jobseg);
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[0]), ORTE_PROC_RANK_KEY, ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sub.values[0]);
OBJ_RELEASE(sub.values[1]);
free(sub.name);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(sub.values[1]->keyvals[1]), ORTE_PROC_APP_CONTEXT_KEY, ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sub.values[0]);
OBJ_RELEASE(sub.values[1]);
free(sub.name);
return rc;
}
sub.cbfunc = ompi_attr_create_predefined_callback;
/* attach ourselves to the standard stage-1 trigger */ /* attach ourselves to the standard stage-1 trigger */
trigs = &trig;
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_schema.get_std_trigger_name(&trig_name, (rc = orte_schema.get_std_trigger_name(&trig.name,
ORTE_STG1_TRIGGER, job))) { ORTE_STG1_TRIGGER, ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
free(sub_name); OBJ_RELEASE(sub.values[0]);
OBJ_RELEASE(sub.values[1]);
free(sub.name);
return rc; return rc;
} }
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, trig_name, sub_name, if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG,
ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_NODE_SEGMENT,
NULL, /* wildcard - look at all containers */
ORTE_NODE_SLOTS_KEY,
ompi_attr_create_predefined_callback, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
free(trig_name); OBJ_RELEASE(sub.values[0]);
free(sub_name); OBJ_RELEASE(sub.values[1]);
free(sub.name);
free(trig.name);
return rc; return rc;
} }
@ -247,10 +298,10 @@ void ompi_attr_create_predefined_callback(
void *cbdata) void *cbdata)
{ {
orte_std_cntr_t i, j, k; orte_std_cntr_t i, j, k;
size_t *sptr;
orte_gpr_keyval_t **keyval; orte_gpr_keyval_t **keyval;
orte_gpr_value_t **value; orte_gpr_value_t **value;
orte_jobid_t job; orte_jobid_t job;
orte_std_cntr_t *cptr, rank, app_num;
unsigned int universe_size = 0; unsigned int universe_size = 0;
int rc; int rc;
@ -260,13 +311,6 @@ void ompi_attr_create_predefined_callback(
return; return;
} }
/* Per conversation between Jeff, Edgar, and Ralph - this needs to
* be fixed to properly determine the appnum. Ignore errors here;
* there's no way to propagate the error up, so just try to keep
* going.
*/
set_f(MPI_APPNUM, (MPI_Fint) job);
/* Query the gpr to find out how many CPUs there will be. /* Query the gpr to find out how many CPUs there will be.
This will only return a non-empty list in a persistent This will only return a non-empty list in a persistent
universe. If we don't have a persistent universe, then just universe. If we don't have a persistent universe, then just
@ -299,18 +343,37 @@ void ompi_attr_create_predefined_callback(
if (0 < value[i]->cnt) { /* make sure some data was returned here */ if (0 < value[i]->cnt) { /* make sure some data was returned here */
keyval = value[i]->keyvals; keyval = value[i]->keyvals;
for (j=0; j < value[i]->cnt; j++) { for (j=0; j < value[i]->cnt; j++) {
/* make sure we don't get confused - all slot counts if (0 == strcmp(ORTE_NODE_SLOTS_KEY, keyval[j]->key)) {
* are in size_t fields
*/
if (ORTE_SIZE == keyval[j]->value->type) {
/* Process slot count */ /* Process slot count */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, keyval[j]->value, ORTE_SIZE))) { if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
} }
universe_size += (unsigned int)(*sptr); universe_size += (unsigned int)(*cptr);
} else if (0 == strcmp(ORTE_PROC_RANK_KEY, keyval[j]->key)) {
/* Process rank */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return;
}
rank = *cptr;
} else if (0 == strcmp(ORTE_PROC_APP_CONTEXT_KEY, keyval[j]->key)) {
/* App context number */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyval[j]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return;
}
app_num = *cptr;
} }
} }
/* see if this value is for this process. We need to
* perform the check since this subscription is associated
* with a trigger - hence, the data from all the procs
* is included in the message
*/
if (rank == ORTE_PROC_MY_NAME->vpid) {
set_f(MPI_APPNUM, (MPI_Fint) app_num);
}
} }
} }
} }

Просмотреть файл

@ -108,6 +108,8 @@ typedef struct orte_process_name_t orte_process_name_t;
ORTE_DECLSPEC extern orte_process_name_t orte_name_all; ORTE_DECLSPEC extern orte_process_name_t orte_name_all;
#define ORTE_NAME_ALL &orte_name_all #define ORTE_NAME_ALL &orte_name_all
#define ORTE_PROC_MY_NAME orte_process_info.my_name
/** /**
* Convert process name from host to network byte order. * Convert process name from host to network byte order.
* *

Просмотреть файл

@ -12,13 +12,17 @@ int main(int argc, char* argv[])
{ {
int rank, size; int rank, size;
char hostname[512]; char hostname[512];
void *appnum;
int flag;
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_APPNUM, &appnum, &flag);
gethostname(hostname, 512); gethostname(hostname, 512);
printf("Hello, World, I am %d of %d on host %s\n", rank, size, hostname); printf("Hello, World, I am %d of %d on host %s from app number %d\n",
rank, size, hostname, *(int*)appnum);
MPI_Finalize(); MPI_Finalize();
return 0; return 0;