From 01ba861f2ab96c0562464d16cb1b28c40d408a94 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 12 May 2016 12:38:01 -0700 Subject: [PATCH] When direct launching applications, we must allow the MPI layer to progress during RTE-level barriers. Neither SLURM nor Cray provide non-blocking fence functions, so push those calls into a separate event thread (use the OPAL async thread for this purpose so we don't create another one) and let the MPI thread sping in wait_for_completion. This also restores the "lazy" completion during MPI_Finalize to minimize cpu utilization. Update external as well Revise the change: we still need the MPI_Barrier in MPI_Finalize when we use a blocking fence, but do use the "lazy" wait for completion. Replace the direct logic in MPI_Init with a cleaner macro --- ompi/mca/rte/rte.h | 38 +++++++++---------- ompi/runtime/ompi_mpi_finalize.c | 23 ++++++------ ompi/runtime/ompi_mpi_init.c | 38 +++++++++++++++---- opal/mca/pmix/cray/pmix_cray.c | 48 ++++++++++++++++++++---- opal/mca/pmix/external/pmix_ext_client.c | 2 + opal/mca/pmix/pmix.h | 15 -------- opal/mca/pmix/pmix114/pmix1_client.c | 2 + opal/mca/pmix/s1/pmix_s1.c | 45 +++++++++++++++++++--- opal/mca/pmix/s2/pmix_s2.c | 45 +++++++++++++++++++--- 9 files changed, 187 insertions(+), 69 deletions(-) diff --git a/ompi/mca/rte/rte.h b/ompi/mca/rte/rte.h index 6929f95734..2d66ac4f2c 100644 --- a/ompi/mca/rte/rte.h +++ b/ompi/mca/rte/rte.h @@ -205,27 +205,27 @@ OMPI_DECLSPEC extern mca_base_framework_t ompi_rte_base_framework; * progress while waiting, so we loop over opal_progress, letting * the RTE progress thread move the RTE along */ -#define OMPI_WAIT_FOR_COMPLETION(flg) \ - do { \ - opal_output_verbose(1, ompi_rte_base_framework.framework_output, \ - "%s waiting on RTE event at %s:%d", \ - OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \ - __FILE__, __LINE__); \ - while ((flg)) { \ - opal_progress(); \ - } \ +#define OMPI_WAIT_FOR_COMPLETION(flg) \ + do { \ + opal_output_verbose(1, ompi_rte_base_framework.framework_output, \ + "%s waiting on RTE event at %s:%d", \ + OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \ + __FILE__, __LINE__); \ + while ((flg)) { \ + opal_progress(); \ + } \ }while(0); -#define OMPI_LAZY_WAIT_FOR_COMPLETION(flg) \ - do { \ - opal_output_verbose(1, ompi_rte_base_framework.framework_output, \ - "%s lazy waiting on RTE event at %s:%d", \ - OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \ - __FILE__, __LINE__); \ - while ((flg)) { \ - opal_progress(); \ - usleep(100); \ - } \ +#define OMPI_LAZY_WAIT_FOR_COMPLETION(flg) \ + do { \ + opal_output_verbose(1, ompi_rte_base_framework.framework_output, \ + "%s lazy waiting on RTE event at %s:%d", \ + OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), \ + __FILE__, __LINE__); \ + while ((flg)) { \ + opal_progress(); \ + usleep(100); \ + } \ }while(0); typedef struct { diff --git a/ompi/runtime/ompi_mpi_finalize.c b/ompi/runtime/ompi_mpi_finalize.c index c5870dbf36..d75f67be38 100644 --- a/ompi/runtime/ompi_mpi_finalize.c +++ b/ompi/runtime/ompi_mpi_finalize.c @@ -16,7 +16,7 @@ * Copyright (c) 2006 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2011 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -248,19 +248,20 @@ int ompi_mpi_finalize(void) more details). */ if (NULL != opal_pmix.fence_nb) { active = true; - /* Note that the non-blocking PMIx fence will cycle calling - opal_progress(), which will allow any other pending - communications/actions to complete. See - https://github.com/open-mpi/ompi/issues/1576 for the - original bug report. */ + /* Note that use of the non-blocking PMIx fence will + * allow us to lazily cycle calling + * opal_progress(), which will allow any other pending + * communications/actions to complete. See + * https://github.com/open-mpi/ompi/issues/1576 for the + * original bug report. */ opal_pmix.fence_nb(NULL, 0, fence_cbfunc, (void*)&active); - OMPI_WAIT_FOR_COMPLETION(active); + OMPI_LAZY_WAIT_FOR_COMPLETION(active); } else { /* However, we cannot guarantee that the provided PMIx has - fence_nb. If it doesn't, then do the best we can: an MPI - barrier on COMM_WORLD (which isn't the best because of the - reasons cited above), followed by a blocking PMIx fence - (which may not necessarily call opal_progress()). */ + * fence_nb. If it doesn't, then do the best we can: an MPI + * barrier on COMM_WORLD (which isn't the best because of the + * reasons cited above), followed by a blocking PMIx fence + * (which does not call opal_progress()). */ ompi_communicator_t *comm = &ompi_mpi_comm_world.comm; comm->c_coll.coll_barrier(comm, comm->c_coll.coll_barrier_module); diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index 09c1fc138d..7ddf21484e 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -362,6 +362,12 @@ static int ompi_register_mca_variables(void) return OMPI_SUCCESS; } +static void fence_release(int status, void *cbdata) +{ + volatile bool *active = (volatile bool*)cbdata; + *active = false; +} + int ompi_mpi_init(int argc, char **argv, int requested, int *provided) { int ret; @@ -370,6 +376,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) char *error = NULL; char *cmd=NULL, *av=NULL; ompi_errhandler_errtrk_t errtrk; + volatile bool active; OPAL_TIMING_DECLARE(tm); OPAL_TIMING_INIT_EXT(&tm, OPAL_TIMING_GET_TIME_OF_DAY); @@ -634,13 +641,23 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) * if data exchange is required. The modex occurs solely across procs * in our job. If a barrier is required, the "modex" function will * perform it internally */ - OPAL_MODEX(); + active = true; + opal_pmix.commit(); + if (!opal_pmix_base_async_modex) { + if (NULL != opal_pmix.fence_nb) { + opal_pmix.fence_nb(NULL, opal_pmix_collect_all_data, + fence_release, (void*)&active); + OMPI_WAIT_FOR_COMPLETION(active); + } else { + opal_pmix.fence(NULL, opal_pmix_collect_all_data); + } + } OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier")); /* select buffered send allocator component to be used */ if( OMPI_SUCCESS != - (ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) { + (ret = mca_pml_base_bsend_init(ompi_mpi_thread_multiple))) { error = "mca_pml_base_bsend_init() failed"; goto error; } @@ -802,7 +819,15 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) /* wait for everyone to reach this point - this is a hard * barrier requirement at this time, though we hope to relax * it at a later point */ - opal_pmix.fence(NULL, 0); + active = true; + opal_pmix.commit(); + if (NULL != opal_pmix.fence_nb) { + opal_pmix.fence_nb(NULL, opal_pmix_collect_all_data, + fence_release, (void*)&active); + OMPI_WAIT_FOR_COMPLETION(active); + } else { + opal_pmix.fence(NULL, opal_pmix_collect_all_data); + } /* check for timing request - get stop time and report elapsed time if so, then start the clock again */ @@ -839,10 +864,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) e.g. hierarch, might create subcommunicators. The threadlevel requested by all processes is required in order to know which cid allocation algorithm can be used. */ - if ( OMPI_SUCCESS != - ( ret = ompi_comm_cid_init ())) { - error = "ompi_mpi_init: ompi_comm_cid_init failed"; - goto error; + if (OMPI_SUCCESS != ( ret = ompi_comm_cid_init ())) { + error = "ompi_mpi_init: ompi_comm_cid_init failed"; + goto error; } /* Init coll for the comms. This has to be after dpm_base_select, diff --git a/opal/mca/pmix/cray/pmix_cray.c b/opal/mca/pmix/cray/pmix_cray.c index 9af1c19303..448af72cae 100644 --- a/opal/mca/pmix/cray/pmix_cray.c +++ b/opal/mca/pmix/cray/pmix_cray.c @@ -57,7 +57,8 @@ static int cray_resolve_peers(const char *nodename, opal_list_t *procs); static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist); static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv); -static int cray_fence(opal_list_t *procs, int collect_data); +static int cray_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static int cray_commit(void); static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, @@ -90,8 +91,8 @@ const opal_pmix_base_module_t opal_pmix_cray_module = { .initialized = cray_initialized, .abort = cray_abort, .commit = cray_commit, - .fence = cray_fence, - .fence_nb = NULL, + .fence = NULL, + .fence_nb = cray_fencenb, .put = cray_put, .get = cray_get, .get_nb = cray_get_nb, @@ -119,6 +120,17 @@ const opal_pmix_base_module_t opal_pmix_cray_module = { // usage accounting static int pmix_init_count = 0; +// local object +typedef struct { + opal_object_t super; + opal_event_t ev; + opal_pmix_op_cbfunc_t opcbfunc; + void *cbdata; +} pmi_opcaddy_t; +OBJ_CLASS_INSTANCE(pmi_opcaddy_t, + opal_object_t, + NULL, NULL); + // PMI constant values: static int pmix_kvslen_max = 0; static int pmix_keylen_max = 0; @@ -512,8 +524,9 @@ static int cray_commit(void) return OPAL_SUCCESS; } -static int cray_fence(opal_list_t *procs, int collect_data) +static void fencenb(int sd, short args, void *cbdata) { + pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata; int rc, cnt; int32_t i; int *all_lens = NULL; @@ -550,7 +563,8 @@ static int cray_fence(opal_list_t *procs, int collect_data) send_buffer = OBJ_NEW(opal_buffer_t); if (NULL == send_buffer) { - return OPAL_ERR_OUT_OF_RESOURCE; + rc = OPAL_ERR_OUT_OF_RESOURCE; + goto fn_exit; } opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global); @@ -668,7 +682,7 @@ static int cray_fence(opal_list_t *procs, int collect_data) * for every process in the job. * * we only need to set locality for each local rank as "not found" - * equates to "non-local" + * equates to "non-local" */ for (i=0; i < pmix_nlranks; i++) { @@ -732,7 +746,27 @@ fn_exit: if (r_bytes_and_ranks != NULL) { free(r_bytes_and_ranks); } - return rc; + if (NULL != op->opcbfunc) { + op->opcbfunc(rc, op->cbdata); + } + OBJ_RELEASE(op); + return; +} + +static int cray_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) +{ + pmi_opcaddy_t *op; + + /* thread-shift this so we don't block in Cray's barrier */ + op = OBJ_NEW(pmi_opcaddy_t); + op->opcbfunc = cbfunc; + op->cbdata = cbdata; + event_assign(&op->ev, opal_pmix_base.evbase, -1, + EV_WRITE, fencenb, op); + event_active(&op->ev, EV_WRITE, 1); + + return OPAL_SUCCESS; } static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv) diff --git a/opal/mca/pmix/external/pmix_ext_client.c b/opal/mca/pmix/external/pmix_ext_client.c index 80a5b814f6..c47e13ff35 100644 --- a/opal/mca/pmix/external/pmix_ext_client.c +++ b/opal/mca/pmix/external/pmix_ext_client.c @@ -369,6 +369,8 @@ int pmix1_fencenb(opal_list_t *procs, int collect_data, if (collect_data) { PMIX_INFO_CONSTRUCT(&info); (void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN); + info.value.type = PMIX_BOOL; + info.value.data.flag = true; iptr = &info; n = 1; } else { diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h index faf65c6578..b9cf9cbdcb 100644 --- a/opal/mca/pmix/pmix.h +++ b/opal/mca/pmix/pmix.h @@ -250,21 +250,6 @@ extern int opal_pmix_base_exchange(opal_value_t *info, } \ } while(0); - -/** - * Provide a simplified macro for calling the fence function - * that takes into account directives and availability of - * non-blocking operations - */ -#define OPAL_MODEX() \ - do { \ - opal_pmix.commit(); \ - if (!opal_pmix_base_async_modex) { \ - opal_pmix.fence(NULL, \ - opal_pmix_collect_all_data); \ - } \ - } while(0); - /** * Provide a macro for accessing a base function that exchanges * data values between two procs using the PMIx Publish/Lookup diff --git a/opal/mca/pmix/pmix114/pmix1_client.c b/opal/mca/pmix/pmix114/pmix1_client.c index 8175560afb..e09ba66914 100644 --- a/opal/mca/pmix/pmix114/pmix1_client.c +++ b/opal/mca/pmix/pmix114/pmix1_client.c @@ -364,6 +364,8 @@ int pmix1_fencenb(opal_list_t *procs, int collect_data, if (collect_data) { PMIX_INFO_CONSTRUCT(&info); (void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN); + info.value.type = PMIX_BOOL; + info.value.data.flag = true; iptr = &info; n = 1; } else { diff --git a/opal/mca/pmix/s1/pmix_s1.c b/opal/mca/pmix/s1/pmix_s1.c index 6656847cad..cf1a5a2c08 100644 --- a/opal/mca/pmix/s1/pmix_s1.c +++ b/opal/mca/pmix/s1/pmix_s1.c @@ -36,7 +36,8 @@ static int s1_initialized(void); static int s1_abort(int flag, const char msg[], opal_list_t *procs); static int s1_commit(void); -static int s1_fence(opal_list_t *procs, int collect_data); +static int s1_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static int s1_put(opal_pmix_scope_t scope, opal_value_t *kv); static int s1_get(const opal_process_name_t *id, @@ -59,7 +60,7 @@ const opal_pmix_base_module_t opal_pmix_s1_module = { .initialized = s1_initialized, .abort = s1_abort, .commit = s1_commit, - .fence = s1_fence, + .fence_nb = s1_fencenb, .put = s1_put, .get = s1_get, .publish = s1_publish, @@ -78,6 +79,17 @@ const opal_pmix_base_module_t opal_pmix_s1_module = { // usage accounting static int pmix_init_count = 0; +// local object +typedef struct { + opal_object_t super; + opal_event_t ev; + opal_pmix_op_cbfunc_t opcbfunc; + void *cbdata; +} pmi_opcaddy_t; +OBJ_CLASS_INSTANCE(pmi_opcaddy_t, + opal_object_t, + NULL, NULL); + // PMI constant values: static int pmix_kvslen_max = 0; static int pmix_keylen_max = 0; @@ -512,8 +524,9 @@ static int s1_commit(void) return OPAL_SUCCESS; } -static int s1_fence(opal_list_t *procs, int collect_data) +static void fencenb(int sd, short args, void *cbdata) { + pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata; int rc; int32_t i; opal_value_t *kp, kvn; @@ -527,7 +540,8 @@ static int s1_fence(opal_list_t *procs, int collect_data) /* use the PMI barrier function */ if (PMI_SUCCESS != (rc = PMI_Barrier())) { OPAL_PMI_ERROR(rc, "PMI_Barrier"); - return OPAL_ERROR; + rc = OPAL_ERROR; + goto cleanup; } opal_output_verbose(2, opal_pmix_base_framework.framework_output, @@ -548,7 +562,7 @@ static int s1_fence(opal_list_t *procs, int collect_data) &kp, pmix_kvs_name, pmix_vallen_max, kvs_get); if (OPAL_SUCCESS != rc) { OPAL_ERROR_LOG(rc); - return rc; + goto cleanup; } if (NULL == kp || NULL == kp->data.string) { /* if we share a node, but we don't know anything more, then @@ -579,6 +593,27 @@ static int s1_fence(opal_list_t *procs, int collect_data) } } + cleanup: + if (NULL != op->opcbfunc) { + op->opcbfunc(rc, op->cbdata); + } + OBJ_RELEASE(op); + return; +} + +static int s1_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) +{ + pmi_opcaddy_t *op; + + /* thread-shift this so we don't block in SLURM's barrier */ + op = OBJ_NEW(pmi_opcaddy_t); + op->opcbfunc = cbfunc; + op->cbdata = cbdata; + event_assign(&op->ev, opal_pmix_base.evbase, -1, + EV_WRITE, fencenb, op); + event_active(&op->ev, EV_WRITE, 1); + return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/s2/pmix_s2.c b/opal/mca/pmix/s2/pmix_s2.c index ed45fe4ad3..7994907969 100644 --- a/opal/mca/pmix/s2/pmix_s2.c +++ b/opal/mca/pmix/s2/pmix_s2.c @@ -43,7 +43,8 @@ static int s2_initialized(void); static int s2_abort(int flag, const char msg[], opal_list_t *procs); static int s2_commit(void); -static int s2_fence(opal_list_t *procs, int collect_data); +static int s2_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static int s2_put(opal_pmix_scope_t scope, opal_value_t *kv); static int s2_get(const opal_process_name_t *id, @@ -66,7 +67,7 @@ const opal_pmix_base_module_t opal_pmix_s2_module = { .initialized = s2_initialized, .abort = s2_abort, .commit = s2_commit, - .fence = s2_fence, + .fence_nb = s2_fencenb, .put = s2_put, .get = s2_get, .publish = s2_publish, @@ -85,6 +86,17 @@ const opal_pmix_base_module_t opal_pmix_s2_module = { // usage accounting static int pmix_init_count = 0; +// local object +typedef struct { + opal_object_t super; + opal_event_t ev; + opal_pmix_op_cbfunc_t opcbfunc; + void *cbdata; +} pmi_opcaddy_t; +OBJ_CLASS_INSTANCE(pmi_opcaddy_t, + opal_object_t, + NULL, NULL); + // PMI constant values: static int pmix_kvslen_max = 0; static int pmix_keylen_max = 0; @@ -530,8 +542,9 @@ static int s2_commit(void) return OPAL_SUCCESS; } -static int s2_fence(opal_list_t *procs, int collect_data) +static void fencenb(int sd, short args, void *cbdata) { + pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata; int rc; int32_t i; opal_value_t *kp, kvn; @@ -549,7 +562,8 @@ static int s2_fence(opal_list_t *procs, int collect_data) /* now call fence */ if (PMI2_SUCCESS != PMI2_KVS_Fence()) { - return OPAL_ERROR; + rc = OPAL_ERROR; + goto cleanup; } /* get the modex data from each local process and set the @@ -566,7 +580,7 @@ static int s2_fence(opal_list_t *procs, int collect_data) &kp, pmix_kvs_name, pmix_vallen_max, kvs_get); if (OPAL_SUCCESS != rc) { OPAL_ERROR_LOG(rc); - return rc; + goto cleanup; } if (NULL == kp || NULL == kp->data.string) { /* if we share a node, but we don't know anything more, then @@ -597,6 +611,27 @@ static int s2_fence(opal_list_t *procs, int collect_data) } } + cleanup: + if (NULL != op->opcbfunc) { + op->opcbfunc(rc, op->cbdata); + } + OBJ_RELEASE(op); + return; +} + +static int s2_fencenb(opal_list_t *procs, int collect_data, + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) +{ + pmi_opcaddy_t *op; + + /* thread-shift this so we don't block in SLURM's barrier */ + op = OBJ_NEW(pmi_opcaddy_t); + op->opcbfunc = cbfunc; + op->cbdata = cbdata; + event_assign(&op->ev, opal_pmix_base.evbase, -1, + EV_WRITE, fencenb, op); + event_active(&op->ev, EV_WRITE, 1); + return OPAL_SUCCESS; }