1
1

Update the Windows process ODLS.

This commit was SVN r15066.
Этот коммит содержится в:
George Bosilca 2007-06-14 04:32:19 +00:00
родитель 1e18265c16
Коммит 13a693faa0
2 изменённых файлов: 221 добавлений и 45 удалений

Просмотреть файл

@ -1254,7 +1254,6 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_default.mutex);
quit_flag = false;
for (item = opal_list_get_first(&orte_odls_default.children);
!quit_flag && item != opal_list_get_end(&orte_odls_default.children);

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
@ -39,6 +39,7 @@
#include "opal/util/opal_environ.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/num_procs.h"
#include "opal/util/sys_limits.h"
#include "orte/dss/dss.h"
#include "orte/util/sys_info.h"
@ -180,7 +181,24 @@ static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data,
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[3]),
ORTE_PROC_LOCAL_RANK_KEY,
ORTE_VPID, &proc->local_rank))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[4]),
ORTE_NODE_NUM_PROCS_KEY,
ORTE_STD_CNTR, &node->num_procs))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
@ -222,7 +240,7 @@ static int orte_odls_process_kill_local( pid_t pid, int sig_num )
static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
{
orte_odls_child_t *child;
opal_list_item_t *item;
opal_list_item_t *item, *next;
int rc, exit_status;
opal_list_t procs_killed;
orte_namelist_t *proc;
@ -240,9 +258,12 @@ static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) {
item = next) {
child = (orte_odls_child_t*)item;
/* preserve the pointer to the next item in list in case we release it */
next = opal_list_get_next(item);
opal_output(orte_odls_globals.output, "[%ld,%ld,%ld] odls_kill_local_proc: checking child process [%ld,%ld,%ld]",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), ORTE_NAME_ARGS(child->name));
@ -311,6 +332,9 @@ MOVEON:
return rc;
}
opal_list_append(&procs_killed, &proc->item);
/* release the object since we killed it */
OBJ_RELEASE(child);
}
/* we are done with the global list, so we can now release
@ -372,14 +396,40 @@ static void odls_process_wait_local_proc(pid_t pid, int status, void* cbdata)
* dead. If the latter, then we have a problem as it means we are detecting
* it exiting multiple times
*/
opal_output(orte_odls_globals.output, "odls: did not find pid %ld in table!", (long) pid);
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
opal_condition_signal(&orte_odls_process.cond);
OPAL_THREAD_UNLOCK(&orte_odls_process.mutex);
return;
GOTCHILD:
orte_iof.iof_flush();
/* If this child was the (vpid==0), we hooked it up to orterun's
STDIN SOURCE earlier (do not change this without also changing
odls_process_fork_local_proc()). So we have to tell the SOURCE
a) that we don't want any more data and b) that it should not
expect any more ACKs from this endpoint (so that the svc
component can still flush/shut down cleanly).
Note that the source may have already detected that this
process died as part of an OOB/RML exception, but that's ok --
its "exception" detection capabilities are not reliable, so we
*have* to do this unpublish here, even if it arrives after an
exception is detected and handled (in which case this unpublish
request will be ignored/discarded). */
opal_output(orte_odls_globals.output,
"odls: pid %ld corresponds to [%lu,%lu,%lu]\n",
(long) pid, ORTE_NAME_ARGS(child->name));
#if 0
if (0 == child->name->vpid) {
rc = orte_iof.iof_unpublish(child->name, ORTE_NS_CMP_ALL,
ORTE_IOF_STDIN);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
/* We can't really abort, so keep going... */
}
}
opal_output(orte_odls_globals.output, "orted sent IOF unpub message!\n");
#endif
/* determine the state of this process */
aborted = false;
if(WIFEXITED(status)) {
@ -450,13 +500,7 @@ MOVEON:
opal_condition_signal(&orte_odls_process.cond);
OPAL_THREAD_UNLOCK(&orte_odls_process.mutex);
if (aborted) {
rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, status);
} else {
rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_TERMINATED, status);
}
if (ORTE_SUCCESS != rc) {
if (ORTE_SUCCESS != (rc = orte_smr.set_proc_state(child->name, child->state, status))) {
ORTE_ERROR_LOG(rc);
}
}
@ -472,6 +516,7 @@ static int orte_odls_process_fork_local_proc(
orte_vpid_t vpid_range,
bool want_processor,
size_t processor,
bool oversubscribed,
char **base_environ)
{
pid_t pid;
@ -481,25 +526,42 @@ static int orte_odls_process_fork_local_proc(
char** environ_copy;
char *param, *param2;
char *uri;
bool oversubscribed=false;
size_t num_processors;
/* check the system limits - if we are at our max allowed children, then
* we won't be allowed to do this anyway, so we may as well abort now.
* According to the documentation, num_procs = 0 is equivalent to
* no limit, so treat it as unlimited here.
*/
if (opal_sys_limits.initialized) {
if (0 < opal_sys_limits.num_procs &&
opal_sys_limits.num_procs <= (int)opal_list_get_size(&orte_odls_process.children)) {
/* at the system limit - abort */
ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_CHILDREN);
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = ORTE_ERR_SYS_LIMITS_CHILDREN;
return ORTE_ERR_SYS_LIMITS_CHILDREN;
}
}
/* should pull this information from MPIRUN instead of going with
default */
opts.usepty = OMPI_ENABLE_PTY_SUPPORT;
/* BWB - Fix post beta. Should setup stdin in orterun and
make part of the app_context */
/* BWB - Fix post beta. Should setup stdin in orterun and make
part of the app_context. Do not change this without also
changing the reverse of this in
odls_default_wait_local_proc(). */
if( 0 == child->name->vpid ) {
opts.connect_stdin = true;
} else {
opts.connect_stdin = false;
}
rc = orte_iof_base_setup_prefork(&opts);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_prefork(&opts))) {
ORTE_ERROR_LOG(rc);
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = rc;
return rc;
}
/* Try to change to the context cwd and check that the app
@ -507,11 +569,9 @@ static int orte_odls_process_fork_local_proc(
out a pretty error message if either of these operations fails
*/
if (ORTE_SUCCESS != (i = orte_rmgr.check_context_cwd(context, true))) {
/* Tell the parent that Badness happened */
return ORTE_ERR_FATAL;
}
if (ORTE_SUCCESS != (i = orte_rmgr.check_context_app(context))) {
/* Tell the parent that Badness happened */
return ORTE_ERR_FATAL;
}
@ -557,6 +617,13 @@ static int orte_odls_process_fork_local_proc(
opal_unsetenv(param, &environ_copy);
free(param);
/* pass my contact info to the local proc so we can talk */
uri = orte_rml.get_uri();
param = mca_base_param_environ_variable("orte","local_daemon","uri");
opal_setenv(param, uri, true, &environ_copy);
free(param);
free(uri);
/* setup yield schedule and processor affinity
* We default here to always setting the affinity processor if we want
* it. The processor affinity system then determines
@ -632,12 +699,9 @@ static int orte_odls_process_fork_local_proc(
opal_setenv(param, orte_system_info.nodename, true, &environ_copy);
free(param);
opal_get_num_processors(&rc);
num_processors = (size_t)rc;
/* push name into environment */
orte_ns_nds_env_put(child->name, vpid_start,
num_processors,vpid_range,
orte_process_info.num_local_procs,
orte_ns_nds_env_put(child->name, vpid_start, vpid_range,
child->local_rank, child->num_procs,
&environ_copy);
if (context->argv == NULL) {
@ -645,7 +709,14 @@ static int orte_odls_process_fork_local_proc(
context->argv[0] = strdup(context->app);
context->argv[1] = NULL;
}
#if 0
/* connect endpoints IOF */
rc = orte_iof_base_setup_parent(child->name, &opts);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
#endif
/* Flush all standard handles (stdin, stdout & stderr). */
_flushall();
@ -653,6 +724,8 @@ static int orte_odls_process_fork_local_proc(
intptr_t handle = _spawnve( _P_NOWAIT, context->app, context->argv, environ_copy );
if( -1 == handle ) {
child->state = ORTE_PROC_STATE_FAILED_TO_START;
child->exit_code = ORTE_ERR_PIPE_READ_FAILURE;
opal_show_help("help-orted-launcer.txt", "orted-launcher:execv-error",
true, context->app, "TODO: some error");
orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, -1);
@ -661,19 +734,11 @@ static int orte_odls_process_fork_local_proc(
pid = handle;
}
/* set the proc state to LAUNCHED and increment that counter so the trigger can fire
*/
if (ORTE_SUCCESS !=
(rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_LAUNCHED, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set the proc state to LAUNCHED and save the pid */
child->state = ORTE_PROC_STATE_LAUNCHED;
child->pid = pid;
child->alive = true;
return ORTE_SUCCESS;
}
@ -698,7 +763,11 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
size_t num_processors;
bool oversubscribed=false, want_processor, *bptr, override_oversubscribed=false;
opal_list_item_t *item, *item2;
bool quit_flag;
bool node_included;
orte_filem_base_request_t *filem_request;
char *job_str, *uri_file, *my_uri, *session_dir=NULL;
FILE *fp;
/* parse the returned data to create the required structures
* for a fork launch. Since the data will contain information
@ -729,6 +798,9 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
start = ORTE_VPID_INVALID;
range = ORTE_VPID_INVALID;
/* set the flag indicating this node is not included in the launch data */
node_included = false;
values = (orte_gpr_value_t**)(data->values)->addr;
for (j=0, i=0; i < data->cnt && j < (data->values)->size; j++) { /* loop through all returned values */
if (NULL != values[j]) {
@ -805,6 +877,9 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
}
/* if this is our node...must also protect against a zero-length string */
if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) {
/* indicate that there is something for us to do */
node_included = true;
/* ...harvest the info into a new child structure */
child = OBJ_NEW(orte_odls_child_t);
for (kv2 = 0; kv2 < value->cnt; kv2++) {
@ -825,6 +900,22 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
child->app_idx = *sptr; /* save the index into the app_context objects */
continue;
}
if(strcmp(kval->key, ORTE_PROC_LOCAL_RANK_KEY) == 0) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, kval->value, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->local_rank = *vptr; /* save the local_rank */
continue;
}
if(strcmp(kval->key, ORTE_NODE_NUM_PROCS_KEY) == 0) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
child->num_procs = *sptr; /* save the number of procs from this job on this node */
continue;
}
if(strcmp(kval->key, ORTE_NODE_OVERSUBSCRIBED_KEY) == 0) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, kval->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
@ -847,6 +938,46 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
} /* for j */
}
/* if there is nothing for us to do, just return */
if (!node_included) {
return ORTE_SUCCESS;
}
/* record my uri in a file within the session directory so the local proc
* can contact me
*/
opal_output(orte_odls_globals.output, "odls: dropping local uri file");
/* put the file in the job session dir for the job being launched */
orte_ns.convert_jobid_to_string(&job_str, job);
if (ORTE_SUCCESS != (rc = orte_session_dir(true, NULL, NULL, NULL,
NULL, NULL, job_str, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get the session dir name so we can put the file there */
if (ORTE_SUCCESS != (rc = orte_session_dir_get_name(&session_dir, NULL, NULL, NULL,
NULL, NULL, NULL, job_str, NULL))) {
ORTE_ERROR_LOG(rc);
free(job_str);
return rc;
}
free(job_str);
/* create the file and put my uri into it */
uri_file = opal_os_path(false, session_dir, "orted-uri.txt", NULL);
fp = fopen(uri_file, "w");
if (NULL == fp) {
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
my_uri = orte_rml.get_uri();
fprintf(fp, "%s\n", my_uri);
fclose(fp);
free(uri_file);
free(my_uri);
/* Now we preload any files that are needed. This is done on a per
* app context basis */
for (item = opal_list_get_first(&app_context_list);
@ -945,8 +1076,9 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_process.mutex);
for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children);
quit_flag = false;
for( item = opal_list_get_first(&orte_odls_process.children);
((item != opal_list_get_end(&orte_odls_process.children)) && (false == quit_flag));
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
@ -955,6 +1087,8 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
* If it has been launched, then do nothing
*/
if (child->alive) {
opal_output(orte_odls_globals.output, "odls: child [%ld,%ld,%ld] is already alive",
ORTE_NAME_ARGS(child->name));
continue;
}
@ -963,6 +1097,8 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
* the dss.compare function to check for equality.
*/
if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
opal_output(orte_odls_globals.output, "odls: child [%ld,%ld,%ld] is not in job %ld being launched",
ORTE_NAME_ARGS(child->name), (long)job);
continue;
}
@ -994,11 +1130,14 @@ DOFORK:
if (ORTE_SUCCESS != (rc = orte_odls_process_fork_local_proc(app, child, start,
range, want_processor,
i, base_environ))) {
ORTE_ERROR_LOG(rc);
orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, 0);
opal_condition_signal(&orte_odls_process.cond);
return rc;
i, oversubscribed,
base_environ))) {
/* do NOT ERROR_LOG this error - it generates
* a message/node as most errors will be common
* across the entire cluster. Instead, we let orterun
* output a consolidated error message for us
*/
quit_flag = true;
}
/* reacquire lock so we don't double unlock... */
OPAL_THREAD_LOCK(&orte_odls_process.mutex);
@ -1018,7 +1157,9 @@ DOFORK:
child = (orte_odls_child_t*)item;
if (ORTE_PROC_STATE_LAUNCHED == child->state) {
OPAL_THREAD_UNLOCK(&orte_odls_process.mutex);
orte_wait_cb(child->pid, odls_process_wait_local_proc, NULL);
OPAL_THREAD_LOCK(&orte_odls_process.mutex);
child->state = ORTE_PROC_STATE_RUNNING;
}
}
@ -1091,6 +1232,42 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
return ORTE_ERR_NOT_FOUND;
}
/*
 * Forward a message buffer to the local children of a job.
 *
 * Walks the global list of children while holding the ODLS mutex and, for
 * each child whose jobid compares equal to 'job' (via orte_dss.compare),
 * sends 'buffer' to that child on the given RML 'tag'.
 *
 * A failed send (negative return from orte_rml.send_buffer) is logged and
 * the walk continues with the next child. Once the walk is finished the
 * ODLS condition variable is signalled and the mutex is released.
 *
 * Returns ORTE_SUCCESS in all cases; send failures are not propagated.
 */
int orte_odls_process_deliver_message(orte_jobid_t job, orte_buffer_t *buffer, orte_rml_tag_t tag)
{
    opal_list_item_t *cursor;
    orte_odls_child_t *target;
    int send_rc;

    /* the global list of children is shared - hold the lock while we walk it */
    OPAL_THREAD_LOCK(&orte_odls_process.mutex);

    cursor = opal_list_get_first(&orte_odls_process.children);
    while (cursor != opal_list_get_end(&orte_odls_process.children)) {
        target = (orte_odls_child_t*)cursor;

        /* The requested job may be a WILDCARD value, so equality has to be
         * decided by dss.compare rather than by a direct comparison.
         */
        if (ORTE_EQUAL == orte_dss.compare(&job, &(target->name->jobid), ORTE_JOBID)) {
            opal_output(orte_odls_globals.output, "odls: sending message to tag %lu on child [%ld, %ld, %ld]",
                        (unsigned long)tag, ORTE_NAME_ARGS(target->name));

            /* this child belongs to the job - pass the message along */
            send_rc = orte_rml.send_buffer(target->name, buffer, tag, 0);
            if (send_rc < 0) {
                /* log and keep going so the remaining children are still served */
                ORTE_ERROR_LOG(send_rc);
            }
        }

        cursor = opal_list_get_next(cursor);
    }

    opal_condition_signal(&orte_odls_process.cond);
    OPAL_THREAD_UNLOCK(&orte_odls_process.mutex);

    return ORTE_SUCCESS;
}
orte_odls_base_module_1_3_0_t orte_odls_process_module = {
orte_odls_process_get_add_procs_data,
orte_odls_process_launch_local_procs,