1
1

Instead of an unknown proc list that requires ownership transfer of data (which, in turn, requires a complex series of locks to be held during the transfer), use a modex backing store with backpointers from the proc to the backing store. The proc structures no longer own the modex data, which greatly simplifies locking when an unknown proc suddenly becomes known.

Refs trac:564

This commit was SVN r12822.

The following Trac tickets were found above:
  Ticket 564 --> https://svn.open-mpi.org/trac/ompi/ticket/564
Этот коммит содержится в:
Brian Barrett 2006-12-11 21:27:30 +00:00
родитель 8314e8dbb9
Коммит cf196ce420

Просмотреть файл

@ -24,6 +24,7 @@
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "orte/util/proc_info.h"
#include "orte/class/orte_proc_table.h"
#include "orte/dss/dss.h"
#include "opal/mca/mca.h"
@ -38,6 +39,29 @@
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
/* MODEX DESIGN
*
* Modex data is always associated with a given ompi_proc_t. However,
* because modex data is received from the GPR for entire jobids, it
* is possible that the modex callback will receive data for a process
* not yet in the ompi_proc_all() list of processes. This information
* must be kept for later use, because if accept/connect causes the
* proc to be added to the ompi_proc_all() list, the subscription to
* the mdoex information can not be reliably fired without causing a
* potential connection storm. Therefore, we use an orte_proc_table
* backing store to contain all modex information. Backpointers are
* provided from the ompi_proc_t structure to improve lookup
* performance in the common case.
*
* While we could add the now discovered proc into the ompi_proc_all()
* list, this has some problems, in that we don't have the
* architecture and hostname information needed to properly fill in
* the ompi_proc_t structure and we don't want to cause GPR
* communication to get it when we dont' really need to know anything
* about the remote proc.
*
*/
/**
* callback data for modex
*/
@ -101,7 +125,8 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t,
* received from peers.
*/
struct mca_pml_base_modex_t {
opal_object_t super;
opal_list_item_t super;
opal_mutex_t modex_lock;
opal_list_t modex_modules;
};
typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
@ -109,6 +134,7 @@ typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
static void
mca_pml_base_modex_construct(mca_pml_base_modex_t * modex)
{
OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t);
OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);
}
@ -116,6 +142,7 @@ static void
mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex)
{
OBJ_DESTRUCT(&modex->modex_modules);
OBJ_DESTRUCT(&modex->modex_lock);
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_t,
@ -141,44 +168,11 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_subscription_t,
NULL);
/**
* structure for unknown process structures (ones that aren't in our ompi_proc lists yet)
*/
struct mca_pml_base_modex_unknown_proc_t {
opal_list_item_t item;
orte_process_name_t proc_name;
opal_mutex_t mutex;
opal_object_t* modex_info;
};
typedef struct mca_pml_base_modex_unknown_proc_t mca_pml_base_modex_unknown_proc_t;
static void
mca_pml_base_modex_unknown_proc_construct(mca_pml_base_modex_unknown_proc_t *proc)
{
proc->modex_info = NULL;
OBJ_CONSTRUCT(&(proc->mutex), opal_mutex_t);
}
static void
mca_pml_base_modex_unknown_proc_destruct(mca_pml_base_modex_unknown_proc_t *proc)
{
OBJ_DESTRUCT(&(proc->mutex));
if (NULL != proc->modex_info) {
OBJ_RELEASE(proc->modex_info);
}
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_unknown_proc_t,
opal_list_item_t,
mca_pml_base_modex_unknown_proc_construct,
mca_pml_base_modex_unknown_proc_destruct);
/**
* Globals to track the list of subscriptions.
*/
static opal_list_t mca_pml_base_modex_subscriptions;
static opal_list_t mca_pml_base_modex_unknown_procs;
static opal_hash_table_t mca_pml_base_modex_data;
static opal_mutex_t mca_pml_base_modex_lock;
@ -188,10 +182,12 @@ static opal_mutex_t mca_pml_base_modex_lock;
int
mca_pml_base_modex_init(void)
{
OBJ_CONSTRUCT(&mca_pml_base_modex_unknown_procs, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_data, opal_hash_table_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t);
opal_hash_table_init(&mca_pml_base_modex_data, 256);
return OMPI_SUCCESS;
}
@ -204,9 +200,8 @@ mca_pml_base_modex_finalize(void)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_unknown_procs)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&mca_pml_base_modex_unknown_procs);
opal_hash_table_remove_all(&mca_pml_base_modex_data);
OBJ_DESTRUCT(&mca_pml_base_modex_data);
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions)))
OBJ_RELEASE(item);
@ -216,30 +211,6 @@ mca_pml_base_modex_finalize(void)
}
static mca_pml_base_modex_unknown_proc_t *
modex_unknown_proc_get(orte_process_name_t *proc_name)
{
opal_list_item_t *item;
mca_pml_base_modex_unknown_proc_t *unknown_proc = NULL;
orte_ns_cmp_bitmask_t mask;
/* return the proc-struct which matches this jobid+process id */
mask = ORTE_NS_CMP_CELLID | ORTE_NS_CMP_JOBID | ORTE_NS_CMP_VPID;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
for(item = opal_list_get_first(&mca_pml_base_modex_unknown_procs);
item != opal_list_get_end(&mca_pml_base_modex_unknown_procs);
item = opal_list_get_next(item)) {
unknown_proc = (mca_pml_base_modex_unknown_proc_t*) item;
if (ORTE_EQUAL == orte_ns.compare_fields(mask, &unknown_proc->proc_name, proc_name)) {
break;
}
}
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return unknown_proc;
}
/**
* Look to see if there is any data associated with a specified module.
*/
@ -290,10 +261,8 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
orte_gpr_keyval_t **keyval;
orte_process_name_t *proc_name;
mca_pml_base_modex_t *modex;
opal_mutex_t *proc_mutex;
mca_pml_base_modex_module_t *modex_module;
mca_base_component_t component;
bool is_unknown_proc = false;
int rc;
ompi_proc_t *proc;
@ -320,41 +289,23 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
return; /* if the name wasn't here, there is nothing we can do */
GOTNAME:
/*
* Lookup the modex data structure.
*/
proc = ompi_proc_find(proc_name);
if (NULL == proc) {
mca_pml_base_modex_unknown_proc_t *unknown_proc = modex_unknown_proc_get(proc_name);
if (NULL == unknown_proc) {
unknown_proc = (mca_pml_base_modex_unknown_proc_t*) OBJ_NEW(mca_pml_base_modex_unknown_proc_t);
if (NULL == unknown_proc) {
opal_output(0, "mca_pml_base_modex_registery_callback: unable to allocate unknown_proc structure");
return;
}
/* look up the modex data structure */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, proc_name);
if (modex == NULL) {
/* create a modex data structure for this proc */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return;
}
is_unknown_proc = true;
proc_mutex = &(unknown_proc->mutex);
OPAL_THREAD_LOCK(proc_mutex);
if (NULL == (modex = (mca_pml_base_modex_t*) unknown_proc->modex_info)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
unknown_proc->modex_info = (opal_object_t*) modex;
}
} else {
proc_mutex = &(proc->proc_lock);
OPAL_THREAD_LOCK(proc_mutex);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
proc->proc_modex = (opal_object_t*) modex;
}
}
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(proc_mutex);
return;
orte_hash_table_set_proc(&mca_pml_base_modex_data, proc_name, modex);
}
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
OPAL_THREAD_LOCK(&modex->modex_lock);
proc = NULL;
/*
* Extract the component name and version from the keyval object's key
@ -437,7 +388,8 @@ GOTNAME:
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {
opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");
OBJ_RELEASE(data);
OPAL_THREAD_UNLOCK(proc_mutex);
OPAL_THREAD_UNLOCK(modex->modex_mutex);
OBJ_RELEASE(modex);
return;
}
modex_module->module_data = bytes;
@ -445,17 +397,25 @@ GOTNAME:
modex_module->module_data_avail = true;
opal_condition_signal(&modex_module->module_data_cond);
if (!is_unknown_proc) {
/* call any registered callbacks */
for (item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
if (opal_list_get_size(&modex_module->module_cbs)) {
if (NULL == proc) {
proc = ompi_proc_find(proc_name);
}
if (NULL != proc) {
OPAL_THREAD_LOCK(&proc->proc_lock);
/* call any registered callbacks */
for (item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
}
}
}
OPAL_THREAD_UNLOCK(proc_mutex);
OPAL_THREAD_UNLOCK(modex->modex_lock);
} /* if value[i]->cnt > 0 */
} /* if value[i] != NULL */
}
@ -670,46 +630,41 @@ mca_pml_base_modex_recv(mca_base_component_t * component,
mca_pml_base_modex_module_t *modex_module;
/* check the proc for cached data */
OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
} else {
mca_pml_base_modex_unknown_proc_t *unknown_proc =
modex_unknown_proc_get(&(proc->proc_name));
if (NULL != unknown_proc) {
/* copy over the old stuff and clean us out... The real
proc was locked while this was happening, so there's no
way two threads can both call unknown_proc_get() then
remove_item() at the same time and try to remove the
list twice... */
proc->proc_modex = unknown_proc->modex_info;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
opal_list_remove_item(&mca_pml_base_modex_unknown_procs,
(opal_list_item_t*) unknown_proc);
/* see if we already have data for this proc... */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);
if (NULL == modex) {
/* create an empty modex data... */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
/* verify that we have subscribed to this segment */
mca_pml_base_modex_subscribe(&proc->proc_name);
} else {
/* create a backpointer from the proc to the modex data */
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
modex = (mca_pml_base_modex_t*) proc->proc_modex;
}
OPAL_THREAD_LOCK(&modex->modex_lock);
/* lookup/create the module */
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* wait until data is available */
while (modex_module->module_data_avail == false) {
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
opal_condition_wait(&modex_module->module_data_cond, &modex->modex_lock);
}
/* copy the data out to the user */
@ -725,7 +680,7 @@ mca_pml_base_modex_recv(mca_base_component_t * component,
*buffer = copy;
*size = modex_module->module_data_size;
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_SUCCESS;
}
@ -744,42 +699,37 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component,
mca_pml_base_modex_module_t *module;
mca_pml_base_modex_cb_t *cb;
/* check the proc for existing modex */
OPAL_THREAD_LOCK(&proc->proc_lock);
/* check the proc for cached data */
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
} else {
mca_pml_base_modex_unknown_proc_t *unknown_proc =
modex_unknown_proc_get(&(proc->proc_name));
if (NULL != unknown_proc) {
/* copy over the old stuff and clean us out... The real
proc was locked while this was happening, so there's no
way two threads can both call unknown_proc_get() then
remove_item() at the same time and try to remove the
list twice... */
proc->proc_modex = unknown_proc->modex_info;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
opal_list_remove_item(&mca_pml_base_modex_unknown_procs,
(opal_list_item_t*) unknown_proc);
/* see if we already have data for this proc... */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);
if (NULL == modex) {
/* create an empty modex data... */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
/* verify that we have subscribed to this segment */
mca_pml_base_modex_subscribe(&proc->proc_name);
} else {
/* create a backpointer from the proc to the modex data */
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
modex = (mca_pml_base_modex_t*) proc->proc_modex;
}
OPAL_THREAD_LOCK(&modex->modex_lock);
/* lookup/create the module */
if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
@ -788,7 +738,7 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component,
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);
OPAL_THREAD_UNLOCK(&proc->proc_lock);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_SUCCESS;
}