1
1
openmpi/orte/orted/pmix/pmix_server_fence.c
2015-09-30 10:33:53 -07:00

244 строки
8.6 KiB
C

/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/rml/rml.h"
#include "pmix_server_internal.h"
/* Release callback handed to the PMIx layer together with a data blob
 * (see pmix_server_release and dmodex_req). It frees the heap buffer passed
 * as cbdata. free(NULL) is a no-op, so no NULL guard is needed. */
static void relcb(void *cbdata)
{
    free(cbdata);
}
/* Completion callback passed to orte_grpcomm.allgather by
 * pmix_server_fencenb_fn.
 *
 * status - completion status of the collective
 * buf    - collected payload; may be NULL
 * cbdata - the orte_pmix_mdx_caddy_t created by pmix_server_fencenb_fn
 *
 * The payload is unloaded from buf and handed, together with the status, to
 * the PMIx callback stored in the caddy. That callback receives relcb as the
 * release function for the payload. If unloading fails, the unload error is
 * reported instead of status. The caddy is released afterwards.
 */
static void pmix_server_release(int status, opal_buffer_t *buf, void *cbdata)
{
    orte_pmix_mdx_caddy_t *cd = (orte_pmix_mdx_caddy_t*)cbdata;
    char *data = NULL;
    int32_t ndata = 0;
    int rc = OPAL_SUCCESS;

    /* unload the buffer */
    if (NULL != buf) {
        rc = opal_dss.unload(buf, (void**)&data, &ndata);
    }
    /* an unload failure takes precedence over the collective's status */
    if (OPAL_SUCCESS == rc) {
        rc = status;
    }
    if (NULL != cd->cbfunc) {
        /* the callee owns data from here on and frees it through relcb */
        cd->cbfunc(rc, data, ndata, cd->cbdata, relcb, data);
    } else {
        /* nobody to hand the payload to - release it here rather than leak */
        free(data);
    }
    OBJ_RELEASE(cd);
}
/* Called once every local participant has entered the fence, so the
 * collective is already complete locally. All that remains is to build the
 * grpcomm signature of the participating procs and pass the locally
 * collected data to the global allgather.
 *
 * procs  - list of opal_namelist_t naming the participants. When NULL, no
 *          signature is built here and cd->sig keeps whatever the caddy
 *          constructor set. NOTE(review): confirm grpcomm accepts that.
 * info   - not used here
 * data   - locally collected blob, loaded into the buffer handed to grpcomm
 * ndata  - size of data in bytes
 * cbfunc - invoked from pmix_server_release when the allgather completes
 * cbdata - passed back to cbfunc
 *
 * Returns ORTE_SUCCESS if the collective was handed to grpcomm, otherwise an
 * error code.
 */
int pmix_server_fencenb_fn(opal_list_t *procs, opal_list_t *info,
                           char *data, size_t ndata,
                           opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
    orte_pmix_mdx_caddy_t *cd = NULL;
    int rc;
    opal_namelist_t *nm;
    size_t i;
    opal_buffer_t *buf = NULL;

    cd = OBJ_NEW(orte_pmix_mdx_caddy_t);
    cd->cbfunc = cbfunc;
    cd->cbdata = cbdata;

    /* compute the signature of this collective */
    if (NULL != procs) {
        cd->sig = OBJ_NEW(orte_grpcomm_signature_t);
        cd->sig->sz = opal_list_get_size(procs);
        /* calloc zero-fills, replacing the former unchecked malloc+memset */
        cd->sig->signature = (orte_process_name_t*)calloc(cd->sig->sz, sizeof(orte_process_name_t));
        if (NULL == cd->sig->signature && 0 < cd->sig->sz) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            /* release the caddy the same way pmix_server_release does */
            OBJ_RELEASE(cd);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        i = 0;
        OPAL_LIST_FOREACH(nm, procs, opal_namelist_t) {
            cd->sig->signature[i].jobid = nm->name.jobid;
            cd->sig->signature[i].vpid = nm->name.vpid;
            ++i;
        }
    }

    buf = OBJ_NEW(opal_buffer_t);
    if (NULL != data) {
        opal_dss.load(buf, data, ndata);
    }

    if (4 < opal_output_get_verbosity(orte_pmix_server_globals.output)) {
        char *tmp = NULL;
        (void)opal_dss.print(&tmp, NULL, cd->sig, ORTE_SIGNATURE);
        /* actually emit the formatted signature; it used to be built and discarded */
        opal_output(orte_pmix_server_globals.output, "%s FENCE SIGNATURE %s",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    (NULL == tmp) ? "NULL" : tmp);
        free(tmp);
    }

    /* pass it, along with any data collected locally, to the global
     * collective algorithm */
    if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(cd->sig, buf, pmix_server_release, cd))) {
        ORTE_ERROR_LOG(rc);
        /* The collective was not started, so (assuming allgather does not
         * invoke the callback when it reports failure) pmix_server_release
         * will never run. Drop our references here instead of leaking them. */
        OBJ_RELEASE(buf);
        OBJ_RELEASE(cd);
        return rc;
    }
    /* NOTE(review): buf is not released on success. Confirm whether
     * grpcomm.allgather takes over this reference or retains its own. */
    return ORTE_SUCCESS;
}
static void dmodex_req(int sd, short args, void *cbdata)
{
pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
pmix_server_req_t *r;
orte_job_t *jdata;
orte_proc_t *proct, *dmn;
int rc, rnum;
opal_buffer_t *buf;
uint8_t *data=NULL;
int32_t sz=0;
/* a race condition exists here because of the thread-shift - it is
* possible that data for the specified proc arrived while we were
* waiting to be serviced. In that case, the tracker that would have
* indicated the data was already requested will have been removed,
* and we would therefore think that we had to request it again.
* So do a quick check to ensure we don't already have the desired
* data */
OPAL_MODEX_RECV_STRING(rc, "modex", &req->target, &data, &sz);
if (OPAL_SUCCESS == rc) {
req->mdxcbfunc(rc, (char*)data, sz, req->cbdata, relcb, data);
OBJ_RELEASE(req);
return;
}
/* lookup who is hosting this proc */
if (NULL == (jdata = orte_get_job_data_object(req->target.jobid))) {
/* if we don't know the job, then it could be a race
* condition where we are being asked about a process
* that we don't know about yet. In this case, just
* record the request and we will process it later */
if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
ORTE_ERROR_LOG(rc);
/* can't just return as that would cause the requestor
* to hang, so instead execute the callback */
goto callback;
}
return;
}
if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, req->target.vpid))) {
/* if we find the job, but not the process, then that is an error */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto callback;
}
if (NULL == (dmn = proct->node->daemon)) {
/* we don't know where this proc is located - since we already
* found the job, and therefore know about its locations, this
* must be an error */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto callback;
}
/* has anyone already requested data for this job from this node? If so,
* then the data is already on its way */
for (rnum=0; rnum < orte_pmix_server_globals.reqs.num_rooms; rnum++) {
opal_hotel_knock(&orte_pmix_server_globals.reqs, rnum, (void**)&r);
if (NULL == r) {
continue;
}
if (r->target.jobid == req->target.jobid &&
r->proxy.vpid == dmn->name.vpid) {
/* we have this request, so just wait for the return */
return;
}
}
/* track the request so we know the function and cbdata
* to callback upon completion */
if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
ORTE_ERROR_LOG(rc);
goto callback;
}
/* construct a request message */
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
OBJ_RELEASE(buf);
goto callback;
}
/* include the request room number for quick retrieval */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->room_num, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
OBJ_RELEASE(buf);
goto callback;
}
/* send it to the host daemon */
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&dmn->name, buf, ORTE_RML_TAG_DIRECT_MODEX,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
OBJ_RELEASE(buf);
goto callback;
}
return;
callback:
/* this section gets executed solely upon an error */
if (NULL != req->mdxcbfunc) {
req->mdxcbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
}
OBJ_RELEASE(req);
}
/* Called by the local embedded PMIx server to ask us to obtain another
 * process's modex data from a remote daemon ("direct modex").
 *
 * No work is done here. The request has to be shifted to the ORTE thread,
 * so ORTE_DMX_REQ packages *proc, cbfunc and cbdata into a request that is
 * processed by dmodex_req.
 *
 * proc   - name of the process whose data is wanted; dereferenced here,
 *          so it must not be NULL
 * info   - not used
 * cbfunc - invoked later with the data, or with an error code and no data
 * cbdata - passed back to cbfunc
 *
 * Always returns OPAL_ERR_IN_PROCESS. This presumably tells the caller that
 * the result will arrive through cbfunc rather than through this return value.
 */
int pmix_server_dmodex_req_fn(opal_process_name_t *proc, opal_list_t *info,
                              opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
    /* we have to shift threads to the ORTE thread, so
     * create a request and push it into that thread */
    ORTE_DMX_REQ(*proc, dmodex_req, cbfunc, cbdata);
    return OPAL_ERR_IN_PROCESS;
}