diff --git a/ompi/mca/rte/orte/rte_orte.h b/ompi/mca/rte/orte/rte_orte.h index 6bb9ddb99a..08619a076b 100644 --- a/ompi/mca/rte/orte/rte_orte.h +++ b/ompi/mca/rte/orte/rte_orte.h @@ -23,6 +23,8 @@ #include "ompi/info/info.h" struct ompi_proc_t; +#include "opal/threads/threads.h" + #include "orte/types.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/grpcomm.h" @@ -122,8 +124,21 @@ typedef orte_rml_tag_t ompi_rml_tag_t; #define OMPI_RML_PERSISTENT ORTE_RML_PERSISTENT #define OMPI_RML_NON_PERSISTENT ORTE_RML_NON_PERSISTENT -/* define a local variable shared between component and module */ -OMPI_MODULE_DECLSPEC extern bool ompi_rte_orte_direct_modex; +typedef struct { + ompi_rte_component_t super; + bool direct_modex; + opal_mutex_t lock; + opal_list_t modx_reqs; +} ompi_rte_orte_component_t; + +typedef struct { + opal_list_item_t super; + opal_mutex_t lock; + opal_condition_t cond; + bool active; + orte_process_name_t peer; +} ompi_orte_tracker_t; +OBJ_CLASS_DECLARATION(ompi_orte_tracker_t); END_C_DECLS diff --git a/ompi/mca/rte/orte/rte_orte_component.c b/ompi/mca/rte/orte/rte_orte_component.c index b7e8168f35..a82de8ed46 100644 --- a/ompi/mca/rte/orte/rte_orte_component.c +++ b/ompi/mca/rte/orte/rte_orte_component.c @@ -18,6 +18,12 @@ #include "ompi_config.h" #include "ompi/constants.h" +#include "opal/threads/threads.h" +#include "opal/class/opal_list.h" + +#include "orte/mca/rml/rml.h" +#include "orte/mca/grpcomm/base/base.h" + #include "ompi/mca/rte/rte.h" #include "rte_orte.h" @@ -33,6 +39,7 @@ const char *ompi_rte_orte_component_version_string = * Local function */ static int rte_orte_open(void); +static int rte_orte_close(void); static int rte_orte_register(void); /* @@ -40,46 +47,72 @@ static int rte_orte_register(void); * and pointers to our public functions in it */ -const ompi_rte_component_t mca_rte_orte_component = { - - /* First, the mca_component_t struct containing meta information - about the component itself */ - +ompi_rte_orte_component_t mca_rte_orte_component = { { - OMPI_RTE_BASE_VERSION_1_0_0, + /* First, the mca_component_t struct containing meta information + about the component itself */ - /* Component name and version */ - "orte", - OMPI_MAJOR_VERSION, - OMPI_MINOR_VERSION, - OMPI_RELEASE_VERSION, + { + OMPI_RTE_BASE_VERSION_1_0_0, - /* Component open and close functions */ - rte_orte_open, - NULL, - NULL, - rte_orte_register - }, - { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT + /* Component name and version */ + "orte", + OMPI_MAJOR_VERSION, + OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION, + + /* Component open and close functions */ + rte_orte_open, + rte_orte_close, + NULL, + rte_orte_register + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, } }; - static int rte_orte_open(void) -{ - return OPAL_SUCCESS; +{ + OBJ_CONSTRUCT(&mca_rte_orte_component.lock, opal_mutex_t); + OBJ_CONSTRUCT(&mca_rte_orte_component.modx_reqs, opal_list_t); + + return OMPI_SUCCESS; +} + +static int rte_orte_close(void) +{ + opal_mutex_lock(&mca_rte_orte_component.lock); + OPAL_LIST_DESTRUCT(&mca_rte_orte_component.modx_reqs); + OBJ_DESTRUCT(&mca_rte_orte_component.lock); + + return OMPI_SUCCESS; } static int rte_orte_register(void) { - ompi_rte_orte_direct_modex = false; - (void) mca_base_component_var_register (&mca_rte_orte_component.base_version, + mca_rte_orte_component.direct_modex = false; + (void) mca_base_component_var_register (&mca_rte_orte_component.super.base_version, "direct_modex", "Enable direct modex (default: false)", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9, - MCA_BASE_VAR_SCOPE_READONLY, &ompi_rte_orte_direct_modex); + MCA_BASE_VAR_SCOPE_READONLY, &mca_rte_orte_component.direct_modex); return OMPI_SUCCESS; } +static void con(ompi_orte_tracker_t *p) +{ + p->active = true; + OBJ_CONSTRUCT(&p->lock, opal_mutex_t); + OBJ_CONSTRUCT(&p->cond, opal_condition_t); +} +static void des(ompi_orte_tracker_t *p) +{ + OBJ_DESTRUCT(&p->lock); + OBJ_DESTRUCT(&p->cond); +} +OBJ_CLASS_INSTANCE(ompi_orte_tracker_t, + opal_list_item_t, + con, des); diff --git a/ompi/mca/rte/orte/rte_orte_module.c b/ompi/mca/rte/orte/rte_orte_module.c index e46dd0b90f..ec25f88c14 100644 --- a/ompi/mca/rte/orte/rte_orte_module.c +++ b/ompi/mca/rte/orte/rte_orte_module.c @@ -14,6 +14,7 @@ #include "opal/util/argv.h" #include "opal/util/opal_getcwd.h" #include "opal/mca/db/db.h" +#include "opal/threads/threads.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" @@ -40,6 +41,11 @@ #include "ompi/proc/proc.h" #include "ompi/runtime/params.h" +extern ompi_rte_orte_component_t mca_rte_orte_component; +static void recv_callback(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata); + void ompi_rte_abort(int error_code, char *fmt, ...) { va_list arglist; @@ -145,7 +151,7 @@ int ompi_rte_modex(ompi_rte_collective_t *coll) orte_grpcomm_base.modex_ready = true; if ((orte_process_info.num_procs < ompi_hostname_cutoff) || - !ompi_rte_orte_direct_modex || + !mca_rte_orte_component.direct_modex || orte_standalone_operation) { /* if we are direct launched and/or below a user-specified * cutoff value, then we just fall thru to the ORTE modex @@ -167,7 +173,13 @@ int ompi_rte_modex(ompi_rte_collective_t *coll) OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, "%s using direct modex", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* process any pending requests */ + /* if direct modex was enabled, setup the receive for it */ + orte_rml.recv_buffer_nb(OMPI_NAME_WILDCARD, + ORTE_RML_TAG_DIRECT_MODEX_RESP, + ORTE_RML_PERSISTENT, + recv_callback, NULL); + + /* process any pending requests for our data */ ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_MODEX_READY); /* release the barrier */ if (NULL != coll->cbfunc) { @@ -195,7 +207,7 @@ int ompi_rte_db_store(const orte_process_name_t *nm, const char* key, static int direct_modex(orte_process_name_t *peer, opal_scope_t scope) { int rc; - orte_rml_recv_cb_t xfer; + ompi_orte_tracker_t *req; opal_buffer_t *buf; OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, @@ -210,29 +222,38 @@ static int direct_modex(orte_process_name_t *peer, opal_scope_t scope) OBJ_RELEASE(buf); return rc; } - /* setup to receive the answer */ - OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t); - xfer.active = true; - orte_rml.recv_buffer_nb(OMPI_NAME_WILDCARD, - ORTE_RML_TAG_DIRECT_MODEX_RESP, - ORTE_RML_NON_PERSISTENT, - orte_rml_recv_callback, &xfer); + + /* create a tracker for this request */ + req = OBJ_NEW(ompi_orte_tracker_t); + req->peer = *peer; + + /* add this to our list of requests - protect it as the + * callback that returns data comes in the ORTE event base + */ + opal_mutex_lock(&mca_rte_orte_component.lock); + opal_list_append(&mca_rte_orte_component.modx_reqs, &req->super); + opal_mutex_unlock(&mca_rte_orte_component.lock); + /* send the request */ if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(peer, buf, ORTE_RML_TAG_DIRECT_MODEX, orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); - OBJ_DESTRUCT(&xfer); + opal_mutex_lock(&mca_rte_orte_component.lock); + opal_list_remove_item(&mca_rte_orte_component.modx_reqs, &req->super); + opal_mutex_unlock(&mca_rte_orte_component.lock); + OBJ_RELEASE(req); return rc; } - OMPI_WAIT_FOR_COMPLETION(xfer.active); - /* got it - this is a std modex package, so unpack it with the - * grpcomm function and cache it locally so we can quickly get - * more pieces if necessary - */ - orte_grpcomm_base_store_modex(&xfer.data, NULL); - OBJ_DESTRUCT(&xfer); + + /* wait for the response */ + opal_mutex_lock(&req->lock); + while (req->active) { + opal_condition_wait(&req->cond, &req->lock); + } + /* now can safely destruct the request */ + OBJ_RELEASE(req); return ORTE_SUCCESS; } @@ -243,6 +264,11 @@ int ompi_rte_db_fetch(const struct ompi_proc_t *proc, { int rc; + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s fetch data from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); + if (OPAL_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)(&proc->proc_name), key, data, type))) { OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, "%s requesting direct modex from %s for %s", @@ -274,6 +300,11 @@ int ompi_rte_db_fetch_pointer(const struct ompi_proc_t *proc, { int rc; + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s fetch data pointer from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); + if (OPAL_SUCCESS != (rc = opal_db.fetch_pointer((opal_identifier_t*)(&proc->proc_name), key, data, type))) { /* if we couldn't fetch the data via the db, then we will attempt * to retrieve it from the target proc @@ -305,6 +336,11 @@ int ompi_rte_db_fetch_multiple(const struct ompi_proc_t *proc, { int rc; + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s fetch multiple from %s for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->proc_name), key)); + /* MPI processes are only concerned with shared info */ if (OPAL_SUCCESS != (rc = opal_db.fetch_multiple((opal_identifier_t*)(&proc->proc_name), OPAL_SCOPE_GLOBAL, key, kvs))) { @@ -339,3 +375,45 @@ int ompi_rte_db_remove(const orte_process_name_t *nm, return opal_db.remove((opal_identifier_t*)nm, key); } + +/* this function executes in the RML event base, and so + * we must take care to protect against threading conflicts + * with the MPI layer + */ +static void recv_callback(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + ompi_orte_tracker_t *req, *nxt; + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, + "%s received direct modex data from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* this is a std modex package, so unpack it with the + * grpcomm function and cache it locally so we can quickly get + * more pieces if necessary - don't need to thread protect + * here as only one RML callback can be active at a time + */ + orte_grpcomm_base_store_modex(buffer, NULL); + + /* protect */ + opal_mutex_lock(&mca_rte_orte_component.lock); + + /* find all requests against this sender and release them */ + OPAL_LIST_FOREACH_SAFE(req, nxt, &mca_rte_orte_component.modx_reqs, ompi_orte_tracker_t) { + if (req->peer.jobid == sender->jobid && + req->peer.vpid == sender->vpid) { + /* remove the request from the list */ + opal_list_remove_item(&mca_rte_orte_component.modx_reqs, &req->super); + /* wake up the waiting thread */ + req->active = false; + opal_condition_signal(&req->cond); + } + } + + /* release */ + opal_mutex_unlock(&mca_rte_orte_component.lock); +} +