From 6df4e807275b15825183a7928b7448bdb4fe86d7 Mon Sep 17 00:00:00 2001 From: Li-Ta Lo Date: Thu, 21 Dec 2006 00:05:36 +0000 Subject: [PATCH] new XCPU PLS and SDS to work with libxcpu This commit was SVN r12905. --- orte/mca/pls/xcpu/Makefile.am | 51 +++ orte/mca/pls/xcpu/configure.m4 | 37 +++ orte/mca/pls/xcpu/configure.params | 24 ++ orte/mca/pls/xcpu/pls_xcpu.c | 431 +++++++++++++++++++++++++ orte/mca/pls/xcpu/pls_xcpu.h | 98 ++++++ orte/mca/pls/xcpu/pls_xcpu_component.c | 103 ++++++ orte/mca/pls/xcpu/poll.c | 271 ++++++++++++++++ orte/mca/sds/base/base.h | 4 + orte/mca/sds/base/sds_base_put.c | 87 +++++ orte/mca/sds/xcpu/Makefile.am | 54 ++++ orte/mca/sds/xcpu/configure.m4 | 36 +++ orte/mca/sds/xcpu/configure.params | 21 ++ orte/mca/sds/xcpu/sds_xcpu.h | 48 +++ orte/mca/sds/xcpu/sds_xcpu_component.c | 97 ++++++ orte/mca/sds/xcpu/sds_xcpu_module.c | 171 ++++++++++ 15 files changed, 1533 insertions(+) create mode 100644 orte/mca/pls/xcpu/Makefile.am create mode 100644 orte/mca/pls/xcpu/configure.m4 create mode 100644 orte/mca/pls/xcpu/configure.params create mode 100644 orte/mca/pls/xcpu/pls_xcpu.c create mode 100644 orte/mca/pls/xcpu/pls_xcpu.h create mode 100644 orte/mca/pls/xcpu/pls_xcpu_component.c create mode 100644 orte/mca/pls/xcpu/poll.c create mode 100644 orte/mca/sds/xcpu/Makefile.am create mode 100644 orte/mca/sds/xcpu/configure.m4 create mode 100644 orte/mca/sds/xcpu/configure.params create mode 100644 orte/mca/sds/xcpu/sds_xcpu.h create mode 100644 orte/mca/sds/xcpu/sds_xcpu_component.c create mode 100644 orte/mca/sds/xcpu/sds_xcpu_module.c diff --git a/orte/mca/pls/xcpu/Makefile.am b/orte/mca/pls/xcpu/Makefile.am new file mode 100644 index 0000000000..03190ccb84 --- /dev/null +++ b/orte/mca/pls/xcpu/Makefile.am @@ -0,0 +1,51 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. 
+# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +AM_CPPFLAGS = $(pls_xcpu_CPPFLAGS) -I$(srcdir)/include + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_pls_xcpu_DSO +component_noinst = +component_install = mca_pls_xcpu.la +else +component_noinst = libmca_pls_xcpu.la +component_install = +endif + +sources = \ + pls_xcpu.h \ + pls_xcpu.c \ + pls_xcpu_component.c \ + poll.c + +mcacomponentdir = $(libdir)/openmpi +mcacomponent_LTLIBRARIES = $(component_install) +mca_pls_xcpu_la_SOURCES = $(sources) +mca_pls_xcpu_la_LIBADD = \ + $(pls_xcpu_LIBS) \ + $(top_ompi_builddir)/orte/libopen-rte.la \ + $(top_ompi_builddir)/opal/libopen-pal.la +mca_pls_xcpu_la_LDFLAGS = -module -avoid-version $(pls_xcpu_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_pls_xcpu_la_SOURCES = $(sources) +libmca_pls_xcpu_la_LIBADD = $(pls_xcpu_LIBS) +libmca_pls_xcpu_la_LDFLAGS = -module -avoid-version $(pls_xcpu_LDFLAGS) diff --git a/orte/mca/pls/xcpu/configure.m4 b/orte/mca/pls/xcpu/configure.m4 new file mode 100644 index 0000000000..f76822b0f5 --- /dev/null +++ b/orte/mca/pls/xcpu/configure.m4 @@ -0,0 +1,37 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. 
All rights reserved. +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_pls_xcpu_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_pls_xcpu_CONFIG],[ + OMPI_CHECK_XCPU([pls_xcpu], [pls_xcpu_good=1], [pls_xcpu_good=0]) + + # if check worked, set wrapper flags. + # Evaluate succeed / fail + AS_IF([test "$pls_xcpu_good" = "1"], + [pls_xcpu_WRAPPER_EXTRA_LDFLAGS="$pls_xcpu_LDFLAGS" + pls_xcpu_WRAPPER_EXTRA_LIBS="$pls_xcpu_LIBS" + $1], + [$2]) + + # set build flags to use in makefile + AC_SUBST([pls_xcpu_CPPFLAGS]) + AC_SUBST([pls_xcpu_LDFLAGS]) + AC_SUBST([pls_xcpu_LIBS]) +])dnl diff --git a/orte/mca/pls/xcpu/configure.params b/orte/mca/pls/xcpu/configure.params new file mode 100644 index 0000000000..b3a26176aa --- /dev/null +++ b/orte/mca/pls/xcpu/configure.params @@ -0,0 +1,24 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Specific to this module + + +PARAM_INIT_FILE=pls_xcpu.c +PARAM_CONFIG_FILES="Makefile" diff --git a/orte/mca/pls/xcpu/pls_xcpu.c b/orte/mca/pls/xcpu/pls_xcpu.c new file mode 100644 index 0000000000..1ba57b2046 --- /dev/null +++ b/orte/mca/pls/xcpu/pls_xcpu.c @@ -0,0 +1,431 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +/* @file: + * xcpu Launcher to launch jobs on compute nodes. 
+ */
+
+#include "orte_config.h"
+#if HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif /* HAVE_SYS_TYPES_H */
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif /* HAVE_SYS_STAT_H */
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif /* HAVE_UNISTD_H */
+/* NOTE(review): the names of the unguarded system headers at this spot were
+ * lost; the three below are the ones this file's own calls need (SIGTERM,
+ * fprintf/asprintf, calloc/free). Confirm against the repository copy. */
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif /* HAVE_FCNTL_H */
+#ifdef HAVE_STRING_H
+#include <string.h>
+#endif /* HAVE_STRING_H */
+
+#include "opal/event/event.h"
+#include "opal/mca/base/mca_base_param.h"
+#include "opal/util/argv.h"
+#include "opal/util/output.h"
+#include "opal/util/opal_environ.h"
+#include "opal/util/path.h"
+#include "opal/util/show_help.h"
+
+#include "orte/dss/dss.h"
+#include "orte/util/sys_info.h"
+#include "orte/mca/errmgr/errmgr.h"
+#include "orte/mca/gpr/base/base.h"
+#include "orte/mca/iof/iof.h"
+#include "orte/mca/ns/base/base.h"
+#include "orte/mca/sds/base/base.h"
+#include "orte/mca/oob/base/base.h"
+#include "orte/mca/ras/base/base.h"
+#include "orte/mca/rmgr/rmgr.h"
+#include "orte/mca/rmaps/rmaps.h"
+#include "orte/mca/rml/rml.h"
+#include "orte/mca/smr/base/base.h"
+#include "orte/runtime/orte_wait.h"
+#include "orte/runtime/runtime.h"
+
+#include "pls_xcpu.h"
+#include "spfs.h"
+#include "spclient.h"
+#include "strutil.h"
+#include "libxcpu.h"
+
+extern char **environ;
+
+/** external variable defined in libspclient */
+extern int spc_chatty;
+
+/**
+ * Initialization of the xcpu module with all the needed function pointers
+ */
+orte_pls_base_module_t orte_pls_xcpu_module = {
+    orte_pls_xcpu_launch_job,
+    orte_pls_xcpu_terminate_job,
+    orte_pls_xcpu_terminate_orteds,
+    orte_pls_xcpu_terminate_proc,
+    orte_pls_xcpu_signal_job,
+    orte_pls_xcpu_signal_proc,
+    orte_pls_xcpu_finalize
+};
+
+/* Arrays of Xpcommand and Xpnodeset pointers; entry i corresponds to OMPI
+ * app_context i. num_xcmds is the number of entries in both arrays. They are
+ * allocated zero-filled by orte_pls_xcpu_launch_job, so an entry is either
+ * NULL or a live object. */
+Xpcommand **xcmd_sets;
+Xpnodeset **node_sets;
+int num_xcmds;
+
+/**
+ * stdout callback installed on every Xpcommand: copies the buflen bytes in
+ * buf to this process's stdout.
+ */
+void
+pls_xcpu_stdout_cb(Xpsession *s, u8 *buf, u32 buflen)
+{
+    /* "%.*s" takes an int precision and a char pointer */
+    fprintf(stdout, "%.*s", (int) buflen, (const char *) buf);
+}
+
+/**
+ * stderr callback installed on every Xpcommand: copies the buflen bytes in
+ * buf to this process's stderr.
+ */
+void
+pls_xcpu_stderr_cb(Xpsession *s, u8 *buf, u32 buflen)
+{
+    fprintf(stderr, "%.*s", (int) buflen, (const char *) buf);
+}
+
+/**
+ * Wait callback installed on every Xpcommand: marks the process attached to
+ * the session's node as terminated. The node's data pointer is the process
+ * name stored by orte_pls_xcpu_launch_job.
+ */
+void
+pls_xcpu_wait_cb(Xpsession *s, u8 *buf, u32 buflen)
+{
+    Xpnode *nd;
+
+    nd = xp_session_get_node(s);
+
+    /* FixMe: find out the process associated with this session */
+    orte_smr.set_proc_state(nd->data, ORTE_PROC_STATE_TERMINATED, 0);
+}
+
+/**
+ * Quote every string of a NULL-terminated list and concatenate the results,
+ * appending sep after each item (including the last one).
+ *
+ * @param list NULL-terminated array of strings; NULL is treated as empty
+ * @param sep  separator written after every item
+ * @retval newly allocated string owned by the caller (empty for an empty
+ *         list), or NULL if an allocation fails
+ */
+static char *
+process_list(char **list, char sep)
+{
+    int i, n = 0;
+    size_t len = 0;
+    char *s, *ret = NULL;
+    char **items = NULL;
+
+    /* find list length */
+    if (list != NULL) {
+        while (list[n] != NULL)
+            n++;
+    }
+
+    if (n > 0) {
+        /* calloc so that entries not yet filled in are NULL for cleanup */
+        items = calloc(n, sizeof(*items));
+        if (items == NULL)
+            return NULL;
+    }
+
+    /* quote the items if necessary */
+    for (i = 0; i < n; i++) {
+        items[i] = quotestrdup(list[i]);
+        if (items[i] == NULL)
+            goto out;
+        len += strlen(items[i]) + 1;
+    }
+
+    ret = malloc(len + 1);
+    if (ret == NULL)
+        goto out;
+
+    s = ret;
+    for (i = 0; i < n; i++) {
+        size_t ilen = strlen(items[i]);
+
+        memcpy(s, items[i], ilen);
+        s += ilen;
+        *s++ = sep;
+    }
+    *s = '\0';
+
+out:
+    /* the quoted copies are only temporaries, on success and on failure */
+    for (i = 0; i < n; i++)
+        free(items[i]);
+    free(items);
+    return ret;
+}
+
+/** Flatten an environment array into one newline-separated string. */
+static char *
+process_env(char **env)
+{
+    return process_list(env, '\n');
+}
+
+/** Flatten an argument vector into one space-separated string. */
+static char *
+process_argv(char **argv)
+{
+    return process_list(argv, ' ');
+}
+
+/**
+ * Add to an app context's environment the variables a launched process
+ * needs: the MCA parameters, the ns and gpr replica URIs and the universe
+ * name, then merge in this process's own environment.
+ *
+ * @param e in/out pointer to the environment array; replaced by the merged
+ *          array on success
+ * @retval ORTE_SUCCESS or an ORTE error code
+ */
+static int
+pls_xcpu_setup_env(char ***e)
+{
+    int n, rc;
+    char *var, *param, *uri;
+    char **env;
+
+    /* FixME: pointer arithmetic.
+     * NOTE(review): confirm against the mca_base_param_build_env prototype
+     * whether the first argument should be e rather than *e. */
+    n = opal_argv_count(*e);
+    rc = mca_base_param_build_env(*e, &n, false);
+    if (rc != ORTE_SUCCESS) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    if (NULL != orte_process_info.ns_replica_uri) {
+        uri = strdup(orte_process_info.ns_replica_uri);
+    } else {
+        uri = orte_rml.get_uri();
+    }
+    param = mca_base_param_environ_variable("ns", "replica", "uri");
+    opal_setenv(param, uri, true, e);
+    free(param);
+    free(uri);
+
+    if (NULL != orte_process_info.gpr_replica_uri) {
+        uri = strdup(orte_process_info.gpr_replica_uri);
+    } else {
+        uri = orte_rml.get_uri();
+    }
+    param = mca_base_param_environ_variable("gpr", "replica", "uri");
+    opal_setenv(param, uri, true, e);
+    free(param);
+    free(uri);
+
+#if 0
+    /* FixMe: Is this the frontend or backend nodename ? we don't have the starting
+     * daemon. */
+    var = mca_base_param_environ_variable("orte", "base", "nodename");
+    opal_setenv(var, orte_system_info.nodename, true, e);
+    free(var);
+#endif
+
+    var = mca_base_param_environ_variable("universe", NULL, NULL);
+    if (asprintf(&param, "%s@%s:%s", orte_universe_info.uid,
+                 orte_universe_info.host, orte_universe_info.name) < 0) {
+        /* param is indeterminate when asprintf fails; do not use it */
+        free(var);
+        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+    opal_setenv(var, param, true, e);
+
+    free(param);
+    free(var);
+#if 0
+    /* FixMe: do this only when we oversubscribe */
+    var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
+    opal_setenv(var, "1", true, e);
+    free(var);
+#endif
+    /* merge in environment */
+    env = opal_environ_merge(*e, environ);
+    opal_argv_free(*e);
+    *e = env;
+
+    return ORTE_SUCCESS;
+}
+
+/* This is the main function that will launch jobs on remote compute nodes.
+ * It builds one Xpnodeset/Xpcommand per app context, executes the commands
+ * and then blocks in xp_commands_wait() until they finish; all commands are
+ * wiped and destroyed before returning, on success and on failure alike.
+ * @param jobid the jobid of the job to launch
+ * @retval ORTE_SUCCESS or error
+ */
+int
+orte_pls_xcpu_launch_job(orte_jobid_t jobid)
+{
+    int i, rc;
+    int num_processes = 0;
+    orte_cellid_t cellid;
+    opal_list_item_t *node_item, *proc_item;
+    orte_job_map_t *map;
+    orte_vpid_t vpid_start, vpid_range;
+
+    if (mca_pls_xcpu_component.chatty)
+        spc_chatty = 1;
+
+    /* get the job map */
+    rc = orte_rmaps.get_job_map(&map, jobid);
+    if (rc != ORTE_SUCCESS) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    /* next, get the vpid_start and range */
+    rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, &vpid_range);
+    if (rc != ORTE_SUCCESS) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    /* get the cellid */
+    cellid = orte_process_info.my_name->cellid;
+
+    /* create num_apps of pointers to Xpnodeset and Xpcommand; zero-filled so
+     * the cleanup loop below can tell which commands were really created */
+    node_sets = calloc(map->num_apps, sizeof(*node_sets));
+    xcmd_sets = calloc(map->num_apps, sizeof(*xcmd_sets));
+    if (map->num_apps > 0 && (NULL == node_sets || NULL == xcmd_sets)) {
+        free(node_sets);
+        free(xcmd_sets);
+        node_sets = NULL;
+        xcmd_sets = NULL;
+        num_xcmds = 0;
+        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+
+    num_xcmds = map->num_apps;
+
+    /* create Xpnodeset for each app_context */
+    for (i = 0; i < map->num_apps; i++) {
+        node_sets[i] = xp_nodeset_create();
+        if (NULL == node_sets[i]) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto error;
+        }
+    }
+
+    /* create Xpnode for each mapped proc, add them to corresponding Xpnodeset
+     * according to their app context */
+    for (node_item = opal_list_get_first(&map->nodes);
+         node_item != opal_list_get_end(&map->nodes);
+         node_item = opal_list_get_next(node_item)) {
+        orte_mapped_node_t *node = (orte_mapped_node_t *) node_item;
+        for (proc_item = opal_list_get_first(&node->procs);
+             proc_item != opal_list_get_end(&node->procs);
+             proc_item = opal_list_get_next(proc_item)) {
+            orte_mapped_proc_t *proc = (orte_mapped_proc_t *) proc_item;
+            Xpnode *xpnode = xp_node_create(node->nodename, node->nodename,
+                                            NULL, NULL);
+            if (NULL == xpnode) {
+                rc = ORTE_ERR_OUT_OF_RESOURCE;
+                ORTE_ERROR_LOG(rc);
+                goto error;
+            }
+            /* pls_xcpu_wait_cb reads this back to report termination */
+            xpnode->data = &proc->name;
+            xp_nodeset_add(node_sets[proc->app_idx], xpnode);
+        }
+    }
+
+    /* setup environment variables for each app context */
+    for (i = 0; i < map->num_apps; i++) {
+        /* FixME: how many layers of *? */
+        rc = pls_xcpu_setup_env(&map->apps[i]->env);
+        if (rc != ORTE_SUCCESS) {
+            ORTE_ERROR_LOG(rc);
+            goto error;
+        }
+        num_processes += map->apps[i]->num_procs;
+    }
+
+    for (i = 0; i < map->num_apps; i++) {
+        rc = orte_ns_nds_xcpu_put(cellid, jobid, vpid_start,
+                                  num_processes, &map->apps[i]->env);
+        if (rc != ORTE_SUCCESS) {
+            ORTE_ERROR_LOG(rc);
+            goto error;
+        }
+    }
+
+    /* create Xpcommand for each app_context from Xpnodeset */
+    for (i = 0; i < map->num_apps; i++) {
+        xcmd_sets[i] = xp_command_create(node_sets[i]);
+        if (NULL == xcmd_sets[i]) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto error;
+        }
+
+        /* setup argc, argv and env in xcpu command */
+        xcmd_sets[i]->cwd = strdup(map->apps[i]->cwd);
+        xcmd_sets[i]->env = process_env(map->apps[i]->env);
+        xcmd_sets[i]->argv = process_argv(map->apps[i]->argv);
+        xcmd_sets[i]->exec = strdup(map->apps[i]->argv[0]);
+        xcmd_sets[i]->copypath = strdup(map->apps[i]->app);
+        asprintf(&xcmd_sets[i]->jobid, "%d", (int) jobid);
+
+        /* setup io forwarding */
+        xcmd_sets[i]->stdout_cb = pls_xcpu_stdout_cb;
+        xcmd_sets[i]->stderr_cb = pls_xcpu_stderr_cb;
+        xcmd_sets[i]->wait_cb = pls_xcpu_wait_cb;
+
+        /* call xp_command_exec(xcmd) */
+        if (xp_command_exec(xcmd_sets[i]) < 0) {
+            /* rc still holds ORTE_SUCCESS from the steps above */
+            rc = ORTE_ERROR;
+            ORTE_ERROR_LOG(rc);
+            goto error;
+        }
+    }
+
+    /* entering event loop and waiting for termination of processes
+     * by calling xp_commands_wait.
+     * FixME: we are blocked here so both success and failure cases
+     * fall back to the error handler and all resources are freed.
+     * this should be changed when we have non-blocking command_wait() */
+    if (xp_commands_wait(map->num_apps, xcmd_sets) < 0) {
+        rc = ORTE_ERROR;
+    } else {
+        rc = ORTE_SUCCESS;
+    }
+
+error:
+    /* error handling and clean up, kill all the processes */
+    for (i = 0; i < num_xcmds; i++) {
+        if (xcmd_sets[i] != NULL) {
+            xp_command_wipe(xcmd_sets[i]);
+            xp_command_destroy(xcmd_sets[i]);
+            xcmd_sets[i] = NULL;
+        }
+    }
+
+    /* set ORTE error code?? */
+    return rc;
+}
+
+/**
+ * Send SIGTERM to every command of the job that is still recorded in
+ * xcmd_sets.
+ * @param jobid the job to terminate
+ * @param attrs unused
+ * @retval ORTE_SUCCESS or the error returned while fetching the job map
+ */
+int orte_pls_xcpu_terminate_job(orte_jobid_t jobid, opal_list_t *attrs)
+{
+    int i, rc;
+    orte_job_map_t *map;
+
+    /* get the job map */
+    rc = orte_rmaps.get_job_map(&map, jobid);
+    if (rc != ORTE_SUCCESS) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    /* xcmd_sets only exists once a launch has run; never index past it */
+    for (i = 0; xcmd_sets != NULL && i < num_xcmds && i < map->num_apps; i++) {
+        if (xcmd_sets[i] != NULL) {
+            xp_command_kill(xcmd_sets[i], SIGTERM);
+        }
+    }
+    return ORTE_SUCCESS;
+}
+
+/** No-op: this launcher does not act on orteds. Always ORTE_SUCCESS. */
+int orte_pls_xcpu_terminate_orteds(orte_jobid_t jobid, opal_list_t * attrs)
+{
+    return ORTE_SUCCESS;
+}
+
+/** Not supported by libxcpu (see below); logs the call, returns ORTE_SUCCESS. */
+int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name)
+{
+    fprintf(stderr, __FILE__ " terminate_proc\n");
+
+    /* libxcpu can not wipe individual process in an
+     * Xpcommand/Xpsessionset, only to the whole session set */
+
+    return ORTE_SUCCESS;
+}
+
+/**
+ * Send signal sig to every command of the job that is still recorded in
+ * xcmd_sets.
+ * @param jobid the job to signal
+ * @param sig   the signal number to deliver
+ * @param attrs unused
+ * @retval ORTE_SUCCESS or the error returned while fetching the job map
+ */
+int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t sig, opal_list_t *attrs)
+{
+    int i, rc;
+    orte_job_map_t *map;
+
+    fprintf(stderr, __FILE__ " signal_job, sig = %d\n", sig);
+
+    /* get the job map */
+    rc = orte_rmaps.get_job_map(&map, jobid);
+    if (rc != ORTE_SUCCESS) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    /* xcmd_sets only exists once a launch has run; never index past it */
+    for (i = 0; xcmd_sets != NULL && i < num_xcmds && i < map->num_apps; i++) {
+        if (xcmd_sets[i] != NULL)
+            xp_command_kill(xcmd_sets[i], sig);
+    }
+
+    return ORTE_SUCCESS;
+}
+
+/** Not supported by libxcpu (see below); logs the call, returns ORTE_SUCCESS. */
+int orte_pls_xcpu_signal_proc(const orte_process_name_t* proc_name, int32_t sig)
+{
+    fprintf(stderr, __FILE__ " signal_proc\n");
+
+    /* libxcpu can not send signal to individual process in an
+     * Xpcommand/Xpsessionset, only to the whole session set */
+
+    return ORTE_SUCCESS;
+}
+
+/** Nothing to release at module finalization. Always ORTE_SUCCESS. */
+int orte_pls_xcpu_finalize(void)
+{
+    return ORTE_SUCCESS;
+}
+
diff --git a/orte/mca/pls/xcpu/pls_xcpu.h b/orte/mca/pls/xcpu/pls_xcpu.h
new file mode 100644
index 0000000000..e61968d0b7
--- /dev/null
+++ b/orte/mca/pls/xcpu/pls_xcpu.h
@@ -0,0 +1,98 @@
+/* -*- C -*-
+ *
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ * University Research and Technology
+ * Corporation. All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2006 The Regents of the University of California.
+ * All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ *
+ *
+ */
+/**
+ * @file:
+ * Header file for the xcpu launcher. This will use xcpu to launch jobs on
+ * the list of nodes that it will get from RAS (resource allocation
+ * system).
+ * -# pls_xcpu is called by orterun. It first sets up the environment for the
+ * process to be launched on the remote node, then reads the ompi registry and
+ * then launches the binary on the nodes specified in the registry.
+ */ + +#ifndef orte_pls_xcpu_H_ +#define orte_pls_xcpu_H_ + +#include "orte_config.h" +#include "orte/class/orte_pointer_array.h" +#include "orte/orte_constants.h" +#include "orte/mca/pls/base/base.h" +#include "orte/util/proc_info.h" +#include "opal/threads/condition.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + +/* + * Module open / close -- defined in component file + */ +int orte_pls_xcpu_component_open(void); +int orte_pls_xcpu_component_close(void); + +/* + * Startup / Shutdown + */ +orte_pls_base_module_t* orte_pls_xcpu_init(int *priority); /* in component file */ + +/* + * Interface + */ +int orte_pls_xcpu_launch_job(orte_jobid_t); +int orte_pls_xcpu_terminate_job(orte_jobid_t, opal_list_t *); + int orte_pls_xcpu_terminate_orteds(orte_jobid_t jobid, opal_list_t * attrs); +int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name); +int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t sig, opal_list_t*); +int orte_pls_xcpu_signal_proc(const orte_process_name_t* proc_name, int32_t sig); +int orte_pls_xcpu_finalize(void); + +void orte_pls_xcpu_close_sessions(void); + +/** + * (P)rocess (L)aunch (S)ubsystem xcpu Component + */ +struct orte_pls_xcpu_component_t { + orte_pls_base_component_t super; + + int debug; + /* If greater than 0 print debugging information */ + int priority; + /* The priority of this component. This will be returned if + * we determine that xcpu is available and running on this node, + */ + int terminate_sig; + /* The signal that gets sent to a process to kill it. 
*/ + opal_mutex_t lock; + /* Lock used to prevent some race conditions */ + opal_condition_t condition; + /* Condition that is signaled when all the daemons have died */ + int chatty; +}; +typedef struct orte_pls_xcpu_component_t orte_pls_xcpu_component_t; + +ORTE_DECLSPEC extern orte_pls_xcpu_component_t mca_pls_xcpu_component; +ORTE_DECLSPEC extern orte_pls_base_module_t orte_pls_xcpu_module; + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif /* orte_pls_xcpu_H_ */ + diff --git a/orte/mca/pls/xcpu/pls_xcpu_component.c b/orte/mca/pls/xcpu/pls_xcpu_component.c new file mode 100644 index 0000000000..490fbf03f3 --- /dev/null +++ b/orte/mca/pls/xcpu/pls_xcpu_component.c @@ -0,0 +1,103 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ +/** + * @file: + * Takes care of the component stuff for the MCA. + */ +#include "orte_config.h" +#include "orte/mca/errmgr/errmgr.h" +#include "opal/mca/mca.h" +#include "opal/mca/base/mca_base_param.h" +#include "pls_xcpu.h" +#include "spfs.h" + +/** + * The xcpu component data structure that stores all the relevent data about + * this component. 
+ */ +orte_pls_xcpu_component_t mca_pls_xcpu_component = { + { /* version, data and init members of only first + * structure (called super) being initialized + */ + { + ORTE_PLS_BASE_VERSION_1_3_0, + "xcpu", /* MCA component name */ + ORTE_MAJOR_VERSION, /* MCA component major version */ + ORTE_MINOR_VERSION, /* MCA component minor version */ + ORTE_RELEASE_VERSION, /* MCA component release version */ + orte_pls_xcpu_component_open, /* component open */ + orte_pls_xcpu_component_close /* component close */ + }, + { + false /* checkpoint / restart */ + }, + orte_pls_xcpu_init /* component init */ + } +}; + +/** + * Opens the pls_xcpu component, setting all the needed mca parameters and + * finishes setting up the component struct. + */ +int orte_pls_xcpu_component_open(void) +{ + int rc = ORTE_SUCCESS; + + /* init parameters */ + /* read trunk/opal/mca/base/mca_base_param.h for reg_int details*/ + mca_base_component_t *c = &mca_pls_xcpu_component.super.pls_version; + mca_base_param_reg_int(c, "priority", + "Priority of the xcpu pls component", + false, false, 5, &mca_pls_xcpu_component.priority); + mca_base_param_reg_int(c, "debug", + "If > 0 prints library debugging information", + false, false, 0, &mca_pls_xcpu_component.debug); + mca_base_param_reg_int(c, "chatty", "Prints 9P protocol transactions", + false, false, 0, &mca_pls_xcpu_component.chatty); + OBJ_CONSTRUCT(&mca_pls_xcpu_component.lock, opal_mutex_t); + OBJ_CONSTRUCT(&mca_pls_xcpu_component.condition, opal_condition_t); + + return rc; +} + +/** + * Closes the pls_xcpu component + */ +int orte_pls_xcpu_component_close(void) +{ + //fprintf(stderr, "orte_pls_xcpu_component_close\n"); + + OBJ_DESTRUCT(&mca_pls_xcpu_component.lock); + OBJ_DESTRUCT(&mca_pls_xcpu_component.condition); + + return ORTE_SUCCESS; +} + +/** + * Initializes the module, + * + * FixMe: do we have to check anything? 
+ */ +orte_pls_base_module_t* orte_pls_xcpu_init(int *priority) +{ + *priority = mca_pls_xcpu_component.priority; + return &orte_pls_xcpu_module; +} + diff --git a/orte/mca/pls/xcpu/poll.c b/orte/mca/pls/xcpu/poll.c new file mode 100644 index 0000000000..e3fad9288a --- /dev/null +++ b/orte/mca/pls/xcpu/poll.c @@ -0,0 +1,271 @@ +/* + * Copyright (C) 2006 by Latchesar Ionkov + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * LATCHESAR IONKOV AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. 
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "spfs.h"
+//#include "spfsimpl.h"
+#include "orte_config.h"
+#include "opal/event/event.h"
+#include "opal/runtime/opal_progress.h"
+
+enum {
+	TblModified = 1,
+	ChunkSize = 4
+};
+
+/* Bits kept in Spfd.flags */
+enum {
+	Readable = 1,
+	Writable = 2,
+	Error = 4,
+
+	Notifying = 32,		/* the notify callback is currently running */
+	Removed = 64		/* removed while Notifying; spfd_handler frees it */
+};
+
+typedef struct Spolltbl Spolltbl;
+struct Spolltbl {
+	int shutdown;
+	int looping;
+	Spfd* spfds;		/* head of the doubly linked descriptor list */
+};
+
+struct Spfd {
+	int fd;
+	opal_event_t opevent;
+	int flags;
+	int events;
+	void* aux;
+	void (*notify)(Spfd *, void *);
+
+	Spfd* prev;
+	Spfd* next;
+};
+
+static Spolltbl ptbl;
+static struct timeval tval = { 5, 0 };
+static struct timeval *sptval = &tval;
+
+
+static void spfd_handler(int fd, short event, void *aux);
+static void sp_setup_event(Spfd *spfd);
+
+/* Ask sp_poll_loop() to return after its current iteration. */
+void
+sp_poll_stop(void)
+{
+	ptbl.shutdown = 1;
+}
+
+/* Non-zero while sp_poll_once() or sp_poll_loop() is running. */
+int
+sp_poll_looping(void)
+{
+	return ptbl.looping;
+}
+
+/*
+ * Register fd with the event loop: make it non-blocking, push a new Spfd on
+ * the head of the descriptor list and arm its read/write event.
+ * notify(spfd, aux) is what spfd_handler calls when the descriptor state
+ * changes. Returns the new Spfd, or NULL if it cannot be allocated.
+ */
+Spfd *
+spfd_add(int fd, void (*notify)(Spfd *, void *), void *aux)
+{
+	int fl;
+	Spfd *spfd;
+
+	spfd = sp_malloc(sizeof(*spfd));
+	if (!spfd)
+		return NULL;
+
+	/* add O_NONBLOCK to the current status flags; F_SETFL with only
+	 * O_NONBLOCK would clear the others (O_APPEND, ...). Still best
+	 * effort: a failing fcntl does not fail the registration. */
+	fl = fcntl(fd, F_GETFL, 0);
+	if (fl >= 0)
+		fcntl(fd, F_SETFL, fl | O_NONBLOCK);
+
+	spfd->fd = fd;
+	spfd->flags = 0;
+	spfd->events = OPAL_EV_READ | OPAL_EV_WRITE;
+	spfd->aux = aux;
+	spfd->notify = notify;
+
+	/* link in at the head; the old head must point back at the new
+	 * node, otherwise spfd_remove() sees prev == NULL on it and drops
+	 * every node in front of it from the list */
+	spfd->prev = NULL;
+	spfd->next = ptbl.spfds;
+	if (ptbl.spfds)
+		ptbl.spfds->prev = spfd;
+	ptbl.spfds = spfd;
+
+	sp_setup_event(spfd);
+	return spfd;
+}
+
+/*
+ * Unlink spfd from the descriptor list and release it. If its notify
+ * callback is running, only flag it Removed; spfd_handler frees it when
+ * the callback returns.
+ */
+void
+spfd_remove(Spfd *spfd)
+{
+	if (spfd->prev)
+		spfd->prev->next = spfd->next;
+	else
+		ptbl.spfds = spfd->next;
+
+	if (spfd->next)
+		spfd->next->prev = spfd->prev;
+
+	if (spfd->flags & Notifying)
+		spfd->flags |= Removed;
+	else {
+		opal_event_del(&spfd->opevent);
+		free(spfd);
+	}
+}
+
+/*
+ * Release every registered descriptor and leave the list empty. As in
+ * spfd_remove(), a descriptor whose notify callback is running is only
+ * flagged Removed and is freed by spfd_handler.
+ */
+void
+spfd_remove_all(void)
+{
+	Spfd *spfd, *next;
+
+	spfd = ptbl.spfds;
+	/* do not leave the head pointing at freed nodes */
+	ptbl.spfds = NULL;
+	while (spfd != NULL) {
+		next = spfd->next;
+		if (spfd->flags & Notifying) {
+			spfd->prev = NULL;
+			spfd->next = NULL;
+			spfd->flags |= Removed;
+		} else {
+			opal_event_del(&spfd->opevent);
+			free(spfd);
+		}
+		spfd = next;
+	}
+}
+
+int +spfd_can_read(Spfd *spfd) +{ + return spfd->flags & Readable; +} + +int +spfd_can_write(Spfd *spfd) +{ + return spfd->flags & Writable; +} + +int +spfd_has_error(Spfd *spfd) +{ + return spfd->flags & Error; +} + +int +spfd_read(Spfd *spfd, void *buf, int buflen) +{ + int ret; + + if (buflen) + ret = read(spfd->fd, buf, buflen); + else + ret = 0; + + spfd->flags &= ~Readable; + spfd->events |= OPAL_EV_READ; + if (!(spfd->flags & Notifying)) { + opal_event_del(&spfd->opevent); + sp_setup_event(spfd); + } + + return ret; +} + +int +spfd_write(Spfd *spfd, void *buf, int buflen) +{ + int ret; + + if (buflen) + ret = write(spfd->fd, buf, buflen); + else + ret = 0; + + spfd->flags &= ~Writable; + spfd->events |= OPAL_EV_WRITE; + if (!(spfd->flags & Notifying)) { + opal_event_del(&spfd->opevent); + sp_setup_event(spfd); + } + + return ret; +} + +static void +spfd_handler(int fd, short event, void *aux) +{ + int flags, events; + Spfd *spfd; + + spfd = aux; + +// fprintf(stderr, "spfd_handler spfd %p event %d events %d flags %d\n", spfd, event, spfd->events, spfd->flags); + flags = spfd->flags; + events = spfd->events; + + if (event & OPAL_EV_READ) { + spfd->events &= ~OPAL_EV_READ; + flags |= Readable; + } + + if (event & OPAL_EV_WRITE) { + spfd->events &= ~OPAL_EV_WRITE; + flags |= Writable; + } + + if (spfd->flags != flags) { + spfd->flags = flags | Notifying; + (*spfd->notify)(spfd, spfd->aux); + spfd->flags &= ~Notifying; + } + + if (spfd->flags & Removed) { + free(spfd); + return; + } + + sp_setup_event(spfd); +} + +static void +sp_setup_event(Spfd *spfd) +{ +// fprintf(stderr, "sp_setup_event "); +// sp_printtime(stderr); +// fprintf(stderr, " spfd %p events %d\n", spfd, spfd->events); + opal_event_set(&spfd->opevent, spfd->fd, spfd->events, spfd_handler, spfd); + opal_event_add(&spfd->opevent, sptval); +} + +void +sp_poll_once(void) +{ + ptbl.looping = 1; + opal_progress(); + ptbl.looping = 0; +} + +void +sp_poll_loop() +{ + ptbl.shutdown = 0; + ptbl.looping = 
1; + while (!ptbl.shutdown) { + opal_progress(); + } + ptbl.looping = 0; +} diff --git a/orte/mca/sds/base/base.h b/orte/mca/sds/base/base.h index 702c36488d..f5dffdd43b 100644 --- a/orte/mca/sds/base/base.h +++ b/orte/mca/sds/base/base.h @@ -82,6 +82,10 @@ extern "C" { orte_vpid_t vpid_start, orte_vpid_t global_vpid_start, int num_procs, char ***env); + ORTE_DECLSPEC int orte_ns_nds_xcpu_put(orte_cellid_t cell, + orte_jobid_t job, + orte_vpid_t vpid_start, + int num_procs, char ***env); ORTE_DECLSPEC extern opal_list_t orte_sds_base_components_available; diff --git a/orte/mca/sds/base/sds_base_put.c b/orte/mca/sds/base/sds_base_put.c index b6e31b36e0..331b0dae9a 100644 --- a/orte/mca/sds/base/sds_base_put.c +++ b/orte/mca/sds/base/sds_base_put.c @@ -243,6 +243,93 @@ int orte_ns_nds_bproc_put(orte_cellid_t cell, orte_jobid_t job, return ORTE_SUCCESS; } +/** + * sets up the environment so that a process launched with the xcpu launcher can + * figure out its name + * @param cell the cell that the process belongs to. + * @param job the job the process belongs to + * @param vpid_start the starting vpid for the current parallel launch + * @param global_vpid_start the starting vpid for the job + * @param num_procs the number of user processes in the job + * @param env a pointer to the environment to setup + * @retval ORTE_SUCCESS + * @retval error + */ +int orte_ns_nds_xcpu_put(orte_cellid_t cell, orte_jobid_t job, + orte_vpid_t vpid_start, int num_procs, + char ***env) +{ + char* param; + char* value; + int rc; + + /* set the mode to xcpu */ + if(NULL == (param = mca_base_param_environ_variable("ns","nds",NULL))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_setenv(param, "xcpu", true, env); + free(param); + + /* since we want to pass the name as separate components, make sure + * that the "name" environmental variable is cleared! 
+ */ + if(NULL == (param = mca_base_param_environ_variable("ns","nds","name"))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_unsetenv(param, env); + free(param); + + /* setup the name */ + if(ORTE_SUCCESS != (rc = orte_ns.convert_cellid_to_string(&value, cell))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if(NULL == (param = mca_base_param_environ_variable("ns","nds","cellid"))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_setenv(param, value, true, env); + free(param); + free(value); + + if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&value, job))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if(NULL == (param = mca_base_param_environ_variable("ns","nds","jobid"))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_setenv(param, value, true, env); + free(param); + free(value); + + rc = orte_ns.convert_vpid_to_string(&value, vpid_start); + if (ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return(rc); + } + if(NULL == (param = mca_base_param_environ_variable("ns","nds","vpid_start"))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_setenv(param, value, true, env); + free(param); + free(value); + + asprintf(&value, "%d", num_procs); + if(NULL == (param = mca_base_param_environ_variable("ns","nds","num_procs"))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + opal_setenv(param, value, true, env); + free(param); + free(value); + + return ORTE_SUCCESS; +} int orte_ns_nds_pipe_put(const orte_process_name_t* name, orte_vpid_t vpid_start, size_t num_procs, int fd) { diff --git a/orte/mca/sds/xcpu/Makefile.am b/orte/mca/sds/xcpu/Makefile.am new file mode 100644 index 0000000000..5a266424f6 --- /dev/null +++ b/orte/mca/sds/xcpu/Makefile.am @@ -0,0 +1,54 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research 
and Technology
+#                         Corporation.  All rights reserved.
+# Copyright (c) 2004-2005 The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+#                         University of Stuttgart.  All rights reserved.
+# Copyright (c) 2004-2005 The Regents of the University of California.
+#                         All rights reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# Use the top-level Makefile.options
+
+
+
+AM_CPPFLAGS = $(sds_xcpu_CPPFLAGS)
+
+sources = \
+    sds_xcpu.h \
+    sds_xcpu_component.c \
+    sds_xcpu_module.c
+
+# Make the output library in this directory, and name it either
+# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
+# (for static builds).
+
+if OMPI_BUILD_sds_xcpu_DSO
+component_noinst =
+component_install = mca_sds_xcpu.la
+else
+component_noinst = libmca_sds_xcpu.la
+component_install =
+endif
+
+mcacomponentdir = $(libdir)/openmpi
+mcacomponent_LTLIBRARIES = $(component_install)
+mca_sds_xcpu_la_SOURCES = $(sources)
+mca_sds_xcpu_la_LDFLAGS = -module -avoid-version $(sds_xcpu_LDFLAGS)
+mca_sds_xcpu_la_LIBADD = \
+    $(sds_xcpu_LIBS) \
+    $(top_ompi_builddir)/orte/libopen-rte.la \
+    $(top_ompi_builddir)/opal/libopen-pal.la
+
+noinst_LTLIBRARIES = $(component_noinst)
+libmca_sds_xcpu_la_SOURCES =$(sources)
+libmca_sds_xcpu_la_LDFLAGS = -module -avoid-version $(sds_xcpu_LDFLAGS)
+libmca_sds_xcpu_la_LIBADD = $(sds_xcpu_LIBS)
diff --git a/orte/mca/sds/xcpu/configure.m4 b/orte/mca/sds/xcpu/configure.m4
new file mode 100644
index 0000000000..1b4b5b0dca
--- /dev/null
+++ b/orte/mca/sds/xcpu/configure.m4
@@ -0,0 +1,36 @@
+# -*- shell-script -*-
+#
+# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+#                         University Research and Technology
+#                         Corporation.  All rights reserved.
+# Copyright (c) 2004-2005 The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+#                         University of Stuttgart.  All rights reserved.
+# Copyright (c) 2004-2005 The Regents of the University of California.
+#                         All rights reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# MCA_sds_xcpu_CONFIG([action-if-found], [action-if-not-found])
+# -----------------------------------------------------------
+AC_DEFUN([MCA_sds_xcpu_CONFIG],[
+    OMPI_CHECK_XCPU([sds_xcpu], [sds_xcpu_good=1], [sds_xcpu_good=0])
+
+    # If the check succeeded, set the wrapper flags from its results,
+    # then run action-if-found or action-if-not-found accordingly.
+    AS_IF([test "$sds_xcpu_good" = "1"],
+          [sds_xcpu_WRAPPER_EXTRA_LDFLAGS="$sds_xcpu_LDFLAGS"
+           sds_xcpu_WRAPPER_EXTRA_LIBS="$sds_xcpu_LIBS"
+           $1],
+          [$2])
+    # Substitute the build flags so the Makefile can use them.
+    AC_SUBST([sds_xcpu_CPPFLAGS])
+    AC_SUBST([sds_xcpu_LDFLAGS])
+    AC_SUBST([sds_xcpu_LIBS])
+])dnl
diff --git a/orte/mca/sds/xcpu/configure.params b/orte/mca/sds/xcpu/configure.params
new file mode 100644
index 0000000000..301de8a3b9
--- /dev/null
+++ b/orte/mca/sds/xcpu/configure.params
@@ -0,0 +1,21 @@
+# -*- shell-script -*-
+#
+# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+#                         University Research and Technology
+#                         Corporation.  All rights reserved.
+# Copyright (c) 2004-2005 The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+#                         University of Stuttgart.  All rights reserved.
+# Copyright (c) 2004-2005 The Regents of the University of California.
+#                         All rights reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+PARAM_INIT_FILE=sds_xcpu_component.c
+PARAM_CONFIG_FILES="Makefile"
diff --git a/orte/mca/sds/xcpu/sds_xcpu.h b/orte/mca/sds/xcpu/sds_xcpu.h
new file mode 100644
index 0000000000..656cd3bf67
--- /dev/null
+++ b/orte/mca/sds/xcpu/sds_xcpu.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef ORTE_SDS_XCPU_H
+#define ORTE_SDS_XCPU_H
+
+#if defined(c_plusplus) || defined(__cplusplus)
+extern "C" {
+#endif
+
+    /*
+     * Component open / close / init
+     */
+    int orte_sds_xcpu_component_open(void);
+    int orte_sds_xcpu_component_close(void);
+    orte_sds_base_module_t* orte_sds_xcpu_component_init(int *priority);
+
+    /*
+     * Module shutdown
+     */
+    int orte_sds_xcpu_finalize(void);
+
+    /*
+     * Module functions
+     */
+    int orte_sds_xcpu_set_name(void);
+
+
+#if defined(c_plusplus) || defined(__cplusplus)
+}
+#endif
+
+#endif /* ORTE_SDS_XCPU_H */
diff --git a/orte/mca/sds/xcpu/sds_xcpu_component.c b/orte/mca/sds/xcpu/sds_xcpu_component.c
new file mode 100644
index 0000000000..13651d5942
--- /dev/null
+++ b/orte/mca/sds/xcpu/sds_xcpu_component.c
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ *
+ * These symbols are in a file by themselves to provide nice linker
+ * semantics.  Since linkers generally pull in symbols by object
+ * files, keeping these symbols as the only symbols in this file
+ * prevents utility programs such as "ompi_info" from having to import
+ * entire components just to query their version and parameters.
+ */
+
+#include "orte_config.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "orte/orte_constants.h"
+#include "orte/mca/sds/sds.h"
+#include "orte/mca/sds/xcpu/sds_xcpu.h"
+#include "opal/mca/base/mca_base_param.h"
+
+extern orte_sds_base_module_t orte_sds_xcpu_module;
+
+/*
+ * Instantiate the public struct with all of our public information
+ * and pointers to our public functions in it
+ */
+orte_sds_base_component_t mca_sds_xcpu_component = {
+    /* First, the mca_component_t struct containing meta information
+       about the component itself */
+    {
+        /* Indicate that we are a sds v1.0.0 component (which also
+           implies a specific MCA version) */
+        ORTE_SDS_BASE_VERSION_1_0_0,
+
+        /* Component name and version */
+        "xcpu",
+        ORTE_MAJOR_VERSION,
+        ORTE_MINOR_VERSION,
+        ORTE_RELEASE_VERSION,
+
+        /* Component open and close functions */
+        orte_sds_xcpu_component_open,
+        orte_sds_xcpu_component_close
+    },
+
+    /* Next the MCA v1.0.0 component meta data */
+    {
+        /* Whether the component is checkpointable or not */
+        true
+    },
+
+    /* Initialization / querying functions */
+    orte_sds_xcpu_component_init
+};
+
+
+/**
+ * Component open: nothing to set up.
+ *
+ * @retval ORTE_SUCCESS always
+ */
+int
+orte_sds_xcpu_component_open(void)
+{
+    return ORTE_SUCCESS;
+}
+
+
+/**
+ * Offers the xcpu module when the ("ns", "nds") MCA string parameter is
+ * set to "xcpu".
+ *
+ * @param priority  set to 30 when the module is returned; left untouched
+ *                  otherwise
+ * @return pointer to orte_sds_xcpu_module, or NULL when the parameter is
+ *         unset or names a different mode
+ */
+orte_sds_base_module_t *
+orte_sds_xcpu_component_init(int *priority)
+{
+    int id;
+    int selected;
+    char *mode = NULL;
+
+    id = mca_base_param_register_string("ns", "nds", NULL, NULL, NULL);
+    mca_base_param_lookup_string(id, &mode);
+
+    selected = (NULL != mode && 0 == strcmp("xcpu", mode));
+    /* the looked-up string belongs to us (free(NULL) is a no-op) */
+    free(mode);
+    if (!selected) {
+        return NULL;
+    }
+
+    *priority = 30;
+    return &orte_sds_xcpu_module;
+}
+
+
+/**
+ * Component close: nothing to release.
+ *
+ * @retval ORTE_SUCCESS always
+ */
+int
+orte_sds_xcpu_component_close(void)
+{
+    return ORTE_SUCCESS;
+}
+
diff --git a/orte/mca/sds/xcpu/sds_xcpu_module.c b/orte/mca/sds/xcpu/sds_xcpu_module.c
new file mode 100644
index 0000000000..5a57599bea
--- /dev/null
+++ b/orte/mca/sds/xcpu/sds_xcpu_module.c
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ *
+ */
+
+#include "orte_config.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "orte/orte_constants.h"
+#include "orte/util/sys_info.h"
+#include "opal/util/output.h"
+#include "opal/mca/base/mca_base_param.h"
+#include "orte/mca/sds/sds.h"
+#include "orte/mca/sds/base/base.h"
+#include "orte/mca/sds/xcpu/sds_xcpu.h"
+#include "orte/mca/ns/ns.h"
+#include "orte/mca/ns/base/base.h"
+#include "orte/mca/errmgr/base/base.h"
+
+/* Function table returned by orte_sds_xcpu_component_init(). */
+orte_sds_base_module_t orte_sds_xcpu_module = {
+    orte_sds_base_basic_contact_universe,
+    orte_sds_xcpu_set_name,
+    orte_sds_xcpu_finalize,
+};
+
+/**
+ * Looks up the string MCA parameter registered as ("ns", "nds", name).
+ *
+ * @param name  last part of the parameter name, e.g. "jobid"
+ * @return the value, which the caller must free(), or NULL when the
+ *         lookup produced no value
+ */
+static char *xcpu_lookup_nds_string(const char *name)
+{
+    char *value = NULL;
+    int id = mca_base_param_register_string("ns", "nds", name, NULL, NULL);
+
+    mca_base_param_lookup_string(id, &value);
+    return value;
+}
+
+/**
+ * Extracts the decimal number that follows the first '/' in an XCPUID
+ * value.
+ *
+ * @param xcpuid  contents of the XCPUID environment variable
+ * @param rank    receives the parsed number on success
+ * @retval ORTE_SUCCESS
+ * @retval ORTE_ERR_NOT_FOUND  no '/' present, or no digits after it
+ */
+static int xcpu_rank_from_id(const char *xcpuid, int *rank)
+{
+    const char *slash = strchr(xcpuid, '/');
+    char *end = NULL;
+
+    if (NULL == slash) {
+        return ORTE_ERR_NOT_FOUND;
+    }
+    *rank = (int)strtol(slash + 1, &end, 10);
+    if (end == slash + 1) {
+        return ORTE_ERR_NOT_FOUND;
+    }
+    return ORTE_SUCCESS;
+}
+
+/**
+ * Sets up the process name from the information put into the environment
+ * by the xcpu launcher and orte_ns_nds_xcpu_put.
+ *
+ * If the "name" parameter is set it is converted directly.  Otherwise the
+ * name is built from the "cellid", "jobid" and "vpid_start" parameters and
+ * the number after the '/' in XCPUID, and "num_procs" is stored in
+ * orte_process_info.
+ *
+ * @retval ORTE_SUCCESS
+ * @retval ORTE_ERR_NOT_FOUND  a required parameter or XCPUID is missing,
+ *                             or XCPUID has no "/<number>" part
+ * @retval error               code from a failed orte_ns call
+ */
+int orte_sds_xcpu_set_name(void)
+{
+    int rc;
+    int id;
+    char* name_string;
+
+    name_string = xcpu_lookup_nds_string("name");
+    if(name_string != NULL) {
+        rc = orte_ns.convert_string_to_process_name(
+                 &(orte_process_info.my_name), name_string);
+        free(name_string);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+    } else {
+
+        orte_cellid_t cellid;
+        orte_jobid_t jobid;
+        orte_vpid_t vpid;
+        orte_vpid_t vpid_start;
+        char* cellid_string;
+        char* jobid_string;
+        char* vpid_string;
+        int num_procs = -1;
+        char *xcpu_rank_string;
+        int xcpu_rank;
+
+        cellid_string = xcpu_lookup_nds_string("cellid");
+        if (NULL == cellid_string) {
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        rc = orte_ns.convert_string_to_cellid(&cellid, cellid_string);
+        free(cellid_string);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return(rc);
+        }
+
+        jobid_string = xcpu_lookup_nds_string("jobid");
+        if (NULL == jobid_string) {
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        rc = orte_ns.convert_string_to_jobid(&jobid, jobid_string);
+        free(jobid_string);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return(rc);
+        }
+
+        /* XCPUID is set by xcpu when we do a parallel launch */
+        xcpu_rank_string = getenv("XCPUID");
+        if (NULL == xcpu_rank_string) {
+            opal_output(0, "orte_ns_nds_xcpu_get: Error: Environment variable "
+                        "XCPUID not found.\n");
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        /* fail cleanly on an XCPUID without "/<number>" rather than
+           dereferencing a NULL search result */
+        rc = xcpu_rank_from_id(xcpu_rank_string, &xcpu_rank);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+        vpid_string = xcpu_lookup_nds_string("vpid_start");
+        if (NULL == vpid_string) {
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        rc = orte_ns.convert_string_to_vpid(&vpid_start, vpid_string);
+        free(vpid_string);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return(rc);
+        }
+
+        /* compute our vpid */
+        vpid = vpid_start + xcpu_rank - 1;
+
+        /* create our name */
+        if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(
+                                &(orte_process_info.my_name),
+                                cellid,
+                                jobid,
+                                vpid))) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+        id = mca_base_param_register_int("ns", "nds", "num_procs", NULL, -1);
+        mca_base_param_lookup_int(id, &num_procs);
+        if (num_procs < 0) {
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        orte_process_info.num_procs = (size_t)num_procs;
+
+#if 0
+        id = mca_base_param_register_string("ns", "nds", "global_vpid_start", NULL, NULL);
+        mca_base_param_lookup_string(id, &vpid_string);
+        if (NULL == vpid_string) {
+            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+            return ORTE_ERR_NOT_FOUND;
+        }
+        rc = orte_ns.convert_string_to_vpid(&orte_process_info.vpid_start, vpid_string);
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return(rc);
+        }
+
+        if(NULL != orte_system_info.nodename)
+            free(orte_system_info.nodename);
+        //asprintf(&orte_system_info.nodename, "%d", xcpu_currnode());
+#endif
+    }
+    return ORTE_SUCCESS;
+}
+
+
+/**
+ * Module finalize: nothing to release.
+ *
+ * @retval ORTE_SUCCESS always
+ */
+int
+orte_sds_xcpu_finalize(void)
+{
+    return ORTE_SUCCESS;
+}