1
1

Cleanup connect/disconnect and bring comm_spawn back online!

Этот коммит содержится в:
Ralph Castain 2015-09-05 11:19:41 -07:00
родитель 794ee4a604
Коммит 37c3ed68e7
13 изменённых файлов: 511 добавлений и 244 удалений

Просмотреть файл

@ -112,7 +112,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
const char *port_string, bool send_first,
ompi_communicator_t **newcomm)
{
int size, rsize, rank, rc;
int k, size, rsize, rank, rc;
char **members = NULL, *nstring;
bool dense, isnew;
opal_process_name_t pname;
@ -120,6 +120,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
opal_value_t *info;
opal_pmix_pdata_t *pdat;
opal_namelist_t *nm;
opal_jobid_t jobid;
ompi_communicator_t *newcomp=MPI_COMM_NULL;
ompi_proc_t *proc;
@ -152,6 +153,11 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
}
opal_argv_append_nosize(&members, nstring);
free(nstring);
/* have to add the number of procs in the job so the remote side
* can correctly add the procs by computing their names */
(void)asprintf(&nstring, "%d", size);
opal_argv_append_nosize(&members, nstring);
free(nstring);
} else {
if (OMPI_GROUP_IS_DENSE(group)) {
proc_list = group->grp_proc_pointers;
@ -173,6 +179,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
if (OPAL_SUCCESS != rc) {
if (!dense) {
free(proc_list);
proc_list = NULL;
}
return OMPI_ERROR;
}
@ -181,24 +188,30 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
}
if (!dense) {
free(proc_list);
proc_list = NULL;
}
}
if (rank == root) {
/* the root for each side publishes their list of participants */
OBJ_CONSTRUCT(&ilist, opal_list_t);
/* publish a key-val containing the port string itself
* so other participants from our job can connect */
info = OBJ_NEW(opal_value_t);
info->key = opal_argv_join(members, ':');
info->type = OPAL_STRING;
info->data.string = strdup(port_string);
opal_list_append(&ilist, &info->super);
/* put my name at the front of the list of members - my
* name will therefore be on the list twice, but the
* other side needs to know the root from this side */
* other side's root needs to know the root from this side */
rc = opal_convert_process_name_to_string(&nstring, OMPI_PROC_MY_NAME);
if (OPAL_SUCCESS != rc) {
return OMPI_ERROR;
}
opal_argv_prepend_nosize(&members, nstring);
free(nstring);
/* the root for each side publishes their list of participants */
OBJ_CONSTRUCT(&ilist, opal_list_t);
info = OBJ_NEW(opal_value_t);
opal_list_append(&ilist, &info->super);
if (send_first) {
(void)asprintf(&info->key, "%s:connect", port_string);
} else {
@ -206,16 +219,81 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
}
info->type = OPAL_STRING;
info->data.string = opal_argv_join(members, ':');
/* publish it with "session" scope */
opal_list_append(&ilist, &info->super);
/* publish them with "session" scope */
rc = opal_pmix.publish(&ilist);
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
opal_argv_free(members);
return OMPI_ERROR;
}
} else {
/* check to see if we have to get the port string for
* this connect - if we were started via a call to spawn,
* then the port string was given to us. Otherwise, the
* string will be NULL */
if (0 == strlen(port_string)) {
/* our root should have published it */
OBJ_CONSTRUCT(&ilist, opal_list_t);
pdat = OBJ_NEW(opal_pmix_pdata_t);
pdat->value.key = opal_argv_join(members, ':');
opal_list_append(&ilist, &pdat->super);
OBJ_CONSTRUCT(&mlist, opal_list_t);
/* if a non-blocking version of lookup isn't
* available, then use the blocking version */
if (NULL == opal_pmix.lookup_nb) {
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&ilist);
opal_argv_free(members);
return OMPI_ERROR;
}
} else {
char **keys = NULL;
struct lookup_caddy_t caddy;
opal_argv_append_nosize(&keys, pdat->value.key);
caddy.active = true;
caddy.pdat = pdat;
/* tell it to wait for the data to arrive */
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_WAIT);
info->type = OPAL_BOOL;
info->data.flag = true;
opal_list_append(&mlist, &info->super);
/* give it a decent timeout as we don't know when
* the other side may call connect - it doesn't
* have to be simultaneous */
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_TIMEOUT);
info->type = OPAL_INT;
info->data.integer = (size < 60) ? size : 60;
opal_list_append(&mlist, &info->super);
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&ilist);
OPAL_LIST_DESTRUCT(&mlist);
opal_argv_free(keys);
opal_argv_free(members);
return OMPI_ERROR;
}
OMPI_WAIT_FOR_COMPLETION(caddy.active);
opal_argv_free(keys);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != caddy.status) {
OMPI_ERROR_LOG(caddy.status);
OPAL_LIST_DESTRUCT(&ilist);
opal_argv_free(members);
return OMPI_ERROR;
}
}
port_string = strdup(pdat->value.data.string);
OPAL_LIST_DESTRUCT(&ilist);
}
}
/* lookup the other side's info - if a non-blocking form
* of lookup isn't available, then we use the blocking
* form and trust that the underlying system will WAIT
* until the other side publishes its data */
@ -227,36 +305,57 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
(void)asprintf(&pdat->value.key, "%s:connect", port_string);
}
opal_list_append(&ilist, &pdat->super);
OBJ_CONSTRUCT(&mlist, opal_list_t);
/* if a non-blocking version of lookup isn't
* available, then use the blocking version */
if (NULL == opal_pmix.lookup_nb) {
rc = opal_pmix.lookup(&ilist, NULL);
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&ilist);
opal_argv_free(members);
return OMPI_ERROR;
}
} else {
/* specifically request that the lookup wait until
* the given data has been published */
char **keys = NULL;
struct lookup_caddy_t caddy;
opal_argv_append_nosize(&keys, pdat->value.key);
caddy.active = true;
caddy.pdat = pdat;
rc = opal_pmix.lookup_nb(keys, NULL, lookup_cbfunc, &caddy);
/* tell it to wait for the data to arrive */
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_WAIT);
info->type = OPAL_BOOL;
info->data.flag = true;
opal_list_append(&mlist, &info->super);
/* give it a decent timeout as we don't know when
* the other side may call connect - it doesn't
* have to be simultaneous */
info = OBJ_NEW(opal_value_t);
info->key = strdup(OPAL_PMIX_TIMEOUT);
info->type = OPAL_INT;
info->data.integer = (size < 60) ? size : 60;
opal_list_append(&mlist, &info->super);
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&ilist);
OPAL_LIST_DESTRUCT(&mlist);
opal_argv_free(keys);
opal_argv_free(members);
return OMPI_ERROR;
}
OMPI_WAIT_FOR_COMPLETION(caddy.active);
opal_argv_free(keys);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != caddy.status) {
OMPI_ERROR_LOG(caddy.status);
OPAL_LIST_DESTRUCT(&ilist);
opal_argv_free(members);
return OMPI_ERROR;
}
}
/* initiate a list of participants for the connect,
* starting with our own members, remembering to
* skip the first member if we are the root rank */
@ -268,8 +367,35 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
OBJ_CONSTRUCT(&mlist, opal_list_t);
for (i=j; NULL != members[i]; i++) {
nm = OBJ_NEW(opal_namelist_t);
opal_convert_string_to_process_name(&nm->name, members[i]);
opal_list_append(&mlist, &nm->super);
if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
OMPI_ERROR_LOG(rc);
opal_argv_free(members);
OPAL_LIST_DESTRUCT(&mlist);
return rc;
}
/* if the rank is wildcard, then we need to add all procs
* in that job to the list */
if (OPAL_VPID_WILDCARD == nm->name.vpid) {
jobid = nm->name.jobid;
OBJ_RELEASE(nm);
for (k=0; k < size; k++) {
nm = OBJ_NEW(opal_namelist_t);
nm->name.jobid = jobid;
nm->name.vpid = k;
opal_list_append(&mlist, &nm->super);
}
/* now step over the size */
if (NULL == members[i+1]) {
/* this shouldn't happen and is an error */
OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
OPAL_LIST_DESTRUCT(&mlist);
opal_argv_free(members);
return OMPI_ERR_BAD_PARAM;
}
++i;
} else {
opal_list_append(&mlist, &nm->super);
}
}
opal_argv_free(members);
members = NULL;
@ -281,27 +407,80 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
OPAL_LIST_DESTRUCT(&ilist);
/* the first entry is the root for the remote side */
opal_convert_string_to_process_name(&pname, members[0]);
if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&pname, members[0]))) {
OMPI_ERROR_LOG(rc);
opal_argv_free(members);
return rc;
}
/* check the name - it should never be a wildcard, so
* this is just checking for an error */
if (OPAL_VPID_WILDCARD == pname.vpid) {
OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
opal_argv_free(members);
return OMPI_ERR_BAD_PARAM;
}
/* add the list of remote procs to our list, and
* keep a list of them for later */
OBJ_CONSTRUCT(&ilist, opal_list_t);
OBJ_CONSTRUCT(&rlist, opal_list_t);
for (i=1; NULL != members[i]; i++) {
nm = OBJ_NEW(opal_namelist_t);
opal_convert_string_to_process_name(&nm->name, members[i]);
opal_list_append(&mlist, &nm->super);
/* see if this needs to be added to our ompi_proc_t array */
proc = ompi_proc_find_and_add(&nm->name, &isnew);
if (isnew) {
if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
OMPI_ERROR_LOG(rc);
opal_argv_free(members);
OPAL_LIST_DESTRUCT(&ilist);
OPAL_LIST_DESTRUCT(&rlist);
return rc;
}
if (OPAL_VPID_WILDCARD == nm->name.vpid) {
jobid = nm->name.jobid;
OBJ_RELEASE(nm);
/* if the vpid is wildcard, then we are including all ranks
* of that job, and the next entry in members should be the
* number of procs in the job */
if (NULL == members[i+1]) {
/* just protect against the error */
OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
opal_argv_free(members);
OPAL_LIST_DESTRUCT(&ilist);
OPAL_LIST_DESTRUCT(&rlist);
return OMPI_ERR_BAD_PARAM;
}
rsize = strtoul(members[i+1], NULL, 10);
++i;
for (k=0; k < rsize; k++) {
nm = OBJ_NEW(opal_namelist_t);
nm->name.jobid = jobid;
nm->name.vpid = k;
opal_list_append(&mlist, &nm->super);
/* see if this needs to be added to our ompi_proc_t array */
proc = ompi_proc_find_and_add(&nm->name, &isnew);
if (isnew) {
cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
cd->p = proc;
opal_list_append(&ilist, &cd->super);
}
/* either way, add to the remote list */
cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
cd->p = proc;
opal_list_append(&rlist, &cd->super);
}
} else {
opal_list_append(&mlist, &nm->super);
/* see if this needs to be added to our ompi_proc_t array */
proc = ompi_proc_find_and_add(&nm->name, &isnew);
if (isnew) {
cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
cd->p = proc;
opal_list_append(&ilist, &cd->super);
}
/* either way, add to the remote list */
cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
cd->p = proc;
opal_list_append(&ilist, &cd->super);
opal_list_append(&rlist, &cd->super);
}
/* either way, add to the remote list */
cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
cd->p = proc;
opal_list_append(&rlist, &cd->super);
}
opal_argv_free(members);
@ -310,33 +489,37 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
rc = opal_pmix.connect(&mlist);
OPAL_LIST_DESTRUCT(&mlist);
#if 0
/* set the locality of the new procs - the required info should
* have been included in the data exchange */
for (j=0; j < new_proc_len; j++) {
OBJ_CONSTRUCT(&myvals, opal_list_t);
if (OMPI_SUCCESS != (rc = opal_dstore.fetch(opal_dstore_internal,
&new_proc_list[j]->super.proc_name,
OPAL_DSTORE_LOCALITY, &myvals))) {
new_proc_list[j]->super.proc_flags = OPAL_PROC_NON_LOCAL;
} else {
kv = (opal_value_t*)opal_list_get_first(&myvals);
new_proc_list[j]->super.proc_flags = kv->data.uint16;
}
OPAL_LIST_DESTRUCT(&myvals);
}
#endif
if (0 < opal_list_get_size(&ilist)) {
/* convert the list of new procs to a proc_t array */
new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
sizeof(ompi_proc_t *));
i = 0;
OPAL_LIST_FOREACH(cd, &ilist, ompi_dpm_proct_caddy_t) {
new_proc_list[i++] = cd->p;
opal_value_t *kv;
new_proc_list[i] = cd->p;
/* set the locality */
new_proc_list[i]->super.proc_flags = OPAL_PROC_NON_LOCAL;
/* have to save it for later */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_LOCALITY);
kv->type = OPAL_UINT16;
kv->data.uint16 = OPAL_PROC_NON_LOCAL;
opal_pmix.store_local(&cd->p->super.proc_name, kv);
OBJ_RELEASE(kv); // maintain accounting
/* we can retrieve the hostname at no cost because it
* was provided at connect */
OPAL_MODEX_RECV_VALUE(rc, OPAL_PMIX_HOSTNAME, &new_proc_list[i]->super.proc_name,
(char**)&(new_proc_list[i]->super.proc_hostname), OPAL_STRING);
if (OPAL_SUCCESS != rc) {
/* we can live without it */
new_proc_list[i]->super.proc_hostname = NULL;
}
++i;
}
/* call add_procs on the new ones */
rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
free(new_proc_list);
new_proc_list = NULL;
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&ilist);
@ -415,12 +598,6 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
*/
exit:
if (NULL != proc_list) {
free (proc_list);
}
if (NULL != new_proc_list) {
free (new_proc_list);
}
if (OMPI_SUCCESS != rc) {
if (MPI_COMM_NULL != newcomp && NULL != newcomp) {
OBJ_RETAIN(newcomp);
@ -446,7 +623,7 @@ static int construct_peers(ompi_group_t *group, opal_list_t *peers)
}
/* add to the list of peers */
nm = OBJ_NEW(opal_namelist_t);
nm->name = *(opal_process_name_t*)&proct->super.proc_name;
nm->name = proct->super.proc_name;
/* need to maintain an ordered list to ensure the tracker signatures
* match across all procs */
OPAL_LIST_FOREACH(n2, peers, opal_namelist_t) {
@ -470,7 +647,7 @@ static int construct_peers(ompi_group_t *group, opal_list_t *peers)
}
/* add to the list of peers */
nm = OBJ_NEW(opal_namelist_t);
nm->name = *(opal_process_name_t*)&proct->super.proc_name;
nm->name = proct->super.proc_name;
/* need to maintain an ordered list to ensure the tracker signatures
* match across all procs */
OPAL_LIST_FOREACH(n2, peers, opal_namelist_t) {
@ -519,6 +696,9 @@ int ompi_dpm_disconnect(ompi_communicator_t *comm)
}
opal_pmix.fence(&coll, false);
/* ensure we tell the host RM to disconnect us */
opal_pmix.disconnect(&coll);
OPAL_LIST_DESTRUCT(&coll);
return OMPI_SUCCESS;

Просмотреть файл

@ -156,27 +156,12 @@ int ompi_proc_complete_init(void)
proc->super.proc_flags = u16;
}
if (ompi_process_info.num_procs < ompi_direct_modex_cutoff) {
/* IF the number of procs falls below the specified cutoff,
* then we assume the job is small enough that retrieving
* the hostname (which will typically cause retrieval of
* ALL modex info for this proc) will have no appreciable
* impact on launch scaling
*/
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_HOSTNAME, &proc->super.proc_name,
(char**)&(proc->super.proc_hostname), OPAL_STRING);
if (OPAL_SUCCESS != ret) {
errcode = ret;
break;
}
} else {
/* just set the hostname to NULL for now - we'll fill it in
* as modex_recv's are called for procs we will talk to, thus
* avoiding retrieval of ALL modex info for this proc until
* required. Transports that delay calling modex_recv until
* first message will therefore scale better than those that
* call modex_recv on all procs during init.
*/
/* we can retrieve the hostname at no cost because it
* was provided at startup */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_HOSTNAME, &proc->super.proc_name,
(char**)&(proc->super.proc_hostname), OPAL_STRING);
if (OPAL_SUCCESS != ret) {
/* we can live without it */
proc->super.proc_hostname = NULL;
}
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT

Просмотреть файл

@ -33,7 +33,7 @@
opal_pmix_base_module_t opal_pmix = { 0 };
bool opal_pmix_collect_all_data = false;
bool opal_pmix_base_allow_delayed_server = false;
int pmix_verbose_output = -1;
int opal_pmix_verbose_output = -1;
static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
{
@ -59,7 +59,7 @@ static int opal_pmix_base_frame_open(mca_base_open_flag_t flags)
/* ensure the function pointers are NULL */
memset(&opal_pmix, 0, sizeof(opal_pmix));
/* pass across the verbosity */
pmix_verbose_output = opal_pmix_base_framework.framework_output;
opal_pmix_verbose_output = opal_pmix_base_framework.framework_output;
return rc;
}

Просмотреть файл

@ -65,20 +65,14 @@ static int cray_get(const opal_process_name_t *id,
opal_value_t **kv);
static int cray_get_nb(const opal_process_name_t *id, const char *key,
opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
static int cray_publish(opal_pmix_data_range_t scope,
opal_pmix_persistence_t persist,
opal_list_t *info);
static int cray_publish_nb(opal_pmix_data_range_t scope,
opal_pmix_persistence_t persist,
opal_list_t *info,
static int cray_publish(opal_list_t *info);
static int cray_publish_nb(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static int cray_lookup(opal_pmix_data_range_t scope,
opal_list_t *data);
static int cray_lookup_nb(opal_pmix_data_range_t scope, int wait,
char **keys,
static int cray_lookup(opal_list_t *data, opal_list_t *info);
static int cray_lookup_nb(char **keys, opal_list_t *info,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static int cray_unpublish(opal_pmix_data_range_t scope, char **keys);
static int cray_unpublish_nb(opal_pmix_data_range_t scope, char **keys,
static int cray_unpublish(char **keys, opal_list_t *info);
static int cray_unpublish_nb(char **keys, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static const char *cray_get_version(void);
static int cray_store_local(const opal_process_name_t *proc,
@ -773,40 +767,34 @@ static int cray_get_nb(const opal_process_name_t *id, const char *key,
return OPAL_ERR_NOT_IMPLEMENTED;
}
static int cray_publish(opal_pmix_data_range_t scope,
opal_pmix_persistence_t persist,
opal_list_t *info)
/* Blocking publish entry point - the Cray PMI backend provides no
 * data-publish service, so simply report the operation as unsupported. */
static int cray_publish(opal_list_t *info)
{
    (void)info;  /* parameter intentionally unused */
    return OPAL_ERR_NOT_SUPPORTED;
}
static int cray_publish_nb(opal_pmix_data_range_t scope,
opal_pmix_persistence_t persist,
opal_list_t *info,
/* Non-blocking publish entry point - likewise unsupported on Cray PMI;
 * the callback is never invoked because we fail immediately. */
static int cray_publish_nb(opal_list_t *info,
                           opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)info;    /* parameters intentionally unused */
    (void)cbfunc;
    (void)cbdata;
    return OPAL_ERR_NOT_SUPPORTED;
}
static int cray_lookup(opal_pmix_data_range_t scope,
opal_list_t *data)
/* Blocking lookup entry point - no key/value directory exists in the
 * Cray PMI backend, so the request cannot be serviced. */
static int cray_lookup(opal_list_t *data, opal_list_t *info)
{
    (void)data;  /* parameters intentionally unused */
    (void)info;
    return OPAL_ERR_NOT_SUPPORTED;
}
static int cray_lookup_nb(opal_pmix_data_range_t scope, int wait,
char **keys,
/* Non-blocking lookup entry point - unsupported; we fail up front and
 * therefore never fire the supplied completion callback. */
static int cray_lookup_nb(char **keys, opal_list_t *info,
                          opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
    (void)keys;    /* parameters intentionally unused */
    (void)info;
    (void)cbfunc;
    (void)cbdata;
    return OPAL_ERR_NOT_SUPPORTED;
}
static int cray_unpublish(opal_pmix_data_range_t scope, char **keys)
/* Blocking unpublish entry point - nothing can ever have been published
 * through this backend, so the call is reported as unsupported. */
static int cray_unpublish(char **keys, opal_list_t *info)
{
    (void)keys;  /* parameters intentionally unused */
    (void)info;
    return OPAL_ERR_NOT_SUPPORTED;
}
static int cray_unpublish_nb(opal_pmix_data_range_t scope, char **keys,
static int cray_unpublish_nb(char **keys, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;

Просмотреть файл

@ -33,6 +33,10 @@
BEGIN_C_DECLS
/* provide access to the framework verbose output without
* exposing the entire base */
extern int opal_pmix_verbose_output;
/**
* Provide a simplified macro for sending data via modex
* to other processes. The macro requires four arguments:
@ -116,15 +120,20 @@ BEGIN_C_DECLS
* is to be returned
* t - the expected data type
*/
#define OPAL_MODEX_RECV_VALUE(r, s, p, d, t) \
do { \
opal_value_t *_kv; \
if (OPAL_SUCCESS != ((r) = opal_pmix.get((p), (s), &(_kv)))) { \
*(d) = NULL; \
} else { \
(r) = opal_value_unload(_kv, (void**)(d), (t)); \
OBJ_RELEASE(_kv); \
} \
#define OPAL_MODEX_RECV_VALUE(r, s, p, d, t) \
do { \
opal_value_t *_kv; \
OPAL_OUTPUT_VERBOSE((1, opal_pmix_verbose_output, \
"%s[%s:%d] MODEX RECV VALUE FOR PROC %s KEY %s", \
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), \
__FILE__, __LINE__, \
OPAL_NAME_PRINT(*(p)), (s))); \
if (OPAL_SUCCESS != ((r) = opal_pmix.get((p), (s), &(_kv)))) { \
*(d) = NULL; \
} else { \
(r) = opal_value_unload(_kv, (void**)(d), (t)); \
OBJ_RELEASE(_kv); \
} \
} while(0);
/**
@ -140,19 +149,24 @@ BEGIN_C_DECLS
* sz - pointer to a location wherein the number of bytes
* in the data object can be returned (size_t)
*/
#define OPAL_MODEX_RECV_STRING(r, s, p, d, sz) \
do { \
opal_value_t *_kv; \
if (OPAL_SUCCESS == ((r) = opal_pmix.get((p), (s), &(_kv))) && \
NULL != _kv) { \
*(d) = _kv->data.bo.bytes; \
*(sz) = _kv->data.bo.size; \
_kv->data.bo.bytes = NULL; /* protect the data */ \
OBJ_RELEASE(_kv); \
} else { \
*(d) = NULL; \
*(sz) = 0; \
} \
#define OPAL_MODEX_RECV_STRING(r, s, p, d, sz) \
do { \
opal_value_t *_kv; \
OPAL_OUTPUT_VERBOSE((1, opal_pmix_verbose_output, \
"%s[%s:%d] MODEX RECV STRING FOR PROC %s KEY %s", \
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), \
__FILE__, __LINE__, \
OPAL_NAME_PRINT(*(p)), (s))); \
if (OPAL_SUCCESS == ((r) = opal_pmix.get((p), (s), &(_kv))) && \
NULL != _kv) { \
*(d) = _kv->data.bo.bytes; \
*(sz) = _kv->data.bo.size; \
_kv->data.bo.bytes = NULL; /* protect the data */ \
OBJ_RELEASE(_kv); \
} else { \
*(d) = NULL; \
*(sz) = 0; \
} \
} while(0);
/**
@ -172,6 +186,11 @@ BEGIN_C_DECLS
do { \
char *_key; \
_key = mca_base_component_to_string((s)); \
OPAL_OUTPUT_VERBOSE((1, opal_pmix_verbose_output, \
"%s[%s:%d] MODEX RECV FOR PROC %s KEY %s", \
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), \
__FILE__, __LINE__, \
OPAL_NAME_PRINT(*(p)), _key)); \
if (NULL == _key) { \
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); \
(r) = OPAL_ERR_OUT_OF_RESOURCE; \

Просмотреть файл

@ -783,12 +783,16 @@ static int recv_connect_ack(int sd)
/* get the current timeout value so we can reset to it */
sz = sizeof(save);
getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz);
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
return PMIX_ERR_UNREACH;
}
/* set a timeout on the blocking recv so we don't hang */
tv.tv_sec = 2;
tv.tv_usec = 0;
setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
return PMIX_ERR_UNREACH;
}
/* receive the status reply */
rc = pmix_usock_recv_blocking(sd, (char*)&reply, sizeof(int));
@ -820,7 +824,9 @@ static int recv_connect_ack(int sd)
}
/* return the socket to normal */
setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz);
if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
return PMIX_ERR_UNREACH;
}
return PMIX_SUCCESS;
}

Просмотреть файл

@ -479,8 +479,8 @@ static pmix_server_trkr_t* get_tracker(pmix_proc_t *procs,
size_t nprocs, pmix_cmd_t type)
{
pmix_server_trkr_t *trk;
size_t i;
bool match;
size_t i, j;
size_t matches;
pmix_output_verbose(5, pmix_globals.debug_output,
"get_tracker called with %d procs", (int)nprocs);
@ -504,18 +504,22 @@ static pmix_server_trkr_t* get_tracker(pmix_proc_t *procs,
if (nprocs != trk->npcs) {
continue;
}
if( type != trk->type ){
if (type != trk->type) {
continue;
}
match = true;
matches = 0;
for (i=0; i < nprocs; i++) {
if (0 != strcmp(procs[i].nspace, trk->pcs[i].nspace) ||
procs[i].rank != trk->pcs[i].rank) {
match = false;
break;
/* the procs may be in different order, so we have
* to do an exhaustive search */
for (j=0; j < trk->npcs; j++) {
if (0 == strcmp(procs[i].nspace, trk->pcs[j].nspace) &&
procs[i].rank == trk->pcs[j].rank) {
++matches;
break;
}
}
}
if (match) {
if (trk->npcs == matches) {
return trk;
}
}
@ -549,7 +553,7 @@ static pmix_server_trkr_t* new_tracker(pmix_proc_t *procs,
pmix_nspace_t *nptr, *ns;
pmix_output_verbose(5, pmix_globals.debug_output,
"get_tracker called with %d procs", (int)nprocs);
"new_tracker called with %d procs", (int)nprocs);
/* bozo check - should never happen outside of programmer error */
if (NULL == procs) {
@ -584,8 +588,8 @@ static pmix_server_trkr_t* new_tracker(pmix_proc_t *procs,
}
if (NULL == nptr) {
/* cannot be a local proc */
pmix_output_verbose(8, pmix_globals.debug_output,
"get_tracker: unknown nspace %s",
pmix_output_verbose(5, pmix_globals.debug_output,
"new_tracker: unknown nspace %s",
procs[i].nspace);
continue;
}
@ -594,8 +598,8 @@ static pmix_server_trkr_t* new_tracker(pmix_proc_t *procs,
/* nope, so no point in going further on this one - we'll
* process it once all the procs are known */
all_def = false;
pmix_output_verbose(8, pmix_globals.debug_output,
"get_tracker: all clients not registered nspace %s",
pmix_output_verbose(5, pmix_globals.debug_output,
"new_tracker: all clients not registered nspace %s",
procs[i].nspace);
continue;
}
@ -604,7 +608,8 @@ static pmix_server_trkr_t* new_tracker(pmix_proc_t *procs,
if (procs[i].rank == info->rank ||
PMIX_RANK_WILDCARD == procs[i].rank) {
pmix_output_verbose(5, pmix_globals.debug_output,
"adding local proc %s.%d to tracker", procs[i].nspace, procs[i].rank);
"adding local proc %s.%d to tracker",
info->nptr->nspace, info->rank);
/* add a tracker for this proc - don't need more than
* the nspace pointer and rank */
iptr = PMIX_NEW(pmix_rank_info_t);
@ -1021,7 +1026,6 @@ pmix_status_t pmix_server_publish(pmix_peer_t *peer,
/* call the local server */
(void)strncpy(proc.nspace, peer->info->nptr->nspace, PMIX_MAX_NSLEN);
proc.rank = peer->info->rank;
pmix_output(0, "server passing %d values up", (int)einfo);
rc = pmix_host_server.publish(&proc, info, einfo, cbfunc, cbdata);
cleanup:
@ -1082,7 +1086,6 @@ pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
PMIX_INFO_CREATE(info, einfo);
/* unpack the array of info objects */
if (0 < ninfo) {
PMIX_INFO_CREATE(info, ninfo);
cnt=ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
@ -1156,7 +1159,6 @@ pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
PMIX_INFO_CREATE(info, einfo);
/* unpack the array of info objects */
if (0 < ninfo) {
PMIX_INFO_CREATE(info, ninfo);
cnt=ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
@ -1256,7 +1258,8 @@ pmix_status_t pmix_server_connect(pmix_server_caddy_t *cd,
size_t ninfo=0;
pmix_output_verbose(2, pmix_globals.debug_output,
"recvd CONNECT");
"recvd CONNECT from peer %s:%d",
cd->peer->info->nptr->nspace, cd->peer->info->rank);
if ((disconnect && NULL == pmix_host_server.disconnect) ||
(!disconnect && NULL == pmix_host_server.connect)) {

Просмотреть файл

@ -103,6 +103,9 @@ int pmix1_client_finalize(void)
{
pmix_status_t rc;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client finalize");
/* deregister the errhandler */
PMIx_Deregister_errhandler();
@ -114,6 +117,9 @@ int pmix1_initialized(void)
{
pmix_status_t rc;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client initialized");
rc = PMIx_Initialized();
return pmix1_convert_rc(rc);
}
@ -126,6 +132,9 @@ int pmix1_abort(int flag, const char *msg,
size_t n, cnt=0;
opal_namelist_t *ptr;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client abort");
/* convert the list of procs to an array
* of pmix_proc_t */
if (NULL != procs && 0 < (cnt = opal_list_get_size(procs))) {
@ -173,6 +182,9 @@ int pmix1_fence(opal_list_t *procs, int collect_data)
opal_namelist_t *ptr;
pmix_info_t info, *iptr;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client fence");
/* convert the list of procs to an array
* of pmix_proc_t */
if (NULL != procs && 0 < (cnt = opal_list_get_size(procs))) {
@ -219,6 +231,9 @@ int pmix1_fencenb(opal_list_t *procs, int collect_data,
pmix1_opcaddy_t *op;
pmix_info_t info, *iptr;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client fence_nb");
/* convert the list of procs to an array
* of pmix_proc_t */
if (NULL != procs && 0 < (cnt = opal_list_get_size(procs))) {
@ -264,6 +279,9 @@ int pmix1_put(opal_pmix_scope_t scope,
pmix_value_t kv;
pmix_status_t rc;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client put");
PMIX_VALUE_CONSTRUCT(&kv);
pmix1_value_load(&kv, val);
@ -280,6 +298,11 @@ int pmix1_get(const opal_process_name_t *proc,
pmix_status_t rc;
pmix_proc_t p, *pptr;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"%s PMIx_client get on proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*proc), key);
/* prep default response */
*val = NULL;
if (NULL != proc) {
@ -345,6 +368,11 @@ int pmix1_getnb(const opal_process_name_t *proc, const char *key,
pmix_status_t rc;
char *tmp;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"%s PMIx_client get_nb on proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*proc), key);
/* create the caddy */
op = OBJ_NEW(pmix1_opcaddy_t);
op->valcbfunc = cbfunc;
@ -378,6 +406,13 @@ int pmix1_publish(opal_list_t *info)
opal_value_t *iptr;
size_t sz, n;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client publish");
if (NULL == info) {
return OPAL_ERR_BAD_PARAM;
}
sz = opal_list_get_size(info);
if (0 < sz) {
PMIX_INFO_CREATE(pinfo, sz);
@ -387,6 +422,8 @@ int pmix1_publish(opal_list_t *info)
pmix1_value_load(&pinfo[n].value, iptr);
++n;
}
} else {
pinfo = NULL;
}
ret = PMIx_Publish(pinfo, sz);
@ -402,6 +439,13 @@ int pmix1_publishnb(opal_list_t *info,
size_t n;
pmix1_opcaddy_t *op;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client publish_nb");
if (NULL == info) {
return OPAL_ERR_BAD_PARAM;
}
/* create the caddy */
op = OBJ_NEW(pmix1_opcaddy_t);
op->opcbfunc = cbfunc;
@ -433,6 +477,13 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
opal_pmix_pdata_t *d;
opal_value_t *iptr;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client lookup");
if (NULL == data) {
return OPAL_ERR_BAD_PARAM;
}
sz = opal_list_get_size(data);
PMIX_PDATA_CREATE(pdata, sz);
n=0;
@ -440,13 +491,18 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
(void)strncpy(pdata[n++].key, d->value.key, PMIX_MAX_KEYLEN);
}
ninfo = opal_list_get_size(info);
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(pinfo[n++].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, iptr);
++n;
if (NULL != info) {
ninfo = opal_list_get_size(info);
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(pinfo[n++].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, iptr);
++n;
}
} else {
pdata = NULL;
ninfo = 0;
}
ret = PMIx_Lookup(pdata, sz, pinfo, ninfo);
@ -462,7 +518,11 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
PMIX_PDATA_FREE(pdata, sz);
return OPAL_ERR_BAD_PARAM;
}
d->proc.vpid = pdata[n].proc.rank;
if (PMIX_RANK_WILDCARD == pdata[n].proc.rank) {
d->proc.vpid = OPAL_VPID_WILDCARD;
} else {
d->proc.vpid = pdata[n].proc.rank;
}
rc = pmix1_value_unload(&d->value, &pdata[n].value);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
@ -503,7 +563,11 @@ static void lk_cbfunc(pmix_status_t status,
OPAL_ERROR_LOG(rc);
goto release;
}
d->proc.vpid = data[n].proc.rank;
if (PMIX_RANK_WILDCARD == data[n].proc.rank) {
d->proc.vpid = OPAL_VPID_WILDCARD;
} else {
d->proc.vpid = data[n].proc.rank;
}
d->value.key = strdup(data[n].key);
rc = pmix1_value_unload(&d->value, &data[n].value);
if (OPAL_SUCCESS != rc) {
@ -535,19 +599,25 @@ int pmix1_lookupnb(char **keys, opal_list_t *info,
opal_value_t *iptr;
size_t n;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client lookup_nb");
/* create the caddy */
op = OBJ_NEW(pmix1_opcaddy_t);
op->lkcbfunc = cbfunc;
op->cbdata = cbdata;
op->sz = opal_list_get_size(info);
if (0 < op->sz) {
PMIX_INFO_CREATE(op->info, op->sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, iptr);
++n;
if (NULL != info) {
op->sz = opal_list_get_size(info);
if (0 < op->sz) {
PMIX_INFO_CREATE(op->info, op->sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, iptr);
++n;
}
}
}
@@ -563,13 +633,18 @@ int pmix1_unpublish(char **keys, opal_list_t *info)
pmix_info_t *pinfo;
opal_value_t *iptr;
ninfo = opal_list_get_size(info);
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(pinfo[n++].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, iptr);
++n;
if (NULL != info) {
ninfo = opal_list_get_size(info);
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(pinfo[n++].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&pinfo[n].value, iptr);
++n;
}
} else {
pinfo = NULL;
ninfo = 0;
}
ret = PMIx_Unpublish(keys, pinfo, ninfo);
@@ -591,14 +666,16 @@ int pmix1_unpublishnb(char **keys, opal_list_t *info,
op->opcbfunc = cbfunc;
op->cbdata = cbdata;
op->sz = opal_list_get_size(info);
if (0 < op->sz) {
PMIX_INFO_CREATE(op->info, op->sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, iptr);
++n;
if (NULL != info) {
op->sz = opal_list_get_size(info);
if (0 < op->sz) {
PMIX_INFO_CREATE(op->info, op->sz);
n=0;
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
(void)strncpy(op->info[n].key, iptr->key, PMIX_MAX_KEYLEN);
pmix1_value_load(&op->info[n].value, iptr);
++n;
}
}
}
@@ -744,7 +821,11 @@ int pmix1_connect(opal_list_t *procs)
strname = opal_convert_jobid_to_string(ptr->name.jobid);
(void)strncpy(parray[n].nspace, strname, PMIX_MAX_NSLEN);
free(strname);
parray[n].rank = ptr->name.vpid;
if (OPAL_VPID_WILDCARD == ptr->name.vpid) {
parray[n].rank = PMIX_RANK_WILDCARD;
} else {
parray[n].rank = ptr->name.vpid;
}
++n;
}
@@ -755,8 +836,8 @@ int pmix1_connect(opal_list_t *procs)
}
int pmix1_connectnb(opal_list_t *procs,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
{
pmix_status_t ret;
size_t n, cnt=0;
@@ -783,7 +864,11 @@ int pmix1_connectnb(opal_list_t *procs,
strname = opal_convert_jobid_to_string(ptr->name.jobid);
(void)strncpy(op->procs[n].nspace, strname, PMIX_MAX_NSLEN);
free(strname);
op->procs[n].rank = ptr->name.vpid;
if (OPAL_VPID_WILDCARD == ptr->name.vpid) {
op->procs[n].rank = PMIX_RANK_WILDCARD;
} else {
op->procs[n].rank = ptr->name.vpid;
}
++n;
}
@@ -810,7 +895,11 @@ int pmix1_disconnect(opal_list_t *procs)
n=0;
OPAL_LIST_FOREACH(ptr, procs, opal_namelist_t) {
(void)strncpy(parray[n].nspace, opal_convert_jobid_to_string(ptr->name.jobid), PMIX_MAX_NSLEN);
parray[n].rank = ptr->name.vpid;
if (OPAL_VPID_WILDCARD == ptr->name.vpid) {
parray[n].rank = PMIX_RANK_WILDCARD;
} else {
parray[n].rank = ptr->name.vpid;
}
++n;
}
@@ -849,7 +938,11 @@ int pmix1_disconnectnb(opal_list_t *procs,
strname = opal_convert_jobid_to_string(ptr->name.jobid);
(void)strncpy(op->procs[n].nspace, strname, PMIX_MAX_NSLEN);
free(strname);
op->procs[n].rank = ptr->name.vpid;
if (OPAL_VPID_WILDCARD == ptr->name.vpid) {
op->procs[n].rank = PMIX_RANK_WILDCARD;
} else {
op->procs[n].rank = ptr->name.vpid;
}
++n;
}

Просмотреть файл

@@ -88,8 +88,7 @@ const opal_pmix_base_module_t opal_pmix_pmix1xx_module = {
pmix1_store_local
};
int pmix1_store_local(const opal_process_name_t *proc,
opal_value_t *val)
int pmix1_store_local(const opal_process_name_t *proc, opal_value_t *val)
{
pmix_value_t kv;
pmix_status_t rc;

Просмотреть файл

@@ -336,66 +336,8 @@ static int rte_init(void)
if (NULL != mycpuset){
free(mycpuset);
}
/* get our local peers */
if (0 < orte_process_info.num_local_peers) {
/* retrieve the local peers */
OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_PEERS,
ORTE_PROC_MY_NAME, &val, OPAL_STRING);
if (OPAL_SUCCESS == ret && NULL != val) {
peers = opal_argv_split(val, ',');
free(val);
} else {
peers = NULL;
}
} else {
peers = NULL;
}
/* set the locality */
name.jobid = ORTE_PROC_MY_NAME->jobid;
for (sz=0; sz < orte_process_info.num_procs; sz++) {
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_LOCALITY);
kv->type = OPAL_UINT16;
name.vpid = sz;
if (sz == ORTE_PROC_MY_NAME->vpid) {
/* we are fully local to ourselves */
u16 = OPAL_PROC_ALL_LOCAL;
} else if (NULL == peers) {
/* nobody is local to us */
u16 = OPAL_PROC_NON_LOCAL;
} else {
for (i=0; NULL != peers[i]; i++) {
if (sz == strtoul(peers[i], NULL, 10)) {
break;
}
}
if (NULL == peers[i]) {
/* not a local peer */
u16 = OPAL_PROC_NON_LOCAL;
} else {
/* all we can say is they are on the same node */
u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
}
}
/* store this data internally - not to be pushed outside of
* ourselves as it only has meaning relative to us */
ret = opal_pmix.store_local(&name, kv);
if (OPAL_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
error = "pmix store local";
opal_argv_free(peers);
goto error;
}
OBJ_RELEASE(kv);
}
opal_argv_free(peers);
/* we don't need to force the routed system to pick the
* "direct" component as that should happen automatically
* in those cases where we are direct launched (i.e., no
* HNP is defined in the environment */
/* now that we have all required info, complete the setup */
if (ORTE_SUCCESS != (ret = orte_ess_base_app_setup(false))) {
ORTE_ERROR_LOG(ret);

Просмотреть файл

@@ -330,6 +330,11 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
opal_output_verbose(2, orte_pmix_server_globals.output,
"%s connect called with %d procs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)opal_list_get_size(procs));
/* for now, just ack the call */
if (NULL != cbfunc) {
cbfunc(OPAL_SUCCESS, cbdata);
@@ -341,6 +346,11 @@ int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
opal_output_verbose(2, orte_pmix_server_globals.output,
"%s disconnect called with %d procs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)opal_list_get_size(procs));
/* for now, just ack the call */
if (NULL != cbfunc) {
cbfunc(OPAL_SUCCESS, cbdata);

Просмотреть файл

@@ -360,14 +360,18 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tg, void *cbdata)
{
int rc, ret, room_num;
int rc, ret, room_num = -1;
int32_t cnt;
pmix_server_req_t *req;
pmix_server_req_t *req=NULL;
opal_list_t info;
opal_value_t *iptr;
opal_pmix_pdata_t *pdata;
opal_process_name_t source;
opal_output_verbose(1, orte_pmix_server_globals.output,
"%s recvd lookup data return",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
OBJ_CONSTRUCT(&info, opal_list_t);
/* unpack the room number of the request tracker */
cnt = 1;
@@ -384,12 +388,20 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender,
goto release;
}
opal_output_verbose(5, orte_pmix_server_globals.output,
"%s recvd lookup returned status %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret);
if (ORTE_SUCCESS == ret) {
/* see if any data was included - not an error if the answer is no */
cnt = 1;
while (OPAL_SUCCESS == opal_dss.unpack(buffer, &source, &cnt, OPAL_NAME)) {
pdata = OBJ_NEW(opal_pmix_pdata_t);
pdata->proc = source;
opal_output_verbose(5, orte_pmix_server_globals.output,
"%s recvd lookup returned data from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&source));
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &iptr, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(pdata);
@@ -406,10 +418,12 @@ void pmix_server_keyval_client(int status, orte_process_name_t* sender,
}
}
/* retrieve the tracker */
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
release:
if (0 <= room_num) {
/* retrieve the tracker */
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
}
if (NULL != req) {
/* pass down the response */
if (NULL != req->opcbfunc) {

Просмотреть файл

@@ -84,6 +84,7 @@ OBJ_CLASS_INSTANCE(orte_data_object_t,
typedef struct {
opal_list_item_t super;
orte_process_name_t requestor;
int room_number;
uint32_t uid;
opal_pmix_data_range_t range;
char **keys;
@@ -234,6 +235,10 @@ void orte_data_server(int status, orte_process_name_t* sender,
data->index = opal_pointer_array_add(&orte_data_server_store, data);
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: checking for pending requests",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* check for pending requests that match this data */
reply = NULL;
OPAL_LIST_FOREACH_SAFE(req, rqnext, &pending, orte_data_req_t) {
@@ -250,6 +255,12 @@ void orte_data_server(int status, orte_process_name_t* sender,
/* found it - package it for return */
if (NULL == reply) {
reply = OBJ_NEW(opal_buffer_t);
/* start with their room number */
if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &req->room_number, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
break;
}
/* then the status */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
@@ -269,6 +280,11 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
if (NULL != reply) {
/* send it back to the requestor */
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: returning data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&req->requestor)));
if (0 > (rc = orte_rml.send_buffer_nb(&req->requestor, reply, ORTE_RML_TAG_DATA_CLIENT,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
@@ -393,14 +409,23 @@ void orte_data_server(int status, orte_process_name_t* sender,
}
}
if (!ret_packed) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server:lookup: data not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if we were told to wait for the data, then queue this up
* for later processing */
if (wait) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server:lookup: pushing request to wait",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
req = OBJ_NEW(orte_data_req_t);
req->room_number = room_number;
req->requestor = *sender;
req->uid = uid;
req->range = range;
req->keys = keys;
opal_list_append(&pending, &req->super);
return;
}
/* nothing was found - indicate that situation */
@@ -409,6 +434,9 @@ void orte_data_server(int status, orte_process_name_t* sender,
goto SEND_ERROR;
}
opal_argv_free(keys);
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server:lookup: data found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto SEND_ANSWER;
break;