diff --git a/ompi/mca/pml/base/pml_base_module_exchange.c b/ompi/mca/pml/base/pml_base_module_exchange.c index 9ec5e05242..b47d991c56 100644 --- a/ompi/mca/pml/base/pml_base_module_exchange.c +++ b/ompi/mca/pml/base/pml_base_module_exchange.c @@ -10,7 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006 Los Alamos National Security, LLC. All rights - * reserved. + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -42,10 +42,10 @@ * callback data for modex */ struct mca_pml_base_modex_cb_t { - opal_list_item_t super; - mca_base_component_t *component; - mca_pml_base_modex_cb_fn_t cbfunc; - void *cbdata; + opal_list_item_t super; + mca_base_component_t *component; + mca_pml_base_modex_cb_fn_t cbfunc; + void *cbdata; }; typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t; @@ -61,31 +61,31 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_cb_t, * Data for a specic proc and module. */ struct mca_pml_base_modex_module_t { - opal_list_item_t super; - mca_base_component_t component; - void *module_data; - size_t module_data_size; - bool module_data_avail; - opal_list_t module_cbs; - opal_condition_t module_data_cond; + opal_list_item_t super; + mca_base_component_t component; + void *module_data; + size_t module_data_size; + bool module_data_avail; + opal_list_t module_cbs; + opal_condition_t module_data_cond; }; typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t; static void mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module) { - OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t); - OBJ_CONSTRUCT(&module->module_cbs, opal_list_t); - memset(&module->component, 0, sizeof(module->component)); - module->module_data = NULL; - module->module_data_size = 0; - module->module_data_avail = false; + OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t); + OBJ_CONSTRUCT(&module->module_cbs, opal_list_t); + memset(&module->component, 0, sizeof(module->component)); + module->module_data = NULL; + module->module_data_size = 0; + module->module_data_avail = false; } static void mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module) { - OBJ_DESTRUCT(&module->module_data_cond); + OBJ_DESTRUCT(&module->module_data_cond); } OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t, @@ -101,21 +101,21 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t, * received from peers. */ struct mca_pml_base_modex_t { - opal_object_t super; - opal_list_t modex_modules; + opal_object_t super; + opal_list_t modex_modules; }; typedef struct mca_pml_base_modex_t mca_pml_base_modex_t; static void mca_pml_base_modex_construct(mca_pml_base_modex_t * modex) { - OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t); + OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t); } static void mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex) { - OBJ_DESTRUCT(&modex->modex_modules); + OBJ_DESTRUCT(&modex->modex_modules); } OBJ_CLASS_INSTANCE(mca_pml_base_modex_t, @@ -130,8 +130,8 @@ OBJ_CLASS_INSTANCE(mca_pml_base_modex_t, * Track segments we have subscribed to. */ struct mca_pml_base_modex_subscription_t { - opal_list_item_t item; - orte_jobid_t jobid; + opal_list_item_t item; + orte_jobid_t jobid; }; typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t; @@ -154,9 +154,9 @@ static opal_mutex_t mca_pml_base_modex_lock; int mca_pml_base_modex_init(void) { - OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t); - OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t); - return OMPI_SUCCESS; + OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t); + OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t); + return OMPI_SUCCESS; } /** @@ -165,11 +165,11 @@ mca_pml_base_modex_init(void) int mca_pml_base_modex_finalize(void) { - opal_list_item_t *item; - while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions))) - OBJ_RELEASE(item); - OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions); - return OMPI_SUCCESS; + opal_list_item_t *item; + while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions))) + OBJ_RELEASE(item); + OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions); + return OMPI_SUCCESS; } @@ -181,15 +181,15 @@ static mca_pml_base_modex_module_t * mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex, mca_base_component_t * component) { - mca_pml_base_modex_module_t *modex_module; - for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules); - modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules); - modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) { - if (mca_base_component_compatible(&modex_module->component, component) == 0) { - return modex_module; + mca_pml_base_modex_module_t *modex_module; + for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules); + modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules); + modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) { + if (mca_base_component_compatible(&modex_module->component, component) == 0) { + return modex_module; + } } - } - return NULL; + return NULL; } @@ -201,15 +201,15 @@ static mca_pml_base_modex_module_t * mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex, mca_base_component_t * component) { - mca_pml_base_modex_module_t *modex_module; - if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) { - modex_module = OBJ_NEW(mca_pml_base_modex_module_t); - if (NULL != modex_module) { - modex_module->component = *component; - opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module); + mca_pml_base_modex_module_t *modex_module; + if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) { + modex_module = OBJ_NEW(mca_pml_base_modex_module_t); + if (NULL != modex_module) { + modex_module->component = *component; + opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module); + } } - } - return modex_module; + return modex_module; } @@ -221,176 +221,176 @@ static void mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, void *cbdata) { - orte_std_cntr_t i, j, k; - orte_gpr_value_t **values, *value; - orte_gpr_keyval_t **keyval; - ompi_proc_t *proc; - ompi_proc_t **new_procs = NULL; - size_t new_proc_count = 0; - char **token; - orte_process_name_t *proc_name; - mca_pml_base_modex_t *modex; - mca_pml_base_modex_module_t *modex_module; - mca_base_component_t component; - bool isnew = false; - int rc; + orte_std_cntr_t i, j, k; + orte_gpr_value_t **values, *value; + orte_gpr_keyval_t **keyval; + ompi_proc_t *proc; + ompi_proc_t **new_procs = NULL; + size_t new_proc_count = 0; + char **token; + orte_process_name_t *proc_name; + mca_pml_base_modex_t *modex; + mca_pml_base_modex_module_t *modex_module; + mca_base_component_t component; + bool isnew = false; + int rc; - if (data->cnt) { - new_procs = (ompi_proc_t **) malloc(sizeof(ompi_proc_t *) * data->cnt); - if (NULL == new_procs) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return; + if (data->cnt) { + new_procs = (ompi_proc_t **) malloc(sizeof(ompi_proc_t *) * data->cnt); + if (NULL == new_procs) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return; + } } - } - /* process the callback */ - values = (orte_gpr_value_t **) (data->values)->addr; - for (i = 0, k = 0; k < data->cnt && - i < (data->values)->size; i++) { - if (NULL != values[i]) { - k++; - value = values[i]; - if (0 < value->cnt) { /* needs to be at least one keyval */ - /* - * Token for the value should be the process name - look it up - */ - token = value->tokens; - if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) { - proc = ompi_proc_find_and_add(proc_name, &isnew); - if (NULL == proc) - continue; + /* process the callback */ + values = (orte_gpr_value_t **) (data->values)->addr; + for (i = 0, k = 0; k < data->cnt && + i < (data->values)->size; i++) { + if (NULL != values[i]) { + k++; + value = values[i]; + if (0 < value->cnt) { /* needs to be at least one keyval */ + /* + * Token for the value should be the process name - look it up + */ + token = value->tokens; + if (ORTE_SUCCESS == orte_ns.convert_string_to_process_name(&proc_name, token[0])) { + proc = ompi_proc_find_and_add(proc_name, &isnew); + if (NULL == proc) + continue; - if (isnew) { - new_procs[new_proc_count] = proc; - new_proc_count++; - } - /* - * Lookup the modex data structure. - */ + if (isnew) { + new_procs[new_proc_count] = proc; + new_proc_count++; + } + /* + * Lookup the modex data structure. + */ - OPAL_THREAD_LOCK(&proc->proc_lock); - if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { - modex = OBJ_NEW(mca_pml_base_modex_t); - if (NULL == modex) { - opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return; - } - proc->proc_modex = &modex->super; - } - /* - * Extract the component name and version from the keyval object's key - * Could be multiple keyvals returned since there is one for each - * component type/name/version - process them all - */ - keyval = value->keyvals; - for (j = 0; j < value->cnt; j++) { - orte_buffer_t buffer; - opal_list_item_t *item; - char *ptr; - void *bytes = NULL; - orte_std_cntr_t cnt; - size_t num_bytes; - orte_byte_object_t *bo; + OPAL_THREAD_LOCK(&proc->proc_lock); + if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { + modex = OBJ_NEW(mca_pml_base_modex_t); + if (NULL == modex) { + opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n"); + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return; + } + proc->proc_modex = &modex->super; + } + /* + * Extract the component name and version from the keyval object's key + * Could be multiple keyvals returned since there is one for each + * component type/name/version - process them all + */ + keyval = value->keyvals; + for (j = 0; j < value->cnt; j++) { + orte_buffer_t buffer; + opal_list_item_t *item; + char *ptr; + void *bytes = NULL; + orte_std_cntr_t cnt; + size_t num_bytes; + orte_byte_object_t *bo; - if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) - continue; + if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0) + continue; - OBJ_CONSTRUCT(&buffer, orte_buffer_t); - if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { - ORTE_ERROR_LOG(rc); - continue; - } - if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - continue; - } - strcpy(component.mca_type_name, ptr); - free(ptr); + OBJ_CONSTRUCT(&buffer, orte_buffer_t); + if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + continue; + } + if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + continue; + } + strcpy(component.mca_type_name, ptr); + free(ptr); - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - continue; - } - strcpy(component.mca_component_name, ptr); - free(ptr); + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + continue; + } + strcpy(component.mca_component_name, ptr); + free(ptr); - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, - &component.mca_component_major_version, &cnt, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, - &component.mca_component_minor_version, &cnt, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - continue; - } - cnt = 1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, + &component.mca_component_major_version, &cnt, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, + &component.mca_component_minor_version, &cnt, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + continue; + } + cnt = 1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &num_bytes, &cnt, ORTE_SIZE))) { - ORTE_ERROR_LOG(rc); - continue; - } - if (num_bytes != 0) { - if (NULL == (bytes = malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - continue; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { - ORTE_ERROR_LOG(rc); - continue; - } - num_bytes = cnt; - } else { - bytes = NULL; - } + ORTE_ERROR_LOG(rc); + continue; + } + if (num_bytes != 0) { + if (NULL == (bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + continue; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) { + ORTE_ERROR_LOG(rc); + continue; + } + num_bytes = cnt; + } else { + bytes = NULL; + } - /* - * Lookup the corresponding modex structure - */ - if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { - opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); - OBJ_RELEASE(data); - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return; - } - modex_module->module_data = bytes; - modex_module->module_data_size = num_bytes; - modex_module->module_data_avail = true; - opal_condition_signal(&modex_module->module_data_cond); + /* + * Lookup the corresponding modex structure + */ + if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) { + opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n"); + OBJ_RELEASE(data); + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return; + } + modex_module->module_data = bytes; + modex_module->module_data_size = num_bytes; + modex_module->module_data_avail = true; + opal_condition_signal(&modex_module->module_data_cond); - /* - * call any registered - * callbacks - */ - for (item = opal_list_get_first(&modex_module->module_cbs); - item != opal_list_get_end(&modex_module->module_cbs); - item = opal_list_get_next(item)) { - mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; - cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); - } - } - OPAL_THREAD_UNLOCK(&proc->proc_lock); - } /* convert string to process name */ - } /* if value[i]->cnt > 0 */ + /* + * call any registered + * callbacks + */ + for (item = opal_list_get_first(&modex_module->module_cbs); + item != opal_list_get_end(&modex_module->module_cbs); + item = opal_list_get_next(item)) { + mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item; + cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata); + } + } + OPAL_THREAD_UNLOCK(&proc->proc_lock); + } /* convert string to process name */ + } /* if value[i]->cnt > 0 */ + } } - } - /* pml add procs */ - if (NULL != new_procs) { - if (new_proc_count > 0) { - MCA_PML_CALL(add_procs(new_procs, new_proc_count)); + /* pml add procs */ + if (NULL != new_procs) { + if (new_proc_count > 0) { + MCA_PML_CALL(add_procs(new_procs, new_proc_count)); + } + free(new_procs); } - free(new_procs); - } } /** @@ -400,98 +400,98 @@ mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data, static int mca_pml_base_modex_subscribe(orte_process_name_t * name) { - char *segment, *sub_name, *trig_name; - orte_gpr_subscription_id_t sub_id; - orte_jobid_t jobid; - opal_list_item_t *item; - mca_pml_base_modex_subscription_t *subscription; - int rc; + char *segment, *sub_name, *trig_name; + orte_gpr_subscription_id_t sub_id; + orte_jobid_t jobid; + opal_list_item_t *item; + mca_pml_base_modex_subscription_t *subscription; + int rc; - /* check for an existing subscription */ - OPAL_LOCK(&mca_pml_base_modex_lock); - if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) { - for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions); - item != opal_list_get_end(&mca_pml_base_modex_subscriptions); - item = opal_list_get_next(item)) { - subscription = (mca_pml_base_modex_subscription_t *) item; - if (subscription->jobid == name->jobid) { - OPAL_UNLOCK(&mca_pml_base_modex_lock); - return OMPI_SUCCESS; - } + /* check for an existing subscription */ + OPAL_LOCK(&mca_pml_base_modex_lock); + if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) { + for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions); + item != opal_list_get_end(&mca_pml_base_modex_subscriptions); + item = opal_list_get_next(item)) { + subscription = (mca_pml_base_modex_subscription_t *) item; + if (subscription->jobid == name->jobid) { + OPAL_UNLOCK(&mca_pml_base_modex_lock); + return OMPI_SUCCESS; + } + } } - } - OPAL_UNLOCK(&mca_pml_base_modex_lock); + OPAL_UNLOCK(&mca_pml_base_modex_lock); - /* otherwise - subscribe to get this jobid's contact info */ - jobid = name->jobid; + /* otherwise - subscribe to get this jobid's contact info */ + jobid = name->jobid; - if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, + if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name, OMPI_MODEX_SUBSCRIPTION, jobid))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* attach to the stage-1 standard trigger */ - if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, + ORTE_ERROR_LOG(rc); + return rc; + } + /* attach to the stage-1 standard trigger */ + if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name, ORTE_STG1_TRIGGER, jobid))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - return rc; - } - /* define the segment */ - if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - free(trig_name); - return rc; - } - if (jobid != orte_process_info.my_name->jobid) { - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL, + ORTE_ERROR_LOG(rc); + free(sub_name); + return rc; + } + /* define the segment */ + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { + ORTE_ERROR_LOG(rc); + free(sub_name); + free(trig_name); + return rc; + } + if (jobid != orte_process_info.my_name->jobid) { + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, NULL, NULL, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_PRE_EXISTING, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, - segment, - NULL, /* look at all + segment, + NULL, /* look at all * containers on this * segment */ - OMPI_MODEX_KEY, + OMPI_MODEX_KEY, mca_pml_base_modex_registry_callback, NULL))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - free(trig_name); - free(segment); - return rc; - } - } else { - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name, + ORTE_ERROR_LOG(rc); + free(sub_name); + free(trig_name); + free(segment); + return rc; + } + } else { + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&sub_id, trig_name, sub_name, ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG | ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG, ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, - segment, - NULL, /* look at all + segment, + NULL, /* look at all * containers on this * segment */ - OMPI_MODEX_KEY, + OMPI_MODEX_KEY, mca_pml_base_modex_registry_callback, NULL))) { - ORTE_ERROR_LOG(rc); - free(sub_name); - free(trig_name); - free(segment); - return rc; + ORTE_ERROR_LOG(rc); + free(sub_name); + free(trig_name); + free(segment); + return rc; + } } - } - free(sub_name); - free(trig_name); - free(segment); + free(sub_name); + free(trig_name); + free(segment); - /* add this jobid to our list of subscriptions */ - OPAL_LOCK(&mca_pml_base_modex_lock); - subscription = OBJ_NEW(mca_pml_base_modex_subscription_t); - subscription->jobid = name->jobid; - opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item); - OPAL_UNLOCK(&mca_pml_base_modex_lock); - return OMPI_SUCCESS; + /* add this jobid to our list of subscriptions */ + OPAL_LOCK(&mca_pml_base_modex_lock); + subscription = OBJ_NEW(mca_pml_base_modex_subscription_t); + subscription->jobid = name->jobid; + opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item); + OPAL_UNLOCK(&mca_pml_base_modex_lock); + return OMPI_SUCCESS; } @@ -507,79 +507,79 @@ mca_pml_base_modex_send(mca_base_component_t * source_component, const void *data, size_t size) { - orte_jobid_t jobid; - int rc; - orte_buffer_t buffer; - orte_std_cntr_t i, num_tokens; - char *ptr, *segment, **tokens; - orte_byte_object_t bo; - orte_data_value_t value = ORTE_DATA_VALUE_EMPTY; + orte_jobid_t jobid; + int rc; + orte_buffer_t buffer; + orte_std_cntr_t i, num_tokens; + char *ptr, *segment, **tokens; + orte_byte_object_t bo; + orte_data_value_t value = ORTE_DATA_VALUE_EMPTY; - jobid = ORTE_PROC_MY_NAME->jobid; + jobid = ORTE_PROC_MY_NAME->jobid; - if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { - ORTE_ERROR_LOG(rc); - return rc; - } - if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, - &num_tokens, orte_process_info.my_name))) { - ORTE_ERROR_LOG(rc); - free(segment); - return rc; - } - OBJ_CONSTRUCT(&buffer, orte_buffer_t); - ptr = source_component->mca_type_name; - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - ptr = source_component->mca_component_name; - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (0 != size) { - if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { + ORTE_ERROR_LOG(rc); + return rc; } - } - if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - OBJ_DESTRUCT(&buffer); + if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, + &num_tokens, orte_process_info.my_name))) { + ORTE_ERROR_LOG(rc); + free(segment); + return rc; + } + OBJ_CONSTRUCT(&buffer, orte_buffer_t); + ptr = source_component->mca_type_name; + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + ptr = source_component->mca_component_name; + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + if (0 != size) { + if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + OBJ_DESTRUCT(&buffer); - /* setup the data_value structure to hold the byte object */ - if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR, - segment, tokens, OMPI_MODEX_KEY, &value); + /* setup the data_value structure to hold the byte object */ + if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR, + segment, tokens, OMPI_MODEX_KEY, &value); cleanup: - free(segment); - for (i = 0; i < num_tokens; i++) { - free(tokens[i]); - tokens[i] = NULL; - } - if (NULL != tokens) - free(tokens); + free(segment); + for (i = 0; i < num_tokens; i++) { + free(tokens[i]); + tokens[i] = NULL; + } + if (NULL != tokens) + free(tokens); - return rc; + return rc; } @@ -593,49 +593,49 @@ mca_pml_base_modex_recv(mca_base_component_t * component, void **buffer, size_t * size) { - mca_pml_base_modex_t *modex; - mca_pml_base_modex_module_t *modex_module; + mca_pml_base_modex_t *modex; + mca_pml_base_modex_module_t *modex_module; - /* check the proc for cached data */ - OPAL_THREAD_LOCK(&proc->proc_lock); - if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { - modex = OBJ_NEW(mca_pml_base_modex_t); - if (modex == NULL) { - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - proc->proc_modex = &modex->super; - - /* verify that we have subscribed to this segment */ - OPAL_THREAD_UNLOCK(&proc->proc_lock); - mca_pml_base_modex_subscribe(&proc->proc_name); + /* check the proc for cached data */ OPAL_THREAD_LOCK(&proc->proc_lock); - } - /* lookup/create the module */ - if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) { - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - /* wait until data is available */ - while (modex_module->module_data_avail == false) { - opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock); - } + if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { + modex = OBJ_NEW(mca_pml_base_modex_t); + if (modex == NULL) { + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + proc->proc_modex = &modex->super; - /* copy the data out to the user */ - if (modex_module->module_data_size == 0) { - *buffer = NULL; - *size = 0; - } else { - void *copy = malloc(modex_module->module_data_size); - if (copy == NULL) { - return OMPI_ERR_OUT_OF_RESOURCE; + /* verify that we have subscribed to this segment */ + OPAL_THREAD_UNLOCK(&proc->proc_lock); + mca_pml_base_modex_subscribe(&proc->proc_name); + OPAL_THREAD_LOCK(&proc->proc_lock); } - memcpy(copy, modex_module->module_data, modex_module->module_data_size); - *buffer = copy; - *size = modex_module->module_data_size; - } - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_SUCCESS; + /* lookup/create the module */ + if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) { + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + /* wait until data is available */ + while (modex_module->module_data_avail == false) { + opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock); + } + + /* copy the data out to the user */ + if (modex_module->module_data_size == 0) { + *buffer = NULL; + *size = 0; + } else { + void *copy = malloc(modex_module->module_data_size); + if (copy == NULL) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + memcpy(copy, modex_module->module_data, modex_module->module_data_size); + *buffer = copy; + *size = modex_module->module_data_size; + } + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return OMPI_SUCCESS; } @@ -649,38 +649,38 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component, mca_pml_base_modex_cb_fn_t cbfunc, void *cbdata) { - mca_pml_base_modex_t *modex; - mca_pml_base_modex_module_t *module; - mca_pml_base_modex_cb_t *cb; + mca_pml_base_modex_t *modex; + mca_pml_base_modex_module_t *module; + mca_pml_base_modex_cb_t *cb; - /* check the proc for existing modex */ - OPAL_THREAD_LOCK(&proc->proc_lock); - if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { - modex = OBJ_NEW(mca_pml_base_modex_t); - if (modex == NULL) { - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - proc->proc_modex = &modex->super; - - /* verify that we have subscribed to this segment */ - OPAL_THREAD_UNLOCK(&proc->proc_lock); - mca_pml_base_modex_subscribe(&proc->proc_name); + /* check the proc for existing modex */ OPAL_THREAD_LOCK(&proc->proc_lock); - } - /* lookup/create the module */ - if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) { + if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) { + modex = OBJ_NEW(mca_pml_base_modex_t); + if (modex == NULL) { + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + proc->proc_modex = &modex->super; + + /* verify that we have subscribed to this segment */ + OPAL_THREAD_UNLOCK(&proc->proc_lock); + mca_pml_base_modex_subscribe(&proc->proc_name); + OPAL_THREAD_LOCK(&proc->proc_lock); + } + /* lookup/create the module */ + if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) { + OPAL_THREAD_UNLOCK(&proc->proc_lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + /* register the callback */ + cb = OBJ_NEW(mca_pml_base_modex_cb_t); + cb->component = component; + cb->cbfunc = cbfunc; + cb->cbdata = cbdata; + opal_list_append(&module->module_cbs, (opal_list_item_t *) cb); OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_ERR_OUT_OF_RESOURCE; - } - /* register the callback */ - cb = OBJ_NEW(mca_pml_base_modex_cb_t); - cb->component = component; - cb->cbfunc = cbfunc; - cb->cbdata = cbdata; - opal_list_append(&module->module_cbs, (opal_list_item_t *) cb); - OPAL_THREAD_UNLOCK(&proc->proc_lock); - return OMPI_SUCCESS; + return OMPI_SUCCESS; } @@ -692,5 +692,5 @@ mca_pml_base_modex_recv_nb(mca_base_component_t * component, int mca_pml_base_modex_exchange(void) { - return mca_pml_base_modex_subscribe(orte_process_info.my_name); + return mca_pml_base_modex_subscribe(orte_process_info.my_name); }