1
1

Update the Windows ODLS component.

This commit was SVN r15020.
Этот коммит содержится в:
George Bosilca 2007-06-12 22:37:04 +00:00
родитель bf6f30a42c
Коммит 16c38cabe1
3 изменённых файлов: 384 добавлений и 193 удалений

Просмотреть файл

@ -48,20 +48,6 @@ typedef struct orte_odls_process_globals_t {
extern orte_odls_process_globals_t orte_odls_process; extern orte_odls_process_globals_t orte_odls_process;
/*
* List object to locally store the process names and pids of
* our children. This can subsequently be used to order termination
* or pass signals without looking the info up again.
*/
typedef struct odls_process_child_t {
opal_list_item_t super; /* required to place this on a list */
orte_process_name_t *name; /* the OpenRTE name of the proc */
pid_t pid; /* local pid of the proc */
orte_std_cntr_t app_idx; /* index of the app_context for this proc */
bool alive; /* is this proc alive? */
} odls_process_child_t;
OBJ_CLASS_DECLARATION(odls_process_child_t);
/* /*
* List object to locally store app_contexts returned by the * List object to locally store app_contexts returned by the
* registry subscription. Since we don't know how many app_contexts will * registry subscription. Since we don't know how many app_contexts will

Просмотреть файл

@ -28,24 +28,6 @@
orte_odls_process_globals_t orte_odls_process; orte_odls_process_globals_t orte_odls_process;
/* instance the child list object */
static void odls_process_child_constructor(odls_process_child_t *ptr)
{
ptr->name = NULL;
ptr->pid = 0;
ptr->app_idx = -1;
ptr->alive = false;
}
static void odls_process_child_destructor(odls_process_child_t *ptr)
{
if (NULL != ptr->name) free(ptr->name);
}
OBJ_CLASS_INSTANCE(odls_process_child_t,
opal_list_item_t,
odls_process_child_constructor,
odls_process_child_destructor);
/* instance the app_context list object */ /* instance the app_context list object */
OBJ_CLASS_INSTANCE(odls_process_app_context_t, OBJ_CLASS_INSTANCE(odls_process_app_context_t,
opal_list_item_t, opal_list_item_t,

Просмотреть файл

@ -15,18 +15,19 @@
#include <stdlib.h> #include <stdlib.h>
#ifdef HAVE_UNISTD_H #ifdef HAVE_UNISTD_H
#include <unistd.h> #include <unistd.h>
#endif #endif /* HAVE_UNISTD_H */
#include <errno.h> #include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h> #include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#include <sys/stat.h> #include <sys/stat.h>
#ifdef HAVE_SYS_WAIT_H #ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h> #include <sys/wait.h>
#endif #endif /* HAVE_SYS_WAIT_H */
#include <signal.h> #include <signal.h>
#ifdef HAVE_FCNTL_H #ifdef HAVE_FCNTL_H
#include <fcntl.h> #include <fcntl.h>
#endif #endif /* HAVE_FCNTL_H */
#include <time.h>
#include "opal/event/event.h" #include "opal/event/event.h"
#include "opal/util/argv.h" #include "opal/util/argv.h"
@ -37,7 +38,7 @@
#include "opal/util/basename.h" #include "opal/util/basename.h"
#include "opal/util/opal_environ.h" #include "opal/util/opal_environ.h"
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
#include "opal/mca/paffinity/base/base.h" #include "opal/util/num_procs.h"
#include "orte/dss/dss.h" #include "orte/dss/dss.h"
#include "orte/util/sys_info.h" #include "orte/util/sys_info.h"
@ -55,129 +56,21 @@
#include "orte/mca/gpr/gpr.h" #include "orte/mca/gpr/gpr.h"
#include "orte/mca/rmaps/base/base.h" #include "orte/mca/rmaps/base/base.h"
#include "orte/mca/smr/smr.h" #include "orte/mca/smr/smr.h"
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/odls/process/odls_process.h" #include "orte/mca/odls/process/odls_process.h"
static void set_handler_default(int sig); static void set_handler_default(int sig);
static bool is_preload_local_dup(char *local_ref, orte_filem_base_request_t *filem_request);
static int orte_pls_fork_preload_append_files(orte_app_context_t* context,
orte_filem_base_request_t *filem_request);
static int orte_pls_fork_preload_append_binary(orte_app_context_t* context,
orte_filem_base_request_t *filem_request);
/* this entire function gets called within a GPR compound command, static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data,
* so the subscription actually doesn't get done until the orted orte_job_map_t *map)
* executes the compound command
*/
static int orte_odls_process_subscribe_launch_data( orte_jobid_t job,
orte_gpr_notify_cb_fn_t cbfunc )
{
char *segment;
orte_gpr_value_t *values[2];
orte_gpr_subscription_t *subs, sub=ORTE_GPR_SUBSCRIPTION_EMPTY;
orte_gpr_trigger_t *trigs, trig=ORTE_GPR_TRIGGER_EMPTY;
char *glob_keys[] = {
ORTE_JOB_APP_CONTEXT_KEY,
ORTE_JOB_VPID_START_KEY,
ORTE_JOB_VPID_RANGE_KEY,
ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY
};
int num_glob_keys = 4;
char* keys[] = {
ORTE_PROC_NAME_KEY,
ORTE_PROC_APP_CONTEXT_KEY,
ORTE_NODE_NAME_KEY,
ORTE_NODE_OVERSUBSCRIBED_KEY
};
int num_keys = 4;
int i, rc;
/* get the job segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach ourselves to the "standard" orted trigger */
if (ORTE_SUCCESS !=
(rc = orte_schema.get_std_trigger_name(&(trig.name),
ORTED_LAUNCH_STAGE_GATE_TRIGGER, job))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
/* ask for return of all data required for launching local processes */
subs = &sub;
sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG;
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&(sub.name),
ORTED_LAUNCH_STG_SUB,
job))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(trig.name);
return rc;
}
sub.cnt = 2;
sub.values = values;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR, segment,
num_glob_keys, 1))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(sub.name);
free(trig.name);
return rc;
}
values[0]->tokens[0] = strdup(ORTE_JOB_GLOBALS);
for (i=0; i < num_glob_keys; i++) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[i]),
glob_keys[i], ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(sub.name);
free(trig.name);
OBJ_RELEASE(values[0]);
return rc;
}
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[1]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
segment, num_keys, 0))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(sub.name);
free(trig.name);
OBJ_RELEASE(values[0]);
return rc;
}
for (i=0; i < num_keys; i++) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[1]->keyvals[i]),
keys[i], ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
free(segment);
free(sub.name);
free(trig.name);
OBJ_RELEASE(values[0]);
OBJ_RELEASE(values[1]);
return rc;
}
}
sub.cbfunc = cbfunc;
trigs = &trig;
/* do the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
free(segment);
free(sub.name);
free(trig.name);
OBJ_RELEASE(values[0]);
OBJ_RELEASE(values[1]);
return rc;
}
static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data, orte_job_map_t *map)
{ {
orte_gpr_notify_data_t *ndat; orte_gpr_notify_data_t *ndat;
orte_gpr_value_t **values, *value; orte_gpr_value_t **values, *value;
@ -328,7 +221,7 @@ static int orte_odls_process_kill_local( pid_t pid, int sig_num )
static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state) static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
{ {
odls_process_child_t *child; orte_odls_child_t *child;
opal_list_item_t *item; opal_list_item_t *item;
int rc, exit_status; int rc, exit_status;
opal_list_t procs_killed; opal_list_t procs_killed;
@ -336,6 +229,9 @@ static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
OBJ_CONSTRUCT(&procs_killed, opal_list_t); OBJ_CONSTRUCT(&procs_killed, opal_list_t);
opal_output(orte_odls_globals.output, "[%ld,%ld,%ld] odls_kill_local_proc: working on job %ld",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), (long)job);
/* since we are going to be working with the global list of /* since we are going to be working with the global list of
* children, we need to protect that list from modification * children, we need to protect that list from modification
* by other threads * by other threads
@ -345,25 +241,39 @@ static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state)
for (item = opal_list_get_first(&orte_odls_process.children); for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children); item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
child = (odls_process_child_t*)item; child = (orte_odls_child_t*)item;
opal_output(orte_odls_globals.output, "[%ld,%ld,%ld] odls_kill_local_proc: checking child process [%ld,%ld,%ld]",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), ORTE_NAME_ARGS(child->name));
/* do we have a child from the specified job? Because the
* job could be given as a WILDCARD value, we must use
* the dss.compare function to check for equality.
*/
if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
continue;
}
/* remove the child from the list since it is either already dead or soon going to be dead */
opal_list_remove_item(&orte_odls_process.children, item);
/* is this process alive? if not, then nothing for us /* is this process alive? if not, then nothing for us
* to do to it * to do to it
*/ */
if (!child->alive) { if (!child->alive) {
continue; opal_output(orte_odls_globals.output, "[%ld,%ld,%ld] odls_kill_local_proc: child [%ld,%ld,%ld] is not alive",
ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), ORTE_NAME_ARGS(child->name));
/* ensure, though, that the state is terminated so we don't lockup if
* the proc never started
*/
goto MOVEON;
} }
/* do we have a child from the specified job? Because the
* job could be given as a WILDCARD value, we must use
* the dss.compare function to check for equality.
*/
if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) {
continue;
}
/* de-register the SIGCHILD callback for this pid */ /* de-register the SIGCHILD callback for this pid */
orte_wait_cb_cancel(child->pid); if (ORTE_SUCCESS != (rc = orte_wait_cb_cancel(child->pid))) {
/* no need to error_log this - it just means that the pid is already gone */
goto MOVEON;
}
/* Send a sigterm to the process. */ /* Send a sigterm to the process. */
if (0 != orte_odls_process_kill_local(child->pid, SIGTERM)) { if (0 != orte_odls_process_kill_local(child->pid, SIGTERM)) {
@ -433,13 +343,15 @@ MOVEON:
*/ */
static void odls_process_wait_local_proc(pid_t pid, int status, void* cbdata) static void odls_process_wait_local_proc(pid_t pid, int status, void* cbdata)
{ {
odls_process_child_t *child; orte_odls_child_t *child;
opal_list_item_t *item; opal_list_item_t *item;
bool aborted; bool aborted;
char *job, *vpid, *abort_file; char *job, *vpid, *abort_file;
struct _stat buf; struct _stat buf;
int rc; int rc;
opal_output(orte_odls_globals.output, "odls: child process terminated");
/* since we are going to be working with the global list of /* since we are going to be working with the global list of
* children, we need to protect that list from modification * children, we need to protect that list from modification
* by other threads. This will also be used to protect us * by other threads. This will also be used to protect us
@ -451,7 +363,7 @@ static void odls_process_wait_local_proc(pid_t pid, int status, void* cbdata)
for (item = opal_list_get_first(&orte_odls_process.children); for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children); item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
child = (odls_process_child_t*)item; child = (orte_odls_child_t*)item;
if (child->alive && pid == child->pid) { /* found it */ if (child->alive && pid == child->pid) { /* found it */
goto GOTCHILD; goto GOTCHILD;
} }
@ -498,13 +410,20 @@ GOTCHILD:
/* the abort file must exist - there is nothing in it we need. It's /* the abort file must exist - there is nothing in it we need. It's
* meer existence indicates that an abnormal termination occurred * meer existence indicates that an abnormal termination occurred
*/ */
opal_output(orte_odls_globals.output, "odls: child [%ld,%ld,%ld] died by abort",
ORTE_NAME_ARGS(child->name));
aborted = true; aborted = true;
free(abort_file); free(abort_file);
} else {
opal_output(orte_odls_globals.output, "odls: child process [%ld,%ld,%ld] terminated normally",
ORTE_NAME_ARGS(child->name));
} }
} else { } else {
/* the process was terminated with a signal! That's definitely /* the process was terminated with a signal! That's definitely
* abnormal, so indicate that condition * abnormal, so indicate that condition
*/ */
opal_output(orte_odls_globals.output, "odls: child process [%ld,%ld,%ld] terminated with signal",
ORTE_NAME_ARGS(child->name));
aborted = true; aborted = true;
} }
@ -518,6 +437,13 @@ MOVEON:
*/ */
orte_session_dir_finalize(child->name); orte_session_dir_finalize(child->name);
/* set the proc state in the child structure */
if (aborted) {
child->state = ORTE_PROC_STATE_ABORTED;
} else {
child->state = ORTE_PROC_STATE_TERMINATED;
}
/* Need to unlock before we call set_proc_state as this is going to generate /* Need to unlock before we call set_proc_state as this is going to generate
* a trigger that will eventually callback to us * a trigger that will eventually callback to us
*/ */
@ -541,7 +467,7 @@ MOVEON:
static int orte_odls_process_fork_local_proc( static int orte_odls_process_fork_local_proc(
orte_app_context_t* context, orte_app_context_t* context,
odls_process_child_t *child, orte_odls_child_t *child,
orte_vpid_t vpid_start, orte_vpid_t vpid_start,
orte_vpid_t vpid_range, orte_vpid_t vpid_range,
bool want_processor, bool want_processor,
@ -555,6 +481,7 @@ static int orte_odls_process_fork_local_proc(
char** environ_copy; char** environ_copy;
char *param, *param2; char *param, *param2;
char *uri; char *uri;
bool oversubscribed=false;
size_t num_processors; size_t num_processors;
/* should pull this information from MPIRUN instead of going with /* should pull this information from MPIRUN instead of going with
@ -569,6 +496,12 @@ static int orte_odls_process_fork_local_proc(
opts.connect_stdin = false; opts.connect_stdin = false;
} }
rc = orte_iof_base_setup_prefork(&opts);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* Try to change to the context cwd and check that the app /* Try to change to the context cwd and check that the app
exists and is executable. The RMGR functions will print exists and is executable. The RMGR functions will print
out a pretty error message if either of these operations fails out a pretty error message if either of these operations fails
@ -624,6 +557,22 @@ static int orte_odls_process_fork_local_proc(
opal_unsetenv(param, &environ_copy); opal_unsetenv(param, &environ_copy);
free(param); free(param);
/* setup yield schedule and processor affinity
* We default here to always setting the affinity processor if we want
* it. The processor affinity system then determines
* if processor affinity is enabled/requested - if so, it then uses
* this value to select the process to which the proc is "assigned".
* Otherwise, the paffinity subsystem just ignores this value anyway
*/
if (oversubscribed) {
param = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
opal_setenv(param, "1", false, &environ_copy);
} else {
param = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
opal_setenv(param, "0", false, &environ_copy);
}
free(param);
if (want_processor) { if (want_processor) {
param = mca_base_param_environ_variable("mpi", NULL, param = mca_base_param_environ_variable("mpi", NULL,
"paffinity_processor"); "paffinity_processor");
@ -631,6 +580,11 @@ static int orte_odls_process_fork_local_proc(
opal_setenv(param, param2, true, &environ_copy); opal_setenv(param, param2, true, &environ_copy);
free(param); free(param);
free(param2); free(param2);
} else {
param = mca_base_param_environ_variable("mpi", NULL,
"paffinity_processor");
opal_unsetenv(param, &environ_copy);
free(param);
} }
/* setup universe info */ /* setup universe info */
@ -666,12 +620,19 @@ static int orte_odls_process_fork_local_proc(
free(param); free(param);
free(uri); free(uri);
/* set the app_context number into the environment */
param = mca_base_param_environ_variable("orte","app","num");
asprintf(&param2, "%ld", (long)child->app_idx);
opal_setenv(param, param2, true, &environ_copy);
free(param);
free(param2);
/* use same nodename as the starting daemon (us) */ /* use same nodename as the starting daemon (us) */
param = mca_base_param_environ_variable("orte", "base", "nodename"); param = mca_base_param_environ_variable("orte", "base", "nodename");
opal_setenv(param, orte_system_info.nodename, true, &environ_copy); opal_setenv(param, orte_system_info.nodename, true, &environ_copy);
free(param); free(param);
opal_paffinity_base_get_num_processors(&rc); opal_get_num_processors(&rc);
num_processors = (size_t)rc; num_processors = (size_t)rc;
/* push name into environment */ /* push name into environment */
orte_ns_nds_env_put(child->name, vpid_start, orte_ns_nds_env_put(child->name, vpid_start,
@ -708,16 +669,11 @@ static int orte_odls_process_fork_local_proc(
return rc; return rc;
} }
/* save the pid and indicate we've been launched */ /* set the proc state to LAUNCHED and save the pid */
child->state = ORTE_PROC_STATE_LAUNCHED;
child->pid = pid; child->pid = pid;
child->alive = true; child->alive = true;
/* wait for the child process - dont register for wait
* callback until after I/O is setup and the pid registered -
* otherwise can receive the wait callback before the above is
* ever completed
*/
orte_wait_cb(pid, odls_process_wait_local_proc, NULL);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -737,11 +693,12 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
orte_vpid_t *vptr, start, range; orte_vpid_t *vptr, start, range;
char *node_name; char *node_name;
opal_list_t app_context_list; opal_list_t app_context_list;
odls_process_child_t *child; orte_odls_child_t *child;
odls_process_app_context_t *app_item; odls_process_app_context_t *app_item;
size_t num_processors; size_t num_processors;
bool want_processor; bool oversubscribed=false, want_processor, *bptr, override_oversubscribed=false;
opal_list_item_t *item, *item2; opal_list_item_t *item, *item2;
orte_filem_base_request_t *filem_request;
/* parse the returned data to create the required structures /* parse the returned data to create the required structures
* for a fork launch. Since the data will contain information * for a fork launch. Since the data will contain information
@ -757,6 +714,9 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
opal_output(orte_odls_globals.output, "odls: setting up launch for job %ld", (long)job);
/* We need to create a list of the app_contexts /* We need to create a list of the app_contexts
* so we can know what to launch - the process info only gives * so we can know what to launch - the process info only gives
* us an index into the app_context array, not the app_context * us an index into the app_context array, not the app_context
@ -775,7 +735,7 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
i++; i++;
value = values[j]; value = values[j];
if (0 == strcmp(value->tokens[0], ORTE_JOB_GLOBALS)) { if (NULL != value->tokens) {
/* this came from the globals container, so it must contain /* this came from the globals container, so it must contain
* the app_context(s), vpid_start, and vpid_range entries. Only one * the app_context(s), vpid_start, and vpid_range entries. Only one
* value object should ever come from that container * value object should ever come from that container
@ -818,6 +778,15 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
opal_list_append(&app_context_list, &app_item->super); opal_list_append(&app_context_list, &app_item->super);
kval->value->data = NULL; /* protect the data storage from later release */ kval->value->data = NULL; /* protect the data storage from later release */
} }
if (strcmp(kval->key, ORTE_JOB_OVERSUBSCRIBE_OVERRIDE_KEY) == 0) {
/* this can only occur once, so just store it */
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, kval->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
override_oversubscribed = *bptr;
continue;
}
} /* end for loop to process global data */ } /* end for loop to process global data */
} else { } else {
/* this must have come from one of the process containers, so it must /* this must have come from one of the process containers, so it must
@ -837,7 +806,7 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
/* if this is our node...must also protect against a zero-length string */ /* if this is our node...must also protect against a zero-length string */
if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) { if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) {
/* ...harvest the info into a new child structure */ /* ...harvest the info into a new child structure */
child = OBJ_NEW(odls_process_child_t); child = OBJ_NEW(orte_odls_child_t);
for (kv2 = 0; kv2 < value->cnt; kv2++) { for (kv2 = 0; kv2 < value->cnt; kv2++) {
kval = value->keyvals[kv2]; kval = value->keyvals[kv2];
if(strcmp(kval->key, ORTE_PROC_NAME_KEY) == 0) { if(strcmp(kval->key, ORTE_PROC_NAME_KEY) == 0) {
@ -856,6 +825,14 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
child->app_idx = *sptr; /* save the index into the app_context objects */ child->app_idx = *sptr; /* save the index into the app_context objects */
continue; continue;
} }
if(strcmp(kval->key, ORTE_NODE_OVERSUBSCRIBED_KEY) == 0) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, kval->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
oversubscribed = *bptr;
continue;
}
} /* kv2 */ } /* kv2 */
/* protect operation on the global list of children */ /* protect operation on the global list of children */
OPAL_THREAD_LOCK(&orte_odls_process.mutex); OPAL_THREAD_LOCK(&orte_odls_process.mutex);
@ -870,14 +847,99 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
} /* for j */ } /* for j */
} }
/* determine if we are oversubscribed */ /* Now we preload any files that are needed. This is done on a per
want_processor = true; /* default to taking it for ourselves */ * app context basis */
opal_paffinity_base_get_num_processors(&rc); for (item = opal_list_get_first(&app_context_list);
num_processors = (size_t)rc; item != opal_list_get_end(&app_context_list);
if (opal_list_get_size(&orte_odls_process.children) > num_processors) { /* oversubscribed */ item = opal_list_get_next(item)) {
want_processor = false; app_item = (odls_process_app_context_t*)item;
if(app_item->app_context->preload_binary || NULL != app_item->app_context->preload_files) {
filem_request = OBJ_NEW(orte_filem_base_request_t);
filem_request->num_procs = 1;
filem_request->proc_name = (orte_process_name_t*)malloc(sizeof(orte_process_name_t) * filem_request->num_procs);
filem_request->proc_name[0].cellid = orte_process_info.gpr_replica->cellid;
filem_request->proc_name[0].jobid = orte_process_info.gpr_replica->jobid;
filem_request->proc_name[0].vpid = orte_process_info.gpr_replica->vpid;
if(app_item->app_context->preload_binary) {
if( ORTE_SUCCESS != (rc = orte_pls_fork_preload_append_binary(app_item->app_context,
filem_request) ) ){
opal_show_help("help-orte-odls-default.txt",
"orte-odls-default:could-not-preload-binary",
true, app_item->app_context->app);
ORTE_ERROR_LOG(rc);
/* Keep accumulating files anyway */
}
}
if( NULL != app_item->app_context->preload_files) {
if( ORTE_SUCCESS != (rc = orte_pls_fork_preload_append_files(app_item->app_context,
filem_request) ) ){
opal_show_help("help-orte-odls-default.txt",
"orte-odls-default:could-not-preload-files",
true, app_item->app_context->preload_files);
ORTE_ERROR_LOG(rc);
/* Keep accumulating files anyway */
}
}
/* Actually bring over the files */
if( ORTE_SUCCESS != (rc = orte_filem.get(filem_request)) ) {
opal_show_help("help-orte-odls-default.txt",
"orte-odls-default:could-not-preload",
true, opal_argv_join(filem_request->local_targets, ' '));
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(filem_request);
}
} }
/* setup for processor affinity. If there are enough physical processors on this node, then
* we indicate which processor each process should be assigned to, IFF the user has requested
* processor affinity be used - the paffinity subsystem will make that final determination. All
* we do here is indicate that we should do the definitions just in case paffinity is active
*/
if (OPAL_SUCCESS != opal_get_num_processors(&num_processors)) {
/* if we cannot find the number of local processors, then default to conservative
* settings
*/
want_processor = false; /* default to not being a hog */
opal_output(orte_odls_globals.output,
"odls: could not get number of processors - using conservative settings");
} else {
opal_output(orte_odls_globals.output,
"odls: got %ld processors", (long)num_processors);
/* only do this if we can actually get info on the number of processors */
if (opal_list_get_size(&orte_odls_process.children) > (size_t)num_processors) {
want_processor = false;
} else {
want_processor = true;
}
/* now let's deal with the oversubscribed flag - and the use-case where a hostfile or some
* other non-guaranteed-accurate method was used to inform us about our allocation. Since
* the information on the number of slots on this node could have been incorrect, we need
* to check it against the local number of processors to ensure we don't overload them
*/
if (override_oversubscribed) {
opal_output(orte_odls_globals.output, "odls: overriding oversubscription");
if (opal_list_get_size(&orte_odls_process.children) > (size_t)num_processors) {
/* if the #procs > #processors, declare us oversubscribed regardless
* of what the mapper claimed - the user may have told us something
* incorrect
*/
oversubscribed = true;
} else {
/* likewise, if there are more processors here than we were told,
* declare us to not be oversubscribed so we can be aggressive. This
* covers the case where the user didn't tell us anything about the
* number of available slots, so we defaulted to a value of 1
*/
oversubscribed = false;
}
}
}
opal_output(orte_odls_globals.output, "odls: oversubscribed set to %s want_processor set to %s",
oversubscribed ? "true" : "false", want_processor ? "true" : "false");
/* okay, now let's launch our local procs using a fork/exec */ /* okay, now let's launch our local procs using a fork/exec */
i = 0; i = 0;
/* protect operations involving the global list of children */ /* protect operations involving the global list of children */
@ -886,7 +948,7 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
for (item = opal_list_get_first(&orte_odls_process.children); for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children); item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
child = (odls_process_child_t*)item; child = (orte_odls_child_t*)item;
/* is this child already alive? This can happen if /* is this child already alive? This can happen if
* we are asked to launch additional processes. * we are asked to launch additional processes.
@ -904,6 +966,9 @@ static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data, ch
continue; continue;
} }
opal_output(orte_odls_globals.output, "odls: preparing to launch child [%ld, %ld, %ld]",
ORTE_NAME_ARGS(child->name));
/* find the indicated app_context in the list */ /* find the indicated app_context in the list */
for (item2 = opal_list_get_first(&app_context_list); for (item2 = opal_list_get_first(&app_context_list);
item2 != opal_list_get_end(&app_context_list); item2 != opal_list_get_end(&app_context_list);
@ -940,6 +1005,24 @@ DOFORK:
i++; i++;
} }
/* report the proc info and state in the registry */
if (ORTE_SUCCESS != (rc = orte_odls_base_report_spawn(&orte_odls_process.children))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* setup the waitpids on the children */
for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (ORTE_PROC_STATE_LAUNCHED == child->state) {
orte_wait_cb(child->pid, odls_process_wait_local_proc, NULL);
child->state = ORTE_PROC_STATE_RUNNING;
}
}
/* cleanup */ /* cleanup */
while (NULL != (item = opal_list_remove_first(&app_context_list))) { while (NULL != (item = opal_list_remove_first(&app_context_list))) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
@ -960,7 +1043,7 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
{ {
int rc; int rc;
opal_list_item_t *item; opal_list_item_t *item;
odls_process_child_t *child; orte_odls_child_t *child;
/* protect operations involving the global list of children */ /* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_process.mutex); OPAL_THREAD_LOCK(&orte_odls_process.mutex);
@ -973,7 +1056,7 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
for (item = opal_list_get_first(&orte_odls_process.children); for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children); item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
child = (odls_process_child_t*)item; child = (orte_odls_child_t*)item;
if (ORTE_SUCCESS != (rc = send_signal(child->pid, (int)signal))) { if (ORTE_SUCCESS != (rc = send_signal(child->pid, (int)signal))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
@ -987,7 +1070,7 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
for (item = opal_list_get_first(&orte_odls_process.children); for (item = opal_list_get_first(&orte_odls_process.children);
item != opal_list_get_end(&orte_odls_process.children); item != opal_list_get_end(&orte_odls_process.children);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
child = (odls_process_child_t*)item; child = (orte_odls_child_t*)item;
if (ORTE_EQUAL == orte_dss.compare(&(child->name), (void*)proc, ORTE_NAME)) { if (ORTE_EQUAL == orte_dss.compare(&(child->name), (void*)proc, ORTE_NAME)) {
/* unlock before signaling as this may generate a callback */ /* unlock before signaling as this may generate a callback */
opal_condition_signal(&orte_odls_process.cond); opal_condition_signal(&orte_odls_process.cond);
@ -1009,9 +1092,149 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
} }
orte_odls_base_module_1_3_0_t orte_odls_process_module = { orte_odls_base_module_1_3_0_t orte_odls_process_module = {
orte_odls_process_subscribe_launch_data,
orte_odls_process_get_add_procs_data, orte_odls_process_get_add_procs_data,
orte_odls_process_launch_local_procs, orte_odls_process_launch_local_procs,
orte_odls_process_kill_local_procs, orte_odls_process_kill_local_procs,
orte_odls_process_signal_local_proc orte_odls_process_signal_local_proc
}; };
/*
* The difference between preloading a file, and a binary file is that
* we may need to update the app_context to reflect the placement of the binary file
* on the local machine.
*/
static int orte_pls_fork_preload_append_binary(orte_app_context_t* context,
orte_filem_base_request_t *filem_request) {
char * local_bin = NULL;
int tmp_argc = 0;
/*
* Append the local placement
*/
asprintf(&local_bin, "%s/%s", orte_process_info.job_session_dir, opal_basename(context->app));
if(is_preload_local_dup(local_bin, filem_request) ) {
goto cleanup;
}
opal_argv_append(&filem_request->num_targets, &(filem_request->local_targets), local_bin);
/*
* Append the remote file
*/
tmp_argc = 0;
opal_argv_append(&tmp_argc, &filem_request->remote_targets, context->app);
/*
* Append the flag
*/
filem_request->target_flags = (int *)realloc(filem_request->target_flags,
sizeof(int) * (filem_request->num_targets + 1));
filem_request->target_flags[filem_request->num_targets-1] = ORTE_FILEM_TYPE_FILE;
cleanup:
/*
* Adjust the process name
*/
if(NULL != context->app)
free(context->app);
context->app = local_bin;
return ORTE_SUCCESS;
}
static int orte_pls_fork_preload_append_files(orte_app_context_t* context,
orte_filem_base_request_t *filem_request) {
char * local_ref = NULL;
int i, tmp_argc = 0, remote_argc = 0;
char **remote_targets = NULL;
char * temp = NULL;
remote_targets = opal_argv_split(context->preload_files, ',');
remote_argc = opal_argv_count(remote_targets);
for(i = 0; i < remote_argc; ++i) {
if(NULL != context->preload_files_dest_dir) {
if(context->preload_files_dest_dir[0] == '.') {
asprintf(&local_ref, "%s/%s/%s", context->cwd, context->preload_files_dest_dir, opal_basename(remote_targets[i]) );
}
else {
asprintf(&local_ref, "%s/%s", context->preload_files_dest_dir, opal_basename(remote_targets[i]) );
}
}
else {
/*
* If the preload_files_dest_dir is not specified
* If this is an absolute path, copy it to that path. Otherwise copy it to the cwd.
*/
if('/' == remote_targets[i][0]) {
asprintf(&local_ref, "%s", remote_targets[i]);
} else {
asprintf(&local_ref, "%s/%s", context->cwd, opal_basename(remote_targets[i]) );
}
}
asprintf(&temp, "test -e %s", local_ref);
if(0 == system(temp)) {
char hostname[MAXHOSTNAMELEN];
gethostname(hostname, sizeof(hostname));
opal_show_help("help-orte-pls-fork.txt",
"orte-pls-fork:preload-file-exists",
true, local_ref, hostname);
free(temp);
temp = NULL;
free(local_ref);
local_ref = NULL;
continue;
}
free(temp);
temp = NULL;
/*
* Is this a duplicate
*/
if(is_preload_local_dup(local_ref, filem_request) ) {
free(local_ref);
local_ref = NULL;
continue;
}
/*
* Append the local files we want
*/
opal_argv_append(&filem_request->num_targets, &filem_request->local_targets, local_ref);
/*
* Append the remote files we want
*/
tmp_argc = filem_request->num_targets - 1;
opal_argv_append(&tmp_argc, &filem_request->remote_targets, remote_targets[i]);
/*
* Set the flags
*/
filem_request->target_flags = (int *)realloc(filem_request->target_flags, sizeof(int) * 1);
filem_request->target_flags[filem_request->num_targets-1] = ORTE_FILEM_TYPE_UNKNOWN;
free(local_ref);
local_ref = NULL;
}
if(NULL != local_ref)
free(local_ref);
if(NULL != remote_targets)
opal_argv_free(remote_targets);
return ORTE_SUCCESS;
}
/*
* Keeps us from transfering the same file more than once.
*/
static bool is_preload_local_dup(char *local_ref, orte_filem_base_request_t *filem_request) {
int i;
for(i = 0; i < filem_request->num_targets; ++i) {
if(0 == strncmp(local_ref, filem_request->local_targets[i], strlen(local_ref)+1) ) {
return true;
}
}
return false;
}