diff --git a/orte/mca/pls/slurm/pls_slurm.h b/orte/mca/pls/slurm/pls_slurm.h index f18027b3a9..4b7b24b2e4 100644 --- a/orte/mca/pls/slurm/pls_slurm.h +++ b/orte/mca/pls/slurm/pls_slurm.h @@ -26,15 +26,22 @@ extern "C" { #endif + struct orte_pls_slurm_component_t { + orte_pls_base_component_t super; + int priority; + int debug; + char *orted; + }; + typedef struct orte_pls_slurm_component_t orte_pls_slurm_component_t; + /* * Globally exported variable */ - OMPI_COMP_EXPORT extern orte_pls_base_component_1_0_0_t - orte_pls_slurm_component; + OMPI_COMP_EXPORT extern orte_pls_slurm_component_t + mca_pls_slurm_component; OMPI_COMP_EXPORT extern orte_pls_base_module_1_0_0_t orte_pls_slurm_module; - OMPI_COMP_EXPORT extern int orte_pls_slurm_param_priorty; #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/orte/mca/pls/slurm/pls_slurm_component.c b/orte/mca/pls/slurm/pls_slurm_component.c index 3ca4fa5b36..74b457826a 100644 --- a/orte/mca/pls/slurm/pls_slurm_component.c +++ b/orte/mca/pls/slurm/pls_slurm_component.c @@ -22,10 +22,11 @@ #include "ompi_config.h" -#include "include/orte_constants.h" -#include "mca/pls/pls.h" +#include "opal/mca/base/mca_base_param.h" +#include "orte/include/orte_constants.h" +#include "orte/mca/pls/pls.h" +#include "orte/mca/pls/base/base.h" #include "pls_slurm.h" -#include "mca/base/mca_base_param.h" /* @@ -35,12 +36,6 @@ const char *mca_pls_slurm_component_version_string = "Open MPI slurm pls MCA component version " ORTE_VERSION; -/* - * Local variable - */ -static int param_priority = -1; - - /* * Local functions */ @@ -53,48 +48,65 @@ static struct orte_pls_base_module_1_0_0_t *pls_slurm_init(int *priority); * and pointers to our public functions in it */ -orte_pls_base_component_1_0_0_t mca_pls_slurm_component = { - - /* First, the mca_component_t struct containing meta information - about the component itself */ +orte_pls_slurm_component_t mca_pls_slurm_component = { { - /* Indicate that we are a pls v1.0.0 component (which 
also - implies a specific MCA version) */ + /* First, the mca_component_t struct containing meta + information about the component itself */ - ORTE_PLS_BASE_VERSION_1_0_0, + { + /* Indicate that we are a pls v1.0.0 component (which also + implies a specific MCA version) */ - /* Component name and version */ + ORTE_PLS_BASE_VERSION_1_0_0, + + /* Component name and version */ + + "slurm", + ORTE_MAJOR_VERSION, + ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION, + + /* Component open and close functions */ + + pls_slurm_open, + NULL + }, + + /* Next the MCA v1.0.0 component meta data */ + + { + /* Whether the component is checkpointable or not */ + + true + }, + + /* Initialization / querying functions */ + + pls_slurm_init + } - "slurm", - ORTE_MAJOR_VERSION, - ORTE_MINOR_VERSION, - ORTE_RELEASE_VERSION, - - /* Component open and close functions */ - - pls_slurm_open, - NULL - }, - - /* Next the MCA v1.0.0 component meta data */ - - { - /* Whether the component is checkpointable or not */ - - true - }, - - /* Initialization / querying functions */ - - pls_slurm_init + /* Other orte_pls_slurm_component_t items -- left uninitialized + here; will be initialized in pls_slurm_open() */ }; static int pls_slurm_open(void) { - param_priority = - mca_base_param_register_int("pls", "slurm", "priority", NULL, 75); + mca_base_component_t *comp = &mca_pls_slurm_component.super.pls_version; + + mca_base_param_reg_int(comp, "debug", "Enable debugging of slurm pls", + false, false, 0, + &mca_pls_slurm_component.debug); + + mca_base_param_reg_int(comp, "priority", "Default selection priority", + false, false, 75, + &mca_pls_slurm_component.priority); + + mca_base_param_reg_string(comp, "orted", + "Command to use to start proxy orted", + false, false, "orted", + &mca_pls_slurm_component.orted); return ORTE_SUCCESS; } @@ -102,11 +114,12 @@ static int pls_slurm_open(void) static struct orte_pls_base_module_1_0_0_t *pls_slurm_init(int *priority) { - /* Are we runing under SLURM? 
*/ + /* Are we running under a SLURM job? */ if (NULL != getenv("SLURM_JOBID")) { - mca_base_param_lookup_int(param_priority, priority); - + *priority = mca_pls_slurm_component.priority; + opal_output(orte_pls_base.pls_output, + "pls:slurm: available for selection"); return &orte_pls_slurm_module; } diff --git a/orte/mca/pls/slurm/pls_slurm_module.c b/orte/mca/pls/slurm/pls_slurm_module.c index dae5141c4a..1dd03e71b6 100644 --- a/orte/mca/pls/slurm/pls_slurm_module.c +++ b/orte/mca/pls/slurm/pls_slurm_module.c @@ -22,8 +22,19 @@ #include "ompi_config.h" -#include "include/orte_constants.h" -#include "mca/pls/pls.h" +#include "opal/util/argv.h" +#include "opal/util/output.h" +#include "opal/util/opal_environ.h" +#include "opal/mca/base/mca_base_param.h" +#include "orte/runtime/runtime.h" +#include "orte/include/orte_constants.h" +#include "orte/include/orte_types.h" +#include "orte/include/orte_constants.h" +#include "orte/mca/pls/pls.h" +#include "orte/mca/pls/base/base.h" +#include "orte/mca/ns/base/base.h" +#include "orte/mca/rml/rml.h" +#include "orte/mca/errmgr/errmgr.h" #include "pls_slurm.h" @@ -35,6 +46,8 @@ static int pls_slurm_terminate_job(orte_jobid_t jobid); static int pls_slurm_terminate_proc(const orte_process_name_t *name); static int pls_slurm_finalize(void); +static int pls_slurm_start_proc(char *nodename, int argc, char **argv, + char **env); orte_pls_base_module_1_0_0_t orte_pls_slurm_module = { pls_slurm_launch, @@ -44,25 +57,254 @@ orte_pls_base_module_1_0_0_t orte_pls_slurm_module = { }; +extern char **environ; + + static int pls_slurm_launch(orte_jobid_t jobid) { - return ORTE_ERR_NOT_IMPLEMENTED; + opal_list_t nodes; + opal_list_item_t* item; + size_t num_nodes; + orte_vpid_t vpid; + int node_name_index; + int proc_name_index; + char *jobid_string; + char *uri, *param; + char **argv; + int argc; + int rc; + + /* query the list of nodes allocated to the job - don't need the entire + * mapping - as the daemon/proxy is responsible for 
determining the apps + * to launch on each node. + */ + OBJ_CONSTRUCT(&nodes, opal_list_t); + + rc = orte_ras_base_node_query_alloc(&nodes, jobid); + if (ORTE_SUCCESS != rc) { + goto cleanup; + } + + /* + * Allocate a range of vpids for the daemons. + */ + num_nodes = opal_list_get_size(&nodes); + if (num_nodes == 0) { + return ORTE_ERR_BAD_PARAM; + } + rc = orte_ns.reserve_range(0, num_nodes, &vpid); + if (ORTE_SUCCESS != rc) { + goto cleanup; + } + + /* need integer value for command line parameter */ + asprintf(&jobid_string, "%lu", (unsigned long) jobid); + + /* + * start building argv array + */ + argv = NULL; + argc = 0; + + /* add the daemon command (as specified by user) */ + opal_argv_append(&argc, &argv, mca_pls_slurm_component.orted); + + opal_argv_append(&argc, &argv, "--no-daemonize"); + + /* check for debug flags */ + orte_pls_base_proxy_mca_argv(&argc, &argv); + + /* proxy information */ + opal_argv_append(&argc, &argv, "--bootproxy"); + opal_argv_append(&argc, &argv, jobid_string); + opal_argv_append(&argc, &argv, "--name"); + proc_name_index = argc; + opal_argv_append(&argc, &argv, ""); + + /* tell the daemon how many procs are in the daemon's job */ + opal_argv_append(&argc, &argv, "--num_procs"); + asprintf(¶m, "%lu", (unsigned long)(vpid + num_nodes)); + opal_argv_append(&argc, &argv, param); + free(param); + + /* tell the daemon the starting vpid of the daemon's job */ + opal_argv_append(&argc, &argv, "--vpid_start"); + opal_argv_append(&argc, &argv, "0"); + + opal_argv_append(&argc, &argv, "--nodename"); + node_name_index = argc; + opal_argv_append(&argc, &argv, ""); + + /* pass along the universe name and location info */ + opal_argv_append(&argc, &argv, "--universe"); + asprintf(¶m, "%s@%s:%s", orte_universe_info.uid, + orte_universe_info.host, orte_universe_info.name); + opal_argv_append(&argc, &argv, param); + free(param); + + /* setup ns contact info */ + opal_argv_append(&argc, &argv, "--nsreplica"); + if (NULL != 
orte_process_info.ns_replica_uri) { + uri = strdup(orte_process_info.ns_replica_uri); + } else { + uri = orte_rml.get_uri(); + } + asprintf(¶m, "\"%s\"", uri); + opal_argv_append(&argc, &argv, param); + free(uri); + free(param); + + /* setup gpr contact info */ + opal_argv_append(&argc, &argv, "--gprreplica"); + if (NULL != orte_process_info.gpr_replica_uri) { + uri = strdup(orte_process_info.gpr_replica_uri); + } else { + uri = orte_rml.get_uri(); + } + asprintf(¶m, "\"%s\"", uri); + opal_argv_append(&argc, &argv, param); + free(uri); + free(param); + + if (mca_pls_slurm_component.debug) { + param = opal_argv_join(argv, ' '); + if (NULL != param) { + opal_output(0, "pls:slurm: final top-level argv:"); + opal_output(0, "pls:slurm: %s", param); + free(param); + } + } + + /* + * Iterate through each of the nodes and spin + * up a daemon. + */ + for(item = opal_list_get_first(&nodes); + item != opal_list_get_end(&nodes); + item = opal_list_get_next(item)) { + orte_ras_node_t* node = (orte_ras_node_t*)item; + orte_process_name_t* name; + char* name_string; + char** env; + char* var; + + /* setup node name */ + argv[node_name_index] = node->node_name; + + /* initialize daemons process name */ + rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid); + if (ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* setup per-node options */ + if (mca_pls_slurm_component.debug) { + opal_output(0, "pls:slurm: launching on node %s\n", + node->node_name); + } + + /* setup process name */ + rc = orte_ns.get_proc_name_string(&name_string, name); + if (ORTE_SUCCESS != rc) { + opal_output(0, "pls:slurm: unable to create process name"); + exit(-1); + } + argv[proc_name_index] = name_string; + + /* setup environment */ + env = opal_argv_copy(environ); + var = mca_base_param_environ_variable("seed",NULL,NULL); + opal_setenv(var, "0", true, &env); + + /* set the progress engine schedule for this node. 
+ * if node_slots is set to zero, then we default to + * NOT being oversubscribed + */ + if (node->node_slots > 0 && + node->node_slots_inuse > node->node_slots) { + if (mca_pls_slurm_component.debug) { + opal_output(0, "pls:slurm: oversubscribed -- setting mpi_yield_when_idle to 1"); + } + var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle"); + opal_setenv(var, "1", true, &env); + } else { + if (mca_pls_slurm_component.debug) { + opal_output(0, "pls:slurm: not oversubscribed -- setting mpi_yield_when_idle to 0"); + } + var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle"); + opal_setenv(var, "0", true, &env); + } + free(var); + + /* save the daemons name on the node */ + if (ORTE_SUCCESS != (rc = orte_pls_base_proxy_set_node_name(node,jobid,name))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* exec the daemon */ + if (mca_pls_slurm_component.debug) { + param = opal_argv_join(argv, ' '); + if (NULL != param) { + opal_output(0, "pls:slurm: executing: %s", param); + free(param); + } + } + + rc = pls_slurm_start_proc(node->node_name, argc, argv, env); + if (ORTE_SUCCESS != rc) { + opal_output(0, "pls:slurm: start_procs returned error %d", rc); + goto cleanup; + } + + vpid++; + free(name); + } + +cleanup: + while (NULL != (item = opal_list_remove_first(&nodes))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&nodes); + return rc; } static int pls_slurm_terminate_job(orte_jobid_t jobid) { - return ORTE_ERR_NOT_IMPLEMENTED; + return orte_pls_base_proxy_terminate_job(jobid); } +/* + * The way we've used SLURM, we can't kill individual processes -- + * we'll kill the entire job + */ static int pls_slurm_terminate_proc(const orte_process_name_t *name) { - return ORTE_ERR_NOT_IMPLEMENTED; + opal_output(orte_pls_base.pls_output, + "pls:slurm:terminate_proc: not supported"); + return ORTE_ERR_NOT_SUPPORTED; } static int pls_slurm_finalize(void) { - return ORTE_ERR_NOT_IMPLEMENTED; + /* cleanup any pending recvs */ + 
orte_rml.recv_cancel(ORTE_RML_NAME_ANY, ORTE_RML_TAG_RMGR_CLNT); + + return ORTE_SUCCESS; +} + + +static int pls_slurm_start_proc(char *nodename, int argc, char **argv, + char **env) +{ + char *a = opal_argv_join(argv, ' '); + + printf("SLURM Starting on node %s: %s\n", nodename, a); + free(a); + + return ORTE_SUCCESS; }