1
1

Ensure we only post one receive for direct modex replies, and that we properly handle thread-transfer issues between the ORTE callback and the MPI layer. Account for potential threaded operations at the MPI level.

Refs trac:4258

This commit was SVN r30730.

The following Trac tickets were found above:
  Ticket 4258 --> https://svn.open-mpi.org/trac/ompi/ticket/4258
Этот коммит содержится в:
Ralph Castain 2014-02-14 20:37:17 +00:00
родитель bdff767dce
Коммит 445c9f3384
3 изменённых файлов: 172 добавлений и 46 удалений

Просмотреть файл

@ -23,6 +23,8 @@
#include "ompi/info/info.h"
struct ompi_proc_t;
#include "opal/threads/threads.h"
#include "orte/types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
@ -122,8 +124,21 @@ typedef orte_rml_tag_t ompi_rml_tag_t;
#define OMPI_RML_PERSISTENT ORTE_RML_PERSISTENT
#define OMPI_RML_NON_PERSISTENT ORTE_RML_NON_PERSISTENT
/* define a local component structure shared between component and module */
typedef struct {
    /* base rte component - must be first so the struct can be cast
     * to ompi_rte_component_t by the framework */
    ompi_rte_component_t super;
    /* true if the user enabled direct modex via MCA param */
    bool direct_modex;
    /* protects modx_reqs against concurrent access from the
     * ORTE event thread and MPI-level threads */
    opal_mutex_t lock;
    /* list of ompi_orte_tracker_t awaiting direct modex replies */
    opal_list_t modx_reqs;
} ompi_rte_orte_component_t;

/* tracker for one outstanding direct modex request */
typedef struct {
    opal_list_item_t super;
    /* lock/cond pair the requesting MPI thread blocks on */
    opal_mutex_t lock;
    opal_condition_t cond;
    /* cleared by the RML recv callback once the reply arrives */
    bool active;
    /* process whose modex data was requested */
    orte_process_name_t peer;
} ompi_orte_tracker_t;
OBJ_CLASS_DECLARATION(ompi_orte_tracker_t);
END_C_DECLS

Просмотреть файл

@ -18,6 +18,12 @@
#include "ompi_config.h"
#include "ompi/constants.h"
#include "opal/threads/threads.h"
#include "opal/class/opal_list.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/grpcomm/base/base.h"
#include "ompi/mca/rte/rte.h"
#include "rte_orte.h"
@ -33,6 +39,7 @@ const char *ompi_rte_orte_component_version_string =
* Local function
*/
static int rte_orte_open(void);
static int rte_orte_close(void);
static int rte_orte_register(void);
/*
@ -40,46 +47,72 @@ static int rte_orte_register(void);
* and pointers to our public functions in it
*/
const ompi_rte_component_t mca_rte_orte_component = {
/* First, the mca_component_t struct containing meta information
about the component itself */
ompi_rte_orte_component_t mca_rte_orte_component = {
{
OMPI_RTE_BASE_VERSION_1_0_0,
/* First, the mca_component_t struct containing meta information
about the component itself */
/* Component name and version */
"orte",
OMPI_MAJOR_VERSION,
OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION,
{
OMPI_RTE_BASE_VERSION_1_0_0,
/* Component open and close functions */
rte_orte_open,
NULL,
NULL,
rte_orte_register
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
/* Component name and version */
"orte",
OMPI_MAJOR_VERSION,
OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION,
/* Component open and close functions */
rte_orte_open,
rte_orte_close,
NULL,
rte_orte_register
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
}
};
/* component open: construct the lock and pending-request list used to
 * coordinate direct modex replies between the ORTE event thread and
 * the MPI layer */
static int rte_orte_open(void)
{
    OBJ_CONSTRUCT(&mca_rte_orte_component.lock, opal_mutex_t);
    OBJ_CONSTRUCT(&mca_rte_orte_component.modx_reqs, opal_list_t);
    return OMPI_SUCCESS;
}
/* component close: tear down the pending-request list and its lock.
 * Any requests still on the list are released by OPAL_LIST_DESTRUCT. */
static int rte_orte_close(void)
{
    opal_mutex_lock(&mca_rte_orte_component.lock);
    OPAL_LIST_DESTRUCT(&mca_rte_orte_component.modx_reqs);
    /* must release the mutex before destructing it - destroying a
     * locked mutex is undefined behavior */
    opal_mutex_unlock(&mca_rte_orte_component.lock);
    OBJ_DESTRUCT(&mca_rte_orte_component.lock);
    return OMPI_SUCCESS;
}
static int rte_orte_register(void)
{
ompi_rte_orte_direct_modex = false;
(void) mca_base_component_var_register (&mca_rte_orte_component.base_version,
mca_rte_orte_component.direct_modex = false;
(void) mca_base_component_var_register (&mca_rte_orte_component.super.base_version,
"direct_modex", "Enable direct modex (default: false)",
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &ompi_rte_orte_direct_modex);
MCA_BASE_VAR_SCOPE_READONLY, &mca_rte_orte_component.direct_modex);
return OMPI_SUCCESS;
}
/* constructor: a new tracker starts "active" (reply not yet received);
 * static - these helpers are only referenced by OBJ_CLASS_INSTANCE below */
static void con(ompi_orte_tracker_t *p)
{
    p->active = true;
    OBJ_CONSTRUCT(&p->lock, opal_mutex_t);
    OBJ_CONSTRUCT(&p->cond, opal_condition_t);
}
/* destructor: release the per-request synchronization primitives */
static void des(ompi_orte_tracker_t *p)
{
    OBJ_DESTRUCT(&p->lock);
    OBJ_DESTRUCT(&p->cond);
}
OBJ_CLASS_INSTANCE(ompi_orte_tracker_t,
                   opal_list_item_t,
                   con, des);

Просмотреть файл

@ -14,6 +14,7 @@
#include "opal/util/argv.h"
#include "opal/util/opal_getcwd.h"
#include "opal/mca/db/db.h"
#include "opal/threads/threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
@ -40,6 +41,11 @@
#include "ompi/proc/proc.h"
#include "ompi/runtime/params.h"
extern ompi_rte_orte_component_t mca_rte_orte_component;
static void recv_callback(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
void ompi_rte_abort(int error_code, char *fmt, ...)
{
va_list arglist;
@ -145,7 +151,7 @@ int ompi_rte_modex(ompi_rte_collective_t *coll)
orte_grpcomm_base.modex_ready = true;
if ((orte_process_info.num_procs < ompi_hostname_cutoff) ||
!ompi_rte_orte_direct_modex ||
!mca_rte_orte_component.direct_modex ||
orte_standalone_operation) {
/* if we are direct launched and/or below a user-specified
* cutoff value, then we just fall thru to the ORTE modex
@ -167,7 +173,13 @@ int ompi_rte_modex(ompi_rte_collective_t *coll)
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s using direct modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* process any pending requests */
/* if direct modex was enabled, setup the receive for it */
orte_rml.recv_buffer_nb(OMPI_NAME_WILDCARD,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
ORTE_RML_PERSISTENT,
recv_callback, NULL);
/* process any pending requests for our data */
ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_MODEX_READY);
/* release the barrier */
if (NULL != coll->cbfunc) {
@ -195,7 +207,7 @@ int ompi_rte_db_store(const orte_process_name_t *nm, const char* key,
static int direct_modex(orte_process_name_t *peer, opal_scope_t scope)
{
int rc;
orte_rml_recv_cb_t xfer;
ompi_orte_tracker_t *req;
opal_buffer_t *buf;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
@ -210,29 +222,38 @@ static int direct_modex(orte_process_name_t *peer, opal_scope_t scope)
OBJ_RELEASE(buf);
return rc;
}
/* setup to receive the answer */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(OMPI_NAME_WILDCARD,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* create a tracker for this request */
req = OBJ_NEW(ompi_orte_tracker_t);
req->peer = *peer;
/* add this to our list of requests - protect it as the
* callback that returns data comes in the ORTE event base
*/
opal_mutex_lock(&mca_rte_orte_component.lock);
opal_list_append(&mca_rte_orte_component.modx_reqs, &req->super);
opal_mutex_unlock(&mca_rte_orte_component.lock);
/* send the request */
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(peer, buf,
ORTE_RML_TAG_DIRECT_MODEX,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_DESTRUCT(&xfer);
opal_mutex_lock(&mca_rte_orte_component.lock);
opal_list_remove_item(&mca_rte_orte_component.modx_reqs, &req->super);
opal_mutex_unlock(&mca_rte_orte_component.lock);
OBJ_RELEASE(req);
return rc;
}
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* got it - this is a std modex package, so unpack it with the
* grpcomm function and cache it locally so we can quickly get
* more pieces if necessary
*/
orte_grpcomm_base_store_modex(&xfer.data, NULL);
OBJ_DESTRUCT(&xfer);
/* wait for the response */
opal_mutex_lock(&req->lock);
while (req->active) {
opal_condition_wait(&req->cond, &req->lock);
}
/* now can safely destruct the request */
OBJ_RELEASE(req);
return ORTE_SUCCESS;
}
@ -243,6 +264,11 @@ int ompi_rte_db_fetch(const struct ompi_proc_t *proc,
{
int rc;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s fetch data from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
if (OPAL_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)(&proc->proc_name), key, data, type))) {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s requesting direct modex from %s for %s",
@ -274,6 +300,11 @@ int ompi_rte_db_fetch_pointer(const struct ompi_proc_t *proc,
{
int rc;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s fetch data pointer from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
if (OPAL_SUCCESS != (rc = opal_db.fetch_pointer((opal_identifier_t*)(&proc->proc_name), key, data, type))) {
/* if we couldn't fetch the data via the db, then we will attempt
* to retrieve it from the target proc
@ -305,6 +336,11 @@ int ompi_rte_db_fetch_multiple(const struct ompi_proc_t *proc,
{
int rc;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s fetch multiple from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
/* MPI processes are only concerned with shared info */
if (OPAL_SUCCESS != (rc = opal_db.fetch_multiple((opal_identifier_t*)(&proc->proc_name),
OPAL_SCOPE_GLOBAL, key, kvs))) {
@ -339,3 +375,45 @@ int ompi_rte_db_remove(const orte_process_name_t *nm,
return opal_db.remove((opal_identifier_t*)nm, key);
}
/* this function executes in the ORTE RML event base, and so
 * we must take care to protect against threading conflicts
 * with the MPI layer
 */
static void recv_callback(int status, orte_process_name_t* sender,
                          opal_buffer_t *buffer,
                          orte_rml_tag_t tag, void *cbdata)
{
    ompi_orte_tracker_t *req, *nxt;

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
                         "%s received direct modex data from %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(sender)));

    /* this is a std modex package, so unpack it with the
     * grpcomm function and cache it locally so we can quickly get
     * more pieces if necessary - don't need to thread protect
     * here as only one RML callback can be active at a time
     */
    orte_grpcomm_base_store_modex(buffer, NULL);

    /* protect the pending-request list */
    opal_mutex_lock(&mca_rte_orte_component.lock);

    /* find all requests against this sender and release them */
    OPAL_LIST_FOREACH_SAFE(req, nxt, &mca_rte_orte_component.modx_reqs, ompi_orte_tracker_t) {
        if (req->peer.jobid == sender->jobid &&
            req->peer.vpid == sender->vpid) {
            /* remove the request from the list */
            opal_list_remove_item(&mca_rte_orte_component.modx_reqs, &req->super);
            /* wake up the waiting thread - the "active" flag must be
             * cleared and the condition signaled while holding the
             * request's own lock; otherwise the waiter can test
             * "active" just before we flip it and then block in
             * opal_condition_wait after our signal has already fired,
             * missing the wakeup forever */
            opal_mutex_lock(&req->lock);
            req->active = false;
            opal_condition_signal(&req->cond);
            opal_mutex_unlock(&req->lock);
        }
    }

    /* release the list */
    opal_mutex_unlock(&mca_rte_orte_component.lock);
}