When direct launching applications, we must allow the MPI layer to progress during RTE-level barriers. Neither SLURM nor Cray provide non-blocking fence functions, so push those calls into a separate event thread (use the OPAL async thread for this purpose so we don't create another one) and let the MPI thread sping in wait_for_completion. This also restores the "lazy" completion during MPI_Finalize to minimize cpu utilization.
Update external as well Revise the change: we still need the MPI_Barrier in MPI_Finalize when we use a blocking fence, but do use the "lazy" wait for completion. Replace the direct logic in MPI_Init with a cleaner macro
Этот коммит содержится в:
родитель
7f65c2b18e
Коммит
01ba861f2a
@ -205,27 +205,27 @@ OMPI_DECLSPEC extern mca_base_framework_t ompi_rte_base_framework;
|
|||||||
* progress while waiting, so we loop over opal_progress, letting
|
* progress while waiting, so we loop over opal_progress, letting
|
||||||
* the RTE progress thread move the RTE along
|
* the RTE progress thread move the RTE along
|
||||||
*/
|
*/
|
||||||
#define OMPI_WAIT_FOR_COMPLETION(flg) \
|
#define OMPI_WAIT_FOR_COMPLETION(flg) \
|
||||||
do { \
|
do { \
|
||||||
opal_output_verbose(1, ompi_rte_base_framework.framework_output, \
|
opal_output_verbose(1, ompi_rte_base_framework.framework_output, \
|
||||||
"%s waiting on RTE event at %s:%d", \
|
"%s waiting on RTE event at %s:%d", \
|
||||||
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \
|
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \
|
||||||
__FILE__, __LINE__); \
|
__FILE__, __LINE__); \
|
||||||
while ((flg)) { \
|
while ((flg)) { \
|
||||||
opal_progress(); \
|
opal_progress(); \
|
||||||
} \
|
} \
|
||||||
}while(0);
|
}while(0);
|
||||||
|
|
||||||
#define OMPI_LAZY_WAIT_FOR_COMPLETION(flg) \
|
#define OMPI_LAZY_WAIT_FOR_COMPLETION(flg) \
|
||||||
do { \
|
do { \
|
||||||
opal_output_verbose(1, ompi_rte_base_framework.framework_output, \
|
opal_output_verbose(1, ompi_rte_base_framework.framework_output, \
|
||||||
"%s lazy waiting on RTE event at %s:%d", \
|
"%s lazy waiting on RTE event at %s:%d", \
|
||||||
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \
|
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \
|
||||||
__FILE__, __LINE__); \
|
__FILE__, __LINE__); \
|
||||||
while ((flg)) { \
|
while ((flg)) { \
|
||||||
opal_progress(); \
|
opal_progress(); \
|
||||||
usleep(100); \
|
usleep(100); \
|
||||||
} \
|
} \
|
||||||
}while(0);
|
}while(0);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
* Copyright (c) 2006 University of Houston. All rights reserved.
|
* Copyright (c) 2006 University of Houston. All rights reserved.
|
||||||
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
|
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
|
||||||
* Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
|
* Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
|
||||||
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
|
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
||||||
*
|
*
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
*
|
*
|
||||||
@ -248,19 +248,20 @@ int ompi_mpi_finalize(void)
|
|||||||
more details). */
|
more details). */
|
||||||
if (NULL != opal_pmix.fence_nb) {
|
if (NULL != opal_pmix.fence_nb) {
|
||||||
active = true;
|
active = true;
|
||||||
/* Note that the non-blocking PMIx fence will cycle calling
|
/* Note that use of the non-blocking PMIx fence will
|
||||||
opal_progress(), which will allow any other pending
|
* allow us to lazily cycle calling
|
||||||
communications/actions to complete. See
|
* opal_progress(), which will allow any other pending
|
||||||
https://github.com/open-mpi/ompi/issues/1576 for the
|
* communications/actions to complete. See
|
||||||
original bug report. */
|
* https://github.com/open-mpi/ompi/issues/1576 for the
|
||||||
|
* original bug report. */
|
||||||
opal_pmix.fence_nb(NULL, 0, fence_cbfunc, (void*)&active);
|
opal_pmix.fence_nb(NULL, 0, fence_cbfunc, (void*)&active);
|
||||||
OMPI_WAIT_FOR_COMPLETION(active);
|
OMPI_LAZY_WAIT_FOR_COMPLETION(active);
|
||||||
} else {
|
} else {
|
||||||
/* However, we cannot guarantee that the provided PMIx has
|
/* However, we cannot guarantee that the provided PMIx has
|
||||||
fence_nb. If it doesn't, then do the best we can: an MPI
|
* fence_nb. If it doesn't, then do the best we can: an MPI
|
||||||
barrier on COMM_WORLD (which isn't the best because of the
|
* barrier on COMM_WORLD (which isn't the best because of the
|
||||||
reasons cited above), followed by a blocking PMIx fence
|
* reasons cited above), followed by a blocking PMIx fence
|
||||||
(which may not necessarily call opal_progress()). */
|
* (which does not call opal_progress()). */
|
||||||
ompi_communicator_t *comm = &ompi_mpi_comm_world.comm;
|
ompi_communicator_t *comm = &ompi_mpi_comm_world.comm;
|
||||||
comm->c_coll.coll_barrier(comm, comm->c_coll.coll_barrier_module);
|
comm->c_coll.coll_barrier(comm, comm->c_coll.coll_barrier_module);
|
||||||
|
|
||||||
|
@ -362,6 +362,12 @@ static int ompi_register_mca_variables(void)
|
|||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void fence_release(int status, void *cbdata)
|
||||||
|
{
|
||||||
|
volatile bool *active = (volatile bool*)cbdata;
|
||||||
|
*active = false;
|
||||||
|
}
|
||||||
|
|
||||||
int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
@ -370,6 +376,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
|||||||
char *error = NULL;
|
char *error = NULL;
|
||||||
char *cmd=NULL, *av=NULL;
|
char *cmd=NULL, *av=NULL;
|
||||||
ompi_errhandler_errtrk_t errtrk;
|
ompi_errhandler_errtrk_t errtrk;
|
||||||
|
volatile bool active;
|
||||||
OPAL_TIMING_DECLARE(tm);
|
OPAL_TIMING_DECLARE(tm);
|
||||||
OPAL_TIMING_INIT_EXT(&tm, OPAL_TIMING_GET_TIME_OF_DAY);
|
OPAL_TIMING_INIT_EXT(&tm, OPAL_TIMING_GET_TIME_OF_DAY);
|
||||||
|
|
||||||
@ -634,13 +641,23 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
|||||||
* if data exchange is required. The modex occurs solely across procs
|
* if data exchange is required. The modex occurs solely across procs
|
||||||
* in our job. If a barrier is required, the "modex" function will
|
* in our job. If a barrier is required, the "modex" function will
|
||||||
* perform it internally */
|
* perform it internally */
|
||||||
OPAL_MODEX();
|
active = true;
|
||||||
|
opal_pmix.commit();
|
||||||
|
if (!opal_pmix_base_async_modex) {
|
||||||
|
if (NULL != opal_pmix.fence_nb) {
|
||||||
|
opal_pmix.fence_nb(NULL, opal_pmix_collect_all_data,
|
||||||
|
fence_release, (void*)&active);
|
||||||
|
OMPI_WAIT_FOR_COMPLETION(active);
|
||||||
|
} else {
|
||||||
|
opal_pmix.fence(NULL, opal_pmix_collect_all_data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
|
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
|
||||||
|
|
||||||
/* select buffered send allocator component to be used */
|
/* select buffered send allocator component to be used */
|
||||||
if( OMPI_SUCCESS !=
|
if( OMPI_SUCCESS !=
|
||||||
(ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) {
|
(ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) {
|
||||||
error = "mca_pml_base_bsend_init() failed";
|
error = "mca_pml_base_bsend_init() failed";
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
@ -802,7 +819,15 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
|||||||
/* wait for everyone to reach this point - this is a hard
|
/* wait for everyone to reach this point - this is a hard
|
||||||
* barrier requirement at this time, though we hope to relax
|
* barrier requirement at this time, though we hope to relax
|
||||||
* it at a later point */
|
* it at a later point */
|
||||||
opal_pmix.fence(NULL, 0);
|
active = true;
|
||||||
|
opal_pmix.commit();
|
||||||
|
if (NULL != opal_pmix.fence_nb) {
|
||||||
|
opal_pmix.fence_nb(NULL, opal_pmix_collect_all_data,
|
||||||
|
fence_release, (void*)&active);
|
||||||
|
OMPI_WAIT_FOR_COMPLETION(active);
|
||||||
|
} else {
|
||||||
|
opal_pmix.fence(NULL, opal_pmix_collect_all_data);
|
||||||
|
}
|
||||||
|
|
||||||
/* check for timing request - get stop time and report elapsed
|
/* check for timing request - get stop time and report elapsed
|
||||||
time if so, then start the clock again */
|
time if so, then start the clock again */
|
||||||
@ -839,10 +864,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
|||||||
e.g. hierarch, might create subcommunicators. The threadlevel
|
e.g. hierarch, might create subcommunicators. The threadlevel
|
||||||
requested by all processes is required in order to know
|
requested by all processes is required in order to know
|
||||||
which cid allocation algorithm can be used. */
|
which cid allocation algorithm can be used. */
|
||||||
if ( OMPI_SUCCESS !=
|
if (OMPI_SUCCESS != ( ret = ompi_comm_cid_init ())) {
|
||||||
( ret = ompi_comm_cid_init ())) {
|
error = "ompi_mpi_init: ompi_comm_cid_init failed";
|
||||||
error = "ompi_mpi_init: ompi_comm_cid_init failed";
|
goto error;
|
||||||
goto error;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Init coll for the comms. This has to be after dpm_base_select,
|
/* Init coll for the comms. This has to be after dpm_base_select,
|
||||||
|
@ -57,7 +57,8 @@ static int cray_resolve_peers(const char *nodename,
|
|||||||
opal_list_t *procs);
|
opal_list_t *procs);
|
||||||
static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist);
|
static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist);
|
||||||
static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv);
|
static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv);
|
||||||
static int cray_fence(opal_list_t *procs, int collect_data);
|
static int cray_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
|
||||||
static int cray_commit(void);
|
static int cray_commit(void);
|
||||||
static int cray_get(const opal_process_name_t *id,
|
static int cray_get(const opal_process_name_t *id,
|
||||||
const char *key, opal_list_t *info,
|
const char *key, opal_list_t *info,
|
||||||
@ -90,8 +91,8 @@ const opal_pmix_base_module_t opal_pmix_cray_module = {
|
|||||||
.initialized = cray_initialized,
|
.initialized = cray_initialized,
|
||||||
.abort = cray_abort,
|
.abort = cray_abort,
|
||||||
.commit = cray_commit,
|
.commit = cray_commit,
|
||||||
.fence = cray_fence,
|
.fence = NULL,
|
||||||
.fence_nb = NULL,
|
.fence_nb = cray_fencenb,
|
||||||
.put = cray_put,
|
.put = cray_put,
|
||||||
.get = cray_get,
|
.get = cray_get,
|
||||||
.get_nb = cray_get_nb,
|
.get_nb = cray_get_nb,
|
||||||
@ -119,6 +120,17 @@ const opal_pmix_base_module_t opal_pmix_cray_module = {
|
|||||||
// usage accounting
|
// usage accounting
|
||||||
static int pmix_init_count = 0;
|
static int pmix_init_count = 0;
|
||||||
|
|
||||||
|
// local object
|
||||||
|
typedef struct {
|
||||||
|
opal_object_t super;
|
||||||
|
opal_event_t ev;
|
||||||
|
opal_pmix_op_cbfunc_t opcbfunc;
|
||||||
|
void *cbdata;
|
||||||
|
} pmi_opcaddy_t;
|
||||||
|
OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
|
||||||
|
opal_object_t,
|
||||||
|
NULL, NULL);
|
||||||
|
|
||||||
// PMI constant values:
|
// PMI constant values:
|
||||||
static int pmix_kvslen_max = 0;
|
static int pmix_kvslen_max = 0;
|
||||||
static int pmix_keylen_max = 0;
|
static int pmix_keylen_max = 0;
|
||||||
@ -512,8 +524,9 @@ static int cray_commit(void)
|
|||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int cray_fence(opal_list_t *procs, int collect_data)
|
static void fencenb(int sd, short args, void *cbdata)
|
||||||
{
|
{
|
||||||
|
pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
|
||||||
int rc, cnt;
|
int rc, cnt;
|
||||||
int32_t i;
|
int32_t i;
|
||||||
int *all_lens = NULL;
|
int *all_lens = NULL;
|
||||||
@ -550,7 +563,8 @@ static int cray_fence(opal_list_t *procs, int collect_data)
|
|||||||
|
|
||||||
send_buffer = OBJ_NEW(opal_buffer_t);
|
send_buffer = OBJ_NEW(opal_buffer_t);
|
||||||
if (NULL == send_buffer) {
|
if (NULL == send_buffer) {
|
||||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
rc = OPAL_ERR_OUT_OF_RESOURCE;
|
||||||
|
goto fn_exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
|
opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
|
||||||
@ -668,7 +682,7 @@ static int cray_fence(opal_list_t *procs, int collect_data)
|
|||||||
* for every process in the job.
|
* for every process in the job.
|
||||||
*
|
*
|
||||||
* we only need to set locality for each local rank as "not found"
|
* we only need to set locality for each local rank as "not found"
|
||||||
* equates to "non-local"
|
* equates to "non-local"
|
||||||
*/
|
*/
|
||||||
|
|
||||||
for (i=0; i < pmix_nlranks; i++) {
|
for (i=0; i < pmix_nlranks; i++) {
|
||||||
@ -732,7 +746,27 @@ fn_exit:
|
|||||||
if (r_bytes_and_ranks != NULL) {
|
if (r_bytes_and_ranks != NULL) {
|
||||||
free(r_bytes_and_ranks);
|
free(r_bytes_and_ranks);
|
||||||
}
|
}
|
||||||
return rc;
|
if (NULL != op->opcbfunc) {
|
||||||
|
op->opcbfunc(rc, op->cbdata);
|
||||||
|
}
|
||||||
|
OBJ_RELEASE(op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int cray_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||||
|
{
|
||||||
|
pmi_opcaddy_t *op;
|
||||||
|
|
||||||
|
/* thread-shift this so we don't block in Cray's barrier */
|
||||||
|
op = OBJ_NEW(pmi_opcaddy_t);
|
||||||
|
op->opcbfunc = cbfunc;
|
||||||
|
op->cbdata = cbdata;
|
||||||
|
event_assign(&op->ev, opal_pmix_base.evbase, -1,
|
||||||
|
EV_WRITE, fencenb, op);
|
||||||
|
event_active(&op->ev, EV_WRITE, 1);
|
||||||
|
|
||||||
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv)
|
static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv)
|
||||||
|
2
opal/mca/pmix/external/pmix_ext_client.c
поставляемый
2
opal/mca/pmix/external/pmix_ext_client.c
поставляемый
@ -369,6 +369,8 @@ int pmix1_fencenb(opal_list_t *procs, int collect_data,
|
|||||||
if (collect_data) {
|
if (collect_data) {
|
||||||
PMIX_INFO_CONSTRUCT(&info);
|
PMIX_INFO_CONSTRUCT(&info);
|
||||||
(void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN);
|
(void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN);
|
||||||
|
info.value.type = PMIX_BOOL;
|
||||||
|
info.value.data.flag = true;
|
||||||
iptr = &info;
|
iptr = &info;
|
||||||
n = 1;
|
n = 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -250,21 +250,6 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
|
|||||||
} \
|
} \
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Provide a simplified macro for calling the fence function
|
|
||||||
* that takes into account directives and availability of
|
|
||||||
* non-blocking operations
|
|
||||||
*/
|
|
||||||
#define OPAL_MODEX() \
|
|
||||||
do { \
|
|
||||||
opal_pmix.commit(); \
|
|
||||||
if (!opal_pmix_base_async_modex) { \
|
|
||||||
opal_pmix.fence(NULL, \
|
|
||||||
opal_pmix_collect_all_data); \
|
|
||||||
} \
|
|
||||||
} while(0);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide a macro for accessing a base function that exchanges
|
* Provide a macro for accessing a base function that exchanges
|
||||||
* data values between two procs using the PMIx Publish/Lookup
|
* data values between two procs using the PMIx Publish/Lookup
|
||||||
|
@ -364,6 +364,8 @@ int pmix1_fencenb(opal_list_t *procs, int collect_data,
|
|||||||
if (collect_data) {
|
if (collect_data) {
|
||||||
PMIX_INFO_CONSTRUCT(&info);
|
PMIX_INFO_CONSTRUCT(&info);
|
||||||
(void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN);
|
(void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN);
|
||||||
|
info.value.type = PMIX_BOOL;
|
||||||
|
info.value.data.flag = true;
|
||||||
iptr = &info;
|
iptr = &info;
|
||||||
n = 1;
|
n = 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -36,7 +36,8 @@ static int s1_initialized(void);
|
|||||||
static int s1_abort(int flag, const char msg[],
|
static int s1_abort(int flag, const char msg[],
|
||||||
opal_list_t *procs);
|
opal_list_t *procs);
|
||||||
static int s1_commit(void);
|
static int s1_commit(void);
|
||||||
static int s1_fence(opal_list_t *procs, int collect_data);
|
static int s1_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
|
||||||
static int s1_put(opal_pmix_scope_t scope,
|
static int s1_put(opal_pmix_scope_t scope,
|
||||||
opal_value_t *kv);
|
opal_value_t *kv);
|
||||||
static int s1_get(const opal_process_name_t *id,
|
static int s1_get(const opal_process_name_t *id,
|
||||||
@ -59,7 +60,7 @@ const opal_pmix_base_module_t opal_pmix_s1_module = {
|
|||||||
.initialized = s1_initialized,
|
.initialized = s1_initialized,
|
||||||
.abort = s1_abort,
|
.abort = s1_abort,
|
||||||
.commit = s1_commit,
|
.commit = s1_commit,
|
||||||
.fence = s1_fence,
|
.fence_nb = s1_fencenb,
|
||||||
.put = s1_put,
|
.put = s1_put,
|
||||||
.get = s1_get,
|
.get = s1_get,
|
||||||
.publish = s1_publish,
|
.publish = s1_publish,
|
||||||
@ -78,6 +79,17 @@ const opal_pmix_base_module_t opal_pmix_s1_module = {
|
|||||||
// usage accounting
|
// usage accounting
|
||||||
static int pmix_init_count = 0;
|
static int pmix_init_count = 0;
|
||||||
|
|
||||||
|
// local object
|
||||||
|
typedef struct {
|
||||||
|
opal_object_t super;
|
||||||
|
opal_event_t ev;
|
||||||
|
opal_pmix_op_cbfunc_t opcbfunc;
|
||||||
|
void *cbdata;
|
||||||
|
} pmi_opcaddy_t;
|
||||||
|
OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
|
||||||
|
opal_object_t,
|
||||||
|
NULL, NULL);
|
||||||
|
|
||||||
// PMI constant values:
|
// PMI constant values:
|
||||||
static int pmix_kvslen_max = 0;
|
static int pmix_kvslen_max = 0;
|
||||||
static int pmix_keylen_max = 0;
|
static int pmix_keylen_max = 0;
|
||||||
@ -512,8 +524,9 @@ static int s1_commit(void)
|
|||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int s1_fence(opal_list_t *procs, int collect_data)
|
static void fencenb(int sd, short args, void *cbdata)
|
||||||
{
|
{
|
||||||
|
pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
|
||||||
int rc;
|
int rc;
|
||||||
int32_t i;
|
int32_t i;
|
||||||
opal_value_t *kp, kvn;
|
opal_value_t *kp, kvn;
|
||||||
@ -527,7 +540,8 @@ static int s1_fence(opal_list_t *procs, int collect_data)
|
|||||||
/* use the PMI barrier function */
|
/* use the PMI barrier function */
|
||||||
if (PMI_SUCCESS != (rc = PMI_Barrier())) {
|
if (PMI_SUCCESS != (rc = PMI_Barrier())) {
|
||||||
OPAL_PMI_ERROR(rc, "PMI_Barrier");
|
OPAL_PMI_ERROR(rc, "PMI_Barrier");
|
||||||
return OPAL_ERROR;
|
rc = OPAL_ERROR;
|
||||||
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||||
@ -548,7 +562,7 @@ static int s1_fence(opal_list_t *procs, int collect_data)
|
|||||||
&kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
|
&kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
|
||||||
if (OPAL_SUCCESS != rc) {
|
if (OPAL_SUCCESS != rc) {
|
||||||
OPAL_ERROR_LOG(rc);
|
OPAL_ERROR_LOG(rc);
|
||||||
return rc;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (NULL == kp || NULL == kp->data.string) {
|
if (NULL == kp || NULL == kp->data.string) {
|
||||||
/* if we share a node, but we don't know anything more, then
|
/* if we share a node, but we don't know anything more, then
|
||||||
@ -579,6 +593,27 @@ static int s1_fence(opal_list_t *procs, int collect_data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
if (NULL != op->opcbfunc) {
|
||||||
|
op->opcbfunc(rc, op->cbdata);
|
||||||
|
}
|
||||||
|
OBJ_RELEASE(op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int s1_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||||
|
{
|
||||||
|
pmi_opcaddy_t *op;
|
||||||
|
|
||||||
|
/* thread-shift this so we don't block in SLURM's barrier */
|
||||||
|
op = OBJ_NEW(pmi_opcaddy_t);
|
||||||
|
op->opcbfunc = cbfunc;
|
||||||
|
op->cbdata = cbdata;
|
||||||
|
event_assign(&op->ev, opal_pmix_base.evbase, -1,
|
||||||
|
EV_WRITE, fencenb, op);
|
||||||
|
event_active(&op->ev, EV_WRITE, 1);
|
||||||
|
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,8 @@ static int s2_initialized(void);
|
|||||||
static int s2_abort(int flag, const char msg[],
|
static int s2_abort(int flag, const char msg[],
|
||||||
opal_list_t *procs);
|
opal_list_t *procs);
|
||||||
static int s2_commit(void);
|
static int s2_commit(void);
|
||||||
static int s2_fence(opal_list_t *procs, int collect_data);
|
static int s2_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
|
||||||
static int s2_put(opal_pmix_scope_t scope,
|
static int s2_put(opal_pmix_scope_t scope,
|
||||||
opal_value_t *kv);
|
opal_value_t *kv);
|
||||||
static int s2_get(const opal_process_name_t *id,
|
static int s2_get(const opal_process_name_t *id,
|
||||||
@ -66,7 +67,7 @@ const opal_pmix_base_module_t opal_pmix_s2_module = {
|
|||||||
.initialized = s2_initialized,
|
.initialized = s2_initialized,
|
||||||
.abort = s2_abort,
|
.abort = s2_abort,
|
||||||
.commit = s2_commit,
|
.commit = s2_commit,
|
||||||
.fence = s2_fence,
|
.fence_nb = s2_fencenb,
|
||||||
.put = s2_put,
|
.put = s2_put,
|
||||||
.get = s2_get,
|
.get = s2_get,
|
||||||
.publish = s2_publish,
|
.publish = s2_publish,
|
||||||
@ -85,6 +86,17 @@ const opal_pmix_base_module_t opal_pmix_s2_module = {
|
|||||||
// usage accounting
|
// usage accounting
|
||||||
static int pmix_init_count = 0;
|
static int pmix_init_count = 0;
|
||||||
|
|
||||||
|
// local object
|
||||||
|
typedef struct {
|
||||||
|
opal_object_t super;
|
||||||
|
opal_event_t ev;
|
||||||
|
opal_pmix_op_cbfunc_t opcbfunc;
|
||||||
|
void *cbdata;
|
||||||
|
} pmi_opcaddy_t;
|
||||||
|
OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
|
||||||
|
opal_object_t,
|
||||||
|
NULL, NULL);
|
||||||
|
|
||||||
// PMI constant values:
|
// PMI constant values:
|
||||||
static int pmix_kvslen_max = 0;
|
static int pmix_kvslen_max = 0;
|
||||||
static int pmix_keylen_max = 0;
|
static int pmix_keylen_max = 0;
|
||||||
@ -530,8 +542,9 @@ static int s2_commit(void)
|
|||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int s2_fence(opal_list_t *procs, int collect_data)
|
static void fencenb(int sd, short args, void *cbdata)
|
||||||
{
|
{
|
||||||
|
pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
|
||||||
int rc;
|
int rc;
|
||||||
int32_t i;
|
int32_t i;
|
||||||
opal_value_t *kp, kvn;
|
opal_value_t *kp, kvn;
|
||||||
@ -549,7 +562,8 @@ static int s2_fence(opal_list_t *procs, int collect_data)
|
|||||||
|
|
||||||
/* now call fence */
|
/* now call fence */
|
||||||
if (PMI2_SUCCESS != PMI2_KVS_Fence()) {
|
if (PMI2_SUCCESS != PMI2_KVS_Fence()) {
|
||||||
return OPAL_ERROR;
|
rc = OPAL_ERROR;
|
||||||
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* get the modex data from each local process and set the
|
/* get the modex data from each local process and set the
|
||||||
@ -566,7 +580,7 @@ static int s2_fence(opal_list_t *procs, int collect_data)
|
|||||||
&kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
|
&kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
|
||||||
if (OPAL_SUCCESS != rc) {
|
if (OPAL_SUCCESS != rc) {
|
||||||
OPAL_ERROR_LOG(rc);
|
OPAL_ERROR_LOG(rc);
|
||||||
return rc;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
if (NULL == kp || NULL == kp->data.string) {
|
if (NULL == kp || NULL == kp->data.string) {
|
||||||
/* if we share a node, but we don't know anything more, then
|
/* if we share a node, but we don't know anything more, then
|
||||||
@ -597,6 +611,27 @@ static int s2_fence(opal_list_t *procs, int collect_data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
if (NULL != op->opcbfunc) {
|
||||||
|
op->opcbfunc(rc, op->cbdata);
|
||||||
|
}
|
||||||
|
OBJ_RELEASE(op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int s2_fencenb(opal_list_t *procs, int collect_data,
|
||||||
|
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||||
|
{
|
||||||
|
pmi_opcaddy_t *op;
|
||||||
|
|
||||||
|
/* thread-shift this so we don't block in SLURM's barrier */
|
||||||
|
op = OBJ_NEW(pmi_opcaddy_t);
|
||||||
|
op->opcbfunc = cbfunc;
|
||||||
|
op->cbdata = cbdata;
|
||||||
|
event_assign(&op->ev, opal_pmix_base.evbase, -1,
|
||||||
|
EV_WRITE, fencenb, op);
|
||||||
|
event_active(&op->ev, EV_WRITE, 1);
|
||||||
|
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user