diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index df4d585fb5..def9f1b30c 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -112,8 +112,8 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, const char *port_string, bool send_first, ompi_communicator_t **newcomm) { - int k, size, rsize, rank, rc; - char **members = NULL, *nstring; + int k, size, rsize, rank, rc, rportlen=0; + char **members = NULL, *nstring, *rport=NULL, **pkeys=NULL; bool dense, isnew; opal_process_name_t pname; opal_list_t ilist, mlist, rlist; @@ -169,6 +169,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, if (NULL == (proc_list[i] = ompi_group_peer_lookup(group,i))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); rc = ORTE_ERR_NOT_FOUND; + free(proc_list); goto exit; } } @@ -195,13 +196,6 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, if (rank == root) { /* the root for each side publishes their list of participants */ OBJ_CONSTRUCT(&ilist, opal_list_t); - /* publish a key-val containing the port string itself - * so other participants from our job can connect */ - info = OBJ_NEW(opal_value_t); - info->key = opal_argv_join(members, ':'); - info->type = OPAL_STRING; - info->data.string = strdup(port_string); - opal_list_append(&ilist, &info->super); /* put my name at the front of the list of members - my * name will therefore be on the list twice, but the * other side's root needs to know the root from this side */ @@ -220,141 +214,111 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, info->type = OPAL_STRING; info->data.string = opal_argv_join(members, ':'); opal_list_append(&ilist, &info->super); + /* also save the key for later */ + opal_argv_append_nosize(&pkeys, info->key); /* publish them with "session" scope */ rc = opal_pmix.publish(&ilist); OPAL_LIST_DESTRUCT(&ilist); if (OPAL_SUCCESS != rc) { opal_argv_free(members); + opal_argv_free(pkeys); return OMPI_ERROR; } - } else { - /* check to see if we have to get the port 
string for - * this connect - if we were started via a call to spawn, - * then the port string was given to us. Otherwise, the - * string will be NULL */ - if (0 == strlen(port_string)) { - /* our root should have published it */ - OBJ_CONSTRUCT(&ilist, opal_list_t); - pdat = OBJ_NEW(opal_pmix_pdata_t); - pdat->value.key = opal_argv_join(members, ':'); - opal_list_append(&ilist, &pdat->super); - OBJ_CONSTRUCT(&mlist, opal_list_t); - /* if a non-blocking version of lookup isn't - * available, then use the blocking version */ - if (NULL == opal_pmix.lookup_nb) { - rc = opal_pmix.lookup(&ilist, &mlist); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != rc) { - OMPI_ERROR_LOG(rc); - OPAL_LIST_DESTRUCT(&ilist); - opal_argv_free(members); - return OMPI_ERROR; - } - } else { - char **keys = NULL; - struct lookup_caddy_t caddy; - opal_argv_append_nosize(&keys, pdat->value.key); - caddy.active = true; - caddy.pdat = pdat; - /* tell it to wait for the data to arrive */ - info = OBJ_NEW(opal_value_t); - info->key = strdup(OPAL_PMIX_WAIT); - info->type = OPAL_BOOL; - info->data.flag = true; - opal_list_append(&mlist, &info->super); - /* give it a decent timeout as we don't know when - * the other side may call connect - it doesn't - * have to be simultaneous */ - info = OBJ_NEW(opal_value_t); - info->key = strdup(OPAL_PMIX_TIMEOUT); - info->type = OPAL_INT; - info->data.integer = (size < 60) ? 
size : 60; - opal_list_append(&mlist, &info->super); - rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy); - if (OPAL_SUCCESS != rc) { - OPAL_LIST_DESTRUCT(&ilist); - OPAL_LIST_DESTRUCT(&mlist); - opal_argv_free(keys); - opal_argv_free(members); - return OMPI_ERROR; - } - OMPI_WAIT_FOR_COMPLETION(caddy.active); - opal_argv_free(keys); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != caddy.status) { - OMPI_ERROR_LOG(caddy.status); - OPAL_LIST_DESTRUCT(&ilist); - opal_argv_free(members); - return OMPI_ERROR; - } - } - port_string = strdup(pdat->value.data.string); - OPAL_LIST_DESTRUCT(&ilist); + /* lookup the other side's info - if a non-blocking form + * of lookup isn't available, then we use the blocking + * form and trust that the underlying system will WAIT + * until the other side publishes its data */ + OBJ_CONSTRUCT(&ilist, opal_list_t); + pdat = OBJ_NEW(opal_pmix_pdata_t); + if (send_first) { + (void)asprintf(&pdat->value.key, "%s:accept", port_string); + } else { + (void)asprintf(&pdat->value.key, "%s:connect", port_string); } + opal_list_append(&ilist, &pdat->super); + OBJ_CONSTRUCT(&mlist, opal_list_t); + /* if a non-blocking version of lookup isn't + * available, then use the blocking version */ + if (NULL == opal_pmix.lookup_nb) { + rc = opal_pmix.lookup(&ilist, &mlist); + OPAL_LIST_DESTRUCT(&mlist); + if (OPAL_SUCCESS != rc) { + OMPI_ERROR_LOG(rc); + OPAL_LIST_DESTRUCT(&ilist); + opal_argv_free(members); + goto exit; + } + } else { + char **keys = NULL; + struct lookup_caddy_t caddy; + opal_argv_append_nosize(&keys, pdat->value.key); + caddy.active = true; + caddy.pdat = pdat; + /* tell it to wait for the data to arrive */ + info = OBJ_NEW(opal_value_t); + info->key = strdup(OPAL_PMIX_WAIT); + info->type = OPAL_BOOL; + info->data.flag = true; + opal_list_append(&mlist, &info->super); + /* give it a decent timeout as we don't know when + * the other side may call connect - it doesn't + * have to be simultaneous */ + info = 
OBJ_NEW(opal_value_t); + info->key = strdup(OPAL_PMIX_TIMEOUT); + info->type = OPAL_INT; + info->data.integer = 60; + opal_list_append(&mlist, &info->super); + rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy); + if (OPAL_SUCCESS != rc) { + OPAL_LIST_DESTRUCT(&ilist); + OPAL_LIST_DESTRUCT(&mlist); + opal_argv_free(keys); + opal_argv_free(members); + goto exit; + } + OMPI_WAIT_FOR_COMPLETION(caddy.active); + opal_argv_free(keys); + OPAL_LIST_DESTRUCT(&mlist); + if (OPAL_SUCCESS != caddy.status) { + OMPI_ERROR_LOG(caddy.status); + OPAL_LIST_DESTRUCT(&ilist); + opal_argv_free(members); + goto exit; + } + } + /* save the result */ + rport = strdup(pdat->value.data.string); // need this later + rportlen = strlen(rport) + 1; // retain the NULL terminator + OPAL_LIST_DESTRUCT(&ilist); } - /* lookup the other side's info - if a non-blocking form - * of lookup isn't available, then we use the blocking - * form and trust that the underlying system will WAIT - * until the other side publishes its data */ - OBJ_CONSTRUCT(&ilist, opal_list_t); - pdat = OBJ_NEW(opal_pmix_pdata_t); - if (send_first) { - (void)asprintf(&pdat->value.key, "%s:accept", port_string); - } else { - (void)asprintf(&pdat->value.key, "%s:connect", port_string); + /* if we aren't in a comm_spawn, the non-root members won't have + * a port_string - so let's make sure everyone knows the other + * side's participants */ + + /* bcast the list-length to all processes in the local comm */ + rc = comm->c_coll.coll_bcast(&rportlen, 1, MPI_INT, root, comm, + comm->c_coll.coll_bcast_module); + if (OMPI_SUCCESS != rc) { + free(rport); + goto exit; } - opal_list_append(&ilist, &pdat->super); - OBJ_CONSTRUCT(&mlist, opal_list_t); - /* if a non-blocking version of lookup isn't - * available, then use the blocking version */ - if (NULL == opal_pmix.lookup_nb) { - rc = opal_pmix.lookup(&ilist, &mlist); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != rc) { - OMPI_ERROR_LOG(rc); - OPAL_LIST_DESTRUCT(&ilist); 
- opal_argv_free(members); - return OMPI_ERROR; - } - } else { - char **keys = NULL; - struct lookup_caddy_t caddy; - opal_argv_append_nosize(&keys, pdat->value.key); - caddy.active = true; - caddy.pdat = pdat; - /* tell it to wait for the data to arrive */ - info = OBJ_NEW(opal_value_t); - info->key = strdup(OPAL_PMIX_WAIT); - info->type = OPAL_BOOL; - info->data.flag = true; - opal_list_append(&mlist, &info->super); - /* give it a decent timeout as we don't know when - * the other side may call connect - it doesn't - * have to be simultaneous */ - info = OBJ_NEW(opal_value_t); - info->key = strdup(OPAL_PMIX_TIMEOUT); - info->type = OPAL_INT; - info->data.integer = (size < 60) ? size : 60; - opal_list_append(&mlist, &info->super); - rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy); - if (OPAL_SUCCESS != rc) { - OPAL_LIST_DESTRUCT(&ilist); - OPAL_LIST_DESTRUCT(&mlist); - opal_argv_free(keys); - opal_argv_free(members); - return OMPI_ERROR; - } - OMPI_WAIT_FOR_COMPLETION(caddy.active); - opal_argv_free(keys); - OPAL_LIST_DESTRUCT(&mlist); - if (OPAL_SUCCESS != caddy.status) { - OMPI_ERROR_LOG(caddy.status); - OPAL_LIST_DESTRUCT(&ilist); - opal_argv_free(members); - return OMPI_ERROR; + + if (rank != root) { + /* non root processes need to allocate the buffer manually */ + rport = (char*)malloc(rportlen); + if (NULL == rport) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; } } + /* now share the list of remote participants */ + rc = comm->c_coll.coll_bcast(rport, rportlen, MPI_BYTE, root, comm, + comm->c_coll.coll_bcast_module); + if (OMPI_SUCCESS != rc) { + free(rport); + goto exit; + } /* initiate a list of participants for the connect, * starting with our own members, remembering to @@ -370,8 +334,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) { OMPI_ERROR_LOG(rc); opal_argv_free(members); + free(rport); OPAL_LIST_DESTRUCT(&mlist); - return 
rc; + goto exit; } /* if the rank is wildcard, then we need to add all procs * in that job to the list */ @@ -390,7 +355,9 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM); OPAL_LIST_DESTRUCT(&mlist); opal_argv_free(members); - return OMPI_ERR_BAD_PARAM; + free(rport); + rc = OMPI_ERR_BAD_PARAM; + goto exit; } ++i; } else { @@ -403,21 +370,22 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, /* the pdat object will contain a colon-delimited list * of process names for the remote procs - convert it * into an argv array */ - members = opal_argv_split(pdat->value.data.string, ':'); - OPAL_LIST_DESTRUCT(&ilist); + members = opal_argv_split(rport, ':'); + free(rport); /* the first entry is the root for the remote side */ if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&pname, members[0]))) { OMPI_ERROR_LOG(rc); opal_argv_free(members); - return rc; + goto exit; } /* check the name - it should never be a wildcard, so * this is just checking for an error */ if (OPAL_VPID_WILDCARD == pname.vpid) { OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM); opal_argv_free(members); - return OMPI_ERR_BAD_PARAM; + rc = OMPI_ERR_BAD_PARAM; + goto exit; } /* add the list of remote procs to our list, and @@ -432,7 +400,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, opal_argv_free(members); OPAL_LIST_DESTRUCT(&ilist); OPAL_LIST_DESTRUCT(&rlist); - return rc; + goto exit; } if (OPAL_VPID_WILDCARD == nm->name.vpid) { jobid = nm->name.jobid; @@ -446,7 +414,8 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, opal_argv_free(members); OPAL_LIST_DESTRUCT(&ilist); OPAL_LIST_DESTRUCT(&rlist); - return OMPI_ERR_BAD_PARAM; + rc = OMPI_ERR_BAD_PARAM; + goto exit; } rsize = strtoul(members[i+1], NULL, 10); ++i; @@ -485,10 +454,16 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, opal_argv_free(members); /* tell the host RTE to connect us - this will download - * all known 
data for the nspace's of participating procs */ + * all known data for the nspace's of participating procs + * so that add_procs will not result in a slew of lookups */ rc = opal_pmix.connect(&mlist); OPAL_LIST_DESTRUCT(&mlist); - + if (OPAL_SUCCESS != rc) { + OMPI_ERROR_LOG(rc); + OPAL_LIST_DESTRUCT(&ilist); + OPAL_LIST_DESTRUCT(&rlist); + goto exit; + } if (0 < opal_list_get_size(&ilist)) { /* convert the list of new procs to a proc_t array */ new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist), @@ -598,6 +573,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, */ exit: + if (NULL != pkeys) { + opal_pmix.unpublish(pkeys, NULL); + opal_argv_free(pkeys); + } if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { OBJ_RETAIN(newcomp); @@ -695,13 +674,12 @@ int ompi_dpm_disconnect(ompi_communicator_t *comm) return ret; } - opal_pmix.fence(&coll, false); - - /* ensure we tell the host RM to disconnect us */ - opal_pmix.disconnect(&coll); + /* ensure we tell the host RM to disconnect us - this + * is a blocking operation that must include a fence */ + ret = opal_pmix.disconnect(&coll); OPAL_LIST_DESTRUCT(&coll); - return OMPI_SUCCESS; + return ret; } int ompi_dpm_spawn(int count, const char *array_of_commands[], @@ -1067,7 +1045,7 @@ int ompi_dpm_open_port(char *port_name) r = opal_rand(&rnd); opal_convert_process_name_to_string(&tmp, OMPI_PROC_MY_NAME); - snprintf(port_name, MPI_MAX_PORT_NAME, "%s:%u", tmp, r); + snprintf(port_name, MPI_MAX_PORT_NAME-1, "%s:%u", tmp, r); free(tmp); return OMPI_SUCCESS; } diff --git a/ompi/mpi/c/comm_join.c b/ompi/mpi/c/comm_join.c index d1493f8eb0..d94b1638ac 100644 --- a/ompi/mpi/c/comm_join.c +++ b/ompi/mpi/c/comm_join.c @@ -100,6 +100,9 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm) send_first = true; } + /* ensure the port name is NULL terminated */ + memset(port_name, 0, MPI_MAX_PORT_NAME); + /* Assumption: socket_send should not block, even if the socket is not configured 
to be non-blocking, because the message length are so short. */ diff --git a/opal/mca/pmix/pmix1xx/pmix1_client.c b/opal/mca/pmix/pmix1xx/pmix1_client.c index 69bc44cc53..7c9ae9f927 100644 --- a/opal/mca/pmix/pmix1xx/pmix1_client.c +++ b/opal/mca/pmix/pmix1xx/pmix1_client.c @@ -301,7 +301,7 @@ int pmix1_get(const opal_process_name_t *proc, opal_output_verbose(1, opal_pmix_base_framework.framework_output, "%s PMIx_client get on proc %s key %s", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*proc), key); + (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); /* prep default response */ *val = NULL; @@ -371,7 +371,7 @@ int pmix1_getnb(const opal_process_name_t *proc, const char *key, opal_output_verbose(1, opal_pmix_base_framework.framework_output, "%s PMIx_client get_nb on proc %s key %s", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*proc), key); + (NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key); /* create the caddy */ op = OBJ_NEW(pmix1_opcaddy_t); @@ -501,7 +501,7 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info) ++n; } } else { - pdata = NULL; + pinfo = NULL; ninfo = 0; } diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index 4649251b29..9cb387a0c2 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -569,6 +569,15 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, } } +static void opcon(orte_pmix_server_op_caddy_t *p) +{ + p->procs = NULL; + p->info = NULL; + p->cbdata = NULL; +} +OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t, + opal_object_t, + opcon, NULL); static void rqcon(pmix_server_req_t *p) { diff --git a/orte/orted/pmix/pmix_server_dyn.c b/orte/orted/pmix/pmix_server_dyn.c index b7d49b55d4..18a5d1b932 100644 --- a/orte/orted/pmix/pmix_server_dyn.c +++ b/orte/orted/pmix/pmix_server_dyn.c @@ -46,7 +46,8 @@ #include "orte/runtime/orte_globals.h" #include "orte/mca/rml/rml.h" -#include "pmix_server_internal.h" +#include 
"orte/orted/pmix/pmix_server.h"
+#include "orte/orted/pmix/pmix_server_internal.h"
 
 void pmix_server_launch_resp(int status, orte_process_name_t* sender,
                              opal_buffer_t *buffer,
@@ -327,6 +328,121 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
     return OPAL_SUCCESS;
 }
 
+static void _cnct(int sd, short args, void *cbdata);
+
+static void _cnlk(int status, opal_list_t *data, void *cbdata)
+{
+    orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
+    int rc, cnt;
+    opal_pmix_pdata_t *pdat;
+    orte_job_t *jdata;
+    opal_buffer_t buf;
+
+    /* if we failed to get the required data, then just inform
+     * the embedded server that the connect cannot succeed */
+    if (ORTE_SUCCESS != status || NULL == data) {
+        /* report a failure even if status is SUCCESS but no data came back */
+        rc = (ORTE_SUCCESS == status) ? ORTE_ERR_NOT_FOUND : status;
+        goto release;
+    }
+
+    /* register the returned data with the embedded PMIx server */
+    pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
+    if (OPAL_BYTE_OBJECT != pdat->value.type) {
+        rc = ORTE_ERR_BAD_PARAM;
+        goto release;
+    }
+    /* the data will consist of a packed buffer with the job data in it */
+    OBJ_CONSTRUCT(&buf, opal_buffer_t);
+    opal_dss.load(&buf, pdat->value.data.bo.bytes, pdat->value.data.bo.size);
+    pdat->value.data.bo.bytes = NULL;
+    pdat->value.data.bo.size = 0;
+    cnt = 1;
+    if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
+        OBJ_DESTRUCT(&buf);
+        goto release;
+    }
+    OBJ_DESTRUCT(&buf);
+    if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) {
+        OBJ_RELEASE(jdata);
+        goto release;
+    }
+    OBJ_RELEASE(jdata);  // no reason to keep this around
+
+    /* restart the cnct processor */
+    ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
+    OBJ_RELEASE(cd);
+    /* the new caddy owns the continuation - do NOT fall into "release",
+     * which would fire the callback early and release cd a second time */
+    return;
+
+  release:
+    if (NULL != cd->cbfunc) {
+        cd->cbfunc(rc, cd->cbdata);
+    }
+    OBJ_RELEASE(cd);
+}
+
+static void _cnct(int sd, short args, void *cbdata)
+{
+    orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
+    orte_namelist_t *nm;
+    char **keys = NULL, *key;
+ orte_job_t *jdata; + int rc = ORTE_SUCCESS; + + /* at some point, we need to add bookeeping to track which + * procs are "connected" so we know who to notify upon + * termination or failure. For now, we have to ensure + * that we have registered all participating nspaces so + * the embedded PMIx server can provide them to the client. + * Otherwise, the client will receive an error as it won't + * be able to resolve any of the required data for the + * missing nspaces */ + + /* cycle thru the procs */ + OPAL_LIST_FOREACH(nm, cd->procs, orte_namelist_t) { + /* see if we have the job object for this job */ + if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) { + /* we don't know about this job. If our "global" data + * server is just our HNP, then we have no way of finding + * out about it, and all we can do is return an error */ + if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid && + orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) { + rc = ORTE_ERR_NOT_SUPPORTED; + goto release; + } + /* ask the global data server for the data - if we get it, + * then we can complete the request */ + key = opal_convert_jobid_to_string(nm->name.jobid); + opal_argv_append_nosize(&keys, key); + free(key); + if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) { + opal_argv_free(keys); + goto release; + } + opal_argv_free(keys); + /* the callback function on this lookup will return us to this + * routine so we can continue the process */ + return; + } + /* we know about the job - check to ensure it has been + * registered with the local PMIx server */ + if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) { + /* it hasn't been registered yet, so register it now */ + if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) { + goto release; + } + } + } + + release: + if (NULL != cd->cbfunc) { + cd->cbfunc(rc, cd->cbdata); + } + OBJ_RELEASE(cd); +} + int 
pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { @@ -335,26 +449,52 @@ int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)opal_list_get_size(procs)); - /* for now, just ack the call */ - if (NULL != cbfunc) { - cbfunc(OPAL_SUCCESS, cbdata); + /* protect ourselves */ + if (NULL == procs || 0 == opal_list_get_size(procs)) { + return ORTE_ERR_BAD_PARAM; } + /* must thread shift this as we will be accessing global data */ + ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata); + return ORTE_SUCCESS; +} - return OPAL_SUCCESS; +static void mdxcbfunc(int status, + const char *data, size_t ndata, void *cbdata, + opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata) +{ + orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata; + + /* ack the call */ + if (NULL != cd->cbfunc) { + cd->cbfunc(status, cd->cbdata); + } + OBJ_RELEASE(cd); } int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { + orte_pmix_server_op_caddy_t *cd; + int rc; + opal_output_verbose(2, orte_pmix_server_globals.output, "%s disconnect called with %d procs", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)opal_list_get_size(procs)); - /* for now, just ack the call */ - if (NULL != cbfunc) { - cbfunc(OPAL_SUCCESS, cbdata); + /* at some point, we need to add bookeeping to track which + * procs are "connected" so we know who to notify upon + * termination or failure. 
For now, just execute a fence + * Note that we do not need to thread-shift here as the + * fence function will do it for us */ + cd = OBJ_NEW(orte_pmix_server_op_caddy_t); + cd->cbfunc = cbfunc; + cd->cbdata = cbdata; + + if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0, + mdxcbfunc, cd))) { + OBJ_RELEASE(cd); } - return OPAL_SUCCESS; + return rc; } diff --git a/orte/orted/pmix/pmix_server_internal.h b/orte/orted/pmix/pmix_server_internal.h index 576c610fc9..f632fecee8 100644 --- a/orte/orted/pmix/pmix_server_internal.h +++ b/orte/orted/pmix/pmix_server_internal.h @@ -72,10 +72,8 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t); typedef struct { opal_object_t super; opal_event_t ev; - orte_job_t *jdata; - orte_process_name_t proc; - int status; - orte_proc_t *object; + opal_list_t *procs; + opal_list_t *info; opal_pmix_op_cbfunc_t cbfunc; void *cbdata; } orte_pmix_server_op_caddy_t; @@ -115,21 +113,18 @@ do { \ opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \ } while(0); -#define ORTE_PMIX_OPERATION(n, r, ob, s, fn, cf, cb) \ -do { \ - orte_pmix_server_op_caddy_t *_cd; \ - _cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \ - /* convert the namespace to jobid and create name */ \ - orte_util_convert_string_to_jobid(&(_cd->proc.jobid), (n)); \ - _cd->proc.vpid = (r); \ - _cd->object = (ob); \ - _cd->cbfunc = (cf); \ - _cd->cbdata = (cb); \ - _cd->status = (s); \ - opal_event_set(orte_event_base, &(_cd->ev), -1, \ - OPAL_EV_WRITE, (fn), _cd); \ - opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \ - opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \ +#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \ +do { \ + orte_pmix_server_op_caddy_t *_cd; \ + _cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \ + _cd->procs = (p); \ + _cd->info = (i); \ + _cd->cbfunc = (cf); \ + _cd->cbdata = (cb); \ + opal_event_set(orte_event_base, &(_cd->ev), -1, \ + OPAL_EV_WRITE, (fn), _cd); \ + opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \ + opal_event_active(&(_cd->ev), 
OPAL_EV_WRITE, 1); \ } while(0); diff --git a/orte/orted/pmix/pmix_server_register_fns.c b/orte/orted/pmix/pmix_server_register_fns.c index b7718f663e..67e67e0ec8 100644 --- a/orte/orted/pmix/pmix_server_register_fns.c +++ b/orte/orted/pmix/pmix_server_register_fns.c @@ -387,6 +387,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata) opal_list_append(pmap, &kv->super); } + /* mark the job as registered */ + orte_set_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL); + /* pass it down */ if (OPAL_SUCCESS != opal_pmix.server_register_nspace(jdata->jobid, jdata->num_local_procs, diff --git a/orte/util/attr.c b/orte/util/attr.c index 6f7eba57aa..d4d96334b6 100644 --- a/orte/util/attr.c +++ b/orte/util/attr.c @@ -259,6 +259,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key) return "JOB-ROOM-NUM"; case ORTE_JOB_LAUNCH_PROXY: return "JOB-LAUNCH-PROXY"; + case ORTE_JOB_NSPACE_REGISTERED: + return "JOB-NSPACE-REGISTERED"; case ORTE_PROC_NOBARRIER: return "PROC-NOBARRIER"; diff --git a/orte/util/attr.h b/orte/util/attr.h index c11f104ecb..2bf5b8265d 100644 --- a/orte/util/attr.h +++ b/orte/util/attr.h @@ -129,6 +129,7 @@ typedef uint16_t orte_job_flags_t; #define ORTE_JOB_NOTIFICATIONS (ORTE_JOB_START_KEY + 38) // string - comma-separated list of desired notifications+methods #define ORTE_JOB_ROOM_NUM (ORTE_JOB_START_KEY + 39) // int - number of remote request's hotel room #define ORTE_JOB_LAUNCH_PROXY (ORTE_JOB_START_KEY + 40) // opal_process_name_t - name of spawn requestor +#define ORTE_JOB_NSPACE_REGISTERED (ORTE_JOB_START_KEY + 41) // bool - job has been registered with embedded PMIx server #define ORTE_JOB_MAX_KEY 300