Two quick additions:
1. Added OMPI_PROC_ARCH as a defined registry key and added the code so that the architecture info gets properly transmitted across all processes using the startup message. 2. Added an OMPI_MODEX_KEY definition and removed the hard-coded "modex" key from pml_modex_exchange This commit was SVN r7129.
Этот коммит содержится в:
родитель
ad0c6059a8
Коммит
03e45e6723
@ -3,14 +3,14 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
@ -77,7 +77,7 @@ enum {
|
||||
OMPI_UNPACK_INADEQUATE_SPACE = ORTE_UNPACK_INADEQUATE_SPACE,
|
||||
OMPI_UNPACK_READ_PAST_END_OF_BUFFER = ORTE_UNPACK_READ_PAST_END_OF_BUFFER,
|
||||
OMPI_ERR_GPR_DATA_CORRUPT = ORTE_ERR_GPR_DATA_CORRUPT,
|
||||
OMPI_ERR_TYPE_MISMATCH = ORTE_ERR_TYPE_MISMATCH
|
||||
OMPI_ERR_TYPE_MISMATCH = ORTE_ERR_TYPE_MISMATCH
|
||||
};
|
||||
|
||||
#define OMPI_ERR_MAX (OMPI_ERR_BASE - 1)
|
||||
@ -92,6 +92,11 @@ enum {
|
||||
#define OMPI_OOB_SUBSCRIPTION "ompi-oob-sub"
|
||||
#define OMPI_MODEX_SUBSCRIPTION "ompi-modex-sub"
|
||||
|
||||
/*
|
||||
* OMPI-specific registry keys
|
||||
*/
|
||||
#define OMPI_PROC_ARCH "ompi-proc-arch"
|
||||
#define OMPI_MODEX_KEY "ompi-modex"
|
||||
|
||||
#endif /* OMPI_CONSTANTS_H */
|
||||
|
||||
|
@ -278,7 +278,7 @@ orte_gpr_base_dump_notify_data(data,0);
|
||||
void* bytes = NULL;
|
||||
size_t cnt;
|
||||
size_t num_bytes;
|
||||
if(strcmp(keyval[j]->key,"modex") != 0)
|
||||
if(strcmp(keyval[j]->key,OMPI_MODEX_KEY) != 0)
|
||||
continue;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
|
||||
@ -324,7 +324,6 @@ orte_gpr_base_dump_notify_data(data,0);
|
||||
}
|
||||
if (num_bytes != 0) {
|
||||
if(NULL == (bytes = malloc(num_bytes))) {
|
||||
opal_output(0, "Unable to allocate memory (length %d bytes).\n", num_bytes );
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
continue;
|
||||
}
|
||||
@ -437,11 +436,9 @@ static int mca_pml_base_modex_subscribe(orte_process_name_t* name)
|
||||
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
|
||||
segment,
|
||||
NULL, /* look at all containers on this segment */
|
||||
"modex",
|
||||
OMPI_MODEX_KEY,
|
||||
mca_pml_base_modex_registry_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_output(0, "mca_pml_base_modex_exchange: "
|
||||
"orte_gpr.subscribe failed with return code %d\n", rc);
|
||||
free(sub_name);
|
||||
free(trig_name);
|
||||
free(segment);
|
||||
@ -474,50 +471,38 @@ int mca_pml_base_modex_send(
|
||||
size_t size)
|
||||
{
|
||||
orte_jobid_t jobid;
|
||||
orte_gpr_value_t *value;
|
||||
orte_gpr_value_union_t value;
|
||||
int rc;
|
||||
orte_buffer_t buffer;
|
||||
char* ptr;
|
||||
|
||||
value = OBJ_NEW(orte_gpr_value_t);
|
||||
if (NULL == value) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
size_t i, num_tokens;
|
||||
char *ptr, *segment, **tokens;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&jobid, orte_process_info.my_name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&(value->segment), jobid))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&(value->tokens),
|
||||
&(value->num_tokens), orte_process_info.my_name))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
|
||||
&num_tokens, orte_process_info.my_name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(segment);
|
||||
return rc;
|
||||
}
|
||||
value->addr_mode = ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR;
|
||||
value->cnt = 1;
|
||||
value->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
if (NULL == value->keyvals[0]) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
(value->keyvals[0])->type = ORTE_BYTE_OBJECT;
|
||||
|
||||
|
||||
#if 0
|
||||
(value->keyvals[0])->value.byteobject.size = size;
|
||||
(value->keyvals[0])->value.byteobject.bytes = (void *)malloc(size);
|
||||
if(NULL == (value->keyvals[0])->value.byteobject.bytes) {
|
||||
value.byteobject.size = size;
|
||||
value.byteobject.bytes = (void *)malloc(size);
|
||||
if(NULL == value.byteobject.bytes) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
memcpy((value->keyvals[0])->value.byteobject.bytes, data, size);
|
||||
memcpy((value.byteobject.bytes, data, size);
|
||||
|
||||
asprintf(&((value->keyvals[0])->key), "modex-%s-%s-%d-%d",
|
||||
source_component->mca_type_name,
|
||||
@ -546,19 +531,24 @@ int mca_pml_base_modex_send(
|
||||
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, (void*)data, size, ORTE_BYTE))) {
|
||||
goto cleanup;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_dps.unload(&buffer,
|
||||
(void**)&(value->keyvals[0])->value.byteobject.bytes,
|
||||
(size_t*)&(value->keyvals[0])->value.byteobject.size))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_dps.unload(&buffer, (void**)&value.byteobject.bytes,
|
||||
(size_t*)&value.byteobject.size))) {
|
||||
goto cleanup;
|
||||
}
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
value->keyvals[0]->key = strdup("modex");
|
||||
#endif
|
||||
|
||||
rc = orte_gpr.put(1, &value);
|
||||
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
|
||||
segment, tokens, OMPI_MODEX_KEY, ORTE_BYTE_OBJECT, value);
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(value);
|
||||
free(segment);
|
||||
for (i=0; i < num_tokens; i++) {
|
||||
free(tokens[i]);
|
||||
tokens[i] = NULL;
|
||||
}
|
||||
if (NULL != tokens) free(tokens);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,10 @@ void ompi_proc_destruct(ompi_proc_t* proc)
|
||||
int ompi_proc_init(void)
|
||||
{
|
||||
orte_process_name_t *peers;
|
||||
size_t i, npeers, self;
|
||||
size_t i, npeers, self, num_tokens;
|
||||
orte_jobid_t jobid;
|
||||
char *segment, **tokens;
|
||||
orte_gpr_value_union_t value;
|
||||
int rc;
|
||||
|
||||
OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
|
||||
@ -125,8 +128,41 @@ int ompi_proc_init(void)
|
||||
}
|
||||
|
||||
/* Here we have to add to the GPR the information about the current architecture.
|
||||
* TODO: george
|
||||
*/
|
||||
if (OMPI_SUCCESS != (rc = ompi_arch_compute_local_id(&value.ui32))) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&jobid, orte_process_info.my_name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* find the job segment on the registry */
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* get the registry tokens for this node */
|
||||
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens,
|
||||
orte_process_info.my_name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(segment);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* put the arch info on the registry */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.put_1(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
segment, tokens,
|
||||
OMPI_PROC_ARCH, ORTE_UINT32, value))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
free(segment);
|
||||
for (i=0; i < num_tokens; i++) {
|
||||
free(tokens[i]);
|
||||
tokens[i] = NULL;
|
||||
}
|
||||
if (NULL != tokens) free(tokens);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -337,7 +373,7 @@ int ompi_proc_get_proclist (orte_buffer_t* buf, int proclistsize, ompi_proc_t **
|
||||
static int setup_registry_callback(void)
|
||||
{
|
||||
int rc;
|
||||
char *segment, *sub_name, *trig_name, *keys[2];
|
||||
char *segment, *sub_name, *trig_name, *keys[3];
|
||||
ompi_proc_t *local = ompi_proc_local();
|
||||
orte_gpr_subscription_id_t id;
|
||||
orte_jobid_t jobid;
|
||||
@ -368,6 +404,7 @@ static int setup_registry_callback(void)
|
||||
/* define the keys to be returned */
|
||||
keys[0] = strdup(ORTE_PROC_NAME_KEY);
|
||||
keys[1] = strdup(ORTE_NODE_NAME_KEY);
|
||||
keys[2] = strdup(OMPI_PROC_ARCH);
|
||||
|
||||
/* Here we have to add another key to the registry to be able to get the information
|
||||
* about the remote architectures.
|
||||
@ -387,7 +424,7 @@ static int setup_registry_callback(void)
|
||||
ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
segment,
|
||||
NULL, /* wildcard - look at all containers */
|
||||
2, keys,
|
||||
3, keys,
|
||||
callback, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
@ -414,7 +451,8 @@ static void callback(orte_gpr_notify_data_t *data, void *cbdata)
|
||||
{
|
||||
size_t i, j, k;
|
||||
char *str;
|
||||
bool found_name;
|
||||
uint32_t arch;
|
||||
bool found_name, found_arch;
|
||||
orte_ns_cmp_bitmask_t mask;
|
||||
orte_process_name_t name;
|
||||
orte_gpr_value_t **value;
|
||||
@ -438,6 +476,7 @@ static void callback(orte_gpr_notify_data_t *data, void *cbdata)
|
||||
k++;
|
||||
str = NULL;
|
||||
found_name = false;
|
||||
found_arch = false;
|
||||
keyval = value[i]->keyvals;
|
||||
|
||||
/* find the 2 keys that we're looking for */
|
||||
@ -451,27 +490,32 @@ static void callback(orte_gpr_notify_data_t *data, void *cbdata)
|
||||
free(str);
|
||||
}
|
||||
str = strdup(keyval[j]->value.strptr);
|
||||
} else if (strcmp(keyval[j]->key, OMPI_PROC_ARCH) == 0) {
|
||||
arch = keyval[j]->value.ui32;
|
||||
found_arch = true;
|
||||
}
|
||||
}
|
||||
|
||||
/* if we found both keys and the proc is on my local host,
|
||||
/* if we found all keys and the proc is on my local host,
|
||||
find it in the master proc list and set the "local" flag */
|
||||
if (NULL != str && found_name &&
|
||||
0 == strcmp(str, orte_system_info.nodename)) {
|
||||
if (NULL != str && found_name && found_arch) {
|
||||
for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
||||
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
||||
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
||||
if (0 == orte_ns.compare(mask, &name,
|
||||
&proc->proc_name)) {
|
||||
proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
|
||||
|
||||
/* if the nodename of this info is my local host,
|
||||
* find the associated proc entry and set the local
|
||||
* flag
|
||||
*/
|
||||
if (0 == strcmp(str, orte_system_info.nodename) &&
|
||||
0 == orte_ns.compare(mask, &name, &proc->proc_name)) {
|
||||
proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
|
||||
}
|
||||
/* set the architecture entry for this proc */
|
||||
proc->proc_arch = arch;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* And finally here we have to retrieve the remote architectures and create the convertors
|
||||
* attached to the remote processors depending on the remote architecture.
|
||||
* TODO: George.
|
||||
*/
|
||||
}
|
||||
|
||||
/* unlock */
|
||||
|
@ -276,23 +276,23 @@ void orte_gpr_base_dump_keyval_value(orte_buffer_t *buffer, orte_gpr_keyval_t *i
|
||||
break;
|
||||
|
||||
case ORTE_UINT8:
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT8\tValue: %d", (int)iptr->value.ui8);
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT8\tValue: %u", (int)iptr->value.ui8);
|
||||
orte_gpr_base_dump_load_string(buffer, &tmp_out);
|
||||
break;
|
||||
|
||||
case ORTE_UINT16:
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT16\tValue: %d", (int)iptr->value.ui16);
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT16\tValue: %u", (int)iptr->value.ui16);
|
||||
orte_gpr_base_dump_load_string(buffer, &tmp_out);
|
||||
break;
|
||||
|
||||
case ORTE_UINT32:
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT32\tValue: %d", (int)iptr->value.ui32);
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT32\tValue: %u", (int)iptr->value.ui32);
|
||||
orte_gpr_base_dump_load_string(buffer, &tmp_out);
|
||||
break;
|
||||
|
||||
#ifdef HAVE_INT64_T
|
||||
case ORTE_UINT64:
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT64\tValue: %d", (int)iptr->value.ui64);
|
||||
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT64\tValue: %u", (int)iptr->value.ui64);
|
||||
orte_gpr_base_dump_load_string(buffer, &tmp_out);
|
||||
break;
|
||||
#endif
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user