1
1

indent, this time with the right coding standards...

This commit was SVN r12787.
Этот коммит содержится в:
Brian Barrett 2006-12-07 00:24:01 +00:00
родитель f9ec8d6f2a
Коммит 41a70a8f01

Просмотреть файл

@ -42,10 +42,10 @@
* callback data for modex * callback data for modex
*/ */
struct mca_pml_base_modex_cb_t { struct mca_pml_base_modex_cb_t {
opal_list_item_t super; opal_list_item_t super;
mca_base_component_t *component; mca_base_component_t *component;
mca_pml_base_modex_cb_fn_t cbfunc; mca_pml_base_modex_cb_fn_t cbfunc;
void *cbdata; void *cbdata;
}; };
typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t; typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;
@ -61,31 +61,31 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_cb_t,
* Data for a specic proc and module. * Data for a specic proc and module.
*/ */
struct mca_pml_base_modex_module_t { struct mca_pml_base_modex_module_t {
opal_list_item_t super; opal_list_item_t super;
mca_base_component_t component; mca_base_component_t component;
void *module_data; void *module_data;
size_t module_data_size; size_t module_data_size;
bool module_data_avail; bool module_data_avail;
opal_list_t module_cbs; opal_list_t module_cbs;
opal_condition_t module_data_cond; opal_condition_t module_data_cond;
}; };
typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t; typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;
static void static void
mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module) mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module)
{ {
OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t); OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t);
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t); OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
memset(&module->component, 0, sizeof(module->component)); memset(&module->component, 0, sizeof(module->component));
module->module_data = NULL; module->module_data = NULL;
module->module_data_size = 0; module->module_data_size = 0;
module->module_data_avail = false; module->module_data_avail = false;
} }
static void static void
mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module) mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module)
{ {
OBJ_DESTRUCT(&module->module_data_cond); OBJ_DESTRUCT(&module->module_data_cond);
} }
OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t, OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t,
@ -101,21 +101,21 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t,
* received from peers. * received from peers.
*/ */
struct mca_pml_base_modex_t { struct mca_pml_base_modex_t {
opal_object_t super; opal_object_t super;
opal_list_t modex_modules; opal_list_t modex_modules;
}; };
typedef struct mca_pml_base_modex_t mca_pml_base_modex_t; typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
static void static void
mca_pml_base_modex_construct(mca_pml_base_modex_t * modex) mca_pml_base_modex_construct(mca_pml_base_modex_t * modex)
{ {
OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t); OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);
} }
static void static void
mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex) mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex)
{ {
OBJ_DESTRUCT(&modex->modex_modules); OBJ_DESTRUCT(&modex->modex_modules);
} }
OBJ_CLASS_INSTANCE(mca_pml_base_modex_t, OBJ_CLASS_INSTANCE(mca_pml_base_modex_t,
@ -130,8 +130,8 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_t,
* Track segments we have subscribed to. * Track segments we have subscribed to.
*/ */
struct mca_pml_base_modex_subscription_t { struct mca_pml_base_modex_subscription_t {
opal_list_item_t item; opal_list_item_t item;
orte_jobid_t jobid; orte_jobid_t jobid;
}; };
typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t; typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t;
@ -154,9 +154,9 @@ static opal_mutex_t mca_pml_base_modex_lock;
int int
mca_pml_base_modex_init(void) mca_pml_base_modex_init(void)
{ {
OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t); OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/** /**
@ -165,11 +165,11 @@ mca_pml_base_modex_init(void)
int int
mca_pml_base_modex_finalize(void) mca_pml_base_modex_finalize(void)
{ {
opal_list_item_t *item; opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions))) while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions)))
OBJ_RELEASE(item); OBJ_RELEASE(item);
OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions); OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -181,15 +181,15 @@ static mca_pml_base_modex_module_t *
mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex, mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component) mca_base_component_t * component)
{ {
mca_pml_base_modex_module_t *modex_module; mca_pml_base_modex_module_t *modex_module;
for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules); for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules);
modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules); modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules);
modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) { modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) {
if (mca_base_component_compatible(&modex_module->component, component) == 0) { if (mca_base_component_compatible(&modex_module->component, component) == 0) {
return modex_module; return modex_module;
}
} }
} return NULL;
return NULL;
} }
@ -201,15 +201,15 @@ static mca_pml_base_modex_module_t *
mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex, mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component) mca_base_component_t * component)
{ {
mca_pml_base_modex_module_t *modex_module; mca_pml_base_modex_module_t *modex_module;
if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) { if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) {
modex_module = OBJ_NEW(mca_pml_base_modex_module_t); modex_module = OBJ_NEW(mca_pml_base_modex_module_t);
if (NULL != modex_module) { if (NULL != modex_module) {
modex_module->component = *component; modex_module->component = *component;
opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module); opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module);
}
} }
} return modex_module;
return modex_module;
} }
@ -221,176 +221,176 @@ static void
mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
void *cbdata) void *cbdata)
{ {
orte_std_cntr_t i, j, k; orte_std_cntr_t i, j, k;
orte_gpr_value_t **values, *value; orte_gpr_value_t **values, *value;
orte_gpr_keyval_t **keyval; orte_gpr_keyval_t **keyval;
ompi_proc_t *proc; ompi_proc_t *proc;
ompi_proc_t **new_procs = NULL; ompi_proc_t **new_procs = NULL;
size_t new_proc_count = 0; size_t new_proc_count = 0;
char **token; char **token;
orte_process_name_t *proc_name; orte_process_name_t *proc_name;
mca_pml_base_modex_t *modex; mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *modex_module; mca_pml_base_modex_module_t *modex_module;
mca_base_component_t component; mca_base_component_t component;
bool isnew = false; bool isnew = false;
int rc; int rc;
if (data->cnt) { if (data->cnt) {
new_procs = (ompi_proc_t **) malloc(sizeof(ompi_proc_t *) * data->cnt); new_procs = (ompi_proc_t **) malloc(sizeof(ompi_proc_t *) * data->cnt);
if (NULL == new_procs) { if (NULL == new_procs) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return; return;
}
} }
} /* process the callback */
/* process the callback */ values = (orte_gpr_value_t **) (data->values)->addr;
values = (orte_gpr_value_t **) (data->values)->addr; for (i = 0, k = 0; k < data->cnt &&
for (i = 0, k = 0; k < data->cnt && i < (data->values)->size; i++) {
i < (data->values)->size; i++) { if (NULL != values[i]) {
if (NULL != values[i]) { k++;
k++; value = values[i];
value = values[i]; if (0 < value->cnt) { /* needs to be at least one keyval */
if (0 < value->cnt) { /* needs to be at least one keyval */ /*
/* * Token for the value should be the process name - look it up
* Token for the value should be the process name - look it up */
*/ token = value->tokens;
token = value->tokens; if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) {
if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) { proc = ompi_proc_find_and_add(proc_name, &isnew);
proc = ompi_proc_find_and_add(proc_name, &isnew); if (NULL == proc)
if (NULL == proc) continue;
continue;
if (isnew) { if (isnew) {
new_procs[new_proc_count] = proc; new_procs[new_proc_count] = proc;
new_proc_count++; new_proc_count++;
} }
/* /*
* Lookup the modex data structure. * Lookup the modex data structure.
*/ */
OPAL_THREAD_LOCK(&proc->proc_lock); OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t); modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) { if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&proc->proc_lock); OPAL_THREAD_UNLOCK(&proc->proc_lock);
return; return;
} }
proc->proc_modex = &modex->super; proc->proc_modex = &modex->super;
} }
/* /*
* Extract the component name and version from the keyval object's key * Extract the component name and version from the keyval object's key
* Could be multiple keyvals returned since there is one for each * Could be multiple keyvals returned since there is one for each
* component type/name/version - process them all * component type/name/version - process them all
*/ */
keyval = value->keyvals; keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) { for (j = 0; j < value->cnt; j++) {
orte_buffer_t buffer; orte_buffer_t buffer;
opal_list_item_t *item; opal_list_item_t *item;
char *ptr; char *ptr;
void *bytes = NULL; void *bytes = NULL;
orte_std_cntr_t cnt; orte_std_cntr_t cnt;
size_t num_bytes; size_t num_bytes;
orte_byte_object_t *bo; orte_byte_object_t *bo;
if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0)
continue; continue;
OBJ_CONSTRUCT(&buffer, orte_buffer_t); OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
cnt = 1; cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
strcpy(component.mca_type_name, ptr); strcpy(component.mca_type_name, ptr);
free(ptr); free(ptr);
cnt = 1; cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
strcpy(component.mca_component_name, ptr); strcpy(component.mca_component_name, ptr);
free(ptr); free(ptr);
cnt = 1; cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_major_version, &cnt, ORTE_INT32))) { &component.mca_component_major_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
cnt = 1; cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_minor_version, &cnt, ORTE_INT32))) { &component.mca_component_minor_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
cnt = 1; cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&num_bytes, &cnt, ORTE_SIZE))) { &num_bytes, &cnt, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
if (num_bytes != 0) { if (num_bytes != 0) {
if (NULL == (bytes = malloc(num_bytes))) { if (NULL == (bytes = malloc(num_bytes))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
continue; continue;
} }
cnt = (orte_std_cntr_t) num_bytes; cnt = (orte_std_cntr_t) num_bytes;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
continue; continue;
} }
num_bytes = cnt; num_bytes = cnt;
} else { } else {
bytes = NULL; bytes = NULL;
} }
/* /*
* Lookup the corresponding modex structure * Lookup the corresponding modex structure
*/ */
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {
opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");
OBJ_RELEASE(data); OBJ_RELEASE(data);
OPAL_THREAD_UNLOCK(&proc->proc_lock); OPAL_THREAD_UNLOCK(&proc->proc_lock);
return; return;
} }
modex_module->module_data = bytes; modex_module->module_data = bytes;
modex_module->module_data_size = num_bytes; modex_module->module_data_size = num_bytes;
modex_module->module_data_avail = true; modex_module->module_data_avail = true;
opal_condition_signal(&modex_module->module_data_cond); opal_condition_signal(&modex_module->module_data_cond);
/* /*
* call any registered * call any registered
* callbacks * callbacks
*/ */
for (item = opal_list_get_first(&modex_module->module_cbs); for (item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs); item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
} }
} }
OPAL_THREAD_UNLOCK(&proc->proc_lock); OPAL_THREAD_UNLOCK(&proc->proc_lock);
} /* convert string to process name */ } /* convert string to process name */
} /* if value[i]->cnt > 0 */ } /* if value[i]->cnt > 0 */
}
} }
}
/* pml add procs */ /* pml add procs */
if (NULL != new_procs) { if (NULL != new_procs) {
if (new_proc_count > 0) { if (new_proc_count > 0) {
MCA_PML_CALL(add_procs(new_procs, new_proc_count)); MCA_PML_CALL(add_procs(new_procs, new_proc_count));
}
free(new_procs);
} }
free(new_procs);
}
} }
/** /**
@ -400,98 +400,98 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
static int static int
mca_pml_base_modex_subscribe(orte_process_name_t * name) mca_pml_base_modex_subscribe(orte_process_name_t * name)
{ {
char *segment, *sub_name, *trig_name; char *segment, *sub_name, *trig_name;
orte_gpr_subscription_id_t sub_id; orte_gpr_subscription_id_t sub_id;
orte_jobid_t jobid; orte_jobid_t jobid;
opal_list_item_t *item; opal_list_item_t *item;
mca_pml_base_modex_subscription_t *subscription; mca_pml_base_modex_subscription_t *subscription;
int rc; int rc;
/* check for an existing subscription */ /* check for an existing subscription */
OPAL_LOCK(&mca_pml_base_modex_lock); OPAL_LOCK(&mca_pml_base_modex_lock);
if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) { if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) {
for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions); for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions);
item != opal_list_get_end(&mca_pml_base_modex_subscriptions); item != opal_list_get_end(&mca_pml_base_modex_subscriptions);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
subscription = (mca_pml_base_modex_subscription_t *) item; subscription = (mca_pml_base_modex_subscription_t *) item;
if (subscription->jobid == name->jobid) { if (subscription->jobid == name->jobid) {
OPAL_UNLOCK(&mca_pml_base_modex_lock); OPAL_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
}
} }
} OPAL_UNLOCK(&mca_pml_base_modex_lock);
OPAL_UNLOCK(&mca_pml_base_modex_lock);
/* otherwise - subscribe to get this jobid's contact info */ /* otherwise - subscribe to get this jobid's contact info */
jobid = name->jobid; jobid = name->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_MODEX_SUBSCRIPTION, jobid))) { OMPI_MODEX_SUBSCRIPTION, jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* attach to the stage-1 standard trigger */ /* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, jobid))) { ORTE_STG1_TRIGGER, jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
free(sub_name); free(sub_name);
return rc; return rc;
} }
/* define the segment */ /* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
free(sub_name); free(sub_name);
free(trig_name); free(trig_name);
return rc; return rc;
} }
if (jobid != orte_process_info.my_name->jobid) { if (jobid != orte_process_info.my_name->jobid) {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL, if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL,
ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_PRE_EXISTING, ORTE_GPR_NOTIFY_PRE_EXISTING,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment, segment,
NULL, /* look at all NULL, /* look at all
* containers on this * containers on this
* segment */ * segment */
OMPI_MODEX_KEY, OMPI_MODEX_KEY,
mca_pml_base_modex_registry_callback, NULL))) { mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
free(sub_name); free(sub_name);
free(trig_name); free(trig_name);
free(segment); free(segment);
return rc; return rc;
} }
} else { } else {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name, if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name,
ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR,
segment, segment,
NULL, /* look at all NULL, /* look at all
* containers on this * containers on this
* segment */ * segment */
OMPI_MODEX_KEY, OMPI_MODEX_KEY,
mca_pml_base_modex_registry_callback, NULL))) { mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
free(sub_name); free(sub_name);
free(trig_name); free(trig_name);
free(segment); free(segment);
return rc; return rc;
}
} }
} free(sub_name);
free(sub_name); free(trig_name);
free(trig_name); free(segment);
free(segment);
/* add this jobid to our list of subscriptions */ /* add this jobid to our list of subscriptions */
OPAL_LOCK(&mca_pml_base_modex_lock); OPAL_LOCK(&mca_pml_base_modex_lock);
subscription = OBJ_NEW(mca_pml_base_modex_subscription_t); subscription = OBJ_NEW(mca_pml_base_modex_subscription_t);
subscription->jobid = name->jobid; subscription->jobid = name->jobid;
opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item); opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item);
OPAL_UNLOCK(&mca_pml_base_modex_lock); OPAL_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -507,79 +507,79 @@ mca_pml_base_modex_send(mca_base_component_t * source_component,
const void *data, const void *data,
size_t size) size_t size)
{ {
orte_jobid_t jobid; orte_jobid_t jobid;
int rc; int rc;
orte_buffer_t buffer; orte_buffer_t buffer;
orte_std_cntr_t i, num_tokens; orte_std_cntr_t i, num_tokens;
char *ptr, *segment, **tokens; char *ptr, *segment, **tokens;
orte_byte_object_t bo; orte_byte_object_t bo;
orte_data_value_t value = ORTE_DATA_VALUE_EMPTY; orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;
jobid = ORTE_PROC_MY_NAME->jobid; jobid = ORTE_PROC_MY_NAME->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
&num_tokens, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
ptr = source_component->mca_component_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != size) {
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
} }
} if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) { &num_tokens, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; free(segment);
} return rc;
OBJ_DESTRUCT(&buffer); }
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
ptr = source_component->mca_component_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != size) {
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* setup the data_value structure to hold the byte object */ /* setup the data_value structure to hold the byte object */
if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) { if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR, rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, tokens, OMPI_MODEX_KEY, &value); segment, tokens, OMPI_MODEX_KEY, &value);
cleanup: cleanup:
free(segment); free(segment);
for (i = 0; i < num_tokens; i++) { for (i = 0; i < num_tokens; i++) {
free(tokens[i]); free(tokens[i]);
tokens[i] = NULL; tokens[i] = NULL;
} }
if (NULL != tokens) if (NULL != tokens)
free(tokens); free(tokens);
return rc; return rc;
} }
@ -593,49 +593,49 @@ mca_pml_base_modex_recv(mca_base_component_t * component,
void **buffer, void **buffer,
size_t * size) size_t * size)
{ {
mca_pml_base_modex_t *modex; mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *modex_module; mca_pml_base_modex_module_t *modex_module;
/* check the proc for cached data */ /* check the proc for cached data */
OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock); OPAL_THREAD_LOCK(&proc->proc_lock);
} if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
/* lookup/create the module */ modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) { if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock); OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* wait until data is available */ proc->proc_modex = &modex->super;
while (modex_module->module_data_avail == false) {
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
}
/* copy the data out to the user */ /* verify that we have subscribed to this segment */
if (modex_module->module_data_size == 0) { OPAL_THREAD_UNLOCK(&proc->proc_lock);
*buffer = NULL; mca_pml_base_modex_subscribe(&proc->proc_name);
*size = 0; OPAL_THREAD_LOCK(&proc->proc_lock);
} else {
void *copy = malloc(modex_module->module_data_size);
if (copy == NULL) {
return OMPI_ERR_OUT_OF_RESOURCE;
} }
memcpy(copy, modex_module->module_data, modex_module->module_data_size); /* lookup/create the module */
*buffer = copy; if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {
*size = modex_module->module_data_size; OPAL_THREAD_UNLOCK(&proc->proc_lock);
} return OMPI_ERR_OUT_OF_RESOURCE;
OPAL_THREAD_UNLOCK(&proc->proc_lock); }
return OMPI_SUCCESS; /* wait until data is available */
while (modex_module->module_data_avail == false) {
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
}
/* copy the data out to the user */
if (modex_module->module_data_size == 0) {
*buffer = NULL;
*size = 0;
} else {
void *copy = malloc(modex_module->module_data_size);
if (copy == NULL) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
*buffer = copy;
*size = modex_module->module_data_size;
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_SUCCESS;
} }
@ -649,38 +649,38 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component,
mca_pml_base_modex_cb_fn_t cbfunc, mca_pml_base_modex_cb_fn_t cbfunc,
void *cbdata) void *cbdata)
{ {
mca_pml_base_modex_t *modex; mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *module; mca_pml_base_modex_module_t *module;
mca_pml_base_modex_cb_t *cb; mca_pml_base_modex_cb_t *cb;
/* check the proc for existing modex */ /* check the proc for existing modex */
OPAL_THREAD_LOCK(&proc->proc_lock);
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock); OPAL_THREAD_LOCK(&proc->proc_lock);
} if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
/* lookup/create the module */ modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) { if (modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
}
/* lookup/create the module */
if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
cb->component = component;
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);
OPAL_THREAD_UNLOCK(&proc->proc_lock); OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_SUCCESS;
}
/* register the callback */
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
cb->component = component;
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_SUCCESS;
} }
@ -692,5 +692,5 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component,
int int
mca_pml_base_modex_exchange(void) mca_pml_base_modex_exchange(void)
{ {
return mca_pml_base_modex_subscribe(orte_process_info.my_name); return mca_pml_base_modex_subscribe(orte_process_info.my_name);
} }