
Add initial support for dynamic allocations. At this time, only Slurm supports the new capability, and that support will appear in an upcoming Slurm release.

Add hooks to the RAS framework for dynamic allocation and deallocation, supporting application-driven requests and fault-recovery operations.

This commit was SVN r27879.
Ralph Castain 2013-01-20 00:33:42 +00:00
parent e4673f3283
commit a591fbf06f
18 changed files with 807 additions and 75 deletions
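For reference when reading the per-component diffs below, the RAS module table grows from two entries to four; existing components that have no dynamic-allocation support simply leave the new init and deallocate slots NULL. A minimal sketch of a conforming module table (the example component and function names are hypothetical):

orte_ras_base_module_t orte_ras_example_module = {
    NULL,                       /* init - optional, may be NULL */
    orte_ras_example_allocate,  /* allocate */
    NULL,                       /* deallocate - optional, may be NULL */
    orte_ras_example_finalize   /* finalize */
};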

View File

@ -126,7 +126,8 @@ enum {
ORTE_ERR_SENSOR_LIMIT_EXCEEDED = (ORTE_ERR_BASE - 43),
ORTE_ERR_JOB_ENTRY_NOT_FOUND = (ORTE_ERR_BASE - 44),
ORTE_ERR_PROC_ENTRY_NOT_FOUND = (ORTE_ERR_BASE - 45),
ORTE_ERR_DATA_VALUE_NOT_FOUND = (ORTE_ERR_BASE - 46)
ORTE_ERR_DATA_VALUE_NOT_FOUND = (ORTE_ERR_BASE - 46),
ORTE_ERR_ALLOCATION_PENDING = (ORTE_ERR_BASE - 47)
};
#define ORTE_ERR_MAX (ORTE_ERR_BASE - 100)

View File

@ -160,7 +160,8 @@ static void job_errors(int fd, short args, void *cbdata)
*/
orte_abnormal_term_ordered = true;
}
if (ORTE_JOB_STATE_NEVER_LAUNCHED == jobstate) {
if (ORTE_JOB_STATE_NEVER_LAUNCHED == jobstate ||
ORTE_JOB_STATE_ALLOC_FAILED == jobstate) {
orte_never_launched = true;
jdata->num_terminated = jdata->num_procs;
ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_TERMINATED);

View File

@ -76,7 +76,9 @@ static const orte_ras_alps_sysconfig_t sysconfigs[] = {
/* /// Global Variables /// */
orte_ras_base_module_t orte_ras_alps_module = {
NULL,
orte_ras_alps_allocate,
NULL,
orte_ras_alps_finalize
};

View File

@ -142,6 +142,12 @@ void orte_ras_base_allocate(int fd, short args, void *cbdata)
if (NULL != orte_ras_base.active_module) {
/* read the allocation */
if (ORTE_SUCCESS != (rc = orte_ras_base.active_module->allocate(jdata, &nodes))) {
if (ORTE_ERR_ALLOCATION_PENDING == rc) {
/* an allocation request is underway, so just do nothing */
OBJ_DESTRUCT(&nodes);
OBJ_RELEASE(caddy);
return;
}
if (ORTE_ERR_SYSTEM_WILL_BOOTSTRAP == rc) {
/* this module indicates that nodes will be discovered
* on a bootstrap basis, so all we do here is add our
@ -149,6 +155,26 @@ void orte_ras_base_allocate(int fd, short args, void *cbdata)
*/
goto addlocal;
}
if (ORTE_ERR_TAKE_NEXT_OPTION == rc) {
/* we have an active module, but it is unable to
* allocate anything for this job - this indicates
* that it isn't a fatal error, but could be if
* an allocation is required
*/
if (orte_allocation_required) {
/* an allocation is required, so this is fatal */
OBJ_DESTRUCT(&nodes);
orte_show_help("help-ras-base.txt", "ras-base:no-allocation", true);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
} else {
/* an allocation is not required, so we can just
* run on the local node - go add it
*/
goto addlocal;
}
}
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&nodes);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);

View File

@ -52,6 +52,9 @@ int orte_ras_base_select(void)
/* Save the winner */
/* No component saved */
orte_ras_base.active_module = best_module;
if (NULL != orte_ras_base.active_module->init) {
return orte_ras_base.active_module->init();
}
return ORTE_SUCCESS;
}

View File

@ -51,7 +51,9 @@ void ras_get_cluster_message(ICluster* pCluster);
* Local variables
*/
orte_ras_base_module_t orte_ras_ccp_module = {
NULL,
orte_ras_ccp_allocate,
NULL,
orte_ras_ccp_finalize
};

View File

@ -47,7 +47,9 @@ static int get_slot_count(char* node_name, int* slot_cnt);
* Global variable
*/
orte_ras_base_module_t orte_ras_gridengine_module = {
NULL,
orte_ras_gridengine_allocate,
NULL,
orte_ras_gridengine_finalize
};

View File

@ -50,7 +50,9 @@ static int ll_getline(FILE *fp, char *input);
* Global variable
*/
orte_ras_base_module_t orte_ras_loadleveler_module = {
NULL,
orte_ras_loadleveler_allocate,
NULL,
orte_ras_loadleveler_finalize
};

View File

@ -46,7 +46,9 @@ static int finalize(void);
* Global variable
*/
orte_ras_base_module_t orte_ras_lsf_module = {
NULL,
allocate,
NULL,
finalize
};

View File

@ -73,12 +73,19 @@ ORTE_DECLSPEC extern opal_event_t orte_allocate_event;
* but are defined here by convention
*/
/* init the module */
typedef int (*orte_ras_base_module_init_fn_t)(void);
/**
* Allocate resources to a job.
*/
typedef int (*orte_ras_base_module_allocate_fn_t)(orte_job_t *jdata,
opal_list_t *nodes);
/* deallocate resources */
typedef void (*orte_ras_base_module_dealloc_fn_t)(orte_job_t *jdata,
orte_app_context_t *app);
/**
* Cleanup module resources.
*/
@ -88,10 +95,13 @@ typedef int (*orte_ras_base_module_finalize_fn_t)(void);
* ras module
*/
struct orte_ras_base_module_2_0_0_t {
/** init */
orte_ras_base_module_init_fn_t init;
/** Allocation function pointer */
orte_ras_base_module_allocate_fn_t allocate;
orte_ras_base_module_allocate_fn_t allocate;
orte_ras_base_module_dealloc_fn_t deallocate;
/** Finalization function pointer */
orte_ras_base_module_finalize_fn_t finalize;
orte_ras_base_module_finalize_fn_t finalize;
};
/** Convenience typedef */
typedef struct orte_ras_base_module_2_0_0_t orte_ras_base_module_2_0_0_t;

View File

@ -36,7 +36,9 @@ static int finalize(void);
* Global variable
*/
orte_ras_base_module_t orte_ras_sim_module = {
NULL,
allocate,
NULL,
finalize
};

View File

@ -42,3 +42,37 @@ However, an error was encountered when trying to parse the following variable:
%s
This is a fatal error.
#
[slurm-dyn-alloc-timeout]
We attempted to obtain a dynamic allocation from Slurm, but
contact with the Slurm control daemon timed out. Please check
that the Slurm dynamic allocation plug-in is properly operating.
#
[slurm-dyn-alloc-failed]
The attempt to obtain a dynamic allocation from Slurm failed.
Allocation request: %s
#
[dyn-alloc-no-config]
Dynamic allocation was enabled, but no Slurm configuration
file was given. Please provide the required configuration file.
#
[host-not-resolved]
The Slurm control host could not be resolved.
Host: %s
Please check the Slurm configuration and try again.
#
[connection-failed]
Connection to the Slurm controller failed:
Host: %s
Port: %d
Please check the Slurm configuration and try again.
#
[config-file-not-found]
The Slurm configuration file was not found.
File: %s
Please check the filename and path and try again.

View File

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -29,7 +31,15 @@
BEGIN_C_DECLS
ORTE_DECLSPEC extern orte_ras_base_component_t mca_ras_slurm_component;
typedef struct {
orte_ras_base_component_t super;
int timeout;
bool dyn_alloc_enabled;
char *config_file;
bool rolling_alloc;
} orte_ras_slurm_component_t;
ORTE_DECLSPEC extern orte_ras_slurm_component_t mca_ras_slurm_component;
ORTE_DECLSPEC extern orte_ras_base_module_t orte_ras_slurm_module;
END_C_DECLS

View File

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -21,82 +23,106 @@
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/net.h"
#include "opal/opal_socket_errno.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ras/base/ras_private.h"
#include "ras_slurm.h"
/*
* Local variables
*/
static int param_priority;
/*
* Local functions
*/
static int ras_slurm_open(void);
static int ras_slurm_close(void);
static int orte_ras_slurm_component_query(mca_base_module_t **module, int *priority);
orte_ras_base_component_t mca_ras_slurm_component = {
/* First, the mca_base_component_t struct containing meta
information about the component itself */
orte_ras_slurm_component_t mca_ras_slurm_component = {
{
/* First, the mca_base_component_t struct containing meta
information about the component itself */
{
ORTE_RAS_BASE_VERSION_2_0_0,
{
ORTE_RAS_BASE_VERSION_2_0_0,
/* Component name and version */
"slurm",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component name and version */
"slurm",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
ras_slurm_open,
NULL,
orte_ras_slurm_component_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
/* Component open and close functions */
ras_slurm_open,
ras_slurm_close,
orte_ras_slurm_component_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
}
};
static int ras_slurm_open(void)
{
param_priority =
mca_base_param_reg_int(&mca_ras_slurm_component.base_version,
"priority",
"Priority of the slurm ras component",
false, false, 75, NULL);
int tmp;
mca_base_param_reg_int(&mca_ras_slurm_component.super.base_version,
"dyn_allocate_timeout",
"Number of seconds to wait for Slurm dynamic allocation",
false, false, 30, &mca_ras_slurm_component.timeout);
mca_base_param_reg_int(&mca_ras_slurm_component.super.base_version,
"enable_dyn_alloc",
"Whether or not dynamic allocations are enabled",
false, false, (int)false, &tmp);
mca_ras_slurm_component.dyn_alloc_enabled = OPAL_INT_TO_BOOL(tmp);
mca_base_param_reg_string(&mca_ras_slurm_component.super.base_version,
"config_file",
"Path to Slurm configuration file",
false, false, NULL, &mca_ras_slurm_component.config_file);
mca_base_param_reg_int(&mca_ras_slurm_component.super.base_version,
"enable_rolling_alloc",
"Enable partial dynamic allocations",
false, false, (int)false, &tmp);
mca_ras_slurm_component.rolling_alloc = OPAL_INT_TO_BOOL(tmp);
return ORTE_SUCCESS;
}
static int ras_slurm_close(void)
{
if (NULL != mca_ras_slurm_component.config_file) {
free(mca_ras_slurm_component.config_file);
}
return ORTE_SUCCESS;
}
static int orte_ras_slurm_component_query(mca_base_module_t **module, int *priority)
{
/* Are we running under a SLURM job? */
if (NULL != getenv("SLURM_JOBID")) {
mca_base_param_lookup_int(param_priority, priority);
OPAL_OUTPUT_VERBOSE((2, orte_ras_base.ras_output,
"%s ras:slurm: available for selection",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
*module = (mca_base_module_t *) &orte_ras_slurm_module;
return ORTE_SUCCESS;
}
/* Sadly, no */
/* if I built, then slurm support is available. If
* I am not in a Slurm allocation, and dynamic alloc
* is not enabled, then I'll deal with that by returning
* an appropriate status code upon allocation
*/
OPAL_OUTPUT_VERBOSE((2, orte_ras_base.ras_output,
"%s ras:slurm: NOT available for selection",
"%s ras:slurm: available for selection",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
*module = NULL;
return ORTE_ERROR;
/* since only one RM can exist on a cluster, just set
* my priority to something - the other components won't
* be responding anyway
*/
*priority = 50;
*module = (mca_base_module_t *) &orte_ras_slurm_module;
return ORTE_SUCCESS;
}
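With the usual <framework>_<component>_<name> MCA naming, the registrations in ras_slurm_open() above should surface as ras_slurm_dyn_allocate_timeout, ras_slurm_enable_dyn_alloc, ras_slurm_config_file, and ras_slurm_enable_rolling_alloc. A minimal usage sketch (the configuration file path and process count are illustrative):

mpirun -mca ras_slurm_enable_dyn_alloc 1 -mca ras_slurm_config_file /etc/slurm/slurm.conf -np 8 ./my_app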

View File

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
* Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@ -24,39 +24,195 @@
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include <fcntl.h>
#include <stdlib.h>
#include "opal/util/argv.h"
#include "opal/util/net.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ras/base/ras_private.h"
#include "ras_slurm.h"
#define ORTE_SLURM_DYN_MAX_SIZE 256
/*
* Local functions
* API functions
*/
static int init(void);
static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes);
static void deallocate(orte_job_t *jdata,
orte_app_context_t *app);
static int orte_ras_slurm_finalize(void);
/*
* RAS slurm module
*/
orte_ras_base_module_t orte_ras_slurm_module = {
init,
orte_ras_slurm_allocate,
deallocate,
orte_ras_slurm_finalize
};
/* Local functions */
static int orte_ras_slurm_discover(char *regexp, char* tasks_per_node,
opal_list_t *nodelist);
static int orte_ras_slurm_parse_ranges(char *base, char *ranges, char ***nodelist);
static int orte_ras_slurm_parse_range(char *base, char *range, char ***nodelist);
static int dyn_allocate(orte_job_t *jdata);
static char* get_node_list(orte_app_context_t *app);
static int parse_alloc_msg(char *msg, int *idx, int *sjob,
char **nodelist, char **tpn);
static void recv_data(int fd, short args, void *cbdata);
static void timeout(int fd, short args, void *cbdata);
static int read_ip_port(char *filename, char **ip, uint16_t *port);
/*
* Global variable
*/
orte_ras_base_module_t orte_ras_slurm_module = {
orte_ras_slurm_allocate,
orte_ras_slurm_finalize
};
/* define structs for tracking dynamic allocations */
typedef struct {
opal_object_t super;
int sjob;
} local_apptracker_t;
OBJ_CLASS_INSTANCE(local_apptracker_t,
opal_object_t,
NULL, NULL);
typedef struct {
opal_list_item_t super;
char *cmd;
opal_event_t timeout_ev;
orte_jobid_t jobid;
opal_pointer_array_t apps;
int napps;
} local_jobtracker_t;
static void jtrk_cons(local_jobtracker_t *ptr)
{
ptr->cmd = NULL;
OBJ_CONSTRUCT(&ptr->apps, opal_pointer_array_t);
opal_pointer_array_init(&ptr->apps, 1, INT_MAX, 1);
ptr->napps = 0;
}
static void jtrk_des(local_jobtracker_t *ptr)
{
int i;
local_apptracker_t *ap;
if (NULL != ptr->cmd) {
free(ptr->cmd);
}
for (i=0; i < ptr->apps.size; i++) {
if (NULL != (ap = (local_apptracker_t*)opal_pointer_array_get_item(&ptr->apps, i))) {
OBJ_RELEASE(ap);
}
}
OBJ_DESTRUCT(&ptr->apps);
}
OBJ_CLASS_INSTANCE(local_jobtracker_t,
opal_list_item_t,
jtrk_cons, jtrk_des);
/* local vars */
static int socket_fd;
static opal_list_t jobs;
static opal_event_t recv_ev;
/* init the module */
static int init(void)
{
char *slurm_host;
uint16_t port;
struct sockaddr_in address;
int flags;
struct hostent *h;
if (mca_ras_slurm_component.dyn_alloc_enabled) {
if (NULL == mca_ras_slurm_component.config_file) {
orte_show_help("help-ras-slurm.txt", "dyn-alloc-no-config", true);
return ORTE_ERR_SILENT;
}
/* setup the socket */
if (ORTE_SUCCESS != read_ip_port(mca_ras_slurm_component.config_file,
&slurm_host, &port)) {
return ORTE_ERR_SILENT;
}
OPAL_OUTPUT_VERBOSE((2, orte_ras_base.ras_output,
"ras:slurm got [ ip = %s, port = %u ] from %s\n",
slurm_host, port, mca_ras_slurm_component.config_file));
/* obtain a socket for our use */
if ((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* connect to the Slurm dynamic allocation port */
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
if (!opal_net_isaddr(slurm_host)) {
/* if the ControlMachine was not specified as an IP address,
* we need to resolve it here
*/
if (NULL == (h = gethostbyname(slurm_host))) {
/* could not resolve it */
orte_show_help("help-ras-slurm.txt", "host-not-resolved",
true, slurm_host);
free(slurm_host);
return ORTE_ERR_SILENT;
}
free(slurm_host);
slurm_host = strdup(inet_ntoa(*(struct in_addr*)h->h_addr_list[0]));
}
address.sin_addr.s_addr = inet_addr(slurm_host);
address.sin_port = htons(port);
if (connect(socket_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
orte_show_help("help-ras-slurm.txt", "connection-failed",
true, slurm_host, (int)port);
return ORTE_ERR_SILENT;
}
/* set socket up to be non-blocking */
if ((flags = fcntl(socket_fd, F_GETFL, 0)) < 0) {
opal_output(0, "ras:slurm:dyn: fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
} else {
flags |= O_NONBLOCK;
if (fcntl(socket_fd, F_SETFL, flags) < 0) {
opal_output(0, "ras:slurm:dyn: fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
}
/* setup to recv data */
opal_event_set(orte_event_base, &recv_ev, socket_fd,
OPAL_EV_READ, recv_data, NULL);
opal_event_add(&recv_ev, 0);
/* initialize the list of jobs for tracking dynamic allocations */
OBJ_CONSTRUCT(&jobs, opal_list_t);
}
return ORTE_SUCCESS;
}
/**
* Discover available (pre-allocated) nodes. Allocate the
@ -68,20 +224,41 @@ static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes)
int ret, cpus_per_task;
char *slurm_node_str, *regexp;
char *tasks_per_node, *node_tasks;
char * tmp;
char *tmp;
char *slurm_jobid;
slurm_jobid = getenv("SLURM_JOBID");
/* don't need to check this for NULL as we wouldn't
* have been selected if it wasn't already found
*
* save that value in the global job ident string for
* later use in any error reporting
*/
orte_job_ident = strdup(slurm_jobid);
if (NULL == (slurm_jobid = getenv("SLURM_JOBID"))) {
/* we are not in a slurm allocation - see if dyn alloc
* is enabled
*/
if (!mca_ras_slurm_component.dyn_alloc_enabled) {
/* nope - nothing we can do */
opal_output_verbose(2, orte_ras_base.ras_output,
"%s ras:slurm: no prior allocation and dynamic alloc disabled",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_TAKE_NEXT_OPTION;
}
} else {
/* save this value in the global job ident string for
* later use in any error reporting
*/
orte_job_ident = strdup(slurm_jobid);
}
slurm_node_str = getenv("SLURM_NODELIST");
if (NULL == slurm_node_str) {
/* see if dynamic allocation is enabled */
if (mca_ras_slurm_component.dyn_alloc_enabled) {
/* attempt to get the allocation - the function
* dyn_allocate will return as ORTE_ERR_ALLOCATION_PENDING
* if it succeeds in sending the allocation request
*/
ret = dyn_allocate(jdata);
/* return to the above layer in ras/base/ras_base_allocate.c
* to wait for event (libevent) happening
*/
return ret;
}
orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
"SLURM_NODELIST");
return ORTE_ERR_NOT_FOUND;
@ -146,15 +323,26 @@ static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes)
return ret;
}
/*
* There's really nothing to do here
*/
static void deallocate(orte_job_t *jdata,
orte_app_context_t *app)
{
}
static int orte_ras_slurm_finalize(void)
{
OPAL_OUTPUT_VERBOSE((1, orte_ras_base.ras_output,
"%s ras:slurm:finalize: success (nothing to do)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_list_item_t *item;
if (mca_ras_slurm_component.dyn_alloc_enabled) {
/* delete the recv event */
opal_event_del(&recv_ev);
while (NULL != (item = opal_list_remove_first(&jobs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&jobs);
/* close the socket */
shutdown(socket_fd, 2);
close(socket_fd);
}
return ORTE_SUCCESS;
}
@ -531,3 +719,416 @@ static int orte_ras_slurm_parse_range(char *base, char *range, char ***names)
return ORTE_SUCCESS;
}
static void timeout(int fd, short args, void *cbdata)
{
local_jobtracker_t *jtrk = (local_jobtracker_t*)cbdata;
orte_job_t *jdata;
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-timeout", true);
opal_output_verbose(2, orte_ras_base.ras_output,
"%s Timed out on dynamic allocation",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* indicate that we failed to receive an allocation */
jdata = orte_get_job_data_object(jtrk->jobid);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
}
static void recv_data(int fd, short args, void *cbdata)
{
bool found;
int i, rc;
orte_node_t *nd, *nd2;
opal_list_t nds, ndtmp;
opal_list_item_t *item, *itm;
char recv_msg[8192];
int nbytes, idx, sjob;
char **alloc, *nodelist, *tpn;
local_jobtracker_t *ptr, *jtrk;
local_apptracker_t *aptrk;
orte_app_context_t *app;
orte_jobid_t jobid;
orte_job_t *jdata;
opal_output_verbose(2, orte_ras_base.ras_output,
"%s ras:slurm: dynamic allocation - data recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* read the data from the socket and put it in the
* nodes field of op
*/
nbytes = read(fd, recv_msg, 8192);
opal_output_verbose(2, orte_ras_base.ras_output,
"%s ras:slurm: dynamic allocation msg: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg);
/* check if we got something */
if (0 == nbytes || 0 == strlen(recv_msg) || strstr(recv_msg, "failure") != NULL) {
/* show an error here - basically, a "nothing was available"
* message
*/
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true,
(NULL == recv_msg) ? "NO MSG" : recv_msg);
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
/* break the message into its component parts, separated by colons */
alloc = opal_argv_split(recv_msg, ':');
/* the first section contains the ORTE jobid for this allocation */
tpn = strchr(alloc[0], '=');
orte_util_convert_string_to_jobid(&jobid, tpn+1);
/* get the corresponding job object */
jdata = orte_get_job_data_object(jobid);
/* find the associated tracking object */
for (item = opal_list_get_first(&jobs);
item != opal_list_get_end(&jobs);
item = opal_list_get_next(item)) {
ptr = (local_jobtracker_t*)item;
if (ptr->jobid == jobid) {
jtrk = ptr;
break;
}
}
if (NULL == jtrk) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, "NO JOB TRACKER");
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
/* stop the timeout event */
opal_event_del(&jtrk->timeout_ev);
/* cycle across all the remaining parts - each is the allocation for
* an app in this job
*/
OBJ_CONSTRUCT(&nds, opal_list_t);
OBJ_CONSTRUCT(&ndtmp, opal_list_t);
for (i=1; NULL != alloc[i]; i++) {
if (ORTE_SUCCESS != parse_alloc_msg(alloc[i], &idx, &sjob, &nodelist, &tpn)) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
if (idx < 0 || NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, idx))) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
/* track the Slurm jobid */
if (NULL == (aptrk = (local_apptracker_t*)opal_pointer_array_get_item(&jtrk->apps, idx))) {
aptrk = OBJ_NEW(local_apptracker_t);
opal_pointer_array_set_item(&jtrk->apps, idx, aptrk);
}
aptrk->sjob = sjob;
/* release the current dash_host as that contained the *desired* allocation */
opal_argv_free(app->dash_host);
app->dash_host = NULL;
/* since the nodelist/tpn may contain regular expressions, parse them */
if (ORTE_SUCCESS != (rc = orte_ras_slurm_discover(nodelist, tpn, &ndtmp))) {
ORTE_ERROR_LOG(rc);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
/* transfer the discovered nodes to our node list, and construct
* the new dash_host entry to match what was allocated
*/
while (NULL != (item = opal_list_remove_first(&ndtmp))) {
nd = (orte_node_t*)item;
opal_argv_append_nosize(&app->dash_host, nd->name);
/* check for duplicates */
found = false;
for (itm = opal_list_get_first(&nds);
itm != opal_list_get_end(&nds);
itm = opal_list_get_next(itm)) {
nd2 = (orte_node_t*)itm;
if (0 == strcmp(nd->name, nd2->name)) {
found = true;
nd2->slots += nd->slots;
OBJ_RELEASE(item);
break;
}
}
if (!found) {
/* append the new node to our list */
opal_list_append(&nds, item);
}
}
/* cleanup */
free(nodelist);
free(tpn);
}
/* cleanup */
opal_argv_free(alloc);
OBJ_DESTRUCT(&ndtmp);
if (opal_list_is_empty(&nds)) {
/* if we get here, then we were able to contact slurm,
* which means we are in an actively managed cluster.
* However, slurm indicated that nothing is currently
* available that meets our requirements. This is a fatal
* situation - we do NOT have the option of running on
* user-specified hosts as the cluster is managed.
*/
OBJ_DESTRUCT(&nds);
orte_show_help("help-ras-base.txt", "ras-base:no-allocation", true);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
/* store the found nodes */
if (ORTE_SUCCESS != (rc = orte_ras_base_node_insert(&nds, jdata))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&nds);
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
return;
}
OBJ_DESTRUCT(&nds);
/* default to no-oversubscribe-allowed for managed systems */
if (!(ORTE_MAPPING_SUBSCRIBE_GIVEN & ORTE_GET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping))) {
ORTE_SET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
}
/* flag that the allocation is managed */
orte_managed_allocation = true;
/* move the job along */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATION_COMPLETE);
/* all done */
return;
}
/* we cannot use the RML to communicate with SLURM as it doesn't
* understand our internal protocol, so we have to do a bare-bones
* exchange based on sockets
*/
static int dyn_allocate(orte_job_t *jdata)
{
char *cmd_str, **cmd=NULL, *tmp, *jstring;
char *node_list;
orte_app_context_t *app;
int i;
struct timeval tv;
local_jobtracker_t *jtrk;
if (NULL == mca_ras_slurm_component.config_file) {
opal_output(0, "Cannot perform dynamic allocation as no Slurm configuration file provided");
return ORTE_ERR_NOT_FOUND;
}
/* track this request */
jtrk = OBJ_NEW(local_jobtracker_t);
jtrk->jobid = jdata->jobid;
opal_list_append(&jobs, &jtrk->super);
/* construct the command - note that the jdata structure contains
* a field for the minimum number of nodes required for the job.
* The node list can be constructed from the union of all the nodes
* contained in the dash_host field of the app_contexts. So you'll
* need to do a little work to build the command. We don't currently
* have a field in the jdata structure for "mandatory" vs "optional"
* allocations, so we'll have to add that someday. Likewise, you may
* want to provide a param to adjust the timeout value
*/
/* construct the cmd string */
opal_argv_append_nosize(&cmd, "allocate");
/* add the jobid */
orte_util_convert_jobid_to_string(&jstring, jdata->jobid);
asprintf(&tmp, "jobid=%s", jstring);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
free(jstring);
/* if we want the allocation for all apps in one shot,
* then tell slurm
*
* RHC: we don't currently have the ability to handle
* rolling allocations in the rest of the code base
*/
#if 0
if (!mca_ras_slurm_component.rolling_alloc) {
opal_argv_append_nosize(&cmd, "return=all");
}
#else
opal_argv_append_nosize(&cmd, "return=all");
#endif
/* pass the timeout */
asprintf(&tmp, "timeout=%d", mca_ras_slurm_component.timeout);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* for each app, add its allocation request info */
for (i=0; i < jdata->apps->size; i++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
continue;
}
/* add the app id, preceded by a colon separator */
asprintf(&tmp, ": app=%d", (int)app->idx);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* add the number of process "slots" we need */
asprintf(&tmp, "np=%d", app->num_procs);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* if we were given a minimum number of nodes, pass it along */
if (0 < app->min_number_of_nodes) {
asprintf(&tmp, "N=%ld", app->min_number_of_nodes);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
}
/* add the list of nodes, if one was given, ensuring
* that each node only appears once
*/
node_list = get_node_list(app);
if (NULL != node_list) {
asprintf(&tmp, "node_list=%s", node_list);
opal_argv_append_nosize(&cmd, tmp);
free(node_list);
free(tmp);
}
/* add the mandatory/optional flag */
if (app->mandatory) {
opal_argv_append_nosize(&cmd, "flag=mandatory");
} else {
opal_argv_append_nosize(&cmd, "flag=optional");
}
}
/* assemble it into the final cmd to be sent */
cmd_str = opal_argv_join(cmd, ' ');
opal_argv_free(cmd);
/* start a timer - if the response to our request doesn't appear
* in the defined time, then we will error out as Slurm isn't
* responding to us
*/
opal_event_evtimer_set(orte_event_base, &jtrk->timeout_ev, timeout, jtrk);
tv.tv_sec = mca_ras_slurm_component.timeout * 2;
tv.tv_usec = 0;
opal_event_evtimer_add(&jtrk->timeout_ev, &tv);
opal_output_verbose(2, orte_ras_base.ras_output,
"%s slurm:dynalloc cmd_str = %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cmd_str);
if (send(socket_fd, cmd_str, strlen(cmd_str)+1, 0) < 0) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
}
free(cmd_str);
/* we cannot wait here for a response as we
* are already in an event. So return a value
* that indicates we are waiting for an
* allocation so the base functions know
* that they shouldn't progress the job
*/
return ORTE_ERR_ALLOCATION_PENDING;
}
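/* Illustration only (all values hypothetical): with the assembly above, a
 * request for one app wanting 8 procs on at least 2 nodes would be sent over
 * the socket roughly as:
 *
 *   allocate jobid=[1234,1] return=all timeout=30 : app=0 np=8 N=2 node_list=node01,node02 flag=optional
 */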
static int parse_alloc_msg(char *msg, int *idx, int *sjob,
char **nodelist, char **tpn)
{
char *tmp;
char *p_str;
char *pos;
int found=0;
if (msg == NULL || strlen(msg) == 0) {
return ORTE_ERR_BAD_PARAM;
}
tmp = strdup(msg);
p_str = strtok(tmp, " ");
while (p_str) {
if (NULL != strstr(p_str, "slurm_jobid")) {
pos = strchr(p_str, '=');
*sjob = strtol(pos+1, NULL, 10);
found++;
} else if (NULL != strstr(p_str, "allocated_node_list")) {
pos = strchr(p_str, '=');
*nodelist = strdup(pos+1);
found++;
} else if (NULL != strstr(p_str, "tasks_per_node")) {
pos = strchr(p_str, '=');
*tpn = strdup(pos+1);
found++;
} else if (NULL != strstr(p_str, "app")) {
pos = strchr(p_str, '=');
*idx = strtol(pos+1, NULL, 10);
found++;
}
p_str = strtok(NULL, " ");
}
free(tmp);
if (4 != found) {
return ORTE_ERR_NOT_FOUND;
}
return ORTE_SUCCESS;
}
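/* Illustration only (all values hypothetical): recv_data() and
 * parse_alloc_msg() above expect the reply to lead with the ORTE jobid and
 * then carry one colon-separated section per app, e.g.:
 *
 *   jobid=[1234,1]:app=0 slurm_jobid=862 allocated_node_list=node01,node02 tasks_per_node=4
 */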
static char* get_node_list(orte_app_context_t *app)
{
int j;
char **total_host = NULL;
char *nodes;
if (NULL == app->dash_host) {
return NULL;
}
for (j=0; NULL != app->dash_host[j]; j++) {
opal_argv_append_unique_nosize(&total_host, app->dash_host[j], false);
}
if (NULL == total_host) {
return NULL;
}
nodes = opal_argv_join(total_host, ',');
opal_argv_free(total_host);
return nodes;
}
static int read_ip_port(char *filename, char **ip, uint16_t *port)
{
FILE *fp;
char line[ORTE_SLURM_DYN_MAX_SIZE];
char *pos;
bool found_port = false;
bool found_ip = false;
if (NULL == (fp = fopen(filename, "r"))) {
orte_show_help("help-ras-slurm.txt", "config-file-not-found", true, filename);
return ORTE_ERR_SILENT;
}
memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
while (NULL != fgets(line, ORTE_SLURM_DYN_MAX_SIZE, fp) &&
(!found_ip || !found_port)) {
if (0 == strlen(line)) {
continue;
}
line[strlen(line)-1] = '\0';
if (0 == strncmp(line, "JobSubmitDynAllocPort", strlen("JobSubmitDynAllocPort"))) {
pos = strstr(line, "=") + 1;
*port = strtol(pos, NULL, 10);
found_port = true;
} else if (0 == strncmp(line, "ControlMachine", strlen("ControlMachine"))) {
pos = strstr(line, "=") + 1;
*ip = strdup(pos);
found_ip = true;
}
memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
}
fclose(fp);
if (!found_ip) {
opal_output(0, "The IP address or name of the Slurm control machine was not provided");
return ORTE_ERR_NOT_FOUND;
}
if (!found_port) {
opal_output(0, "The IP port of the Slurm dynamic allocation service was not provided");
return ORTE_ERR_NOT_FOUND;
}
return ORTE_SUCCESS;
}
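For reference, read_ip_port() above looks for exactly two keys in the configuration file named by the config_file parameter; a minimal illustrative fragment (host name and port are made up) would be:

ControlMachine=headnode01
JobSubmitDynAllocPort=12345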

View File

@ -52,7 +52,9 @@ static char *filename;
* Global variable
*/
orte_ras_base_module_t orte_ras_tm_module = {
NULL,
allocate,
NULL,
finalize
};

View File

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2007-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009-2010 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
@ -609,6 +609,8 @@ static void orte_app_context_construct(orte_app_context_t* app_context)
app_context->recovery_defined = false;
app_context->max_restarts = -1000;
app_context->max_procs_per_node = 0;
app_context->mandatory = false;
app_context->min_number_of_nodes = -1; /* no minimum */
}
static void orte_app_context_destructor(orte_app_context_t* app_context)

View File

@ -295,6 +295,10 @@ typedef struct {
int32_t max_restarts;
/* maximum number of procs/node for this app */
orte_vpid_t max_procs_per_node;
/* flag if nodes requested in -host are "mandatory" vs "optional" */
bool mandatory;
/* min number of nodes required */
int64_t min_number_of_nodes;
} orte_app_context_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_app_context_t);