1
1
openmpi/ompi/mca/pml/base/pml_base_module_exchange.c
Brian Barrett 98884e45e4 Clean up the way procs are added to the global process list after MPI_INIT:
* Do not add new procs to the global list during modex callback or
    when sharing orte names during accept/connect.  For modex, we
    cache the modex info for later, in case that proc ever does get
    added to the global proc list.  For accept/connect orte name
    exchange between the roots, we only need the orte name, so no
    need to add a proc structure anyway.  The procs will be added
    to the global process list during the proc exchange later in 
    the wireup process
  * Rename proc_get_namebuf and proc_get_proclist to proc_pack
    and proc_unpack and extend them to include all information
    needed to build that proc struct on a remote node (which
    includes ORTE name, architecture, and hostname).  Change
    unpack to call pml_add_procs for the entire list of new
    procs at once, rather than one at a time.
  * Remove ompi_proc_find_and_add from the public proc
    interface and make it a private function.  This function
    would add a half-created proc to the global proc list, so
    making it harder to call is a good thing.

This means that there's only two ways to add new procs into the global proc list at this time: During MPI_INIT via the call to ompi_proc_init, where my job is added to the list and via ompi_proc_unpack using a buffer from a packed proc list sent to us by someone else.  Currently, this is enough to implement MPI semantics.  We can extend the interface more if we like, but that may require HNP communication to get the remote proc information and I wanted to avoid that if at all possible.

Refs trac:564

This commit was SVN r12798.

The following Trac tickets were found above:
  Ticket 564 --> https://svn.open-mpi.org/trac/ompi/ticket/564
2006-12-07 19:56:54 +00:00

798 строки
24 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/proc/proc.h"
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "orte/util/proc_info.h"
#include "orte/dss/dss.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/schema/schema.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/gpr/base/base.h"
#include "orte/mca/ns/ns.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
/**
* callback data for modex
*/
struct mca_pml_base_modex_cb_t {
opal_list_item_t super;
mca_base_component_t *component;
mca_pml_base_modex_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;
OBJ_CLASS_INSTANCE(mca_pml_base_modex_cb_t,
opal_list_item_t,
NULL,
NULL);
/**
* mca_pml_base_modex_module_t
*
* Data for a specic proc and module.
*/
struct mca_pml_base_modex_module_t {
opal_list_item_t super;
mca_base_component_t component;
void *module_data;
size_t module_data_size;
bool module_data_avail;
opal_list_t module_cbs;
opal_condition_t module_data_cond;
};
typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;
static void
mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module)
{
OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t);
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
memset(&module->component, 0, sizeof(module->component));
module->module_data = NULL;
module->module_data_size = 0;
module->module_data_avail = false;
}
static void
mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module)
{
OBJ_DESTRUCT(&module->module_data_cond);
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t,
opal_list_item_t,
mca_pml_base_modex_module_construct,
mca_pml_base_modex_module_destruct);
/**
* mca_pml_base_modex_t
*
* List of modules (mca_pml_base_modex_module_t) for which data has been
* received from peers.
*/
struct mca_pml_base_modex_t {
opal_object_t super;
opal_list_t modex_modules;
};
typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
static void
mca_pml_base_modex_construct(mca_pml_base_modex_t * modex)
{
OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);
}
static void
mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex)
{
OBJ_DESTRUCT(&modex->modex_modules);
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_t,
opal_object_t,
mca_pml_base_modex_construct,
mca_pml_base_modex_destruct);
/**
* mca_pml_base_modex_subscription_t
*
* Track segments we have subscribed to.
*/
struct mca_pml_base_modex_subscription_t {
opal_list_item_t item;
orte_jobid_t jobid;
};
typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t;
OBJ_CLASS_INSTANCE(mca_pml_base_modex_subscription_t,
opal_list_item_t,
NULL,
NULL);
/**
* structure for unknown process structures (ones that aren't in our ompi_proc lists yet)
*/
struct mca_pml_base_modex_unknown_proc_t {
opal_list_item_t item;
orte_process_name_t proc_name;
opal_mutex_t mutex;
opal_object_t* modex_info;
};
typedef struct mca_pml_base_modex_unknown_proc_t mca_pml_base_modex_unknown_proc_t;
static void
mca_pml_base_modex_unknown_proc_construct(mca_pml_base_modex_unknown_proc_t *proc)
{
proc->modex_info = NULL;
OBJ_CONSTRUCT(&(proc->mutex), opal_mutex_t);
}
static void
mca_pml_base_modex_unknown_proc_destruct(mca_pml_base_modex_unknown_proc_t *proc)
{
OBJ_DESTRUCT(&(proc->mutex));
if (NULL != proc->modex_info) {
OBJ_RELEASE(proc->modex_info);
}
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_unknown_proc_t,
opal_list_item_t,
mca_pml_base_modex_unknown_proc_construct,
mca_pml_base_modex_unknown_proc_destruct);
/**
* Globals to track the list of subscriptions.
*/
static opal_list_t mca_pml_base_modex_subscriptions;
static opal_list_t mca_pml_base_modex_unknown_procs;
static opal_mutex_t mca_pml_base_modex_lock;
/**
* Initialize global state.
*/
int
mca_pml_base_modex_init(void)
{
OBJ_CONSTRUCT(&mca_pml_base_modex_unknown_procs, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t);
return OMPI_SUCCESS;
}
/**
* Cleanup global state.
*/
int
mca_pml_base_modex_finalize(void)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_unknown_procs)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&mca_pml_base_modex_unknown_procs);
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions);
return OMPI_SUCCESS;
}
static mca_pml_base_modex_unknown_proc_t *
modex_unknown_proc_get(orte_process_name_t *proc_name)
{
opal_list_item_t *item;
mca_pml_base_modex_unknown_proc_t *unknown_proc = NULL;
orte_ns_cmp_bitmask_t mask;
/* return the proc-struct which matches this jobid+process id */
mask = ORTE_NS_CMP_CELLID | ORTE_NS_CMP_JOBID | ORTE_NS_CMP_VPID;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
for(item = opal_list_get_first(&mca_pml_base_modex_unknown_procs);
item != opal_list_get_end(&mca_pml_base_modex_unknown_procs);
item = opal_list_get_next(item)) {
unknown_proc = (mca_pml_base_modex_unknown_proc_t*) item;
if (ORTE_EQUAL == orte_ns.compare_fields(mask, &unknown_proc->proc_name, proc_name)) {
break;
}
}
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return unknown_proc;
}
/**
* Look to see if there is any data associated with a specified module.
*/
static mca_pml_base_modex_module_t *
mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component)
{
mca_pml_base_modex_module_t *modex_module;
for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules);
modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules);
modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) {
if (mca_base_component_compatible(&modex_module->component, component) == 0) {
return modex_module;
}
}
return NULL;
}
/**
* Create a placeholder for data associated with the specified module.
*/
static mca_pml_base_modex_module_t *
mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component)
{
mca_pml_base_modex_module_t *modex_module;
if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) {
modex_module = OBJ_NEW(mca_pml_base_modex_module_t);
if (NULL != modex_module) {
modex_module->component = *component;
opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module);
}
}
return modex_module;
}
/**
* Callback for registry notifications.
*/
static void
mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
void *cbdata)
{
orte_std_cntr_t i, j, k;
orte_gpr_value_t **values, *value;
orte_gpr_keyval_t **keyval;
char **token;
orte_process_name_t *proc_name;
mca_pml_base_modex_t *modex;
opal_mutex_t *proc_mutex;
mca_pml_base_modex_module_t *modex_module;
mca_base_component_t component;
bool is_unknown_proc = false;
int rc;
/* process the callback */
values = (orte_gpr_value_t **) (data->values)->addr;
for (i = 0, k = 0; k < data->cnt &&
i < (data->values)->size; i++) {
if (NULL != values[i]) {
k++;
value = values[i];
if (0 < value->cnt) { /* needs to be at least one keyval */
/*
* Token for the value should be the process name - look it up
*/
token = value->tokens;
if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) {
/*
* Lookup the modex data structure.
*/
ompi_proc_t *proc = ompi_proc_find(proc_name);
if (NULL == proc) {
mca_pml_base_modex_unknown_proc_t *unknown_proc =
modex_unknown_proc_get(proc_name);
if (NULL == unknown_proc) {
unknown_proc = (mca_pml_base_modex_unknown_proc_t*)
OBJ_NEW(mca_pml_base_modex_unknown_proc_t);
if (NULL == unknown_proc) {
opal_output(0, "mca_pml_base_modex_registery_callback: unable to allocate unknown_proc structure");
return;
}
}
is_unknown_proc = true;
proc_mutex = &(unknown_proc->mutex);
OPAL_THREAD_LOCK(proc_mutex);
if (NULL == (modex = (mca_pml_base_modex_t*) unknown_proc->modex_info)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
unknown_proc->modex_info = (opal_object_t*) modex;
}
} else {
proc_mutex = &(proc->proc_lock);
OPAL_THREAD_LOCK(proc_mutex);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
proc->proc_modex = (opal_object_t*) modex;
}
}
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(proc_mutex);
return;
}
/*
* Extract the component name and version from the keyval object's key
* Could be multiple keyvals returned since there is one for each
* component type/name/version - process them all
*/
keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) {
orte_buffer_t buffer;
opal_list_item_t *item;
char *ptr;
void *bytes = NULL;
orte_std_cntr_t cnt;
size_t num_bytes;
orte_byte_object_t *bo;
if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0)
continue;
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_type_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_component_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_major_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_minor_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&num_bytes, &cnt, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (num_bytes != 0) {
if (NULL == (bytes = malloc(num_bytes))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
continue;
}
cnt = (orte_std_cntr_t) num_bytes;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
continue;
}
num_bytes = cnt;
} else {
bytes = NULL;
}
/*
* Lookup the corresponding modex structure
*/
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {
opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");
OBJ_RELEASE(data);
OPAL_THREAD_UNLOCK(proc_mutex);
return;
}
modex_module->module_data = bytes;
modex_module->module_data_size = num_bytes;
modex_module->module_data_avail = true;
opal_condition_signal(&modex_module->module_data_cond);
if (!is_unknown_proc) {
/*
* call any registered
* callbacks
*/
for (item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
}
}
}
OPAL_THREAD_UNLOCK(proc_mutex);
} /* convert string to process name */
} /* if value[i]->cnt > 0 */
}
}
}
/**
* Make sure we have subscribed to this segment.
*/
static int
mca_pml_base_modex_subscribe(orte_process_name_t * name)
{
char *segment, *sub_name, *trig_name;
orte_gpr_subscription_id_t sub_id;
orte_jobid_t jobid;
opal_list_item_t *item;
mca_pml_base_modex_subscription_t *subscription;
int rc;
/* check for an existing subscription */
OPAL_LOCK(&mca_pml_base_modex_lock);
if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) {
for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions);
item != opal_list_get_end(&mca_pml_base_modex_subscriptions);
item = opal_list_get_next(item)) {
subscription = (mca_pml_base_modex_subscription_t *) item;
if (subscription->jobid == name->jobid) {
OPAL_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS;
}
}
}
OPAL_UNLOCK(&mca_pml_base_modex_lock);
/* otherwise - subscribe to get this jobid's contact info */
jobid = name->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_MODEX_SUBSCRIPTION, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
return rc;
}
/* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
return rc;
}
if (jobid != orte_process_info.my_name->jobid) {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_PRE_EXISTING,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment,
NULL, /* look at all
* containers on this
* segment */
OMPI_MODEX_KEY,
mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
} else {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment,
NULL, /* look at all
* containers on this
* segment */
OMPI_MODEX_KEY,
mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
}
free(sub_name);
free(trig_name);
free(segment);
/* add this jobid to our list of subscriptions */
OPAL_LOCK(&mca_pml_base_modex_lock);
subscription = OBJ_NEW(mca_pml_base_modex_subscription_t);
subscription->jobid = name->jobid;
opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item);
OPAL_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS;
}
/**
* Store the data associated with the specified module in the
* gpr. Note that the gpr is in a mode where it caches
* individual puts during startup and sends them as an aggregate
* command.
*/
int
mca_pml_base_modex_send(mca_base_component_t * source_component,
const void *data,
size_t size)
{
orte_jobid_t jobid;
int rc;
orte_buffer_t buffer;
orte_std_cntr_t i, num_tokens;
char *ptr, *segment, **tokens;
orte_byte_object_t bo;
orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;
jobid = ORTE_PROC_MY_NAME->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
&num_tokens, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
ptr = source_component->mca_component_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != size) {
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* setup the data_value structure to hold the byte object */
if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, tokens, OMPI_MODEX_KEY, &value);
cleanup:
free(segment);
for (i = 0; i < num_tokens; i++) {
free(tokens[i]);
tokens[i] = NULL;
}
if (NULL != tokens)
free(tokens);
return rc;
}
/**
* Retreive the data for the specified module from the source process.
*/
int
mca_pml_base_modex_recv(mca_base_component_t * component,
ompi_proc_t * proc,
void **buffer,
size_t * size)
{
mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *modex_module;
/* check the proc for cached data */
OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
} else {
mca_pml_base_modex_unknown_proc_t *unknown_proc =
modex_unknown_proc_get(&(proc->proc_name));
if (NULL != unknown_proc) {
/* copy over the old stuff and clean us out... The real
proc was locked while this was happening, so there's no
way two threads can both call unknown_proc_get() then
remove_item() at the same time and try to remove the
list twice... */
proc->proc_modex = unknown_proc->modex_info;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
opal_list_remove_item(&mca_pml_base_modex_unknown_procs,
(opal_list_item_t*) unknown_proc);
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
modex = (mca_pml_base_modex_t*) proc->proc_modex;
}
/* lookup/create the module */
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* wait until data is available */
while (modex_module->module_data_avail == false) {
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
}
/* copy the data out to the user */
if (modex_module->module_data_size == 0) {
*buffer = NULL;
*size = 0;
} else {
void *copy = malloc(modex_module->module_data_size);
if (copy == NULL) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
*buffer = copy;
*size = modex_module->module_data_size;
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_SUCCESS;
}
/**
*
*/
int
mca_pml_base_modex_recv_nb(mca_base_component_t * component,
ompi_proc_t * proc,
mca_pml_base_modex_cb_fn_t cbfunc,
void *cbdata)
{
mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *module;
mca_pml_base_modex_cb_t *cb;
/* check the proc for existing modex */
OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
} else {
mca_pml_base_modex_unknown_proc_t *unknown_proc =
modex_unknown_proc_get(&(proc->proc_name));
if (NULL != unknown_proc) {
/* copy over the old stuff and clean us out... The real
proc was locked while this was happening, so there's no
way two threads can both call unknown_proc_get() then
remove_item() at the same time and try to remove the
list twice... */
proc->proc_modex = unknown_proc->modex_info;
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
opal_list_remove_item(&mca_pml_base_modex_unknown_procs,
(opal_list_item_t*) unknown_proc);
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
modex = (mca_pml_base_modex_t*) proc->proc_modex;
}
/* lookup/create the module */
if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
cb->component = component;
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_SUCCESS;
}
/**
* Subscribe to the segment corresponding
* to this job.
*/
int
mca_pml_base_modex_exchange(void)
{
return mca_pml_base_modex_subscribe(orte_process_info.my_name);
}