
1. If the user has specified sched_yield, we simply do what we are told.

2. If they didn't specify anything, try to get the number of processors on this node. Note that we already get the number of local procs in our job that are sharing this node - that now comes in through the proc callback and is stored in the ompi_proc_t structures.

3. If we can get the number of processors, compare that to the number of local procs from my job that are sharing my node. If the number of local procs exceeds the number of processors, then set sched_yield to true. If not, then be a hog and set sched_yield to false.

4. If we can't get the number of processors, default to conservative behavior and set sched_yield to true.

Note that I have not yet dealt with the need to dynamically adjust this setting as more processes are added via comm_spawn. So far, we are *only* looking within our own job. Given that we have now moved this logic to mpi_init (and away from the orteds), it isn't yet clear to me how a process will be informed about the number of procs in *other* jobs that are also sharing this node. Something to continue to ponder.

This commit was SVN r13430.
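The decision rule above reduces to a small piece of branching logic. The sketch below is only an illustration of that rule, not the actual ompi_mpi_init code; the function and parameter names (decide_sched_yield, user_specified, num_processors, num_local_procs) are placeholders for whatever MCA parameter and proc-callback data the real implementation consults.

/* Hedged sketch of the sched_yield decision described in the commit message.
 * All names are illustrative only. */
static int decide_sched_yield(int user_specified, int user_value,
                              int num_processors,   /* <= 0 if unknown */
                              int num_local_procs)  /* procs of my job on this node */
{
    /* 1. honor an explicit user setting */
    if (user_specified) {
        return user_value;
    }
    /* 4. processor count unavailable: be conservative and yield */
    if (num_processors <= 0) {
        return 1;
    }
    /* 2/3. yield only when this node is oversubscribed by my own job */
    return (num_local_procs > num_processors) ? 1 : 0;
}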
/* -*- C -*-
 *
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */

/* @file:
 * xcpu launcher to launch jobs on compute nodes.
 */

#include "orte_config.h"
|
|
#if HAVE_SYS_TYPES_H
|
|
#include <sys/types.h>
|
|
#endif /* HAVE_SYS_TYPES_H */
|
|
#ifdef HAVE_SYS_STAT_H
|
|
#include <sys/stat.h>
|
|
#endif /* HAVE_SYS_STAT_H */
|
|
#ifdef HAVE_UNISTD_H
|
|
#include <unistd.h>
|
|
#endif /* HAVE_UNISTD_H */
|
|
#include <errno.h>
|
|
#include <signal.h>
|
|
#ifdef HAVE_FCNTL_H
|
|
#include <fcntl.h>
|
|
#endif /* HAVE_FCNTL_H */
|
|
#ifdef HAVE_STRING_H
|
|
#include <string.h>
|
|
#endif /* HAVE_STRING_H */
|
|
#ifdef HAVE_SYS_TIME_H
|
|
#include <sys/time.h>
|
|
#endif
|
|
|
|
#include "opal/event/event.h"
|
|
#include "opal/mca/base/mca_base_param.h"
|
|
#include "opal/util/argv.h"
|
|
#include "opal/util/output.h"
|
|
#include "opal/util/opal_environ.h"
|
|
#include "opal/util/path.h"
|
|
#include "opal/util/show_help.h"
|
|
|
|
#include "orte/dss/dss.h"
|
|
#include "orte/util/sys_info.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/gpr/base/base.h"
|
|
#include "orte/mca/iof/iof.h"
|
|
#include "orte/mca/ns/base/base.h"
|
|
#include "orte/mca/sds/base/base.h"
|
|
#include "orte/mca/oob/base/base.h"
|
|
#include "orte/mca/ras/base/base.h"
|
|
#include "orte/mca/rmgr/rmgr.h"
|
|
#include "orte/mca/rmaps/rmaps.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/smr/base/base.h"
|
|
#include "orte/runtime/orte_wait.h"
|
|
#include "orte/runtime/runtime.h"
|
|
|
|
#include "pls_xcpu.h"
|
|
#include "spfs.h"
|
|
#include "spclient.h"
|
|
#include "strutil.h"
|
|
#include "libxcpu.h"
|
|
|
|
extern char **environ;

/** external variable defined in libspclient */
extern int spc_chatty;

/**
 * Initialization of the xcpu module with all the needed function pointers
 */
orte_pls_base_module_t orte_pls_xcpu_module = {
    orte_pls_xcpu_launch_job,
    orte_pls_xcpu_terminate_job,
    orte_pls_xcpu_terminate_orteds,
    orte_pls_xcpu_terminate_proc,
    orte_pls_xcpu_signal_job,
    orte_pls_xcpu_signal_proc,
    orte_pls_xcpu_cancel_operation,
    orte_pls_xcpu_finalize
};

/* arrays of Xpcommand and Xpnodeset pointers; each xcmd/nodeset corresponds to one OMPI app_context */
Xpcommand **xcmd_sets;
Xpnodeset **node_sets;
int num_xcmds;

/* stdout/stderr forwarding callbacks: print whatever the remote process wrote */
void
pls_xcpu_stdout_cb(Xpsession *s, u8 *buf, u32 buflen)
{
    fprintf(stdout, "%.*s", (int) buflen, buf);
}

void
pls_xcpu_stderr_cb(Xpsession *s, u8 *buf, u32 buflen)
{
    fprintf(stderr, "%.*s", (int) buflen, buf);
}

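/*
 * Wait callback: invoked by libxcpu when a session finishes.  The ORTE
 * process name for the session was stashed in the node's data field at
 * launch time, so mark that process as terminated.
 */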
void
pls_xcpu_wait_cb(Xpsession *s, u8 *buf, u32 buflen)
{
    Xpnode *nd;

    nd = xp_session_get_node(s);

    /* FixMe: find out the process associated with this session */
    orte_smr.set_proc_state(nd->data, ORTE_PROC_STATE_TERMINATED, 0);
}

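/*
 * Flatten a NULL-terminated string array into one malloc'ed string,
 * quoting each item (quotestrdup) and appending 'sep' after it.
 * Returns NULL on allocation failure.
 */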
static char *
process_list(char **list, char sep)
{
    int i, n, len;
    char *s, *ret;
    char **items;

    /* find list length */
    for (n = 0; list[n] != NULL; n++)
        ;

    items = calloc(n, sizeof(char *));
    if (!items)
        return NULL;

    /* quote the items if necessary */
    for (len = 0, i = 0; i < n; i++) {
        items[i] = quotestrdup(list[i]);
        len += strlen(items[i]) + 1;
    }

    ret = malloc(len + 1);
    if (!ret)
        return NULL;

    for (s = ret, i = 0; i < n; i++) {
        len = strlen(items[i]);
        memcpy(s, items[i], len);
        s += len;
        *(s++) = sep;
    }

    *s = '\0';
    free(items);
    return ret;
}

static char *
process_env(char **env)
{
    return process_list(env, '\n');
}

static char *
process_argv(char **argv)
{
    return process_list(argv, ' ');
}

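/*
 * Prepare the environment for an app context: export the current MCA
 * parameters, the ns/gpr replica URIs and the universe identification,
 * then merge in the launcher's own environment.
 */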
static void
pls_xcpu_setup_env(char ***e)
{
    int n, rc;
    char *var, *param, *uri;
    char **env;

    /* FixME: pointer arithmetic */
    n = opal_argv_count(*e);
    rc = mca_base_param_build_env(*e, &n, false);
    if (rc != ORTE_SUCCESS) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    if (NULL != orte_process_info.ns_replica_uri) {
        uri = strdup(orte_process_info.ns_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    param = mca_base_param_environ_variable("ns", "replica", "uri");
    opal_setenv(param, uri, true, e);
    free(param);
    free(uri);

    if (NULL != orte_process_info.gpr_replica_uri) {
        uri = strdup(orte_process_info.gpr_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    param = mca_base_param_environ_variable("gpr", "replica", "uri");
    opal_setenv(param, uri, true, e);
    free(param);
    free(uri);

#if 0
    /* FixMe: Is this the frontend or backend nodename? We don't have the
     * starting daemon. */
    var = mca_base_param_environ_variable("orte", "base", "nodename");
    opal_setenv(var, orte_system_info.nodename, true, e);
    free(var);
#endif

    var = mca_base_param_environ_variable("universe", NULL, NULL);
    asprintf(&param, "%s@%s:%s", orte_universe_info.uid,
             orte_universe_info.host, orte_universe_info.name);
    opal_setenv(var, param, true, e);

    free(param);
    free(var);

    /* merge in environment */
    env = opal_environ_merge(*e, environ);
    opal_argv_free(*e);
    *e = env;
}

/* This is the main function that will launch jobs on remote compute nodes
 * @param jobid the jobid of the job to launch
 * @retval ORTE_SUCCESS or error
 */
int
orte_pls_xcpu_launch_job(orte_jobid_t jobid)
{
    int i, n, rc;
    int num_processes = 0;
    orte_cellid_t cellid;
    opal_list_item_t *node_item, *proc_item;
    orte_job_map_t *map;
    orte_vpid_t vpid_start, vpid_range;
    char **env;

    if (mca_pls_xcpu_component.chatty)
        spc_chatty = 1;

    /* get the job map */
    rc = orte_rmaps.get_job_map(&map, jobid);
    if (rc != ORTE_SUCCESS) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* next, get the vpid_start and range */
    rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, &vpid_range);
    if (rc != ORTE_SUCCESS) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* get the cellid */
    cellid = orte_process_info.my_name->cellid;

    /* create num_apps of pointers to Xpnodeset and Xpcommand */
    node_sets = (Xpnodeset **) malloc(map->num_apps * sizeof(Xpnodeset *));
    xcmd_sets = (Xpcommand **) malloc(map->num_apps * sizeof(Xpcommand *));

    num_xcmds = map->num_apps;

    /* create Xpnodeset for each app_context */
    for (i = 0; i < map->num_apps; i++) {
        node_sets[i] = xp_nodeset_create();
    }

    /* create an Xpnode for each mapped proc and add it to the corresponding
     * Xpnodeset according to its app context */
    for (node_item = opal_list_get_first(&map->nodes);
         node_item != opal_list_get_end(&map->nodes);
         node_item = opal_list_get_next(node_item)) {
        orte_mapped_node_t *node = (orte_mapped_node_t *) node_item;
        for (proc_item = opal_list_get_first(&node->procs);
             proc_item != opal_list_get_end(&node->procs);
             proc_item = opal_list_get_next(proc_item)) {
            orte_mapped_proc_t *proc = (orte_mapped_proc_t *) proc_item;
            Xpnode *xpnode = xp_node_create(node->nodename, node->nodename,
                                            NULL, NULL);
            xpnode->data = &proc->name;
            xp_nodeset_add(node_sets[proc->app_idx], xpnode);
        }
    }

    /* setup environment variables for each app context */
    for (i = 0; i < map->num_apps; i++) {
        /* FixME: how many layers of *? */
        pls_xcpu_setup_env(&map->apps[i]->env);
        num_processes += map->apps[i]->num_procs;
    }

    for (i = 0; i < map->num_apps; i++) {
        rc = orte_ns_nds_xcpu_put(cellid, jobid, vpid_start,
                                  num_processes, &map->apps[i]->env);
        if (rc != ORTE_SUCCESS) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
    }

    /* create Xpcommand for each app_context from Xpnodeset */
    for (i = 0; i < map->num_apps; i++) {
        xcmd_sets[i] = xp_command_create(node_sets[i]);

        /* setup argc, argv and env in the xcpu command */
        xcmd_sets[i]->cwd = strdup(map->apps[i]->cwd);
        xcmd_sets[i]->env = process_env(map->apps[i]->env);
        xcmd_sets[i]->argv = process_argv(map->apps[i]->argv);
        xcmd_sets[i]->exec = strdup(map->apps[i]->argv[0]);
        xcmd_sets[i]->copypath = strdup(map->apps[i]->app);
        asprintf(&xcmd_sets[i]->jobid, "%d", jobid);

        /* setup io forwarding */
        xcmd_sets[i]->stdout_cb = pls_xcpu_stdout_cb;
        xcmd_sets[i]->stderr_cb = pls_xcpu_stderr_cb;
        xcmd_sets[i]->wait_cb = pls_xcpu_wait_cb;

        /* call xp_command_exec(xcmd) */
        if (xp_command_exec(xcmd_sets[i]) < 0) {
            rc = ORTE_ERROR;
            goto error;
        }
    }

    /* entering event loop and waiting for termination of processes
     * by calling xp_commands_wait.
     * FixME: we are blocked here so both success and failure cases
     * fall through to the error handler and all resources are freed.
     * this should be changed when we have non-blocking command_wait() */
    if (xp_commands_wait(map->num_apps, xcmd_sets) < 0) {
        rc = ORTE_ERROR;
    } else {
        rc = ORTE_SUCCESS;
    }

error:
    /* error handling and clean up, kill all the processes */
    for (i = 0; i < map->num_apps; i++) {
        if (xcmd_sets[i] != NULL) {
            xp_command_wipe(xcmd_sets[i]);
            xp_command_destroy(xcmd_sets[i]);
            xcmd_sets[i] = NULL;
        }
    }

    /* set ORTE error code?? */
    return rc;
}

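/**
 * Terminate a job by sending SIGTERM to every app context's xcpu command set.
 */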
int orte_pls_xcpu_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs)
{
    int i, rc;
    orte_job_map_t *map;

    /* get the job map */
    rc = orte_rmaps.get_job_map(&map, jobid);
    if (rc != ORTE_SUCCESS) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    for (i = 0; i < map->num_apps; i++) {
        if (xcmd_sets[i] != NULL) {
            xp_command_kill(xcmd_sets[i], SIGTERM);
        }
    }
    return ORTE_SUCCESS;
}

int orte_pls_xcpu_terminate_orteds(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs)
{
    return ORTE_SUCCESS;
}

int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name)
{
    fprintf(stderr, __FILE__ " terminate_proc\n");

    /* libxcpu cannot wipe an individual process in an
     * Xpcommand/Xpsessionset, only the whole session set */

    return ORTE_SUCCESS;
}

int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t sig, opal_list_t *attrs)
{
    int i, rc;
    orte_job_map_t *map;

    fprintf(stderr, __FILE__ " signal_job, sig = %d\n", sig);

    /* get the job map */
    rc = orte_rmaps.get_job_map(&map, jobid);
    if (rc != ORTE_SUCCESS) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    for (i = 0; i < map->num_apps; i++) {
        if (xcmd_sets[i] != NULL)
            xp_command_kill(xcmd_sets[i], sig);
    }

    return ORTE_SUCCESS;
}

int orte_pls_xcpu_signal_proc(const orte_process_name_t* proc_name, int32_t sig)
{
    fprintf(stderr, __FILE__ " signal_proc\n");

    /* libxcpu cannot send a signal to an individual process in an
     * Xpcommand/Xpsessionset, only to the whole session set */

    return ORTE_SUCCESS;
}

/**
 * Cancel an operation involving comm to an orted
 */
int orte_pls_xcpu_cancel_operation(void)
{
    return ORTE_SUCCESS;
}

int orte_pls_xcpu_finalize(void)
{
    return ORTE_SUCCESS;
}