diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 12c753e7b6..abded3c592 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -113,7 +113,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, bool dense, isnew; opal_process_name_t pname; opal_list_t ilist, mlist, rlist; - opal_pmix_info_t *info; + opal_value_t *info; opal_pmix_pdata_t *pdat; opal_namelist_t *nm; @@ -192,17 +192,17 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, free(nstring); /* the root for each side publishes their list of participants */ OBJ_CONSTRUCT(&ilist, opal_list_t); - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); opal_list_append(&ilist, &info->super); if (send_first) { (void)asprintf(&info->key, "%s:connect", port_string); - info->value.type = OPAL_STRING; - info->value.data.string = opal_argv_join(members, ':'); + info->type = OPAL_STRING; + info->data.string = opal_argv_join(members, ':'); } else { (void)asprintf(&info->key, "%s:accept", port_string); - info->value.type = OPAL_STRING; - info->value.data.string = opal_argv_join(members, ':'); + info->type = OPAL_STRING; + info->data.string = opal_argv_join(members, ':'); } /* publish it with "session" scope */ rc = opal_pmix.publish(OPAL_PMIX_SESSION, @@ -222,9 +222,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, OBJ_CONSTRUCT(&ilist, opal_list_t); pdat = OBJ_NEW(opal_pmix_pdata_t); if (send_first) { - (void)asprintf(&pdat->key, "%s:accept", port_string); + (void)asprintf(&pdat->value.key, "%s:accept", port_string); } else { - (void)asprintf(&pdat->key, "%s:connect", port_string); + (void)asprintf(&pdat->value.key, "%s:connect", port_string); } opal_list_append(&ilist, &pdat->super); if (NULL == opal_pmix.lookup_nb) { @@ -239,7 +239,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, * the given data has been published */ char **keys = NULL; struct lookup_caddy_t caddy; - opal_argv_append_nosize(&keys, pdat->key); + opal_argv_append_nosize(&keys, pdat->value.key); caddy.active = true; caddy.pdat = pdat; rc = opal_pmix.lookup_nb(OPAL_PMIX_SESSION, true, keys, @@ -251,6 +251,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, return OMPI_ERROR; } OMPI_WAIT_FOR_COMPLETION(caddy.active); + opal_argv_free(keys); } /* initiate a list of participants for the connect, * starting with our own members, remembering to @@ -542,7 +543,7 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], opal_list_t apps; opal_list_t job_info; opal_pmix_app_t *app; - opal_pmix_info_t *info; + opal_value_t *info; bool local_spawn, non_mpi; char **envars; @@ -631,45 +632,45 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], ompi_info_get (array_of_info[i], "personality", sizeof(host) - 1, host, &flag); if ( flag ) { personality = true; - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PERSONALITY); - opal_value_load(&info->value, host, OPAL_STRING); + opal_value_load(info, host, OPAL_STRING); opal_list_append(&job_info, &info->super); } /* check for 'host' */ ompi_info_get (array_of_info[i], "host", sizeof(host) - 1, host, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_HOST); - opal_value_load(&info->value, host, OPAL_STRING); + opal_value_load(info, host, OPAL_STRING); opal_list_append(&app->info, &info->super); } /* check for 'hostfile' */ ompi_info_get (array_of_info[i], "hostfile", sizeof(host) - 1, host, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_HOSTFILE); - opal_value_load(&info->value, host, OPAL_STRING); + opal_value_load(info, host, OPAL_STRING); opal_list_append(&app->info, &info->super); } /* check for 'add-hostfile' */ ompi_info_get (array_of_info[i], "add-hostfile", sizeof(host) - 1, host, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_ADD_HOSTFILE); - opal_value_load(&info->value, host, OPAL_STRING); + opal_value_load(info, host, OPAL_STRING); opal_list_append(&app->info, &info->super); } /* check for 'add-host' */ ompi_info_get (array_of_info[i], "add-host", sizeof(host) - 1, host, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_ADD_HOST); - opal_value_load(&info->value, host, OPAL_STRING); + opal_value_load(info, host, OPAL_STRING); opal_list_append(&app->info, &info->super); } @@ -692,18 +693,18 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], */ ompi_info_get (array_of_info[i], "ompi_prefix", sizeof(prefix) - 1, prefix, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PREFIX); - opal_value_load(&info->value, prefix, OPAL_STRING); + opal_value_load(info, prefix, OPAL_STRING); opal_list_append(&job_info, &info->super); } /* check for 'wdir' */ ompi_info_get (array_of_info[i], "wdir", sizeof(cwd) - 1, cwd, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_WDIR); - opal_value_load(&info->value, cwd, OPAL_STRING); + opal_value_load(info, cwd, OPAL_STRING); opal_list_append(&app->info, &info->super); have_wdir = 1; } @@ -711,60 +712,60 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], /* check for 'mapper' - a job-level key */ ompi_info_get(array_of_info[i], "mapper", sizeof(mapper) - 1, mapper, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_MAPPER); - opal_value_load(&info->value, mapper, OPAL_STRING); + opal_value_load(info, mapper, OPAL_STRING); opal_list_append(&job_info, &info->super); } /* check for 'display_map' - a job-level key */ ompi_info_get_bool(array_of_info[i], "display_map", &local_spawn, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_DISPLAY_MAP); - opal_value_load(&info->value, &local_spawn, OPAL_BOOL); + opal_value_load(info, &local_spawn, OPAL_BOOL); opal_list_append(&job_info, &info->super); } /* check for 'npernode' and 'ppr' - job-level key */ ompi_info_get (array_of_info[i], "npernode", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PPR); - info->value.type = OPAL_STRING; - (void)asprintf(&(info->value.data.string), "%s:n", slot_list); + info->type = OPAL_STRING; + (void)asprintf(&(info->data.string), "%s:n", slot_list); opal_list_append(&job_info, &info->super); } ompi_info_get (array_of_info[i], "pernode", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PPR); - opal_value_load(&info->value, "1:n", OPAL_STRING); + opal_value_load(info, "1:n", OPAL_STRING); opal_list_append(&job_info, &info->super); } ompi_info_get (array_of_info[i], "ppr", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PPR); - opal_value_load(&info->value, slot_list, OPAL_STRING); + opal_value_load(info, slot_list, OPAL_STRING); opal_list_append(&job_info, &info->super); } /* check for 'map_by' - job-level key */ ompi_info_get(array_of_info[i], "map_by", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_MAPBY); - opal_value_load(&info->value, slot_list, OPAL_STRING); + opal_value_load(info, slot_list, OPAL_STRING); opal_list_append(&job_info, &info->super); } /* check for 'rank_by' - job-level key */ ompi_info_get(array_of_info[i], "rank_by", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_RANKBY); - opal_value_load(&info->value, slot_list, OPAL_STRING); + opal_value_load(info, slot_list, OPAL_STRING); opal_list_append(&job_info, &info->super); } @@ -772,9 +773,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], /* check for 'bind_to' - job-level key */ ompi_info_get(array_of_info[i], "bind_to", sizeof(slot_list) - 1, slot_list, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_BINDTO); - opal_value_load(&info->value, slot_list, OPAL_STRING); + opal_value_load(info, slot_list, OPAL_STRING); opal_list_append(&job_info, &info->super); } #endif @@ -782,18 +783,18 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], /* check for 'preload_binary' - job-level key */ ompi_info_get_bool(array_of_info[i], "ompi_preload_binary", &local_spawn, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PRELOAD_BIN); - opal_value_load(&info->value, &local_spawn, OPAL_BOOL); + opal_value_load(info, &local_spawn, OPAL_BOOL); opal_list_append(&job_info, &info->super); } /* check for 'preload_files' - job-level key */ ompi_info_get (array_of_info[i], "ompi_preload_files", sizeof(cwd) - 1, cwd, &flag); if ( flag ) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PRELOAD_FILES); - opal_value_load(&info->value, cwd, OPAL_STRING); + opal_value_load(info, cwd, OPAL_STRING); opal_list_append(&job_info, &info->super); } @@ -802,9 +803,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], */ ompi_info_get_bool(array_of_info[i], "ompi_non_mpi", &non_mpi, &flag); if (flag && non_mpi) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_NON_PMI); - opal_value_load(&info->value, &non_mpi, OPAL_BOOL); + opal_value_load(info, &non_mpi, OPAL_BOOL); opal_list_append(&job_info, &info->super); } @@ -826,9 +827,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], } else { ui32 = strtoul(stdin_target, NULL, 10); } - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_STDIN_TGT); - opal_value_load(&info->value, &ui32, OPAL_UINT32); + opal_value_load(info, &ui32, OPAL_UINT32); opal_list_append(&job_info, &info->super); } } @@ -843,9 +844,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], opal_progress_event_users_decrement(); return rc; } - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_WDIR); - opal_value_load(&info->value, cwd, OPAL_STRING); + opal_value_load(info, cwd, OPAL_STRING); opal_list_append(&app->info, &info->super); } @@ -856,9 +857,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], /* default the personality - job-level key */ if (!personality) { - info = OBJ_NEW(opal_pmix_info_t); + info = OBJ_NEW(opal_value_t); info->key = strdup(OPAL_PMIX_PERSONALITY); - opal_value_load(&info->value, "ompi", OPAL_STRING); + opal_value_load(info, "ompi", OPAL_STRING); opal_list_append(&job_info, &info->super); } @@ -880,10 +881,12 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[], int ompi_dpm_open_port(char *port_name) { uint32_t r; + char *tmp; r = opal_rand(&rnd); - snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u", - OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), r); + opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME); + snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u", tmp, r); + free(tmp); return OMPI_SUCCESS; } diff --git a/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_eventlog.c b/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_eventlog.c index a063fde633..054ce80119 100644 --- a/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_eventlog.c +++ b/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_eventlog.c @@ -27,7 +27,7 @@ int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t ** OBJ_CONSTRUCT(&results, opal_list_t); pdat = OBJ_NEW(opal_pmix_pdata_t); - asprintf(&pdat->key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank); + asprintf(&pdat->value.key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank); opal_list_append(&results, &pdat->super); rc = opal_pmix.lookup(OPAL_PMIX_NAMESPACE, &results); diff --git a/ompi/mpi/c/lookup_name.c b/ompi/mpi/c/lookup_name.c index 1613638bc7..be4838b242 100644 --- a/ompi/mpi/c/lookup_name.c +++ b/ompi/mpi/c/lookup_name.c @@ -95,7 +95,7 @@ int MPI_Lookup_name(const char *service_name, MPI_Info info, char *port_name) /* collect the findings */ OBJ_CONSTRUCT(&results, opal_list_t); pdat = OBJ_NEW(opal_pmix_pdata_t); - pdat->key = strdup(service_name); + pdat->value.key = strdup(service_name); opal_list_append(&results, &pdat->super); ret = opal_pmix.lookup(rng, &results); diff --git a/ompi/mpi/c/publish_name.c b/ompi/mpi/c/publish_name.c index 3ae6364dff..b9fee343b2 100644 --- a/ompi/mpi/c/publish_name.c +++ b/ompi/mpi/c/publish_name.c @@ -53,7 +53,7 @@ int MPI_Publish_name(const char *service_name, MPI_Info info, opal_pmix_persistence_t persist; bool persistence_given = false; opal_list_t values; - opal_pmix_info_t *pinfo; + opal_value_t *pinfo; if ( MPI_PARAM_CHECK ) { OMPI_ERR_INIT_FINALIZE(FUNC_NAME); @@ -118,10 +118,10 @@ int MPI_Publish_name(const char *service_name, MPI_Info info, /* publish the values */ OBJ_CONSTRUCT(&values, opal_list_t); - pinfo = OBJ_NEW(opal_pmix_info_t); + pinfo = OBJ_NEW(opal_value_t); pinfo->key = strdup(service_name); - pinfo->value.type = OPAL_STRING; - pinfo->value.data.string = strdup(port_name); + pinfo->type = OPAL_STRING; + pinfo->data.string = strdup(port_name); opal_list_append(&values, &pinfo->super); rc = opal_pmix.publish(rng, persist, &values); diff --git a/opal/dss/dss.h b/opal/dss/dss.h index fc081f154c..35e3589577 100644 --- a/opal/dss/dss.h +++ b/opal/dss/dss.h @@ -41,7 +41,8 @@ OPAL_DECLSPEC int opal_value_load(opal_value_t *kv, void *data, opal_data_type_t type); OPAL_DECLSPEC int opal_value_unload(opal_value_t *kv, void **data, opal_data_type_t type); - +OPAL_DECLSPEC int opal_value_xfer(opal_value_t *dest, + opal_value_t *src); /** * Top-level interface function to pack one or more values into a * buffer. diff --git a/opal/dss/dss_load_unload.c b/opal/dss/dss_load_unload.c index e5a9b5c8f4..52ca03af79 100644 --- a/opal/dss/dss_load_unload.c +++ b/opal/dss/dss_load_unload.c @@ -367,3 +367,104 @@ int opal_value_unload(opal_value_t *kv, } return OPAL_SUCCESS; } + +int opal_value_xfer(opal_value_t *dest, + opal_value_t *src) +{ + opal_byte_object_t *boptr; + + if (NULL != src->key) { + dest->key = strdup(src->key); + } + dest->type = src->type; + + switch (src->type) { + case OPAL_BOOL: + dest->data.flag = src->data.flag; + break; + case OPAL_BYTE: + dest->data.byte = src->data.byte; + break; + case OPAL_STRING: + if (NULL != dest->data.string) { + free(dest->data.string); + } + if (NULL != src->data.string) { + dest->data.string = strdup(src->data.string); + } else { + dest->data.string = NULL; + } + break; + case OPAL_SIZE: + dest->data.size = src->data.size; + break; + case OPAL_PID: + dest->data.pid = src->data.pid; + break; + + case OPAL_INT: + dest->data.integer = src->data.integer; + break; + case OPAL_INT8: + dest->data.int8 = src->data.int8; + break; + case OPAL_INT16: + dest->data.int16 = src->data.int16; + break; + case OPAL_INT32: + dest->data.int32 = src->data.int32; + break; + case OPAL_INT64: + dest->data.int64 = src->data.int64; + break; + + case OPAL_UINT: + dest->data.uint = src->data.uint; + break; + case OPAL_UINT8: + dest->data.uint8 = src->data.uint8; + break; + case OPAL_UINT16: + dest->data.uint16 = src->data.uint16; + break; + case OPAL_UINT32: + dest->data.uint32 = src->data.uint32; + break; + case OPAL_UINT64: + dest->data.uint64 = src->data.uint64; + break; + + case OPAL_BYTE_OBJECT: + if (NULL != dest->data.bo.bytes) { + free(dest->data.bo.bytes); + } + boptr = &src->data.bo; + if (NULL != boptr && NULL != boptr->bytes && 0 < boptr->size) { + dest->data.bo.bytes = (uint8_t *) malloc(boptr->size); + memcpy(dest->data.bo.bytes, boptr->bytes, boptr->size); + dest->data.bo.size = boptr->size; + } else { + dest->data.bo.bytes = NULL; + dest->data.bo.size = 0; + } + break; + + case OPAL_FLOAT: + dest->data.fval = src->data.fval; + break; + + case OPAL_TIMEVAL: + dest->data.tv.tv_sec = src->data.tv.tv_sec; + dest->data.tv.tv_usec = src->data.tv.tv_usec; + break; + + case OPAL_PTR: + dest->data.ptr = src->data.ptr; + break; + + default: + OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED); + return OPAL_ERR_NOT_SUPPORTED; + } + return OPAL_SUCCESS; +} diff --git a/opal/mca/pmix/base/pmix_base_frame.c b/opal/mca/pmix/base/pmix_base_frame.c index cc64466dcd..0a9a226e75 100644 --- a/opal/mca/pmix/base/pmix_base_frame.c +++ b/opal/mca/pmix/base/pmix_base_frame.c @@ -70,34 +70,14 @@ MCA_BASE_FRAMEWORK_DECLARE(opal, pmix, "OPAL PMI Client Framework", mca_pmix_base_static_components, 0); /**** PMIX FRAMEWORK OBJECTS ****/ -static void icon(opal_pmix_info_t *i) -{ - i->key = NULL; - OBJ_CONSTRUCT(&i->value, opal_value_t); -} -static void ides(opal_pmix_info_t *i) -{ - if (NULL != i->key) { - free(i->key); - } - OBJ_DESTRUCT(&i->value); -} -OBJ_CLASS_INSTANCE(opal_pmix_info_t, - opal_list_item_t, - icon, ides); - static void lkcon(opal_pmix_pdata_t *p) { p->proc.jobid = OPAL_JOBID_INVALID; p->proc.vpid = OPAL_VPID_INVALID; - p->key = NULL; OBJ_CONSTRUCT(&p->value, opal_value_t); } static void lkdes(opal_pmix_pdata_t *p) { - if (NULL != p->key) { - free(p->key); - } OBJ_DESTRUCT(&p->value); } OBJ_CLASS_INSTANCE(opal_pmix_pdata_t, diff --git a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in index 17f94cb441..fa2ece0f4f 100644 --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in @@ -97,6 +97,10 @@ BEGIN_C_DECLS * these keys are RESERVED */ #define PMIX_ATTR_UNDEF NULL +/* identification attributes */ +#define PMIX_USERID "pmix.euid" // (uint32_t) effective user id +#define PMIX_GRPID "pmix.egid" // (uint32_t) effective group id + /* general proc-level attributes */ #define PMIX_CPUSET "pmix.cpuset" // (char*) hwloc bitmap applied to proc upon launch #define PMIX_CREDENTIAL "pmix.cred" // (char*) security credential assigned to proc diff --git a/opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h b/opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h index 893126dc9c..81da17fafb 100644 --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h @@ -169,7 +169,7 @@ typedef pmix_status_t (*pmix_server_dmodex_req_fn_t)(const pmix_proc_t *proc, * process is also provided and is expected to be returned on any subsequent * lookup request */ typedef pmix_status_t (*pmix_server_publish_fn_t)(const pmix_proc_t *proc, - pmix_data_range_t scope, pmix_persistence_t persist, + pmix_data_range_t range, pmix_persistence_t persist, const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata); @@ -188,7 +188,7 @@ typedef pmix_status_t (*pmix_server_publish_fn_t)(const pmix_proc_t *proc, * has been set - in such cases, the host RM is required to return an error * if the directive cannot be met. */ typedef pmix_status_t (*pmix_server_lookup_fn_t)(const pmix_proc_t *proc, - pmix_data_range_t scope, + pmix_data_range_t range, const pmix_info_t info[], size_t ninfo, char **keys, pmix_lookup_cbfunc_t cbfunc, void *cbdata); @@ -198,7 +198,9 @@ typedef pmix_status_t (*pmix_server_lookup_fn_t)(const pmix_proc_t *proc, * been published. The callback is to be executed upon completion of the delete * procedure */ typedef pmix_status_t (*pmix_server_unpublish_fn_t)(const pmix_proc_t *proc, - pmix_data_range_t scope, char **keys, + pmix_data_range_t range, + const pmix_info_t info[], size_t ninfo, + char **keys, pmix_op_cbfunc_t cbfunc, void *cbdata); /* Spawn a set of applications/processes as per the PMIx API. Note that @@ -259,7 +261,7 @@ typedef pmix_status_t (*pmix_server_disconnect_fn_t)(const pmix_proc_t procs[], * the resource manager. */ typedef pmix_status_t (*pmix_server_register_events_fn_t)(const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata); - + /* Callback function for incoming connection requests from * local clients */ typedef void (*pmix_connection_cbfunc_t)(int incoming_sd); diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c index 831d373b1d..29e1f733c5 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c @@ -244,6 +244,9 @@ int PMIx_Init(pmix_proc_t *proc) PMIX_CONSTRUCT(&pmix_client_globals.myserver, pmix_peer_t); /* mark that we are a client */ pmix_globals.server = false; + /* get our effective id's */ + pmix_globals.uid = geteuid(); + pmix_globals.gid = getegid(); /* initialize the output system */ if (!pmix_output_init()) { diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_pub.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_pub.c index 20f993c278..186a28c40f 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_pub.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_pub.c @@ -137,6 +137,12 @@ int PMIx_Publish_nb(pmix_data_range_t scope, PMIX_RELEASE(msg); return rc; } + /* pack our effective userid - will be used to constrain lookup */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + return rc; + } /* pack the data range */ if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &scope, 1, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); @@ -233,7 +239,7 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys, pmix_cmd_t cmd = PMIX_LOOKUPNB_CMD; int rc; pmix_cb_t *cb; - size_t nkeys; + size_t nkeys, n; pmix_output_verbose(2, pmix_globals.debug_output, "pmix: lookup called"); @@ -255,6 +261,12 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys, PMIX_RELEASE(msg); return rc; } + /* pack our effective userid - will be used to constrain lookup */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + return rc; + } /* pack the range */ if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); @@ -282,10 +294,12 @@ int PMIx_Lookup_nb(pmix_data_range_t range, char **keys, return rc; } if (0 < nkeys) { - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, keys, nkeys, PMIX_STRING))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - return rc; + for (n=0; n < nkeys; n++) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &keys[n], 1, PMIX_STRING))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + return rc; + } } } @@ -330,7 +344,7 @@ int PMIx_Unpublish(pmix_data_range_t scope, char **keys) return rc; } -int PMIx_Unpublish_nb(pmix_data_range_t scope, char **keys, +int PMIx_Unpublish_nb(pmix_data_range_t range, char **keys, pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_buffer_t *msg; @@ -354,8 +368,14 @@ int PMIx_Unpublish_nb(pmix_data_range_t scope, char **keys, PMIX_RELEASE(msg); return rc; } - /* pack the scope */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &scope, 1, PMIX_DATA_RANGE))) { + /* pack our effective userid - will be used to constrain lookup */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &pmix_globals.uid, 1, PMIX_UINT32))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + return rc; + } + /* pack the range */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); PMIX_RELEASE(msg); return rc; @@ -493,18 +513,22 @@ static void lookup_cbfunc(int status, pmix_pdata_t pdata[], size_t ndata, pmix_pdata_t *tgt = (pmix_pdata_t*)cb->cbdata; size_t i, j; - /* find the matching key in the provided info array - error if not found */ - for (i=0; i < ndata; i++) { - for (j=0; j < cb->nvals; j++) { - if (0 == strcmp(pdata[i].key, tgt[j].key)) { - /* transfer the publishing proc id */ - (void)strncpy(tgt[j].proc.nspace, pdata[i].proc.nspace, PMIX_MAX_NSLEN); - tgt[j].proc.rank = pdata[i].proc.rank; - /* transfer the value to the pmix_info_t */ - pmix_value_xfer(&tgt[j].value, &pdata[i].value); - break; + cb->status = status; + if (PMIX_SUCCESS == status) { + /* find the matching key in the provided info array - error if not found */ + for (i=0; i < ndata; i++) { + for (j=0; j < cb->nvals; j++) { + if (0 == strcmp(pdata[i].key, tgt[j].key)) { + /* transfer the publishing proc id */ + (void)strncpy(tgt[j].proc.nspace, pdata[i].proc.nspace, PMIX_MAX_NSLEN); + tgt[j].proc.rank = pdata[i].proc.rank; + /* transfer the value to the pmix_info_t */ + pmix_value_xfer(&tgt[j].value, &pdata[i].value); + break; + } } } } + cb->active = false; } diff --git a/opal/mca/pmix/pmix1xx/pmix/src/include/pmix_globals.h b/opal/mca/pmix/pmix1xx/pmix/src/include/pmix_globals.h index 1c412378da..7e529737c8 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/include/pmix_globals.h @@ -24,6 +24,10 @@ #include #include +#include +#ifdef HAVE_SYS_TYPES_H +#include +#endif #include PMIX_EVENT_HEADER #include @@ -42,6 +46,8 @@ BEGIN_C_DECLS typedef struct { int init_cntr; // #times someone called Init - #times called Finalize pmix_proc_t myid; + uid_t uid; // my effective uid + gid_t gid; // my effective gid int pindex; pmix_event_base_t *evbase; int debug_output; diff --git a/opal/mca/pmix/pmix1xx/pmix/src/sec/pmix_native.c b/opal/mca/pmix/pmix1xx/pmix/src/sec/pmix_native.c index 3860f6f2bd..d353e82720 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/sec/pmix_native.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/sec/pmix_native.c @@ -55,19 +55,14 @@ static void native_finalize(void) static char* create_cred(void) { - uid_t uid; - gid_t gid; char *cred; pmix_output_verbose(2, pmix_globals.debug_output, "sec: native create_cred"); - /* get our uid/gid */ - uid = getuid(); - gid = getgid(); - /* print them and return the string */ - (void)asprintf(&cred, "%lu:%lu", (unsigned long)uid, (unsigned long)gid); + (void)asprintf(&cred, "%lu:%lu", (unsigned long)pmix_globals.uid, + (unsigned long)pmix_globals.gid); pmix_output_verbose(2, pmix_globals.debug_output, "sec: using credential %s", cred); diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c index 390625260a..0970a45e55 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c @@ -979,11 +979,12 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer, { pmix_status_t rc; int32_t cnt; - pmix_data_range_t scope; + pmix_data_range_t range; pmix_persistence_t persist; - size_t i, ninfo; + size_t i, ninfo, einfo; pmix_info_t *info = NULL; pmix_proc_t proc; + uint32_t uid; pmix_output_verbose(2, pmix_globals.debug_output, "recvd PUBLISH"); @@ -992,9 +993,15 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer, return PMIX_ERR_NOT_SUPPORTED; } + /* unpack the effective user id */ + cnt=1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) { + PMIX_ERROR_LOG(rc); + return rc; + } /* unpack the scope */ cnt=1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); return rc; } @@ -1010,28 +1017,28 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer, PMIX_ERROR_LOG(rc); return rc; } + /* we will be adding one for the user id */ + einfo = ninfo + 1; + PMIX_INFO_CREATE(info, einfo); /* unpack the array of info objects */ if (0 < ninfo) { - info = (pmix_info_t*)malloc(ninfo * sizeof(pmix_info_t)); cnt=ninfo; if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) { PMIX_ERROR_LOG(rc); goto cleanup; } } + (void)strncpy(info[einfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN); + info[einfo-1].value.type = PMIX_UINT32; + info[einfo-1].value.data.uint32 = uid; /* call the local server */ (void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN); proc.rank = peer->info->rank; - rc = pmix_host_server.publish(&proc, scope, persist, info, ninfo, cbfunc, cbdata); + rc = pmix_host_server.publish(&proc, range, persist, info, einfo, cbfunc, cbdata); cleanup: - if (NULL != info) { - for (i=0; i < ninfo; i++) { - PMIX_INFO_DESTRUCT(&info[i]); - } - free(info); - } + PMIX_INFO_FREE(info, einfo); return rc; } @@ -1042,12 +1049,13 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer, int32_t cnt; pmix_status_t rc; int wait; - pmix_data_range_t scope; + pmix_data_range_t range; size_t nkeys, i; char **keys=NULL, *sptr; pmix_info_t *info = NULL; - size_t ninfo=0; + size_t ninfo, einfo; pmix_proc_t proc; + uint32_t uid; pmix_output_verbose(2, pmix_globals.debug_output, "recvd LOOKUP"); @@ -1056,18 +1064,40 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer, return PMIX_ERR_NOT_SUPPORTED; } - /* unpack the scope */ + /* unpack the effective user id */ cnt=1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) { PMIX_ERROR_LOG(rc); return rc; } - /* unpack the wait flag */ + /* unpack the range */ cnt=1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &wait, &cnt, PMIX_INT))) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); return rc; } + /* unpack the number of info objects */ + cnt=1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) { + PMIX_ERROR_LOG(rc); + return rc; + } + /* we will be adding one for the user id */ + einfo = ninfo + 1; + PMIX_INFO_CREATE(info, einfo); + /* unpack the array of info objects */ + if (0 < ninfo) { + PMIX_INFO_CREATE(info, ninfo); + cnt=ninfo; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + } + (void)strncpy(info[einfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN); + info[einfo-1].value.type = PMIX_UINT32; + info[einfo-1].value.data.uint32 = uid; + /* unpack the number of keys */ cnt=1; if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &nkeys, &cnt, PMIX_SIZE))) { @@ -1084,27 +1114,14 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer, pmix_argv_append_nosize(&keys, sptr); free(sptr); } - /* unpack the number of provided info structs */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) { - return rc; - } - if (0 < ninfo) { - PMIX_INFO_CREATE(info, ninfo); - /* unpack the info */ - cnt = ninfo; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) { - goto cleanup; - } - } /* call the local server */ (void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN); proc.rank = peer->info->rank; - rc = pmix_host_server.lookup(&proc, scope, info, ninfo, keys, cbfunc, cbdata); + rc = pmix_host_server.lookup(&proc, range, info, einfo, keys, cbfunc, cbdata); cleanup: - PMIX_INFO_FREE(info, ninfo); + PMIX_INFO_FREE(info, einfo); pmix_argv_free(keys); return rc; } @@ -1115,10 +1132,12 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer, { int32_t cnt; pmix_status_t rc; - pmix_data_range_t scope; + pmix_data_range_t range; size_t i, nkeys; char **keys=NULL, *sptr; pmix_proc_t proc; + uint32_t uid; + pmix_info_t info; pmix_output_verbose(2, pmix_globals.debug_output, "recvd UNPUBLISH"); @@ -1127,9 +1146,15 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer, return PMIX_ERR_NOT_SUPPORTED; } - /* unpack the scope */ + /* unpack the effective user id */ cnt=1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &scope, &cnt, PMIX_DATA_RANGE))) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &uid, &cnt, PMIX_UINT32))) { + PMIX_ERROR_LOG(rc); + return rc; + } + /* unpack the range */ + cnt=1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &range, &cnt, PMIX_DATA_RANGE))) { PMIX_ERROR_LOG(rc); return rc; } @@ -1149,10 +1174,16 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer, pmix_argv_append_nosize(&keys, sptr); free(sptr); } + /* setup the info key */ + PMIX_INFO_CONSTRUCT(&info); + (void)strncpy(info.key, PMIX_USERID, PMIX_MAX_KEYLEN); + info.value.type = PMIX_UINT32; + info.value.data.uint32 = uid; + /* call the local server */ (void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN); proc.rank = peer->info->rank; - rc = pmix_host_server.unpublish(&proc, scope, keys, cbfunc, cbdata); + rc = pmix_host_server.unpublish(&proc, range, &info, 1, keys, cbfunc, cbdata); cleanup: pmix_argv_free(keys); diff --git a/opal/mca/pmix/pmix1xx/pmix1_client.c b/opal/mca/pmix/pmix1xx/pmix1_client.c index 5a111ace92..261c0e1be8 100644 --- a/opal/mca/pmix/pmix1xx/pmix1_client.c +++ b/opal/mca/pmix/pmix1xx/pmix1_client.c @@ -351,7 +351,7 @@ int pmix1_publish(opal_pmix_data_range_t scope, int rc; pmix_info_t *pinfo; pmix_status_t ret; - opal_pmix_info_t *iptr; + opal_value_t *iptr; size_t sz, n; rc = convert_data_range(&rng, scope); @@ -369,9 +369,9 @@ int pmix1_publish(opal_pmix_data_range_t scope, if (0 < sz) { PMIX_INFO_CREATE(pinfo, sz); n=0; - OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { (void)strncpy(pinfo[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&pinfo[n].value, &iptr->value); + pmix1_value_load(&pinfo[n].value, iptr); ++n; } } @@ -390,7 +390,7 @@ int pmix1_publishnb(opal_pmix_data_range_t scope, pmix_persistence_t pst; int rc; pmix_status_t ret; - opal_pmix_info_t *iptr; + opal_value_t *iptr; size_t n; pmix1_opcaddy_t *op; @@ -412,9 +412,9 @@ int pmix1_publishnb(opal_pmix_data_range_t scope, if (0 < op->sz) { PMIX_INFO_CREATE(op->info, op->sz); n=0; - OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { (void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&op->info[n].value, &iptr->value); + pmix1_value_load(&op->info[n].value, iptr); ++n; } } @@ -443,26 +443,29 @@ int pmix1_lookup(opal_pmix_data_range_t scope, PMIX_PDATA_CREATE(pdata, sz); n=0; OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { - (void)strncpy(pdata[n++].key, d->key, PMIX_MAX_KEYLEN); + (void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN); } ret = PMIx_Lookup(rng, NULL, 0, pdata, sz); - /* transfer the data back */ - n=0; - OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { - rc = opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - PMIX_PDATA_FREE(pdata, sz); - return OPAL_ERR_BAD_PARAM; - } - d->proc.vpid = pdata[n].proc.rank; - rc = pmix1_value_unload(&d->value, &pdata[n].value); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - PMIX_PDATA_FREE(pdata, sz); - return OPAL_ERR_BAD_PARAM; + if (PMIX_SUCCESS == ret) { + /* transfer the data back */ + n=0; + OPAL_LIST_FOREACH(d, data, opal_pmix_pdata_t) { + rc = opal_convert_string_to_jobid(&d->proc.jobid, pdata[n].proc.nspace); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + PMIX_PDATA_FREE(pdata, sz); + return OPAL_ERR_BAD_PARAM; + } + d->proc.vpid = pdata[n].proc.rank; + rc = pmix1_value_unload(&d->value, &pdata[n].value); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + PMIX_PDATA_FREE(pdata, sz); + return OPAL_ERR_BAD_PARAM; + } + ++n; } } @@ -475,7 +478,7 @@ static void lk_cbfunc(pmix_status_t status, { pmix1_opcaddy_t *op = (pmix1_opcaddy_t*)cbdata; opal_pmix_pdata_t *d; - opal_list_t results; + opal_list_t results, *r; int rc; size_t n; @@ -485,36 +488,43 @@ static void lk_cbfunc(pmix_status_t status, } rc = pmix1_convert_rc(status); - OBJ_CONSTRUCT(&results, opal_list_t); - for (n=0; n < ndata; n++) { - d = OBJ_NEW(opal_pmix_pdata_t); - opal_list_append(&results, &d->super); - rc = opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace); - if (OPAL_SUCCESS != rc) { - rc = OPAL_ERR_BAD_PARAM; - OPAL_ERROR_LOG(rc); - goto release; - } - d->proc.vpid = data[n].proc.rank; - d->key = strdup(data[n].key); - rc = pmix1_value_unload(&d->value, &data[n].value); - if (OPAL_SUCCESS != rc) { - rc = OPAL_ERR_BAD_PARAM; - OPAL_ERROR_LOG(rc); - goto release; + if (OPAL_SUCCESS == rc) { + OBJ_CONSTRUCT(&results, opal_list_t); + for (n=0; n < ndata; n++) { + d = OBJ_NEW(opal_pmix_pdata_t); + opal_list_append(&results, &d->super); + rc = opal_convert_string_to_jobid(&d->proc.jobid, data[n].proc.nspace); + if (OPAL_SUCCESS != rc) { + rc = OPAL_ERR_BAD_PARAM; + OPAL_ERROR_LOG(rc); + goto release; + } + d->proc.vpid = data[n].proc.rank; + d->value.key = strdup(data[n].key); + rc = pmix1_value_unload(&d->value, &data[n].value); + if (OPAL_SUCCESS != rc) { + rc = OPAL_ERR_BAD_PARAM; + OPAL_ERROR_LOG(rc); + goto release; + } } + r = &results; + } else { + r = NULL; } release: /* execute the callback */ - op->lkcbfunc(rc, &results, op->cbdata); + op->lkcbfunc(rc, r, op->cbdata); - OPAL_LIST_DESTRUCT(&results); + if (NULL != r) { + OPAL_LIST_DESTRUCT(&results); + } OBJ_RELEASE(op); } int pmix1_lookupnb(opal_pmix_data_range_t scope, int wait, char **keys, - opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) + opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) { pmix_data_range_t rng; int rc; @@ -582,15 +592,15 @@ int pmix1_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid) pmix_app_t *papps; size_t napps, n, m, ninfo = 0; char nspace[PMIX_MAX_NSLEN+1]; - opal_pmix_info_t *info; + opal_value_t *info; opal_pmix_app_t *app; if (NULL != job_info && 0 < (ninfo = opal_list_get_size(job_info))) { PMIX_INFO_CREATE(pinfo, ninfo); n=0; - OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, job_info, opal_value_t) { (void)strncpy(pinfo[n].key, info->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&pinfo[n].value, &info->value); + pmix1_value_load(&pinfo[n].value, info); ++n; } } @@ -607,9 +617,9 @@ int pmix1_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid) if (0 < (papps[n].ninfo = opal_list_get_size(&app->info))) { PMIX_INFO_CREATE(papps[n].info, papps[n].ninfo); m=0; - OPAL_LIST_FOREACH(info, &app->info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, &app->info, opal_value_t) { (void)strncpy(papps[n].info[m].key, info->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&papps[n].info[m].value, &info->value); + pmix1_value_load(&papps[n].info[m].value, info); ++m; } } @@ -647,7 +657,7 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps, pmix_status_t ret; pmix1_opcaddy_t *op; size_t n, m; - opal_pmix_info_t *info; + opal_value_t *info; opal_pmix_app_t *app; /* create the caddy */ @@ -658,9 +668,9 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps, if (NULL != job_info && 0 < (op->ninfo = opal_list_get_size(job_info))) { PMIX_INFO_CREATE(op->info, op->ninfo); n=0; - OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, job_info, opal_value_t) { (void)strncpy(op->info[n].key, info->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&op->info[n].value, &info->value); + pmix1_value_load(&op->info[n].value, info); ++n; } } @@ -677,9 +687,9 @@ int pmix1_spawnnb(opal_list_t *job_info, opal_list_t *apps, if (0 < (op->apps[n].ninfo = opal_list_get_size(&app->info))) { PMIX_INFO_CREATE(op->apps[n].info, op->apps[n].ninfo); m=0; - OPAL_LIST_FOREACH(info, &app->info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, &app->info, opal_value_t) { (void)strncpy(op->apps[n].info[m].key, info->key, PMIX_MAX_KEYLEN); - pmix1_value_load(&op->apps[n].info[m].value, &info->value); + pmix1_value_load(&op->apps[n].info[m].value, info); ++m; } } diff --git a/opal/mca/pmix/pmix1xx/pmix1_server_north.c b/opal/mca/pmix/pmix1xx/pmix1_server_north.c index 56eaedc956..e400341704 100644 --- a/opal/mca/pmix/pmix1xx/pmix1_server_north.c +++ b/opal/mca/pmix/pmix1xx/pmix1_server_north.c @@ -66,7 +66,8 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *proc, pmix_data_range_t const pmix_info_t info[], size_t ninfo, char **keys, pmix_lookup_cbfunc_t cbfunc, void *cbdata); static pmix_status_t server_unpublish_fn(const pmix_proc_t *proc, - pmix_data_range_t scope, char **keys, + pmix_data_range_t scope, + const pmix_info_t info[], size_t ninfo, char **keys, pmix_op_cbfunc_t cbfunc, void *cbdata); static pmix_status_t server_spawn_fn(const pmix_proc_t *proc, const pmix_info_t job_info[], size_t ninfo, @@ -251,7 +252,7 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs, pmix1_opalcaddy_t *opalcaddy; size_t n; opal_namelist_t *nm; - opal_pmix_info_t *iptr; + opal_value_t *iptr; int rc; if (NULL == host_module || NULL == host_module->fence_nb) { @@ -280,10 +281,10 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs, /* convert the array of pmix_info_t to the list of info */ for (n=0; n < ninfo; n++) { - iptr = OBJ_NEW(opal_pmix_info_t); + iptr = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &iptr->super); iptr->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -305,7 +306,7 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p, int rc; pmix1_opalcaddy_t *opalcaddy; opal_process_name_t proc; - opal_pmix_info_t *iptr; + opal_value_t *iptr; size_t n; if (NULL == host_module || NULL == host_module->direct_modex) { @@ -329,10 +330,10 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p, /* convert the array of pmix_info_t to the list of info */ for (n=0; n < ninfo; n++) { - iptr = OBJ_NEW(opal_pmix_info_t); + iptr = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &iptr->super); iptr->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -360,7 +361,7 @@ static pmix_status_t server_publish_fn(const pmix_proc_t *p, opal_process_name_t proc; opal_pmix_data_range_t oscp; opal_pmix_persistence_t opers; - opal_pmix_info_t *oinfo; + opal_value_t *oinfo; if (NULL == host_module || NULL == host_module->publish) { return PMIX_ERR_NOT_SUPPORTED; @@ -393,10 +394,10 @@ static pmix_status_t server_publish_fn(const pmix_proc_t *p, /* convert the info array */ for (n=0; n < ninfo; n++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); oinfo->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -432,7 +433,7 @@ static void opal_lkupcbfunc(int status, /* convert the jobid */ (void)snprintf(d[n].proc.nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(p->proc.jobid)); d[n].proc.rank = p->proc.vpid; - (void)strncpy(d[n].key, p->key, PMIX_MAX_KEYLEN); + (void)strncpy(d[n].key, p->value.key, PMIX_MAX_KEYLEN); pmix1_value_load(&d[n].value, &p->value); } } @@ -449,7 +450,7 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc pmix1_opalcaddy_t *opalcaddy; opal_pmix_data_range_t oscp; opal_process_name_t proc; - opal_pmix_info_t *iptr; + opal_value_t *iptr; size_t n; if (NULL == host_module || NULL == host_module->lookup) { @@ -478,10 +479,10 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc /* convert the array of pmix_info_t to the list of info */ for (n=0; n < ninfo; n++) { - iptr = OBJ_NEW(opal_pmix_info_t); + iptr = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &iptr->super); iptr->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&iptr->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -498,13 +499,16 @@ static pmix_status_t server_lookup_fn(const pmix_proc_t *p, pmix_data_range_t sc static pmix_status_t server_unpublish_fn(const pmix_proc_t *p, - pmix_data_range_t scope, char **keys, + pmix_data_range_t scope, + const pmix_info_t info[], size_t ninfo, char **keys, pmix_op_cbfunc_t cbfunc, void *cbdata) { int rc; pmix1_opalcaddy_t *opalcaddy; opal_process_name_t proc; opal_pmix_data_range_t oscp; + opal_value_t *iptr; + size_t n; if (NULL == host_module || NULL == host_module->unpublish) { return PMIX_SUCCESS; @@ -530,8 +534,19 @@ static pmix_status_t server_unpublish_fn(const pmix_proc_t *p, opalcaddy->opcbfunc = cbfunc; opalcaddy->cbdata = cbdata; + /* convert the array of pmix_info_t to the list of info */ + for (n=0; n < ninfo; n++) { + iptr = OBJ_NEW(opal_value_t); + opal_list_append(&opalcaddy->info, &iptr->super); + iptr->key = strdup(info[n].key); + if (OPAL_SUCCESS != (rc = pmix1_value_unload(iptr, &info[n].value))) { + OBJ_RELEASE(opalcaddy); + return pmix1_convert_opalrc(rc); + } + } + /* pass it up */ - rc = host_module->unpublish(&proc, oscp, keys, opal_opcbfunc, opalcaddy); + rc = host_module->unpublish(&proc, oscp, &opalcaddy->info, keys, opal_opcbfunc, opalcaddy); if (OPAL_SUCCESS != rc) { OBJ_RELEASE(opalcaddy); } @@ -562,7 +577,7 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p, pmix1_opalcaddy_t *opalcaddy; opal_process_name_t proc; opal_pmix_app_t *app; - opal_pmix_info_t *oinfo; + opal_value_t *oinfo; size_t k, n; int rc; @@ -587,10 +602,10 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p, /* convert the job info */ for (k=0; k < ninfo; k++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); oinfo->key = strdup(job_info[k].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &job_info[k].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &job_info[k].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -612,10 +627,10 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p, } app->maxprocs = apps[n].maxprocs; for (k=0; k < apps[n].ninfo; k++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&app->info, &oinfo->super); oinfo->key = strdup(apps[n].info[k].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &apps[n].info[k].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &apps[n].info[k].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -641,7 +656,7 @@ static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs, pmix1_opalcaddy_t *opalcaddy; opal_namelist_t *nm; size_t n; - opal_pmix_info_t *oinfo; + opal_value_t *oinfo; if (NULL == host_module || NULL == host_module->connect) { return PMIX_ERR_NOT_SUPPORTED; @@ -669,10 +684,10 @@ static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs, /* convert the info */ for (n=0; n < ninfo; n++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); oinfo->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -696,7 +711,7 @@ static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t npro pmix1_opalcaddy_t *opalcaddy; opal_namelist_t *nm; size_t n; - opal_pmix_info_t *oinfo; + opal_value_t *oinfo; if (NULL == host_module || NULL == host_module->disconnect) { return PMIX_ERR_NOT_SUPPORTED; @@ -724,10 +739,10 @@ static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t npro /* convert the info */ for (n=0; n < ninfo; n++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); oinfo->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } @@ -747,7 +762,7 @@ static pmix_status_t server_register_events(const pmix_info_t info[], size_t nin { pmix1_opalcaddy_t *opalcaddy; size_t n; - opal_pmix_info_t *oinfo; + opal_value_t *oinfo; int rc; /* setup the caddy */ @@ -757,10 +772,10 @@ static pmix_status_t server_register_events(const pmix_info_t info[], size_t nin /* convert the info */ for (n=0; n < ninfo; n++) { - oinfo = OBJ_NEW(opal_pmix_info_t); + oinfo = OBJ_NEW(opal_value_t); opal_list_append(&opalcaddy->info, &oinfo->super); oinfo->key = strdup(info[n].key); - if (OPAL_SUCCESS != (rc = pmix1_value_unload(&oinfo->value, &info[n].value))) { + if (OPAL_SUCCESS != (rc = pmix1_value_unload(oinfo, &info[n].value))) { OBJ_RELEASE(opalcaddy); return pmix1_convert_opalrc(rc); } diff --git a/opal/mca/pmix/pmix_server.h b/opal/mca/pmix/pmix_server.h index 19ec63aa4b..1acf1753b3 100644 --- a/opal/mca/pmix/pmix_server.h +++ b/opal/mca/pmix/pmix_server.h @@ -58,7 +58,7 @@ typedef int (*opal_pmix_server_abort_fn_t)(opal_process_name_t *proc, void *serv * in the fence[_nb] operation. The callback is to be executed once each daemon * hosting at least one participant has called the host server's fencenb function. * - * The list of opal_pmix_info_t includes any directives from the user regarding + * The list of opal_value_t includes any directives from the user regarding * how the fence is to be executed (e.g., timeout limits). * * The provided data is to be collectively shared with all host @@ -73,7 +73,7 @@ typedef int (*opal_pmix_server_fencenb_fn_t)(opal_list_t *procs, opal_list_t *in * PMIx server on the remote node that hosts the specified proc to * obtain and return a direct modex blob for that proc * - * The list of opal_pmix_info_t includes any directives from the user regarding + * The list of opal_value_t includes any directives from the user regarding * how the operation is to be executed (e.g., timeout limits). */ typedef int (*opal_pmix_server_dmodex_req_fn_t)(opal_process_name_t *proc, opal_list_t *info, @@ -92,7 +92,7 @@ typedef int (*opal_pmix_server_dmodex_req_fn_t)(opal_process_name_t *proc, opal_ * process is also provided and is expected to be returned on any subsequent * lookup request */ typedef int (*opal_pmix_server_publish_fn_t)(opal_process_name_t *proc, - opal_pmix_data_range_t scope, + opal_pmix_data_range_t range, opal_pmix_persistence_t persist, opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); @@ -106,12 +106,12 @@ typedef int (*opal_pmix_server_publish_fn_t)(opal_process_name_t *proc, * before executing the callback function, or should callback with whatever * data is immediately available. * - * The list of opal_pmix_info_t includes any directives from the user regarding + * The list of opal_value_t includes any directives from the user regarding * how the operation is to be executed (e.g., timeout limits, whether the * lookup should wait until data appears). */ typedef int (*opal_pmix_server_lookup_fn_t)(opal_process_name_t *proc, - opal_pmix_data_range_t scope, + opal_pmix_data_range_t range, opal_list_t *info, char **keys, opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata); @@ -120,7 +120,8 @@ typedef int (*opal_pmix_server_lookup_fn_t)(opal_process_name_t *proc, * been published. The callback is to be executed upon completion of the delete * procedure */ typedef int (*opal_pmix_server_unpublish_fn_t)(opal_process_name_t *proc, - opal_pmix_data_range_t scope, char **keys, + opal_pmix_data_range_t range, + opal_list_t *info, char **keys, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); /* Spawn a set of applications/processes as per the PMIx API. Note that @@ -142,7 +143,7 @@ typedef int (*opal_pmix_server_spawn_fn_t)(opal_process_name_t *requestor, * set of procs at a time. However, a process *can* be simultaneously engaged * in multiple connect operations, each involving a different set of procs * - * The list of opal_pmix_info_t includes any directives from the user regarding + * The list of opal_value_t includes any directives from the user regarding * how the operation is to be executed (e.g., timeout limits). */ typedef int (*opal_pmix_server_connect_fn_t)(opal_list_t *procs, opal_list_t *info, @@ -155,7 +156,7 @@ typedef int (*opal_pmix_server_connect_fn_t)(opal_list_t *procs, opal_list_t *in * disconnect - i.e., you have to fully disconnect before you can reconnect to the * same group of processes. * - * The list of opal_pmix_info_t includes any directives from the user regarding + * The list of opal_value_t includes any directives from the user regarding * how the operation is to be executed (e.g., timeout limits). */ typedef int (*opal_pmix_server_disconnect_fn_t)(opal_list_t *procs, opal_list_t *info, @@ -165,12 +166,12 @@ typedef int (*opal_pmix_server_disconnect_fn_t)(opal_list_t *procs, opal_list_t * manager may have access to events beyond process failure. In cases where * the client application requests to be notified of such events, the request * will be passed to the PMIx server, which in turn shall pass the request to - * the resource manager. The list of opal_pmix_info_t will describe the + * the resource manager. The list of opal_value_t will describe the * desired events */ typedef int (*opal_pmix_server_register_events_fn_t)(opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); - + /* Callback function for incoming connection requests from * local clients */ typedef void (*opal_pmix_connection_cbfunc_t)(int incoming_sd); diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 84162835cd..d6fa76b6dd 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -28,6 +28,10 @@ BEGIN_C_DECLS * these keys are RESERVED */ #define OPAL_PMIX_ATTR_UNDEF NULL +/* identification attributes */ +#define OPAL_PMIX_USERID "pmix.euid" // (uint32_t) effective user id +#define OPAL_PMIX_GRPID "pmix.egid" // (uint32_t) effective group id + #define OPAL_PMIX_CPUSET "pmix.cpuset" // (char*) hwloc bitmap applied to proc upon launch #define OPAL_PMIX_CREDENTIAL "pmix.cred" // (char*) security credential assigned to proc #define OPAL_PMIX_SPAWNED "pmix.spawned" // (bool) true if this proc resulted from a call to PMIx_Spawn @@ -159,19 +163,15 @@ typedef enum { /**** PMIX INFO STRUCT ****/ -typedef struct { - opal_list_item_t super; - char *key; - opal_value_t value; -} opal_pmix_info_t; -OBJ_CLASS_DECLARATION(opal_pmix_info_t); + +/* NOTE: the pmix_info_t is essentially equivalent to the opal_value_t + * Hence, we do not define an opal_value_t */ /**** PMIX LOOKUP RETURN STRUCT ****/ typedef struct { opal_list_item_t super; opal_process_name_t proc; - char *key; opal_value_t value; } opal_pmix_pdata_t; OBJ_CLASS_DECLARATION(opal_pmix_pdata_t); @@ -256,7 +256,7 @@ typedef void (*opal_pmix_lookup_cbfunc_t)(int status, * It is the responsibility of the application to parse any provided info array * for defined key-values if it so desires. * - * Possible uses of a pmix_info_t object include: + * Possible uses of the opal_value_t list include: * * - for the RM to alert the process as to planned actions, such as * to abort the session, in response to the reported error diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index a57613b16a..845ffe3f45 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -187,6 +187,10 @@ int pmix_server_init(void) orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP, ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL); + /* setup recv for replies from data server */ + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT, + ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL); + /* setup the local server */ if (ORTE_SUCCESS != (rc = opal_pmix.server_init(&pmix_server))) { ORTE_ERROR_LOG(rc); diff --git a/orte/orted/pmix/pmix_server_dyn.c b/orte/orted/pmix/pmix_server_dyn.c index 3220ccfea6..bf443ecbf2 100644 --- a/orte/orted/pmix/pmix_server_dyn.c +++ b/orte/orted/pmix/pmix_server_dyn.c @@ -156,7 +156,7 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, orte_job_t *jdata; orte_app_context_t *app; opal_pmix_app_t *papp; - opal_pmix_info_t *info; + opal_value_t *info; int rc; char cwd[OPAL_PATH_MAX]; @@ -169,14 +169,14 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, jdata = OBJ_NEW(orte_job_t); /* transfer the job info across */ - OPAL_LIST_FOREACH(info, job_info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, job_info, opal_value_t) { if (0 == strcmp(info->key, OPAL_PMIX_PERSONALITY)) { - jdata->personality = strdup(info->value.data.string); + jdata->personality = strdup(info->data.string); } else if (0 == strcmp(info->key, OPAL_PMIX_MAPPER)) { if (NULL == jdata->map) { jdata->map = OBJ_NEW(orte_job_map_t); } - jdata->map->req_mapper = strdup(info->value.data.string); + jdata->map->req_mapper = strdup(info->data.string); } else if (0 == strcmp(info->key, OPAL_PMIX_DISPLAY_MAP)) { if (NULL == jdata->map) { jdata->map = OBJ_NEW(orte_job_map_t); @@ -189,12 +189,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) { /* not allowed to provide multiple mapping policies */ orte_show_help("help-orte-rmaps-base.txt", "redefining-policy", - true, "mapping", info->value.data.string, + true, "mapping", info->data.string, orte_rmaps_base_print_mapping(orte_rmaps_base.mapping)); return ORTE_ERR_BAD_PARAM; } ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_PPR); - jdata->map->ppr = strdup(info->value.data.string); + jdata->map->ppr = strdup(info->data.string); } else if (0 == strcmp(info->key, OPAL_PMIX_MAPBY)) { if (NULL == jdata->map) { jdata->map = OBJ_NEW(orte_job_map_t); @@ -202,12 +202,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) { /* not allowed to provide multiple mapping policies */ orte_show_help("help-orte-rmaps-base.txt", "redefining-policy", - true, "mapping", info->value.data.string, + true, "mapping", info->data.string, orte_rmaps_base_print_mapping(orte_rmaps_base.mapping)); return ORTE_ERR_BAD_PARAM; } rc = orte_rmaps_base_set_mapping_policy(&jdata->map->mapping, - NULL, info->value.data.string); + NULL, info->data.string); if (ORTE_SUCCESS != rc) { return rc; } @@ -218,13 +218,13 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, if (ORTE_RANKING_POLICY_IS_SET(jdata->map->ranking)) { /* not allowed to provide multiple ranking policies */ orte_show_help("help-orte-rmaps-base.txt", "redefining-policy", - true, "ranking", info->value.data.string, + true, "ranking", info->data.string, orte_rmaps_base_print_ranking(orte_rmaps_base.ranking)); return ORTE_ERR_BAD_PARAM; } rc = orte_rmaps_base_set_ranking_policy(&jdata->map->ranking, jdata->map->mapping, - info->value.data.string); + info->data.string); if (ORTE_SUCCESS != rc) { return rc; } @@ -236,12 +236,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, if (OPAL_BINDING_POLICY_IS_SET(jdata->map->binding)) { /* not allowed to provide multiple mapping policies */ orte_show_help("help-opal-hwloc-base.txt", "redefining-policy", true, - info->value.data.string, + info->data.string, opal_hwloc_base_print_binding(opal_hwloc_binding_policy)); return ORTE_ERR_BAD_PARAM; } rc = opal_hwloc_base_set_binding_policy(&jdata->map->binding, - info->value.data.string); + info->data.string); if (ORTE_SUCCESS != rc) { return rc; } @@ -250,12 +250,12 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); } else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) { - if (0 == strcmp(info->value.data.string, "all")) { + if (0 == strcmp(info->data.string, "all")) { jdata->stdin_target = ORTE_VPID_WILDCARD; - } else if (0 == strcmp(info->value.data.string, "none")) { + } else if (0 == strcmp(info->data.string, "none")) { jdata->stdin_target = ORTE_VPID_INVALID; } else { - jdata->stdin_target = strtoul(info->value.data.string, NULL, 10); + jdata->stdin_target = strtoul(info->data.string, NULL, 10); } } else { /* unrecognized key */ @@ -273,26 +273,26 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, app->argv = opal_argv_copy(papp->argv); app->env = opal_argv_copy(papp->env); app->num_procs = papp->maxprocs; - OPAL_LIST_FOREACH(info, &papp->info, opal_pmix_info_t) { + OPAL_LIST_FOREACH(info, &papp->info, opal_value_t) { if (0 == strcmp(info->key, OPAL_PMIX_HOST)) { orte_set_attribute(&app->attributes, ORTE_APP_DASH_HOST, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else if (0 == strcmp(info->key, OPAL_PMIX_HOSTFILE)) { orte_set_attribute(&app->attributes, ORTE_APP_HOSTFILE, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOSTFILE)) { orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOSTFILE, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOST)) { orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOST, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else if (0 == strcmp(info->key, OPAL_PMIX_PREFIX)) { orte_set_attribute(&app->attributes, ORTE_APP_PREFIX_DIR, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else if (0 == strcmp(info->key, OPAL_PMIX_WDIR)) { /* if this is a relative path, convert it to an absolute path */ - if (opal_path_is_absolute(info->value.data.string)) { - app->cwd = strdup(info->value.data.string); + if (opal_path_is_absolute(info->data.string)) { + app->cwd = strdup(info->data.string); } else { /* get the cwd */ if (OPAL_SUCCESS != (rc = opal_getcwd(cwd, sizeof(cwd)))) { @@ -301,14 +301,14 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor, return rc; } /* construct the absolute path */ - app->cwd = opal_os_path(false, cwd, info->value.data.string, NULL); + app->cwd = opal_os_path(false, cwd, info->data.string, NULL); } } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_BIN)) { orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN, ORTE_ATTR_GLOBAL, NULL, OPAL_BOOL); } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_FILES)) { orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES, - ORTE_ATTR_GLOBAL, info->value.data.string, OPAL_STRING); + ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING); } else { /* unrecognized key */ orte_show_help("help-orted.txt", "bad-key", diff --git a/orte/orted/pmix/pmix_server_internal.h b/orte/orted/pmix/pmix_server_internal.h index 9e239367fe..6d327453cb 100644 --- a/orte/orted/pmix/pmix_server_internal.h +++ b/orte/orted/pmix/pmix_server_internal.h @@ -155,7 +155,8 @@ extern int pmix_server_lookup_fn(opal_process_name_t *proc, opal_list_t *info, char **keys, opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata); extern int pmix_server_unpublish_fn(opal_process_name_t *proc, - opal_pmix_data_range_t range, char **keys, + opal_pmix_data_range_t range, + opal_list_t *info, char **keys, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); extern int pmix_server_spawn_fn(opal_process_name_t *requestor, opal_list_t *job_info, opal_list_t *apps, @@ -168,10 +169,15 @@ extern int pmix_server_register_events_fn(opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); +/* declare the RML recv functions for responses */ extern void pmix_server_launch_resp(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tg, void *cbdata); +extern void pmix_server_keyval_client(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tg, void *cbdata); + /* exposed shared variables */ typedef struct { bool initialized; diff --git a/orte/orted/pmix/pmix_server_pub.c b/orte/orted/pmix/pmix_server_pub.c index 49661436b0..d98fd508e4 100644 --- a/orte/orted/pmix/pmix_server_pub.c +++ b/orte/orted/pmix/pmix_server_pub.c @@ -37,16 +37,12 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/util/name_fns.h" +#include "orte/runtime/orte_data_server.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/rml/rml.h" #include "pmix_server_internal.h" -#define ORTE_PMIX_PUBLISH_CMD 0x01 -#define ORTE_PMIX_LOOKUP_CMD 0x02 -#define ORTE_PMIX_UNPUBLISH_CMD 0x03 - - static void execute(int sd, short args, void *cbdata) { pmix_server_req_t *req = (pmix_server_req_t*)cbdata; @@ -98,10 +94,12 @@ int pmix_server_publish_fn(opal_process_name_t *proc, int rc; uint8_t cmd = ORTE_PMIX_PUBLISH_CMD; int32_t ninfo; - opal_pmix_info_t *iptr; + opal_value_t *iptr; /* create the caddy */ req = OBJ_NEW(pmix_server_req_t); + req->opcbfunc = cbfunc; + req->cbdata = cbdata; /* load the command */ if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) { @@ -147,13 +145,8 @@ int pmix_server_publish_fn(opal_process_name_t *proc, } /* if we have items, pack those too */ - OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->key, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(req); - return rc; - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->value, 1, OPAL_VALUE))) { + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); return rc; @@ -178,8 +171,13 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, pmix_server_req_t *req; int rc; uint8_t cmd = ORTE_PMIX_LOOKUP_CMD; - int32_t nkeys; - opal_pmix_info_t *iptr; + int32_t nkeys, i; + int32_t ninfo; + opal_value_t *iptr; + + /* the list of info objects are directives for us - they include + * things like timeout constraints, so there is no reason to + * forward them to the server */ /* create the caddy */ req = OBJ_NEW(pmix_server_req_t); @@ -193,13 +191,6 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, return rc; } - /* pack the name of the requestor */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, proc, 1, OPAL_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(req); - return rc; - } - /* pack the range */ if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); @@ -214,26 +205,20 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, req->target = *ORTE_PROC_MY_HNP; } - /* pack the number of info objects */ - nkeys = opal_list_get_size(info); - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) { + /* pack the number of info items */ + ninfo = opal_list_get_size(info); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &ninfo, 1, OPAL_UINT32))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); return rc; } - /* pack the objects */ - if (0 < nkeys) { - OPAL_LIST_FOREACH(iptr, info, opal_pmix_info_t) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->key, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(req); - return rc; - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr->value, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(req); - return rc; - } + + /* if we have items, pack those too */ + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(req); + return rc; } } @@ -246,10 +231,12 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, } /* pack the keys too */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, keys, nkeys, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(req); - return rc; + for (i=0; i < nkeys; i++) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &keys[i], 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(req); + return rc; + } } /* thread-shift so we can store the tracker */ @@ -262,16 +249,20 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, } int pmix_server_unpublish_fn(opal_process_name_t *proc, - opal_pmix_data_range_t range, char **keys, + opal_pmix_data_range_t range, + opal_list_t *info, char **keys, opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_server_req_t *req; int rc; uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD; - uint32_t nkeys; + uint32_t nkeys, ninfo; + opal_value_t *iptr; /* create the caddy */ req = OBJ_NEW(pmix_server_req_t); + req->opcbfunc = cbfunc; + req->cbdata = cbdata; /* load the command */ if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &cmd, 1, OPAL_UINT8))) { @@ -301,6 +292,22 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, req->target = *ORTE_PROC_MY_HNP; } + /* pack the number of info items */ + ninfo = opal_list_get_size(info); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &ninfo, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(req); + return rc; + } + + /* if we have items, pack those too */ + OPAL_LIST_FOREACH(iptr, info, opal_value_t) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &iptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(req); + return rc; + } + } /* pack the number of keys */ nkeys = opal_argv_count(keys); if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) { @@ -325,94 +332,19 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, return OPAL_SUCCESS; } -void pmix_server_keyval_srvr(int status, orte_process_name_t* sender, - opal_buffer_t *buffer, - orte_rml_tag_t tg, void *cbdata) -{ - uint8_t cmd; - int range, room_num, cnt, rc; - opal_buffer_t *answer; - opal_process_name_t proc; - - /* setup the answer */ - answer = OBJ_NEW(opal_buffer_t); - - /* unpack the remote room number */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(answer); - return; - } - /* save it for the return */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &room_num, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(answer); - return; - } - - /* unpack the cmd */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, OPAL_UINT8))) { - ORTE_ERROR_LOG(rc); - goto release; - } - - /* unpack the name of the publisher/requestor */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &proc, &cnt, OPAL_NAME))) { - ORTE_ERROR_LOG(rc); - goto release; - } - - /* unpack the range */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &cnt, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - goto release; - } - - switch(cmd) { - case ORTE_PMIX_PUBLISH_CMD: - /* unpack the publisher */ - /* unpack the range */ - /* unpack the persistence */ - /* unpack the key */ - /* unpack the value */ - case ORTE_PMIX_LOOKUP_CMD: - case ORTE_PMIX_UNPUBLISH_CMD: - break; - default: - break; - } - - release: - /* pack the error code */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rc, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - return; - } - - /* send back the reply */ - rc = orte_rml.send_buffer_nb(sender, answer, - ORTE_RML_TAG_DATA_CLIENT, - orte_rml_send_callback, NULL); - if (ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(answer); - } -} - void pmix_server_keyval_client(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tg, void *cbdata) { int rc, ret, room_num; - int32_t cnt, ninfo, n; + int32_t cnt; pmix_server_req_t *req; - opal_list_t *info = NULL; - opal_pmix_info_t *iptr; + opal_list_t info; + opal_value_t *iptr; + opal_pmix_pdata_t *pdata; + opal_process_name_t source; + OBJ_CONSTRUCT(&info, opal_list_t); /* unpack the room number of the request tracker */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) { @@ -428,26 +360,25 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender, goto release; } - /* see if any data was included - not an error if the answer is no */ - cnt = 1; - rc = opal_dss.unpack(buffer, &ninfo, &cnt, OPAL_INT32); - if (ORTE_SUCCESS == rc && 0 < ninfo) { - info = OBJ_NEW(opal_list_t); - for (n=0; n < ninfo; n++) { - iptr = OBJ_NEW(opal_pmix_info_t); - opal_list_append(info, &iptr->super); - rc = opal_dss.unpack(buffer, &iptr->key, &cnt, OPAL_STRING); - if (OPAL_SUCCESS != rc) { - ret = rc; - OPAL_LIST_RELEASE(info); - info = NULL; + if (ORTE_SUCCESS == ret) { + /* see if any data was included - not an error if the answer is no */ + cnt = 1; + while (OPAL_SUCCESS == opal_dss.unpack(buffer, &source, &cnt, OPAL_NAME)) { + pdata = OBJ_NEW(opal_pmix_pdata_t); + pdata->proc = source; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &cnt, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(pdata); + continue; } - rc = opal_dss.unpack(buffer, &iptr->value, &cnt, OPAL_VALUE); - if (OPAL_SUCCESS != rc) { - ret = rc; - OPAL_LIST_RELEASE(info); - info = NULL; + if (OPAL_SUCCESS != (rc = opal_value_xfer(&pdata->value, iptr))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(pdata); + OBJ_RELEASE(iptr); + continue; } + OBJ_RELEASE(iptr); + opal_list_append(&info, &pdata->super); } } @@ -459,11 +390,15 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender, /* pass down the response */ if (NULL != req->opcbfunc) { req->opcbfunc(ret, req->cbdata); + } else if (NULL != req->lkcbfunc) { + req->lkcbfunc(ret, &info, req->cbdata); } else { - req->lkcbfunc(ret, info, req->cbdata); + /* should not happen */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED); } /* cleanup */ + OPAL_LIST_DESTRUCT(&info); OBJ_RELEASE(req); } } diff --git a/orte/runtime/orte_data_server.c b/orte/runtime/orte_data_server.c index 4a17981271..e944c7a83b 100644 --- a/orte/runtime/orte_data_server.c +++ b/orte/runtime/orte_data_server.c @@ -12,6 +12,7 @@ * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012 Los Alamos National Security, LLC. * All rights reserved + * Copyright (c) 2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -31,8 +32,9 @@ #include "opal/util/output.h" #include "opal/class/opal_pointer_array.h" - #include "opal/dss/dss.h" +#include "opal/mca/pmix/pmix_types.h" + #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rml/rml.h" #include "orte/runtime/orte_globals.h" @@ -52,23 +54,26 @@ typedef struct { * owner can remove it */ orte_process_name_t owner; - /* service name */ - char *service; - /* port */ - char *port; + /* uid of the owner - helps control + * access rights */ + uint32_t uid; + /* characteristics */ + opal_pmix_data_range_t range; + opal_pmix_persistence_t persistence; + /* and the values themselves */ + opal_list_t values; + /* the value itself */ } orte_data_object_t; static void construct(orte_data_object_t *ptr) { ptr->index = -1; - ptr->service = NULL; - ptr->port = NULL; + OBJ_CONSTRUCT(&ptr->values, opal_list_t); } static void destruct(orte_data_object_t *ptr) { - if (NULL != ptr->service) free(ptr->service); - if (NULL != ptr->port) free(ptr->port); + OPAL_LIST_DESTRUCT(&ptr->values); } OBJ_CLASS_INSTANCE(orte_data_object_t, @@ -76,15 +81,14 @@ OBJ_CLASS_INSTANCE(orte_data_object_t, construct, destruct); /* local globals */ -static opal_pointer_array_t *orte_data_server_store=NULL; -static bool recv_issued=false; +static opal_pointer_array_t orte_data_server_store; int orte_data_server_init(void) { int rc; - orte_data_server_store = OBJ_NEW(opal_pointer_array_t); - if (ORTE_SUCCESS != (rc = opal_pointer_array_init(orte_data_server_store, + OBJ_CONSTRUCT(&orte_data_server_store, opal_pointer_array_t); + if (ORTE_SUCCESS != (rc = opal_pointer_array_init(&orte_data_server_store, 1, INT_MAX, 1))) { @@ -92,14 +96,11 @@ int orte_data_server_init(void) return rc; } - if (!recv_issued) { - orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, - ORTE_RML_TAG_DATA_SERVER, - ORTE_RML_PERSISTENT, - orte_data_server, - NULL); - recv_issued = true; - } + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_DATA_SERVER, + ORTE_RML_PERSISTENT, + orte_data_server, + NULL); return ORTE_SUCCESS; } @@ -107,295 +108,350 @@ int orte_data_server_init(void) void orte_data_server_finalize(void) { orte_std_cntr_t i; - orte_data_object_t **data; + orte_data_object_t *data; - if (NULL != orte_data_server_store) { - data = (orte_data_object_t**)orte_data_server_store->addr; - for (i=0; i < orte_data_server_store->size; i++) { - if (NULL != data[i]) OBJ_RELEASE(data[i]); - } - OBJ_RELEASE(orte_data_server_store); - } + orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_SERVER); - if (recv_issued) { - orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_SERVER); - recv_issued = false; - } -} - -static orte_data_object_t *lookup(char *service) -{ - orte_std_cntr_t i; - orte_data_object_t **data; - - data = (orte_data_object_t**)orte_data_server_store->addr; - for (i=0; i < orte_data_server_store->size; i++) { - if (NULL != data[i]) { - if (0 == strcmp(data[i]->service, service)) { - return data[i]; - } + for (i=0; i < orte_data_server_store.size; i++) { + if (NULL != (data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, i))) { + OBJ_RELEASE(data); } } - - /* get here if not found - return NULL */ - return NULL; -} - -static void rml_cbfunc(int status, orte_process_name_t* sender, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) -{ - OBJ_RELEASE(buffer); + OBJ_DESTRUCT(&orte_data_server_store); } void orte_data_server(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) { - orte_data_server_cmd_t command; + uint8_t command; orte_std_cntr_t count; - char *service_name, *port_name; + opal_process_name_t requestor; orte_data_object_t *data; opal_buffer_t *answer; - int rc, ret; - bool unique; + int rc, ret, k; + opal_value_t *iptr, *inext; + uint32_t ninfo, i; + char **keys = NULL, *str; + bool ret_packed = false; + int room_number; + uint32_t uid; + opal_pmix_data_range_t range; OPAL_OUTPUT_VERBOSE((1, orte_debug_output, "%s data server got message from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); + /* unpack the room number of the caller's request */ count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &room_number, &count, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + return; + } + + /* unpack the command */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, OPAL_UINT8))) { ORTE_ERROR_LOG(rc); return; } answer = OBJ_NEW(opal_buffer_t); + /* pack the room number as this must lead any response */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &room_number, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + return; + } switch(command) { - case ORTE_DATA_SERVER_PUBLISH: - /* unpack the service name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } - - /* unpack the port name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } - - /* unpack uniqueness flag */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &unique, &count, OPAL_BOOL))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: publishing service %s port %s %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name, - unique ? "UNIQUE" : "OVERWRITE")); - - /* check the current data store to see if this service name has already - * been published - */ - if (NULL != (data = lookup(service_name))) { - /* already exists - see if overwrite allowed */ - if (unique) { - /* return ORTE_EXISTS error code */ - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: publishing service %s port %s already exists", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name)); - - ret = ORTE_EXISTS; - } else { - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: overwriting service %s with port %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name)); - if (NULL != data->port) { - free(data->port); - } - data->port = port_name; - data->owner.jobid = sender->jobid; - data->owner.vpid = sender->vpid; - ret = ORTE_SUCCESS; - } - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - - } - - /* create a new data object */ + case ORTE_PMIX_PUBLISH_CMD: data = OBJ_NEW(orte_data_object_t); - - /* pass over the data values - these were malloc'd when unpacked, - * so we don't need to strdup them here - */ - data->service = service_name; - data->port = port_name; - data->owner.jobid = sender->jobid; - data->owner.vpid = sender->vpid; - - /* store the data */ - data->index = opal_pointer_array_add(orte_data_server_store, data); + /* unpack the requestor */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->owner, &count, OPAL_NAME))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(data); + goto SEND_ERROR; + } OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: successfully published service %s port %s", + "%s data server: publishing data from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name)); + ORTE_NAME_PRINT(&data->owner))); + + /* unpack the range */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->range, &count, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(data); + goto SEND_ERROR; + } + /* unpack the persistence */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &data->persistence, &count, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(data); + goto SEND_ERROR; + } + + /* unpack the number of info elements */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(data); + goto SEND_ERROR; + } + + if (0 < ninfo) { + for (i=0; i < ninfo; i++) { + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(data); + goto SEND_ERROR; + } + /* if this is the userid, separate it out */ + if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) { + data->uid = iptr->data.uint32; + OBJ_RELEASE(iptr); + } else { + opal_list_append(&data->values, &iptr->super); + } + } + } + + data->index = opal_pointer_array_add(&orte_data_server_store, data); /* tell the user it was wonderful... */ ret = ORTE_SUCCESS; if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ + * rc value either, so just send whatever is there */ } goto SEND_ANSWER; break; - case ORTE_DATA_SERVER_LOOKUP: - /* unpack the service name */ + case ORTE_PMIX_LOOKUP_CMD: + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: lookup data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* unpack the range - this sets some constraints on the range of data to be considered */ count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + + /* unpack the number of info elements */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + if (0 < ninfo) { + for (i=0; i < ninfo; i++) { + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + /* if this is the userid, separate it out */ + if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) { + uid = iptr->data.uint32; + } + /* ignore anything else for now */ + OBJ_RELEASE(iptr); + } + } + + /* unpack the number of keys */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + if (0 == ninfo) { + /* they forgot to send us the keys?? */ + rc = ORTE_ERR_BAD_PARAM; + goto SEND_ERROR; + } + + /* unpack the keys */ + for (i=0; i < ninfo; i++) { + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + opal_argv_free(keys); + goto SEND_ERROR; + } + opal_argv_append_nosize(&keys, str); + free(str); + } + + /* cycle across the provided keys */ + for (i=0; NULL != keys[i]; i++) { + /* cycle across the stored data, looking for a match */ + for (k=0; k < orte_data_server_store.size; k++) { + data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k); + if (NULL == data) { + continue; + } + /* can only access data posted by the same user id */ + if (uid != data->uid) { + continue; + } + /* if the range doesn't match, then we cannot consider it */ + if (range != data->range) { + continue; + } + /* see if we have this key */ + OPAL_LIST_FOREACH(iptr, &data->values, opal_value_t) { + if (0 == strcmp(iptr->key, keys[i])) { + /* found it - package it for return */ + if (!ret_packed) { + ret = ORTE_SUCCESS; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + opal_argv_free(keys); + goto SEND_ERROR; + } + ret_packed = true; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->owner, 1, OPAL_NAME))) { + ORTE_ERROR_LOG(rc); + opal_argv_free(keys); + goto SEND_ERROR; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &iptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + opal_argv_free(keys); + goto SEND_ERROR; + } + } + } + } + } + opal_argv_free(keys); + if (!ret_packed) { + /* nothing was found - indicate that situation */ + rc = ORTE_ERR_NOT_FOUND; + goto SEND_ERROR; + } + goto SEND_ANSWER; + break; + + case ORTE_PMIX_UNPUBLISH_CMD: + /* unpack the requestor */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) { ORTE_ERROR_LOG(rc); goto SEND_ERROR; } OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: lookup on service %s", + "%s data server: unpublish data from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); + ORTE_NAME_PRINT(&requestor))); - /* locate this record in the data store */ - if (NULL == (data = lookup(service_name))) { - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* return ORTE_ERR_NOT_FOUND error code */ - ret = ORTE_ERR_NOT_FOUND; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - } - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: successful lookup on service %s port %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, data->port)); - - /* pack success so the unpack on the other end can - * always unpack an int first - */ - ret = ORTE_SUCCESS; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - goto SEND_ANSWER; - } - - /* pack the returned port */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - break; - - case ORTE_DATA_SERVER_UNPUBLISH: - /* unpack the service name */ + /* unpack the range - this sets some constraints on the range of data to be considered */ count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &range, &count, OPAL_INT))) { ORTE_ERROR_LOG(rc); goto SEND_ERROR; } - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: unpublish on service %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* locate this record in the data store */ - if (NULL == (data = lookup(service_name))) { - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* return ORTE_ERR_NOT_FOUND error code */ - ret = ORTE_ERR_NOT_FOUND; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ + /* unpack the number of info elements */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + if (0 < ninfo) { + for (i=0; i < ninfo; i++) { + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &count, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + /* if this is the userid, separate it out */ + if (0 == strcmp(iptr->key, OPAL_PMIX_USERID)) { + uid = iptr->data.uint32; + } + /* ignore anything else for now */ + OBJ_RELEASE(iptr); } - goto SEND_ANSWER; } - /* check to see if the sender owns it - must be exact match */ - if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, - &data->owner, sender)) { - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not owned by sender %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, ORTE_NAME_PRINT(sender))); - - /* nope - return ORTE_ERR_PERM error code */ - ret = ORTE_ERR_PERM; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; + /* unpack the number of keys */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + if (0 == ninfo) { + /* they forgot to send us the keys?? */ + rc = ORTE_ERR_BAD_PARAM; + goto SEND_ERROR; } - /* delete the object from the data store */ - opal_pointer_array_set_item(orte_data_server_store, data->index, NULL); - OBJ_RELEASE(data); + /* unpack the keys */ + for (i=0; i < ninfo; i++) { + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &str, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + opal_argv_free(keys); + goto SEND_ERROR; + } + opal_argv_append_nosize(&keys, str); + free(str); + } - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s unpublished", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); + /* cycle across the provided keys */ + for (i=0; NULL != keys[i]; i++) { + /* cycle across the stored data, looking for a match */ + for (k=0; k < orte_data_server_store.size; k++) { + data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k); + if (NULL == data) { + continue; + } + /* can only access data posted by the same user id */ + if (uid != data->uid) { + continue; + } + /* can only access data posted by the same process */ + if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) { + continue; + } + /* can only access data posted for the same range */ + if (range != data->range) { + continue; + } + /* see if we have this key */ + OPAL_LIST_FOREACH_SAFE(iptr, inext, &data->values, opal_value_t) { + if (0 == strcmp(iptr->key, keys[i])) { + /* found it - delete the object from the data store */ + opal_list_remove_item(&data->values, &iptr->super); + OBJ_RELEASE(iptr); + } + } + /* if all the data has been removed, then remove the object */ + if (0 == opal_list_get_size(&data->values)) { + opal_pointer_array_set_item(&orte_data_server_store, k, NULL); + OBJ_RELEASE(data); + } + } + } + opal_argv_free(keys); /* tell the sender this succeeded */ ret = ORTE_SUCCESS; if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ } goto SEND_ANSWER; break; @@ -413,7 +469,8 @@ void orte_data_server(int status, orte_process_name_t* sender, } SEND_ANSWER: - if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, rml_cbfunc, NULL))) { + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, + orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(answer); } diff --git a/orte/runtime/orte_data_server.h b/orte/runtime/orte_data_server.h index a267f4545d..8981732445 100644 --- a/orte/runtime/orte_data_server.h +++ b/orte/runtime/orte_data_server.h @@ -11,6 +11,7 @@ * All rights reserved. * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -34,6 +35,11 @@ BEGIN_C_DECLS +#define ORTE_PMIX_PUBLISH_CMD 0x01 +#define ORTE_PMIX_LOOKUP_CMD 0x02 +#define ORTE_PMIX_UNPUBLISH_CMD 0x03 + + /* provide hooks to startup and finalize the data server */ ORTE_DECLSPEC int orte_data_server_init(void); ORTE_DECLSPEC void orte_data_server_finalize(void); @@ -43,17 +49,6 @@ ORTE_DECLSPEC void orte_data_server(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata); -/* define a type and some values for the commands - * to be used with the server - */ -typedef uint8_t orte_data_server_cmd_t; -#define ORTE_DATA_SERVER_CMD OPAL_UINT8 - -#define ORTE_DATA_SERVER_PUBLISH 0x01 -#define ORTE_DATA_SERVER_UNPUBLISH 0x02 -#define ORTE_DATA_SERVER_LOOKUP 0x04 - - END_C_DECLS #endif /* ORTE_DATA_SERVER_H */