Коммит
3fa6c635a3
272
ompi/dpm/dpm.c
272
ompi/dpm/dpm.c
@ -112,8 +112,8 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
const char *port_string, bool send_first,
|
||||
ompi_communicator_t **newcomm)
|
||||
{
|
||||
int k, size, rsize, rank, rc;
|
||||
char **members = NULL, *nstring;
|
||||
int k, size, rsize, rank, rc, rportlen=0;
|
||||
char **members = NULL, *nstring, *rport=NULL, **pkeys=NULL;
|
||||
bool dense, isnew;
|
||||
opal_process_name_t pname;
|
||||
opal_list_t ilist, mlist, rlist;
|
||||
@ -169,6 +169,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
if (NULL == (proc_list[i] = ompi_group_peer_lookup(group,i))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
free(proc_list);
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
@ -195,13 +196,6 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
if (rank == root) {
|
||||
/* the root for each side publishes their list of participants */
|
||||
OBJ_CONSTRUCT(&ilist, opal_list_t);
|
||||
/* publish a key-val containing the port string itself
|
||||
* so other participants from our job can connect */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = opal_argv_join(members, ':');
|
||||
info->type = OPAL_STRING;
|
||||
info->data.string = strdup(port_string);
|
||||
opal_list_append(&ilist, &info->super);
|
||||
/* put my name at the front of the list of members - my
|
||||
* name will therefore be on the list twice, but the
|
||||
* other side's root needs to know the root from this side */
|
||||
@ -220,141 +214,111 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
info->type = OPAL_STRING;
|
||||
info->data.string = opal_argv_join(members, ':');
|
||||
opal_list_append(&ilist, &info->super);
|
||||
/* also save the key for later */
|
||||
opal_argv_append_nosize(&pkeys, info->key);
|
||||
/* publish them with "session" scope */
|
||||
rc = opal_pmix.publish(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
opal_argv_free(members);
|
||||
opal_argv_free(pkeys);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
/* check to see if we have to get the port string for
|
||||
* this connect - if we were started via a call to spawn,
|
||||
* then the port string was given to us. Otherwise, the
|
||||
* string will be NULL */
|
||||
if (0 == strlen(port_string)) {
|
||||
/* our root should have published it */
|
||||
OBJ_CONSTRUCT(&ilist, opal_list_t);
|
||||
pdat = OBJ_NEW(opal_pmix_pdata_t);
|
||||
pdat->value.key = opal_argv_join(members, ':');
|
||||
opal_list_append(&ilist, &pdat->super);
|
||||
OBJ_CONSTRUCT(&mlist, opal_list_t);
|
||||
/* if a non-blocking version of lookup isn't
|
||||
* available, then use the blocking version */
|
||||
if (NULL == opal_pmix.lookup_nb) {
|
||||
rc = opal_pmix.lookup(&ilist, &mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
char **keys = NULL;
|
||||
struct lookup_caddy_t caddy;
|
||||
opal_argv_append_nosize(&keys, pdat->value.key);
|
||||
caddy.active = true;
|
||||
caddy.pdat = pdat;
|
||||
/* tell it to wait for the data to arrive */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_WAIT);
|
||||
info->type = OPAL_BOOL;
|
||||
info->data.flag = true;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
/* give it a decent timeout as we don't know when
|
||||
* the other side may call connect - it doesn't
|
||||
* have to be simultaneous */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_TIMEOUT);
|
||||
info->type = OPAL_INT;
|
||||
info->data.integer = (size < 60) ? size : 60;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(keys);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
OMPI_WAIT_FOR_COMPLETION(caddy.active);
|
||||
opal_argv_free(keys);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != caddy.status) {
|
||||
OMPI_ERROR_LOG(caddy.status);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
port_string = strdup(pdat->value.data.string);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
/* lookup the other side's info - if a non-blocking form
|
||||
* of lookup isn't available, then we use the blocking
|
||||
* form and trust that the underlying system will WAIT
|
||||
* until the other side publishes its data */
|
||||
OBJ_CONSTRUCT(&ilist, opal_list_t);
|
||||
pdat = OBJ_NEW(opal_pmix_pdata_t);
|
||||
if (send_first) {
|
||||
(void)asprintf(&pdat->value.key, "%s:accept", port_string);
|
||||
} else {
|
||||
(void)asprintf(&pdat->value.key, "%s:connect", port_string);
|
||||
}
|
||||
opal_list_append(&ilist, &pdat->super);
|
||||
OBJ_CONSTRUCT(&mlist, opal_list_t);
|
||||
/* if a non-blocking version of lookup isn't
|
||||
* available, then use the blocking version */
|
||||
if (NULL == opal_pmix.lookup_nb) {
|
||||
rc = opal_pmix.lookup(&ilist, &mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
goto exit;
|
||||
}
|
||||
} else {
|
||||
char **keys = NULL;
|
||||
struct lookup_caddy_t caddy;
|
||||
opal_argv_append_nosize(&keys, pdat->value.key);
|
||||
caddy.active = true;
|
||||
caddy.pdat = pdat;
|
||||
/* tell it to wait for the data to arrive */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_WAIT);
|
||||
info->type = OPAL_BOOL;
|
||||
info->data.flag = true;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
/* give it a decent timeout as we don't know when
|
||||
* the other side may call connect - it doesn't
|
||||
* have to be simultaneous */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_TIMEOUT);
|
||||
info->type = OPAL_INT;
|
||||
info->data.integer = 60;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(keys);
|
||||
opal_argv_free(members);
|
||||
goto exit;
|
||||
}
|
||||
OMPI_WAIT_FOR_COMPLETION(caddy.active);
|
||||
opal_argv_free(keys);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != caddy.status) {
|
||||
OMPI_ERROR_LOG(caddy.status);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
/* save the result */
|
||||
rport = strdup(pdat->value.data.string); // need this later
|
||||
rportlen = strlen(rport) + 1; // retain the NULL terminator
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
}
|
||||
|
||||
/* lookup the other side's info - if a non-blocking form
|
||||
* of lookup isn't available, then we use the blocking
|
||||
* form and trust that the underlying system will WAIT
|
||||
* until the other side publishes its data */
|
||||
OBJ_CONSTRUCT(&ilist, opal_list_t);
|
||||
pdat = OBJ_NEW(opal_pmix_pdata_t);
|
||||
if (send_first) {
|
||||
(void)asprintf(&pdat->value.key, "%s:accept", port_string);
|
||||
} else {
|
||||
(void)asprintf(&pdat->value.key, "%s:connect", port_string);
|
||||
/* if we aren't in a comm_spawn, the non-root members won't have
|
||||
* a port_string - so let's make sure everyone knows the other
|
||||
* side's participants */
|
||||
|
||||
/* bcast the list-length to all processes in the local comm */
|
||||
rc = comm->c_coll.coll_bcast(&rportlen, 1, MPI_INT, root, comm,
|
||||
comm->c_coll.coll_bcast_module);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
free(rport);
|
||||
goto exit;
|
||||
}
|
||||
opal_list_append(&ilist, &pdat->super);
|
||||
OBJ_CONSTRUCT(&mlist, opal_list_t);
|
||||
/* if a non-blocking version of lookup isn't
|
||||
* available, then use the blocking version */
|
||||
if (NULL == opal_pmix.lookup_nb) {
|
||||
rc = opal_pmix.lookup(&ilist, &mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
char **keys = NULL;
|
||||
struct lookup_caddy_t caddy;
|
||||
opal_argv_append_nosize(&keys, pdat->value.key);
|
||||
caddy.active = true;
|
||||
caddy.pdat = pdat;
|
||||
/* tell it to wait for the data to arrive */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_WAIT);
|
||||
info->type = OPAL_BOOL;
|
||||
info->data.flag = true;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
/* give it a decent timeout as we don't know when
|
||||
* the other side may call connect - it doesn't
|
||||
* have to be simultaneous */
|
||||
info = OBJ_NEW(opal_value_t);
|
||||
info->key = strdup(OPAL_PMIX_TIMEOUT);
|
||||
info->type = OPAL_INT;
|
||||
info->data.integer = (size < 60) ? size : 60;
|
||||
opal_list_append(&mlist, &info->super);
|
||||
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(keys);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
OMPI_WAIT_FOR_COMPLETION(caddy.active);
|
||||
opal_argv_free(keys);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
if (OPAL_SUCCESS != caddy.status) {
|
||||
OMPI_ERROR_LOG(caddy.status);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERROR;
|
||||
|
||||
if (rank != root) {
|
||||
/* non root processes need to allocate the buffer manually */
|
||||
rport = (char*)malloc(rportlen);
|
||||
if (NULL == rport) {
|
||||
rc = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
/* now share the list of remote participants */
|
||||
rc = comm->c_coll.coll_bcast(rport, rportlen, MPI_BYTE, root, comm,
|
||||
comm->c_coll.coll_bcast_module);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
free(rport);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* initiate a list of participants for the connect,
|
||||
* starting with our own members, remembering to
|
||||
@ -370,8 +334,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
opal_argv_free(members);
|
||||
free(rport);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
return rc;
|
||||
goto exit;
|
||||
}
|
||||
/* if the rank is wildcard, then we need to add all procs
|
||||
* in that job to the list */
|
||||
@ -390,7 +355,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERR_BAD_PARAM;
|
||||
free(rport);
|
||||
rc = OMPI_ERR_BAD_PARAM;
|
||||
goto exit;
|
||||
}
|
||||
++i;
|
||||
} else {
|
||||
@ -403,21 +370,22 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
/* the pdat object will contain a colon-delimited list
|
||||
* of process names for the remote procs - convert it
|
||||
* into an argv array */
|
||||
members = opal_argv_split(pdat->value.data.string, ':');
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
members = opal_argv_split(rport, ':');
|
||||
free(rport);
|
||||
|
||||
/* the first entry is the root for the remote side */
|
||||
if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&pname, members[0]))) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
opal_argv_free(members);
|
||||
return rc;
|
||||
goto exit;
|
||||
}
|
||||
/* check the name - it should never be a wildcard, so
|
||||
* this is just checking for an error */
|
||||
if (OPAL_VPID_WILDCARD == pname.vpid) {
|
||||
OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
|
||||
opal_argv_free(members);
|
||||
return OMPI_ERR_BAD_PARAM;
|
||||
rc = OMPI_ERR_BAD_PARAM;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* add the list of remote procs to our list, and
|
||||
@ -432,7 +400,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
opal_argv_free(members);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&rlist);
|
||||
return rc;
|
||||
goto exit;
|
||||
}
|
||||
if (OPAL_VPID_WILDCARD == nm->name.vpid) {
|
||||
jobid = nm->name.jobid;
|
||||
@ -446,7 +414,8 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
opal_argv_free(members);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&rlist);
|
||||
return OMPI_ERR_BAD_PARAM;
|
||||
rc = OMPI_ERR_BAD_PARAM;
|
||||
goto exit;
|
||||
}
|
||||
rsize = strtoul(members[i+1], NULL, 10);
|
||||
++i;
|
||||
@ -485,10 +454,16 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
opal_argv_free(members);
|
||||
|
||||
/* tell the host RTE to connect us - this will download
|
||||
* all known data for the nspace's of participating procs */
|
||||
* all known data for the nspace's of participating procs
|
||||
* so that add_procs will not result in a slew of lookups */
|
||||
rc = opal_pmix.connect(&mlist);
|
||||
OPAL_LIST_DESTRUCT(&mlist);
|
||||
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OMPI_ERROR_LOG(rc);
|
||||
OPAL_LIST_DESTRUCT(&ilist);
|
||||
OPAL_LIST_DESTRUCT(&rlist);
|
||||
goto exit;
|
||||
}
|
||||
if (0 < opal_list_get_size(&ilist)) {
|
||||
/* convert the list of new procs to a proc_t array */
|
||||
new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
|
||||
@ -598,6 +573,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
|
||||
*/
|
||||
|
||||
exit:
|
||||
if (NULL != pkeys) {
|
||||
opal_pmix.unpublish(pkeys, NULL);
|
||||
opal_argv_free(pkeys);
|
||||
}
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
if (MPI_COMM_NULL != newcomp && NULL != newcomp) {
|
||||
OBJ_RETAIN(newcomp);
|
||||
@ -695,13 +674,12 @@ int ompi_dpm_disconnect(ompi_communicator_t *comm)
|
||||
return ret;
|
||||
}
|
||||
|
||||
opal_pmix.fence(&coll, false);
|
||||
|
||||
/* ensure we tell the host RM to disconnect us */
|
||||
opal_pmix.disconnect(&coll);
|
||||
/* ensure we tell the host RM to disconnect us - this
|
||||
* is a blocking operation that must include a fence */
|
||||
ret = opal_pmix.disconnect(&coll);
|
||||
OPAL_LIST_DESTRUCT(&coll);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_dpm_spawn(int count, const char *array_of_commands[],
|
||||
@ -1067,7 +1045,7 @@ int ompi_dpm_open_port(char *port_name)
|
||||
|
||||
r = opal_rand(&rnd);
|
||||
opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME);
|
||||
snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u", tmp, r);
|
||||
snprintf(port_name, MPI_MAX_PORT_NAME-1, "%s:%u", tmp, r);
|
||||
free(tmp);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -100,6 +100,9 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm)
|
||||
send_first = true;
|
||||
}
|
||||
|
||||
/* ensure the port name is NULL terminated */
|
||||
memset(port_name, 0, MPI_MAX_PORT_NAME);
|
||||
|
||||
/* Assumption: socket_send should not block, even if the socket
|
||||
is not configured to be non-blocking, because the message length are
|
||||
so short. */
|
||||
|
@ -301,7 +301,7 @@ int pmix1_get(const opal_process_name_t *proc,
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"%s PMIx_client get on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
OPAL_NAME_PRINT(*proc), key);
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
|
||||
/* prep default response */
|
||||
*val = NULL;
|
||||
@ -371,7 +371,7 @@ int pmix1_getnb(const opal_process_name_t *proc, const char *key,
|
||||
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
|
||||
"%s PMIx_client get_nb on proc %s key %s",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
OPAL_NAME_PRINT(*proc), key);
|
||||
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
|
||||
|
||||
/* create the caddy */
|
||||
op = OBJ_NEW(pmix1_opcaddy_t);
|
||||
@ -501,7 +501,7 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
|
||||
++n;
|
||||
}
|
||||
} else {
|
||||
pdata = NULL;
|
||||
pinfo = NULL;
|
||||
ninfo = 0;
|
||||
}
|
||||
|
||||
|
@ -569,6 +569,15 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
|
||||
}
|
||||
}
|
||||
|
||||
static void opcon(orte_pmix_server_op_caddy_t *p)
|
||||
{
|
||||
p->procs = NULL;
|
||||
p->info = NULL;
|
||||
p->cbdata = NULL;
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
|
||||
opal_object_t,
|
||||
opcon, NULL);
|
||||
|
||||
static void rqcon(pmix_server_req_t *p)
|
||||
{
|
||||
|
@ -46,7 +46,8 @@
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
|
||||
#include "pmix_server_internal.h"
|
||||
#include "orte/orted/pmix/pmix_server.h"
|
||||
#include "orte/orted/pmix/pmix_server_internal.h"
|
||||
|
||||
void pmix_server_launch_resp(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
@ -327,6 +328,119 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
static void _cnct(int sd, short args, void *cbdata);
|
||||
|
||||
static void _cnlk(int status, opal_list_t *data, void *cbdata)
|
||||
{
|
||||
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
|
||||
int rc, cnt;
|
||||
opal_pmix_pdata_t *pdat;
|
||||
orte_job_t *jdata;
|
||||
opal_buffer_t buf;
|
||||
|
||||
/* if we failed to get the required data, then just inform
|
||||
* the embedded server that the connect cannot succeed */
|
||||
if (ORTE_SUCCESS != status || NULL == data) {
|
||||
if (NULL != cd->cbfunc) {
|
||||
rc = status;
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
|
||||
/* register the returned data with the embedded PMIx server */
|
||||
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
|
||||
if (OPAL_BYTE_OBJECT != pdat->value.type) {
|
||||
rc = ORTE_ERR_BAD_PARAM;
|
||||
goto release;
|
||||
}
|
||||
/* the data will consist of a packed buffer with the job data in it */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
opal_dss.load(&buf, pdat->value.data.bo.bytes, pdat->value.data.bo.size);
|
||||
pdat->value.data.bo.bytes = NULL;
|
||||
pdat->value.data.bo.size = 0;
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
|
||||
OBJ_DESTRUCT(&buf);
|
||||
goto release;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) {
|
||||
OBJ_RELEASE(jdata);
|
||||
goto release;
|
||||
}
|
||||
OBJ_RELEASE(jdata); // no reason to keep this around
|
||||
|
||||
/* restart the cnct processor */
|
||||
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
|
||||
OBJ_RELEASE(cd);
|
||||
|
||||
release:
|
||||
if (NULL != cd->cbfunc) {
|
||||
cd->cbfunc(rc, cd->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(cd);
|
||||
}
|
||||
|
||||
static void _cnct(int sd, short args, void *cbdata)
|
||||
{
|
||||
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
|
||||
orte_namelist_t *nm;
|
||||
char **keys = NULL, *key;
|
||||
orte_job_t *jdata;
|
||||
int rc = ORTE_SUCCESS;
|
||||
|
||||
/* at some point, we need to add bookeeping to track which
|
||||
* procs are "connected" so we know who to notify upon
|
||||
* termination or failure. For now, we have to ensure
|
||||
* that we have registered all participating nspaces so
|
||||
* the embedded PMIx server can provide them to the client.
|
||||
* Otherwise, the client will receive an error as it won't
|
||||
* be able to resolve any of the required data for the
|
||||
* missing nspaces */
|
||||
|
||||
/* cycle thru the procs */
|
||||
OPAL_LIST_FOREACH(nm, cd->procs, orte_namelist_t) {
|
||||
/* see if we have the job object for this job */
|
||||
if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
|
||||
/* we don't know about this job. If our "global" data
|
||||
* server is just our HNP, then we have no way of finding
|
||||
* out about it, and all we can do is return an error */
|
||||
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
|
||||
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
|
||||
rc = ORTE_ERR_NOT_SUPPORTED;
|
||||
goto release;
|
||||
}
|
||||
/* ask the global data server for the data - if we get it,
|
||||
* then we can complete the request */
|
||||
key = opal_convert_jobid_to_string(nm->name.jobid);
|
||||
opal_argv_append_nosize(&keys, key);
|
||||
free(key);
|
||||
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
|
||||
opal_argv_free(keys);
|
||||
goto release;
|
||||
}
|
||||
opal_argv_free(keys);
|
||||
/* the callback function on this lookup will return us to this
|
||||
* routine so we can continue the process */
|
||||
return;
|
||||
}
|
||||
/* we know about the job - check to ensure it has been
|
||||
* registered with the local PMIx server */
|
||||
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
|
||||
/* it hasn't been registered yet, so register it now */
|
||||
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) {
|
||||
goto release;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
release:
|
||||
if (NULL != cd->cbfunc) {
|
||||
cd->cbfunc(rc, cd->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(cd);
|
||||
}
|
||||
|
||||
int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
@ -335,26 +449,52 @@ int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)opal_list_get_size(procs));
|
||||
|
||||
/* for now, just ack the call */
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(OPAL_SUCCESS, cbdata);
|
||||
/* protect ourselves */
|
||||
if (NULL == procs || 0 == opal_list_get_size(procs)) {
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
/* must thread shift this as we will be accessing global data */
|
||||
ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
static void mdxcbfunc(int status,
|
||||
const char *data, size_t ndata, void *cbdata,
|
||||
opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
|
||||
{
|
||||
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
|
||||
|
||||
/* ack the call */
|
||||
if (NULL != cd->cbfunc) {
|
||||
cd->cbfunc(status, cd->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(cd);
|
||||
}
|
||||
|
||||
int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
|
||||
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||
{
|
||||
orte_pmix_server_op_caddy_t *cd;
|
||||
int rc;
|
||||
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s disconnect called with %d procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)opal_list_get_size(procs));
|
||||
|
||||
/* for now, just ack the call */
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(OPAL_SUCCESS, cbdata);
|
||||
/* at some point, we need to add bookeeping to track which
|
||||
* procs are "connected" so we know who to notify upon
|
||||
* termination or failure. For now, just execute a fence
|
||||
* Note that we do not need to thread-shift here as the
|
||||
* fence function will do it for us */
|
||||
cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
|
||||
cd->cbfunc = cbfunc;
|
||||
cd->cbdata = cbdata;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0,
|
||||
mdxcbfunc, cd))) {
|
||||
OBJ_RELEASE(cd);
|
||||
}
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
return rc;
|
||||
}
|
||||
|
@ -72,10 +72,8 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t);
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
orte_job_t *jdata;
|
||||
orte_process_name_t proc;
|
||||
int status;
|
||||
orte_proc_t *object;
|
||||
opal_list_t *procs;
|
||||
opal_list_t *info;
|
||||
opal_pmix_op_cbfunc_t cbfunc;
|
||||
void *cbdata;
|
||||
} orte_pmix_server_op_caddy_t;
|
||||
@ -115,21 +113,18 @@ do { \
|
||||
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_PMIX_OPERATION(n, r, ob, s, fn, cf, cb) \
|
||||
do { \
|
||||
orte_pmix_server_op_caddy_t *_cd; \
|
||||
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
|
||||
/* convert the namespace to jobid and create name */ \
|
||||
orte_util_convert_string_to_jobid(&(_cd->proc.jobid), (n)); \
|
||||
_cd->proc.vpid = (r); \
|
||||
_cd->object = (ob); \
|
||||
_cd->cbfunc = (cf); \
|
||||
_cd->cbdata = (cb); \
|
||||
_cd->status = (s); \
|
||||
opal_event_set(orte_event_base, &(_cd->ev), -1, \
|
||||
OPAL_EV_WRITE, (fn), _cd); \
|
||||
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
|
||||
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
|
||||
#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \
|
||||
do { \
|
||||
orte_pmix_server_op_caddy_t *_cd; \
|
||||
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
|
||||
_cd->procs = (p); \
|
||||
_cd->info = (i); \
|
||||
_cd->cbfunc = (cf); \
|
||||
_cd->cbdata = (cb); \
|
||||
opal_event_set(orte_event_base, &(_cd->ev), -1, \
|
||||
OPAL_EV_WRITE, (fn), _cd); \
|
||||
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
|
||||
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
|
||||
|
@ -387,6 +387,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
|
||||
opal_list_append(pmap, &kv->super);
|
||||
}
|
||||
|
||||
/* mark the job as registered */
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);
|
||||
|
||||
/* pass it down */
|
||||
if (OPAL_SUCCESS != opal_pmix.server_register_nspace(jdata->jobid,
|
||||
jdata->num_local_procs,
|
||||
|
@ -259,6 +259,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
|
||||
return "JOB-ROOM-NUM";
|
||||
case ORTE_JOB_LAUNCH_PROXY:
|
||||
return "JOB-LAUNCH-PROXY";
|
||||
case ORTE_JOB_NSPACE_REGISTERED:
|
||||
return "JOB-NSPACE-REGISTERED";
|
||||
|
||||
case ORTE_PROC_NOBARRIER:
|
||||
return "PROC-NOBARRIER";
|
||||
|
@ -129,6 +129,7 @@ typedef uint16_t orte_job_flags_t;
|
||||
#define ORTE_JOB_NOTIFICATIONS (ORTE_JOB_START_KEY + 38) // string - comma-separated list of desired notifications+methods
|
||||
#define ORTE_JOB_ROOM_NUM (ORTE_JOB_START_KEY + 39) // int - number of remote request's hotel room
|
||||
#define ORTE_JOB_LAUNCH_PROXY (ORTE_JOB_START_KEY + 40) // opal_process_name_t - name of spawn requestor
|
||||
#define ORTE_JOB_NSPACE_REGISTERED (ORTE_JOB_START_KEY + 41) // bool - job has been registered with embedded PMIx server
|
||||
|
||||
#define ORTE_JOB_MAX_KEY 300
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user