
Enable the PMIx ompi/rte component

Get the OMPI rte/pmix component working. This was tested using PRRTE as the RM (resource manager), building and running OMPI as follows:

* autogen --no-orte

* with external libevent, external hwloc, and external PMIx master

* configuring PMIx master with the same libevent and hwloc

* execute the application using PRRTE's "prun" launcher, which has the same command line as ORTE's mpirun

Note that PMIx master appears to have a bug in the event notification system that caches job termination events. Thus, the first execution runs fine, but subsequent executions abort when the OMPI default error handler is invoked upon notification of the prior job's termination. That will be addressed separately.

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
(cherry picked from commit 134cca9ac0de092d767999357573a31703f72292)
This commit is contained in:
Ralph Castain 2018-06-02 16:38:36 -07:00
parent 6df6d6a7f3
commit 55ac526a67
3 changed files with 370 additions and 78 deletions

View file

@@ -1,7 +1,7 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
@@ -49,10 +49,9 @@ typedef uint32_t ompi_vpid_t;
/* some local storage */
OMPI_DECLSPEC extern opal_process_name_t pmix_name_wildcard;
OMPI_DECLSPEC extern opal_process_name_t pmix_proc_my_name;
OMPI_DECLSPEC extern hwloc_cpuset_t ompi_proc_applied_binding;
#define OMPI_PROC_MY_NAME (&pmix_proc_my_name)
#define OMPI_PROC_MY_NAME (&pmix_process_info.my_name)
#define OMPI_NAME_WILDCARD (&pmix_name_wildcard)
typedef uint8_t ompi_rte_cmp_bitmask_t;
@@ -62,7 +61,9 @@ typedef uint8_t ompi_rte_cmp_bitmask_t;
#define OMPI_RTE_CMP_ALL 0x04
#define OMPI_RTE_CMP_WILD 0x10
#define OMPI_NAME_PRINT(a) OPAL_NAME_PRINT((*(a)))
OMPI_DECLSPEC char* ompi_pmix_print_name(const ompi_process_name_t *name);
#define OMPI_NAME_PRINT(a) ompi_pmix_print_name(a)
OMPI_DECLSPEC int ompi_rte_compare_name_fields(ompi_rte_cmp_bitmask_t mask,
const opal_process_name_t* name1,
const opal_process_name_t* name2);
@@ -71,11 +72,17 @@ OMPI_DECLSPEC int ompi_rte_convert_string_to_process_name(opal_process_name_t *n
OMPI_DECLSPEC int ompi_rte_convert_process_name_to_string(char** name_string,
const opal_process_name_t *name);
#define OMPI_LOCAL_JOBID(jobid) jobid
#define OMPI_JOB_FAMILY(jobid) 0
/* do a little with the "family" param to avoid compiler warnings */
#define OMPI_CONSTRUCT_JOBID(family,local) \
((family & 0x0000) | local)
#define OMPI_LOCAL_JOBID(n) \
( (n) & 0x0000ffff)
#define OMPI_JOB_FAMILY(n) \
(((n) >> 16) & 0x0000ffff)
#define OMPI_CONSTRUCT_LOCAL_JOBID(local, job) \
( ((local) & 0xffff0000) | ((job) & 0x0000ffff) )
#define OMPI_CONSTRUCT_JOB_FAMILY(n) \
( ((n) << 16) & 0xffff0000)
#define OMPI_CONSTRUCT_JOBID(family, local) \
OMPI_CONSTRUCT_LOCAL_JOBID(OMPI_CONSTRUCT_JOB_FAMILY(family), local)
/* This is the DSS tag to serialize a proc name */
#define OMPI_NAME OPAL_NAME
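
For reference, a minimal standalone sketch of how the new jobid macros pack a 16-bit job family and a 16-bit local jobid into one 32-bit value (the main() driver and the sample values are illustrative, not part of the commit):

    #include <stdio.h>
    #include <stdint.h>

    /* local copies of the macros added above */
    #define OMPI_LOCAL_JOBID(n)   ((n) & 0x0000ffff)
    #define OMPI_JOB_FAMILY(n)    (((n) >> 16) & 0x0000ffff)
    #define OMPI_CONSTRUCT_LOCAL_JOBID(local, job) \
        (((local) & 0xffff0000) | ((job) & 0x0000ffff))
    #define OMPI_CONSTRUCT_JOB_FAMILY(n) (((n) << 16) & 0xffff0000)
    #define OMPI_CONSTRUCT_JOBID(family, local) \
        OMPI_CONSTRUCT_LOCAL_JOBID(OMPI_CONSTRUCT_JOB_FAMILY(family), local)

    int main(void)
    {
        /* family 42 lands in the upper 16 bits, local id 7 in the lower */
        uint32_t jobid = OMPI_CONSTRUCT_JOBID(42, 7);
        printf("jobid=0x%08x family=%u local=%u\n", (unsigned)jobid,
               (unsigned)OMPI_JOB_FAMILY(jobid), (unsigned)OMPI_LOCAL_JOBID(jobid));
        /* prints: jobid=0x002a0007 family=42 local=7 */
        return 0;
    }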

View file

@@ -1,7 +1,7 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2012-2014 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
@@ -34,11 +34,13 @@
#include "opal/util/opal_getcwd.h"
#include "opal/util/os_path.h"
#include "opal/util/os_dirpath.h"
#include "opal/util/printf.h"
#include "opal/util/proc.h"
#include "opal/util/show_help.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/threads/threads.h"
#include "opal/threads/tsd.h"
#include "opal/class/opal_list.h"
#include "opal/dss/dss.h"
@@ -57,7 +59,6 @@ extern ompi_rte_component_t mca_rte_pmix_component;
/* storage to support OMPI */
opal_process_name_t pmix_name_wildcard = {UINT32_MAX-1, UINT32_MAX-1};
opal_process_name_t pmix_name_invalid = {UINT32_MAX, UINT32_MAX};
opal_process_name_t pmix_proc_my_name = {0, 0};
hwloc_cpuset_t ompi_proc_applied_binding = NULL;
pmix_process_info_t pmix_process_info = {0};
bool pmix_proc_is_bound = false;
@@ -69,11 +70,175 @@ static bool added_app_ctx = false;
static char* pre_condition_transports_print(uint64_t *unique_key);
static int _setup_job_session_dir(char **sdir);
#define ORTE_SCHEMA_DELIMITER_CHAR '.'
#define ORTE_SCHEMA_WILDCARD_CHAR '*'
#define ORTE_SCHEMA_WILDCARD_STRING "*"
#define ORTE_SCHEMA_INVALID_CHAR '$'
#define ORTE_SCHEMA_INVALID_STRING "$"
#define OPAL_SCHEMA_DELIMITER_CHAR '.'
#define OPAL_SCHEMA_WILDCARD_CHAR '*'
#define OPAL_SCHEMA_WILDCARD_STRING "*"
#define OPAL_SCHEMA_INVALID_CHAR '$'
#define OPAL_SCHEMA_INVALID_STRING "$"
#define OPAL_PRINT_NAME_ARGS_MAX_SIZE 50
#define OPAL_PRINT_NAME_ARG_NUM_BUFS 16
static bool fns_init=false;
static opal_tsd_key_t print_args_tsd_key;
static char* opal_print_args_null = "NULL";
typedef struct {
char *buffers[OPAL_PRINT_NAME_ARG_NUM_BUFS];
int cntr;
} opal_print_args_buffers_t;
static void
buffer_cleanup(void *value)
{
int i;
opal_print_args_buffers_t *ptr;
if (NULL != value) {
ptr = (opal_print_args_buffers_t*)value;
for (i=0; i < OPAL_PRINT_NAME_ARG_NUM_BUFS; i++) {
free(ptr->buffers[i]);
}
free (ptr);
}
}
static opal_print_args_buffers_t*
get_print_name_buffer(void)
{
opal_print_args_buffers_t *ptr;
int ret, i;
if (!fns_init) {
/* setup the print_args function */
if (OPAL_SUCCESS != (ret = opal_tsd_key_create(&print_args_tsd_key, buffer_cleanup))) {
OPAL_ERROR_LOG(ret);
return NULL;
}
fns_init = true;
}
ret = opal_tsd_getspecific(print_args_tsd_key, (void**)&ptr);
if (OPAL_SUCCESS != ret) return NULL;
if (NULL == ptr) {
ptr = (opal_print_args_buffers_t*)malloc(sizeof(opal_print_args_buffers_t));
for (i=0; i < OPAL_PRINT_NAME_ARG_NUM_BUFS; i++) {
ptr->buffers[i] = (char *) malloc((OPAL_PRINT_NAME_ARGS_MAX_SIZE+1) * sizeof(char));
}
ptr->cntr = 0;
ret = opal_tsd_setspecific(print_args_tsd_key, (void*)ptr);
}
return (opal_print_args_buffers_t*) ptr;
}
static char* ompi_pmix_print_jobids(const opal_jobid_t job)
{
opal_print_args_buffers_t *ptr;
unsigned long tmp1, tmp2;
ptr = get_print_name_buffer();
if (NULL == ptr) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return opal_print_args_null;
}
/* cycle around the ring */
if (OPAL_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
if (OPAL_JOBID_INVALID == job) {
snprintf(ptr->buffers[ptr->cntr++], OPAL_PRINT_NAME_ARGS_MAX_SIZE, "[INVALID]");
} else if (OPAL_JOBID_WILDCARD == job) {
snprintf(ptr->buffers[ptr->cntr++], OPAL_PRINT_NAME_ARGS_MAX_SIZE, "[WILDCARD]");
} else {
tmp1 = OMPI_JOB_FAMILY((unsigned long)job);
tmp2 = OMPI_LOCAL_JOBID((unsigned long)job);
snprintf(ptr->buffers[ptr->cntr++],
OPAL_PRINT_NAME_ARGS_MAX_SIZE,
"[%lu,%lu]", tmp1, tmp2);
}
return ptr->buffers[ptr->cntr-1];
}
static char* ompi_pmix_print_vpids(const opal_vpid_t vpid)
{
opal_print_args_buffers_t *ptr;
ptr = get_print_name_buffer();
if (NULL == ptr) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return opal_print_args_null;
}
/* cycle around the ring */
if (OPAL_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
if (OPAL_VPID_INVALID == vpid) {
snprintf(ptr->buffers[ptr->cntr++], OPAL_PRINT_NAME_ARGS_MAX_SIZE, "INVALID");
} else if (OPAL_VPID_WILDCARD == vpid) {
snprintf(ptr->buffers[ptr->cntr++], OPAL_PRINT_NAME_ARGS_MAX_SIZE, "WILDCARD");
} else {
snprintf(ptr->buffers[ptr->cntr++],
OPAL_PRINT_NAME_ARGS_MAX_SIZE,
"%ld", (long)vpid);
}
return ptr->buffers[ptr->cntr-1];
}
char* ompi_pmix_print_name(const ompi_process_name_t *name)
{
opal_print_args_buffers_t *ptr;
char *job, *vpid;
/* protect against NULL names */
if (NULL == name) {
/* get the next buffer */
ptr = get_print_name_buffer();
if (NULL == ptr) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return opal_print_args_null;
}
/* cycle around the ring */
if (OPAL_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
snprintf(ptr->buffers[ptr->cntr++], OPAL_PRINT_NAME_ARGS_MAX_SIZE, "[NO-NAME]");
return ptr->buffers[ptr->cntr-1];
}
/* get the jobid, vpid strings first - this will protect us from
* stepping on each other's buffer. This also guarantees
* that the print_args function has been initialized, so
* we don't need to duplicate that here
*/
job = ompi_pmix_print_jobids(name->jobid);
vpid = ompi_pmix_print_vpids(name->vpid);
/* get the next buffer */
ptr = get_print_name_buffer();
if (NULL == ptr) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return opal_print_args_null;
}
/* cycle around the ring */
if (OPAL_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
snprintf(ptr->buffers[ptr->cntr++],
OPAL_PRINT_NAME_ARGS_MAX_SIZE,
"[%s,%s]", job, vpid);
return ptr->buffers[ptr->cntr-1];
}
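
The printers above hand back pointers into a per-thread ring of 16 static buffers, so several name conversions can appear as arguments of a single printf-style call without overwriting one another. A simplified, non-thread-safe sketch of the same ring idiom (standalone, without the OPAL TSD machinery):

    #include <stdio.h>

    #define NUM_BUFS 4
    #define BUF_SIZE 32

    /* each call returns the next slot in a small ring of static buffers */
    static char *next_buf(void)
    {
        static char bufs[NUM_BUFS][BUF_SIZE];
        static int cntr = 0;
        if (NUM_BUFS == cntr) {
            cntr = 0;
        }
        return bufs[cntr++];
    }

    static char *print_vpid(long vpid)
    {
        char *b = next_buf();
        snprintf(b, BUF_SIZE, "%ld", vpid);
        return b;
    }

    int main(void)
    {
        /* two conversions in one call stay distinct: each got its own slot */
        printf("[%s,%s]\n", print_vpid(3), print_vpid(9));
        return 0;
    }
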
int ompi_rte_compare_name_fields(ompi_rte_cmp_bitmask_t fields,
const opal_process_name_t* name1,
@@ -154,7 +319,7 @@ int ompi_rte_convert_string_to_process_name(opal_process_name_t *name,
}
temp = strdup(name_string); /** copy input string as the strtok process is destructive */
token = strchr(temp, ORTE_SCHEMA_DELIMITER_CHAR); /** get first field -> jobid */
token = strchr(temp, OPAL_SCHEMA_DELIMITER_CHAR); /** get first field -> jobid */
/* check for error */
if (NULL == token) {
@@ -168,9 +333,9 @@ int ompi_rte_convert_string_to_process_name(opal_process_name_t *name,
/* check for WILDCARD character - assign
* value accordingly, if found
*/
if (0 == strcmp(temp, ORTE_SCHEMA_WILDCARD_STRING)) {
if (0 == strcmp(temp, OPAL_SCHEMA_WILDCARD_STRING)) {
job = pmix_name_wildcard.jobid;
} else if (0 == strcmp(temp, ORTE_SCHEMA_INVALID_STRING)) {
} else if (0 == strcmp(temp, OPAL_SCHEMA_INVALID_STRING)) {
job = pmix_name_invalid.jobid;
} else {
job = strtoul(temp, NULL, 10);
@@ -179,9 +344,9 @@ int ompi_rte_convert_string_to_process_name(opal_process_name_t *name,
/* check for WILDCARD character - assign
* value accordingly, if found
*/
if (0 == strcmp(token, ORTE_SCHEMA_WILDCARD_STRING)) {
if (0 == strcmp(token, OPAL_SCHEMA_WILDCARD_STRING)) {
vpid = pmix_name_wildcard.vpid;
} else if (0 == strcmp(token, ORTE_SCHEMA_INVALID_STRING)) {
} else if (0 == strcmp(token, OPAL_SCHEMA_INVALID_STRING)) {
vpid = pmix_name_invalid.vpid;
} else {
vpid = strtoul(token, NULL, 10);
@@ -210,19 +375,19 @@ int ompi_rte_convert_process_name_to_string(char** name_string,
* it is passed back to us later
*/
if (pmix_name_wildcard.jobid == name->jobid) {
asprintf(&tmp, "%s", ORTE_SCHEMA_WILDCARD_STRING);
asprintf(&tmp, "%s", OPAL_SCHEMA_WILDCARD_STRING);
} else if (pmix_name_invalid.jobid == name->jobid) {
asprintf(&tmp, "%s", ORTE_SCHEMA_INVALID_STRING);
asprintf(&tmp, "%s", OPAL_SCHEMA_INVALID_STRING);
} else {
asprintf(&tmp, "%lu", (unsigned long)name->jobid);
}
if (pmix_name_wildcard.vpid == name->vpid) {
asprintf(&tmp2, "%s%c%s", tmp, ORTE_SCHEMA_DELIMITER_CHAR, ORTE_SCHEMA_WILDCARD_STRING);
asprintf(&tmp2, "%s%c%s", tmp, OPAL_SCHEMA_DELIMITER_CHAR, OPAL_SCHEMA_WILDCARD_STRING);
} else if (pmix_name_invalid.vpid == name->vpid) {
asprintf(&tmp2, "%s%c%s", tmp, ORTE_SCHEMA_DELIMITER_CHAR, ORTE_SCHEMA_INVALID_STRING);
asprintf(&tmp2, "%s%c%s", tmp, OPAL_SCHEMA_DELIMITER_CHAR, OPAL_SCHEMA_INVALID_STRING);
} else {
asprintf(&tmp2, "%s%c%lu", tmp, ORTE_SCHEMA_DELIMITER_CHAR, (unsigned long)name->vpid);
asprintf(&tmp2, "%s%c%lu", tmp, OPAL_SCHEMA_DELIMITER_CHAR, (unsigned long)name->vpid);
}
asprintf(name_string, "%s", tmp2);
@@ -233,12 +398,103 @@ int ompi_rte_convert_process_name_to_string(char** name_string,
return OPAL_SUCCESS;
}
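
The string form produced and parsed above is simply "<jobid>.<vpid>", with "*" standing for wildcard and "$" for invalid in either field. A hedged standalone sketch of the numeric round trip (plain snprintf/strtoul, without the OPAL types or the wildcard handling):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void)
    {
        /* name -> string: "<jobid>.<vpid>" */
        unsigned long jobid = 123, vpid = 4;
        char str[64];
        snprintf(str, sizeof(str), "%lu.%lu", jobid, vpid);  /* "123.4" */

        /* string -> name: split on the '.' delimiter */
        char *dot = strchr(str, '.');
        *dot = '\0';
        unsigned long j = strtoul(str, NULL, 10);
        unsigned long v = strtoul(dot + 1, NULL, 10);
        printf("jobid=%lu vpid=%lu\n", j, v);  /* jobid=123 vpid=4 */
        return 0;
    }
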
static int ompi_pmix_convert_string_to_jobid(opal_jobid_t *jobid, const char* jobidstring)
{
if (NULL == jobidstring) { /* got an error */
OPAL_ERROR_LOG(OPAL_ERR_BAD_PARAM);
*jobid = OPAL_JOBID_INVALID;
return OPAL_ERR_BAD_PARAM;
}
/** check for wildcard character - handle appropriately */
if (0 == strcmp(OPAL_SCHEMA_WILDCARD_STRING, jobidstring)) {
*jobid = OPAL_JOBID_WILDCARD;
return OPAL_SUCCESS;
}
/* check for invalid value */
if (0 == strcmp(OPAL_SCHEMA_INVALID_STRING, jobidstring)) {
*jobid = OPAL_JOBID_INVALID;
return OPAL_SUCCESS;
}
*jobid = strtoul(jobidstring, NULL, 10);
return OPAL_SUCCESS;
}
static int ompi_pmix_snprintf_jobid(char *jobid_string, size_t size, const opal_jobid_t jobid)
{
int rc;
/* check for wildcard value - handle appropriately */
if (OPAL_JOBID_WILDCARD == jobid) {
(void)strncpy(jobid_string, OPAL_SCHEMA_WILDCARD_STRING, size);
} else {
rc = snprintf(jobid_string, size, "%ld", (long) jobid);
if (0 > rc) {
return OPAL_ERROR;
}
}
return OPAL_SUCCESS;
}
/**
* Static functions used to configure the interactions between the OPAL and
* the runtime.
*/
static char*
_process_name_print_for_opal(const opal_process_name_t procname)
{
ompi_process_name_t* rte_name = (ompi_process_name_t*)&procname;
return ompi_pmix_print_name(rte_name);
}
static char*
_jobid_print_for_opal(const opal_jobid_t jobid)
{
return ompi_pmix_print_jobids(jobid);
}
static char*
_vpid_print_for_opal(const opal_vpid_t vpid)
{
return ompi_pmix_print_vpids(vpid);
}
static int
_process_name_compare(const opal_process_name_t p1, const opal_process_name_t p2)
{
return ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL, &p1, &p2);
}
static int _convert_string_to_process_name(opal_process_name_t *name,
const char* name_string)
{
return ompi_rte_convert_string_to_process_name(name, name_string);
}
static int _convert_process_name_to_string(char** name_string,
const opal_process_name_t *name)
{
return ompi_rte_convert_process_name_to_string(name_string, name);
}
static int
_convert_string_to_jobid(opal_jobid_t *jobid, const char *jobid_string)
{
return ompi_pmix_convert_string_to_jobid(jobid, jobid_string);
}
int ompi_rte_init(int *pargc, char ***pargv)
{
int ret;
char *error = NULL;
opal_process_name_t pname;
opal_proc_t *myname;
opal_proc_t *myproc;
int u32, *u32ptr;
uint16_t u16, *u16ptr;
char **peers=NULL, *mycpuset;
@@ -253,6 +509,16 @@ int ompi_rte_init(int *pargc, char ***pargv)
u16ptr = &u16;
memset(&pmix_process_info, 0, sizeof(pmix_process_info));
/* Convince OPAL to use our naming scheme */
opal_process_name_print = _process_name_print_for_opal;
opal_vpid_print = _vpid_print_for_opal;
opal_jobid_print = _jobid_print_for_opal;
opal_compare_proc = _process_name_compare;
opal_convert_string_to_process_name = _convert_string_to_process_name;
opal_convert_process_name_to_string = _convert_process_name_to_string;
opal_snprintf_jobid = ompi_pmix_snprintf_jobid;
opal_convert_string_to_jobid = _convert_string_to_jobid;
/* initialize the opal layer */
if (OPAL_SUCCESS != (ret = opal_init(pargc, pargv))) {
error = "opal_init";
@@ -284,14 +550,15 @@
}
/* opal_pmix.init will have filled in proc name fields in
* OPAL, so transfer them here */
myname = opal_proc_local_get();
pmix_proc_my_name = myname->proc_name;
pmix_process_info.my_name.jobid = OPAL_PROC_MY_NAME.jobid;
pmix_process_info.my_name.vpid = OPAL_PROC_MY_NAME.vpid;
/* get our hostname */
pmix_process_info.nodename = opal_get_proc_hostname(myname);
myproc = opal_proc_local_get();
pmix_process_info.nodename = opal_get_proc_hostname(myproc);
/* get our local rank from PMI */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_RANK,
&pmix_proc_my_name, &u16ptr, OPAL_UINT16);
&pmix_process_info.my_name, &u16ptr, OPAL_UINT16);
if (OPAL_SUCCESS != ret) {
error = "getting local rank";
goto error;
@@ -300,7 +567,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
/* get our node rank from PMI */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_NODE_RANK,
&pmix_proc_my_name, &u16ptr, OPAL_UINT16);
&pmix_process_info.my_name, &u16ptr, OPAL_UINT16);
if (OPAL_SUCCESS != ret) {
error = "getting node rank";
goto error;
@@ -308,8 +575,10 @@ int ompi_rte_init(int *pargc, char ***pargv)
pmix_process_info.my_node_rank = u16;
/* get job size */
pname.jobid = pmix_process_info.my_name.jobid;
pname.vpid = OPAL_VPID_WILDCARD;
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_JOB_SIZE,
&pmix_name_wildcard, &u32ptr, OPAL_UINT32);
&pname, &u32ptr, OPAL_UINT32);
if (OPAL_SUCCESS != ret) {
error = "getting job size";
goto error;
@@ -319,8 +588,8 @@ int ompi_rte_init(int *pargc, char ***pargv)
/* push into the environ for pickup in MPI layer for
* MPI-3 required info key
*/
if (NULL == getenv(OPAL_MCA_PREFIX"orte_ess_num_procs")) {
asprintf(&ev1, OPAL_MCA_PREFIX"orte_ess_num_procs=%d", pmix_process_info.num_procs);
if (NULL == getenv(OPAL_MCA_PREFIX"opal_ess_num_procs")) {
asprintf(&ev1, OPAL_MCA_PREFIX"opal_ess_num_procs=%d", pmix_process_info.num_procs);
putenv(ev1);
added_num_procs = true;
}
@@ -332,7 +601,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
/* get our app number from PMI - ok if not found */
OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_APPNUM,
&pmix_proc_my_name, &u32ptr, OPAL_UINT32);
&pmix_process_info.my_name, &u32ptr, OPAL_UINT32);
if (OPAL_SUCCESS == ret) {
pmix_process_info.app_num = u32;
} else {
@@ -342,7 +611,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
/* get the number of local peers - required for wireup of
* shared memory BTL */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_SIZE,
&pmix_name_wildcard, &u32ptr, OPAL_UINT32);
&pname, &u32ptr, OPAL_UINT32);
if (OPAL_SUCCESS == ret) {
pmix_process_info.num_local_peers = u32 - 1; // want number besides ourselves
} else {
@@ -353,17 +622,17 @@ int ompi_rte_init(int *pargc, char ***pargv)
* we can use the jobfam and stepid as unique keys
* because they are unique values assigned by the RM
*/
if (NULL == getenv(OPAL_MCA_PREFIX"orte_precondition_transports")) {
unique_key[0] = (pmix_proc_my_name.jobid & 0xff00) >> 16;
unique_key[1] = pmix_proc_my_name.jobid & 0x00ff;
if (NULL == getenv(OPAL_MCA_PREFIX"opal_precondition_transports")) {
unique_key[0] = (pmix_process_info.my_name.jobid & 0xff00) >> 16;
unique_key[1] = pmix_process_info.my_name.jobid & 0x00ff;
if (NULL == (string_key = pre_condition_transports_print(unique_key))) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return OPAL_ERR_OUT_OF_RESOURCE;
}
opal_output_verbose(2, ompi_rte_base_framework.framework_output,
"%s transport key %s",
OPAL_NAME_PRINT(pmix_proc_my_name), string_key);
asprintf(&envar, OPAL_MCA_PREFIX"orte_precondition_transports=%s", string_key);
OPAL_NAME_PRINT(pmix_process_info.my_name), string_key);
asprintf(&envar, OPAL_MCA_PREFIX"opal_precondition_transports=%s", string_key);
putenv(envar);
added_transport_keys = true;
/* cannot free the envar as that messes up our environ */
@@ -371,7 +640,7 @@ }
}
/* retrieve temp directories info */
OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_NSDIR, &pmix_name_wildcard, &val, OPAL_STRING);
OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_NSDIR, &pname, &val, OPAL_STRING);
if (OPAL_SUCCESS == ret && NULL != val) {
pmix_process_info.job_session_dir = val;
val = NULL;
@@ -394,7 +663,7 @@ }
}
/* retrieve the local peers */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_PEERS,
&pmix_name_wildcard, &val, OPAL_STRING);
&pname, &val, OPAL_STRING);
if (OPAL_SUCCESS == ret && NULL != val) {
peers = opal_argv_split(val, ',');
free(val);
@@ -410,16 +679,16 @@ int ompi_rte_init(int *pargc, char ***pargv)
/* identify our location */
val = NULL;
OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_LOCALITY_STRING,
&pmix_proc_my_name, &val, OPAL_STRING);
&pmix_process_info.my_name, &val, OPAL_STRING);
if (OPAL_SUCCESS == ret && NULL != val) {
mycpuset = val;
} else {
mycpuset = NULL;
}
pname.jobid = pmix_proc_my_name.jobid;
pname.jobid = pmix_process_info.my_name.jobid;
for (i=0; NULL != peers[i]; i++) {
pname.vpid = strtoul(peers[i], NULL, 10);
if (pname.vpid == pmix_proc_my_name.vpid) {
if (pname.vpid == pmix_process_info.my_name.vpid) {
/* we are fully local to ourselves */
u16 = OPAL_PROC_ALL_LOCAL;
} else {
@@ -439,7 +708,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
kv->type = OPAL_UINT16;
OPAL_OUTPUT_VERBOSE((1, ompi_rte_base_framework.framework_output,
"%s locality: proc %s locality %s",
OPAL_NAME_PRINT(pmix_proc_my_name),
OPAL_NAME_PRINT(pmix_process_info.my_name),
OPAL_NAME_PRINT(pname), opal_hwloc_base_print_locality(u16)));
kv->data.uint16 = u16;
ret = opal_pmix.store_local(&pname, kv);
@@ -514,10 +783,10 @@ int ompi_rte_finalize(void)
* so we leave that structure intact
*/
if (added_transport_keys) {
unsetenv(OPAL_MCA_PREFIX"orte_precondition_transports");
unsetenv(OPAL_MCA_PREFIX"opal_precondition_transports");
}
if (added_num_procs) {
unsetenv(OPAL_MCA_PREFIX"orte_ess_num_procs");
unsetenv(OPAL_MCA_PREFIX"opal_ess_num_procs");
}
if (added_app_ctx) {
unsetenv("OMPI_APP_CTX_NUM_PROCS");
@@ -750,7 +1019,7 @@ static int _setup_job_session_dir(char **sdir)
"%s/ompi.%s.%lu/jf.0/%u", tmpdir,
pmix_process_info.nodename,
(unsigned long)uid,
pmix_proc_my_name.jobid)) {
pmix_process_info.my_name.jobid)) {
pmix_process_info.job_session_dir = NULL;
return OPAL_ERR_OUT_OF_RESOURCE;
}
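
The "Convince OPAL to use our naming scheme" block added to ompi_rte_init above is the key integration point of this file: OPAL exposes function pointers (opal_process_name_print, opal_convert_string_to_jobid, and friends) that the RTE component overwrites at init so the lower layers print and parse names in the runtime's scheme. A minimal standalone sketch of that hook pattern (the names here are illustrative, not the actual OPAL symbols):

    #include <stdio.h>

    /* the "OPAL layer": a default printer behind a replaceable hook */
    typedef const char *(*name_print_fn)(int vpid);

    static const char *default_print(int vpid)
    {
        static char buf[32];
        snprintf(buf, sizeof(buf), "%d", vpid);
        return buf;
    }

    /* the hook, pre-loaded with the default implementation */
    static name_print_fn name_print = default_print;

    /* the "RTE component": swaps in its own scheme at init time */
    static const char *pmix_print(int vpid)
    {
        static char buf[32];
        snprintf(buf, sizeof(buf), "[rank %d]", vpid);
        return buf;
    }

    static void rte_init(void)
    {
        name_print = pmix_print;
    }

    int main(void)
    {
        printf("before init: %s\n", name_print(3));  /* "3" */
        rte_init();
        printf("after init:  %s\n", name_print(3));  /* "[rank 3]" */
        return 0;
    }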

View file

@@ -239,6 +239,45 @@ static void return_local_event_hdlr(int status, opal_list_t *results,
}
}
/* process the notification */
static void process_event(int sd, short args, void *cbdata)
{
pmix3x_threadshift_t *cd = (pmix3x_threadshift_t*)cbdata;
opal_pmix3x_event_t *event;
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
/* cycle thru the registrations */
OPAL_LIST_FOREACH(event, &mca_pmix_pmix3x_component.events, opal_pmix3x_event_t) {
if (cd->id == event->index) {
/* found it - invoke the handler, pointing its
* callback function to our callback function */
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s _EVENT_HDLR CALLING EVHDLR",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
if (NULL != event->handler) {
OBJ_RETAIN(event);
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
event->handler(cd->status, &cd->pname,
cd->info, &cd->results,
return_local_event_hdlr, cd);
OBJ_RELEASE(event);
return;
}
}
}
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
/* if we didn't find a match, we still have to call their final callback */
if (NULL != cd->pmixcbfunc) {
cd->pmixcbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cd->cbdata);
}
OPAL_LIST_RELEASE(cd->info);
OBJ_RELEASE(cd);
return;
}
/* this function will be called by the PMIx client library
* whenever it receives notification of an event. The
* notification can come from an ORTE daemon (when launched
@@ -256,7 +295,6 @@ void pmix3x_event_hdlr(size_t evhdlr_registration_id,
int rc;
opal_value_t *iptr;
size_t n;
opal_pmix3x_event_t *event;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s RECEIVED NOTIFICATION OF STATUS %d ON HDLR %lu",
@@ -317,34 +355,12 @@ }
}
}
/* cycle thru the registrations */
OPAL_LIST_FOREACH(event, &mca_pmix_pmix3x_component.events, opal_pmix3x_event_t) {
if (evhdlr_registration_id == event->index) {
/* found it - invoke the handler, pointing its
* callback function to our callback function */
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s _EVENT_HDLR CALLING EVHDLR",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
if (NULL != event->handler) {
OBJ_RETAIN(event);
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
event->handler(cd->status, &cd->pname,
cd->info, &cd->results,
return_local_event_hdlr, cd);
OBJ_RELEASE(event);
return;
}
}
}
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
/* if we didn't find a match, we still have to call their final callback */
if (NULL != cbfunc) {
cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}
OPAL_LIST_RELEASE(cd->info);
OBJ_RELEASE(cd);
/* do NOT directly call the event handler as this
* may lead to a deadlock condition should the
* handler invoke a PMIx function */
OPAL_PMIX2X_THREADSHIFT(cd, process_event);
return;
}
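
The change in this file replaces a direct handler invocation with a thread shift: pmix3x_event_hdlr now only queues the event via OPAL_PMIX2X_THREADSHIFT, and process_event later invokes the user handler after the PMIx lock has been dropped. That way a handler that calls back into PMIx cannot deadlock against the thread that delivered the notification. A hedged standalone sketch of the idea, using raw pthreads instead of the event library:

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    /* a one-slot "event queue" guarded by a mutex and condvar */
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int pending = 0, status = 0, done = 0;

    static void user_handler(int st)
    {
        /* free to take 'lock' again here: the progress thread dropped it */
        printf("handler invoked with status %d\n", st);
    }

    /* progress thread: drains queued events, runs handlers unlocked */
    static void *progress(void *arg)
    {
        (void)arg;
        pthread_mutex_lock(&lock);
        while (!done) {
            while (!pending && !done) {
                pthread_cond_wait(&cond, &lock);
            }
            if (pending) {
                int st = status;
                pending = 0;
                pthread_mutex_unlock(&lock);  /* drop lock before user code */
                user_handler(st);             /* the "thread shift" */
                pthread_mutex_lock(&lock);
            }
        }
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    /* notification path: only queues, never calls user code directly */
    static void notify(int st)
    {
        pthread_mutex_lock(&lock);
        status = st;
        pending = 1;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
    }

    int main(void)
    {
        pthread_t t;
        pthread_create(&t, NULL, progress, NULL);
        notify(42);
        sleep(1);  /* give the progress thread time to run the handler */
        pthread_mutex_lock(&lock);
        done = 1;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
        pthread_join(t, NULL);
        return 0;
    }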