Merge pull request #3398 from rhc54/topic/modex
Implement a background fence that collects all data during modex operation
Этот коммит содержится в:
Коммит
f2ed293ecd
@ -654,15 +654,40 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
opal_pmix.commit();
|
||||
OMPI_TIMING_NEXT("commit");
|
||||
|
||||
if (!opal_pmix_base_async_modex) {
|
||||
if (NULL != opal_pmix.fence_nb) {
|
||||
/* If we have a non-blocking fence:
|
||||
* if we are doing an async modex, but we are collecting all
|
||||
* data, then execute the non-blocking modex in the background.
|
||||
* All calls to modex_recv will be cached until the background
|
||||
* modex completes. If collect_all_data is false, then we skip
|
||||
* the fence completely and retrieve data on-demand from the
|
||||
* source node.
|
||||
*
|
||||
* If we do not have a non-blocking fence, then we must always
|
||||
* execute the blocking fence as the system does not support
|
||||
* later data retrieval. */
|
||||
if (NULL != opal_pmix.fence_nb) {
|
||||
if (opal_pmix_base_async_modex && opal_pmix_collect_all_data) {
|
||||
/* execute the fence_nb in the background to collect
|
||||
* the data */
|
||||
if (!ompi_async_mpi_init) {
|
||||
/* we are going to execute a barrier at the
|
||||
* end of MPI_Init. We can only have ONE fence
|
||||
* operation with the identical involved procs
|
||||
* at a time, so we will need to wait when we
|
||||
* get there */
|
||||
active = true;
|
||||
opal_pmix.fence_nb(NULL, true, fence_release, (void*)&active);
|
||||
} else {
|
||||
opal_pmix.fence_nb(NULL, true, NULL, NULL);
|
||||
}
|
||||
} else if (!opal_pmix_base_async_modex) {
|
||||
active = true;
|
||||
opal_pmix.fence_nb(NULL, opal_pmix_collect_all_data,
|
||||
fence_release, (void*)&active);
|
||||
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
|
||||
} else {
|
||||
opal_pmix.fence(NULL, opal_pmix_collect_all_data);
|
||||
}
|
||||
} else {
|
||||
opal_pmix.fence(NULL, opal_pmix_collect_all_data);
|
||||
}
|
||||
|
||||
OMPI_TIMING_NEXT("modex");
|
||||
@ -832,13 +857,20 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
* barrier requirement at this time, though we hope to relax
|
||||
* it at a later point */
|
||||
if (!ompi_async_mpi_init) {
|
||||
active = true;
|
||||
if (NULL != opal_pmix.fence_nb) {
|
||||
opal_pmix.fence_nb(NULL, false,
|
||||
fence_release, (void*)&active);
|
||||
/* if we executed the above fence in the background, then
|
||||
* we have to wait here for it to complete. However, there
|
||||
* is no reason to do two barriers! */
|
||||
if (opal_pmix_base_async_modex && opal_pmix_collect_all_data) {
|
||||
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
|
||||
} else {
|
||||
opal_pmix.fence(NULL, false);
|
||||
active = true;
|
||||
if (NULL != opal_pmix.fence_nb) {
|
||||
opal_pmix.fence_nb(NULL, false,
|
||||
fence_release, (void*)&active);
|
||||
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
|
||||
} else {
|
||||
opal_pmix.fence(NULL, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -287,11 +287,7 @@ int ompi_mpi_register_params(void)
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&ompi_mpi_dynamics_enabled);
|
||||
|
||||
if (opal_pmix_base_async_modex) {
|
||||
ompi_async_mpi_init = true;
|
||||
} else {
|
||||
ompi_async_mpi_init = false;
|
||||
}
|
||||
ompi_async_mpi_init = true;
|
||||
(void) mca_base_var_register("ompi", "async", "mpi", "init",
|
||||
"Do not perform a barrier at the end of MPI_Init",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2016 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -39,7 +39,7 @@ opal_pmix_base_t opal_pmix_base = {0};
|
||||
|
||||
static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
|
||||
{
|
||||
opal_pmix_base_async_modex = false;
|
||||
opal_pmix_base_async_modex = true;
|
||||
(void) mca_base_var_register("opal", "pmix", "base", "async_modex", "Use asynchronous modex mode",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
|
||||
|
@ -1850,7 +1850,7 @@ static void _mdxcbfunc(int sd, short argc, void *cbdata)
|
||||
}
|
||||
|
||||
finish_collective:
|
||||
if(NULL != databuf) {
|
||||
if (NULL != databuf) {
|
||||
PMIX_RELEASE(databuf);
|
||||
}
|
||||
/* setup the reply, starting with the returned status */
|
||||
|
@ -612,10 +612,10 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
if (NULL == nptr) {
|
||||
/*
|
||||
* We may not have this namespace because someone asked about this namespace
|
||||
* but there are not processses from it running on this host
|
||||
*/
|
||||
/*
|
||||
* We may not have this namespace because someone asked about this namespace
|
||||
* but there are not processses from it running on this host
|
||||
*/
|
||||
nptr = PMIX_NEW(pmix_nspace_t);
|
||||
(void)strncpy(nptr->nspace, caddy->lcd->proc.nspace, PMIX_MAX_NSLEN);
|
||||
nptr->server = PMIX_NEW(pmix_server_nspace_t);
|
||||
@ -628,8 +628,12 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
|
||||
* store the data first so we can immediately satisfy any future
|
||||
* requests. Then, rather than duplicate the resolve code here, we
|
||||
* will let the pmix_pending_resolve function go ahead and retrieve
|
||||
* it from the hash table */
|
||||
if (PMIX_SUCCESS == caddy->status) {
|
||||
* it from the hash table.
|
||||
*
|
||||
* NOTE: A NULL data pointer indicates that the data has already
|
||||
* been returned via completion of a background fence_nb operation.
|
||||
* In this case, all we need to do is resolve the request */
|
||||
if (PMIX_SUCCESS == caddy->status && NULL != caddy->data) {
|
||||
if (caddy->lcd->proc.rank == PMIX_RANK_WILDCARD) {
|
||||
void * where = malloc(caddy->ndata);
|
||||
if (where) {
|
||||
|
@ -364,8 +364,6 @@ static pmix_server_trkr_t* new_tracker(pmix_proc_t *procs,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert( NULL == get_tracker(procs, nprocs, type) );
|
||||
|
||||
pmix_output_verbose(5, pmix_globals.debug_output,
|
||||
"adding new tracker with %d procs", (int)nprocs);
|
||||
|
||||
@ -468,7 +466,8 @@ pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd,
|
||||
return rc;
|
||||
}
|
||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||
"recvd fence with %d procs", (int)nprocs);
|
||||
"recvd fence from %s:%u with %d procs",
|
||||
cd->peer->info->nptr->nspace, cd->peer->info->rank, (int)nprocs);
|
||||
/* there must be at least one as the client has to at least provide
|
||||
* their own namespace */
|
||||
if (nprocs < 1) {
|
||||
|
@ -1465,3 +1465,19 @@ static void tsdes(pmix2x_threadshift_t *p)
|
||||
OBJ_CLASS_INSTANCE(pmix2x_threadshift_t,
|
||||
opal_object_t,
|
||||
tscon, tsdes);
|
||||
|
||||
static void dmcon(opal_pmix2x_dmx_trkr_t *p)
|
||||
{
|
||||
p->nspace = NULL;
|
||||
p->cbfunc = NULL;
|
||||
p->cbdata = NULL;
|
||||
}
|
||||
static void dmdes(opal_pmix2x_dmx_trkr_t *p)
|
||||
{
|
||||
if (NULL != p->nspace) {
|
||||
free(p->nspace);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(opal_pmix2x_dmx_trkr_t,
|
||||
opal_list_item_t,
|
||||
dmcon, dmdes);
|
||||
|
@ -42,6 +42,7 @@ typedef struct {
|
||||
opal_list_t events;
|
||||
int cache_size;
|
||||
opal_list_t cache;
|
||||
opal_list_t dmdx;
|
||||
} mca_pmix_pmix2x_component_t;
|
||||
|
||||
OPAL_DECLSPEC extern mca_pmix_pmix2x_component_t mca_pmix_pmix2x_component;
|
||||
@ -64,6 +65,14 @@ typedef struct {
|
||||
} opal_pmix2x_event_t;
|
||||
OBJ_CLASS_DECLARATION(opal_pmix2x_event_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
char *nspace;
|
||||
pmix_modex_cbfunc_t cbfunc;
|
||||
void *cbdata;
|
||||
} opal_pmix2x_dmx_trkr_t;
|
||||
OBJ_CLASS_DECLARATION(opal_pmix2x_dmx_trkr_t);
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
pmix_status_t status;
|
||||
|
@ -80,6 +80,7 @@ static int external_open(void)
|
||||
mca_pmix_pmix2x_component.evindex = 0;
|
||||
OBJ_CONSTRUCT(&mca_pmix_pmix2x_component.jobids, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pmix_pmix2x_component.events, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pmix_pmix2x_component.dmdx, opal_list_t);
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
@ -88,6 +89,7 @@ static int external_close(void)
|
||||
{
|
||||
OPAL_LIST_DESTRUCT(&mca_pmix_pmix2x_component.jobids);
|
||||
OPAL_LIST_DESTRUCT(&mca_pmix_pmix2x_component.events);
|
||||
OPAL_LIST_DESTRUCT(&mca_pmix_pmix2x_component.dmdx);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -266,6 +266,7 @@ static void opmdx_response(int status, const char *data, size_t sz, void *cbdata
|
||||
{
|
||||
pmix_status_t rc;
|
||||
pmix2x_opalcaddy_t *opalcaddy = (pmix2x_opalcaddy_t*)cbdata;
|
||||
opal_pmix2x_dmx_trkr_t *dmdx;
|
||||
|
||||
rc = pmix2x_convert_rc(status);
|
||||
if (NULL != opalcaddy->mdxcbfunc) {
|
||||
@ -273,6 +274,13 @@ static void opmdx_response(int status, const char *data, size_t sz, void *cbdata
|
||||
opalcaddy->ocbdata = relcbdata;
|
||||
opalcaddy->mdxcbfunc(rc, data, sz, opalcaddy->cbdata,
|
||||
_data_release, opalcaddy);
|
||||
/* if we were collecting all data, then check for any pending
|
||||
* dmodx requests that we cached and notify them that the
|
||||
* data has arrived */
|
||||
while (NULL != (dmdx = (opal_pmix2x_dmx_trkr_t*)opal_list_remove_first(&mca_pmix_pmix2x_component.dmdx))) {
|
||||
dmdx->cbfunc(PMIX_SUCCESS, NULL, 0, dmdx->cbdata, NULL, NULL);
|
||||
OBJ_RELEASE(dmdx);
|
||||
}
|
||||
} else {
|
||||
OBJ_RELEASE(opalcaddy);
|
||||
}
|
||||
@ -292,7 +300,6 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
|
||||
if (NULL == host_module || NULL == host_module->fence_nb) {
|
||||
return PMIX_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
/* setup the caddy */
|
||||
opalcaddy = OBJ_NEW(pmix2x_opalcaddy_t);
|
||||
opalcaddy->mdxcbfunc = cbfunc;
|
||||
@ -338,6 +345,7 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p,
|
||||
opal_process_name_t proc;
|
||||
opal_value_t *iptr;
|
||||
size_t n;
|
||||
opal_pmix2x_dmx_trkr_t *dmdx;
|
||||
|
||||
if (NULL == host_module || NULL == host_module->direct_modex) {
|
||||
return PMIX_ERR_NOT_SUPPORTED;
|
||||
@ -354,6 +362,21 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p,
|
||||
opalcaddy->mdxcbfunc = cbfunc;
|
||||
opalcaddy->cbdata = cbdata;
|
||||
|
||||
/* this function should only get called if we are in an async modex.
|
||||
* If we are also collecting data, then the fence_nb will eventually
|
||||
* complete and return all the required data down to the pmix
|
||||
* server beneath us. Thus, we only need to track the dmodex_req
|
||||
* and ensure that the release gets called once the data has
|
||||
* arrived - this will trigger the pmix server to tell the
|
||||
* client that the data is available */
|
||||
if (opal_pmix_base_async_modex && opal_pmix_collect_all_data) {
|
||||
dmdx = OBJ_NEW(opal_pmix2x_dmx_trkr_t);
|
||||
dmdx->cbfunc = cbfunc;
|
||||
dmdx->cbdata = cbdata;
|
||||
opal_list_append(&mca_pmix_pmix2x_component.dmdx, &dmdx->super);
|
||||
return PMIX_SUCCESS;
|
||||
}
|
||||
|
||||
/* convert the array of pmix_info_t to the list of info */
|
||||
for (n=0; n < ninfo; n++) {
|
||||
iptr = OBJ_NEW(opal_value_t);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user