1
1

Restore slurm pmi support from long, long ago. Since we already have the ability to directly srun an MPI job, just conditionally add the PMI support for key values and provide a grpcomm module that uses PMI for barriers and modex.

Currently ompi_ignored, and unignored only for me (others to soon follow).

This commit was SVN r24792.
Этот коммит содержится в:
Ralph Castain 2011-06-20 21:04:46 +00:00
родитель 3d8ef08912
Коммит 92a65f21bf
10 изменённых файлов: 599 добавлений и 5 удалений

Просмотреть файл

@ -24,6 +24,9 @@ AC_DEFUN([ORTE_CHECK_SLURM],[
AC_ARG_WITH([slurm],
[AC_HELP_STRING([--with-slurm],
[Build SLURM scheduler component (default: yes)])])
AC_ARG_WITH([slurm-pmi],
[AC_HELP_STRING([--with-slurm-pmi],
[Build SLURM PMI support (default: no)])])
if test "$with_slurm" = "no" ; then
orte_check_slurm_happy="no"
@ -65,6 +68,28 @@ AC_DEFUN([ORTE_CHECK_SLURM],[
[orte_check_slurm_happy="yes"],
[orte_check_slurm_happy="no"])])
AC_MSG_CHECKING([if user requested PMI support])
orte_enable_slurm_pmi=0
AS_IF([test "$with_slurm_pmi" = "yes"],
[AC_MSG_RESULT([yes])
orte_want_pmi_support=yes
AC_MSG_CHECKING([if SLURM PMI support installed])
AC_CHECK_HEADER([slurm/pmi.h], [orte_have_pmi_support=yes], [orte_have_pmi_support=no])]
AS_IF([test "$orte_have_pmi_support" = "yes"],
[AC_MSG_RESULT([yes])
AC_MSG_WARN([SLURM PMI SUPPORT HAS BEEN INCLUDED - RESULTING])
AC_MSG_WARN([BINARIES ARE SUBJECT TO ADDITIONAL LICENSING])
AC_MSG_WARN([RESTRICTIONS - SEE THE SLURM LICENSE FOR INFO])
orte_enable_slurm_pmi=1],
[AC_MSG_RESULT([no])
AC_MSG_WARN([SLURM PMI support requested (via --with-slurm-pmi) but not found.])
AC_MSG_ERROR([Aborting.])]),
[AC_MSG_RESULT([no])
orte_want_pmi_support=no])
AC_DEFINE_UNQUOTED([WANT_SLURM_PMI_SUPPORT],
[$orte_enable_slurm_pmi],
[Whether we want SLURM PMI support])
AS_IF([test "$orte_check_slurm_happy" = "yes"],
[$2],
[$3])

Просмотреть файл

@ -25,6 +25,10 @@
#include "orte_config.h"
#include "orte/constants.h"
#if WANT_SLURM_PMI_SUPPORT
#include <slurm/pmi.h>
#endif
#include "orte/util/proc_info.h"
#include "orte/mca/ess/ess.h"
@ -78,6 +82,17 @@ int orte_ess_slurmd_component_query(mca_base_module_t **module, int *priority)
NULL != getenv("SLURM_JOBID") &&
NULL != getenv("SLURM_STEPID") &&
NULL == orte_process_info.my_hnp_uri) {
#if WANT_SLURM_PMI_SUPPORT
{
int spawned;
/* if we can't startup the PMI, we can't be used */
if (PMI_SUCCESS != PMI_Init(&spawned)) {
*priority = -1;
*module = NULL;
return ORTE_ERROR;
}
}
#endif
*priority = 30;
*module = (mca_base_module_t *)&orte_ess_slurmd_module;
return ORTE_SUCCESS;

Просмотреть файл

@ -34,7 +34,9 @@
#ifdef HAVE_IFADDRS_H
#include <ifaddrs.h>
#endif
#if WANT_SLURM_PMI_SUPPORT
#include <slurm/pmi.h>
#endif
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
@ -172,13 +174,22 @@ static int rte_init(void)
free(cs_env);
free(string_key);
#if WANT_SLURM_PMI_SUPPORT
/* get our rank from PMI */
if (PMI_SUCCESS != PMI_Get_rank(&i)) {
error = "PMI_Get_rank failed";
goto error;
}
ORTE_PROC_MY_NAME->vpid = i;
#else
/* get the slurm procid - this will be our vpid */
if (NULL == (envar = getenv("SLURM_PROCID"))) {
error = "could not get SLURM_PROCID";
goto error;
}
ORTE_PROC_MY_NAME->vpid = strtol(envar, NULL, 10);
#endif
/* get our local rank */
if (NULL == (envar = getenv("SLURM_LOCALID"))) {
error = "could not get SLURM_LOCALID";
@ -190,17 +201,32 @@ static int rte_init(void)
"%s local rank %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
local_rank));
#if WANT_SLURM_PMI_SUPPORT
if (PMI_SUCCESS != PMI_Get_universe_size(&i)) {
error = "PMI_Get_universe_size failed";
goto error;
}
orte_process_info.num_procs = i;
#else
/* get the number of procs in this job */
if (NULL == (envar = getenv("SLURM_STEP_NUM_TASKS"))) {
error = "could not get SLURM_STEP_NUM_TASKS";
goto error;
}
orte_process_info.num_procs = strtol(envar, NULL, 10);
#endif
#if WANT_SLURM_PMI_SUPPORT
if (PMI_SUCCESS != PMI_Get_appnum(&i)) {
error = "PMI_Get_appnum failed";
goto error;
}
orte_process_info.app_num = i;
#else
/* set the app_num so that MPI attributes get set correctly */
orte_process_info.app_num = 1;
#endif
/* if this is SLURM 2.0 or above, get our port
* assignments for use in the OOB
*/

0
orte/mca/grpcomm/pmi/.ompi_ignore Обычный файл
Просмотреть файл

1
orte/mca/grpcomm/pmi/.ompi_unignore Обычный файл
Просмотреть файл

@ -0,0 +1 @@
rhc

34
orte/mca/grpcomm/pmi/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,34 @@
#
# Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Files that make up the PMI grpcomm component; the same list feeds both
# the DSO and the static flavor of the library below.
sources = \
grpcomm_pmi.h \
grpcomm_pmi_module.c \
grpcomm_pmi_component.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
# Exactly one of component_install / component_noinst is non-empty,
# depending on the MCA_BUILD_orte_grpcomm_pmi_DSO conditional.
if MCA_BUILD_orte_grpcomm_pmi_DSO
component_noinst =
component_install = mca_grpcomm_pmi.la
else
component_noinst = libmca_grpcomm_pmi.la
component_install =
endif
# DSO flavor: installed into $(pkglibdir).
# NOTE(review): no PMI library appears in LDFLAGS/LIBADD here although the
# sources call PMI_* functions - confirm libpmi is linked somewhere else.
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_grpcomm_pmi_la_SOURCES = $(sources)
mca_grpcomm_pmi_la_LDFLAGS = -module -avoid-version
# Static flavor: built but not installed.
noinst_LTLIBRARIES = $(component_noinst)
libmca_grpcomm_pmi_la_SOURCES =$(sources)
libmca_grpcomm_pmi_la_LDFLAGS = -module -avoid-version

23
orte/mca/grpcomm/pmi/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,23 @@
# -*- shell-script -*-
#
# Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_orte_grpcomm_pmi_PRIORITY
# -----------------------------------------------------------
# Expands to the constant 10 (presumably the configure-time priority of
# this component within its framework - confirm against the MCA
# configury that consumes it).
AC_DEFUN([MCA_orte_grpcomm_pmi_PRIORITY], [10])
# MCA_orte_grpcomm_pmi_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
# Registers the component Makefile and runs action-if-found only when
# the shell variable orte_enable_slurm_pmi equals 1; otherwise runs
# action-if-not-found. That variable is expected to have been set
# earlier by the SLURM check (ORTE_CHECK_SLURM with --with-slurm-pmi).
AC_DEFUN([MCA_orte_grpcomm_pmi_CONFIG], [
AC_CONFIG_FILES([orte/mca/grpcomm/pmi/Makefile])
# Evaluate succeed / fail
AS_IF([test "$orte_enable_slurm_pmi" = "1"],
[$1],
[$2])
])

38
orte/mca/grpcomm/pmi/grpcomm_pmi.h Обычный файл
Просмотреть файл

@ -0,0 +1,38 @@
/* -*- C -*-
*
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#ifndef GRPCOMM_PMI_H
#define GRPCOMM_PMI_H
#include "orte_config.h"
#include "orte/mca/grpcomm/grpcomm.h"
BEGIN_C_DECLS
/*
 * Component open / close / query
 *
 * These three functions are installed in the component structure
 * (mca_grpcomm_pmi_component) as its open, close and query hooks.
 * The query hook reports the module and its priority through the two
 * out-parameters and returns an ORTE status code.
 */
int orte_grpcomm_pmi_open(void);
int orte_grpcomm_pmi_close(void);
int orte_grpcomm_pmi_component_query(mca_base_module_t **module, int *priority);
/*
 * Grpcomm interfaces
 *
 * mca_grpcomm_pmi_component is the component descriptor; 
 * orte_grpcomm_pmi_module is the table of grpcomm entry points that the
 * query hook hands back when the component is usable.
 */
ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_pmi_component;
extern orte_grpcomm_base_module_t orte_grpcomm_pmi_module;
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,78 @@
/* -*- C -*-
*
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The PMI group communication (grpcomm) component
*
* This file defines the MCA component descriptor for the "pmi" grpcomm
* component together with its open, close and query hooks. The query
* hook offers the PMI grpcomm module only when the SLURM_JOBID
* environment variable is set.
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "grpcomm_pmi.h"
/*
 * Component descriptor for the "pmi" grpcomm component.
 *
 * The initializer is positional: the first inner aggregate carries the
 * framework version macro, the component name, the ORTE version triple
 * and the open/close/query hooks; the second carries the component
 * metadata. The entry order must match the declaration of
 * orte_grpcomm_base_component_t (declared outside this file).
 */
orte_grpcomm_base_component_t mca_grpcomm_pmi_component = {
{
ORTE_GRPCOMM_BASE_VERSION_2_0_0,
"pmi", /* MCA module name */
ORTE_MAJOR_VERSION, /* MCA module major version */
ORTE_MINOR_VERSION, /* MCA module minor version */
ORTE_RELEASE_VERSION, /* MCA module release version */
orte_grpcomm_pmi_open, /* module open */
orte_grpcomm_pmi_close, /* module close */
orte_grpcomm_pmi_component_query /* module query */
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
};
/*
 * Open the component.
 *
 * Nothing is registered or allocated here; the hook exists only to
 * fill the component's open slot.
 *
 * @return ORTE_SUCCESS, unconditionally.
 */
int orte_grpcomm_pmi_open(void)
{
return ORTE_SUCCESS;
}
/*
 * Close the component.
 *
 * There is no component-level state to release, so this is a no-op.
 *
 * @return ORTE_SUCCESS, unconditionally.
 */
int orte_grpcomm_pmi_close(void)
{
return ORTE_SUCCESS;
}
/*
 * Component query hook: decide whether the PMI grpcomm module may be used.
 *
 * The only test performed is whether SLURM_JOBID is present in the
 * environment.
 *
 * @param module    (OUT) set to the PMI grpcomm module when usable,
 *                  NULL otherwise
 * @param priority  (OUT) set to 10 when usable; left untouched otherwise
 *
 * @return ORTE_SUCCESS when the module is offered, ORTE_ERROR when
 *         SLURM_JOBID is not set.
 */
int orte_grpcomm_pmi_component_query(mca_base_module_t **module, int *priority)
{
    if (NULL != getenv("SLURM_JOBID")) {
        /* deliberately low priority so that other components can
         * override this one */
        *priority = 10;
        *module = (mca_base_module_t *)&orte_grpcomm_pmi_module;
        return ORTE_SUCCESS;
    }

    /* not inside a SLURM job: we cannot run */
    *module = NULL;
    return ORTE_ERROR;
}

354
orte/mca/grpcomm/pmi/grpcomm_pmi_module.c Обычный файл
Просмотреть файл

@ -0,0 +1,354 @@
/*
* Copyright (c) 2007 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#include <slurm/pmi.h>
#include "orte/util/proc_info.h"
#include "opal/dss/dss.h"
#include "opal/util/opal_sos.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_pmi.h"
/* Static API's */
static int init(void);
static void finalize(void);
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int pmi_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int pmi_allgather_list(opal_list_t *names,
opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int pmi_barrier(void);
static int pmi_set_proc_attr(const char* attr_name,
const void *buffer, size_t size);
static int pmi_get_proc_attr(const orte_process_name_t name,
const char* attr_name,
void **buffer, size_t *size);
static int modex(opal_list_t *procs);
static int purge_proc_attrs(void);
/*
 * Module def
 *
 * Table of grpcomm entry points implemented by this file. The
 * initializer is positional, so the order of the entries must match the
 * field order of orte_grpcomm_base_module_t (declared outside this
 * file). xcast, pmi_allgather and pmi_allgather_list are stubs that
 * return ORTE_ERR_NOT_SUPPORTED.
 */
orte_grpcomm_base_module_t orte_grpcomm_pmi_module = {
init,
finalize,
xcast,
pmi_allgather,
pmi_allgather_list,
pmi_barrier,
pmi_set_proc_attr,
pmi_get_proc_attr,
modex,
purge_proc_attrs
};
/* useful util */
/**
 * Translate a PMI return code into a short human-readable message.
 *
 * @param pmi_err  status code returned by a PMI_* call
 *
 * @return pointer to a string literal describing the code; codes that
 *         are not recognized map to "Unknown error". The string is
 *         static storage: the caller must neither modify nor free it.
 */
static char* orte_pmi_error(int pmi_err) {
    char * err_msg;

    switch(pmi_err) {
        case PMI_FAIL: err_msg = "Operation failed"; break;
        case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
        case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
        case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
        case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
        case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
        case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
        case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
        case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
        case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
        case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
        case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
        case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
        case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
        case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
        case PMI_SUCCESS: err_msg = "Success"; break;
        default: err_msg = "Unknown error"; break;
    }
    return err_msg;
}
static char* pmi_encode(const void *val, size_t vallen);
static void* pmi_decode(unsigned char *val, size_t *retlen);
/* Local variables */
static char *pmi_kvs_name = NULL;
static int pmi_vallen_max = -1;
/**
 * Report a failed PMI call through opal_output.
 *
 * The message carries the process name, the source location and
 * function of the call site, the caller-supplied label and the textual
 * form of the PMI status code.
 *
 * The expansion is a single do { } while(0) statement WITHOUT a
 * trailing semicolon, so that "ORTE_PMI_ERROR(rc, "x");" behaves like
 * one ordinary statement, including inside an unbraced if/else.
 *
 * @param pmi_err   status code returned by the PMI call
 * @param pmi_func  string naming the PMI function that failed
 */
#define ORTE_PMI_ERROR(pmi_err, pmi_func)                               \
    do {                                                                \
        opal_output(0, "%s[%s:%d:%s] %s: %s\n",                         \
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),                 \
                    __FILE__, __LINE__, __func__,                       \
                    pmi_func, orte_pmi_error(pmi_err));                 \
    } while(0)
/**
 * Initialize the module.
 *
 * No work is done here: the PMI key-value-space name and the maximum
 * value length are fetched lazily by pmi_set_proc_attr on first use.
 *
 * @return ORTE_SUCCESS, unconditionally.
 */
static int init(void)
{
return ORTE_SUCCESS;
}
/**
 * Finalize the module.
 *
 * Currently a no-op.
 * NOTE(review): the pmi_kvs_name buffer allocated in pmi_set_proc_attr
 * is not released here - confirm whether that is intentional.
 */
static void finalize(void)
{
return;
}
/**
 * A "broadcast-like" function to a job's processes.
 *
 * This module does not implement xcast; all arguments are ignored.
 *
 * @param job    The job whose processes are to receive the message
 * @param buffer The data to broadcast
 * @param tag    The RML tag the message would be delivered to
 *
 * @return ORTE_ERR_NOT_SUPPORTED, unconditionally.
 */
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
/* not used in this module */
return ORTE_ERR_NOT_SUPPORTED;
}
/**
 * Synchronize with every other process of the job via PMI_Barrier().
 *
 * A job consisting of a single process returns immediately without
 * calling into PMI (and without emitting the "barrier complete"
 * diagnostic).
 *
 * @return ORTE_SUCCESS when the barrier completed or was not needed,
 *         ORTE_ERROR when PMI_Barrier() reported a failure (the failure
 *         is logged through ORTE_PMI_ERROR).
 */
static int pmi_barrier(void)
{
    int pmi_status;

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
                         "%s grpcomm:pmi entering barrier",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* only bother PMI when there is somebody else to wait for */
    if (1 != orte_process_info.num_procs) {
        pmi_status = PMI_Barrier();
        if (PMI_SUCCESS != pmi_status) {
            ORTE_PMI_ERROR(pmi_status, "PMI_Barrier");
            return ORTE_ERROR;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                             "%s grpcomm:pmi barrier complete",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    }

    return ORTE_SUCCESS;
}
/**
 * Allgather across the job - not implemented by this module.
 *
 * @param sbuf  ignored
 * @param rbuf  ignored
 *
 * @return ORTE_ERR_NOT_SUPPORTED, unconditionally.
 */
static int pmi_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
/* not used in this implementation */
return ORTE_ERR_NOT_SUPPORTED;
}
/**
 * Allgather across an explicit list of processes - not implemented by
 * this module.
 *
 * @param names  ignored
 * @param sbuf   ignored
 * @param rbuf   ignored
 *
 * @return ORTE_ERR_NOT_SUPPORTED, unconditionally.
 */
static int pmi_allgather_list(opal_list_t *names,
opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
/* no idea how to do this - only occurs for comm_spawn,
* which this module doesn't support
*/
return ORTE_ERR_NOT_SUPPORTED;
}
/**
 * Publish one attribute of the calling process in the PMI key-value space.
 *
 * On first use the function fetches the maximum value length and the
 * name of this job's key-value space from PMI and caches both in
 * pmi_vallen_max / pmi_kvs_name. The cache is only committed once the
 * name has actually been retrieved, so a failed first call can simply
 * be retried.
 *
 * The value is stored under the key "<process name>-<attr_name>" after
 * being converted to text by pmi_encode(). The data only becomes
 * visible to peers after modex() has committed it.
 *
 * @param attr_name  name of the attribute
 * @param buffer     raw attribute bytes
 * @param size       number of bytes in buffer
 *
 * @return ORTE_SUCCESS on success;
 *         ORTE_ERR_OUT_OF_RESOURCE when an allocation fails;
 *         ORTE_ERROR when a PMI call fails or the encoded value exceeds
 *         the PMI value length limit.
 */
static int pmi_set_proc_attr(const char* attr_name,
                             const void *buffer, size_t size)
{
    char *attr = NULL;
    char *attrval = NULL;
    int ret = ORTE_ERROR;
    int rc;

    if (NULL == pmi_kvs_name) {
        int max_length;
        char *kvs_name;

        rc = PMI_KVS_Get_value_length_max(&pmi_vallen_max);
        if (PMI_SUCCESS != rc) {
            ORTE_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
            return ORTE_ERROR;
        }

        if (PMI_SUCCESS != (rc = PMI_KVS_Get_name_length_max(&max_length))) {
            ORTE_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
            return ORTE_ERROR;
        }
        kvs_name = malloc(max_length);
        if (NULL == kvs_name) {
            return ORTE_ERR_OUT_OF_RESOURCE;
        }

        rc = PMI_KVS_Get_my_name(kvs_name, max_length);
        if (PMI_SUCCESS != rc) {
            ORTE_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
            /* do not leave a half-initialized name behind: later calls
             * key off pmi_kvs_name being non-NULL */
            free(kvs_name);
            return ORTE_ERROR;
        }
        pmi_kvs_name = kvs_name;
    }

    if (0 > asprintf(&attr, "%s-%s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name)) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    attrval = pmi_encode(buffer, size);
    if (NULL == attrval) {
        ret = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    if (strlen(attrval) > (size_t)pmi_vallen_max) {
        opal_output(0, "pmi_proc_set_attr: attribute length is too long\n");
        ret = ORTE_ERROR;
        goto cleanup;
    }

    rc = PMI_KVS_Put(pmi_kvs_name, attr, attrval);
    if (PMI_SUCCESS != rc) {
        ORTE_PMI_ERROR(rc, "PMI_KVS_Put");
        ret = ORTE_ERROR;
        goto cleanup;
    }

    ret = ORTE_SUCCESS;

cleanup:
    /* single exit point so key and value are released on every path */
    free(attr);
    free(attrval);
    return ret;
}
/**
 * Retrieve an attribute that a process published in the PMI key-value
 * space.
 *
 * The lookup key is "<process name>-<attr_name>", i.e. the same format
 * pmi_set_proc_attr() uses when publishing. The text value returned by
 * PMI is converted back to raw bytes with pmi_decode().
 *
 * @param name       process whose attribute is requested
 * @param attr_name  name of the attribute
 * @param buffer     (OUT) newly malloc'ed buffer holding the decoded
 *                   bytes; presumably owned (and freed) by the caller
 * @param size       (OUT) number of decoded bytes
 *
 * @return ORTE_SUCCESS on success;
 *         ORTE_ERR_UNREACH when the key-value-space name has not been
 *         set up yet (it is initialized by pmi_set_proc_attr());
 *         ORTE_ERR_OUT_OF_RESOURCE when an allocation fails;
 *         ORTE_ERROR when PMI_KVS_Get() fails.
 */
static int pmi_get_proc_attr(const orte_process_name_t name,
                             const char* attr_name,
                             void **buffer, size_t *size)
{
    char *attrval, *attr;
    int rc;

    if (NULL == pmi_kvs_name) {
        return ORTE_ERR_UNREACH;
    }

    attrval = malloc(pmi_vallen_max);
    if (NULL == attrval) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    if (0 > asprintf(&attr, "%s-%s", ORTE_NAME_PRINT(&name), attr_name)) {
        free(attrval);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    rc = PMI_KVS_Get(pmi_kvs_name, attr, attrval, pmi_vallen_max);
    if (PMI_SUCCESS != rc) {
        ORTE_PMI_ERROR(rc, "PMI_KVS_Get");
        free(attrval);
        free(attr);
        return ORTE_ERROR;
    }

    *buffer = pmi_decode((unsigned char *)attrval, size);
    free(attrval);
    free(attr);

    /* Test the decoded pointer, not the out-parameter itself. A
     * zero-length value is not treated as a failure even if the
     * allocator handed back NULL for the empty buffer. */
    if (NULL == *buffer && 0 < *size) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    return ORTE_SUCCESS;
}
/*** MODEX SECTION ***/
/**
 * Exchange the published attributes with all peers.
 *
 * The exchange consists of committing the locally published key/value
 * pairs to PMI and then entering a barrier, so that on return every
 * process has committed its data.
 *
 * @param procs  not used by this implementation
 *
 * @return ORTE_SUCCESS on success;
 *         ORTE_ERROR when PMI_KVS_Commit() fails;
 *         otherwise the error code returned by pmi_barrier().
 */
static int modex(opal_list_t *procs)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
                         "%s grpcomm:pmi: modex entered",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* commit our modex info - the kvs name is only known once at least
     * one attribute has been published by pmi_set_proc_attr(); if none
     * was, there is nothing to commit and a NULL name must not be
     * handed to PMI */
    if (NULL != pmi_kvs_name) {
        if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmi_kvs_name))) {
            ORTE_PMI_ERROR(rc, "PMI_KVS_Commit failed");
            return ORTE_ERROR;
        }
    }

    /* Barrier here to ensure all other procs have committed */
    if (ORTE_SUCCESS != (rc = pmi_barrier())) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
                         "%s grpcomm:pmi: modex completed",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    return rc;
}
/**
 * Discard locally cached attribute data.
 *
 * This module keeps no per-attribute cache of its own (values are
 * handed straight to PMI), so there is nothing to purge.
 *
 * @return ORTE_SUCCESS, unconditionally.
 */
static int purge_proc_attrs(void)
{
/* nothing to do here */
return ORTE_SUCCESS;
}
/* PMI only supports strings. For now, do a simple base16
 * encoding. Should do something smarter, both with the
 * algorithm used and its implementation.
 *
 * Every input byte becomes two lowercase hex digits, LOW nibble first
 * and HIGH nibble second (so 0x1f is written as "f1"). pmi_decode()
 * expects exactly this order.
 *
 * @param val     bytes to encode
 * @param vallen  number of bytes in val
 *
 * @return newly malloc'ed NUL-terminated string of 2*vallen characters
 *         (to be freed by the caller), or NULL if the allocation fails.
 */
static char* pmi_encode(const void *val, size_t vallen) {
    static const char hex_digits[] = "0123456789abcdef";
    const unsigned char *src = (const unsigned char *)val;
    char *encoded = malloc(2 * vallen + 1);
    char *dst = encoded;
    size_t remaining;

    if (NULL == encoded) {
        return NULL;
    }

    for (remaining = vallen; remaining > 0; remaining--, src++) {
        *dst++ = hex_digits[*src & 0xf];   /* low nibble goes first */
        *dst++ = hex_digits[*src >> 4];    /* then the high nibble */
    }
    *dst = '\0';

    return encoded;
}
/**
 * Convert a string produced by pmi_encode() back into raw bytes.
 *
 * Each pair of characters yields one byte: the first character is the
 * LOW nibble, the second the HIGH nibble. Characters '0'-'9' are taken
 * as digits; every other character is treated as a lowercase hex letter
 * without validation, so the input must come from pmi_encode(). A
 * trailing unpaired character is ignored.
 *
 * @param val     NUL-terminated encoded string
 * @param retlen  (OUT) number of decoded bytes (strlen(val) / 2)
 *
 * @return newly malloc'ed buffer with the decoded bytes (to be freed by
 *         the caller), or NULL if the allocation fails. An empty input
 *         yields a valid non-NULL buffer with *retlen == 0.
 */
static void* pmi_decode(unsigned char *val, size_t *retlen) {
    unsigned char *ret;
    size_t i;

    *retlen = strlen((char*)val)/2;

    /* malloc(0) is allowed to return NULL, which a caller could not
     * tell apart from out-of-memory; always request at least one byte */
    ret = malloc(0 == *retlen ? 1 : *retlen);
    if (NULL == ret) {
        return ret;
    }

    for (i = 0; i < *retlen; i++) {
        /* first character of the pair: low nibble */
        if (*val >= '0' && *val <= '9') {
            ret[i] = *val - '0';
        } else {
            ret[i] = *val - 'a' + 10;
        }
        val++;

        /* second character of the pair: high nibble */
        if (*val >= '0' && *val <= '9') {
            ret[i] |= ((*val - '0') << 4);
        } else {
            ret[i] |= ((*val - 'a' + 10) << 4);
        }
        val++;
    }

    return ret;
}