
Clean up the remainder of the size_t references in the runtime itself. Convert to orte_std_cntr_t wherever it makes sense (only avoid those places where the actual memory size is referenced). Remove the obsolete oob barrier function (we actually obsoleted it a long time ago - just never bothered to clean it up). I have done my best to go through all the components and catch everything, even if I couldn't test compile them since I wasn't on that type of system. Still, I cannot guarantee that problems won't show up when you test this on specific systems. Usually, these will just show as "warning: comparison between signed and unsigned" notes which are easily fixed (just change a size_t to orte_std_cntr_t). In some places, people didn't use size_t, but instead used some other variant (e.g., I found several places with uint32_t). I tried to catch all of them, but... Once we get all the instances caught and fixed, this should once and for all resolve many of the heterogeneity problems. This commit was SVN r11204.
712 строки
23 KiB
C
712 строки
23 KiB
C
/*
|
|
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
|
|
#include "ompi/proc/proc.h"
|
|
#include "opal/threads/condition.h"
|
|
#include "opal/util/output.h"
|
|
#include "orte/util/proc_info.h"
|
|
|
|
#include "orte/dss/dss.h"
|
|
#include "opal/mca/mca.h"
|
|
#include "opal/mca/base/base.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/schema/schema.h"
|
|
#include "orte/mca/gpr/gpr.h"
|
|
#include "orte/mca/gpr/base/base.h"
|
|
#include "orte/mca/ns/ns.h"
|
|
#include "ompi/constants.h"
|
|
#include "ompi/mca/pml/pml.h"
|
|
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
|
|
|
|
/**
|
|
*
|
|
*/
|
|
|
|
struct mca_pml_base_modex_cb_t {
|
|
opal_list_item_t super;
|
|
mca_base_component_t* component;
|
|
mca_pml_base_modex_cb_fn_t cbfunc;
|
|
void* cbdata;
|
|
};
|
|
typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;
|
|
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
mca_pml_base_modex_cb_t,
|
|
opal_list_item_t,
|
|
NULL,
|
|
NULL
|
|
);
|
|
|
|
|
|
/**
|
|
* mca_pml_base_modex_module_t
|
|
*
|
|
* Data for a specic proc and module.
|
|
*/
|
|
|
|
struct mca_pml_base_modex_module_t {
|
|
opal_list_item_t super;
|
|
mca_base_component_t component;
|
|
void *module_data;
|
|
size_t module_data_size;
|
|
bool module_data_avail;
|
|
opal_list_t module_cbs;
|
|
opal_condition_t module_data_cond;
|
|
};
|
|
typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;
|
|
|
|
|
|
static void mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t *module)
|
|
{
|
|
OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t);
|
|
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
|
|
memset(&module->component, 0, sizeof(module->component));
|
|
module->module_data = NULL;
|
|
module->module_data_size = 0;
|
|
module->module_data_avail = false;
|
|
}
|
|
|
|
static void mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t *module)
|
|
{
|
|
OBJ_DESTRUCT(&module->module_data_cond);
|
|
}
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
mca_pml_base_modex_module_t,
|
|
opal_list_item_t,
|
|
mca_pml_base_modex_module_construct,
|
|
mca_pml_base_modex_module_destruct
|
|
);
|
|
|
|
/**
|
|
* mca_pml_base_modex_t
|
|
*
|
|
* List of modules (mca_pml_base_modex_module_t) for which data has been
|
|
* received from peers.
|
|
*/
|
|
struct mca_pml_base_modex_t {
|
|
opal_object_t super;
|
|
opal_list_t modex_modules;
|
|
};
|
|
typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
|
|
|
|
static void mca_pml_base_modex_construct(mca_pml_base_modex_t* modex)
|
|
{
|
|
OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);
|
|
}
|
|
|
|
static void mca_pml_base_modex_destruct(mca_pml_base_modex_t* modex)
|
|
{
|
|
OBJ_DESTRUCT(&modex->modex_modules);
|
|
}
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
mca_pml_base_modex_t,
|
|
opal_object_t,
|
|
mca_pml_base_modex_construct,
|
|
mca_pml_base_modex_destruct
|
|
);
|
|
|
|
/**
|
|
* mca_pml_base_modex_subscription_t
|
|
*
|
|
* Track segments we have subscribed to.
|
|
*/
|
|
|
|
struct mca_pml_base_modex_subscription_t {
|
|
opal_list_item_t item;
|
|
orte_jobid_t jobid;
|
|
};
|
|
typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t;
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
mca_pml_base_modex_subscription_t,
|
|
opal_list_item_t,
|
|
NULL,
|
|
NULL);
|
|
|
|
/**
|
|
* Globals to track the list of subscriptions.
|
|
*/
|
|
|
|
static opal_list_t mca_pml_base_modex_subscriptions;
|
|
static opal_mutex_t mca_pml_base_modex_lock;
|
|
|
|
|
|
/**
|
|
* Initialize global state.
|
|
*/
|
|
int mca_pml_base_modex_init(void)
|
|
{
|
|
OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t);
|
|
OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* Cleanup global state.
|
|
*/
|
|
int mca_pml_base_modex_finalize(void)
|
|
{
|
|
opal_list_item_t *item;
|
|
while(NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions)))
|
|
OBJ_RELEASE(item);
|
|
OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* Look to see if there is any data associated with a specified module.
|
|
*/
|
|
|
|
static mca_pml_base_modex_module_t* mca_pml_base_modex_lookup_module(
|
|
mca_pml_base_modex_t* modex,
|
|
mca_base_component_t* component)
|
|
{
|
|
mca_pml_base_modex_module_t* modex_module;
|
|
for(modex_module = (mca_pml_base_modex_module_t*)opal_list_get_first(&modex->modex_modules);
|
|
modex_module != (mca_pml_base_modex_module_t*)opal_list_get_end(&modex->modex_modules);
|
|
modex_module = (mca_pml_base_modex_module_t*)opal_list_get_next(modex_module)) {
|
|
if(mca_base_component_compatible(&modex_module->component, component) == 0) {
|
|
return modex_module;
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
|
|
/**
|
|
* Create a placeholder for data associated with the specified module.
|
|
*/
|
|
|
|
static mca_pml_base_modex_module_t* mca_pml_base_modex_create_module(
|
|
mca_pml_base_modex_t* modex,
|
|
mca_base_component_t* component)
|
|
{
|
|
mca_pml_base_modex_module_t* modex_module;
|
|
if(NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) {
|
|
modex_module = OBJ_NEW(mca_pml_base_modex_module_t);
|
|
if(NULL != modex_module) {
|
|
modex_module->component = *component;
|
|
opal_list_append(&modex->modex_modules, (opal_list_item_t*)modex_module);
|
|
}
|
|
}
|
|
return modex_module;
|
|
}
|
|
|
|
|
|
/**
|
|
* Callback for registry notifications.
|
|
*/
|
|
|
|
static void mca_pml_base_modex_registry_callback(
|
|
orte_gpr_notify_data_t* data,
|
|
void* cbdata)
|
|
{
|
|
orte_std_cntr_t i, j, k;
|
|
orte_gpr_value_t **values, *value;
|
|
orte_gpr_keyval_t **keyval;
|
|
ompi_proc_t *proc;
|
|
ompi_proc_t **new_procs = NULL;
|
|
size_t new_proc_count = 0;
|
|
char **token;
|
|
orte_process_name_t *proc_name;
|
|
mca_pml_base_modex_t *modex;
|
|
mca_pml_base_modex_module_t *modex_module;
|
|
mca_base_component_t component;
|
|
bool isnew = false;
|
|
int rc;
|
|
|
|
if(data->cnt) {
|
|
new_procs = (ompi_proc_t**)malloc(sizeof(ompi_proc_t*) * data->cnt);
|
|
if(NULL == new_procs) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
return;
|
|
}
|
|
}
|
|
|
|
/* process the callback */
|
|
values = (orte_gpr_value_t**)(data->values)->addr;
|
|
for (i=0, k=0; k < data->cnt &&
|
|
i < (data->values)->size; i++) {
|
|
if (NULL != values[i]) {
|
|
k++;
|
|
value = values[i];
|
|
if (0 < value->cnt) { /* needs to be at least one keyval */
|
|
/*
|
|
* Token for the value should be the process name - look it up
|
|
*/
|
|
token = value->tokens;
|
|
if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) {
|
|
proc = ompi_proc_find_and_add(proc_name, &isnew);
|
|
if(NULL == proc)
|
|
continue;
|
|
|
|
if(isnew) {
|
|
new_procs[new_proc_count] = proc;
|
|
new_proc_count++;
|
|
}
|
|
|
|
/*
|
|
* Lookup the modex data structure.
|
|
*/
|
|
|
|
OPAL_THREAD_LOCK(&proc->proc_lock);
|
|
if(NULL == (modex = (mca_pml_base_modex_t*)proc->proc_modex)) {
|
|
modex = OBJ_NEW(mca_pml_base_modex_t);
|
|
if(NULL == modex) {
|
|
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return;
|
|
}
|
|
proc->proc_modex = &modex->super;
|
|
}
|
|
|
|
/*
|
|
* Extract the component name and version from the keyval object's key
|
|
* Could be multiple keyvals returned since there is one for each
|
|
* component type/name/version - process them all
|
|
*/
|
|
keyval = value->keyvals;
|
|
for (j=0; j < value->cnt; j++) {
|
|
orte_buffer_t buffer;
|
|
opal_list_item_t* item;
|
|
char *ptr;
|
|
void* bytes = NULL;
|
|
orte_std_cntr_t cnt;
|
|
size_t num_bytes;
|
|
orte_byte_object_t *bo;
|
|
|
|
if(strcmp(keyval[j]->key,OMPI_MODEX_KEY) != 0)
|
|
continue;
|
|
|
|
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
|
|
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bo, keyval[j]->value, ORTE_BYTE_OBJECT))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
strcpy(component.mca_type_name,ptr);
|
|
free(ptr);
|
|
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
strcpy(component.mca_component_name,ptr);
|
|
free(ptr);
|
|
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
|
|
&component.mca_component_major_version, &cnt, ORTE_INT32))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
|
|
&component.mca_component_minor_version, &cnt, ORTE_INT32))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
|
|
&num_bytes, &cnt, ORTE_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
if (num_bytes != 0) {
|
|
if(NULL == (bytes = malloc(num_bytes))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
continue;
|
|
}
|
|
cnt = (orte_std_cntr_t)num_bytes;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
num_bytes = cnt;
|
|
} else {
|
|
bytes = NULL;
|
|
}
|
|
|
|
/*
|
|
* Lookup the corresponding modex structure
|
|
*/
|
|
if(NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {
|
|
opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");
|
|
OBJ_RELEASE(data);
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return;
|
|
}
|
|
|
|
modex_module->module_data = bytes;
|
|
modex_module->module_data_size = num_bytes;
|
|
modex_module->module_data_avail = true;
|
|
opal_condition_signal(&modex_module->module_data_cond);
|
|
|
|
/* call any registered callbacks */
|
|
for(item = opal_list_get_first(&modex_module->module_cbs);
|
|
item != opal_list_get_end(&modex_module->module_cbs);
|
|
item = opal_list_get_next(item)) {
|
|
mca_pml_base_modex_cb_t* cb = (mca_pml_base_modex_cb_t*)item;
|
|
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
|
|
}
|
|
}
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
} /* convert string to process name */
|
|
|
|
} /* if value[i]->cnt > 0 */
|
|
}
|
|
}
|
|
|
|
/* pml add procs */
|
|
if(NULL != new_procs) {
|
|
if(new_proc_count > 0) {
|
|
MCA_PML_CALL(add_procs(new_procs, new_proc_count));
|
|
}
|
|
free(new_procs);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Make sure we have subscribed to this segment.
|
|
*/
|
|
|
|
static int mca_pml_base_modex_subscribe(orte_process_name_t* name)
|
|
{
|
|
char *segment, *sub_name, *trig_name;
|
|
orte_gpr_subscription_id_t sub_id;
|
|
orte_jobid_t jobid;
|
|
opal_list_item_t* item;
|
|
mca_pml_base_modex_subscription_t* subscription;
|
|
int rc;
|
|
|
|
/* check for an existing subscription */
|
|
OPAL_LOCK(&mca_pml_base_modex_lock);
|
|
if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) {
|
|
for(item = opal_list_get_first(&mca_pml_base_modex_subscriptions);
|
|
item != opal_list_get_end(&mca_pml_base_modex_subscriptions);
|
|
item = opal_list_get_next(item)) {
|
|
subscription = (mca_pml_base_modex_subscription_t*)item;
|
|
if(subscription->jobid == name->jobid) {
|
|
OPAL_UNLOCK(&mca_pml_base_modex_lock);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
}
|
|
}
|
|
OPAL_UNLOCK(&mca_pml_base_modex_lock);
|
|
|
|
/* otherwise - subscribe to get this jobid's contact info */
|
|
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&jobid, name))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
|
|
OMPI_MODEX_SUBSCRIPTION, jobid))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
/* attach to the stage-1 standard trigger */
|
|
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
|
|
ORTE_STG1_TRIGGER, jobid))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(sub_name);
|
|
return rc;
|
|
}
|
|
|
|
/* define the segment */
|
|
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(sub_name);
|
|
free(trig_name);
|
|
return rc;
|
|
}
|
|
|
|
if (jobid != orte_process_info.my_name->jobid) {
|
|
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL,
|
|
ORTE_GPR_NOTIFY_ADD_ENTRY |
|
|
ORTE_GPR_NOTIFY_VALUE_CHG |
|
|
ORTE_GPR_NOTIFY_PRE_EXISTING,
|
|
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
|
|
segment,
|
|
NULL, /* look at all containers on this segment */
|
|
OMPI_MODEX_KEY,
|
|
mca_pml_base_modex_registry_callback, NULL))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(sub_name);
|
|
free(trig_name);
|
|
free(segment);
|
|
return rc;
|
|
}
|
|
} else {
|
|
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name,
|
|
ORTE_GPR_NOTIFY_ADD_ENTRY |
|
|
ORTE_GPR_NOTIFY_VALUE_CHG |
|
|
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
|
|
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
|
|
segment,
|
|
NULL, /* look at all containers on this segment */
|
|
OMPI_MODEX_KEY,
|
|
mca_pml_base_modex_registry_callback, NULL))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(sub_name);
|
|
free(trig_name);
|
|
free(segment);
|
|
return rc;
|
|
}
|
|
}
|
|
free(sub_name);
|
|
free(trig_name);
|
|
free(segment);
|
|
|
|
/* add this jobid to our list of subscriptions */
|
|
OPAL_LOCK(&mca_pml_base_modex_lock);
|
|
subscription = OBJ_NEW(mca_pml_base_modex_subscription_t);
|
|
subscription->jobid = name->jobid;
|
|
opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item);
|
|
OPAL_UNLOCK(&mca_pml_base_modex_lock);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* Store the data associated with the specified module in the
|
|
* gpr. Note that the gpr is in a mode where it caches
|
|
* individual puts during startup and sends them as an aggregate
|
|
* command.
|
|
*/
|
|
|
|
int mca_pml_base_modex_send(
|
|
mca_base_component_t *source_component,
|
|
const void *data,
|
|
size_t size)
|
|
{
|
|
orte_jobid_t jobid;
|
|
int rc;
|
|
orte_buffer_t buffer;
|
|
orte_std_cntr_t i, num_tokens;
|
|
char *ptr, *segment, **tokens;
|
|
orte_byte_object_t bo;
|
|
orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid(&jobid, orte_process_info.my_name))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
|
|
&num_tokens, orte_process_info.my_name))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(segment);
|
|
return rc;
|
|
}
|
|
|
|
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
|
|
ptr = source_component->mca_type_name;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
ptr = source_component->mca_component_name;
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
if (0 != size) {
|
|
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void*)data, size, ORTE_BYTE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void**)&(bo.bytes), &(bo.size)))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
OBJ_DESTRUCT(&buffer);
|
|
|
|
/* setup the data_value structure to hold the byte object */
|
|
if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void*)&bo, ORTE_BYTE_OBJECT))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
|
|
segment, tokens, OMPI_MODEX_KEY, &value);
|
|
|
|
cleanup:
|
|
free(segment);
|
|
for (i=0; i < num_tokens; i++) {
|
|
free(tokens[i]);
|
|
tokens[i] = NULL;
|
|
}
|
|
if (NULL != tokens) free(tokens);
|
|
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Retreive the data for the specified module from the source process.
|
|
*/
|
|
|
|
int mca_pml_base_modex_recv(
|
|
mca_base_component_t *component,
|
|
ompi_proc_t *proc,
|
|
void **buffer,
|
|
size_t *size)
|
|
{
|
|
mca_pml_base_modex_t* modex;
|
|
mca_pml_base_modex_module_t* modex_module;
|
|
|
|
/* check the proc for cached data */
|
|
OPAL_THREAD_LOCK(&proc->proc_lock);
|
|
if(NULL == (modex = (mca_pml_base_modex_t*)proc->proc_modex)) {
|
|
modex = OBJ_NEW(mca_pml_base_modex_t);
|
|
if(modex == NULL) {
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
proc->proc_modex = &modex->super;
|
|
|
|
/* verify that we have subscribed to this segment */
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
mca_pml_base_modex_subscribe(&proc->proc_name);
|
|
OPAL_THREAD_LOCK(&proc->proc_lock);
|
|
}
|
|
|
|
/* lookup/create the module */
|
|
if(NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
/* wait until data is available */
|
|
while(modex_module->module_data_avail == false) {
|
|
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
|
|
}
|
|
|
|
/* copy the data out to the user */
|
|
if(modex_module->module_data_size == 0) {
|
|
*buffer = NULL;
|
|
*size = 0;
|
|
} else {
|
|
void *copy = malloc(modex_module->module_data_size);
|
|
if(copy == NULL) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
|
|
*buffer = copy;
|
|
*size = modex_module->module_data_size;
|
|
}
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
*
|
|
*/
|
|
|
|
int mca_pml_base_modex_recv_nb(
|
|
mca_base_component_t *component,
|
|
ompi_proc_t *proc,
|
|
mca_pml_base_modex_cb_fn_t cbfunc,
|
|
void *cbdata)
|
|
{
|
|
mca_pml_base_modex_t* modex;
|
|
mca_pml_base_modex_module_t* module;
|
|
mca_pml_base_modex_cb_t* cb;
|
|
|
|
/* check the proc for existing modex */
|
|
OPAL_THREAD_LOCK(&proc->proc_lock);
|
|
if(NULL == (modex = (mca_pml_base_modex_t*)proc->proc_modex)) {
|
|
modex = OBJ_NEW(mca_pml_base_modex_t);
|
|
if(modex == NULL) {
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
proc->proc_modex = &modex->super;
|
|
|
|
/* verify that we have subscribed to this segment */
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
mca_pml_base_modex_subscribe(&proc->proc_name);
|
|
OPAL_THREAD_LOCK(&proc->proc_lock);
|
|
}
|
|
|
|
/* lookup/create the module */
|
|
if(NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
/* register the callback */
|
|
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
|
|
cb->component = component;
|
|
cb->cbfunc = cbfunc;
|
|
cb->cbdata = cbdata;
|
|
opal_list_append(&module->module_cbs, (opal_list_item_t*)cb);
|
|
OPAL_THREAD_UNLOCK(&proc->proc_lock);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* Subscribe to the segment corresponding
|
|
* to this job.
|
|
*/
|
|
|
|
int mca_pml_base_modex_exchange(void)
|
|
{
|
|
return mca_pml_base_modex_subscribe(orte_process_info.my_name);
|
|
}
|
|
|