1
1

Merge pull request #856 from rhc54/topic/pub

Bring the MPI_Publish functions online
Этот коммит содержится в:
rhc54 2015-09-02 13:51:17 -07:00
родитель 5b49dc156f a772b46c15
Коммит f4a3a86c86
24 изменённых файлов: 848 добавлений и 675 удалений

Просмотреть файл

@ -113,7 +113,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
bool dense, isnew;
opal_process_name_t pname;
opal_list_t ilist, mlist, rlist;
opal_pmix_info_t *info;
opal_value_t *info;
opal_pmix_pdata_t *pdat;
opal_namelist_t *nm;
@ -192,17 +192,17 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
free(nstring);
/* the root for each side publishes their list of participants */
OBJ_CONSTRUCT(&ilist, opal_list_t);
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
opal_list_append(&ilist, &info->super);
if (send_first) {
(void)asprintf(&info->key, "%s:connect", port_string);
info->value.type = OPAL_STRING;
info->value.data.string = opal_argv_join(members, ':');
info->type = OPAL_STRING;
info->data.string = opal_argv_join(members, ':');
} else {
(void)asprintf(&info->key, "%s:accept", port_string);
info->value.type = OPAL_STRING;
info->value.data.string = opal_argv_join(members, ':');
info->type = OPAL_STRING;
info->data.string = opal_argv_join(members, ':');
}
/* publish it with "session" scope */
rc = opal_pmix.publish(OPAL_PMIX_SESSION,
@ -222,9 +222,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
OBJ_CONSTRUCT(&ilist, opal_list_t);
pdat = OBJ_NEW(opal_pmix_pdata_t);
if (send_first) {
(void)asprintf(&pdat->key, "%s:accept", port_string);
(void)asprintf(&pdat->value.key, "%s:accept", port_string);
} else {
(void)asprintf(&pdat->key, "%s:connect", port_string);
(void)asprintf(&pdat->value.key, "%s:connect", port_string);
}
opal_list_append(&ilist, &pdat->super);
if (NULL == opal_pmix.lookup_nb) {
@ -239,7 +239,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
* the given data has been published */
char **keys = NULL;
struct lookup_caddy_t caddy;
opal_argv_append_nosize(&keys, pdat->key);
opal_argv_append_nosize(&keys, pdat->value.key);
caddy.active = true;
caddy.pdat = pdat;
rc = opal_pmix.lookup_nb(OPAL_PMIX_SESSION, true, keys,
@ -251,6 +251,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
return OMPI_ERROR;
}
OMPI_WAIT_FOR_COMPLETION(caddy.active);
opal_argv_free(keys);
}
/* initiate a list of participants for the connect,
* starting with our own members, remembering to
@ -542,7 +543,7 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
opal_list_t apps;
opal_list_t job_info;
opal_pmix_app_t *app;
opal_pmix_info_t *info;
opal_value_t *info;
bool local_spawn, non_mpi;
char **envars;
@ -631,45 +632,45 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
ompi_info_get (array_of_info[i], "personality", sizeof(host) - 1, host, &flag);
if ( flag ) {
personality = true;
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PERSONALITY);
opal_value_load(&info->value, host, OPAL_STRING);
opal_value_load(info, host, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
/* check for 'host' */
ompi_info_get (array_of_info[i], "host", sizeof(host) - 1, host, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_HOST);
opal_value_load(&info->value, host, OPAL_STRING);
opal_value_load(info, host, OPAL_STRING);
opal_list_append(&app->info, &info->super);
}
/* check for 'hostfile' */
ompi_info_get (array_of_info[i], "hostfile", sizeof(host) - 1, host, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_HOSTFILE);
opal_value_load(&info->value, host, OPAL_STRING);
opal_value_load(info, host, OPAL_STRING);
opal_list_append(&app->info, &info->super);
}
/* check for 'add-hostfile' */
ompi_info_get (array_of_info[i], "add-hostfile", sizeof(host) - 1, host, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_ADD_HOSTFILE);
opal_value_load(&info->value, host, OPAL_STRING);
opal_value_load(info, host, OPAL_STRING);
opal_list_append(&app->info, &info->super);
}
/* check for 'add-host' */
ompi_info_get (array_of_info[i], "add-host", sizeof(host) - 1, host, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_ADD_HOST);
opal_value_load(&info->value, host, OPAL_STRING);
opal_value_load(info, host, OPAL_STRING);
opal_list_append(&app->info, &info->super);
}
@ -692,18 +693,18 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
*/
ompi_info_get (array_of_info[i], "ompi_prefix", sizeof(prefix) - 1, prefix, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PREFIX);
opal_value_load(&info->value, prefix, OPAL_STRING);
opal_value_load(info, prefix, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
/* check for 'wdir' */
ompi_info_get (array_of_info[i], "wdir", sizeof(cwd) - 1, cwd, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_WDIR);
opal_value_load(&info->value, cwd, OPAL_STRING);
opal_value_load(info, cwd, OPAL_STRING);
opal_list_append(&app->info, &info->super);
have_wdir = 1;
}
@ -711,60 +712,60 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
/* check for 'mapper' - a job-level key */
ompi_info_get(array_of_info[i], "mapper", sizeof(mapper) - 1, mapper, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_MAPPER);
opal_value_load(&info->value, mapper, OPAL_STRING);
opal_value_load(info, mapper, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
/* check for 'display_map' - a job-level key */
ompi_info_get_bool(array_of_info[i], "display_map", &local_spawn, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_DISPLAY_MAP);
opal_value_load(&info->value, &local_spawn, OPAL_BOOL);
opal_value_load(info, &local_spawn, OPAL_BOOL);
opal_list_append(&job_info, &info->super);
}
/* check for 'npernode' and 'ppr' - job-level key */
ompi_info_get (array_of_info[i], "npernode", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PPR);
info->value.type = OPAL_STRING;
(void)asprintf(&(info->value.data.string), "%s:n", slot_list);
info->type = OPAL_STRING;
(void)asprintf(&(info->data.string), "%s:n", slot_list);
opal_list_append(&job_info, &info->super);
}
ompi_info_get (array_of_info[i], "pernode", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PPR);
opal_value_load(&info->value, "1:n", OPAL_STRING);
opal_value_load(info, "1:n", OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
ompi_info_get (array_of_info[i], "ppr", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PPR);
opal_value_load(&info->value, slot_list, OPAL_STRING);
opal_value_load(info, slot_list, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
/* check for 'map_by' - job-level key */
ompi_info_get(array_of_info[i], "map_by", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_MAPBY);
opal_value_load(&info->value, slot_list, OPAL_STRING);
opal_value_load(info, slot_list, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
/* check for 'rank_by' - job-level key */
ompi_info_get(array_of_info[i], "rank_by", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_RANKBY);
opal_value_load(&info->value, slot_list, OPAL_STRING);
opal_value_load(info, slot_list, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
@ -772,9 +773,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
/* check for 'bind_to' - job-level key */
ompi_info_get(array_of_info[i], "bind_to", sizeof(slot_list) - 1, slot_list, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_BINDTO);
opal_value_load(&info->value, slot_list, OPAL_STRING);
opal_value_load(info, slot_list, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
#endif
@ -782,18 +783,18 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
/* check for 'preload_binary' - job-level key */
ompi_info_get_bool(array_of_info[i], "ompi_preload_binary", &local_spawn, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PRELOAD_BIN);
opal_value_load(&info->value, &local_spawn, OPAL_BOOL);
opal_value_load(info, &local_spawn, OPAL_BOOL);
opal_list_append(&job_info, &info->super);
}
/* check for 'preload_files' - job-level key */
ompi_info_get (array_of_info[i], "ompi_preload_files", sizeof(cwd) - 1, cwd, &flag);
if ( flag ) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PRELOAD_FILES);
opal_value_load(&info->value, cwd, OPAL_STRING);
opal_value_load(info, cwd, OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
@ -802,9 +803,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
*/
ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag);
if (flag && non_mpi) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_NON_PMI);
opal_value_load(&info->value, &non_mpi, OPAL_BOOL);
opal_value_load(info, &non_mpi, OPAL_BOOL);
opal_list_append(&job_info, &info->super);
}
@ -826,9 +827,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
} else {
ui32 = strtoul(stdin_target, NULL, 10);
}
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_STDIN_TGT);
opal_value_load(&info->value, &ui32, OPAL_UINT32);
opal_value_load(info, &ui32, OPAL_UINT32);
opal_list_append(&job_info, &info->super);
}
}
@ -843,9 +844,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
opal_progress_event_users_decrement();
return rc;
}
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_WDIR);
opal_value_load(&info->value, cwd, OPAL_STRING);
opal_value_load(info, cwd, OPAL_STRING);
opal_list_append(&app->info, &info->super);
}
@ -856,9 +857,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
/* default the personality - job-level key */
if (!personality) {
info = OBJ_NEW(opal_pmix_info_t);
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_PERSONALITY);
opal_value_load(&info->value, "ompi", OPAL_STRING);
opal_value_load(info, "ompi", OPAL_STRING);
opal_list_append(&job_info, &info->super);
}
@ -880,10 +881,12 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
int ompi_dpm_open_port(char *port_name)
{
uint32_t r;
char *tmp;
r = opal_rand(&rnd);
snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u",
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), r);
opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME);
snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u", tmp, r);
free(tmp);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -27,7 +27,7 @@ int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t **
OBJ_CONSTRUCT(&results, opal_list_t);
pdat = OBJ_NEW(opal_pmix_pdata_t);
asprintf(&pdat->key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank);
asprintf(&pdat->value.key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank);
opal_list_append(&results, &pdat->super);
rc = opal_pmix.lookup(OPAL_PMIX_NAMESPACE, &results);

Просмотреть файл

@ -95,7 +95,7 @@ int MPI_Lookup_name(const char *service_name, MPI_Info info, char *port_name)
/* collect the findings */
OBJ_CONSTRUCT(&results, opal_list_t);
pdat = OBJ_NEW(opal_pmix_pdata_t);
pdat->key = strdup(service_name);
pdat->value.key = strdup(service_name);
opal_list_append(&results, &pdat->super);
ret = opal_pmix.lookup(rng, &results);

Просмотреть файл

@ -53,7 +53,7 @@ int MPI_Publish_name(const char *service_name, MPI_Info info,
opal_pmix_persistence_t persist;
bool persistence_given = false;
opal_list_t values;
opal_pmix_info_t *pinfo;
opal_value_t *pinfo;
if ( MPI_PARAM_CHECK ) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
@ -118,10 +118,10 @@ int MPI_Publish_name(const char *service_name, MPI_Info info,
/* publish the values */
OBJ_CONSTRUCT(&values, opal_list_t);
pinfo = OBJ_NEW(opal_pmix_info_t);
pinfo = OBJ_NEW(opal_value_t);
pinfo->key = strdup(service_name);
pinfo->value.type = OPAL_STRING;
pinfo->value.data.string = strdup(port_name);
pinfo->type = OPAL_STRING;
pinfo->data.string = strdup(port_name);
opal_list_append(&values, &pinfo->super);
rc = opal_pmix.publish(rng, persist, &values);

Просмотреть файл

@ -41,7 +41,8 @@ OPAL_DECLSPEC int opal_value_load(opal_value_t *kv,
void *data, opal_data_type_t type);
OPAL_DECLSPEC int opal_value_unload(opal_value_t *kv,
void **data, opal_data_type_t type);
OPAL_DECLSPEC int opal_value_xfer(opal_value_t *dest,
opal_value_t *src);
/**
* Top-level interface function to pack one or more values into a
* buffer.

Просмотреть файл

@ -367,3 +367,104 @@ int opal_value_unload(opal_value_t *kv,
}
return OPAL_SUCCESS;
}
int opal_value_xfer(opal_value_t *dest,
opal_value_t *src)
{
opal_byte_object_t *boptr;
if (NULL != src->key) {
dest->key = strdup(src->key);
}
dest->type = src->type;
switch (src->type) {
case OPAL_BOOL:
dest->data.flag = src->data.flag;
break;
case OPAL_BYTE:
dest->data.byte = src->data.byte;
break;
case OPAL_STRING:
if (NULL != dest->data.string) {
free(dest->data.string);
}
if (NULL != src->data.string) {
dest->data.string = strdup(src->data.string);
} else {
dest->data.string = NULL;
}
break;
case OPAL_SIZE:
dest->data.size = src->data.size;
break;
case OPAL_PID:
dest->data.pid = src->data.pid;
break;
case OPAL_INT:
dest->data.integer = src->data.integer;
break;
case OPAL_INT8:
dest->data.int8 = src->data.int8;
break;
case OPAL_INT16:
dest->data.int16 = src->data.int16;
break;
case OPAL_INT32:
dest->data.int32 = src->data.int32;
break;
case OPAL_INT64:
dest->data.int64 = src->data.int64;
break;
case OPAL_UINT:
dest->data.uint = src->data.uint;
break;
case OPAL_UINT8:
dest->data.uint8 = src->data.uint8;
break;
case OPAL_UINT16:
dest->data.uint16 = src->data.uint16;
break;
case OPAL_UINT32:
dest->data.uint32 = src->data.uint32;
break;
case OPAL_UINT64:
dest->data.uint64 = src->data.uint64;
break;
case OPAL_BYTE_OBJECT:
if (NULL != dest->data.bo.bytes) {
free(dest->data.bo.bytes);
}
boptr = &src->data.bo;
if (NULL != boptr && NULL != boptr->bytes && 0 < boptr->size) {
dest->data.bo.bytes = (uint8_t *) malloc(boptr->size);
memcpy(dest->data.bo.bytes, boptr->bytes, boptr->size);
dest->data.bo.size = boptr->size;
} else {
dest->data.bo.bytes = NULL;
dest->data.bo.size = 0;
}
break;
case OPAL_FLOAT:
dest->data.fval = src->data.fval;
break;
case OPAL_TIMEVAL:
dest->data.tv.tv_sec = src->data.tv.tv_sec;
dest->data.tv.tv_usec = src->data.tv.tv_usec;
break;
case OPAL_PTR:
dest->data.ptr = src->data.ptr;
break;
default:
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
return OPAL_ERR_NOT_SUPPORTED;
}
return OPAL_SUCCESS;
}

Просмотреть файл

@ -70,34 +70,14 @@ MCA_BASE_FRAMEWORK_DECLARE(opal, pmix, "OPAL PMI Client Framework",
mca_pmix_base_static_components, 0);
/**** PMIX FRAMEWORK OBJECTS ****/
static void icon(opal_pmix_info_t *i)
{
i->key = NULL;
OBJ_CONSTRUCT(&i->value, opal_value_t);
}
static void ides(opal_pmix_info_t *i)
{
if (NULL != i->key) {
free(i->key);
}
OBJ_DESTRUCT(&i->value);
}
OBJ_CLASS_INSTANCE(opal_pmix_info_t,
opal_list_item_t,
icon, ides);
static void lkcon(opal_pmix_pdata_t *p)
{
p->proc.jobid = OPAL_JOBID_INVALID;
p->proc.vpid = OPAL_VPID_INVALID;
p->key = NULL;
OBJ_CONSTRUCT(&p->value, opal_value_t);
}
static void lkdes(opal_pmix_pdata_t *p)
{
if (NULL != p->key) {
free(p->key);
}
OBJ_DESTRUCT(&p->value);
}
OBJ_CLASS_INSTANCE(opal_pmix_pdata_t,

Просмотреть файл

@ -97,6 +97,10 @@ BEGIN_C_DECLS
* these keys are RESERVED */
#define PMIX_ATTR_UNDEF NULL
/* identification attributes */
#define PMIX_USERID "pmix.euid" // (uint32_t) effective user id
#define PMIX_GRPID "pmix.egid" // (uint32_t) effective group id
/* general proc-level attributes */
#define PMIX_CPUSET "pmix.cpuset" // (char*) hwloc bitmap applied to proc upon launch
#define PMIX_CREDENTIAL "pmix.cred" // (char*) security credential assigned to proc

Просмотреть файл

@ -169,7 +169,7 @@ typedef pmix_status_t (*pmix_server_dmodex_req_fn_t)(const pmix_proc_t *proc,
* process is also provided and is expected to be returned on any subsequent
* lookup request */
typedef pmix_status_t (*pmix_server_publish_fn_t)(const pmix_proc_t *proc,
pmix_data_range_t scope, pmix_persistence_t persist,
pmix_data_range_t range, pmix_persistence_t persist,
const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata);
@ -188,7 +188,7 @@ typedef pmix_status_t (*pmix_server_publish_fn_t)(const pmix_proc_t *proc,
* has been set - in such cases, the host RM is required to return an error
* if the directive cannot be met. */
typedef pmix_status_t (*pmix_server_lookup_fn_t)(const pmix_proc_t *proc,
pmix_data_range_t scope,
pmix_data_range_t range,
const pmix_info_t info[], size_t ninfo,
char **keys,
pmix_lookup_cbfunc_t cbfunc, void *cbdata);
@ -198,7 +198,9 @@ typedef pmix_status_t (*pmix_server_lookup_fn_t)(const pmix_proc_t *proc,
* been published. The callback is to be executed upon completion of the delete
* procedure */
typedef pmix_status_t (*pmix_server_unpublish_fn_t)(const pmix_proc_t *proc,
pmix_data_range_t scope, char **keys,
pmix_data_range_t range,
const pmix_info_t info[], size_t ninfo,
char **keys,
pmix_op_cbfunc_t cbfunc, void *cbdata);
/* Spawn a set of applications/processes as per the PMIx API. Note that
@ -259,7 +261,7 @@ typedef pmix_status_t (*pmix_server_disconnect_fn_t)(const pmix_proc_t procs[],
* the resource manager. */
typedef pmix_status_t (*pmix_server_register_events_fn_t)(const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata);
/* Callback function for incoming connection requests from
* local clients */
typedef void (*pmix_connection_cbfunc_t)(int incoming_sd);

Просмотреть файл

@ -244,6 +244,9 @@ int PMIx_Init(pmix_proc_t *proc)
PMIX_CONSTRUCT(&pmix_client_globals.myserver, pmix_peer_t);
/* mark that we are a client */
pmix_globals.server = false;
/* get our effective id's */
pmix_globals.uid = geteuid();
pmix_globals.gid = getegid();
/* initialize the output system */
if (!pmix_output_init()) {

Просмотреть файл

@ -137,6 +137,12 @@ int PMIx_Publish_nb(pmix_data_range_t scope,
PMIX_RELEASE(msg);
return rc;
}
/* pack our effective userid - will be used to constrain lookup */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
}
/* pack the data range */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &scope, 1, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
@ -233,7 +239,7 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys,
pmix_cmd_t cmd = PMIX_LOOKUPNB_CMD;
int rc;
pmix_cb_t *cb;
size_t nkeys;
size_t nkeys, n;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: lookup called");
@ -255,6 +261,12 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys,
PMIX_RELEASE(msg);
return rc;
}
/* pack our effective userid - will be used to constrain lookup */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
}
/* pack the range */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
@ -282,10 +294,12 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys,
return rc;
}
if (0 < nkeys) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, keys, nkeys, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
for (n=0; n < nkeys; n++) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &keys[n], 1, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
}
}
}
@ -330,7 +344,7 @@ int PMIx_Unpublish(pmix_data_range_t scope, char **keys)
return rc;
}
int PMIx_Unpublish_nb(pmix_data_range_t scope, char **keys,
int PMIx_Unpublish_nb(pmix_data_range_t range, char **keys,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
pmix_buffer_t *msg;
@ -354,8 +368,14 @@ int PMIx_Unpublish_nb(pmix_data_range_t scope, char **keys,
PMIX_RELEASE(msg);
return rc;
}
/* pack the scope */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &scope, 1, PMIX_DATA_RANGE))) {
/* pack our effective userid - will be used to constrain lookup */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
}
/* pack the range */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
@ -493,18 +513,22 @@ static void lookup_cbfunc(int status, pmix_pdata_t pdata[], size_t ndata,
pmix_pdata_t *tgt = (pmix_pdata_t*)cb->cbdata;
size_t i, j;
/* find the matching key in the provided info array - error if not found */
for (i=0; i < ndata; i++) {
for (j=0; j < cb->nvals; j++) {
if (0 == strcmp(pdata[i].key, tgt[j].key)) {
/* transfer the publishing proc id */
(void)strncpy(tgt[j].proc.nspace, pdata[i].proc.nspace, PMIX_MAX_NSLEN);
tgt[j].proc.rank = pdata[i].proc.rank;
/* transfer the value to the pmix_info_t */
pmix_value_xfer(&tgt[j].value, &pdata[i].value);
break;
cb->status = status;
if (PMIX_SUCCESS == status) {
/* find the matching key in the provided info array - error if not found */
for (i=0; i < ndata; i++) {
for (j=0; j < cb->nvals; j++) {
if (0 == strcmp(pdata[i].key, tgt[j].key)) {
/* transfer the publishing proc id */
(void)strncpy(tgt[j].proc.nspace, pdata[i].proc.nspace, PMIX_MAX_NSLEN);
tgt[j].proc.rank = pdata[i].proc.rank;
/* transfer the value to the pmix_info_t */
pmix_value_xfer(&tgt[j].value, &pdata[i].value);
break;
}
}
}
}
cb->active = false;
}

Просмотреть файл

@ -24,6 +24,10 @@
#include <pmix/rename.h>
#include <private/types.h>
#include <unistd.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include PMIX_EVENT_HEADER
#include <pmix_common.h>
@ -42,6 +46,8 @@ BEGIN_C_DECLS
typedef struct {
int init_cntr; // #times someone called Init - #times called Finalize
pmix_proc_t myid;
uid_t uid; // my effective uid
gid_t gid; // my effective gid
int pindex;
pmix_event_base_t *evbase;
int debug_output;

Просмотреть файл

@ -55,19 +55,14 @@ static void native_finalize(void)
static char* create_cred(void)
{
uid_t uid;
gid_t gid;
char *cred;
pmix_output_verbose(2, pmix_globals.debug_output,
"sec: native create_cred");
/* get our uid/gid */
uid = getuid();
gid = getgid();
/* print them and return the string */
(void)asprintf(&cred, "%lu:%lu", (unsigned long)uid, (unsigned long)gid);
(void)asprintf(&cred, "%lu:%lu", (unsigned long)pmix_globals.uid,
(unsigned long)pmix_globals.gid);
pmix_output_verbose(2, pmix_globals.debug_output,
"sec: using credential %s", cred);

Просмотреть файл

@ -979,11 +979,12 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer,
{
pmix_status_t rc;
int32_t cnt;
pmix_data_range_t scope;
pmix_data_range_t range;
pmix_persistence_t persist;
size_t i, ninfo;
size_t i, ninfo, einfo;
pmix_info_t *info = NULL;
pmix_proc_t proc;
uint32_t uid;
pmix_output_verbose(2, pmix_globals.debug_output,
"recvd PUBLISH");
@ -992,9 +993,15 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer,
return PMIX_ERR_NOT_SUPPORTED;
}
/* unpack the effective user id */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* unpack the scope */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
return rc;
}
@ -1010,28 +1017,28 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer,
PMIX_ERROR_LOG(rc);
return rc;
}
/* we will be adding one for the user id */
einfo = ninfo + 1;
PMIX_INFO_CREATE(info, einfo);
/* unpack the array of info objects */
if (0 < ninfo) {
info = (pmix_info_t*)malloc(ninfo * sizeof(pmix_info_t));
cnt=ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
(void)strncpy(info[einfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
info[einfo-1].value.type = PMIX_UINT32;
info[einfo-1].value.data.uint32 = uid;
/* call the local server */
(void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN);
proc.rank = peer->info->rank;
rc = pmix_host_server.publish(&proc, scope, persist, info, ninfo, cbfunc, cbdata);
rc = pmix_host_server.publish(&proc, range, persist, info, einfo, cbfunc, cbdata);
cleanup:
if (NULL != info) {
for (i=0; i < ninfo; i++) {
PMIX_INFO_DESTRUCT(&info[i]);
}
free(info);
}
PMIX_INFO_FREE(info, einfo);
return rc;
}
@ -1042,12 +1049,13 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
int32_t cnt;
pmix_status_t rc;
int wait;
pmix_data_range_t scope;
pmix_data_range_t range;
size_t nkeys, i;
char **keys=NULL, *sptr;
pmix_info_t *info = NULL;
size_t ninfo=0;
size_t ninfo, einfo;
pmix_proc_t proc;
uint32_t uid;
pmix_output_verbose(2, pmix_globals.debug_output,
"recvd LOOKUP");
@ -1056,18 +1064,40 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
return PMIX_ERR_NOT_SUPPORTED;
}
/* unpack the scope */
/* unpack the effective user id */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* unpack the wait flag */
/* unpack the range */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &wait, &cnt, PMIX_INT))) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* unpack the number of info objects */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* we will be adding one for the user id */
einfo = ninfo + 1;
PMIX_INFO_CREATE(info, einfo);
/* unpack the array of info objects */
if (0 < ninfo) {
PMIX_INFO_CREATE(info, ninfo);
cnt=ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
(void)strncpy(info[einfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
info[einfo-1].value.type = PMIX_UINT32;
info[einfo-1].value.data.uint32 = uid;
/* unpack the number of keys */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &nkeys, &cnt, PMIX_SIZE))) {
@ -1084,27 +1114,14 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
pmix_argv_append_nosize(&keys, sptr);
free(sptr);
}
/* unpack the number of provided info structs */
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
return rc;
}
if (0 < ninfo) {
PMIX_INFO_CREATE(info, ninfo);
/* unpack the info */
cnt = ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
goto cleanup;
}
}
/* call the local server */
(void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN);
proc.rank = peer->info->rank;
rc = pmix_host_server.lookup(&proc, scope, info, ninfo, keys, cbfunc, cbdata);
rc = pmix_host_server.lookup(&proc, range, info, einfo, keys, cbfunc, cbdata);
cleanup:
PMIX_INFO_FREE(info, ninfo);
PMIX_INFO_FREE(info, einfo);
pmix_argv_free(keys);
return rc;
}
@ -1115,10 +1132,12 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
{
int32_t cnt;
pmix_status_t rc;
pmix_data_range_t scope;
pmix_data_range_t range;
size_t i, nkeys;
char **keys=NULL, *sptr;
pmix_proc_t proc;
uint32_t uid;
pmix_info_t info;
pmix_output_verbose(2, pmix_globals.debug_output,
"recvd UNPUBLISH");
@ -1127,9 +1146,15 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
return PMIX_ERR_NOT_SUPPORTED;
}
/* unpack the scope */
/* unpack the effective user id */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* unpack the range */
cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
return rc;
}
@ -1149,10 +1174,16 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
pmix_argv_append_nosize(&keys, sptr);
free(sptr);
}
/* setup the info key */
PMIX_INFO_CONSTRUCT(&info);
(void)strncpy(info.key, PMIX_USERID, PMIX_MAX_KEYLEN);
info.value.type = PMIX_UINT32;
info.value.data.uint32 = uid;
/* call the local server */
(void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN);
proc.rank = peer->info->rank;
rc = pmix_host_server.unpublish(&proc, scope, keys, cbfunc, cbdata);
rc = pmix_host_server.unpublish(&proc, range, &info, 1, keys, cbfunc, cbdata);
cleanup:
pmix_argv_free(keys);

Просмотреть файл

@ -351,7 +351,7 @@ int pmix1_publish(opal_pmix_data_range_t scope,
int rc;
pmix_info_t *pinfo;
pmix_status_t ret;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
size_t sz, n;
rc = convert_data_range(&rng, scope);
@ -369,9 +369,9 @@ int pmix1_publish(opal_pmix_data_range_t scope,
if (0 < sz) {
PMIX_INFO_CREATE(pinfo, sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, &iptr->value);
pmix1_value_load(&pinfo[n].value, iptr);
++n;
}
}
@ -390,7 +390,7 @@ int pmix1_publishnb(opal_pmix_data_range_t scope,
pmix_persistence_t pst;
int rc;
pmix_status_t ret;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
size_t n;
pmix1_opcaddy_t *op;
@ -412,9 +412,9 @@ int pmix1_publishnb(opal_pmix_data_range_t scope,
if (0 < op->sz) {
PMIX_INFO_CREATE(op->info, op->sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, &iptr->value);
pmix1_value_load(&op->info[n].value, iptr);
++n;
}
}
@ -443,26 +443,29 @@ int pmix1_lookup(opal_pmix_data_range_t scope,
PMIX_PDATA_CREATE(pdata, sz);
n=0;
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
(void)strncpy(pdata[n++].key, d->key, PMIX_MAX_KEYLEN);
(void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN);
}
ret = PMIx_Lookup(rng, NULL, 0, pdata, sz);
/* transfer the data back */
n=0;
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
rc = opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
PMIX_PDATA_FREE(pdata, sz);
return OPAL_ERR_BAD_PARAM;
}
d->proc.vpid = pdata[n].proc.rank;
rc = pmix1_value_unload(&d->value, &pdata[n].value);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
PMIX_PDATA_FREE(pdata, sz);
return OPAL_ERR_BAD_PARAM;
if (PMIX_SUCCESS == ret) {
/* transfer the data back */
n=0;
OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) {
rc = opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
PMIX_PDATA_FREE(pdata, sz);
return OPAL_ERR_BAD_PARAM;
}
d->proc.vpid = pdata[n].proc.rank;
rc = pmix1_value_unload(&d->value, &pdata[n].value);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
PMIX_PDATA_FREE(pdata, sz);
return OPAL_ERR_BAD_PARAM;
}
++n;
}
}
@ -475,7 +478,7 @@ static void lk_cbfunc(pmix_status_t status,
{
pmix1_opcaddy_t *op = (pmix1_opcaddy_t*)cbdata;
opal_pmix_pdata_t *d;
opal_list_t results;
opal_list_t results, *r;
int rc;
size_t n;
@ -485,36 +488,43 @@ static void lk_cbfunc(pmix_status_t status,
}
rc = pmix1_convert_rc(status);
OBJ_CONSTRUCT(&results, opal_list_t);
for (n=0; n < ndata; n++) {
d = OBJ_NEW(opal_pmix_pdata_t);
opal_list_append(&results, &d->super);
rc = opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace);
if (OPAL_SUCCESS != rc) {
rc = OPAL_ERR_BAD_PARAM;
OPAL_ERROR_LOG(rc);
goto release;
}
d->proc.vpid = data[n].proc.rank;
d->key = strdup(data[n].key);
rc = pmix1_value_unload(&d->value, &data[n].value);
if (OPAL_SUCCESS != rc) {
rc = OPAL_ERR_BAD_PARAM;
OPAL_ERROR_LOG(rc);
goto release;
if (OPAL_SUCCESS == rc) {
OBJ_CONSTRUCT(&results, opal_list_t);
for (n=0; n < ndata; n++) {
d = OBJ_NEW(opal_pmix_pdata_t);
opal_list_append(&results, &d->super);
rc = opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace);
if (OPAL_SUCCESS != rc) {
rc = OPAL_ERR_BAD_PARAM;
OPAL_ERROR_LOG(rc);
goto release;
}
d->proc.vpid = data[n].proc.rank;
d->value.key = strdup(data[n].key);
rc = pmix1_value_unload(&d->value, &data[n].value);
if (OPAL_SUCCESS != rc) {
rc = OPAL_ERR_BAD_PARAM;
OPAL_ERROR_LOG(rc);
goto release;
}
}
r = &results;
} else {
r = NULL;
}
release:
/* execute the callback */
op->lkcbfunc(rc, &results, op->cbdata);
op->lkcbfunc(rc, r, op->cbdata);
OPAL_LIST_DESTRUCT(&results);
if (NULL != r) {
OPAL_LIST_DESTRUCT(&results);
}
OBJ_RELEASE(op);
}
int pmix1_lookupnb(opal_pmix_data_range_t scope, int wait, char **keys,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
pmix_data_range_t rng;
int rc;
@ -582,15 +592,15 @@ int pmix1_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid)
pmix_app_t *papps;
size_t napps, n, m, ninfo = 0;
char nspace[PMIX_MAX_NSLEN+1];
opal_pmix_info_t *info;
opal_value_t *info;
opal_pmix_app_t *app;
if (NULL != job_info && 0 < (ninfo = opal_list_get_size(job_info))) {
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
(void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, &info->value);
pmix1_value_load(&pinfo[n].value, info);
++n;
}
}
@ -607,9 +617,9 @@ int pmix1_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid)
if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) {
PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo);
m=0;
OPAL_LIST_FOREACH(info, &app->info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, &app->info, opal_value_t) {
(void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&papps[n].info[m].value, &info->value);
pmix1_value_load(&papps[n].info[m].value, info);
++m;
}
}
@ -647,7 +657,7 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps,
pmix_status_t ret;
pmix1_opcaddy_t *op;
size_t n, m;
opal_pmix_info_t *info;
opal_value_t *info;
opal_pmix_app_t *app;
/* create the caddy */
@ -658,9 +668,9 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps,
if (NULL != job_info && 0 < (op->ninfo = opal_list_get_size(job_info))) {
PMIX_INFO_CREATE(op->info, op->ninfo);
n=0;
OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
(void)strncpy(op->info[n].key, info->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, &info->value);
pmix1_value_load(&op->info[n].value, info);
++n;
}
}
@ -677,9 +687,9 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps,
if (0 < (op->apps[n].ninfo = opal_list_get_size(&app->info))) {
PMIX_INFO_CREATE(op->apps[n].info, op->apps[n].ninfo);
m=0;
OPAL_LIST_FOREACH(info, &app->info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, &app->info, opal_value_t) {
(void)strncpy(op->apps[n].info[m].key, info->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->apps[n].info[m].value, &info->value);
pmix1_value_load(&op->apps[n].info[m].value, info);
++m;
}
}

Просмотреть файл

@ -66,7 +66,8 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *proc, pmix_data_range_t
const pmix_info_t info[], size_t ninfo, char **keys,
pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t server_unpublish_fn(const pmix_proc_t *proc,
pmix_data_range_t scope, char **keys,
pmix_data_range_t scope,
const pmix_info_t info[], size_t ninfo, char **keys,
pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t server_spawn_fn(const pmix_proc_t *proc,
const pmix_info_t job_info[], size_t ninfo,
@ -251,7 +252,7 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
pmix1_opalcaddy_t *opalcaddy;
size_t n;
opal_namelist_t *nm;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
int rc;
if (NULL == host_module || NULL == host_module->fence_nb) {
@ -280,10 +281,10 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
/* convert the array of pmix_info_t to the list of info */
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_pmix_info_t);
iptr = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -305,7 +306,7 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p,
int rc;
pmix1_opalcaddy_t *opalcaddy;
opal_process_name_t proc;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
size_t n;
if (NULL == host_module || NULL == host_module->direct_modex) {
@ -329,10 +330,10 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p,
/* convert the array of pmix_info_t to the list of info */
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_pmix_info_t);
iptr = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -360,7 +361,7 @@ static pmix_status_t server_publish_fn(const pmix_proc_t *p,
opal_process_name_t proc;
opal_pmix_data_range_t oscp;
opal_pmix_persistence_t opers;
opal_pmix_info_t *oinfo;
opal_value_t *oinfo;
if (NULL == host_module || NULL == host_module->publish) {
return PMIX_ERR_NOT_SUPPORTED;
@ -393,10 +394,10 @@ static pmix_status_t server_publish_fn(const pmix_proc_t *p,
/* convert the info array */
for (n=0; n < ninfo; n++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -432,7 +433,7 @@ static void opal_lkupcbfunc(int status,
/* convert the jobid */
(void)snprintf(d[n].proc.nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(p->proc.jobid));
d[n].proc.rank = p->proc.vpid;
(void)strncpy(d[n].key, p->key, PMIX_MAX_KEYLEN);
(void)strncpy(d[n].key, p->value.key, PMIX_MAX_KEYLEN);
pmix1_value_load(&d[n].value, &p->value);
}
}
@ -449,7 +450,7 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc
pmix1_opalcaddy_t *opalcaddy;
opal_pmix_data_range_t oscp;
opal_process_name_t proc;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
size_t n;
if (NULL == host_module || NULL == host_module->lookup) {
@ -478,10 +479,10 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc
/* convert the array of pmix_info_t to the list of info */
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_pmix_info_t);
iptr = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -498,13 +499,16 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc
static pmix_status_t server_unpublish_fn(const pmix_proc_t *p,
pmix_data_range_t scope, char **keys,
pmix_data_range_t scope,
const pmix_info_t info[], size_t ninfo, char **keys,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
int rc;
pmix1_opalcaddy_t *opalcaddy;
opal_process_name_t proc;
opal_pmix_data_range_t oscp;
opal_value_t *iptr;
size_t n;
if (NULL == host_module || NULL == host_module->unpublish) {
return PMIX_SUCCESS;
@ -530,8 +534,19 @@ static pmix_status_t server_unpublish_fn(const pmix_proc_t *p,
opalcaddy->opcbfunc = cbfunc;
opalcaddy->cbdata = cbdata;
/* convert the array of pmix_info_t to the list of info */
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
}
/* pass it up */
rc = host_module->unpublish(&proc, oscp, keys, opal_opcbfunc, opalcaddy);
rc = host_module->unpublish(&proc, oscp, &opalcaddy->info, keys, opal_opcbfunc, opalcaddy);
if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(opalcaddy);
}
@ -562,7 +577,7 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p,
pmix1_opalcaddy_t *opalcaddy;
opal_process_name_t proc;
opal_pmix_app_t *app;
opal_pmix_info_t *oinfo;
opal_value_t *oinfo;
size_t k, n;
int rc;
@ -587,10 +602,10 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p,
/* convert the job info */
for (k=0; k < ninfo; k++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(job_info[k].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &job_info[k].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &job_info[k].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -612,10 +627,10 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p,
}
app->maxprocs = apps[n].maxprocs;
for (k=0; k < apps[n].ninfo; k++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&app->info, &oinfo->super);
oinfo->key = strdup(apps[n].info[k].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &apps[n].info[k].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &apps[n].info[k].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -641,7 +656,7 @@ static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs,
pmix1_opalcaddy_t *opalcaddy;
opal_namelist_t *nm;
size_t n;
opal_pmix_info_t *oinfo;
opal_value_t *oinfo;
if (NULL == host_module || NULL == host_module->connect) {
return PMIX_ERR_NOT_SUPPORTED;
@ -669,10 +684,10 @@ static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs,
/* convert the info */
for (n=0; n < ninfo; n++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -696,7 +711,7 @@ static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t npro
pmix1_opalcaddy_t *opalcaddy;
opal_namelist_t *nm;
size_t n;
opal_pmix_info_t *oinfo;
opal_value_t *oinfo;
if (NULL == host_module || NULL == host_module->disconnect) {
return PMIX_ERR_NOT_SUPPORTED;
@ -724,10 +739,10 @@ static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t npro
/* convert the info */
for (n=0; n < ninfo; n++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}
@ -747,7 +762,7 @@ static pmix_status_t server_register_events(const pmix_info_t info[], size_t nin
{
pmix1_opalcaddy_t *opalcaddy;
size_t n;
opal_pmix_info_t *oinfo;
opal_value_t *oinfo;
int rc;
/* setup the caddy */
@ -757,10 +772,10 @@ static pmix_status_t server_register_events(const pmix_info_t info[], size_t nin
/* convert the info */
for (n=0; n < ninfo; n++) {
oinfo = OBJ_NEW(opal_pmix_info_t);
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) {
if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix1_convert_opalrc(rc);
}

Просмотреть файл

@ -58,7 +58,7 @@ typedef int (*opal_pmix_server_abort_fn_t)(opal_process_name_t *proc, void *serv
* in the fence[_nb] operation. The callback is to be executed once each daemon
* hosting at least one participant has called the host server's fencenb function.
*
* The list of opal_pmix_info_t includes any directives from the user regarding
* The list of opal_value_t includes any directives from the user regarding
* how the fence is to be executed (e.g., timeout limits).
*
* The provided data is to be collectively shared with all host
@ -73,7 +73,7 @@ typedef int (*opal_pmix_server_fencenb_fn_t)(opal_list_t *procs, opal_list_t *in
* PMIx server on the remote node that hosts the specified proc to
* obtain and return a direct modex blob for that proc
*
* The list of opal_pmix_info_t includes any directives from the user regarding
* The list of opal_value_t includes any directives from the user regarding
* how the operation is to be executed (e.g., timeout limits).
*/
typedef int (*opal_pmix_server_dmodex_req_fn_t)(opal_process_name_t *proc, opal_list_t *info,
@ -92,7 +92,7 @@ typedef int (*opal_pmix_server_dmodex_req_fn_t)(opal_process_name_t *proc, opal_
* process is also provided and is expected to be returned on any subsequent
* lookup request */
typedef int (*opal_pmix_server_publish_fn_t)(opal_process_name_t *proc,
opal_pmix_data_range_t scope,
opal_pmix_data_range_t range,
opal_pmix_persistence_t persist,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
@ -106,12 +106,12 @@ typedef int (*opal_pmix_server_publish_fn_t)(opal_process_name_t *proc,
* before executing the callback function, or should callback with whatever
* data is immediately available.
*
* The list of opal_pmix_info_t includes any directives from the user regarding
* The list of opal_value_t includes any directives from the user regarding
* how the operation is to be executed (e.g., timeout limits, whether the
* lookup should wait until data appears).
*/
typedef int (*opal_pmix_server_lookup_fn_t)(opal_process_name_t *proc,
opal_pmix_data_range_t scope,
opal_pmix_data_range_t range,
opal_list_t *info, char **keys,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
@ -120,7 +120,8 @@ typedef int (*opal_pmix_server_lookup_fn_t)(opal_process_name_t *proc,
* been published. The callback is to be executed upon completion of the delete
* procedure */
typedef int (*opal_pmix_server_unpublish_fn_t)(opal_process_name_t *proc,
opal_pmix_data_range_t scope, char **keys,
opal_pmix_data_range_t range,
opal_list_t *info, char **keys,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
/* Spawn a set of applications/processes as per the PMIx API. Note that
@ -142,7 +143,7 @@ typedef int (*opal_pmix_server_spawn_fn_t)(opal_process_name_t *requestor,
* set of procs at a time. However, a process *can* be simultaneously engaged
* in multiple connect operations, each involving a different set of procs
*
* The list of opal_pmix_info_t includes any directives from the user regarding
* The list of opal_value_t includes any directives from the user regarding
* how the operation is to be executed (e.g., timeout limits).
*/
typedef int (*opal_pmix_server_connect_fn_t)(opal_list_t *procs, opal_list_t *info,
@ -155,7 +156,7 @@ typedef int (*opal_pmix_server_connect_fn_t)(opal_list_t *procs, opal_list_t *in
* disconnect - i.e., you have to fully disconnect before you can reconnect to the
* same group of processes.
*
* The list of opal_pmix_info_t includes any directives from the user regarding
* The list of opal_value_t includes any directives from the user regarding
* how the operation is to be executed (e.g., timeout limits).
*/
typedef int (*opal_pmix_server_disconnect_fn_t)(opal_list_t *procs, opal_list_t *info,
@ -165,12 +166,12 @@ typedef int (*opal_pmix_server_disconnect_fn_t)(opal_list_t *procs, opal_list_t
* manager may have access to events beyond process failure. In cases where
* the client application requests to be notified of such events, the request
* will be passed to the PMIx server, which in turn shall pass the request to
* the resource manager. The list of opal_pmix_info_t will describe the
* the resource manager. The list of opal_value_t will describe the
* desired events */
typedef int (*opal_pmix_server_register_events_fn_t)(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata);
/* Callback function for incoming connection requests from
* local clients */
typedef void (*opal_pmix_connection_cbfunc_t)(int incoming_sd);

Просмотреть файл

@ -28,6 +28,10 @@ BEGIN_C_DECLS
* these keys are RESERVED */
#define OPAL_PMIX_ATTR_UNDEF NULL
/* identification attributes */
#define OPAL_PMIX_USERID "pmix.euid" // (uint32_t) effective user id
#define OPAL_PMIX_GRPID "pmix.egid" // (uint32_t) effective group id
#define OPAL_PMIX_CPUSET "pmix.cpuset" // (char*) hwloc bitmap applied to proc upon launch
#define OPAL_PMIX_CREDENTIAL "pmix.cred" // (char*) security credential assigned to proc
#define OPAL_PMIX_SPAWNED "pmix.spawned" // (bool) true if this proc resulted from a call to PMIx_Spawn
@ -159,19 +163,15 @@ typedef enum {
/**** PMIX INFO STRUCT ****/
typedef struct {
opal_list_item_t super;
char *key;
opal_value_t value;
} opal_pmix_info_t;
OBJ_CLASS_DECLARATION(opal_pmix_info_t);
/* NOTE: the pmix_info_t is essentially equivalent to the opal_value_t
* Hence, we do not define an opal_value_t */
/**** PMIX LOOKUP RETURN STRUCT ****/
typedef struct {
opal_list_item_t super;
opal_process_name_t proc;
char *key;
opal_value_t value;
} opal_pmix_pdata_t;
OBJ_CLASS_DECLARATION(opal_pmix_pdata_t);
@ -256,7 +256,7 @@ typedef void (*opal_pmix_lookup_cbfunc_t)(int status,
* It is the responsibility of the application to parse any provided info array
* for defined key-values if it so desires.
*
* Possible uses of a pmix_info_t object include:
* Possible uses of the opal_value_t list include:
*
* - for the RM to alert the process as to planned actions, such as
* to abort the session, in response to the reported error

Просмотреть файл

@ -187,6 +187,10 @@ int pmix_server_init(void)
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
/* setup recv for replies from data server */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
/* setup the local server */
if (ORTE_SUCCESS != (rc = opal_pmix.server_init(&pmix_server))) {
ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -156,7 +156,7 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
orte_job_t *jdata;
orte_app_context_t *app;
opal_pmix_app_t *papp;
opal_pmix_info_t *info;
opal_value_t *info;
int rc;
char cwd[OPAL_PATH_MAX];
@ -169,14 +169,14 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
jdata = OBJ_NEW(orte_job_t);
/* transfer the job info across */
OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
if (0 == strcmp(info->key, OPAL_PMIX_PERSONALITY)) {
jdata->personality = strdup(info->value.data.string);
jdata->personality = strdup(info->data.string);
} else if (0 == strcmp(info->key, OPAL_PMIX_MAPPER)) {
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
}
jdata->map->req_mapper = strdup(info->value.data.string);
jdata->map->req_mapper = strdup(info->data.string);
} else if (0 == strcmp(info->key, OPAL_PMIX_DISPLAY_MAP)) {
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
@ -189,12 +189,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
/* not allowed to provide multiple mapping policies */
orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
true, "mapping", info->value.data.string,
true, "mapping", info->data.string,
orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
return ORTE_ERR_BAD_PARAM;
}
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_PPR);
jdata->map->ppr = strdup(info->value.data.string);
jdata->map->ppr = strdup(info->data.string);
} else if (0 == strcmp(info->key, OPAL_PMIX_MAPBY)) {
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
@ -202,12 +202,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
/* not allowed to provide multiple mapping policies */
orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
true, "mapping", info->value.data.string,
true, "mapping", info->data.string,
orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
return ORTE_ERR_BAD_PARAM;
}
rc = orte_rmaps_base_set_mapping_policy(&jdata->map->mapping,
NULL, info->value.data.string);
NULL, info->data.string);
if (ORTE_SUCCESS != rc) {
return rc;
}
@ -218,13 +218,13 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
if (ORTE_RANKING_POLICY_IS_SET(jdata->map->ranking)) {
/* not allowed to provide multiple ranking policies */
orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
true, "ranking", info->value.data.string,
true, "ranking", info->data.string,
orte_rmaps_base_print_ranking(orte_rmaps_base.ranking));
return ORTE_ERR_BAD_PARAM;
}
rc = orte_rmaps_base_set_ranking_policy(&jdata->map->ranking,
jdata->map->mapping,
info->value.data.string);
info->data.string);
if (ORTE_SUCCESS != rc) {
return rc;
}
@ -236,12 +236,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
if (OPAL_BINDING_POLICY_IS_SET(jdata->map->binding)) {
/* not allowed to provide multiple mapping policies */
orte_show_help("help-opal-hwloc-base.txt", "redefining-policy", true,
info->value.data.string,
info->data.string,
opal_hwloc_base_print_binding(opal_hwloc_binding_policy));
return ORTE_ERR_BAD_PARAM;
}
rc = opal_hwloc_base_set_binding_policy(&jdata->map->binding,
info->value.data.string);
info->data.string);
if (ORTE_SUCCESS != rc) {
return rc;
}
@ -250,12 +250,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB,
ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
} else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) {
if (0 == strcmp(info->value.data.string, "all")) {
if (0 == strcmp(info->data.string, "all")) {
jdata->stdin_target = ORTE_VPID_WILDCARD;
} else if (0 == strcmp(info->value.data.string, "none")) {
} else if (0 == strcmp(info->data.string, "none")) {
jdata->stdin_target = ORTE_VPID_INVALID;
} else {
jdata->stdin_target = strtoul(info->value.data.string, NULL, 10);
jdata->stdin_target = strtoul(info->data.string, NULL, 10);
}
} else {
/* unrecognized key */
@ -273,26 +273,26 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
app->argv = opal_argv_copy(papp->argv);
app->env = opal_argv_copy(papp->env);
app->num_procs = papp->maxprocs;
OPAL_LIST_FOREACH(info, &papp->info, opal_pmix_info_t) {
OPAL_LIST_FOREACH(info, &papp->info, opal_value_t) {
if (0 == strcmp(info->key, OPAL_PMIX_HOST)) {
orte_set_attribute(&app->attributes, ORTE_APP_DASH_HOST,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else if (0 == strcmp(info->key, OPAL_PMIX_HOSTFILE)) {
orte_set_attribute(&app->attributes, ORTE_APP_HOSTFILE,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOSTFILE)) {
orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOSTFILE,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOST)) {
orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOST,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else if (0 == strcmp(info->key, OPAL_PMIX_PREFIX)) {
orte_set_attribute(&app->attributes, ORTE_APP_PREFIX_DIR,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else if (0 == strcmp(info->key, OPAL_PMIX_WDIR)) {
/* if this is a relative path, convert it to an absolute path */
if (opal_path_is_absolute(info->value.data.string)) {
app->cwd = strdup(info->value.data.string);
if (opal_path_is_absolute(info->data.string)) {
app->cwd = strdup(info->data.string);
} else {
/* get the cwd */
if (OPAL_SUCCESS != (rc = opal_getcwd(cwd, sizeof(cwd)))) {
@ -301,14 +301,14 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
return rc;
}
/* construct the absolute path */
app->cwd = opal_os_path(false, cwd, info->value.data.string, NULL);
app->cwd = opal_os_path(false, cwd, info->data.string, NULL);
}
} else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_BIN)) {
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN,
ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL);
} else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_FILES)) {
orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES,
ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING);
ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
} else {
/* unrecognized key */
orte_show_help("help-orted.txt", "bad-key",

Просмотреть файл

@ -155,7 +155,8 @@ extern int pmix_server_lookup_fn(opal_process_name_t *proc,
opal_list_t *info, char **keys,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
extern int pmix_server_unpublish_fn(opal_process_name_t *proc,
opal_pmix_data_range_t range, char **keys,
opal_pmix_data_range_t range,
opal_list_t *info, char **keys,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
extern int pmix_server_spawn_fn(opal_process_name_t *requestor,
opal_list_t *job_info, opal_list_t *apps,
@ -168,10 +169,15 @@ extern int pmix_server_register_events_fn(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata);
/* declare the RML recv functions for responses */
extern void pmix_server_launch_resp(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata);
extern void pmix_server_keyval_client(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata);
/* exposed shared variables */
typedef struct {
bool initialized;

Просмотреть файл

@ -37,16 +37,12 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_data_server.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "pmix_server_internal.h"
#define ORTE_PMIX_PUBLISH_CMD 0x01
#define ORTE_PMIX_LOOKUP_CMD 0x02
#define ORTE_PMIX_UNPUBLISH_CMD 0x03
static void execute(int sd, short args, void *cbdata)
{
pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
@ -98,10 +94,12 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
int rc;
uint8_t cmd = ORTE_PMIX_PUBLISH_CMD;
int32_t ninfo;
opal_pmix_info_t *iptr;
opal_value_t *iptr;
/* create the caddy */
req = OBJ_NEW(pmix_server_req_t);
req->opcbfunc = cbfunc;
req->cbdata = cbdata;
/* load the command */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
@ -147,13 +145,8 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
}
/* if we have items, pack those too */
OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->key, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->value, 1, OPAL_VALUE))) {
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
@ -178,8 +171,13 @@ int pmix_server_lookup_fn(opal_process_name_t *proc,
pmix_server_req_t *req;
int rc;
uint8_t cmd = ORTE_PMIX_LOOKUP_CMD;
int32_t nkeys;
opal_pmix_info_t *iptr;
int32_t nkeys, i;
int32_t ninfo;
opal_value_t *iptr;
/* the list of info objects are directives for us - they include
* things like timeout constraints, so there is no reason to
* forward them to the server */
/* create the caddy */
req = OBJ_NEW(pmix_server_req_t);
@ -193,13 +191,6 @@ int pmix_server_lookup_fn(opal_process_name_t *proc,
return rc;
}
/* pack the name of the requestor */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
/* pack the range */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
@ -214,26 +205,20 @@ int pmix_server_lookup_fn(opal_process_name_t *proc,
req->target = *ORTE_PROC_MY_HNP;
}
/* pack the number of info objects */
nkeys = opal_list_get_size(info);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
/* pack the number of info items */
ninfo = opal_list_get_size(info);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &ninfo, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
/* pack the objects */
if (0 < nkeys) {
OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->key, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->value, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
/* if we have items, pack those too */
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
}
@ -246,10 +231,12 @@ int pmix_server_lookup_fn(opal_process_name_t *proc,
}
/* pack the keys too */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, keys, nkeys, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
for (i=0; i < nkeys; i++) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[i], 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
}
/* thread-shift so we can store the tracker */
@ -262,16 +249,20 @@ int pmix_server_lookup_fn(opal_process_name_t *proc,
}
int pmix_server_unpublish_fn(opal_process_name_t *proc,
opal_pmix_data_range_t range, char **keys,
opal_pmix_data_range_t range,
opal_list_t *info, char **keys,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
pmix_server_req_t *req;
int rc;
uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD;
uint32_t nkeys;
uint32_t nkeys, ninfo;
opal_value_t *iptr;
/* create the caddy */
req = OBJ_NEW(pmix_server_req_t);
req->opcbfunc = cbfunc;
req->cbdata = cbdata;
/* load the command */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) {
@ -301,6 +292,22 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc,
req->target = *ORTE_PROC_MY_HNP;
}
/* pack the number of info items */
ninfo = opal_list_get_size(info);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &ninfo, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
/* if we have items, pack those too */
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(req);
return rc;
}
}
/* pack the number of keys */
nkeys = opal_argv_count(keys);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
@ -325,94 +332,19 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc,
return OPAL_SUCCESS;
}
void pmix_server_keyval_srvr(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata)
{
uint8_t cmd;
int range, room_num, cnt, rc;
opal_buffer_t *answer;
opal_process_name_t proc;
/* setup the answer */
answer = OBJ_NEW(opal_buffer_t);
/* unpack the remote room number */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return;
}
/* save it for the return */
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &room_num, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return;
}
/* unpack the cmd */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
goto release;
}
/* unpack the name of the publisher/requestor */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &proc, &cnt, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
goto release;
}
/* unpack the range */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto release;
}
switch(cmd) {
case ORTE_PMIX_PUBLISH_CMD:
/* unpack the publisher */
/* unpack the range */
/* unpack the persistence */
/* unpack the key */
/* unpack the value */
case ORTE_PMIX_LOOKUP_CMD:
case ORTE_PMIX_UNPUBLISH_CMD:
break;
default:
break;
}
release:
/* pack the error code */
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rc, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send back the reply */
rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DATA_CLIENT,
orte_rml_send_callback, NULL);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
}
void pmix_server_keyval_client(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata)
{
int rc, ret, room_num;
int32_t cnt, ninfo, n;
int32_t cnt;
pmix_server_req_t *req;
opal_list_t *info = NULL;
opal_pmix_info_t *iptr;
opal_list_t info;
opal_value_t *iptr;
opal_pmix_pdata_t *pdata;
opal_process_name_t source;
OBJ_CONSTRUCT(&info, opal_list_t);
/* unpack the room number of the request tracker */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
@ -428,26 +360,25 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender,
goto release;
}
/* see if any data was included - not an error if the answer is no */
cnt = 1;
rc = opal_dss.unpack(buffer, &ninfo, &cnt, OPAL_INT32);
if (ORTE_SUCCESS == rc && 0 < ninfo) {
info = OBJ_NEW(opal_list_t);
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_pmix_info_t);
opal_list_append(info, &iptr->super);
rc = opal_dss.unpack(buffer, &iptr->key, &cnt, OPAL_STRING);
if (OPAL_SUCCESS != rc) {
ret = rc;
OPAL_LIST_RELEASE(info);
info = NULL;
if (ORTE_SUCCESS == ret) {
/* see if any data was included - not an error if the answer is no */
cnt = 1;
while (OPAL_SUCCESS == opal_dss.unpack(buffer, &source, &cnt, OPAL_NAME)) {
pdata = OBJ_NEW(opal_pmix_pdata_t);
pdata->proc = source;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(pdata);
continue;
}
rc = opal_dss.unpack(buffer, &iptr->value, &cnt, OPAL_VALUE);
if (OPAL_SUCCESS != rc) {
ret = rc;
OPAL_LIST_RELEASE(info);
info = NULL;
if (OPAL_SUCCESS != (rc = opal_value_xfer(&pdata->value, iptr))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(pdata);
OBJ_RELEASE(iptr);
continue;
}
OBJ_RELEASE(iptr);
opal_list_append(&info, &pdata->super);
}
}
@ -459,11 +390,15 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender,
/* pass down the response */
if (NULL != req->opcbfunc) {
req->opcbfunc(ret, req->cbdata);
} else if (NULL != req->lkcbfunc) {
req->lkcbfunc(ret, &info, req->cbdata);
} else {
req->lkcbfunc(ret, info, req->cbdata);
/* should not happen */
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
}
/* cleanup */
OPAL_LIST_DESTRUCT(&info);
OBJ_RELEASE(req);
}
}

Просмотреть файл

@ -12,6 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved
* Copyright (c) 2015 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -31,8 +32,9 @@
#include "opal/util/output.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix_types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
@ -52,23 +54,26 @@ typedef struct {
* owner can remove it
*/
orte_process_name_t owner;
/* service name */
char *service;
/* port */
char *port;
/* uid of the owner - helps control
* access rights */
uint32_t uid;
/* characteristics */
opal_pmix_data_range_t range;
opal_pmix_persistence_t persistence;
/* and the values themselves */
opal_list_t values;
/* the value itself */
} orte_data_object_t;
static void construct(orte_data_object_t *ptr)
{
ptr->index = -1;
ptr->service = NULL;
ptr->port = NULL;
OBJ_CONSTRUCT(&ptr->values, opal_list_t);
}
static void destruct(orte_data_object_t *ptr)
{
if (NULL != ptr->service) free(ptr->service);
if (NULL != ptr->port) free(ptr->port);
OPAL_LIST_DESTRUCT(&ptr->values);
}
OBJ_CLASS_INSTANCE(orte_data_object_t,
@ -76,15 +81,14 @@ OBJ_CLASS_INSTANCE(orte_data_object_t,
construct, destruct);
/* local globals */
static opal_pointer_array_t *orte_data_server_store=NULL;
static bool recv_issued=false;
static opal_pointer_array_t orte_data_server_store;
int orte_data_server_init(void)
{
int rc;
orte_data_server_store = OBJ_NEW(opal_pointer_array_t);
if (ORTE_SUCCESS != (rc = opal_pointer_array_init(orte_data_server_store,
OBJ_CONSTRUCT(&orte_data_server_store, opal_pointer_array_t);
if (ORTE_SUCCESS != (rc = opal_pointer_array_init(&orte_data_server_store,
1,
INT_MAX,
1))) {
@ -92,14 +96,11 @@ int orte_data_server_init(void)
return rc;
}
if (!recv_issued) {
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DATA_SERVER,
ORTE_RML_PERSISTENT,
orte_data_server,
NULL);
recv_issued = true;
}
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DATA_SERVER,
ORTE_RML_PERSISTENT,
orte_data_server,
NULL);
return ORTE_SUCCESS;
}
@ -107,295 +108,350 @@ int orte_data_server_init(void)
void orte_data_server_finalize(void)
{
orte_std_cntr_t i;
orte_data_object_t **data;
orte_data_object_t *data;
if (NULL != orte_data_server_store) {
data = (orte_data_object_t**)orte_data_server_store->addr;
for (i=0; i < orte_data_server_store->size; i++) {
if (NULL != data[i]) OBJ_RELEASE(data[i]);
}
OBJ_RELEASE(orte_data_server_store);
}
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_SERVER);
if (recv_issued) {
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_SERVER);
recv_issued = false;
}
}
static orte_data_object_t *lookup(char *service)
{
orte_std_cntr_t i;
orte_data_object_t **data;
data = (orte_data_object_t**)orte_data_server_store->addr;
for (i=0; i < orte_data_server_store->size; i++) {
if (NULL != data[i]) {
if (0 == strcmp(data[i]->service, service)) {
return data[i];
}
for (i=0; i < orte_data_server_store.size; i++) {
if (NULL != (data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, i))) {
OBJ_RELEASE(data);
}
}
/* get here if not found - return NULL */
return NULL;
}
static void rml_cbfunc(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
OBJ_DESTRUCT(&orte_data_server_store);
}
void orte_data_server(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_data_server_cmd_t command;
uint8_t command;
orte_std_cntr_t count;
char *service_name, *port_name;
opal_process_name_t requestor;
orte_data_object_t *data;
opal_buffer_t *answer;
int rc, ret;
bool unique;
int rc, ret, k;
opal_value_t *iptr, *inext;
uint32_t ninfo, i;
char **keys = NULL, *str;
bool ret_packed = false;
int room_number;
uint32_t uid;
opal_pmix_data_range_t range;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the room number of the caller's request */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &room_number, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the command */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
return;
}
answer = OBJ_NEW(opal_buffer_t);
/* pack the room number as this must lead any response */
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &room_number, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return;
}
switch(command) {
case ORTE_DATA_SERVER_PUBLISH:
/* unpack the service name */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* unpack the port name */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* unpack uniqueness flag */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &unique, &count, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: publishing service %s port %s %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name,
unique ? "UNIQUE" : "OVERWRITE"));
/* check the current data store to see if this service name has already
* been published
*/
if (NULL != (data = lookup(service_name))) {
/* already exists - see if overwrite allowed */
if (unique) {
/* return ORTE_EXISTS error code */
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: publishing service %s port %s already exists",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name));
ret = ORTE_EXISTS;
} else {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: overwriting service %s with port %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name));
if (NULL != data->port) {
free(data->port);
}
data->port = port_name;
data->owner.jobid = sender->jobid;
data->owner.vpid = sender->vpid;
ret = ORTE_SUCCESS;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
/* create a new data object */
case ORTE_PMIX_PUBLISH_CMD:
data = OBJ_NEW(orte_data_object_t);
/* pass over the data values - these were malloc'd when unpacked,
* so we don't need to strdup them here
*/
data->service = service_name;
data->port = port_name;
data->owner.jobid = sender->jobid;
data->owner.vpid = sender->vpid;
/* store the data */
data->index = opal_pointer_array_add(orte_data_server_store, data);
/* unpack the requestor */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->owner, &count, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: successfully published service %s port %s",
"%s data server: publishing data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name));
ORTE_NAME_PRINT(&data->owner)));
/* unpack the range */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
}
/* unpack the persistence */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->persistence, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
}
/* unpack the number of info elements */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
}
if (0 < ninfo) {
for (i=0; i < ninfo; i++) {
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(data);
goto SEND_ERROR;
}
/* if this is the userid, separate it out */
if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
data->uid = iptr->data.uint32;
OBJ_RELEASE(iptr);
} else {
opal_list_append(&data->values, &iptr->super);
}
}
}
data->index = opal_pointer_array_add(&orte_data_server_store, data);
/* tell the user it was wonderful... */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
* rc value either, so just send whatever is there */
}
goto SEND_ANSWER;
break;
case ORTE_DATA_SERVER_LOOKUP:
/* unpack the service name */
case ORTE_PMIX_LOOKUP_CMD:
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: lookup data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the range - this sets some constraints on the range of data to be considered */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* unpack the number of info elements */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
if (0 < ninfo) {
for (i=0; i < ninfo; i++) {
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* if this is the userid, separate it out */
if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
uid = iptr->data.uint32;
}
/* ignore anything else for now */
OBJ_RELEASE(iptr);
}
}
/* unpack the number of keys */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
if (0 == ninfo) {
/* they forgot to send us the keys?? */
rc = ORTE_ERR_BAD_PARAM;
goto SEND_ERROR;
}
/* unpack the keys */
for (i=0; i < ninfo; i++) {
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
opal_argv_append_nosize(&keys, str);
free(str);
}
/* cycle across the provided keys */
for (i=0; NULL != keys[i]; i++) {
/* cycle across the stored data, looking for a match */
for (k=0; k < orte_data_server_store.size; k++) {
data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
if (NULL == data) {
continue;
}
/* can only access data posted by the same user id */
if (uid != data->uid) {
continue;
}
/* if the range doesn't match, then we cannot consider it */
if (range != data->range) {
continue;
}
/* see if we have this key */
OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) {
if (0 == strcmp(iptr->key, keys[i])) {
/* found it - package it for return */
if (!ret_packed) {
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
ret_packed = true;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->owner, 1, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
}
}
}
}
opal_argv_free(keys);
if (!ret_packed) {
/* nothing was found - indicate that situation */
rc = ORTE_ERR_NOT_FOUND;
goto SEND_ERROR;
}
goto SEND_ANSWER;
break;
case ORTE_PMIX_UNPUBLISH_CMD:
/* unpack the requestor */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: lookup on service %s",
"%s data server: unpublish data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
ORTE_NAME_PRINT(&requestor)));
/* locate this record in the data store */
if (NULL == (data = lookup(service_name))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* return ORTE_ERR_NOT_FOUND error code */
ret = ORTE_ERR_NOT_FOUND;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: successful lookup on service %s port %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, data->port));
/* pack success so the unpack on the other end can
* always unpack an int first
*/
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
goto SEND_ANSWER;
}
/* pack the returned port */
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
break;
case ORTE_DATA_SERVER_UNPUBLISH:
/* unpack the service name */
/* unpack the range - this sets some constraints on the range of data to be considered */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: unpublish on service %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* locate this record in the data store */
if (NULL == (data = lookup(service_name))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* return ORTE_ERR_NOT_FOUND error code */
ret = ORTE_ERR_NOT_FOUND;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
/* unpack the number of info elements */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
if (0 < ninfo) {
for (i=0; i < ninfo; i++) {
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* if this is the userid, separate it out */
if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) {
uid = iptr->data.uint32;
}
/* ignore anything else for now */
OBJ_RELEASE(iptr);
}
goto SEND_ANSWER;
}
/* check to see if the sender owns it - must be exact match */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&data->owner, sender)) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not owned by sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, ORTE_NAME_PRINT(sender)));
/* nope - return ORTE_ERR_PERM error code */
ret = ORTE_ERR_PERM;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
/* unpack the number of keys */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
if (0 == ninfo) {
/* they forgot to send us the keys?? */
rc = ORTE_ERR_BAD_PARAM;
goto SEND_ERROR;
}
/* delete the object from the data store */
opal_pointer_array_set_item(orte_data_server_store, data->index, NULL);
OBJ_RELEASE(data);
/* unpack the keys */
for (i=0; i < ninfo; i++) {
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto SEND_ERROR;
}
opal_argv_append_nosize(&keys, str);
free(str);
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s unpublished",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* cycle across the provided keys */
for (i=0; NULL != keys[i]; i++) {
/* cycle across the stored data, looking for a match */
for (k=0; k < orte_data_server_store.size; k++) {
data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
if (NULL == data) {
continue;
}
/* can only access data posted by the same user id */
if (uid != data->uid) {
continue;
}
/* can only access data posted by the same process */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
continue;
}
/* can only access data posted for the same range */
if (range != data->range) {
continue;
}
/* see if we have this key */
OPAL_LIST_FOREACH_SAFE(iptr, inext, &data->values, opal_value_t) {
if (0 == strcmp(iptr->key, keys[i])) {
/* found it - delete the object from the data store */
opal_list_remove_item(&data->values, &iptr->super);
OBJ_RELEASE(iptr);
}
}
/* if all the data has been removed, then remove the object */
if (0 == opal_list_get_size(&data->values)) {
opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
OBJ_RELEASE(data);
}
}
}
opal_argv_free(keys);
/* tell the sender this succeeded */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
break;
@ -413,7 +469,8 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
SEND_ANSWER:
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, rml_cbfunc, NULL))) {
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -34,6 +35,11 @@
BEGIN_C_DECLS
#define ORTE_PMIX_PUBLISH_CMD 0x01
#define ORTE_PMIX_LOOKUP_CMD 0x02
#define ORTE_PMIX_UNPUBLISH_CMD 0x03
/* provide hooks to startup and finalize the data server */
ORTE_DECLSPEC int orte_data_server_init(void);
ORTE_DECLSPEC void orte_data_server_finalize(void);
@ -43,17 +49,6 @@ ORTE_DECLSPEC void orte_data_server(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/* define a type and some values for the commands
* to be used with the server
*/
typedef uint8_t orte_data_server_cmd_t;
#define ORTE_DATA_SERVER_CMD OPAL_UINT8
#define ORTE_DATA_SERVER_PUBLISH 0x01
#define ORTE_DATA_SERVER_UNPUBLISH 0x02
#define ORTE_DATA_SERVER_LOOKUP 0x04
END_C_DECLS
#endif /* ORTE_DATA_SERVER_H */