openmpi/orte/mca/ras/slurm/ras_slurm_module.c
Ralph Castain a200e4f865 As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***

Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
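A minimal sketch of what that macro does - the authoritative definition lives in ompi/mca/rte/rte.h; this is just the shape of it:

    #define OMPI_WAIT_FOR_COMPLETION(flg)               \
        do {                                            \
            /* spin the progress engine until the RTE   \
               event handler clears the flag */         \
            while ((flg)) {                             \
                opal_progress();                        \
            }                                           \
        } while (0)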

***************************************************************************************

I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.

The code is in https://bitbucket.org/rhc/ompi-oob2


WHAT:    Rewrite of ORTE OOB

WHY:       Support asynchronous progress and a host of other features

WHEN:    Wed, August 21

SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:

* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which does not support recursion)

* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.

* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients

* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort

* only one transport (i.e., component) can be "active"


The revised OOB resolves these problems:

* async progress is used for all application processes, with the progress thread blocking in the event library

* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")

* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC (a sketch of this check follows this list).

* a message that arrives on one TCP NIC is automatically shifted to whatever NIC is connected to the next "hop" if the peer cannot be reached via the incoming NIC. If no TCP module can reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.

* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object

* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions

* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel

* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport

* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active

* all blocking send/recv APIs have been removed. Everything operates asynchronously.
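
The reachability check mentioned in the multi-address NIC item above can be sketched as follows. This is illustrative only - the names are not the actual OOB data structures, and it assumes the module keeps its local addresses as address/prefix pairs; opal_net_samenetwork() from opal/util/net.h performs the subnet comparison:

    #include <stdbool.h>
    #include "opal/util/net.h"

    static bool nic_can_reach_peer(struct sockaddr **local_addrs,
                                   uint32_t *prefixlens, int naddrs,
                                   struct sockaddr *peer_addr)
    {
        int i;
        for (i = 0; i < naddrs; i++) {
            /* if the peer's address falls within one of our
             * address/mask ranges, treat it as directly reachable */
            if (opal_net_samenetwork(local_addrs[i], peer_addr, prefixlens[i])) {
                return true;
            }
        }
        return false;
    }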


KNOWN LIMITATIONS:

* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline

* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker

* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways

* obviously, not every error path has been tested nor necessarily covered

* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior, somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.

* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways

* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
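
As a minimal sketch of the per-peer state this implies (the type and field names are illustrative, not actual RML code):

    typedef struct {
        opal_list_item_t super;
        orte_process_name_t peer;  /* peer whose traffic we are ordering */
        uint32_t next_seq_out;     /* stamped on each message sent to the peer */
        uint32_t next_seq_in;      /* next sequence number eligible for delivery */
        opal_list_t held;          /* out-of-order arrivals parked here */
    } rml_peer_order_t;

On receipt, a message matching next_seq_in is delivered immediately (and "held" is drained of any messages that are now in order); anything with a later sequence number is parked on "held".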

This commit was SVN r29058.
2013-08-22 16:37:40 +00:00


/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include <fcntl.h>
#include <stdlib.h>
#include "opal/util/argv.h"
#include "opal/util/net.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/ras/base/ras_private.h"
#include "ras_slurm.h"
#define ORTE_SLURM_DYN_MAX_SIZE 256
/*
* API functions
*/
static int init(void);
static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes);
static void deallocate(orte_job_t *jdata,
orte_app_context_t *app);
static int orte_ras_slurm_finalize(void);
/*
* RAS slurm module
*/
orte_ras_base_module_t orte_ras_slurm_module = {
init,
orte_ras_slurm_allocate,
deallocate,
orte_ras_slurm_finalize
};
/* Local functions */
static int orte_ras_slurm_discover(char *regexp, char* tasks_per_node,
opal_list_t *nodelist);
static int orte_ras_slurm_parse_ranges(char *base, char *ranges, char ***nodelist);
static int orte_ras_slurm_parse_range(char *base, char *range, char ***nodelist);
static int dyn_allocate(orte_job_t *jdata);
static char* get_node_list(orte_app_context_t *app);
static int parse_alloc_msg(char *msg, int *idx, int *sjob,
char **nodelist, char **tpn);
static void recv_data(int fd, short args, void *cbdata);
static void timeout(int fd, short args, void *cbdata);
static int read_ip_port(char *filename, char **ip, uint16_t *port);
/* define structs for tracking dynamic allocations */
typedef struct {
opal_object_t super;
int sjob;
} local_apptracker_t;
OBJ_CLASS_INSTANCE(local_apptracker_t,
opal_object_t,
NULL, NULL);
typedef struct {
opal_list_item_t super;
char *cmd;
opal_event_t timeout_ev;
orte_jobid_t jobid;
opal_pointer_array_t apps;
int napps;
} local_jobtracker_t;
static void jtrk_cons(local_jobtracker_t *ptr)
{
ptr->cmd = NULL;
OBJ_CONSTRUCT(&ptr->apps, opal_pointer_array_t);
opal_pointer_array_init(&ptr->apps, 1, INT_MAX, 1);
ptr->napps = 0;
}
static void jtrk_des(local_jobtracker_t *ptr)
{
int i;
local_apptracker_t *ap;
if (NULL != ptr->cmd) {
free(ptr->cmd);
}
for (i=0; i < ptr->apps.size; i++) {
if (NULL != (ap = (local_apptracker_t*)opal_pointer_array_get_item(&ptr->apps, i))) {
OBJ_RELEASE(ap);
}
}
OBJ_DESTRUCT(&ptr->apps);
}
OBJ_CLASS_INSTANCE(local_jobtracker_t,
opal_list_item_t,
jtrk_cons, jtrk_des);
/* local vars */
static int socket_fd;
static opal_list_t jobs;
static opal_event_t recv_ev;
/* init the module */
static int init(void)
{
char *slurm_host=NULL;
uint16_t port=0;
struct sockaddr_in address;
int flags;
struct hostent *h;
if (mca_ras_slurm_component.dyn_alloc_enabled) {
if (NULL == mca_ras_slurm_component.config_file) {
orte_show_help("help-ras-slurm.txt", "dyn-alloc-no-config", true);
return ORTE_ERR_SILENT;
}
/* setup the socket */
if (ORTE_SUCCESS != read_ip_port(mca_ras_slurm_component.config_file,
&slurm_host, &port) ||
NULL == slurm_host || 0 == port) {
return ORTE_ERR_SILENT;
}
OPAL_OUTPUT_VERBOSE((2, orte_ras_base_framework.framework_output,
"ras:slurm got [ ip = %s, port = %u ] from %s\n",
slurm_host, port, mca_ras_slurm_component.config_file));
/* obtain a socket for our use */
if ((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* connect to the Slurm dynamic allocation port */
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
if (!opal_net_isaddr(slurm_host)) {
/* if the ControlMachine was not specified as an IP address,
* we need to resolve it here
*/
if (NULL == (h = gethostbyname(slurm_host))) {
/* could not resolve it */
orte_show_help("help-ras-slurm.txt", "host-not-resolved",
true, slurm_host);
free(slurm_host);
return ORTE_ERR_SILENT;
}
free(slurm_host);
slurm_host = strdup(inet_ntoa(*(struct in_addr*)h->h_addr_list[0]));
}
address.sin_addr.s_addr = inet_addr(slurm_host);
address.sin_port = htons(port);
if (connect(socket_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
orte_show_help("help-ras-slurm.txt", "connection-failed",
true, slurm_host, (int)port);
return ORTE_ERR_SILENT;
}
/* set socket up to be non-blocking */
if ((flags = fcntl(socket_fd, F_GETFL, 0)) < 0) {
opal_output(0, "ras:slurm:dyn: fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
} else {
flags |= O_NONBLOCK;
if (fcntl(socket_fd, F_SETFL, flags) < 0) {
opal_output(0, "ras:slurm:dyn: fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERROR;
}
}
/* setup to recv data */
opal_event_set(orte_event_base, &recv_ev, socket_fd,
OPAL_EV_READ, recv_data, NULL);
opal_event_add(&recv_ev, 0);
/* initialize the list of jobs for tracking dynamic allocations */
OBJ_CONSTRUCT(&jobs, opal_list_t);
}
return ORTE_SUCCESS;
}
/**
* Discover available (pre-allocated) nodes. Allocate the
* requested number of nodes/process slots to the job.
*
*/
static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes)
{
int ret, cpus_per_task;
char *slurm_node_str, *regexp;
char *tasks_per_node, *node_tasks;
char *tmp;
char *slurm_jobid;
if (NULL == (slurm_jobid = getenv("SLURM_JOBID"))) {
/* we are not in a slurm allocation - see if dyn alloc
* is enabled
*/
if (!mca_ras_slurm_component.dyn_alloc_enabled) {
/* nope - nothing we can do */
opal_output_verbose(2, orte_ras_base_framework.framework_output,
"%s ras:slurm: no prior allocation and dynamic alloc disabled",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_TAKE_NEXT_OPTION;
}
} else {
/* save this value in the global job ident string for
* later use in any error reporting
*/
orte_job_ident = strdup(slurm_jobid);
}
slurm_node_str = getenv("SLURM_NODELIST");
if (NULL == slurm_node_str) {
/* see if dynamic allocation is enabled */
if (mca_ras_slurm_component.dyn_alloc_enabled) {
/* attempt to get the allocation - the function
* dyn_allocate will return as ORTE_ERR_ALLOCATION_PENDING
* if it succeeds in sending the allocation request
*/
ret = dyn_allocate(jdata);
/* return to the above layer in ras/base/ras_base_allocate.c
* to wait for event (libevent) happening
*/
return ret;
}
orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
"SLURM_NODELIST");
return ORTE_ERR_NOT_FOUND;
}
regexp = strdup(slurm_node_str);
tasks_per_node = getenv("SLURM_JOB_CPUS_PER_NODE");
if (NULL == tasks_per_node) {
/* try an older variation */
tasks_per_node = getenv("SLURM_TASKS_PER_NODE");
if (NULL == tasks_per_node) {
/* couldn't find any version - abort */
orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
"SLURM_TASKS_PER_NODE");
return ORTE_ERR_NOT_FOUND;
}
}
node_tasks = strdup(tasks_per_node);
if(NULL == regexp || NULL == node_tasks) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* get the number of CPUs per task that the user provided to slurm */
tmp = getenv("SLURM_CPUS_PER_TASK");
if(NULL != tmp) {
cpus_per_task = atoi(tmp);
if(0 >= cpus_per_task) {
opal_output(0, "ras:slurm:allocate: Got bad value from SLURM_CPUS_PER_TASK. "
"Variable was: %s\n", tmp);
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
} else {
cpus_per_task = 1;
}
ret = orte_ras_slurm_discover(regexp, node_tasks, nodes);
free(regexp);
free(node_tasks);
if (ORTE_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate: discover failed!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ret;
}
/* record the number of allocated nodes */
orte_num_allocated_nodes = opal_list_get_size(nodes);
/* All done */
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate: success",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static void deallocate(orte_job_t *jdata,
orte_app_context_t *app)
{
}
static int orte_ras_slurm_finalize(void)
{
opal_list_item_t *item;
if (mca_ras_slurm_component.dyn_alloc_enabled) {
/* delete the recv event */
opal_event_del(&recv_ev);
while (NULL != (item = opal_list_remove_first(&jobs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&jobs);
/* close the socket */
shutdown(socket_fd, 2);
close(socket_fd);
}
return ORTE_SUCCESS;
}
/**
* Discover the available resources.
*
* In order to fully support slurm, we need to be able to handle
* node regexp/task_per_node strings such as:
* foo,bar 5,3
* foo 5
* foo[2-10,12,99-105],bar,foobar[3-11] 2(x10),5,100(x16)
*
* @param *regexp A node regular expression from SLURM (i.e. SLURM_NODELIST)
* @param *tasks_per_node A tasks per node expression from SLURM
* (i.e. SLURM_TASKS_PER_NODE)
* @param *nodelist A list which has already been constructed to return
* the found nodes in
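*
* For example, the regexp "foo[2-3],bar" with tasks_per_node "4(x2),5"
* yields nodes foo2 and foo3 with 4 slots each, plus bar with 5 slots.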
*/
static int orte_ras_slurm_discover(char *regexp, char *tasks_per_node,
opal_list_t* nodelist)
{
int i, j, len, ret, count, reps, num_nodes;
char *base, **names = NULL;
char *begptr, *endptr, *orig;
int *slots;
bool found_range = false;
bool more_to_come = false;
orig = base = strdup(regexp);
if (NULL == base) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate:discover: checking nodelist: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
regexp));
do {
/* Find the base */
len = strlen(base);
for (i = 0; i <= len; ++i) {
if (base[i] == '[') {
/* we found a range. this gets dealt with below */
base[i] = '\0';
found_range = true;
break;
}
if (base[i] == ',') {
/* we found a singleton node, and there are more to come */
base[i] = '\0';
found_range = false;
more_to_come = true;
break;
}
if (base[i] == '\0') {
/* we found a singleton node */
found_range = false;
more_to_come = false;
break;
}
}
if(i == 0) {
/* we found a special character at the beginning of the string */
orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
1, regexp, tasks_per_node, "SLURM_NODELIST");
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(orig);
return ORTE_ERR_BAD_PARAM;
}
if (found_range) {
/* If we found a range, now find the end of the range */
for (j = i; j < len; ++j) {
if (base[j] == ']') {
base[j] = '\0';
break;
}
}
if (j >= len) {
/* we didn't find the end of the range */
orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
1, regexp, tasks_per_node, "SLURM_NODELIST");
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(orig);
return ORTE_ERR_BAD_PARAM;
}
ret = orte_ras_slurm_parse_ranges(base, base + i + 1, &names);
if(ORTE_SUCCESS != ret) {
orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
1, regexp, tasks_per_node, "SLURM_NODELIST");
ORTE_ERROR_LOG(ret);
free(orig);
return ret;
}
if(base[j + 1] == ',') {
more_to_come = true;
base = &base[j + 2];
} else {
more_to_come = false;
}
} else {
/* If we didn't find a range, just add the node */
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate:discover: found node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
base));
if(ORTE_SUCCESS != (ret = opal_argv_append_nosize(&names, base))) {
ORTE_ERROR_LOG(ret);
free(orig);
return ret;
}
/* set base equal to the (possible) next base to look at */
base = &base[i + 1];
}
} while(more_to_come);
free(orig);
num_nodes = opal_argv_count(names);
/* Find the number of slots per node */
slots = malloc(sizeof(int) * num_nodes);
if (NULL == slots) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(slots, 0, sizeof(int) * num_nodes);
orig = begptr = strdup(tasks_per_node);
if (NULL == begptr) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
free(slots);
return ORTE_ERR_OUT_OF_RESOURCE;
}
j = 0;
while (begptr) {
count = strtol(begptr, &endptr, 10);
if ((endptr[0] == '(') && (endptr[1] == 'x')) {
reps = strtol((endptr+2), &endptr, 10);
if (endptr[0] == ')') {
endptr++;
}
} else {
reps = 1;
}
/**
* TBP: it seems like it would be an error to have more slot
* descriptions than nodes. Turns out that this is valid, and SLURM will
* return such a thing. For instance, if I did:
* srun -A -N 30 -w odin001
* I would get SLURM_NODELIST=odin001 SLURM_TASKS_PER_NODE=4(x30)
* That is, I am allocated 30 nodes, but since I only requested
* one specific node, that's what is in the nodelist.
* I'm not sure this is what users would expect, but I think it is
* more of a SLURM issue than an ORTE issue. Since SLURM is OK with it,
* I'm OK with it.
*/
for (i = 0; i < reps && j < num_nodes; i++) {
slots[j++] = count;
}
if (*endptr == ',') {
begptr = endptr + 1;
} else if (*endptr == '\0' || j >= num_nodes) {
break;
} else {
orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value", 1,
regexp, tasks_per_node, "SLURM_TASKS_PER_NODE");
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(slots);
free(orig);
return ORTE_ERR_BAD_PARAM;
}
}
free(orig);
/* Convert the argv of node names to a list of node_t's */
for (i = 0; NULL != names && NULL != names[i]; ++i) {
orte_node_t *node;
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate:discover: adding node %s (%d slot%s)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
names[i], slots[i], (1 == slots[i]) ? "" : "s"));
node = OBJ_NEW(orte_node_t);
if (NULL == node) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
free(slots);
return ORTE_ERR_OUT_OF_RESOURCE;
}
node->name = strdup(names[i]);
node->state = ORTE_NODE_STATE_UP;
node->slots_inuse = 0;
node->slots_max = 0;
node->slots = slots[i];
opal_list_append(nodelist, &node->super);
}
free(slots);
opal_argv_free(names);
/* All done */
return ret;
}
/*
* Parse one or more ranges in a set
*
* @param base The base text of the node name
* @param *ranges A pointer to a range. This can contain multiple ranges
* (i.e. "1-3,10" or "5" or "9,0100-0130,250")
* @param ***names An argv array to add the newly discovered nodes to
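*
* For example, base "node" with ranges "1-3,10" appends node1, node2,
* node3, and node10 to the names argv.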
*/
static int orte_ras_slurm_parse_ranges(char *base, char *ranges, char ***names)
{
int i, len, ret;
char *start, *orig;
/* Look for commas, the separator between ranges */
len = strlen(ranges);
for (orig = start = ranges, i = 0; i < len; ++i) {
if (',' == ranges[i]) {
ranges[i] = '\0';
ret = orte_ras_slurm_parse_range(base, start, names);
if (ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
start = ranges + i + 1;
}
}
/* Pick up the last range, if it exists */
if (start < orig + len) {
OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
"%s ras:slurm:allocate:discover: parse range %s (2)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
start));
ret = orte_ras_slurm_parse_range(base, start, names);
if (ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
/* All done */
return ORTE_SUCCESS;
}
/*
* Parse a single range in a set and add the full names of the nodes
* found to the names argv
*
* @param base The base text of the node name
* @param *ranges A pointer to a single range. (i.e. "1-3" or "5")
* @param ***names An argv array to add the newly discovered nodes to
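*
* Zero-padding in the first number is preserved: base "odin" with range
* "097-100" produces odin097, odin098, odin099, and odin100.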
*/
static int orte_ras_slurm_parse_range(char *base, char *range, char ***names)
{
char *str, temp1[BUFSIZ];
size_t i, j, start, end;
size_t base_len, len, num_len;
size_t num_str_len;
bool found;
int ret;
len = strlen(range);
base_len = strlen(base);
/* Silence compiler warnings; start and end are always assigned
properly, below */
start = end = 0;
/* Look for the beginning of the first number */
for (found = false, i = 0; i < len; ++i) {
if (isdigit((int) range[i])) {
if (!found) {
start = atoi(range + i);
found = true;
break;
}
}
}
if (!found) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* Look for the end of the first number */
for (found = false, num_str_len = 0; i < len; ++i, ++num_str_len) {
if (!isdigit((int) range[i])) {
break;
}
}
/* Was there no range, just a single number? */
if (i >= len) {
end = start;
found = true;
}
/* Nope, there was a range. Look for the beginning of the second
number */
else {
for (; i < len; ++i) {
if (isdigit((int) range[i])) {
end = atoi(range + i);
found = true;
break;
}
}
}
if (!found) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* Make strings for all values in the range */
len = base_len + num_str_len + 32;
str = malloc(len);
if (NULL == str) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
strcpy(str, base);
for (i = start; i <= end; ++i) {
str[base_len] = '\0';
snprintf(temp1, BUFSIZ - 1, "%lu", (long) i);
/* Do we need zero padding? */
if ((num_len = strlen(temp1)) < num_str_len) {
for (j = base_len; j < base_len + (num_str_len - num_len); ++j) {
str[j] = '0';
}
str[j] = '\0';
}
strcat(str, temp1);
ret = opal_argv_append_nosize(names, str);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
free(str);
return ret;
}
}
free(str);
/* All done */
return ORTE_SUCCESS;
}
static void timeout(int fd, short args, void *cbdata)
{
local_jobtracker_t *jtrk = (local_jobtracker_t*)cbdata;
orte_job_t *jdata;
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-timeout", true);
opal_output_verbose(2, orte_ras_base_framework.framework_output,
"%s Timed out on dynamic allocation",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* indicate that we failed to receive an allocation */
jdata = orte_get_job_data_object(jtrk->jobid);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
}
static void recv_data(int fd, short args, void *cbdata)
{
bool found;
int i, rc;
orte_node_t *nd, *nd2;
opal_list_t nds, ndtmp;
opal_list_item_t *item, *itm;
char recv_msg[8192];
int nbytes, idx, sjob;
char **alloc, *nodelist, *tpn;
local_jobtracker_t *ptr, *jtrk;
local_apptracker_t *aptrk;
orte_app_context_t *app;
orte_jobid_t jobid;
orte_job_t *jdata;
opal_output_verbose(2, orte_ras_base_framework.framework_output,
"%s ras:slurm: dynamic allocation - data recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* read the data from the socket and put it in the
* nodes field of op
*/
memset(recv_msg, 0, sizeof(recv_msg));
nbytes = read(fd, recv_msg, sizeof(recv_msg) - 1);
opal_output_verbose(2, orte_ras_base_framework.framework_output,
"%s ras:slurm: dynamic allocation msg: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg);
/* check if we got something */
if (0 == nbytes || 0 == strlen(recv_msg) || strstr(recv_msg, "failure") != NULL) {
/* show an error here - basically, a "nothing was available"
* message
*/
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true,
(0 == strlen(recv_msg)) ? "NO MSG" : recv_msg);
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
return;
}
/* break the message into its component parts, separated by colons */
alloc = opal_argv_split(recv_msg, ':');
/* the first section contains the ORTE jobid for this allocation */
tpn = strchr(alloc[0], '=');
orte_util_convert_string_to_jobid(&jobid, tpn+1);
/* get the corresponding job object */
jdata = orte_get_job_data_object(jobid);
jtrk = NULL;
/* find the associated tracking object */
for (item = opal_list_get_first(&jobs);
item != opal_list_get_end(&jobs);
item = opal_list_get_next(item)) {
ptr = (local_jobtracker_t*)item;
if (ptr->jobid == jobid) {
jtrk = ptr;
break;
}
}
if (NULL == jtrk) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, "NO JOB TRACKER");
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
opal_argv_free(alloc);
return;
}
/* stop the timeout event */
opal_event_del(&jtrk->timeout_ev);
/* cycle across all the remaining parts - each is the allocation for
* an app in this job
*/
OBJ_CONSTRUCT(&nds, opal_list_t);
OBJ_CONSTRUCT(&ndtmp, opal_list_t);
idx = -1;
sjob = -1;
nodelist = NULL;
for (i=1; NULL != alloc[i]; i++) {
if (ORTE_SUCCESS != parse_alloc_msg(alloc[i], &idx, &sjob, &nodelist, &tpn)) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
opal_argv_free(alloc);
return;
}
if (idx < 0 || NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, idx))) {
orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
opal_argv_free(alloc);
return;
}
/* track the Slurm jobid */
if (NULL == (aptrk = (local_apptracker_t*)opal_pointer_array_get_item(&jtrk->apps, idx))) {
aptrk = OBJ_NEW(local_apptracker_t);
opal_pointer_array_set_item(&jtrk->apps, idx, aptrk);
}
aptrk->sjob = sjob;
/* release the current dash_host as that contained the *desired* allocation */
opal_argv_free(app->dash_host);
app->dash_host = NULL;
/* since the nodelist/tpn may contain regular expressions, parse them */
if (ORTE_SUCCESS != (rc = orte_ras_slurm_discover(nodelist, tpn, &ndtmp))) {
ORTE_ERROR_LOG(rc);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
opal_argv_free(alloc);
return;
}
/* transfer the discovered nodes to our node list, and construct
* the new dash_host entry to match what was allocated
*/
while (NULL != (item = opal_list_remove_first(&ndtmp))) {
nd = (orte_node_t*)item;
opal_argv_append_nosize(&app->dash_host, nd->name);
/* check for duplicates */
found = false;
for (itm = opal_list_get_first(&nds);
itm != opal_list_get_end(&nds);
itm = opal_list_get_next(itm)) {
nd2 = (orte_node_t*)itm;
if (0 == strcmp(nd->name, nd2->name)) {
found = true;
nd2->slots += nd->slots;
OBJ_RELEASE(item);
break;
}
}
if (!found) {
/* append the new node to our list */
opal_list_append(&nds, item);
}
}
/* cleanup */
free(nodelist);
free(tpn);
}
/* cleanup */
opal_argv_free(alloc);
OBJ_DESTRUCT(&ndtmp);
if (opal_list_is_empty(&nds)) {
/* if we get here, then we were able to contact slurm,
* which means we are in an actively managed cluster.
* However, slurm indicated that nothing is currently
* available that meets our requirements. This is a fatal
* situation - we do NOT have the option of running on
* user-specified hosts as the cluster is managed.
*/
OBJ_DESTRUCT(&nds);
orte_show_help("help-ras-base.txt", "ras-base:no-allocation", true);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
/* the nds list has been destructed - do not fall through */
return;
}
/* store the found nodes */
if (ORTE_SUCCESS != (rc = orte_ras_base_node_insert(&nds, jdata))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&nds);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
return;
}
OBJ_DESTRUCT(&nds);
/* default to no-oversubscribe-allowed for managed systems */
if (!(ORTE_MAPPING_SUBSCRIBE_GIVEN & ORTE_GET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping))) {
ORTE_SET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
}
/* flag that the allocation is managed */
orte_managed_allocation = true;
/* move the job along */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATION_COMPLETE);
/* all done */
return;
}
/* we cannot use the RML to communicate with SLURM as it doesn't
* understand our internal protocol, so we have to do a bare-bones
* exchange based on sockets
*/
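/* For illustration, a request assembled below has the general form
*   allocate jobid=<jobid> return=all timeout=<sec> : app=0 np=2 flag=optional
* and a reply handled by recv_data()/parse_alloc_msg() has the form
*   jobid=<jobid>:app=0 slurm_jobid=1234 allocated_node_list=node[1-2] tasks_per_node=1(x2)
* where the specific field values shown are hypothetical
*/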
static int dyn_allocate(orte_job_t *jdata)
{
char *cmd_str, **cmd=NULL, *tmp, *jstring;
char *node_list;
orte_app_context_t *app;
int i;
struct timeval tv;
local_jobtracker_t *jtrk;
if (NULL == mca_ras_slurm_component.config_file) {
opal_output(0, "Cannot perform dynamic allocation as no Slurm configuration file provided");
return ORTE_ERR_NOT_FOUND;
}
/* track this request */
jtrk = OBJ_NEW(local_jobtracker_t);
jtrk->jobid = jdata->jobid;
opal_list_append(&jobs, &jtrk->super);
/* construct the command - note that the jdata structure contains
* a field for the minimum number of nodes required for the job.
* The node list can be constructed from the union of all the nodes
* contained in the dash_host field of the app_contexts. So you'll
* need to do a little work to build the command. We don't currently
* have a field in the jdata structure for "mandatory" vs "optional"
* allocations, so we'll have to add that someday. Likewise, you may
* want to provide a param to adjust the timeout value
*/
/* construct the cmd string */
opal_argv_append_nosize(&cmd, "allocate");
/* add the jobid */
orte_util_convert_jobid_to_string(&jstring, jdata->jobid);
asprintf(&tmp, "jobid=%s", jstring);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
free(jstring);
/* if we want the allocation for all apps in one shot,
* then tell slurm
*
* RHC: we don't currently have the ability to handle
* rolling allocations in the rest of the code base
*/
#if 0
if (!mca_ras_slurm_component.rolling_alloc) {
opal_argv_append_nosize(&cmd, "return=all");
}
#else
opal_argv_append_nosize(&cmd, "return=all");
#endif
/* pass the timeout */
asprintf(&tmp, "timeout=%d", mca_ras_slurm_component.timeout);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* for each app, add its allocation request info */
for (i=0; i < jdata->apps->size; i++) {
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
continue;
}
/* add the app id, preceded by a colon separator */
asprintf(&tmp, ": app=%d", (int)app->idx);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* add the number of process "slots" we need */
asprintf(&tmp, "np=%d", app->num_procs);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
/* if we were given a minimum number of nodes, pass it along */
if (0 < app->min_number_of_nodes) {
asprintf(&tmp, "N=%ld", (long int)app->min_number_of_nodes);
opal_argv_append_nosize(&cmd, tmp);
free(tmp);
}
/* add the list of nodes, if one was given, ensuring
* that each node only appears once
*/
node_list = get_node_list(app);
if (NULL != node_list) {
asprintf(&tmp, "node_list=%s", node_list);
opal_argv_append_nosize(&cmd, tmp);
free(node_list);
free(tmp);
}
/* add the mandatory/optional flag */
if (app->mandatory) {
opal_argv_append_nosize(&cmd, "flag=mandatory");
} else {
opal_argv_append_nosize(&cmd, "flag=optional");
}
}
/* assemble it into the final cmd to be sent */
cmd_str = opal_argv_join(cmd, ' ');
opal_argv_free(cmd);
/* start a timer - if the response to our request doesn't appear
* in the defined time, then we will error out as Slurm isn't
* responding to us
*/
opal_event_evtimer_set(orte_event_base, &jtrk->timeout_ev, timeout, jtrk);
tv.tv_sec = mca_ras_slurm_component.timeout * 2;
tv.tv_usec = 0;
opal_event_evtimer_add(&jtrk->timeout_ev, &tv);
opal_output_verbose(2, orte_ras_base_framework.framework_output,
"%s slurm:dynalloc cmd_str = %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cmd_str);
if (send(socket_fd, cmd_str, strlen(cmd_str)+1, 0) < 0) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
}
free(cmd_str);
/* we cannot wait here for a response as we
* are already in an event. So return a value
* that indicates we are waiting for an
* allocation so the base functions know
* that they shouldn't progress the job
*/
return ORTE_ERR_ALLOCATION_PENDING;
}
static int parse_alloc_msg(char *msg, int *idx, int *sjob,
char **nodelist, char **tpn)
{
char *tmp;
char *p_str;
char *pos;
int found=0;
if (msg == NULL || strlen(msg) == 0) {
return ORTE_ERR_BAD_PARAM;
}
tmp = strdup(msg);
p_str = strtok(tmp, " ");
while (p_str) {
if (NULL != strstr(p_str, "slurm_jobid")) {
pos = strchr(p_str, '=');
*sjob = strtol(pos+1, NULL, 10);
found++;
} else if (NULL != strstr(p_str, "allocated_node_list")) {
pos = strchr(p_str, '=');
*nodelist = strdup(pos+1);
found++;
} else if (NULL != strstr(p_str, "tasks_per_node")) {
pos = strchr(p_str, '=');
*tpn = strdup(pos+1);
found++;
} else if (NULL != strstr(p_str, "app")) {
pos = strchr(p_str, '=');
*idx = strtol(pos+1, NULL, 10);
found++;
}
p_str = strtok(NULL, " ");
}
free(tmp);
if (4 != found) {
return ORTE_ERR_NOT_FOUND;
}
return ORTE_SUCCESS;
}
static char* get_node_list(orte_app_context_t *app)
{
int j;
char **total_host = NULL;
char *nodes;
if (NULL == app->dash_host) {
return NULL;
}
for (j=0; NULL != app->dash_host[j]; j++) {
opal_argv_append_unique_nosize(&total_host, app->dash_host[j], false);
}
if (NULL == total_host) {
return NULL;
}
nodes = opal_argv_join(total_host, ',');
opal_argv_free(total_host);
return nodes;
}
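/* scan the Slurm config file for lines of the form
*   ControlMachine=headnode
*   JobSubmitDynAllocPort=7777
* (values hypothetical) and return the control host and port found
*/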
static int read_ip_port(char *filename, char **ip, uint16_t *port)
{
FILE *fp;
char line[ORTE_SLURM_DYN_MAX_SIZE];
char *pos;
bool found_port = false;
bool found_ip = false;
if (NULL == (fp = fopen(filename, "r"))) {
orte_show_help("help-ras-slurm.txt", "config-file-not-found", true, filename);
return ORTE_ERR_SILENT;
}
memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
while (NULL != fgets(line, ORTE_SLURM_DYN_MAX_SIZE, fp) &&
(!found_ip || !found_port)) {
if (0 == strlen(line)) {
continue;
}
/* strip the trailing newline, if present */
if ('\n' == line[strlen(line)-1]) {
line[strlen(line)-1] = '\0';
}
if (0 == strncmp(line, "JobSubmitDynAllocPort", strlen("JobSubmitDynAllocPort"))) {
pos = strstr(line, "=") + 1;
*port = strtol(pos, NULL, 10);
found_port = true;
} else if (0 == strncmp(line, "ControlMachine", strlen("ControlMachine"))) {
pos = strstr(line, "=") + 1;
*ip = strdup(pos);
found_ip = true;
}
memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
}
fclose(fp);
if (!found_ip) {
opal_output(0, "The IP address or name of the Slurm control machine was not provided");
return ORTE_ERR_NOT_FOUND;
}
if (!found_port) {
opal_output(0, "The IP port of the Slurm dynamic allocation service was not provided");
return ORTE_ERR_NOT_FOUND;
}
return ORTE_SUCCESS;
}