diff --git a/opal/mca/pmix/pmix1xx/pmix/VERSION b/opal/mca/pmix/pmix1xx/pmix/VERSION index 3ef5324908..ea4a1fcfb9 100644 --- a/opal/mca/pmix/pmix1xx/pmix/VERSION +++ b/opal/mca/pmix/pmix1xx/pmix/VERSION @@ -30,7 +30,7 @@ greek=a1 # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=gita1cad92 +repo_rev=git51479b0 # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Aug 31, 2015" +date="Sep 01, 2015" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c index e3636f4fdf..831d373b1d 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client.c @@ -155,7 +155,7 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, pmix_status_t rc; char *nspace; int32_t cnt = 1; - bool *active = (bool*)cbdata; + pmix_cb_t *cb = (pmix_cb_t*)cbdata; /* unpack the nspace - we don't really need it, but have to * unpack it to maintain sequence */ @@ -165,10 +165,10 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, } /* decode it */ pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf); - *active = false; + cb->active = false; } -static int connect_to_server(struct sockaddr_un *address, bool *active) +static int connect_to_server(struct sockaddr_un *address, void *cbdata) { int rc; pmix_cmd_t cmd = PMIX_REQ_CMD; @@ -206,7 +206,7 @@ static int connect_to_server(struct sockaddr_un *address, bool *active) PMIX_RELEASE(req); return rc; } - PMIX_ACTIVATE_SEND_RECV(&pmix_client_globals.myserver, req, job_data, active); + PMIX_ACTIVATE_SEND_RECV(&pmix_client_globals.myserver, req, job_data, cbdata); return PMIX_SUCCESS; } @@ -222,7 +222,7 @@ int PMIx_Init(pmix_proc_t *proc) int rc, debug_level; struct sockaddr_un address; pmix_nspace_t *nsptr; - bool active; + pmix_cb_t cb; if (NULL == proc) { return PMIX_ERR_BAD_PARAM; @@ -321,11 +321,14 @@ int PMIx_Init(pmix_proc_t *proc) } /* connect to the server - returns job info if successful */ - active = true; - if (PMIX_SUCCESS != (rc = connect_to_server(&address, &active))){ + PMIX_CONSTRUCT(&cb, pmix_cb_t); + cb.active = true; + if (PMIX_SUCCESS != (rc = connect_to_server(&address, &cb))){ + PMIX_DESTRUCT(&cb); return rc; } - PMIX_WAIT_FOR_COMPLETION(active); + PMIX_WAIT_FOR_COMPLETION(cb.active); + PMIX_DESTRUCT(&cb); return PMIX_SUCCESS; } diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c index e2504b55b6..390625260a 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c @@ -80,41 +80,8 @@ static void relfn(void *cbdata) free(data); } -static pmix_status_t _satisfy_local_req(pmix_nspace_t *nptr, pmix_rank_info_t *info, - pmix_modex_cbfunc_t cbfunc, void *cbdata) -{ - int rc; - pmix_buffer_t pbkt, xfer; - pmix_value_t *val; - char *data; - size_t sz; - - /* check for the local/global data - data committed to remote - * scope does not get returned to a local proc - * get any local/global contribution - note that there - * may not be a contribution */ - PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); - rc = pmix_hash_fetch(&info->nptr->server->mylocal, info->rank, "modex", &val); - if (PMIX_SUCCESS == rc && NULL != val) { - PMIX_CONSTRUCT(&xfer, pmix_buffer_t); - pmix_buffer_t *pxfer = &xfer; - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER); - xfer.base_ptr = NULL; - xfer.bytes_used = 0; - PMIX_DESTRUCT(&xfer); - PMIX_VALUE_RELEASE(val); - PMIX_UNLOAD_BUFFER(&pbkt, data, sz); - PMIX_DESTRUCT(&pbkt); - /* pass it back */ - cbfunc(rc, data, sz, cbdata, relfn, data); - return rc; - } - - return PMIX_ERR_NOT_FOUND; -} - -static pmix_status_t _satisfy_remote_req(pmix_nspace_t *nptr, int rank, +static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank, + pmix_hash_table_t *ht, pmix_modex_cbfunc_t cbfunc, void *cbdata) { int rc; @@ -126,7 +93,7 @@ static pmix_status_t _satisfy_remote_req(pmix_nspace_t *nptr, int rank, /* check to see if this data already has been * obtained as a result of a prior direct modex request from * another local peer */ - rc = pmix_hash_fetch(&nptr->server->remote, rank, "modex", &val); + rc = pmix_hash_fetch(ht, rank, "modex", &val); if (PMIX_SUCCESS == rc && NULL != val) { PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); PMIX_CONSTRUCT(&xfer, pmix_buffer_t); @@ -151,33 +118,23 @@ pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, pmix_modex_cbfunc_t cbfunc, void *cbdata) { pmix_dmdx_local_t *lcd = NULL, *cd; - pmix_rank_info_t *iptr, *rkinfo; + pmix_rank_info_t *iptr; + pmix_hash_table_t *ht; int rc; /* 1. Try to satisfy the request right now */ - rkinfo = NULL; + + /* by default we are looking for the remote data */ + ht = &nptr->server->remote; PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { if (iptr->rank == rank) { - rkinfo = iptr; + /* in case it is known local rank - check local table */ + ht = &nptr->server->mylocal; break; } } - if (NULL != rkinfo) { - /* this is a request for info about another local proc */ - rc = _satisfy_local_req(nptr, rkinfo, cbfunc, cbdata); - } else { - /* this is a request for info about a remote proc, or - * a LOCAL proc that we don't know about yet. Because - * we register client's in events, it is always possible - * that a client gets launched and issues a "get" before - * we finish registering all our local clients. So we - * have to consider the case where a request is made for - * data about a process that will be local, but hasn't - * been registered yet */ - rc = _satisfy_remote_req(nptr, rank, cbfunc, cbdata); - } - + rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata); if( PMIX_SUCCESS == rc ){ /* request was successfully satisfied */ PMIX_INFO_FREE(info, ninfo); @@ -213,7 +170,6 @@ pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, lcd->proc.rank = rank; lcd->info = info; lcd->ninfo = ninfo; - PMIX_CONSTRUCT(&lcd->loc_reqs, pmix_list_t); pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super); /* check & send request if need/possible */ @@ -280,7 +236,9 @@ void pmix_pending_nspace_fix(pmix_nspace_t *nptr) } } -pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_dmdx_local_t *lcd) +/* Resolve pending requests to this namespace/rank */ +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, + pmix_status_t status, pmix_dmdx_local_t *lcd) { pmix_dmdx_local_t *cd; @@ -296,29 +254,44 @@ pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_dmdx_loca } } + /* If somebody was interested in this rank */ if( NULL != lcd ){ - /* check if this rank is local */ - pmix_rank_info_t *iptr, *info = NULL; - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { - if (iptr->rank == rank) { - info = iptr; - break; - } - } - pmix_dmdx_request_t *req; - PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { - int rc; - if( NULL != info ){ - rc = _satisfy_local_req(nptr, info, req->cbfunc, req->cbdata); - } else { - rc = _satisfy_remote_req(nptr, rank, req->cbfunc, req->cbdata); + + if (PMIX_SUCCESS != status){ + /* if we've got an error for this request - just forward it*/ + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { + /* if we can't satisfy this request - respond with error */ + req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL); } - if( PMIX_SUCCESS != rc ){ - return rc; + } else { + /* if we've got the blob - try to satisfy requests */ + pmix_hash_table_t *ht; + pmix_rank_info_t *iptr; + + /* by default we are looking for the remote data */ + ht = &nptr->server->remote; + /* check if this rank is local */ + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { + if (iptr->rank == rank) { + ht = &nptr->server->mylocal; + break; + } + } + + /* run through all the requests to this rank */ + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { + pmix_status_t rc; + rc = _satisfy_request(nptr, rank, ht, req->cbfunc, req->cbdata); + if( PMIX_SUCCESS != rc ){ + /* if we can't satisfy this particular request (missing key?) */ + req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL); + } } } + /* remove all requests to this rank and cleanup the corresponding structure */ pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd); + PMIX_RELEASE(lcd); } return PMIX_SUCCESS; } @@ -483,7 +456,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf) } } /* see if anyone local is waiting on this data- could be more than one */ - return pmix_pending_resolve(nptr, info->rank, NULL); + return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL); } /* get an existing object for tracking LOCAL participation in a collective @@ -871,25 +844,26 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata) /* should be impossible */ PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); caddy->status = PMIX_ERR_NOT_FOUND; - PMIX_RELEASE(caddy); goto cleanup; } - kp = PMIX_NEW(pmix_kval_t); - kp->key = strdup("modex"); - PMIX_VALUE_CREATE(kp->value, 1); - kp->value->type = PMIX_BYTE_OBJECT; - kp->value->data.bo.bytes = (char*)caddy->data; - kp->value->data.bo.size = caddy->ndata; - /* store it in the appropriate hash */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { - PMIX_ERROR_LOG(rc); + if (PMIX_SUCCESS == caddy->status) { + kp = PMIX_NEW(pmix_kval_t); + kp->key = strdup("modex"); + PMIX_VALUE_CREATE(kp->value, 1); + kp->value->type = PMIX_BYTE_OBJECT; + kp->value->data.bo.bytes = (char*)caddy->data; + kp->value->data.bo.size = caddy->ndata; + /* store it in the appropriate hash */ + if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { + PMIX_ERROR_LOG(rc); + } + PMIX_RELEASE(kp); // maintain acctg } - PMIX_RELEASE(kp); // maintain acctg cleanup: /* always execute the callback to avoid having the client hang */ - pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->lcd); + pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd); /* now call the release function so the host server * knows it can release the data */ @@ -1478,12 +1452,14 @@ PMIX_CLASS_INSTANCE(pmix_dmdx_request_t, static void lmcon(pmix_dmdx_local_t *p) { memset(&p->proc, 0, sizeof(pmix_proc_t)); + PMIX_CONSTRUCT(&p->loc_reqs, pmix_list_t); p->info = NULL; p->ninfo = 0; } static void lmdes(pmix_dmdx_local_t *p) { PMIX_INFO_FREE(p->info, p->ninfo); + PMIX_DESTRUCT(&p->loc_reqs); } PMIX_CLASS_INSTANCE(pmix_dmdx_local_t, pmix_list_item_t, diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h index b4001fbab2..d6594766bf 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h @@ -185,7 +185,8 @@ pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, pmix_info_t *info, size_t ninfo, pmix_modex_cbfunc_t cbfunc, void *cbdata); void pmix_pending_nspace_fix(pmix_nspace_t *nptr); -pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_dmdx_local_t *lcd); +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, + pmix_status_t status, pmix_dmdx_local_t *lcd); pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,