diff --git a/opal/mca/event/external/event_external_component.c b/opal/mca/event/external/event_external_component.c index 7856b7b06b..3ac2a83292 100644 --- a/opal/mca/event/external/event_external_component.c +++ b/opal/mca/event/external/event_external_component.c @@ -5,6 +5,7 @@ * reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. * + * Copyright (c) 2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -86,7 +87,7 @@ static int event_external_register (void) { event_module_include = "poll"; #endif - avail = opal_argv_join(all_available_eventops, ','); + avail = opal_argv_join((char**)all_available_eventops, ','); asprintf( &help_msg, "Comma-delimited list of libevent subsystems " "to use (%s -- available on your platform)", diff --git a/opal/mca/event/external/external.h b/opal/mca/event/external/external.h index ada10ebbae..29b2eaaef5 100644 --- a/opal/mca/event/external/external.h +++ b/opal/mca/event/external/external.h @@ -1,7 +1,7 @@ /* * Copyright (c) 2011-2015 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved. - * Copyright (c) 2015 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. * Copyright (c) 2015-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. @@ -75,6 +75,8 @@ OPAL_DECLSPEC int opal_event_finalize(void); #define opal_event_set(b, x, fd, fg, cb, arg) event_assign((x), (b), (fd), (fg), (event_callback_fn) (cb), (arg)) +#define opal_event_assign(x, b, fd, fg, cb, arg) event_assign((x), (b), (fd), (fg), (event_callback_fn) (cb), (arg)) + #define opal_event_add(ev, tv) event_add((ev), (tv)) #define opal_event_del(ev) event_del((ev)) diff --git a/opal/mca/pmix/ext2x/configure.m4 b/opal/mca/pmix/ext2x/configure.m4 index a320eb65db..82ac30dfc5 100644 --- a/opal/mca/pmix/ext2x/configure.m4 +++ b/opal/mca/pmix/ext2x/configure.m4 @@ -12,8 +12,8 @@ # All rights reserved. # Copyright (c) 2011-2013 Los Alamos National Security, LLC. # All rights reserved. -# Copyright (c) 2010-2016 Cisco Systems, Inc. All rights reserved. -# Copyright (c) 2013-2016 Intel, Inc. All rights reserved. +# Copyright (c) 2010-2017 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2013-2017 Intel, Inc. All rights reserved. # Copyright (c) 2015-2016 Research Organization for Information Science # and Technology (RIST). All rights reserved. # $COPYRIGHT$ @@ -28,7 +28,59 @@ AC_DEFUN([MCA_opal_pmix_ext2x_CONFIG],[ AC_CONFIG_FILES([opal/mca/pmix/ext2x/Makefile]) - # check to see + OPAL_VAR_SCOPE_PUSH([PMIX_VERSION opal_pmix_ext2x_save_CPPFLAGS opal_pmix_pmix2_save_CFLAGS opal_pmix_ext2x_save_LDFLAGS opal_pmix_ext2x_save_LIBS opal_pmix_ext2x_basedir opal_pmix_ext2x_args opal_pmix_ext2x_happy opal_pmix_ext2x_sm_flag pmix_ext2x_status_filename]) + + opal_pmix_ext2x_basedir=opal/mca/pmix/ext2x + + opal_pmix_ext2x_save_CFLAGS=$CFLAGS + opal_pmix_ext2x_save_CPPFLAGS=$CPPFLAGS + opal_pmix_ext2x_save_LDFLAGS=$LDFLAGS + opal_pmix_ext2x_save_LIBS=$LIBS + + AC_ARG_ENABLE([pmix-dstore], + [AC_HELP_STRING([--enable-pmix-dstore], + [Enable PMIx shared memory data store (default: enabled)])]) + AC_MSG_CHECKING([if PMIx shared memory data store is enabled]) + if test "$enable_pmix_dstore" != "no"; then + AC_MSG_RESULT([yes]) + opal_pmix_ext2x_sm_flag=--enable-dstore + else + AC_MSG_RESULT([no (disabled)]) + opal_pmix_ext2x_sm_flag=--disable-dstore + fi + + AC_ARG_ENABLE([pmix-timing], + [AC_HELP_STRING([--enable-pmix-timing], + [Enable PMIx timing measurements (default: disabled)])]) + AC_MSG_CHECKING([if PMIx timing is enabled]) + if test "$enable_pmix_timing" == "yes"; then + AC_MSG_RESULT([yes]) + opal_pmix_ext2x_timing_flag=--enable-pmix-timing + else + AC_MSG_RESULT([no (disabled)]) + opal_pmix_ext2x_timing_flag=--disable-pmix-timing + fi + + opal_pmix_ext2x_args="--with-pmix-symbol-rename=OPAL_MCA_PMIX2X_ $opal_pmix_ext2x_sm_flag $opal_pmix_ext2x_timing_flag --without-tests-examples --disable-pmix-backward-compatibility --disable-visibility --enable-embedded-libevent --with-libevent-header=\\\"opal/mca/event/$opal_event_base_include\\\" --enable-embedded-mode" + AS_IF([test "$enable_debug" = "yes"], + [opal_pmix_ext2x_args="--enable-debug $opal_pmix_ext2x_args" + CFLAGS="$OPAL_CFLAGS_BEFORE_PICKY $OPAL_VISIBILITY_CFLAGS -g"], + [opal_pmix_ext2x_args="--disable-debug $opal_pmix_ext2x_args" + CFLAGS="$OPAL_CFLAGS_BEFORE_PICKY $OPAL_VISIBILITY_CFLAGS"]) + AS_IF([test "$with_devel_headers" = "yes"], + [opal_pmix_ext2x_args="--with-devel-headers $opal_pmix_ext2x_args"], + [opal_pmix_ext2x_args=$opal_pmix_ext2x_args]) + CPPFLAGS="-I$OPAL_TOP_SRCDIR -I$OPAL_TOP_BUILDDIR -I$OPAL_TOP_SRCDIR/opal/include -I$OPAL_TOP_BUILDDIR/opal/include $CPPFLAGS" + + OPAL_CONFIG_SUBDIR([$opal_pmix_ext2x_basedir/pmix], + [$opal_pmix_ext2x_args $opal_subdir_args 'CFLAGS=$CFLAGS' 'CPPFLAGS=$CPPFLAGS'], + [opal_pmix_ext2x_happy=1], [opal_pmix_ext2x_happy=0]) + + CFLAGS=$opal_pmix_ext2x_save_CFLAGS + CPPFLAGS=$opal_pmix_ext2x_save_CPPFLAGS + LDFLAGS=$opal_pmix_ext2x_save_LDFLAGS + LIBS=$opal_pmix_ext2x_save_LIBS + # if we are linking to an external v2.x library. If not, then # do not use this component. AC_MSG_CHECKING([if external v2.x component is to be used]) @@ -45,16 +97,24 @@ AC_DEFUN([MCA_opal_pmix_ext2x_CONFIG],[ pmix_ext2x_WRAPPER_EXTRA_LIBS=$opal_external_pmix_LIBS], [AC_MSG_RESULT([no - disqualifying this component]) opal_pmix_ext2x_happy=0])], - [AC_MSG_RESULT([no - disqualifying this component]) - opal_pmix_ext2x_happy=0]) + [AC_MSG_RESULT([no]) + opal_pmix_ext2x_happy=0]) AC_SUBST([opal_pmix_ext2x_LIBS]) AC_SUBST([opal_pmix_ext2x_CPPFLAGS]) AC_SUBST([opal_pmix_ext2x_LDFLAGS]) AC_SUBST([opal_pmix_ext2x_DEPENDENCIES]) + AC_MSG_CHECKING([PMIx extra wrapper CPPFLAGS]) + AC_MSG_RESULT([$pmix_ext2x_WRAPPER_EXTRA_CPPFLAGS]) + AC_MSG_CHECKING([PMIx extra wrapper LDFLAGS]) + AC_MSG_RESULT([$pmix_ext2x_WRAPPER_EXTRA_LDFLAGS]) + AC_MSG_CHECKING([PMIx extra wrapper LIBS]) + AC_MSG_RESULT([$pmix_ext2x_WRAPPER_EXTRA_LIBS]) + AS_IF([test $opal_pmix_ext2x_happy -eq 1], [$1], [$2]) + OPAL_VAR_SCOPE_POP ])dnl diff --git a/opal/mca/pmix/ext2x/pmix2x.c b/opal/mca/pmix/ext2x/pmix2x.c index 253276fca6..959480c9c5 100644 --- a/opal/mca/pmix/ext2x/pmix2x.c +++ b/opal/mca/pmix/ext2x/pmix2x.c @@ -1,11 +1,13 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2017 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -29,6 +31,7 @@ #include "opal/mca/hwloc/base/base.h" #include "opal/runtime/opal.h" #include "opal/runtime/opal_progress_threads.h" +#include "opal/threads/threads.h" #include "opal/util/argv.h" #include "opal/util/error.h" #include "opal/util/output.h" @@ -145,121 +148,53 @@ static void pmix2x_register_jobid(opal_jobid_t jobid, const char *nspace) opal_list_append(&mca_pmix_ext2x_component.jobids, &jptr->super); } -static void completion_handler(int status, void *cbdata) +static void event_hdlr_complete(pmix_status_t status, void *cbdata) { - opal_pmix2x_event_chain_t *chain = (opal_pmix2x_event_chain_t*)cbdata; - if (NULL != chain->info) { - OPAL_LIST_RELEASE(chain->info); - } + pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + + OBJ_RELEASE(op); } -static void progress_local_event_hdlr(int status, - opal_list_t *results, - opal_pmix_op_cbfunc_t cbfunc, void *thiscbdata, - void *notification_cbdata) +static void return_local_event_hdlr(int status, opal_list_t *results, + opal_pmix_op_cbfunc_t cbfunc, void *thiscbdata, + void *notification_cbdata) { - opal_pmix2x_event_chain_t *chain = (opal_pmix2x_event_chain_t*)notification_cbdata; + pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)notification_cbdata; + pmix2x_opcaddy_t *op; + opal_value_t *kv; + pmix_status_t pstatus; size_t n; - opal_list_item_t *nxt; - opal_pmix2x_single_event_t *sing; - opal_pmix2x_multi_event_t *multi; - opal_pmix2x_default_event_t *def; - /* if the caller indicates that the chain is completed, then stop here */ - if (OPAL_ERR_HANDLERS_COMPLETE == status) { - goto complete; - } + OPAL_ACQUIRE_OBJECT(cd); + if (NULL != cd->pmixcbfunc) { + op = OBJ_NEW(pmix2x_opcaddy_t); - /* if any results were provided, then add them here */ - if (NULL != results) { - while (NULL != (nxt = opal_list_remove_first(results))) { - opal_list_append(results, nxt); - } - } - - /* see if we need to continue, starting with the single code events */ - if (NULL != chain->sing) { - /* the last handler was for a single code - see if there are - * any others that match this event */ - while (opal_list_get_end(&mca_pmix_ext2x_component.single_events) != (nxt = opal_list_get_next(&chain->sing->super))) { - sing = (opal_pmix2x_single_event_t*)nxt; - if (sing->code == chain->status) { - OBJ_RETAIN(chain); - chain->sing = sing; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PROGRESS CALLING SINGLE EVHDLR", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - sing->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - goto complete; - } - } - /* if we get here, then there are no more single code - * events that match */ - chain->sing = NULL; - /* pickup the beginning of the multi-code event list */ - if (0 < opal_list_get_size(&mca_pmix_ext2x_component.multi_events)) { - chain->multi = (opal_pmix2x_multi_event_t*)opal_list_get_begin(&mca_pmix_ext2x_component.multi_events); - } - } - - /* see if we need to continue with the multi code events */ - if (NULL != chain->multi) { - while (opal_list_get_end(&mca_pmix_ext2x_component.multi_events) != (nxt = opal_list_get_next(&chain->multi->super))) { - multi = (opal_pmix2x_multi_event_t*)nxt; - for (n=0; n < multi->ncodes; n++) { - if (multi->codes[n] == chain->status) { - /* found it - invoke the handler, pointing its - * callback function to our progression function */ - OBJ_RETAIN(chain); - chain->multi = multi; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PROGRESS CALLING MULTI EVHDLR", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - multi->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - goto complete; + if (NULL != results) { + /* convert the list of results to an array of info */ + op->ninfo = opal_list_get_size(results); + if (0 < op->ninfo) { + PMIX_INFO_CREATE(op->info, op->ninfo); + n=0; + OPAL_LIST_FOREACH(kv, cd->info, opal_value_t) { + (void)strncpy(op->info[n].key, kv->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, kv); + ++n; } } } - /* if we get here, then there are no more multi-mode - * events that match */ - chain->multi = NULL; - /* pickup the beginning of the default event list */ - if (0 < opal_list_get_size(&mca_pmix_ext2x_component.default_events)) { - chain->def = (opal_pmix2x_default_event_t*)opal_list_get_begin(&mca_pmix_ext2x_component.default_events); - } + /* convert the status */ + pstatus = pmix2x_convert_opalrc(status); + /* call the library's callback function */ + cd->pmixcbfunc(pstatus, op->info, op->ninfo, event_hdlr_complete, op, cd->cbdata); } - /* if they didn't want it to go to a default handler, then we are done */ - if (chain->nondefault) { - goto complete; + /* release the threadshift object */ + if (NULL != cd->info) { + OPAL_LIST_RELEASE(cd->info); } + OBJ_RELEASE(cd); - if (NULL != chain->def) { - if (opal_list_get_end(&mca_pmix_ext2x_component.default_events) != (nxt = opal_list_get_next(&chain->def->super))) { - def = (opal_pmix2x_default_event_t*)nxt; - OBJ_RETAIN(chain); - chain->def = def; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PROGRESS CALLING DEFAULT EVHDLR", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - def->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - } - } - - complete: - /* we still have to call their final callback */ - if (NULL != chain->final_cbfunc) { - chain->final_cbfunc(OPAL_SUCCESS, chain->final_cbdata); - } - /* maintain acctng */ - OBJ_RELEASE(chain); - /* let the caller know that we are done with their callback */ + /* release the caller */ if (NULL != cbfunc) { cbfunc(OPAL_SUCCESS, thiscbdata); } @@ -268,93 +203,34 @@ static void progress_local_event_hdlr(int status, static void _event_hdlr(int sd, short args, void *cbdata) { pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; - size_t n; - opal_pmix2x_event_chain_t *chain; - opal_pmix2x_single_event_t *sing; - opal_pmix2x_multi_event_t *multi; - opal_pmix2x_default_event_t *def; + opal_pmix2x_event_t *event; + + OPAL_ACQUIRE_OBJECT(cd); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s _EVENT_HDLR RECEIVED NOTIFICATION OF STATUS %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), cd->status); + "%s _EVENT_HDLR RECEIVED NOTIFICATION FOR HANDLER %d OF STATUS %d", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), (int)cd->id, cd->status); - chain = OBJ_NEW(opal_pmix2x_event_chain_t); - /* point it at our final callback */ - chain->final_cbfunc = completion_handler; - chain->final_cbdata = chain; - - /* carry across provided info */ - chain->status = cd->status; - chain->source = cd->pname; - chain->info = cd->info; - chain->nondefault = cd->nondefault; - - /* cycle thru the single-event registrations first */ - OPAL_LIST_FOREACH(sing, &mca_pmix_ext2x_component.single_events, opal_pmix2x_single_event_t) { - if (sing->code == chain->status) { + /* cycle thru the registrations */ + OPAL_LIST_FOREACH(event, &mca_pmix_ext2x_component.events, opal_pmix2x_event_t) { + if (cd->id == event->index) { /* found it - invoke the handler, pointing its - * callback function to our progression function */ - chain->sing = sing; + * callback function to our callback function */ opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s _EVENT_HDLR CALLING SINGLE EVHDLR", + "%s _EVENT_HDLR CALLING EVHDLR", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - sing->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); + event->handler(cd->status, &cd->pname, + cd->info, &cd->results, + return_local_event_hdlr, (void*)cd); return; } } - - /* if we didn't find any match in the single-event registrations, - * then cycle thru the multi-event registrations next */ - OPAL_LIST_FOREACH(multi, &mca_pmix_ext2x_component.multi_events, opal_pmix2x_multi_event_t) { - for (n=0; n < multi->ncodes; n++) { - if (multi->codes[n] == chain->status) { - /* found it - invoke the handler, pointing its - * callback function to our progression function */ - chain->multi = multi; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s _EVENT_HDLR CALLING MULTI EVHDLR", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - multi->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - return; - } - } + /* if we didn't find a match, we still have to call their final callback */ + if (NULL != cd->pmixcbfunc) { + cd->pmixcbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cd->cbdata); } - - /* if they didn't want it to go to a default handler, then we are done */ - if (chain->nondefault) { - /* if we get here, then we need to cache this event in case they - * register for it later - we cannot lose individual events */ - opal_list_append(&mca_pmix_ext2x_component.cache, &chain->super); - return; - } - - /* we are done with the threadshift caddy */ + OPAL_LIST_RELEASE(cd->info); OBJ_RELEASE(cd); - - /* finally, pass it to any default handlers */ - if (0 < opal_list_get_size(&mca_pmix_ext2x_component.default_events)) { - def = (opal_pmix2x_default_event_t*)opal_list_get_first(&mca_pmix_ext2x_component.default_events); - chain->def = def; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s _EVENT_HDLR CALLING DEFAULT EVHDLR", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - def->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - return; - } - - /* we still have to call their final callback */ - if (NULL != chain->final_cbfunc) { - chain->final_cbfunc(PMIX_SUCCESS, chain->final_cbdata); - } - - OBJ_RELEASE(chain); - return; } @@ -385,6 +261,9 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id, OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), status); cd = OBJ_NEW(pmix2x_threadshift_t); + cd->id = evhdlr_registration_id; + cd->pmixcbfunc = cbfunc; + cd->cbdata = cbdata; /* convert the incoming status */ cd->status = pmix2x_convert_rc(status); @@ -409,9 +288,6 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id, if (NULL != info) { cd->info = OBJ_NEW(opal_list_t); for (n=0; n < ninfo; n++) { - if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - cd->nondefault = true; - } iptr = OBJ_NEW(opal_value_t); iptr->key = strdup(info[n].key); if (OPAL_SUCCESS != (rc = pmix2x_value_unload(iptr, &info[n].value))) { @@ -422,20 +298,29 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id, opal_list_append(cd->info, &iptr->super); } } - /* now push it into the local thread */ - event_assign(&cd->ev, opal_pmix_base.evbase, - -1, EV_WRITE, _event_hdlr, cd); - event_active(&cd->ev, EV_WRITE, 1); - /* we don't need any of the data they provided, - * so let them go - also tell them that we will handle - * everything from this point forward */ - if (NULL != cbfunc) { - cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); + /* convert the array of prior results */ + if (NULL != results) { + for (n=0; n < nresults; n++) { + iptr = OBJ_NEW(opal_value_t); + iptr->key = strdup(results[n].key); + if (OPAL_SUCCESS != (rc = pmix2x_value_unload(iptr, &results[n].value))) { + OPAL_ERROR_LOG(rc); + OBJ_RELEASE(iptr); + continue; + } + opal_list_append(&cd->results, &iptr->super); + } } + + /* now push it into the local thread */ + opal_event_assign(&cd->ev, opal_pmix_base.evbase, + -1, EV_WRITE, _event_hdlr, cd); + OPAL_POST_OBJECT(cd); + opal_event_active(&cd->ev, EV_WRITE, 1); } -opal_vpid_t pmix2x_convert_rank(int rank) +opal_vpid_t pmix2x_convert_rank(pmix_rank_t rank) { switch(rank) { case PMIX_RANK_UNDEF: @@ -531,12 +416,15 @@ pmix_status_t pmix2x_convert_opalrc(int rc) case OPAL_ERR_PARTIAL_SUCCESS: return PMIX_QUERY_PARTIAL_SUCCESS; + case OPAL_ERR_MODEL_DECLARED: + return PMIX_MODEL_DECLARED; + case OPAL_ERROR: return PMIX_ERROR; case OPAL_SUCCESS: return PMIX_SUCCESS; default: - return PMIX_ERROR; + return rc; } } @@ -615,12 +503,22 @@ int pmix2x_convert_rc(pmix_status_t rc) case PMIX_QUERY_PARTIAL_SUCCESS: return OPAL_ERR_PARTIAL_SUCCESS; + case PMIX_MONITOR_HEARTBEAT_ALERT: + return OPAL_ERR_HEARTBEAT_ALERT; + + case PMIX_MONITOR_FILE_ALERT: + return OPAL_ERR_FILE_ALERT; + + case PMIX_MODEL_DECLARED: + return OPAL_ERR_MODEL_DECLARED; + + case PMIX_ERROR: return OPAL_ERROR; case PMIX_SUCCESS: return OPAL_SUCCESS; default: - return OPAL_ERROR; + return rc; } } @@ -735,6 +633,10 @@ void pmix2x_value_load(pmix_value_t *v, { opal_pmix2x_jobid_trkr_t *job; bool found; + opal_list_t *list; + opal_value_t *val; + pmix_info_t *info; + size_t n; switch(kv->type) { case OPAL_UNDEF: @@ -859,15 +761,15 @@ void pmix2x_value_load(pmix_value_t *v, break; case OPAL_PERSIST: v->type = PMIX_PERSIST; - v->data.persist = pmix2x_convert_opalpersist(kv->data.uint8); + v->data.persist = pmix2x_convert_opalpersist((opal_pmix_persistence_t)kv->data.uint8); break; case OPAL_SCOPE: v->type = PMIX_SCOPE; - v->data.scope = pmix2x_convert_opalscope(kv->data.uint8); + v->data.scope = pmix2x_convert_opalscope((opal_pmix_scope_t)kv->data.uint8); break; case OPAL_DATA_RANGE: v->type = PMIX_DATA_RANGE; - v->data.range = pmix2x_convert_opalrange(kv->data.uint8); + v->data.range = pmix2x_convert_opalrange((opal_pmix_data_range_t)kv->data.uint8); break; case OPAL_PROC_STATE: v->type = PMIX_PROC_STATE; @@ -876,8 +778,22 @@ void pmix2x_value_load(pmix_value_t *v, memcpy(&v->data.state, &kv->data.uint8, sizeof(uint8_t)); break; case OPAL_PTR: - v->type = PMIX_POINTER; - v->data.ptr = kv->data.ptr; + /* if someone returned a pointer, it must be to a list of + * opal_value_t's that we need to convert to a pmix_data_array + * of pmix_info_t structures */ + list = (opal_list_t*)kv->data.ptr; + v->type = PMIX_DATA_ARRAY; + v->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); + v->data.darray->type = PMIX_INFO; + v->data.darray->size = opal_list_get_size(list); + PMIX_INFO_CREATE(info, v->data.darray->size); + v->data.darray->array = info; + n=0; + OPAL_LIST_FOREACH(val, list, opal_value_t) { + (void)strncpy(info[n].key, val->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&info[n].value, val); + ++n; + } break; default: /* silence warnings */ @@ -891,6 +807,9 @@ int pmix2x_value_unload(opal_value_t *kv, int rc=OPAL_SUCCESS; bool found; opal_pmix2x_jobid_trkr_t *job; + opal_list_t *lt; + opal_value_t *ival; + size_t n; switch(v->type) { case PMIX_UNDEF: @@ -1033,6 +952,31 @@ int pmix2x_value_unload(opal_value_t *kv, kv->type = OPAL_PTR; kv->data.ptr = v->data.ptr; break; + case PMIX_DATA_ARRAY: + if (NULL == v->data.darray || NULL == v->data.darray->array) { + kv->data.ptr = NULL; + break; + } + lt = OBJ_NEW(opal_list_t); + kv->type = OPAL_PTR; + kv->data.ptr = (void*)lt; + for (n=0; n < v->data.darray->size; n++) { + ival = OBJ_NEW(opal_value_t); + opal_list_append(lt, &ival->super); + /* handle the various types */ + if (PMIX_INFO == v->data.darray->type) { + pmix_info_t *iptr = (pmix_info_t*)v->data.darray->array; + ival->key = strdup(iptr[n].key); + rc = pmix2x_value_unload(ival, &iptr[n].value); + if (OPAL_SUCCESS != rc) { + OPAL_LIST_RELEASE(lt); + kv->type = OPAL_UNDEF; + kv->data.ptr = NULL; + break; + } + } + } + break; default: /* silence warnings */ rc = OPAL_ERROR; @@ -1041,133 +985,77 @@ int pmix2x_value_unload(opal_value_t *kv, return rc; } +static void errreg_cbfunc (pmix_status_t status, + size_t errhandler_ref, + void *cbdata) +{ + pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + + OPAL_ACQUIRE_OBJECT(op); + op->event->index = errhandler_ref; + opal_output_verbose(5, opal_pmix_base_framework.framework_output, + "PMIX2x errreg_cbfunc - error handler registered status=%d, reference=%lu", + status, (unsigned long)errhandler_ref); + if (NULL != op->evregcbfunc) { + op->evregcbfunc(pmix2x_convert_rc(status), errhandler_ref, op->cbdata); + } + OBJ_RELEASE(op); +} + static void _reg_hdlr(int sd, short args, void *cbdata) { pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; - opal_pmix2x_event_chain_t *chain; - opal_pmix2x_single_event_t *sing = NULL; - opal_pmix2x_multi_event_t *multi = NULL; - opal_pmix2x_default_event_t *def = NULL; + pmix2x_opcaddy_t *op; opal_value_t *kv; - int i; - bool prepend = false; size_t n; + OPAL_ACQUIRE_OBJECT(cd); opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s REGISTER HANDLER CODES %s", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), (NULL == cd->event_codes) ? "NULL" : "NON-NULL"); - if (NULL != cd->info) { - OPAL_LIST_FOREACH(kv, cd->info, opal_value_t) { - if (0 == strcmp(kv->key, OPAL_PMIX_EVENT_ORDER_PREPEND)) { - prepend = true; - break; - } - } - } + op = OBJ_NEW(pmix2x_opcaddy_t); + op->evregcbfunc = cd->cbfunc; + op->cbdata = cd->cbdata; - if (NULL == cd->event_codes) { - /* this is a default handler */ - def = OBJ_NEW(opal_pmix2x_default_event_t); - def->handler = cd->evhandler; - def->index = mca_pmix_ext2x_component.evindex; - if (prepend) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PREPENDING TO DEFAULT EVENTS", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - opal_list_prepend(&mca_pmix_ext2x_component.default_events, &def->super); - } else { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s APPENDING TO DEFAULT EVENTS", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - opal_list_append(&mca_pmix_ext2x_component.default_events, &def->super); - } - } else if (1 == opal_list_get_size(cd->event_codes)) { - /* single handler */ - sing = OBJ_NEW(opal_pmix2x_single_event_t); - kv = (opal_value_t*)opal_list_get_first(cd->event_codes); - sing->code = kv->data.integer; - sing->index = mca_pmix_ext2x_component.evindex; - sing->handler = cd->evhandler; - if (prepend) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PREPENDING TO SINGLE EVENTS WITH CODE %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), sing->code); - opal_list_prepend(&mca_pmix_ext2x_component.single_events, &sing->super); - } else { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s APPENDING TO SINGLE EVENTS WITH CODE %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), sing->code); - opal_list_append(&mca_pmix_ext2x_component.single_events, &sing->super); - } - } else { - multi = OBJ_NEW(opal_pmix2x_multi_event_t); - multi->ncodes = opal_list_get_size(cd->event_codes); - multi->codes = (int*)malloc(multi->ncodes * sizeof(int)); - i=0; + /* convert the event codes */ + if (NULL != cd->event_codes) { + op->ncodes = opal_list_get_size(cd->event_codes); + op->pcodes = (pmix_status_t*)malloc(op->ncodes * sizeof(pmix_status_t)); + n=0; OPAL_LIST_FOREACH(kv, cd->event_codes, opal_value_t) { - multi->codes[i] = kv->data.integer; - ++i; - } - multi->index = mca_pmix_ext2x_component.evindex; - multi->handler = cd->evhandler; - if (prepend) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s PREPENDING TO MULTI EVENTS", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - opal_list_prepend(&mca_pmix_ext2x_component.multi_events, &multi->super); - } else { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s APPENDING TO MULTI EVENTS", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - opal_list_append(&mca_pmix_ext2x_component.multi_events, &multi->super); + op->pcodes[n] = pmix2x_convert_opalrc(kv->data.integer); + ++n; } } - /* release the caller */ - if (NULL != cd->cbfunc) { - cd->cbfunc(OPAL_SUCCESS, mca_pmix_ext2x_component.evindex, cd->cbdata); - } - mca_pmix_ext2x_component.evindex++; - - /* check if any matching notifications have been cached - only nondefault - * events will have been cached*/ - if (NULL == def) { - /* check single code registrations */ - if (NULL != sing) { - OPAL_LIST_FOREACH(chain, &mca_pmix_ext2x_component.cache, opal_pmix2x_event_chain_t) { - if (sing->code == chain->status) { - opal_list_remove_item(&mca_pmix_ext2x_component.cache, &chain->super); - chain->sing = sing; - sing->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - OBJ_RELEASE(cd); - return; - } - } - } else if (NULL != multi) { - /* check for multi code registrations */ - OPAL_LIST_FOREACH(chain, &mca_pmix_ext2x_component.cache, opal_pmix2x_event_chain_t) { - for (n=0; n < multi->ncodes; n++) { - if (multi->codes[n] == chain->status) { - opal_list_remove_item(&mca_pmix_ext2x_component.cache, &chain->super); - chain->multi = multi; - multi->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - OBJ_RELEASE(cd); - return; - } - } + /* convert the list of info to an array of pmix_info_t */ + if (NULL != cd->info) { + op->ninfo = opal_list_get_size(cd->info); + if (0 < op->ninfo) { + PMIX_INFO_CREATE(op->info, op->ninfo); + n=0; + OPAL_LIST_FOREACH(kv, cd->info, opal_value_t) { + (void)strncpy(op->info[n].key, kv->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, kv); + ++n; } } } + /* register the event */ + op->event = OBJ_NEW(opal_pmix2x_event_t); + op->event->handler = cd->evhandler; + opal_list_append(&mca_pmix_ext2x_component.events, &op->event->super); + PMIx_Register_event_handler(op->pcodes, op->ncodes, + op->info, op->ninfo, + pmix2x_event_hdlr, errreg_cbfunc, op); + OBJ_RELEASE(cd); return; } + static void register_handler(opal_list_t *event_codes, opal_list_t *info, opal_pmix_notification_fn_t evhandler, @@ -1184,36 +1072,21 @@ static void register_handler(opal_list_t *event_codes, static void _dereg_hdlr(int sd, short args, void *cbdata) { pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; - opal_pmix2x_single_event_t *sing; - opal_pmix2x_multi_event_t *multi; - opal_pmix2x_default_event_t *def; + opal_pmix2x_event_t *event; - /* check the single events first */ - OPAL_LIST_FOREACH(sing, &mca_pmix_ext2x_component.single_events, opal_pmix2x_single_event_t) { - if (cd->handler == sing->index) { - opal_list_remove_item(&mca_pmix_ext2x_component.single_events, &sing->super); - OBJ_RELEASE(sing); - goto release; - } - } - /* check multi events */ - OPAL_LIST_FOREACH(multi, &mca_pmix_ext2x_component.multi_events, opal_pmix2x_multi_event_t) { - if (cd->handler == multi->index) { - opal_list_remove_item(&mca_pmix_ext2x_component.multi_events, &multi->super); - OBJ_RELEASE(multi); - goto release; - } - } - /* check default events */ - OPAL_LIST_FOREACH(def, &mca_pmix_ext2x_component.default_events, opal_pmix2x_default_event_t) { - if (cd->handler == def->index) { - opal_list_remove_item(&mca_pmix_ext2x_component.default_events, &def->super); - OBJ_RELEASE(def); + OPAL_ACQUIRE_OBJECT(cd); + /* look for this event */ + OPAL_LIST_FOREACH(event, &mca_pmix_ext2x_component.events, opal_pmix2x_event_t) { + if (cd->handler == event->index) { + opal_list_remove_item(&mca_pmix_ext2x_component.events, &event->super); + OBJ_RELEASE(event); break; } } + /* tell the library to deregister this handler */ + PMIx_Deregister_event_handler(cd->handler, NULL, NULL); - release: + /* release the caller */ if (NULL != cd->opcbfunc) { cd->opcbfunc(OPAL_SUCCESS, cd->cbdata); } @@ -1230,90 +1103,83 @@ static void deregister_handler(size_t evhandler, return; } -static void _notify_event(int sd, short args, void *cbdata) +static void notify_complete(pmix_status_t status, void *cbdata) { - pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; - size_t i; - opal_pmix2x_single_event_t *sing; - opal_pmix2x_multi_event_t *multi; - opal_pmix2x_default_event_t *def; - opal_pmix2x_event_chain_t *chain; - - /* check the single events first */ - OPAL_LIST_FOREACH(sing, &mca_pmix_ext2x_component.single_events, opal_pmix2x_single_event_t) { - if (cd->status == sing->code) { - /* found it - invoke the handler, pointing its - * callback function to our progression function */ - chain = OBJ_NEW(opal_pmix2x_event_chain_t); - chain->status = cd->status; - chain->range = pmix2x_convert_opalrange(cd->range); - chain->source = *(cd->source); - chain->info = cd->info; - chain->final_cbfunc = cd->opcbfunc; - chain->final_cbdata = cd->cbdata; - chain->sing = sing; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "[%s] CALLING SINGLE EVHDLR FOR STATUS %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), chain->status); - sing->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - OBJ_RELEASE(cd); - return; - } + pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + if (NULL != op->opcbfunc) { + op->opcbfunc(pmix2x_convert_rc(status), op->cbdata); } - /* check multi events */ - OPAL_LIST_FOREACH(multi, &mca_pmix_ext2x_component.multi_events, opal_pmix2x_multi_event_t) { - for (i=0; i < multi->ncodes; i++) { - if (cd->status == multi->codes[i]) { - /* found it - invoke the handler, pointing its - * callback function to our progression function */ - chain = OBJ_NEW(opal_pmix2x_event_chain_t); - chain->status = cd->status; - chain->range = pmix2x_convert_opalrange(cd->range); - chain->source = *(cd->source); - chain->info = cd->info; - chain->final_cbfunc = cd->opcbfunc; - chain->final_cbdata = cd->cbdata; - chain->multi = multi; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "[%s] CALLING MULTI EVHDLR FOR STATUS %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), chain->status); - multi->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - OBJ_RELEASE(cd); - return; + OBJ_RELEASE(op); +} + +static void _notify(int sd, short args, void *cbdata) +{ + pmix2x_threadshift_t *cd = (pmix2x_threadshift_t *)cbdata; + pmix2x_opcaddy_t *op; + opal_value_t *kv; + pmix_proc_t p, *pptr; + pmix_status_t pstatus; + size_t n; + int rc=OPAL_SUCCESS; + pmix_data_range_t prange; + opal_pmix2x_jobid_trkr_t *job, *jptr; + + OPAL_ACQUIRE_OBJECT(cd); + + op = OBJ_NEW(pmix2x_opcaddy_t); + + /* convert the status */ + pstatus = pmix2x_convert_opalrc(cd->status); + + /* convert the source */ + if (NULL == cd->source) { + pptr = NULL; + } else { + /* look thru our list of jobids and find the + * corresponding nspace */ + job = NULL; + OPAL_LIST_FOREACH(jptr, &mca_pmix_ext2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (jptr->jobid == cd->source->jobid) { + job = jptr; + break; + } + } + if (NULL == job) { + rc = OPAL_ERR_NOT_FOUND; + goto release; + } + (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN); + p.rank = pmix2x_convert_opalrank(cd->source->vpid); + pptr = &p; + } + + /* convert the range */ + prange = pmix2x_convert_opalrange(cd->range); + + /* convert the list of info */ + if (NULL != cd->info) { + op->ninfo = opal_list_get_size(cd->info); + if (0 < op->ninfo) { + PMIX_INFO_CREATE(op->info, op->ninfo); + n=0; + OPAL_LIST_FOREACH(kv, cd->info, opal_value_t) { + (void)strncpy(op->info[n].key, kv->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&op->info[n].value, kv); + ++n; } } } - /* check default events */ - if (0 < opal_list_get_size(&mca_pmix_ext2x_component.default_events)) { - def = (opal_pmix2x_default_event_t*)opal_list_get_first(&mca_pmix_ext2x_component.default_events); - chain = OBJ_NEW(opal_pmix2x_event_chain_t); - chain->status = cd->status; - chain->range = pmix2x_convert_opalrange(cd->range); - chain->source = *(cd->source); - chain->info = cd->info; - chain->final_cbfunc = cd->opcbfunc; - chain->final_cbdata = cd->cbdata; - chain->def = def; - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "[%s] CALLING DEFAULT EVHDLR FOR STATUS %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), chain->status); - def->handler(chain->status, &chain->source, - chain->info, &chain->results, - progress_local_event_hdlr, (void*)chain); - OBJ_RELEASE(cd); - return; - } - /* if we get here, then there are no registered event handlers */ + /* ask the library to notify our clients */ + pstatus = PMIx_Notify_event(pstatus, pptr, prange, op->info, op->ninfo, notify_complete, op); + rc = pmix2x_convert_rc(pstatus); + + release: + /* release the caller */ if (NULL != cd->opcbfunc) { - cd->opcbfunc(OPAL_ERR_NOT_FOUND, cd->cbdata); + cd->opcbfunc(rc, cd->cbdata); } OBJ_RELEASE(cd); - return; } static int notify_event(int status, @@ -1324,16 +1190,115 @@ static int notify_event(int status, { /* we must threadshift this request as we might not be in an event * and we are going to access framework-global lists/objects */ - OPAL_PMIX_NOTIFY_THREADSHIFT(status, source, range, info, _notify_event, cbfunc, cbdata); + OPAL_PMIX_NOTIFY_THREADSHIFT(status, source, range, info, _notify, cbfunc, cbdata); return OPAL_SUCCESS; } +static void relcbfunc(void *cbdata) +{ + opal_list_t *results = (opal_list_t*)cbdata; + if (NULL != results) { + OPAL_LIST_RELEASE(results); + } +} + +static void infocbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix2x_opcaddy_t *cd = (pmix2x_opcaddy_t*)cbdata; + int rc = OPAL_SUCCESS; + opal_list_t *results = NULL; + opal_value_t *iptr; + size_t n; + + OPAL_ACQUIRE_OBJECT(cd); + + /* convert the array of pmix_info_t to the list of info */ + if (NULL != info) { + results = OBJ_NEW(opal_list_t); + for (n=0; n < ninfo; n++) { + iptr = OBJ_NEW(opal_value_t); + opal_list_append(results, &iptr->super); + iptr->key = strdup(info[n].key); + if (OPAL_SUCCESS != (rc = pmix2x_value_unload(iptr, &info[n].value))) { + OPAL_LIST_RELEASE(results); + results = NULL; + break; + } + } + } + + if (NULL != release_fn) { + release_fn(release_cbdata); + } + + /* return the values to the original requestor */ + if (NULL != cd->qcbfunc) { + cd->qcbfunc(rc, results, cd->cbdata, relcbfunc, results); + } + OBJ_RELEASE(cd); +} + static void pmix2x_query(opal_list_t *queries, opal_pmix_info_cbfunc_t cbfunc, void *cbdata) { - if (NULL != cbfunc) { - cbfunc(OPAL_ERR_NOT_SUPPORTED, NULL, cbdata, NULL, NULL); + int rc; + opal_value_t *ival; + size_t n, nqueries, nq; + pmix2x_opcaddy_t *cd; + pmix_status_t prc; + opal_pmix_query_t *q; + + /* create the caddy */ + cd = OBJ_NEW(pmix2x_opcaddy_t); + + /* bozo check */ + if (NULL == queries || 0 == (nqueries = opal_list_get_size(queries))) { + rc = OPAL_ERR_BAD_PARAM; + goto CLEANUP; } + + /* setup the operation */ + cd->qcbfunc = cbfunc; + cd->cbdata = cbdata; + cd->nqueries = nqueries; + + /* convert the list to an array of query objects */ + PMIX_QUERY_CREATE(cd->queries, cd->nqueries); + n=0; + OPAL_LIST_FOREACH(q, queries, opal_pmix_query_t) { + cd->queries[n].keys = opal_argv_copy(q->keys); + cd->queries[n].nqual = opal_list_get_size(&q->qualifiers); + if (0 < cd->queries[n].nqual) { + PMIX_INFO_CREATE(cd->queries[n].qualifiers, cd->queries[n].nqual); + nq = 0; + OPAL_LIST_FOREACH(ival, &q->qualifiers, opal_value_t) { + (void)strncpy(cd->queries[n].qualifiers[nq].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&cd->queries[n].qualifiers[nq].value, ival); + ++nq; + } + } + ++n; + } + + /* pass it down */ + if (PMIX_SUCCESS != (prc = PMIx_Query_info_nb(cd->queries, cd->nqueries, + infocbfunc, cd))) { + /* do not hang! */ + rc = pmix2x_convert_rc(prc); + goto CLEANUP; + } + + return; + + CLEANUP: + if (NULL != cbfunc) { + cbfunc(rc, NULL, cbdata, NULL, NULL); + } + OBJ_RELEASE(cd); return; } @@ -1341,6 +1306,8 @@ static void opcbfunc(pmix_status_t status, void *cbdata) { pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + OPAL_ACQUIRE_OBJECT(op); + if (NULL != op->opcbfunc) { op->opcbfunc(pmix2x_convert_rc(status), op->cbdata); } @@ -1396,55 +1363,35 @@ static void pmix2x_log(opal_list_t *info, OBJ_RELEASE(cd); } +opal_pmix_alloc_directive_t pmix2x_convert_allocdir(pmix_alloc_directive_t dir) +{ + switch (dir) { + case PMIX_ALLOC_NEW: + return OPAL_PMIX_ALLOC_NEW; + case PMIX_ALLOC_EXTEND: + return OPAL_PMIX_ALLOC_EXTEND; + case PMIX_ALLOC_RELEASE: + return OPAL_PMIX_ALLOC_RELEASE; + case PMIX_ALLOC_REAQUIRE: + return OPAL_PMIX_ALLOC_REAQCUIRE; + default: + return OPAL_PMIX_ALLOC_UNDEF; + } +} + /**** INSTANTIATE INTERNAL CLASSES ****/ OBJ_CLASS_INSTANCE(opal_pmix2x_jobid_trkr_t, opal_list_item_t, NULL, NULL); -OBJ_CLASS_INSTANCE(opal_pmix2x_single_event_t, - opal_list_item_t, - NULL, NULL); - -static void mtevcon(opal_pmix2x_multi_event_t *p) +static void evcon(opal_pmix2x_event_t *p) { - p->codes = NULL; - p->ncodes = 0; + p->handler = NULL; + p->cbdata = NULL; } -static void mtevdes(opal_pmix2x_multi_event_t *p) -{ - if (NULL != p->codes) { - free(p->codes); - } -} -OBJ_CLASS_INSTANCE(opal_pmix2x_multi_event_t, +OBJ_CLASS_INSTANCE(opal_pmix2x_event_t, opal_list_item_t, - mtevcon, mtevdes); - -OBJ_CLASS_INSTANCE(opal_pmix2x_default_event_t, - opal_list_item_t, - NULL, NULL); - -static void chcon(opal_pmix2x_event_chain_t *p) -{ - p->nondefault = false; - p->info = NULL; - OBJ_CONSTRUCT(&p->results, opal_list_t); - p->sing = NULL; - p->multi = NULL; - p->def = NULL; - p->final_cbfunc = NULL; - p->final_cbdata = NULL; -} -static void chdes(opal_pmix2x_event_chain_t *p) -{ - OPAL_LIST_DESTRUCT(&p->results); - if (NULL != p->info) { - OPAL_LIST_RELEASE(p->info); - } -} -OBJ_CLASS_INSTANCE(opal_pmix2x_event_chain_t, - opal_list_item_t, - chcon, chdes); + evcon, NULL); static void opcon(pmix2x_opcaddy_t *p) { @@ -1458,11 +1405,17 @@ static void opcon(pmix2x_opcaddy_t *p) p->apps = NULL; p->sz = 0; p->active = false; + p->codes = NULL; + p->pcodes = NULL; + p->queries = NULL; + p->nqueries = 0; + p->event = NULL; p->opcbfunc = NULL; p->mdxcbfunc = NULL; p->valcbfunc = NULL; p->lkcbfunc = NULL; p->spcbfunc = NULL; + p->evregcbfunc = NULL; p->cbdata = NULL; } static void opdes(pmix2x_opcaddy_t *p) @@ -1473,12 +1426,18 @@ static void opdes(pmix2x_opcaddy_t *p) if (NULL != p->error_procs) { PMIX_PROC_FREE(p->error_procs, p->nerror_procs); } - if (NULL != p->info) { - PMIX_INFO_FREE(p->info, p->sz); + if (0 < p->ninfo) { + PMIX_INFO_FREE(p->info, p->ninfo); } if (NULL != p->apps) { PMIX_APP_FREE(p->apps, p->sz); } + if (NULL != p->pcodes) { + free(p->pcodes); + } + if (NULL != p->queries) { + PMIX_QUERY_FREE(p->queries, p->nqueries); + } } OBJ_CLASS_INSTANCE(pmix2x_opcaddy_t, opal_object_t, @@ -1516,12 +1475,33 @@ static void tscon(pmix2x_threadshift_t *p) p->source = NULL; p->event_codes = NULL; p->info = NULL; + OBJ_CONSTRUCT(&p->results, opal_list_t); p->evhandler = NULL; p->nondefault = false; p->cbfunc = NULL; p->opcbfunc = NULL; p->cbdata = NULL; } +static void tsdes(pmix2x_threadshift_t *p) +{ + OPAL_LIST_DESTRUCT(&p->results); +} OBJ_CLASS_INSTANCE(pmix2x_threadshift_t, opal_object_t, - tscon, NULL); + tscon, tsdes); + +static void dmcon(opal_pmix2x_dmx_trkr_t *p) +{ + p->nspace = NULL; + p->cbfunc = NULL; + p->cbdata = NULL; +} +static void dmdes(opal_pmix2x_dmx_trkr_t *p) +{ + if (NULL != p->nspace) { + free(p->nspace); + } +} +OBJ_CLASS_INSTANCE(opal_pmix2x_dmx_trkr_t, + opal_list_item_t, + dmcon, dmdes); diff --git a/opal/mca/pmix/ext2x/pmix2x.h b/opal/mca/pmix/ext2x/pmix2x.h index 29aca672f3..c4b47a163f 100644 --- a/opal/mca/pmix/ext2x/pmix2x.h +++ b/opal/mca/pmix/ext2x/pmix2x.h @@ -1,9 +1,12 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -39,11 +42,10 @@ typedef struct { opal_list_t jobids; bool native_launch; size_t evindex; - opal_list_t single_events; - opal_list_t multi_events; - opal_list_t default_events; + opal_list_t events; int cache_size; opal_list_t cache; + opal_list_t dmdx; } mca_pmix_ext2x_component_t; OPAL_DECLSPEC extern mca_pmix_ext2x_component_t mca_pmix_ext2x_component; @@ -61,42 +63,18 @@ OBJ_CLASS_DECLARATION(opal_pmix2x_jobid_trkr_t); typedef struct { opal_list_item_t super; size_t index; - int code; opal_pmix_notification_fn_t handler; -} opal_pmix2x_single_event_t; -OBJ_CLASS_DECLARATION(opal_pmix2x_single_event_t); + void *cbdata; +} opal_pmix2x_event_t; +OBJ_CLASS_DECLARATION(opal_pmix2x_event_t); typedef struct { opal_list_item_t super; - size_t index; - int *codes; - size_t ncodes; - opal_pmix_notification_fn_t handler; -} opal_pmix2x_multi_event_t; -OBJ_CLASS_DECLARATION(opal_pmix2x_multi_event_t); - -typedef struct { - opal_list_item_t super; - size_t index; - opal_pmix_notification_fn_t handler; -} opal_pmix2x_default_event_t; -OBJ_CLASS_DECLARATION(opal_pmix2x_default_event_t); - -typedef struct { - opal_list_item_t super; - int status; - bool nondefault; - opal_process_name_t source; - pmix_data_range_t range; - opal_list_t *info; - opal_list_t results; - opal_pmix2x_single_event_t *sing; - opal_pmix2x_multi_event_t *multi; - opal_pmix2x_default_event_t *def; - opal_pmix_op_cbfunc_t final_cbfunc; - void *final_cbdata; -} opal_pmix2x_event_chain_t; -OBJ_CLASS_DECLARATION(opal_pmix2x_event_chain_t); + char *nspace; + pmix_modex_cbfunc_t cbfunc; + void *cbdata; +} opal_pmix2x_dmx_trkr_t; +OBJ_CLASS_DECLARATION(opal_pmix2x_dmx_trkr_t); typedef struct { opal_object_t super; @@ -111,11 +89,19 @@ typedef struct { pmix_app_t *apps; size_t sz; volatile bool active; + opal_list_t *codes; + pmix_status_t *pcodes; + size_t ncodes; + pmix_query_t *queries; + size_t nqueries; + opal_pmix2x_event_t *event; opal_pmix_op_cbfunc_t opcbfunc; opal_pmix_modex_cbfunc_t mdxcbfunc; opal_pmix_value_cbfunc_t valcbfunc; opal_pmix_lookup_cbfunc_t lkcbfunc; opal_pmix_spawn_cbfunc_t spcbfunc; + opal_pmix_evhandler_reg_cbfunc_t evregcbfunc; + opal_pmix_info_cbfunc_t qcbfunc; void *cbdata; } pmix2x_opcaddy_t; OBJ_CLASS_DECLARATION(pmix2x_opcaddy_t); @@ -152,28 +138,15 @@ typedef struct { size_t handler; opal_list_t *event_codes; opal_list_t *info; + opal_list_t results; opal_pmix_notification_fn_t evhandler; opal_pmix_evhandler_reg_cbfunc_t cbfunc; opal_pmix_op_cbfunc_t opcbfunc; + pmix_event_notification_cbfunc_fn_t pmixcbfunc; void *cbdata; } pmix2x_threadshift_t; OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); -#define OPAL_PMIX_OPCD_THREADSHIFT(i, s, sr, if, nif, fn, cb, cd) \ - do { \ - pmix2x_opalcaddy_t *_cd; \ - _cd = OBJ_NEW(pmix2x_opalcaddy_t); \ - _cd->id = (i); \ - _cd->status = (s); \ - _cd->source = (sr); \ - _cd->info = (i); \ - _cd->evcbfunc = (cb); \ - _cd->cbdata = (cd); \ - event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ - -1, EV_WRITE, (fn), (_cd)); \ - event_active(&((_cd)->ev), EV_WRITE, 1); \ - } while(0) - #define OPAL_PMIX_OP_THREADSHIFT(e, fn, cb, cd) \ do { \ pmix2x_threadshift_t *_cd; \ @@ -181,9 +154,10 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); _cd->handler = (e); \ _cd->opcbfunc = (cb); \ _cd->cbdata = (cd); \ - event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ - -1, EV_WRITE, (fn), (_cd)); \ - event_active(&((_cd)->ev), EV_WRITE, 1); \ + opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ + -1, EV_WRITE, (fn), (_cd)); \ + OPAL_POST_OBJECT(_cd); \ + opal_event_active(&((_cd)->ev), EV_WRITE, 1); \ } while(0) #define OPAL_PMIX_THREADSHIFT(e, i, eh, fn, cb, cd) \ @@ -195,9 +169,10 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); _cd->evhandler = (eh); \ _cd->cbfunc = (cb); \ _cd->cbdata = (cd); \ - event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ - -1, EV_WRITE, (fn), (_cd)); \ - event_active(&((_cd)->ev), EV_WRITE, 1); \ + opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ + -1, EV_WRITE, (fn), (_cd)); \ + OPAL_POST_OBJECT(_cd); \ + opal_event_active(&((_cd)->ev), EV_WRITE, 1); \ } while(0) #define OPAL_PMIX_NOTIFY_THREADSHIFT(s, sr, r, i, fn, cb, cd) \ @@ -210,9 +185,10 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); _cd->info = (i); \ _cd->opcbfunc = (cb); \ _cd->cbdata = (cd); \ - event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ + opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \ -1, EV_WRITE, (fn), (_cd)); \ - event_active(&((_cd)->ev), EV_WRITE, 1); \ + OPAL_POST_OBJECT(_cd); \ + opal_event_active(&((_cd)->ev), EV_WRITE, 1); \ } while(0) /**** CLIENT FUNCTIONS ****/ @@ -301,7 +277,7 @@ OPAL_MODULE_DECLSPEC void pmix2x_event_hdlr(size_t evhdlr_registration_id, OPAL_MODULE_DECLSPEC pmix_status_t pmix2x_convert_opalrc(int rc); OPAL_MODULE_DECLSPEC int pmix2x_convert_rc(pmix_status_t rc); -OPAL_MODULE_DECLSPEC opal_vpid_t pmix2x_convert_rank(int rank); +OPAL_MODULE_DECLSPEC opal_vpid_t pmix2x_convert_rank(pmix_rank_t rank); OPAL_MODULE_DECLSPEC pmix_rank_t pmix2x_convert_opalrank(opal_vpid_t vpid); OPAL_MODULE_DECLSPEC opal_pmix_scope_t pmix2x_convert_scope(pmix_scope_t scope); @@ -318,6 +294,8 @@ OPAL_MODULE_DECLSPEC void pmix2x_value_load(pmix_value_t *v, OPAL_MODULE_DECLSPEC int pmix2x_value_unload(opal_value_t *kv, const pmix_value_t *v); +OPAL_MODULE_DECLSPEC opal_pmix_alloc_directive_t pmix2x_convert_allocdir(pmix_alloc_directive_t dir); + END_C_DECLS #endif /* MCA_PMIX_EXTERNAL_H */ diff --git a/opal/mca/pmix/ext2x/pmix2x_client.c b/opal/mca/pmix/ext2x/pmix2x_client.c index 28485f170b..12da6c2a37 100644 --- a/opal/mca/pmix/ext2x/pmix2x_client.c +++ b/opal/mca/pmix/ext2x/pmix2x_client.c @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2014-2017 Research Organization for Information Science + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. * All rights reserved. @@ -27,6 +27,7 @@ #endif #include "opal/hash_string.h" +#include "opal/threads/threads.h" #include "opal/util/argv.h" #include "opal/util/proc.h" @@ -36,13 +37,15 @@ static pmix_proc_t my_proc; static char *dbgvalue=NULL; -static size_t errhdler_ref = 0; +static volatile bool regactive; +static bool initialized = false; #define PMIX_WAIT_FOR_COMPLETION(a) \ do { \ while ((a)) { \ usleep(10); \ } \ + OPAL_ACQUIRE_OBJECT(a); \ } while (0) @@ -50,10 +53,16 @@ static void errreg_cbfunc (pmix_status_t status, size_t errhandler_ref, void *cbdata) { - errhdler_ref = errhandler_ref; + opal_pmix2x_event_t *event = (opal_pmix2x_event_t*)cbdata; + + OPAL_ACQUIRE_OBJECT(event); + + event->index = errhandler_ref; opal_output_verbose(5, opal_pmix_base_framework.framework_output, "PMIX client errreg_cbfunc - error handler registered status=%d, reference=%lu", status, (unsigned long)errhandler_ref); + regactive = false; + OPAL_POST_OBJECT(regactive); } int pmix2x_client_init(opal_list_t *ilist) @@ -62,19 +71,52 @@ int pmix2x_client_init(opal_list_t *ilist) pmix_status_t rc; int dbg; opal_pmix2x_jobid_trkr_t *job; + opal_pmix2x_event_t *event; + pmix_info_t *pinfo; + size_t ninfo, n; + opal_value_t *ival; opal_output_verbose(1, opal_pmix_base_framework.framework_output, "PMIx_client init"); - if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) { - asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg); - putenv(dbgvalue); + if (!initialized) { + if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) { + asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg); + putenv(dbgvalue); + } } - rc = PMIx_Init(&my_proc, NULL, 0); + /* convert the incoming list to info structs */ + if (NULL != ilist) { + ninfo = opal_list_get_size(ilist); + if (0 < ninfo) { + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(ival, ilist, opal_value_t) { + (void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, ival); + ++n; + } + } else { + pinfo = NULL; + } + } else { + pinfo = NULL; + ninfo = 0; + } + + rc = PMIx_Init(&my_proc, pinfo, ninfo); if (PMIX_SUCCESS != rc) { return pmix2x_convert_rc(rc); } + if (0 < ninfo) { + PMIX_INFO_FREE(pinfo, ninfo); + + } + if (initialized) { + return OPAL_SUCCESS; + } + initialized = true; /* store our jobid and rank */ if (NULL != getenv(OPAL_MCA_PREFIX"orte_launch")) { @@ -98,7 +140,15 @@ int pmix2x_client_init(opal_list_t *ilist) opal_proc_set_name(&pname); /* register the default event handler */ - PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, NULL); + event = OBJ_NEW(opal_pmix2x_event_t); + opal_list_append(&mca_pmix_ext2x_component.events, &event->super); + PMIX_INFO_CREATE(pinfo, 1); + PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-DEFAULT", PMIX_STRING); + regactive = true; + PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, event); + PMIX_WAIT_FOR_COMPLETION(regactive); + PMIX_INFO_FREE(pinfo, 1); + return OPAL_SUCCESS; } @@ -106,12 +156,16 @@ int pmix2x_client_init(opal_list_t *ilist) int pmix2x_client_finalize(void) { pmix_status_t rc; + opal_pmix2x_event_t *event; opal_output_verbose(1, opal_pmix_base_framework.framework_output, "PMIx_client finalize"); - /* deregister the default event handler */ - PMIx_Deregister_event_handler(errhdler_ref, NULL, NULL); + /* deregister all event handlers */ + OPAL_LIST_FOREACH(event, &mca_pmix_ext2x_component.events, opal_pmix2x_event_t) { + PMIx_Deregister_event_handler(event->index, NULL, NULL); + } + /* the list will be destructed when the component is finalized */ rc = PMIx_Finalize(NULL, 0); return pmix2x_convert_rc(rc); @@ -122,7 +176,7 @@ int pmix2x_initialized(void) opal_output_verbose(1, opal_pmix_base_framework.framework_output, "PMIx_client initialized"); - return PMIx_Initialized(); + return initialized; } int pmix2x_abort(int flag, const char *msg, @@ -192,7 +246,6 @@ int pmix2x_store_local(const opal_process_name_t *proc, opal_value_t *val) } } if (NULL == job) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); return OPAL_ERR_NOT_FOUND; } (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN); @@ -224,6 +277,7 @@ static void opcbfunc(pmix_status_t status, void *cbdata) { pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + OPAL_ACQUIRE_OBJECT(op); if (NULL != op->opcbfunc) { op->opcbfunc(pmix2x_convert_rc(status), op->cbdata); } @@ -473,6 +527,8 @@ static void val_cbfunc(pmix_status_t status, int rc; opal_value_t val, *v=NULL; + OPAL_ACQUIRE_OBJECT(op); + rc = pmix2x_convert_opalrc(status); if (PMIX_SUCCESS == status && NULL != kv) { rc = pmix2x_value_unload(&val, kv); @@ -720,6 +776,8 @@ static void lk_cbfunc(pmix_status_t status, size_t n; opal_pmix2x_jobid_trkr_t *job, *jptr; + OPAL_ACQUIRE_OBJECT(op); + /* this is in the PMIx local thread - need to threadshift to * our own thread as we will be accessing framework-global * lists and objects */ @@ -769,7 +827,7 @@ static void lk_cbfunc(pmix_status_t status, } r = &results; } -release: + release: /* execute the callback */ op->lkcbfunc(rc, r, op->cbdata); @@ -946,6 +1004,8 @@ static void spcbfunc(pmix_status_t status, opal_jobid_t jobid=OPAL_JOBID_INVALID; opal_pmix2x_jobid_trkr_t *job; + OPAL_ACQUIRE_OBJECT(op); + /* this is in the PMIx local thread - need to threadshift to * our own thread as we will be accessing framework-global * lists and objects */ diff --git a/opal/mca/pmix/ext2x/pmix2x_component.c b/opal/mca/pmix/ext2x/pmix2x_component.c index fb1af6a74a..5ea1c3febc 100644 --- a/opal/mca/pmix/ext2x/pmix2x_component.c +++ b/opal/mca/pmix/ext2x/pmix2x_component.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. @@ -28,7 +28,7 @@ * Public string showing the pmix external component version number */ const char *opal_pmix_ext2x_component_version_string = - "OPAL ext2x MCA component version " OPAL_VERSION; + "OPAL pmix2x MCA component version " OPAL_VERSION; /* * Local function @@ -36,7 +36,6 @@ const char *opal_pmix_ext2x_component_version_string = static int external_open(void); static int external_close(void); static int external_component_query(mca_base_module_t **module, int *priority); -static int external_register(void); /* @@ -66,7 +65,6 @@ mca_pmix_ext2x_component_t mca_pmix_ext2x_component = { .mca_open_component = external_open, .mca_close_component = external_close, .mca_query_component = external_component_query, - .mca_register_component_params = external_register, }, /* Next the MCA v1.0.0 component meta data */ .base_data = { @@ -77,27 +75,12 @@ mca_pmix_ext2x_component_t mca_pmix_ext2x_component = { .native_launch = false }; -static int external_register(void) -{ - mca_pmix_ext2x_component.cache_size = 256; - mca_base_component_var_register(&mca_pmix_ext2x_component.super.base_version, - "cache_size", "Size of the ring buffer cache for events", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_5, - MCA_BASE_VAR_SCOPE_CONSTANT, - &mca_pmix_ext2x_component.cache_size); - - return OPAL_SUCCESS; -} - - static int external_open(void) { mca_pmix_ext2x_component.evindex = 0; OBJ_CONSTRUCT(&mca_pmix_ext2x_component.jobids, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_ext2x_component.single_events, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_ext2x_component.multi_events, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_ext2x_component.default_events, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_ext2x_component.cache, opal_list_t); + OBJ_CONSTRUCT(&mca_pmix_ext2x_component.events, opal_list_t); + OBJ_CONSTRUCT(&mca_pmix_ext2x_component.dmdx, opal_list_t); return OPAL_SUCCESS; } @@ -105,10 +88,8 @@ static int external_open(void) static int external_close(void) { OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.jobids); - OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.single_events); - OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.multi_events); - OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.default_events); - OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.cache); + OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.events); + OPAL_LIST_DESTRUCT(&mca_pmix_ext2x_component.dmdx); return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/ext2x/pmix2x_server_north.c b/opal/mca/pmix/ext2x/pmix2x_server_north.c index df23ab2720..3c37bae19a 100644 --- a/opal/mca/pmix/ext2x/pmix2x_server_north.c +++ b/opal/mca/pmix/ext2x/pmix2x_server_north.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. @@ -29,6 +29,7 @@ #include "opal/mca/hwloc/base/base.h" #include "opal/runtime/opal.h" #include "opal/runtime/opal_progress_threads.h" +#include "opal/threads/threads.h" #include "opal/util/argv.h" #include "opal/util/error.h" #include "opal/util/output.h" @@ -45,63 +46,73 @@ /* These are the interfaces used by the embedded PMIx server * to call up into ORTE for service requests */ - static pmix_status_t server_client_connected_fn(const pmix_proc_t *proc, void* server_object, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_client_finalized_fn(const pmix_proc_t *proc, void* server_object, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_abort_fn(const pmix_proc_t *proc, void *server_object, - int status, const char msg[], - pmix_proc_t procs[], size_t nprocs, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t info[], size_t ninfo, - char *data, size_t ndata, - pmix_modex_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *proc, - const pmix_info_t info[], size_t ninfo, - pmix_modex_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_publish_fn(const pmix_proc_t *proc, - const pmix_info_t info[], size_t ninfo, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_lookup_fn(const pmix_proc_t *proc, char **keys, +static pmix_status_t server_client_connected_fn(const pmix_proc_t *proc, void* server_object, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_client_finalized_fn(const pmix_proc_t *proc, void* server_object, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_abort_fn(const pmix_proc_t *proc, void *server_object, + int status, const char msg[], + pmix_proc_t procs[], size_t nprocs, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t info[], size_t ninfo, - pmix_lookup_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_unpublish_fn(const pmix_proc_t *proc, char **keys, + char *data, size_t ndata, + pmix_modex_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *proc, + const pmix_info_t info[], size_t ninfo, + pmix_modex_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_publish_fn(const pmix_proc_t *proc, + const pmix_info_t info[], size_t ninfo, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_lookup_fn(const pmix_proc_t *proc, char **keys, + const pmix_info_t info[], size_t ninfo, + pmix_lookup_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_unpublish_fn(const pmix_proc_t *proc, char **keys, + const pmix_info_t info[], size_t ninfo, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_spawn_fn(const pmix_proc_t *proc, + const pmix_info_t job_info[], size_t ninfo, + const pmix_app_t apps[], size_t napps, + pmix_spawn_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs, + const pmix_info_t info[], size_t ninfo, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_spawn_fn(const pmix_proc_t *proc, - const pmix_info_t job_info[], size_t ninfo, - const pmix_app_t apps[], size_t napps, - pmix_spawn_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_connect_fn(const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t info[], size_t ninfo, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_disconnect_fn(const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t info[], size_t ninfo, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_register_events(pmix_status_t *codes, size_t ncodes, - const pmix_info_t info[], size_t ninfo, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_deregister_events(pmix_status_t *codes, size_t ncodes, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_notify_event(pmix_status_t code, - const pmix_proc_t *source, - pmix_data_range_t range, - pmix_info_t info[], size_t ninfo, - pmix_op_cbfunc_t cbfunc, void *cbdata); - static pmix_status_t server_query(pmix_proc_t *proct, - pmix_query_t *queryies, size_t nqueries, - pmix_info_cbfunc_t cbfunc, +static pmix_status_t server_register_events(pmix_status_t *codes, size_t ncodes, + const pmix_info_t info[], size_t ninfo, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_deregister_events(pmix_status_t *codes, size_t ncodes, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_notify_event(pmix_status_t code, + const pmix_proc_t *source, + pmix_data_range_t range, + pmix_info_t info[], size_t ninfo, + pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t server_query(pmix_proc_t *proct, + pmix_query_t *queryies, size_t nqueries, + pmix_info_cbfunc_t cbfunc, + void *cbdata); +static void server_tool_connection(pmix_info_t *info, size_t ninfo, + pmix_tool_connection_cbfunc_t cbfunc, void *cbdata); - static void server_tool_connection(pmix_info_t *info, size_t ninfo, - pmix_tool_connection_cbfunc_t cbfunc, - void *cbdata); static void server_log(const pmix_proc_t *client, const pmix_info_t data[], size_t ndata, const pmix_info_t directives[], size_t ndirs, pmix_op_cbfunc_t cbfunc, void *cbdata); - pmix_server_module_t mymodule = { +static pmix_status_t server_allocate(const pmix_proc_t *client, + pmix_alloc_directive_t directive, + const pmix_info_t data[], size_t ndata, + pmix_info_cbfunc_t cbfunc, void *cbdata); + +static pmix_status_t server_job_control(const pmix_proc_t *requestor, + const pmix_proc_t targets[], size_t ntargets, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata); + +pmix_server_module_t mymodule = { .client_connected = server_client_connected_fn, .client_finalized = server_client_finalized_fn, .abort = server_abort_fn, @@ -118,7 +129,11 @@ static void server_log(const pmix_proc_t *client, .notify_event = server_notify_event, .query = server_query, .tool_connected = server_tool_connection, - .log = server_log + .log = server_log, + .allocate = server_allocate, + .job_control = server_job_control + /* we do not support monitoring, but use the + * PMIx internal monitoring capability */ }; opal_pmix_server_module_t *host_module = NULL; @@ -128,6 +143,7 @@ static void opal_opcbfunc(int status, void *cbdata) { pmix2x_opalcaddy_t *opalcaddy = (pmix2x_opalcaddy_t*)cbdata; + OPAL_ACQUIRE_OBJECT(opalcaddy); if (NULL != opalcaddy->opcbfunc) { opalcaddy->opcbfunc(pmix2x_convert_opalrc(status), opalcaddy->cbdata); } @@ -252,6 +268,7 @@ static void opmdx_response(int status, const char *data, size_t sz, void *cbdata { pmix_status_t rc; pmix2x_opalcaddy_t *opalcaddy = (pmix2x_opalcaddy_t*)cbdata; + opal_pmix2x_dmx_trkr_t *dmdx; rc = pmix2x_convert_rc(status); if (NULL != opalcaddy->mdxcbfunc) { @@ -259,6 +276,13 @@ static void opmdx_response(int status, const char *data, size_t sz, void *cbdata opalcaddy->ocbdata = relcbdata; opalcaddy->mdxcbfunc(rc, data, sz, opalcaddy->cbdata, _data_release, opalcaddy); + /* if we were collecting all data, then check for any pending + * dmodx requests that we cached and notify them that the + * data has arrived */ + while (NULL != (dmdx = (opal_pmix2x_dmx_trkr_t*)opal_list_remove_first(&mca_pmix_ext2x_component.dmdx))) { + dmdx->cbfunc(PMIX_SUCCESS, NULL, 0, dmdx->cbdata, NULL, NULL); + OBJ_RELEASE(dmdx); + } } else { OBJ_RELEASE(opalcaddy); } @@ -278,7 +302,6 @@ static pmix_status_t server_fencenb_fn(const pmix_proc_t procs[], size_t nprocs, if (NULL == host_module || NULL == host_module->fence_nb) { return PMIX_ERR_NOT_SUPPORTED; } - /* setup the caddy */ opalcaddy = OBJ_NEW(pmix2x_opalcaddy_t); opalcaddy->mdxcbfunc = cbfunc; @@ -324,6 +347,7 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p, opal_process_name_t proc; opal_value_t *iptr; size_t n; + opal_pmix2x_dmx_trkr_t *dmdx; if (NULL == host_module || NULL == host_module->direct_modex) { return PMIX_ERR_NOT_SUPPORTED; @@ -340,6 +364,21 @@ static pmix_status_t server_dmodex_req_fn(const pmix_proc_t *p, opalcaddy->mdxcbfunc = cbfunc; opalcaddy->cbdata = cbdata; + /* this function should only get called if we are in an async modex. + * If we are also collecting data, then the fence_nb will eventually + * complete and return all the required data down to the pmix + * server beneath us. Thus, we only need to track the dmodex_req + * and ensure that the release gets called once the data has + * arrived - this will trigger the pmix server to tell the + * client that the data is available */ + if (opal_pmix_base_async_modex && opal_pmix_collect_all_data) { + dmdx = OBJ_NEW(opal_pmix2x_dmx_trkr_t); + dmdx->cbfunc = cbfunc; + dmdx->cbdata = cbdata; + opal_list_append(&mca_pmix_ext2x_component.dmdx, &dmdx->super); + return PMIX_SUCCESS; + } + /* convert the array of pmix_info_t to the list of info */ for (n=0; n < ninfo; n++) { iptr = OBJ_NEW(opal_value_t); @@ -1016,6 +1055,7 @@ static void server_log(const pmix_proc_t *proct, /* convert the data */ for (n=0; n < ndata; n++) { oinfo = OBJ_NEW(opal_value_t); + oinfo->key = strdup(data[n].key); /* we "borrow" the info field of the caddy as we and the * server function both agree on what will be there */ opal_list_append(&opalcaddy->info, &oinfo->super); @@ -1051,3 +1091,117 @@ static void server_log(const pmix_proc_t *proct, &opalcaddy->apps, opal_opcbfunc, opalcaddy); } + +static pmix_status_t server_allocate(const pmix_proc_t *proct, + pmix_alloc_directive_t directive, + const pmix_info_t data[], size_t ndata, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + pmix2x_opalcaddy_t *opalcaddy; + opal_process_name_t requestor; + int rc; + size_t n; + opal_value_t *oinfo; + opal_pmix_alloc_directive_t odir; + + if (NULL == host_module || NULL == host_module->allocate) { + return PMIX_ERR_NOT_SUPPORTED; + } + + /* setup the caddy */ + opalcaddy = OBJ_NEW(pmix2x_opalcaddy_t); + opalcaddy->infocbfunc = cbfunc; + opalcaddy->cbdata = cbdata; + + /* convert the requestor */ + if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&requestor.jobid, proct->nspace))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + requestor.vpid = pmix2x_convert_rank(proct->rank); + + /* convert the directive */ + odir = pmix2x_convert_allocdir(directive); + + /* convert the data */ + for (n=0; n < ndata; n++) { + oinfo = OBJ_NEW(opal_value_t); + opal_list_append(&opalcaddy->info, &oinfo->super); + if (OPAL_SUCCESS != (rc = pmix2x_value_unload(oinfo, &data[n].value))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + } + + /* pass the call upwards */ + if (OPAL_SUCCESS != (rc = host_module->allocate(&requestor, odir, + &opalcaddy->info, + info_cbfunc, opalcaddy))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + + return PMIX_SUCCESS; + +} + +static pmix_status_t server_job_control(const pmix_proc_t *proct, + const pmix_proc_t targets[], size_t ntargets, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + pmix2x_opalcaddy_t *opalcaddy; + opal_process_name_t requestor; + int rc; + size_t n; + opal_value_t *oinfo; + opal_namelist_t *nm; + + if (NULL == host_module || NULL == host_module->job_control) { + return PMIX_ERR_NOT_SUPPORTED; + } + + /* setup the caddy */ + opalcaddy = OBJ_NEW(pmix2x_opalcaddy_t); + opalcaddy->infocbfunc = cbfunc; + opalcaddy->cbdata = cbdata; + + /* convert the requestor */ + if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&requestor.jobid, proct->nspace))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + requestor.vpid = pmix2x_convert_rank(proct->rank); + + /* convert the targets */ + for (n=0; n < ntargets; n++) { + nm = OBJ_NEW(opal_namelist_t); + opal_list_append(&opalcaddy->procs, &nm->super); + if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&nm->name.jobid, targets[n].nspace))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + nm->name.vpid = pmix2x_convert_rank(targets[n].rank); + } + + /* convert the directives */ + for (n=0; n < ndirs; n++) { + oinfo = OBJ_NEW(opal_value_t); + opal_list_append(&opalcaddy->info, &oinfo->super); + if (OPAL_SUCCESS != (rc = pmix2x_value_unload(oinfo, &directives[n].value))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + } + + /* pass the call upwards */ + if (OPAL_SUCCESS != (rc = host_module->job_control(&requestor, + &opalcaddy->procs, + &opalcaddy->info, + info_cbfunc, opalcaddy))) { + OBJ_RELEASE(opalcaddy); + return pmix2x_convert_opalrc(rc); + } + + return PMIX_SUCCESS; +} diff --git a/opal/mca/pmix/ext2x/pmix2x_server_south.c b/opal/mca/pmix/ext2x/pmix2x_server_south.c index 187fb81394..f83a17ee6d 100644 --- a/opal/mca/pmix/ext2x/pmix2x_server_south.c +++ b/opal/mca/pmix/ext2x/pmix2x_server_south.c @@ -1,12 +1,14 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2017 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -30,6 +32,7 @@ #include "opal/mca/hwloc/base/base.h" #include "opal/runtime/opal.h" #include "opal/runtime/opal_progress_threads.h" +#include "opal/threads/threads.h" #include "opal/util/argv.h" #include "opal/util/error.h" #include "opal/util/output.h" @@ -56,6 +59,7 @@ static size_t errhdler_ref = 0; while ((a)) { \ usleep(10); \ } \ + OPAL_ACQUIRE_OBJECT(a); \ } while (0) static void errreg_cbfunc (pmix_status_t status, @@ -64,10 +68,12 @@ static void errreg_cbfunc (pmix_status_t status, { volatile bool *active = (volatile bool*)cbdata; + OPAL_ACQUIRE_OBJECT(active); errhdler_ref = errhandler_ref; opal_output_verbose(5, opal_pmix_base_framework.framework_output, "PMIX server errreg_cbfunc - error handler registered status=%d, reference=%lu", status, (unsigned long)errhandler_ref); + OPAL_POST_OBJECT(active); *active = false; } @@ -75,11 +81,14 @@ static void opcbfunc(pmix_status_t status, void *cbdata) { pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata; + OPAL_ACQUIRE_OBJECT(op); + if (NULL != op->opcbfunc) { op->opcbfunc(pmix2x_convert_rc(status), op->cbdata); } if (op->active) { op->status = status; + OPAL_POST_OBJECT(op); op->active = false; } else { OBJ_RELEASE(op); @@ -90,6 +99,7 @@ static void op2cbfunc(pmix_status_t status, void *cbdata) { volatile bool *active = (volatile bool*)cbdata; + OPAL_POST_OBJECT(active); *active = false; } @@ -142,14 +152,20 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, /* register the default event handler */ active = true; - PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active); + PMIX_INFO_CREATE(pinfo, 1); + PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING); + PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active); PMIX_WAIT_FOR_COMPLETION(active); + PMIX_INFO_FREE(pinfo, 1); /* as we might want to use some client-side functions, be sure * to register our own nspace */ + PMIX_INFO_CREATE(pinfo, 1); + PMIX_INFO_LOAD(&pinfo[0], PMIX_REGISTER_NODATA, NULL, PMIX_BOOL); active = true; - PMIx_server_register_nspace(job->nspace, 1, NULL, 0, op2cbfunc, (void*)&active); + PMIx_server_register_nspace(job->nspace, 1, pinfo, 1, op2cbfunc, (void*)&active); PMIX_WAIT_FOR_COMPLETION(active); + PMIX_INFO_FREE(pinfo, 1); return OPAL_SUCCESS; } @@ -157,6 +173,7 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, static void fincb(pmix_status_t status, void *cbdata) { volatile bool *active = (volatile bool*)cbdata; + OPAL_POST_OBJECT(active); *active = false; } @@ -203,6 +220,8 @@ static void _reg_nspace(int sd, short args, void *cbdata) opal_pmix2x_jobid_trkr_t *job; pmix2x_opcaddy_t op; + OPAL_ACQUIRE_OBJECT(cd); + /* we must threadshift this request as we might not be in an event * and we are going to access framework-global lists/objects */ @@ -291,9 +310,10 @@ int pmix2x_server_register_nspace(opal_jobid_t jobid, if (NULL == cbfunc) { _reg_nspace(0, 0, cd); } else { - event_assign(&cd->ev, opal_pmix_base.evbase, - -1, EV_WRITE, _reg_nspace, cd); - event_active(&cd->ev, EV_WRITE, 1); + opal_event_assign(&cd->ev, opal_pmix_base.evbase, + -1, EV_WRITE, _reg_nspace, cd); + OPAL_POST_OBJECT(cd); + opal_event_active(&cd->ev, EV_WRITE, 1); } return OPAL_SUCCESS; @@ -303,10 +323,12 @@ static void tdcbfunc(pmix_status_t status, void *cbdata) { pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; + OPAL_ACQUIRE_OBJECT(cd); if (NULL != cd->opcbfunc) { cd->opcbfunc(pmix2x_convert_rc(status), cd->cbdata); } if (cd->active) { + OPAL_POST_OBJECT(cd); cd->active = false; } else { OBJ_RELEASE(cd); @@ -318,6 +340,7 @@ static void _dereg_nspace(int sd, short args, void *cbdata) pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata; opal_pmix2x_jobid_trkr_t *jptr; + OPAL_ACQUIRE_OBJECT(cd); /* if we don't already have it, we can ignore this */ OPAL_LIST_FOREACH(jptr, &mca_pmix_ext2x_component.jobids, opal_pmix2x_jobid_trkr_t) { if (jptr->jobid == cd->jobid) { @@ -351,9 +374,10 @@ void pmix2x_server_deregister_nspace(opal_jobid_t jobid, if (NULL == cbfunc) { _dereg_nspace(0, 0, cd); } else { - event_assign(&cd->ev, opal_pmix_base.evbase, + opal_event_assign(&cd->ev, opal_pmix_base.evbase, -1, EV_WRITE, _dereg_nspace, cd); - event_active(&cd->ev, EV_WRITE, 1); + OPAL_POST_OBJECT(cd); + opal_event_active(&cd->ev, EV_WRITE, 1); } } @@ -389,6 +413,7 @@ static void _dereg_client(int sd, short args, void *cbdata) opal_pmix2x_jobid_trkr_t *jptr; pmix_proc_t p; + OPAL_ACQUIRE_OBJECT(cd); /* if we don't already have it, we can ignore this */ OPAL_LIST_FOREACH(jptr, &mca_pmix_ext2x_component.jobids, opal_pmix2x_jobid_trkr_t) { if (jptr->jobid == cd->source->jobid) { @@ -421,9 +446,10 @@ void pmix2x_server_deregister_client(const opal_process_name_t *proc, if (NULL == cbfunc) { _dereg_client(0, 0, cd); } else { - event_assign(&cd->ev, opal_pmix_base.evbase, + opal_event_assign(&cd->ev, opal_pmix_base.evbase, -1, EV_WRITE, _dereg_client, cd); - event_active(&cd->ev, EV_WRITE, 1); + OPAL_POST_OBJECT(cd); + opal_event_active(&cd->ev, EV_WRITE, 1); } } diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 4c15a873ae..1fc9ece4fb 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -23,6 +23,7 @@ #include "opal/class/opal_list.h" #include "opal/mca/event/event.h" #include "opal/mca/pmix/pmix.h" +#include "opal/util/argv.h" #include "orte/orted/pmix/pmix_server_internal.h" #include "orte/runtime/orte_data_server.h"