
- A bunch of little pedantic / style fixes

- Change ompi_proc_world() to only return the procs in this job (as
  opposed to all of them)
- Add a subscription that fires during MPI_INIT (stg1) for figuring
  out which procs are on my local node.  Need to figure out what to do
  in the esoteric cases -- but the obvious one (Red Storm), where
  subscriptions are never fired, is ok, because by definition, no
  other procs will be on my node, so their default value (not on my
  node) is ok.
  --> Need to have RHC check this code; it seems to work, but I think
      I'm getting too much data back from the subscription.
- End result is that any proc that is on my node will have its
  OMPI_PROC_FLAG_LOCAL bit set on its proc->proc_flags field (see the
  usage sketch after this list).
- Added/corrected a few comments in proc.h.
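
As a usage sketch (not part of this commit; the helper name below is
hypothetical), a caller inside the OMPI tree could test the new flag
after MPI_INIT roughly like this:

    #include <stdbool.h>
    #include "ompi/proc/proc.h"

    /* Hypothetical helper: true if "proc" runs on the same node as the
       local proc.  OMPI_PROC_FLAG_LOCAL is set by the stg1 subscription
       callback added in this commit. */
    static bool proc_is_on_my_node(const ompi_proc_t *proc)
    {
        return (proc->proc_flags & OMPI_PROC_FLAG_LOCAL) != 0;
    }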

This commit was SVN r6507.
Jeff Squyres 2005-07-14 22:43:01 +00:00
parent 088bd37454
commit 94160da4c0
2 changed files with 272 additions and 37 deletions

ompi/proc/proc.c

@@ -20,13 +20,15 @@
 #include "opal/threads/mutex.h"
 #include "opal/util/output.h"
-#include "util/proc_info.h"
-#include "dps/dps.h"
-#include "proc/proc.h"
-#include "mca/oob/oob.h"
-#include "mca/ns/ns.h"
-#include "mca/pml/pml.h"
-#include "datatype/convertor.h"
+#include "opal/util/sys_info.h"
+#include "orte/dps/dps.h"
+#include "orte/mca/oob/oob.h"
+#include "orte/mca/ns/ns.h"
+#include "orte/mca/gpr/gpr.h"
+#include "orte/util/proc_info.h"
+#include "ompi/proc/proc.h"
+#include "ompi/mca/pml/pml.h"
+#include "ompi/datatype/convertor.h"

 static opal_list_t ompi_proc_list;
 static opal_mutex_t ompi_proc_lock;
@@ -34,7 +36,8 @@ ompi_proc_t* ompi_proc_local_proc = NULL;
 static void ompi_proc_construct(ompi_proc_t* proc);
 static void ompi_proc_destruct(ompi_proc_t* proc);
+static int setup_registry_callback(void);
+static void callback(orte_gpr_notify_data_t *data, void *cbdata);

 OBJ_CLASS_INSTANCE(
     ompi_proc_t,
@@ -54,6 +57,8 @@ void ompi_proc_construct(ompi_proc_t* proc)
     proc->proc_convertor = ompi_convertor_create(0, 0);
     proc->proc_arch = 0;
+    proc->proc_flags = 0;
+
     OPAL_THREAD_LOCK(&ompi_proc_lock);
     opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);
     OPAL_THREAD_UNLOCK(&ompi_proc_lock);
@@ -62,8 +67,9 @@ void ompi_proc_construct(ompi_proc_t* proc)
 void ompi_proc_destruct(ompi_proc_t* proc)
 {
-    if(proc->proc_modex != NULL)
+    if (proc->proc_modex != NULL) {
         OBJ_RELEASE(proc->proc_modex);
+    }
     OBJ_RELEASE( proc->proc_convertor );
     OPAL_THREAD_LOCK(&ompi_proc_lock);
     opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
@@ -81,19 +87,32 @@ int ompi_proc_init(void)
     OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
     OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);

+    /* get all peers in this job */
     if(OMPI_SUCCESS != (rc = orte_ns.get_peers(&peers, &npeers, &self))) {
         opal_output(0, "ompi_proc_init: get_peers failed with errno=%d", rc);
         return rc;
     }

+    /* find self */
     for(i=0; i<npeers; i++) {
         ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);
         proc->proc_name = peers[i];
         if( i == self ) {
             ompi_proc_local_proc = proc;
+            proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
         }
     }
     free(peers);

+    /* setup registry callback to find everyone on my local node.
+       Can't do a GPR get because we're in the middle of MPI_INIT,
+       and we're setup for the GPR compound command -- so create a
+       subscription which will be serviced later, at the end of the
+       compound command. */
+    if (ORTE_SUCCESS != (rc = setup_registry_callback())) {
+        return rc;
+    }
+
     return OMPI_SUCCESS;
 }
@@ -118,23 +137,46 @@ int ompi_proc_finalize (void)
 ompi_proc_t** ompi_proc_world(size_t *size)
 {
-    ompi_proc_t **procs =
-        (ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
+    ompi_proc_t **procs;
     ompi_proc_t *proc;
     size_t count = 0;
+    orte_ns_cmp_bitmask_t mask;
+    orte_process_name_t my_name;

-    if(NULL == procs)
+    /* check bozo case */
+    if (NULL == ompi_proc_local_proc) {
         return NULL;
+    }
+    mask = ORTE_NS_CMP_JOBID;
+    my_name = ompi_proc_local_proc->proc_name;

-    /* return only the procs that match this jobid */
+    /* First count how many match this jobid */
     OPAL_THREAD_LOCK(&ompi_proc_lock);
-    for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
-        proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
-        proc = (ompi_proc_t*)opal_list_get_next(proc)) {
-        /* TSW - FIX */
-        procs[count++] = proc;
+    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
+         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
+         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
+        if (0 == orte_ns.compare(mask, &proc->proc_name, &my_name)) {
+            ++count;
+        }
+    }
+
+    /* allocate an array */
+    procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
+    if (NULL == procs) {
+        return NULL;
+    }
+
+    /* now save only the procs that match this jobid */
+    count = 0;
+    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
+         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
+         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
+        if (0 == orte_ns.compare(mask, &proc->proc_name, &my_name)) {
+            procs[count++] = proc;
+        }
     }
     OPAL_THREAD_UNLOCK(&ompi_proc_lock);
     *size = count;
     return procs;
 }
@@ -147,8 +189,9 @@ ompi_proc_t** ompi_proc_all(size_t* size)
     ompi_proc_t *proc;
     size_t count = 0;

-    if(NULL == procs)
+    if (NULL == procs) {
         return NULL;
+    }

     OPAL_THREAD_LOCK(&ompi_proc_lock);
     for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
@@ -166,8 +209,9 @@ ompi_proc_t** ompi_proc_all(size_t* size)
 ompi_proc_t** ompi_proc_self(size_t* size)
 {
     ompi_proc_t **procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
-    if(NULL == procs)
+    if (NULL == procs) {
         return NULL;
+    }
     OBJ_RETAIN(ompi_proc_local_proc);
     *procs = ompi_proc_local_proc;
     *size = 1;
@@ -186,7 +230,7 @@ ompi_proc_t * ompi_proc_find ( const orte_process_name_t * name )
     for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
-        if (orte_ns.compare(mask, &proc->proc_name, name) == 0) {
+        if (0 == orte_ns.compare(mask, &proc->proc_name, name)) {
             rproc = proc;
             break;
         }
@@ -207,7 +251,7 @@ ompi_proc_t * ompi_proc_find_and_add ( const orte_process_name_t * name, bool* i
     for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
-        if(orte_ns.compare(mask, &proc->proc_name, name) == 0) {
+        if (0 == orte_ns.compare(mask, &proc->proc_name, name)) {
             *isnew = false;
             rproc = proc;
             break;
@@ -216,9 +260,9 @@ ompi_proc_t * ompi_proc_find_and_add ( const orte_process_name_t * name, bool* i
     OPAL_THREAD_UNLOCK(&ompi_proc_lock);
     if ( NULL == rproc ) {
         ompi_proc_t *tproc = OBJ_NEW(ompi_proc_t);
         rproc = tproc;
         rproc->proc_name = *name;
         *isnew = true;
     }

     return rproc;
@@ -247,8 +291,8 @@ int ompi_proc_get_proclist (orte_buffer_t* buf, int proclistsize, ompi_proc_t **
     orte_process_name_t name;
     bool isnew = false;

-    /* do not free plist *ever*, since it is used in the remote group structure
-       of a communicator */
+    /* do not free plist *ever*, since it is used in the remote group
+       structure of a communicator */
     plist = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
     if ( NULL == plist ) {
         return OMPI_ERR_OUT_OF_RESOURCE;
@@ -256,9 +300,10 @@ int ompi_proc_get_proclist (orte_buffer_t* buf, int proclistsize, ompi_proc_t **
     for ( i=0; i<proclistsize; i++ ){
         size_t count=1;
         int rc = orte_dps.unpack(buf, &name, &count, ORTE_NAME);
-        if(rc != OMPI_SUCCESS)
+        if(rc != ORTE_SUCCESS) {
             return rc;
+        }

         plist[i] = ompi_proc_find_and_add ( &name, &isnew );
         if(isnew) {
             MCA_PML_CALL(add_procs(&plist[i], 1));
@@ -268,3 +313,177 @@ int ompi_proc_get_proclist (orte_buffer_t* buf, int proclistsize, ompi_proc_t **
     return OMPI_SUCCESS;
 }
+
+
+/*
+ * As described above, we cannot do a simple GPR get because we're in
+ * the middle of the GPR compound command in MPI_INIT.  So setup a
+ * subscription that will be fulfilled later in MPI_INIT.
+ */
+static int setup_registry_callback(void)
+{
+    int rc;
+    char *segment;
+    ompi_proc_t *local = ompi_proc_local();
+    orte_jobid_t jobid;
+    orte_gpr_trigger_t trig, *trig1;
+    orte_gpr_value_t value, *values;
+    orte_gpr_subscription_t sub, *sub1;
+
+    if (ORTE_SUCCESS != orte_ns.get_jobid(&jobid, &local->proc_name)) {
+        printf("Badness!\n");
+    }
+
+    /* find the job segment on the registry */
+    if (ORTE_SUCCESS !=
+        (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
+        return rc;
+    }
+
+    OBJ_CONSTRUCT(&sub, orte_gpr_subscription_t);
+
+    /* indicate that this is a standard subscription.  This indicates
+       that the subscription will be common to all processes.  Thus,
+       the resulting data can be consolidated into a
+       process-independent message and broadcast to all processes */
+    if (ORTE_SUCCESS !=
+        (rc = orte_schema.get_std_subscription_name(&(sub.name),
+                                                    OMPI_ATTRIBUTE_SUBSCRIPTION, jobid))) {
+        return rc;
+    }
+
+    /* send data when trigger fires, then delete - no need for further
+       notifications */
+    sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;
+
+    OBJ_CONSTRUCT(&value, orte_gpr_value_t);
+    values = &value;
+    sub.values = &values;
+    sub.cnt = 1;
+    value.addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
+    value.segment = segment;
+    value.tokens = NULL; /* wildcard - look at all containers */
+    value.num_tokens = 0;
+    value.cnt = 2;
+    value.keyvals =
+        (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*) * 2);
+    if (NULL == value.keyvals) {
+        rc = ORTE_ERR_OUT_OF_RESOURCE;
+        goto cleanup;
+    }
+    value.keyvals[0] = NULL;
+    value.keyvals[1] = NULL;
+
+    value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
+    if (NULL == value.keyvals[0]) {
+        rc = ORTE_ERR_OUT_OF_RESOURCE;
+        goto cleanup;
+    }
+    value.keyvals[0]->key = strdup(ORTE_PROC_NAME_KEY);
+    if (NULL == value.keyvals[0]->key) {
+        rc = ORTE_ERR_OUT_OF_RESOURCE;
+        goto cleanup;
+    }
+
+    value.keyvals[1] = OBJ_NEW(orte_gpr_keyval_t);
+    if (NULL == value.keyvals[1]) {
+        rc = ORTE_ERR_OUT_OF_RESOURCE;
+        goto cleanup;
+    }
+    value.keyvals[1]->key = strdup(ORTE_NODE_NAME_KEY);
+    if (NULL == value.keyvals[1]->key) {
+        rc = ORTE_ERR_OUT_OF_RESOURCE;
+        goto cleanup;
+    }
+
+    sub.cbfunc = callback;
+    sub.user_tag = NULL;
+
+    /* setup the trigger information */
+    OBJ_CONSTRUCT(&trig, orte_gpr_trigger_t);
+    if (ORTE_SUCCESS !=
+        (rc = orte_schema.get_std_trigger_name(&(trig.name),
+                                               ORTE_STG1_TRIGGER, jobid))) {
+        goto cleanup;
+    }
+
+    /* do the subscription */
+    sub1 = &sub;
+    trig1 = &trig;
+    rc = orte_gpr.subscribe(1, &sub1, 1, &trig1);
+
+ cleanup:
+    OBJ_DESTRUCT(&value);
+    sub.values = NULL;
+    OBJ_DESTRUCT(&sub);
+    OBJ_DESTRUCT(&trig);
+    return rc;
+}
+
+
+/*
+ * This callback is invoked by a subscription during MPI_INIT to let
+ * us know what procs are on what hosts.  We look at the results and
+ * figure out which procs are on the same host as the local proc.  For
+ * each proc that is on the same host as the local proc, we set that
+ * proc's OMPI_PROC_FLAG_LOCAL flag.
+ */
+static void callback(orte_gpr_notify_data_t *data, void *cbdata)
+{
+    size_t i, j;
+    char *str;
+    bool found_name;
+    orte_ns_cmp_bitmask_t mask;
+    orte_process_name_t name;
+    orte_gpr_value_t **value;
+    orte_gpr_keyval_t **keyval;
+    ompi_proc_t *proc;
+
+    /* check bozo case */
+    if (0 == data->cnt) {
+        return;
+    }
+
+    /* locks are probably not necessary here, but just be safe anyway */
+    OPAL_THREAD_LOCK(&ompi_proc_lock);
+
+    /* loop over the data returned in the subscription */
+    mask = ORTE_NS_CMP_CELLID | ORTE_NS_CMP_JOBID | ORTE_NS_CMP_VPID;
+    value = data->values;
+    for (i = 0; i < data->cnt; ++i) {
+        str = NULL;
+        found_name = false;
+        keyval = value[i]->keyvals;
+
+        /* find the 2 keys that we're looking for */
+        for (j = 0; j < value[i]->cnt; ++j) {
+            if (strcmp(keyval[j]->key, ORTE_PROC_NAME_KEY) == 0) {
+                orte_ns.get_proc_name_string(&str, &keyval[j]->value.proc);
+                name = keyval[j]->value.proc;
+                found_name = true;
+            } else if (strcmp(keyval[j]->key, ORTE_NODE_NAME_KEY) == 0) {
+                if (NULL != str) {
+                    free(str);
+                }
+                str = strdup(keyval[j]->value.strptr);
+            }
+        }
+
+        /* if we found both keys and the proc is on my local host,
+           find it in the master proc list and set the "local" flag */
+        if (NULL != str && found_name &&
+            0 == strcmp(str, orte_system_info.nodename)) {
+            for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
+                 proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
+                 proc = (ompi_proc_t*)opal_list_get_next(proc)) {
+                if (0 == orte_ns.compare(mask, &name,
+                                         &proc->proc_name)) {
+                    proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
+                }
+            }
+        }
+    }
+
+    /* unlock */
+    OPAL_THREAD_UNLOCK(&ompi_proc_lock);
+}

ompi/proc/proc.h

@@ -30,22 +30,38 @@ extern "C" {
 OMPI_DECLSPEC extern opal_class_t ompi_proc_t_class;

 struct ompi_proc_t {
-    opal_list_item_t super;              /* allow proc to be placed on a list */
+    /** allow proc to be placed on a list */
+    opal_list_item_t super;
+    /** this process' name */
     orte_process_name_t proc_name;
-    struct mca_pml_proc_t* proc_pml;     /* PML specific proc data */
-    opal_object_t* proc_modex;           /* MCA module exchange data */
+    /** PML specific proc data */
+    struct mca_pml_proc_t* proc_pml;
+    /** MCA module exchange data */
+    opal_object_t* proc_modex;
+    /** architecture of this process */
     uint32_t proc_arch;
+    /** process-wide convertor */
     struct ompi_convertor_t* proc_convertor;
+    /** process-wide lock */
     opal_mutex_t proc_lock;
-
-    /* JMS: need to have the following information:
-       - how am i [mpi] connected (bitmap): spawn (parent/child),
-       connect, accept, joint
-    */
+    /** flags for this proc */
+    uint8_t proc_flags;
 };
+
+/**
+ * Convenience typedef
+ */
 typedef struct ompi_proc_t ompi_proc_t;
+
 OMPI_DECLSPEC extern ompi_proc_t* ompi_proc_local_proc;

+
+/*
+ * Flags
+ */
+
+/**
+ * Flag to indicate that the proc is on the same node as the local proc
+ */
+#define OMPI_PROC_FLAG_LOCAL 0x01
+
+
 /**
  * Query the run-time environment and build list of available proc instances.