From f8ce31f13c65da0c219faf0c5dd27a4d93f9290f Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 19 Oct 2017 18:06:38 -0700 Subject: [PATCH] Fix event registration so OpenMP/MPI coordination sides can both get notification of model declarations Signed-off-by: Ralph Castain --- .gitignore | 1 + .../pmix/src/event/pmix_event_registration.c | 25 ++- orte/test/mpi/Makefile | 2 +- orte/test/mpi/interlib.c | 177 ++++++++++++++++++ 4 files changed, 202 insertions(+), 3 deletions(-) create mode 100644 orte/test/mpi/interlib.c diff --git a/.gitignore b/.gitignore index a5cf6d18b4..cb528c92cb 100644 --- a/.gitignore +++ b/.gitignore @@ -383,6 +383,7 @@ orte/test/mpi/hello_output orte/test/mpi/hello_show_help orte/test/mpi/hello orte/test/mpi/hello++ +orte/test/mpi/interlib orte/test/mpi/loop_child orte/test/mpi/loop_spawn orte/test/mpi/mpi_barrier diff --git a/opal/mca/pmix/pmix3x/pmix/src/event/pmix_event_registration.c b/opal/mca/pmix/pmix3x/pmix/src/event/pmix_event_registration.c index ffe409e8fc..248dd4bf0e 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/event/pmix_event_registration.c +++ b/opal/mca/pmix/pmix3x/pmix/src/event/pmix_event_registration.c @@ -747,13 +747,17 @@ static void reg_event_hdlr(int sd, short args, void *cbdata) ack: /* acknowledge the registration so the caller can release - * their data */ + * their data AND record the event handler index */ if (NULL != cd->evregcbfn) { cd->evregcbfn(rc, index, cd->cbdata); } /* check if any matching notifications have been cached */ check_cached_events(cd); + if (NULL != cd->codes) { + free(cd->codes); + cd->codes = NULL; + } /* all done */ PMIX_RELEASE(cd); @@ -766,6 +770,7 @@ PMIX_EXPORT void PMIx_Register_event_handler(pmix_status_t codes[], size_t ncode void *cbdata) { pmix_rshift_caddy_t *cd; + size_t n; PMIX_ACQUIRE_THREAD(&pmix_global_lock); @@ -781,7 +786,23 @@ PMIX_EXPORT void PMIx_Register_event_handler(pmix_status_t codes[], size_t ncode /* need to thread shift this request so we can access * our global data to register this *local* event handler */ cd = PMIX_NEW(pmix_rshift_caddy_t); - cd->codes = codes; + /* we have to save the codes as we will check them against existing + * registrations AFTER we have executed the callback which allows + * the caller to release their storage */ + if (0 < ncodes) { + cd->codes = (pmix_status_t*)malloc(ncodes * sizeof(pmix_status_t)); + if (NULL == cd->codes) { + /* immediately return error */ + PMIX_RELEASE(cd); + if (NULL != cbfunc) { + cbfunc(PMIX_ERR_NOMEM, SIZE_MAX, cbdata); + } + return; + } + for (n=0; n < ncodes; n++) { + cd->codes[n] = codes[n]; + } + } cd->ncodes = ncodes; cd->info = info; cd->ninfo = ninfo; diff --git a/orte/test/mpi/Makefile b/orte/test/mpi/Makefile index 5e5b17bf94..8b826ccaff 100644 --- a/orte/test/mpi/Makefile +++ b/orte/test/mpi/Makefile @@ -5,7 +5,7 @@ PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spaw parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort \ debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info \ info_spawn server client paccept pconnect ring hello.sapp binding badcoll attach xlib \ - no-disconnect nonzero + no-disconnect nonzero interlib all: $(PROGS) diff --git a/orte/test/mpi/interlib.c b/orte/test/mpi/interlib.c new file mode 100644 index 0000000000..e45bb0ea43 --- /dev/null +++ b/orte/test/mpi/interlib.c @@ -0,0 +1,177 @@ +/* -*- C -*- + * + * $HEADER$ + * + * The most basic of MPI applications + */ + +#include +#include + +#include "opal/mca/hwloc/base/base.h" +#include "mpi.h" + +#include "orte/util/proc_info.h" +#include "opal/mca/pmix/base/base.h" + +static size_t interlibhandler_id = SIZE_MAX; +static opal_pmix_lock_t thread_complete; + +static void model_registration_callback(int status, + size_t errhandler_ref, + void *cbdata) +{ + opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata; + + interlibhandler_id = errhandler_ref; + OPAL_PMIX_WAKEUP_THREAD(lock); +} +static void model_callback(int status, + const opal_process_name_t *source, + opal_list_t *info, opal_list_t *results, + opal_pmix_notification_complete_fn_t cbfunc, + void *cbdata) +{ + opal_value_t *val; + + /* we can ignore our own callback as we obviously + * know that we are MPI */ + if (NULL != info) { + OPAL_LIST_FOREACH(val, info, opal_value_t) { + if (OPAL_STRING == val->type) { +#if 1 + opal_output(0, "Thread Model Callback Key: %s Val %s", val->key, val->data.string); +#else + if (0 == strcmp(val->key, OPAL_PMIX_MODEL_LIBRARY_NAME) && + 0 == strcmp(val->data.string, "OpenMPI")) { + goto cback; + } +#endif + } + } + } + /* otherwise, do something clever here */ + + cback: + /* we must NOT tell the event handler state machine that we + * are the last step as that will prevent it from notifying + * anyone else that might be listening for declarations */ + if (NULL != cbfunc) { + cbfunc(OPAL_SUCCESS, NULL, NULL, NULL, cbdata); + } + OPAL_PMIX_WAKEUP_THREAD(&thread_complete); +} + +static void *mylib(void *ptr) +{ + opal_list_t info, directives; + opal_value_t *kv; + int ret; + opal_pmix_lock_t lock; + + OPAL_PMIX_CONSTRUCT_LOCK(&thread_complete); + + /* declare that we are present and active */ + OBJ_CONSTRUCT(&info, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_PROGRAMMING_MODEL); + kv->type = OPAL_STRING; + kv->data.string = strdup("OpenMP"); + opal_list_append(&info, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_NAME); + kv->type = OPAL_STRING; + kv->data.string = strdup("foobar"); + opal_list_append(&info, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_VERSION); + kv->type = OPAL_STRING; + kv->data.string = strdup("1.2.3.4"); + opal_list_append(&info, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_THREADING_MODEL); + kv->type = OPAL_STRING; + kv->data.string = strdup("PTHREAD"); + opal_list_append(&info, &kv->super); + + /* call pmix to initialize these values */ + ret = opal_pmix.init(&info); + OPAL_LIST_DESTRUCT(&info); + + /* register to receive model callbacks */ + + /* give it a name so we can distinguish it */ + OBJ_CONSTRUCT(&directives, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME); + kv->type = OPAL_STRING; + kv->data.string = strdup("My-Declarations"); + opal_list_append(&directives, &kv->super); + /* specify the event code */ + OBJ_CONSTRUCT(&info, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup("status"); // the key here is irrelevant + kv->type = OPAL_INT; + kv->data.integer = OPAL_ERR_MODEL_DECLARED; + opal_list_append(&info, &kv->super); + /* we could constrain the range to proc_local - technically, this + * isn't required so long as the code that generates + * the event stipulates its range as proc_local. We rely + * on that here */ + OPAL_PMIX_CONSTRUCT_LOCK(&lock); + opal_pmix.register_evhandler(&info, &directives, model_callback, + model_registration_callback, + (void*)&lock); + OPAL_PMIX_WAIT_THREAD(&lock); + OPAL_PMIX_DESTRUCT_LOCK(&lock); + + /* wait for the model callback */ + OPAL_PMIX_WAIT_THREAD(&thread_complete); + + /* finalize */ + opal_pmix.finalize(); + + /* done */ + return NULL; +} + +int main(int argc, char* argv[]) +{ + int rank, size, rc; + hwloc_cpuset_t cpus; + char *bindings = NULL; + pid_t pid; + pthread_t mythread; + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + pid = getpid(); + + /* spin up a thread */ + if (pthread_create(&mythread, NULL, mylib, NULL)) { + fprintf(stderr, "Error creating thread\n"); + goto done; + } + + printf("[%lu] Rank %d: getting topology\n", (unsigned long)pid, rank); + fflush(stdout); + if (OPAL_SUCCESS == opal_hwloc_base_get_topology()) { + cpus = hwloc_bitmap_alloc(); + rc = hwloc_get_cpubind(opal_hwloc_topology, cpus, HWLOC_CPUBIND_PROCESS); + hwloc_bitmap_list_asprintf(&bindings, cpus); + } + + printf("Hello, World, I am %d of %d [%d local peers]: get_cpubind: %d bitmap %s\n", + rank, size, orte_process_info.num_local_peers, rc, + (NULL == bindings) ? "NULL" : bindings); + + /* wait for the thread to finish */ + if (pthread_join(mythread, NULL)) { + fprintf(stderr, "Error joining thread\n"); + } + + done: + MPI_Finalize(); + return 0; +}