1
1

interface to post a callback for notification of change to modex data

This commit was SVN r9753.
Этот коммит содержится в:
Tim Woodall 2006-04-27 16:15:35 +00:00
родитель 4fd2a71b6c
Коммит 02d991532f
2 изменённых файлов: 111 добавлений и 34 удалений

Просмотреть файл

@ -40,6 +40,23 @@
*
*/
struct mca_pml_base_modex_cb_t {
opal_list_item_t super;
mca_base_component_t* component;
mca_pml_base_modex_cb_fn_t cbfunc;
void* cbdata;
};
typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;
OBJ_CLASS_INSTANCE(
mca_pml_base_modex_cb_t,
opal_list_item_t,
NULL,
NULL
);
/**
* mca_pml_base_modex_module_t
*
@ -52,13 +69,16 @@ struct mca_pml_base_modex_module_t {
void *module_data;
size_t module_data_size;
bool module_data_avail;
opal_list_t module_cbs;
opal_condition_t module_data_cond;
};
typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;
static void mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t *module)
{
OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t);
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
memset(&module->component, 0, sizeof(module->component));
module->module_data = NULL;
module->module_data_size = 0;
@ -270,11 +290,13 @@ static void mca_pml_base_modex_registry_callback(
keyval = value->keyvals;
for (j=0; j < value->cnt; j++) {
orte_buffer_t buffer;
opal_list_item_t* item;
char *ptr;
void* bytes = NULL;
size_t cnt;
size_t num_bytes;
orte_byte_object_t *bo;
if(strcmp(keyval[j]->key,OMPI_MODEX_KEY) != 0)
continue;
@ -347,16 +369,15 @@ static void mca_pml_base_modex_registry_callback(
modex_module->module_data = bytes;
modex_module->module_data_size = num_bytes;
modex_module->module_data_avail = true;
#if 0
opal_output(0, "[%lu,%lu,%lu] mca_pml_base_modex_registry_callback: %s-%s-%d-%d received %d bytes\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
component.mca_type_name,
component.mca_component_name,
component.mca_component_major_version,
component.mca_component_minor_version,
num_bytes);
#endif
opal_condition_signal(&modex_module->module_data_cond);
/* call any registered callbacks */
for(item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t* cb = (mca_pml_base_modex_cb_t*)item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
}
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
} /* convert string to process name */
@ -514,22 +535,6 @@ int mca_pml_base_modex_send(
return rc;
}
#if 0
value.byteobject.size = size;
value.byteobject.bytes = (void *)malloc(size);
if(NULL == value.byteobject.bytes) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy((value.byteobject.bytes, data, size));
asprintf(&((value->keyvals[0])->key), "modex-%s-%s-%d-%d",
source_component->mca_type_name,
source_component->mca_component_name,
source_component->mca_component_major_version,
source_component->mca_component_minor_version);
#else
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
@ -571,7 +576,6 @@ int mca_pml_base_modex_send(
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#endif
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, tokens, OMPI_MODEX_KEY, &value);
@ -625,14 +629,6 @@ int mca_pml_base_modex_recv(
/* wait until data is available */
while(modex_module->module_data_avail == false) {
#if 0
opal_output(0, "[%lu,%lu,%lu] mca_pml_base_modex_registry_callback: waiting for %s-%s-%d-%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
component->mca_type_name,
component->mca_component_name,
component->mca_component_major_version,
component->mca_component_minor_version);
#endif
opal_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);
}
@ -654,6 +650,53 @@ opal_output(0, "[%lu,%lu,%lu] mca_pml_base_modex_registry_callback: waiting for
}
/**
*
*/
int mca_pml_base_modex_recv_nb(
mca_base_component_t *component,
ompi_proc_t *proc,
mca_pml_base_modex_cb_fn_t cbfunc,
void *cbdata)
{
mca_pml_base_modex_t* modex;
mca_pml_base_modex_module_t* module;
mca_pml_base_modex_cb_t* cb;
/* check the proc for existing modex */
OPAL_THREAD_LOCK(&proc->proc_lock);
if(NULL == (modex = (mca_pml_base_modex_t*)proc->proc_modex)) {
modex = OBJ_NEW(mca_pml_base_modex_t);
if(modex == NULL) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
proc->proc_modex = &modex->super;
/* verify that we have subscribed to this segment */
OPAL_THREAD_UNLOCK(&proc->proc_lock);
mca_pml_base_modex_subscribe(&proc->proc_name);
OPAL_THREAD_LOCK(&proc->proc_lock);
}
/* lookup/create the module */
if(NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
cb->component = component;
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t*)cb);
OPAL_THREAD_UNLOCK(&proc->proc_lock);
return OMPI_SUCCESS;
}
/**
* Subscribe to the segment corresponding
* to this job.
@ -663,3 +706,4 @@ int mca_pml_base_modex_exchange(void)
{
return mca_pml_base_modex_subscribe(orte_process_info.my_name);
}

Просмотреть файл

@ -128,6 +128,39 @@ OMPI_DECLSPEC int mca_pml_base_modex_recv(mca_base_component_t *dest_component,
struct ompi_proc_t *source_proc,
void **buffer, size_t *size);
/**
* Register to receive a callback on change to module specific data.
*
* @param dest_component A pointer to this module's component struct
* (i.e., mca_base_component_t instance).
* @param source_proc Peer process to receive from.
* @param buffer A pointer to a (void*) that will be filled with a
* pointer to the received buffer.
* @param size Pointer to a size_t that will be filled with the
* number of bytes of each instance in the buffer.
* @param count Pointer to an int that will be filled with the
* number of instances in the buffer.
*
* @retval OMPI_SUCCESS If a corresponding module buffer is found and
* is successfully returned to the caller.
* @retval OMPI_ERR_OUT_OF_RESOURCE If no corresponding module buffer is found,
* or if an error occurs wil returning the buffer to the caller.
*
*/
typedef void (*mca_pml_base_modex_cb_fn_t)(
mca_base_component_t *component,
struct ompi_proc_t* proc,
void* buffer,
size_t size,
void* cbdata);
OMPI_DECLSPEC int mca_pml_base_modex_recv_nb(
mca_base_component_t *component,
struct ompi_proc_t* proc,
mca_pml_base_modex_cb_fn_t cbfunc,
void* cbdata);
/*
* Called to subscribe to registry.
*/