From 1326ed704fd38d218bb7dcdbc1e4114df92fe1a6 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 5 Feb 2014 14:39:27 +0000 Subject: [PATCH] Per the RFC discussed here: http://www.open-mpi.org/community/lists/devel/2014/01/13789.php add support for async modex when requested. cmr=v1.7.5:reviewer=jsquyres:subject=Add async modex support This commit was SVN r30565. --- ompi/mca/btl/sm/btl_sm.c | 6 +++ ompi/mca/rte/orte/rte_orte_component.c | 1 + ompi/mca/rte/orte/rte_orte_module.c | 40 +++++++++++--- orte/mca/grpcomm/base/base.h | 12 ++++- orte/mca/grpcomm/base/grpcomm_base_frame.c | 18 ++++++- orte/mca/grpcomm/base/grpcomm_base_modex.c | 2 +- orte/mca/grpcomm/base/grpcomm_base_receive.c | 57 ++++++++++++++++++++ orte/mca/plm/plm_types.h | 2 + 8 files changed, 129 insertions(+), 9 deletions(-) diff --git a/ompi/mca/btl/sm/btl_sm.c b/ompi/mca/btl/sm/btl_sm.c index 26d6ecf13b..24361de3e3 100644 --- a/ompi/mca/btl/sm/btl_sm.c +++ b/ompi/mca/btl/sm/btl_sm.c @@ -139,6 +139,11 @@ setup_mpool_base_resources(mca_btl_sm_component_t *comp_ptr, int fd = -1; ssize_t bread = 0; + /* Wait for the file to be created */ + while (0 != access(comp_ptr->sm_rndv_file_name, R_OK)) { + opal_progress(); + } + if (-1 == (fd = open(comp_ptr->sm_mpool_rndv_file_name, O_RDONLY))) { int err = errno; opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true, @@ -188,6 +193,7 @@ sm_segment_attach(mca_btl_sm_component_t *comp_ptr) opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true, "open(2)", strerror(err), err); rc = OMPI_ERR_IN_ERRNO; + exit(1); goto out; } if ((ssize_t)sizeof(opal_shmem_ds_t) != (bread = diff --git a/ompi/mca/rte/orte/rte_orte_component.c b/ompi/mca/rte/orte/rte_orte_component.c index afaa288717..b7e8168f35 100644 --- a/ompi/mca/rte/orte/rte_orte_component.c +++ b/ompi/mca/rte/orte/rte_orte_component.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved. + * Copyright (c) 2014 Intel, Inc. All rights reserved. * * $COPYRIGHT$ * diff --git a/ompi/mca/rte/orte/rte_orte_module.c b/ompi/mca/rte/orte/rte_orte_module.c index 3ce36ab7f8..a84cdf3172 100644 --- a/ompi/mca/rte/orte/rte_orte_module.c +++ b/ompi/mca/rte/orte/rte_orte_module.c @@ -26,6 +26,7 @@ #include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/rmaps/base/base.h" #include "orte/mca/rml/base/rml_contact.h" +#include "orte/mca/state/state.h" #include "orte/mca/routed/routed.h" #include "orte/util/name_fns.h" #include "orte/util/session_dir.h" @@ -140,6 +141,9 @@ void ompi_rte_wait_for_debugger(void) int ompi_rte_modex(ompi_rte_collective_t *coll) { + /* mark that this process reached modex */ + orte_grpcomm_base.modex_ready = true; + if ((orte_process_info.num_procs < ompi_hostname_cutoff) || !ompi_rte_orte_direct_modex || orte_standalone_operation) { @@ -158,15 +162,27 @@ int ompi_rte_modex(ompi_rte_collective_t *coll) * info we need, and we will retrieve the MPI-level info * only as requested. This will provide a faster startup * time since we won't do a massive allgather operation, - * but will make first-message connections slower. However, - * we still have to do a barrier op here to ensure that all - * procs have had time to store their modex info prior to - * receiving a request to provide it! + * but will make first-message connections slower. */ + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s using direct modex", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* process any pending requests */ + ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_MODEX_READY); + /* release the barrier */ + if (NULL != coll->cbfunc) { OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, - "%s using direct modex - executing barrier", + "%s CALLING MODEX RELEASE", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return orte_grpcomm.barrier(coll); + coll->cbfunc(NULL, coll->cbdata); + } else { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s NO MODEX RELEASE CBFUNC", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + } + /* flag the collective as complete */ + coll->active = false; + return OMPI_SUCCESS; } int ompi_rte_db_store(const orte_process_name_t *nm, const char* key, @@ -224,6 +240,10 @@ int ompi_rte_db_fetch(const struct ompi_proc_t *proc, int rc; if (OPAL_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)(&proc->proc_name), key, data, type))) { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s requesting direct modex from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); /* if we couldn't fetch the data via the db, then we will attempt * to retrieve it from the target proc */ @@ -254,6 +274,10 @@ int ompi_rte_db_fetch_pointer(const struct ompi_proc_t *proc, /* if we couldn't fetch the data via the db, then we will attempt * to retrieve it from the target proc */ + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s requesting direct modex from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_PEER))) { ORTE_ERROR_LOG(rc); return rc; @@ -283,6 +307,10 @@ int ompi_rte_db_fetch_multiple(const struct ompi_proc_t *proc, /* if we couldn't fetch the data via the db, then we will attempt * to retrieve it from the target proc */ + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s requesting direct modex from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_GLOBAL))) { ORTE_ERROR_LOG(rc); return rc; diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index aadc18d94d..c937b8321f 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -11,7 +11,7 @@ * All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2013 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -62,8 +62,17 @@ typedef struct { #if OPAL_HAVE_HWLOC hwloc_cpuset_t working_cpuset; #endif + bool modex_ready; + opal_list_t modex_requests; } orte_grpcomm_base_t; +typedef struct { + opal_list_item_t super; + orte_process_name_t peer; + opal_scope_t scope; +} orte_grpcomm_modex_req_t; +OBJ_CLASS_DECLARATION(orte_grpcomm_modex_req_t); + typedef struct { opal_object_t super; opal_event_t ev; @@ -114,6 +123,7 @@ ORTE_DECLSPEC int orte_grpcomm_base_pack_xcast(orte_jobid_t job, opal_buffer_t *buffer, opal_buffer_t *message, orte_rml_tag_t tag); +ORTE_DECLSPEC void orte_grpcomm_base_process_modex(int fd, short args, void *cbdata); END_C_DECLS #endif diff --git a/orte/mca/grpcomm/base/grpcomm_base_frame.c b/orte/mca/grpcomm/base/grpcomm_base_frame.c index ead18c35b3..157155d4b2 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_frame.c +++ b/orte/mca/grpcomm/base/grpcomm_base_frame.c @@ -11,6 +11,7 @@ * All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. + * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -26,6 +27,8 @@ #include "opal/util/output.h" #include "opal/mca/base/base.h" +#include "orte/mca/state/state.h" + #include "orte/mca/grpcomm/base/base.h" @@ -50,6 +53,8 @@ static int orte_grpcomm_base_close(void) if( NULL != orte_grpcomm.finalize ) { orte_grpcomm.finalize(); } + OBJ_DESTRUCT(&orte_grpcomm_base.active_colls); + OBJ_DESTRUCT(&orte_grpcomm_base.modex_requests); #if OPAL_HAVE_HWLOC if (NULL != orte_grpcomm_base.working_cpuset) { @@ -69,11 +74,18 @@ static int orte_grpcomm_base_open(mca_base_open_flag_t flags) /* init globals */ OBJ_CONSTRUCT(&orte_grpcomm_base.active_colls, opal_list_t); orte_grpcomm_base.coll_id = 0; - + OBJ_CONSTRUCT(&orte_grpcomm_base.modex_requests, opal_list_t); + orte_grpcomm_base.modex_ready = false; + #if OPAL_HAVE_HWLOC orte_grpcomm_base.working_cpuset = NULL; #endif + /* register the modex processing event */ + if (ORTE_PROC_IS_APP) { + orte_state.add_proc_state(ORTE_PROC_STATE_MODEX_READY, orte_grpcomm_base_process_modex, ORTE_MSG_PRI); + } + return mca_base_framework_components_open(&orte_grpcomm_base_framework, flags); } @@ -146,3 +158,7 @@ OBJ_CLASS_INSTANCE(orte_grpcomm_collective_t, OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t, opal_object_t, NULL, NULL); + +OBJ_CLASS_INSTANCE(orte_grpcomm_modex_req_t, + opal_list_item_t, + NULL, NULL); diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index 05ab4ed3f6..34076fb3e5 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -14,7 +14,7 @@ * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2013 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow diff --git a/orte/mca/grpcomm/base/grpcomm_base_receive.c b/orte/mca/grpcomm/base/grpcomm_base_receive.c index f6ffe79c2f..31bdf5d924 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_receive.c +++ b/orte/mca/grpcomm/base/grpcomm_base_receive.c @@ -315,6 +315,42 @@ static void app_recv(int status, orte_process_name_t* sender, } } +void orte_grpcomm_base_process_modex(int fd, short args, void *cbdata) +{ + + orte_grpcomm_modex_req_t *req; + opal_buffer_t *buf; + int rc; + + OPAL_LIST_FOREACH(req, &orte_grpcomm_base.modex_requests, orte_grpcomm_modex_req_t) { + /* we always must send a response, even if nothing could be + * returned, to prevent the remote proc from hanging + */ + buf = OBJ_NEW(opal_buffer_t); + + /* pack our process name so the remote end can use the std + * unpacking routine + */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto respond; + } + + /* collect the desired data */ + if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf, req->scope))) { + ORTE_ERROR_LOG(rc); + } + + respond: + if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&req->peer, buf, + ORTE_RML_TAG_DIRECT_MODEX_RESP, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + } + } +} + static void direct_modex(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) @@ -322,6 +358,7 @@ static void direct_modex(int status, orte_process_name_t* sender, opal_buffer_t *buf; int rc, cnt; opal_scope_t scope; + orte_grpcomm_modex_req_t *req; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output, "%s providing direct modex for %s", @@ -340,6 +377,26 @@ static void direct_modex(int status, orte_process_name_t* sender, goto respond; } + /* if we haven't made it to our own modex, then we may + * not yet have all the required info + */ + if (!orte_grpcomm_base.modex_ready) { + /* we are in an event, so it is safe to access + * the global list of requests - record this one. + * Note that we don't support multiple requests + * pending from the same proc as we can't know + * which thread to return the data to, so we + * require that the remote proc only allow + * one thread at a time to call modex_recv + */ + req = OBJ_NEW(orte_grpcomm_modex_req_t); + req->peer = *sender; + req->scope = scope; + opal_list_append(&orte_grpcomm_base.modex_requests, &req->super); + OBJ_RELEASE(buf); + return; + } + /* pack our process name so the remote end can use the std * unpacking routine */ diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h index a734a445a7..1aea21f671 100644 --- a/orte/mca/plm/plm_types.h +++ b/orte/mca/plm/plm_types.h @@ -11,6 +11,7 @@ * All rights reserved. * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. + * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -51,6 +52,7 @@ typedef uint32_t orte_proc_state_t; #define ORTE_PROC_STATE_REGISTERED 5 /* proc registered sync */ #define ORTE_PROC_STATE_IOF_COMPLETE 6 /* io forwarding pipes have closed */ #define ORTE_PROC_STATE_WAITPID_FIRED 7 /* waitpid fired on process */ +#define ORTE_PROC_STATE_MODEX_READY 8 /* all modex info has been stored */ /* * Define a "boundary" so we can easily and quickly determine