
Squeeeeeeze the launch message. This is the message sent to the daemons that provides all the data required for launching their local procs. In reorganizing the ODLS framework, I discovered that we were sending a significant amount of unnecessary and repeated data. This commit resolves that by:

1. taking advantage of the fact that we no longer create the launch message via a GPR trigger. In earlier times, we had the GPR create the launch message based on a subscription. In that mode of operation, we could not guarantee the order in which the data was stored in the message - hence, we had no choice but to parse the message in a loop that checked each value against a list of possible "keys" until the corresponding value was found.

Now, however, we construct the message "by hand", so we know precisely what data is in each location in the message. Thus, we no longer need to send the character string "keys" for each data value. This represents a rather large savings in message size - to give you an example, we typically would use a 30-char "key" for a 2-byte data value, so the key overhead can dwarf the data itself. A sketch contrasting the two parsing styles follows.
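To make that concrete, here is a minimal sketch - plain C, not actual ORTE code; the kv_t type and the long key strings are hypothetical stand-ins for the GPR keyvals and schema keys - showing why keyed messages must be scanned while positional ones can be indexed directly:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct { const char *key; uint16_t value; } kv_t;

/* old style: each 2-byte value travels with a long string key, and the
 * receiver must scan for every key because ordering was not guaranteed */
static uint16_t find_keyed(const kv_t *kvs, int n, const char *key)
{
    for (int i = 0; i < n; i++) {
        if (0 == strcmp(kvs[i].key, key)) {
            return kvs[i].value;
        }
    }
    return 0;   /* not found - real code would flag an error */
}

int main(void)
{
    /* ~30-char keys carrying 2-byte values: the keys dominate the size */
    kv_t keyed_msg[] = {
        { "example-job-vpid-range-key-name", 64 },
        { "example-total-slots-alloc-keynm", 128 },
    };
    /* new style: sender and receiver agree on the ordering in advance,
     * so the message carries only the values, indexed by position */
    uint16_t positional_msg[] = { 64, 128 };

    printf("keyed scan:  range=%u\n",
           (unsigned)find_keyed(keyed_msg, 2, "example-job-vpid-range-key-name"));
    printf("positional:  range=%u\n", (unsigned)positional_msg[0]);
    return 0;
}

Dropping the keys trades flexibility for a fixed, agreed-upon layout - which is why the diff below warns loudly that any change in keyval order must be mirrored in the construct_child_list parser.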

2. sending node-specific data only once. Again, because we used to construct the message via subscriptions that were done on a per-proc basis, the data for each node (e.g., the daemon's name, whether or not the node was oversubscribed) was included in the data for each proc - i.e., the node-specific data was repeated for every proc.

Now that we construct the message "by hand", there is no reason to do this any more. Instead, we can insert the data for a specific node only once, and then provide the per-proc data for that node. We therefore not only save all that extra data in the message, but we also only need to parse the per-node data once. A sketch of the resulting layout follows.
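Here is a minimal sketch of that layout - again plain C with purely illustrative struct and field names, not the actual ORTE wire format: job-global data appears once, then one block per node holding the node data followed by that node's per-proc entries:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { uint32_t vpid; uint32_t app_idx; uint32_t local_rank; } proc_entry_t;

typedef struct {
    uint32_t daemon_vpid;      /* sent once per node, not once per proc */
    uint32_t num_procs;        /* how many proc entries follow */
    bool     oversubscribed;   /* likewise node-level, sent once */
    const proc_entry_t *procs;
} node_entry_t;

typedef struct {
    uint32_t jobid;            /* job-global data, sent exactly once */
    uint32_t total_slots;
    bool     override_oversubscribed;
    uint32_t num_nodes;
    const node_entry_t *nodes;
} launch_msg_t;

/* each daemon skips blocks for other nodes, parses its own node data
 * exactly once, and then walks only its own per-proc entries */
static void parse(const launch_msg_t *msg, uint32_t my_daemon_vpid)
{
    for (uint32_t n = 0; n < msg->num_nodes; n++) {
        const node_entry_t *node = &msg->nodes[n];
        if (node->daemon_vpid != my_daemon_vpid)
            continue;                      /* not our node - skip it */
        for (uint32_t p = 0; p < node->num_procs; p++)
            printf("job %u: launch vpid %u (app %u, local rank %u)\n",
                   (unsigned)msg->jobid,
                   (unsigned)node->procs[p].vpid,
                   (unsigned)node->procs[p].app_idx,
                   (unsigned)node->procs[p].local_rank);
    }
}

int main(void)
{
    proc_entry_t procs[] = { { 0, 0, 0 }, { 1, 0, 1 } };
    node_entry_t nodes[] = { { 5, 2, false, procs } };
    launch_msg_t msg = { 42, 2, false, 1, nodes };
    parse(&msg, 5);   /* daemon 5 finds its block once, then its two procs */
    return 0;
}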

The savings become significant at scale. Here is a comparison between the revised trunk and the trunk prior to this commit (all data taken on odin, using openib, 64 nodes, unity message routing, tested with an application consisting of mpi_init/mpi_barrier/mpi_finalize; all execution times given in seconds, all launch message sizes in bytes):

Per-node scaling, taken at 1ppn:

#nodes           original trunk                         revised trunk
             time               size                time               size
      1      0.10                819                0.09                564
      2      0.14               1070                0.14                677
      3      0.15               1321                0.14                790
      4      0.15               1572                0.15                903
      8      0.17               2576                0.20               1355
     16      0.25               4584                0.21               2259
     32      0.28               8600                0.27               4067
     64      0.50              16632                0.39               7683

Per-proc scaling, taken at 64 nodes:

   ppn             original trunk                         revised trunk
              time               size                time               size
      1       0.50              16669                0.40               7720
      2       0.55              32733                0.54              11048
      3       0.87              48797                0.81              14376
      4       1.0               64861                0.85              17704


Condensing those numbers (i.e., taking the incremental message size per added node or proc from the tables above), it appears we gained:

per-node message size: 251 bytes/node -> 113 bytes/node

per-proc message size: 251 bytes/proc -> 52 bytes/proc

per-job message size:  568 bytes/job -> 399 bytes/job 
(job-specific data such as jobid, override oversubscribe flag, total #procs in job, total slots allocated)

The fact that the two pre-commit per-node and per-proc figures are identical (251 bytes) confirms that each proc entry was carrying the node data as well. It isn't quite the 10x message reduction I had hoped to get, but it is significant and gives much better scaling.

Note that the timing info was, as usual, pretty chaotic - the numbers cited here were typical across several runs taken after the initial one to avoid NFS file positioning influences.

Also note that this commit removes the orte_process_info.vpid_start field and the handful of places that passed that useless value. By definition, all jobs start at vpid=0, so all we were doing was passing "0" around. In fact, many places simply hardwired it to "0" anyway rather than deal with it.

This commit was SVN r16428.
This commit is contained in:
Ralph Castain 2007-10-11 15:57:26 +00:00
parent 25c95c9ee9
commit 3dbd4d9be7
21 changed files with 496 additions and 386 deletions

View file

@@ -73,7 +73,7 @@ int orte_ns_proxy_get_peers(orte_process_name_t **procs,
for (i=0; i < orte_process_info.num_procs; i++) {
(*procs)[i].jobid = ORTE_PROC_MY_NAME->jobid;
(*procs)[i].vpid = orte_process_info.vpid_start + i;
(*procs)[i].vpid = i;
}
*num_procs = orte_process_info.num_procs;

View file

@@ -70,7 +70,7 @@ int orte_ns_replica_get_peers(orte_process_name_t **procs,
for (i=0; i < orte_process_info.num_procs; i++) {
(*procs)[i].jobid = ORTE_PROC_MY_NAME->jobid;
(*procs)[i].vpid = orte_process_info.vpid_start + i;
(*procs)[i].vpid = i;
}
*num_procs = orte_process_info.num_procs;

View file

@@ -47,6 +47,7 @@
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/ras/base/ras_private.h"
#include "orte/mca/sds/base/base.h"
#include "orte/util/session_dir.h"
#include "orte/util/sys_info.h"
@@ -267,24 +268,17 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_job_map_t *map)
{
orte_gpr_notify_data_t *ndat;
orte_gpr_value_t **values, *value;
orte_std_cntr_t cnt;
char *glob_tokens[] = {
ORTE_JOB_GLOBALS,
NULL
};
char *glob_keys[] = {
ORTE_JOB_APP_CONTEXT_KEY,
ORTE_JOB_VPID_START_KEY,
ORTE_JOB_VPID_RANGE_KEY,
ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY,
NULL
};
orte_gpr_value_t *value;
opal_list_item_t *item, *m_item;
orte_mapped_node_t *node;
orte_mapped_proc_t *proc;
int rc;
char *segment;
int posn;
orte_std_cntr_t num_kvs;
orte_app_context_t **app_contexts;
orte_std_cntr_t i, num_contexts, total_slots_alloc;
orte_vpid_t range;
bool override;
/* set default answer */
*data = NULL;
@@ -302,28 +296,91 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
return rc;
}
/* get the segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, map->job))) {
/* get the vpid range */
if (ORTE_SUCCESS != (rc = orte_ns.get_vpid_range(map->job, &range))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* get the info from the job globals container first */
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, glob_tokens, glob_keys, &cnt, &values))) {
/* get the total slots allocated to us */
if (ORTE_SUCCESS != (rc = orte_rmgr.get_universe_size(map->job, &total_slots_alloc))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* there can only be one value here since we only specified a single container.
* Just transfer the returned value to the ndat structure
*/
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, values[0]))) {
/* get the override_oversubscribed flag */
if (ORTE_SUCCESS != (rc = orte_ras_base_get_oversubscribe_override(map->job, &override))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
/* get the app_context array */
if (ORTE_SUCCESS != (rc = orte_rmgr.get_app_context(map->job, &app_contexts, &num_contexts))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* compute number of keyvals required */
num_kvs = 3 + num_contexts;
/* create the value object - don't need tokens or segment name */
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, NULL, num_kvs, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* store the range - ORTE_VPID_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
NULL,
ORTE_VPID, &range))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* store the total slots allocated - ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
NULL,
ORTE_STD_CNTR, &total_slots_alloc))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* store the oversubscribe override flag - ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
NULL,
ORTE_BOOL, &override))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* for each context, store it - ORTE_JOB_APP_CONTEXT_KEY */
for (i=0; i < num_contexts; i++) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[i+3]),
NULL,
ORTE_APP_CONTEXT, app_contexts[i]))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
}
/* add the data to the notify_data object */
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&i, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
ndat->cnt = 1;
@@ -336,33 +393,61 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
m_item = opal_list_get_next(m_item)) {
node = (orte_mapped_node_t*)m_item;
/* determine the number of keyvals we need for this node */
num_kvs = 3 + 4*node->num_procs;
/* create the value object - don't need tokens or segment name */
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, NULL, num_kvs, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO IN THESE KEYVALS
* BE REFLECTED IN THE CONSTRUCT_CHILD_LIST PARSER BELOW
*/
/* store the node-specific data */
/* ORTE_VPID_KEY*/
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
NULL,
ORTE_VPID, &(node->daemon->vpid)))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* ORTE_NODE_NUM_PROCS_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
NULL,
ORTE_STD_CNTR, &node->num_procs))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* ORTE_NODE_OVERSUBSCRIBED_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
NULL,
ORTE_BOOL, &node->oversubscribed))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* cycle through and add the proc-specific data */
posn = 3;
for (item = opal_list_get_first(&node->procs);
item != opal_list_get_end(&node->procs);
item = opal_list_get_next(item)) {
proc = (orte_mapped_proc_t*)item;
/* must not have any tokens so that launch_procs can process it correctly */
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, segment, 7, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO IN THESE KEYVALS
* BE REFLECTED IN THE CONSTRUCT_CHILD_LIST PARSER BELOW
*/
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_VPID_KEY,
ORTE_VPID, &(node->daemon->vpid)))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
ORTE_VPID_KEY,
/* ORTE_VPID_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[posn++]),
NULL,
ORTE_VPID, &(proc->name.vpid)))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
@@ -370,8 +455,9 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
ORTE_PROC_APP_CONTEXT_KEY,
/* ORTE_PROC_APP_CONTEXT_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[posn++]),
NULL,
ORTE_STD_CNTR, &proc->app_idx))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
@@ -379,49 +465,35 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[3]),
ORTE_PROC_LOCAL_RANK_KEY,
/* ORTE_PROC_LOCAL_RANK_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[posn++]),
NULL,
ORTE_VPID, &proc->local_rank))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[4]),
ORTE_PROC_CPU_LIST_KEY,
/* ORTE_PROC_CPU_LIST_KEY */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[posn++]),
NULL,
ORTE_STRING, proc->slot_list))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[5]),
ORTE_NODE_NUM_PROCS_KEY,
ORTE_STD_CNTR, &node->num_procs))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[6]),
ORTE_NODE_OVERSUBSCRIBED_KEY,
ORTE_BOOL, &node->oversubscribed))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
ndat->cnt += 1;
}
/* add this node's data to the notify_data object */
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&i, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
ndat->cnt += 1;
}
*data = ndat;
@@ -430,7 +502,7 @@ int orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
int orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
orte_jobid_t *job,
orte_vpid_t *vpid_start,
orte_std_cntr_t *num_local_procs,
orte_vpid_t *vpid_range,
orte_std_cntr_t *total_slots_alloc,
bool *node_included,
@@ -441,12 +513,11 @@ int orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
int rc;
orte_app_context_t *app;
orte_gpr_value_t *value, **values;
orte_gpr_keyval_t *kval;
orte_vpid_t *vptr;
orte_odls_child_t *child;
orte_odls_app_context_t *app_item;
orte_std_cntr_t i, j, kv, *sptr;
orte_process_name_t daemon, proc;
orte_std_cntr_t j, kv, *sptr, posn;
orte_process_name_t proc;
bool *bptr;
char *slot_str;
@@ -458,7 +529,7 @@ int orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
/* set the default values since they may not be included in the data */
*job = ORTE_JOBID_INVALID;
*vpid_start = ORTE_VPID_INVALID;
*num_local_procs = 0;
*vpid_range = ORTE_VPID_INVALID;
*total_slots_alloc = 0;
*node_included = false;
@@ -484,153 +555,129 @@ int orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
return rc;
}
/* init the daemon and proc objects */
daemon.jobid = 0;
/* init the proc object */
proc.jobid = *job;
values = (orte_gpr_value_t**)(data->values)->addr;
for (j=0, i=0; i < data->cnt && j < (data->values)->size; j++) { /* loop through all returned values */
if (NULL != values[j]) {
i++;
value = values[j];
if (NULL != value->tokens) {
/* this came from the globals container, so it must contain
* the app_context(s), vpid_start, and vpid_range entries. Only one
* value object should ever come from that container
*/
for (kv=0; kv < value->cnt; kv++) {
kval = value->keyvals[kv];
if (strcmp(kval->key, ORTE_JOB_VPID_START_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, kval->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*vpid_start = *vptr;
continue;
}
if (strcmp(kval->key, ORTE_JOB_VPID_RANGE_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, kval->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*vpid_range = *vptr;
continue;
}
if (strcmp(kval->key, ORTE_JOB_APP_CONTEXT_KEY) == 0) {
/* this can occur multiple times since we allow multiple
* app_contexts on the orterun command line. Add them
* to the list
*/
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&app, kval->value, ORTE_APP_CONTEXT))) {
ORTE_ERROR_LOG(rc);
return rc;
}
app_item = OBJ_NEW(orte_odls_app_context_t);
if (NULL == app_item) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
app_item->app_context = app;
opal_list_append(app_context_list, &app_item->super);
kval->value->data = NULL; /* protect the data storage from later release */
}
if (strcmp(kval->key, ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, kval->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*override_oversubscribed = *bptr;
continue;
}
if (strcmp(kval->key, ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*total_slots_alloc = *sptr;
continue;
}
} /* end for loop to process global data */
} else {
/* this must have come from one of the process containers, so it must
* contain data for a proc structure - get the name of the daemon and proc
*/
if (ORTE_SUCCESS != (rc = orte_odls_base_default_extract_proc_map_info(&daemon, &proc, value))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* does this proc belong to us? */
if (ORTE_PROC_MY_NAME->vpid != daemon.vpid) {
/* evidently not - ignore it */
continue;
}
/* yes it does - indicate that we need to do something */
*node_included = true;
/* harvest the info into a new child structure, taking advantage
* of our knowledge of the ordering of the data itself
*/
child = OBJ_NEW(orte_odls_child_t);
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&child->name, &proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* 3rd posn - app_idx */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, value->keyvals[2]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->app_idx = *sptr; /* save the index into the app_context objects */
/* 4th posn - local rank */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[3]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->local_rank = *vptr; /* save the local_rank */
/* 5th posn - cpu list */
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&slot_str, value->keyvals[4]->value->data, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (NULL != slot_str) {
if (ORTE_SUCCESS != (rc = slot_list_to_cpu_set(slot_str, child))){
ORTE_ERROR_LOG(rc);
free(slot_str);
return rc;
}
free(slot_str);
}
/* 6th posn - number of local procs */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, value->keyvals[5]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->num_procs = *sptr; /* save the number of procs from this job on this node */
/* 7th posn - oversubscribed flag */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, value->keyvals[6]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*oversubscribed = *bptr; /* save the flag */
/* protect operation on the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
opal_list_append(&orte_odls_globals.children, &child->super);
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
/* the first value in the data object contains the job-global data, so extract it first */
value = values[0];
/* ORTE_JOB_VPID_RANGE_KEY */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[0]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*vpid_range = *vptr;
/* ORTE_JOB_TOTAL_SLOTS_ALLOC_KEY */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, value->keyvals[1]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*total_slots_alloc = *sptr;
/* ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, value->keyvals[2]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*override_oversubscribed = *bptr;
/* loop through remaining keyvals and get the app_contexts */
for (kv=3; kv < value->cnt; kv++) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&app, value->keyvals[kv]->value, ORTE_APP_CONTEXT))) {
ORTE_ERROR_LOG(rc);
return rc;
}
app_item = OBJ_NEW(orte_odls_app_context_t);
if (NULL == app_item) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
app_item->app_context = app;
opal_list_append(app_context_list, &app_item->super);
value->keyvals[kv]->value->data = NULL; /* protect the data storage from later release */
}
/* parsing of job-global data is complete - now process the node-specific data */
for (j=1; j < data->cnt; j++) { /* loop across remaining values */
value = values[j];
/* this must have be about a node, so extract the daemon's vpid so we can
* see if this is intended for us
*/
/* vpid of daemon is in first position */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[0]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* does this data belong to us? */
if (ORTE_PROC_MY_NAME->vpid != *vptr) {
/* evidently not - ignore it */
continue;
}
/* yes it does - indicate that we need to do something */
*node_included = true;
/* harvest the rest of the node-specific data */
/* 2nd position - num local procs for this job on this node */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, value->keyvals[1]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*num_local_procs = *sptr; /* save the value */
/* 3rd posn - oversubscribed flag */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, value->keyvals[2]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*oversubscribed = *bptr; /* save the flag */
/* loop through the children for this job and harvest their info */
posn = 3;
while (posn < value->cnt) {
child = OBJ_NEW(orte_odls_child_t);
/* 1st child posn - vpid */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[posn++]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
proc.vpid = *vptr;
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&child->name, &proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* 2nd child posn - app_idx */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, value->keyvals[posn++]->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->app_idx = *sptr; /* save the index into the app_context objects */
/* 3rd child posn - local rank */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[posn++]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->local_rank = *vptr; /* save the local_rank */
/* 4th child posn - cpu list */
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&slot_str, value->keyvals[posn++]->value->data, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (NULL != slot_str) {
if (ORTE_SUCCESS != (rc = slot_list_to_cpu_set(slot_str, child))){
ORTE_ERROR_LOG(rc);
free(slot_str);
return rc;
}
free(slot_str);
}
/* protect operation on the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
opal_list_append(&orte_odls_globals.children, &child->super);
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
}
}
@@ -638,8 +685,7 @@ int orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
}
static int odls_base_default_setup_fork(orte_app_context_t *context,
orte_odls_child_t *child,
orte_vpid_t vpid_start,
orte_std_cntr_t num_local_procs,
orte_vpid_t vpid_range,
orte_std_cntr_t total_slots_alloc,
bool want_processor, size_t processor,
@@ -660,8 +706,6 @@ static int odls_base_default_setup_fork(orte_app_context_t *context,
opal_sys_limits.num_procs <= (int)opal_list_get_size(&orte_odls_globals.children)) {
/* at the system limit - abort */
ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_CHILDREN);
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = ORTE_ERR_SYS_LIMITS_CHILDREN;
return ORTE_ERR_SYS_LIMITS_CHILDREN;
}
}
@@ -671,14 +715,12 @@ static int odls_base_default_setup_fork(orte_app_context_t *context,
take care of outputting a pretty error message, if required
*/
if (ORTE_SUCCESS != (rc = orte_rmgr.check_context_cwd(context, true))) {
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = rc;
return ORTE_ERR_SILENT; /* the error will be reported elsewhere */
/* do not ERROR_LOG - it will be reported elsewhere */
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rmgr.check_context_app(context))) {
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = rc;
return ORTE_ERR_SILENT; /* the error will be reported elsewhere */
/* do not ERROR_LOG - it will be reported elsewhere */
return rc;
}
/* setup base environment: copy the current environ and merge
@@ -804,7 +846,7 @@ static int odls_base_default_setup_fork(orte_app_context_t *context,
/* set the app_context number into the environment */
param = mca_base_param_environ_variable("orte","app","num");
asprintf(&param2, "%ld", (long)child->app_idx);
asprintf(&param2, "%ld", (long)context->idx);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
@@ -821,22 +863,25 @@ static int odls_base_default_setup_fork(orte_app_context_t *context,
opal_setenv(param, orte_system_info.nodename, true, environ_copy);
free(param);
/* push data into environment */
orte_ns_nds_env_put(child->name, vpid_start, vpid_range,
child->local_rank, child->num_procs,
environ_copy);
/* push data into environment - don't push any single proc
* info, though. We are setting the environment up on a
* per-context basis, and will add the individual proc
* info later
*/
orte_ns_nds_env_put(vpid_range, num_local_procs, environ_copy);
return ORTE_SUCCESS;
}
int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_context_list,
orte_vpid_t vpid_start, orte_vpid_t vpid_range,
orte_std_cntr_t num_local_procs,
orte_vpid_t vpid_range,
orte_std_cntr_t total_slots_alloc,
bool node_oversubscribed,
bool override_oversubscribed,
orte_odls_base_fork_local_proc_fn_t fork_local)
{
char *job_str, *session_dir, *uri_file, *my_uri;
char *job_str, *vpid_str, *param, *value, *session_dir, *uri_file, *my_uri;
FILE *fp;
opal_list_item_t *item, *item2;
orte_app_context_t *app;
@@ -845,7 +890,6 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
int i, num_processors;
bool want_processor, oversubscribed, quit_flag;
int rc;
char **environ_copy;
bool launch_failed=true;
/* protect operations involving the global list of children */
@@ -965,6 +1009,36 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
opal_output(orte_odls_globals.output, "odls: oversubscribed set to %s want_processor set to %s",
oversubscribed ? "true" : "false", want_processor ? "true" : "false");
/* setup the environment for each context */
for (item2 = opal_list_get_first(app_context_list);
item2 != opal_list_get_end(app_context_list);
item2 = opal_list_get_next(item2)) {
app_item = (orte_odls_app_context_t*)item2;
if (ORTE_SUCCESS != (rc = odls_base_default_setup_fork(app_item->app_context,
num_local_procs,
vpid_range,
total_slots_alloc,
want_processor, i,
oversubscribed,
&app_item->environ_copy))) {
/* do not ERROR_LOG this failure - it will be reported
* elsewhere. The launch is going to fail - find at least one child
* in this job and mark it as failed-to-start
*/
for (item = opal_list_get_first(&orte_odls_globals.children);
!quit_flag && item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (ORTE_EQUAL == orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = rc;
goto CLEANUP;
}
}
}
}
/* okay, now let's launch our local procs using the provided fork_local fn */
i = 0;
quit_flag = false;
@@ -974,9 +1048,9 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
child = (orte_odls_child_t*)item;
/* is this child already alive? This can happen if
* we are asked to launch additional processes.
* If it has been launched, then do nothing
*/
if (child->alive) {
opal_output(orte_odls_globals.output, "odls: child %s is already alive",
ORTE_NAME_PRINT(child->name));
@@ -984,9 +1058,9 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
}
/* do we have a child from the specified job. Because the
* job could be given as a WILDCARD value, we must use
* the dss.compare function to check for equality.
*/
if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
opal_output(orte_odls_globals.output, "odls: child %s is not in job %ld being launched",
ORTE_NAME_PRINT(child->name), (long)job);
@@ -995,13 +1069,14 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
opal_output(orte_odls_globals.output, "odls: preparing to launch child %s",
ORTE_NAME_PRINT(child->name));
/* find the indicated app_context in the list */
/* find the app context for this child */
for (item2 = opal_list_get_first(app_context_list);
item2 != opal_list_get_end(app_context_list);
item2 = opal_list_get_next(item2)) {
app_item = (orte_odls_app_context_t*)item2;
if (child->app_idx == app_item->app_context->idx) {
/* found it */
app = app_item->app_context;
goto DOFORK;
}
@@ -1012,14 +1087,43 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_conte
goto CLEANUP;
DOFORK:
/* setup the fork */
if (ORTE_SUCCESS != (rc = odls_base_default_setup_fork(app, child,
vpid_start, vpid_range, total_slots_alloc,
want_processor, i,
oversubscribed, &environ_copy))) {
/* setup the rest of the environment with the proc-specific items - these
* will be overwritten for each child
*/
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid_string(&job_str, child->name))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_ns.get_vpid_string(&vpid_str, child->name))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if(NULL == (param = mca_base_param_environ_variable("ns","nds","jobid"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
opal_setenv(param, job_str, true, &app_item->environ_copy);
free(param);
free(job_str);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","vpid"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
opal_setenv(param, vpid_str, true, &app_item->environ_copy);
free(param);
free(vpid_str);
asprintf(&value, "%lu", (unsigned long) child->local_rank);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","local_rank"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
opal_setenv(param, value, true, &app_item->environ_copy);
free(param);
free(value);
/* must unlock prior to fork to keep things clean in the
* event library
@@ -1027,7 +1131,7 @@ DOFORK:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
if (ORTE_SUCCESS != (rc = fork_local(app, child, environ_copy))) {
if (ORTE_SUCCESS != (rc = fork_local(app, child, app_item->environ_copy))) {
/* do NOT ERROR_LOG this error - it generates
* a message/node as most errors will be common
* across the entire cluster. Instead, we let orterun
@@ -1072,12 +1176,16 @@ CLEANUP:
}
int orte_odls_base_default_extract_proc_map_info(orte_process_name_t *daemon,
orte_process_name_t *proc,
opal_list_t *proc_list,
orte_gpr_value_t *value)
{
int rc;
orte_vpid_t *vptr;
orte_process_name_t name;
orte_std_cntr_t posn;
orte_namelist_t *proc;
/* daemon jobid is set by caller */
/* vpid of daemon that will host these procs is in first position */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[0]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
@@ -1085,13 +1193,25 @@ int orte_odls_base_default_extract_proc_map_info(orte_process_name_t *daemon,
}
daemon->vpid = *vptr;
/* vpid of proc is in second position */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[1]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
/* loop through the children for this job and harvest their info */
posn = 3;
name.jobid = ORTE_JOBID_INVALID; /* must be reset by caller */
while (posn < value->cnt) {
proc = OBJ_NEW(orte_namelist_t);
/* 1st child posn - vpid */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, value->keyvals[posn]->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
name.vpid = *vptr;
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&proc->name, &name, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_append(proc_list, &proc->item);
posn += 4;
}
proc->vpid = *vptr;
return ORTE_SUCCESS;
}

View file

@@ -25,6 +25,7 @@
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/output.h"
#include "opal/util/trace.h"
#include "opal/util/argv.h"
#include "orte/dss/dss.h"
#include "orte/util/proc_info.h"
@@ -49,9 +50,20 @@
orte_odls_base_module_t orte_odls;
/* instance the app_context list object */
static void orte_odls_app_context_constructor(orte_odls_app_context_t *ptr)
{
ptr->environ_copy = NULL;
}
static void orte_odls_app_context_destructor(orte_odls_app_context_t *ptr)
{
if (NULL != ptr->environ_copy) {
opal_argv_free(ptr->environ_copy);
}
}
OBJ_CLASS_INSTANCE(orte_odls_app_context_t,
opal_list_item_t,
NULL, NULL);
orte_odls_app_context_constructor,
orte_odls_app_context_destructor);
/* instance the child list object */
@@ -59,7 +71,6 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
{
ptr->name = NULL;
ptr->local_rank = ORTE_VPID_INVALID;
ptr->num_procs = 0;
ptr->pid = 0;
ptr->app_idx = -1;
ptr->alive = false;

View file

@@ -57,7 +57,6 @@ typedef struct orte_odls_child_t {
opal_list_item_t super; /* required to place this on a list */
orte_process_name_t *name; /* the OpenRTE name of the proc */
orte_vpid_t local_rank; /* local rank of the proc on this node */
orte_std_cntr_t num_procs; /* number of procs from this job on this node */
pid_t pid; /* local pid of the proc */
orte_std_cntr_t app_idx; /* index of the app_context for this proc */
bool alive; /* is this proc alive? */
@@ -76,6 +75,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
typedef struct orted_odls_app_context_t {
opal_list_item_t super; /* required to place this on a list */
orte_app_context_t *app_context;
char **environ_copy; /* the environment for this app_context */
} orte_odls_app_context_t;
OBJ_CLASS_DECLARATION(orte_odls_app_context_t);
@@ -111,7 +111,7 @@ orte_odls_base_default_get_add_procs_data(orte_gpr_notify_data_t **data,
ORTE_DECLSPEC int
orte_odls_base_default_construct_child_list(orte_gpr_notify_data_t *data,
orte_jobid_t *job,
orte_vpid_t *vpid_start,
orte_std_cntr_t *num_local_procs,
orte_vpid_t *vpid_range,
orte_std_cntr_t *total_slots_allocated,
bool *node_included,
@@ -126,7 +126,8 @@ typedef int (*orte_odls_base_fork_local_proc_fn_t)(orte_app_context_t *context,
ORTE_DECLSPEC int
orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_context_list,
orte_vpid_t vpid_start, orte_vpid_t vpid_range,
orte_std_cntr_t num_local_procs,
orte_vpid_t vpid_range,
orte_std_cntr_t total_slots_allocated,
bool oversubscribed,
bool override_oversubscribed,
@@ -134,7 +135,7 @@ orte_odls_base_default_launch_local(orte_jobid_t job, opal_list_t *app_context_l
ORTE_DECLSPEC int
orte_odls_base_default_extract_proc_map_info(orte_process_name_t *daemon,
orte_process_name_t *proc,
opal_list_t *proc_list,
orte_gpr_value_t *value);
ORTE_DECLSPEC int

View file

@@ -352,9 +352,9 @@ static int odls_default_fork_local_proc(
int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data)
{
int rc;
orte_std_cntr_t total_slots_alloc;
orte_std_cntr_t total_slots_alloc, num_local_procs;
orte_jobid_t job;
orte_vpid_t start, range;
orte_vpid_t range;
opal_list_item_t *item;
bool node_included;
bool override_oversubscribed;
@@ -371,7 +371,8 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data)
/* construct the list of children we are to launch */
if (ORTE_SUCCESS != (rc = orte_odls_base_default_construct_child_list(data, &job,
&start, &range,
&num_local_procs,
&range,
&total_slots_alloc,
&node_included,
&oversubscribed,
@@ -389,7 +390,8 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data)
/* launch the local procs */
if (ORTE_SUCCESS != (rc = orte_odls_base_default_launch_local(job, &app_context_list,
start, range, total_slots_alloc,
num_local_procs,
range, total_slots_alloc,
oversubscribed,
override_oversubscribed,
odls_default_fork_local_proc))) {

View file

@@ -84,7 +84,7 @@ typedef int (*orte_odls_base_module_deliver_message_fn_t)(orte_jobid_t job, orte
* Extract the mapping of daemon-proc pair
*/
typedef int (*orte_odls_base_module_extract_proc_map_info_fn_t)(orte_process_name_t *daemon,
orte_process_name_t *proc,
opal_list_t *proc_list,
orte_gpr_value_t *value);
/**
* Register to require sync before termination

View file

@@ -20,7 +20,7 @@
#include "orte_config.h"
#include "orte/orte_constants.h"
#include "orte/dss/dss_types.h"
#include "orte/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/schema/schema_types.h"
@@ -64,3 +64,57 @@ int orte_ras_base_set_oversubscribe_override(orte_jobid_t job)
return rc;
}
int orte_ras_base_get_oversubscribe_override(orte_jobid_t job, bool *flag)
{
orte_gpr_addr_mode_t addr_mode;
char *segment;
char *tokens[] = {
ORTE_JOB_GLOBALS,
NULL
};
char *keys[] = {
ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY,
NULL
};
orte_gpr_value_t **values;
orte_std_cntr_t i, cnt;
bool *bptr;
int rc;
*flag = false; /* default if flag not set */
addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
/* get the job segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get the data */
if (ORTE_SUCCESS != (rc = orte_gpr.get(addr_mode, segment, tokens,
keys,
&cnt, &values))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
free(segment);
if (0 < cnt) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, values[0]->keyvals[0]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
*flag = *bptr;
}
CLEANUP:
for (i=0; i < cnt; i++) OBJ_RELEASE(values[i]);
if (NULL != values) {
free(values);
}
return ORTE_SUCCESS;
}

View file

@@ -84,6 +84,8 @@ ORTE_DECLSPEC int orte_ras_base_reallocate(orte_jobid_t parent_jobid,
ORTE_DECLSPEC int orte_ras_base_set_oversubscribe_override(orte_jobid_t job);
ORTE_DECLSPEC int orte_ras_base_get_oversubscribe_override(orte_jobid_t job, bool *flag);
/*
* Query the registry for all available nodes
*/

View file

@@ -159,9 +159,12 @@ int orte_routed_tree_init_routes(orte_jobid_t job, orte_gpr_notify_data_t *ndat)
* point at the daemon for each proc
*/
if (orte_process_info.daemon || orte_process_info.seed) {
orte_std_cntr_t i, j;
orte_std_cntr_t j;
orte_process_name_t daemon, proc;
orte_gpr_value_t **values, *value;
opal_list_t proc_list;
opal_list_item_t *item;
orte_namelist_t *nitem;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_tree: init routes for daemon/seed job %ld",
@@ -212,26 +215,24 @@ int orte_routed_tree_init_routes(orte_jobid_t job, orte_gpr_notify_data_t *ndat)
values = (orte_gpr_value_t**)(ndat->values)->addr;
daemon.jobid = 0;
proc.jobid = job;
for (j=0, i=0; i < ndat->cnt && j < (ndat->values)->size; j++) { /* loop through all returned values */
if (NULL != values[j]) {
i++;
value = values[j];
if (NULL != value->tokens) {
/* this came from the globals container, so ignore it */
continue;
}
/* this must have come from one of the process containers, so it must
* contain data for a proc structure - extract what we need
*/
if (ORTE_SUCCESS != (rc = orte_odls.extract_proc_map_info(&daemon, &proc, value))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* loop through all returned values - the first position contains only
* job-global data, so we can skip it
*/
for (j=1; j < ndat->cnt; j++) {
value = values[j];
/* extract the relevant data for this node */
OBJ_CONSTRUCT(&proc_list, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_odls.extract_proc_map_info(&daemon, &proc_list, value))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 != orte_ns.compare_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &daemon)) {
while (NULL != (item = opal_list_remove_first(&proc_list))) {
nitem = (orte_namelist_t*)item;
proc.jobid = job;
proc.vpid = nitem->name->vpid;
if (ORTE_PROC_MY_NAME->vpid != daemon.vpid) {
/* Setup the route to the remote proc via its daemon */
if (ORTE_SUCCESS != (rc = orte_routed_tree_update_route(&proc, &daemon))) {
ORTE_ERROR_LOG(rc);
@@ -247,7 +248,9 @@ int orte_routed_tree_init_routes(orte_jobid_t job, orte_gpr_notify_data_t *ndat)
return rc;
}
}
OBJ_RELEASE(nitem);
}
OBJ_DESTRUCT(&proc_list);
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
@@ -268,8 +271,8 @@ int orte_routed_tree_init_routes(orte_jobid_t job, orte_gpr_notify_data_t *ndat)
mca_base_param_lookup_string(id, &rml_uri);
if (NULL == rml_uri) {
/* in this module, we absolutely MUST have this information - if
* we didn't get it, then error out
*/
opal_output(0, "%s ERROR: Failed to identify the local daemon's URI",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output(0, "%s ERROR: This is a fatal condition when the tree router",

View file

@@ -71,14 +71,10 @@ extern "C" {
/*
* Put functions
*/
ORTE_DECLSPEC int orte_ns_nds_env_put(const orte_process_name_t* proc,
orte_vpid_t vpid_start,
orte_std_cntr_t num_procs,
orte_vpid_t local_rank,
ORTE_DECLSPEC int orte_ns_nds_env_put(orte_std_cntr_t num_procs,
orte_std_cntr_t num_local_procs,
char ***env);
ORTE_DECLSPEC int orte_ns_nds_pipe_put(const orte_process_name_t* proc,
orte_vpid_t vpid_start,
orte_std_cntr_t num_procs,
orte_vpid_t local_rank,
orte_std_cntr_t num_local_procs,

View file

@@ -35,20 +35,11 @@
int orte_sds_env_get(void)
{
int vpid_start;
int num_procs;
int local_rank;
int num_local_procs;
int id;
id = mca_base_param_register_int("ns", "nds", "vpid_start", NULL, -1);
mca_base_param_lookup_int(id, &vpid_start);
if (vpid_start < 0) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
orte_process_info.vpid_start = (orte_vpid_t)vpid_start;
id = mca_base_param_register_int("ns", "nds", "num_procs", NULL, -1);
mca_base_param_lookup_int(id, &num_procs);
if (num_procs < 0) {

View file

@@ -33,26 +33,12 @@
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/base/base.h"
int orte_ns_nds_env_put(const orte_process_name_t* name,
orte_vpid_t vpid_start, orte_std_cntr_t num_procs,
orte_vpid_t local_rank,
int orte_ns_nds_env_put(orte_std_cntr_t num_procs,
orte_std_cntr_t num_local_procs,
char ***env)
{
char* param;
char* jobid;
char* vpid;
char* value;
int rc;
if(ORTE_SUCCESS != (rc = orte_ns.get_jobid_string(&jobid, name))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(ORTE_SUCCESS != (rc = orte_ns.get_vpid_string(&vpid, name))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set the mode to env */
if(NULL == (param = mca_base_param_environ_variable("ns","nds",NULL))) {
@@ -80,32 +66,6 @@ int orte_ns_nds_env_put(const orte_process_name_t* name,
opal_unsetenv(param, env);
free(param);
/* setup the name */
if(NULL == (param = mca_base_param_environ_variable("ns","nds","jobid"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
opal_setenv(param, jobid, true, env);
free(param);
free(jobid);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","vpid"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
opal_setenv(param, vpid, true, env);
free(param);
free(vpid);
asprintf(&value, "%lu", (unsigned long) vpid_start);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","vpid_start"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
opal_setenv(param, value, true, env);
free(param);
free(value);
asprintf(&value, "%lu", (unsigned long) num_procs);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","num_procs"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@@ -115,15 +75,6 @@ int orte_ns_nds_env_put(const orte_process_name_t* name,
free(param);
free(value);
asprintf(&value, "%lu", (unsigned long) local_rank);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","local_rank"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
opal_setenv(param, value, true, env);
free(param);
free(value);
asprintf(&value, "%lu", (unsigned long) num_local_procs);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","num_local_procs"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@@ -358,7 +309,6 @@ int orte_ns_nds_xcpu_put(orte_jobid_t job,
}
int orte_ns_nds_pipe_put(const orte_process_name_t* name,
orte_vpid_t vpid_start,
orte_std_cntr_t num_procs,
orte_vpid_t local_rank,
orte_std_cntr_t num_local_procs,
@@ -372,12 +322,6 @@ int orte_ns_nds_pipe_put(const orte_process_name_t* name,
return ORTE_ERR_NOT_FOUND;
}
rc = write(fd,&vpid_start, sizeof(vpid_start));
if(rc != sizeof(vpid_start)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_NOT_FOUND;
}
rc = write(fd,&num_procs, sizeof(num_procs));
if(rc != sizeof(num_procs)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);

View file

@@ -202,7 +202,6 @@ orte_sds_base_seed_set_name(void)
/* if we are a seed, then there can be only one proc */
orte_process_info.num_procs = 1;
orte_process_info.vpid_start = 0;
orte_process_info.local_rank = 0;
orte_process_info.num_local_procs = 1;

View file

@@ -83,7 +83,6 @@ orte_sds_cnos_set_name(void)
return rc;
}
orte_process_info.vpid_start = (orte_vpid_t) 0;
orte_process_info.num_procs = (orte_std_cntr_t) cnos_get_size();
return ORTE_SUCCESS;

View file

@@ -65,13 +65,6 @@ orte_sds_pipe_set_name(void)
return rc;
}
rc = read(fd,&orte_process_info.vpid_start, sizeof(orte_process_info.vpid_start));
if(rc != sizeof(orte_process_info.vpid_start)) {
opal_output(0, "orte_ns_nds_pipe_get: read returned %d, errno=%d\n", rc, errno);
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
rc = read(fd,&orte_process_info.num_procs, sizeof(orte_process_info.num_procs));
if(rc != sizeof(orte_process_info.num_procs)) {
opal_output(0, "orte_ns_nds_pipe_get: read returned %d, errno=%d\n", rc, errno);

View file

@@ -108,7 +108,6 @@ orte_sds_portals_utcp_set_name(void)
if (nidmap_string[i] == ':') num_procs++;
}
orte_process_info.vpid_start = (orte_vpid_t) 0;
orte_process_info.num_procs = (size_t) num_procs;
return ORTE_SUCCESS;

View file

@@ -71,7 +71,6 @@ typedef int (*orte_sds_base_contact_universe_fn_t)(void);
* includes:
*
* - orte_process_info.my_name
* - orte_process_info.vpid_start
* - orte_process_info.num_procs
*
* From this, the ns is able to develop a map of all processes started

View file

@@ -53,7 +53,6 @@ orte_sds_singleton_set_name(void)
}
orte_process_info.num_procs = 1;
orte_process_info.vpid_start = ORTE_PROC_MY_NAME->vpid;
/* since we are a singleton, then we must have a local_rank of 0
* and only 1 local process
*/

View file

@@ -39,7 +39,6 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
/* ,app_num = */ -1,
/* ,universe_size = */ -1,
/* .singleton = */ false,
/* .vpid_start = */ 0,
/* .num_procs = */ 1,
/* .local_rank = */ ORTE_VPID_INVALID,
/* .num_local_procs = */ 0,

View file

@@ -50,7 +50,6 @@ struct orte_proc_info_t {
orte_std_cntr_t app_num; /**< our index into the app_context array */
orte_std_cntr_t universe_size; /**< the size of the universe we are in */
bool singleton; /**< I am a singleton */
orte_vpid_t vpid_start; /**< starting vpid for this job */
orte_std_cntr_t num_procs; /**< number of processes in this job */
orte_vpid_t local_rank; /**< local rank on this node */
orte_std_cntr_t num_local_procs; /**< total number of procs on this node */