- made lam_mutex_t a lam_object
- implemented module exchange - debugging p2p init - added condition variables - added threading test code This commit was SVN r817.
Этот коммит содержится в:
родитель
366b325f24
Коммит
1f915b0b4a
@ -738,6 +738,8 @@ AC_CONFIG_FILES([
|
||||
test/support/Makefile
|
||||
test/lam/Makefile
|
||||
test/lam/lfc/Makefile
|
||||
test/lam/threads/Makefile
|
||||
test/lam/util/Makefile
|
||||
test/mpi/Makefile
|
||||
test/mpi/environment/Makefile
|
||||
test/unit/Makefile
|
||||
|
@ -21,13 +21,12 @@ static void lam_hash_table_construct(lam_hash_table_t* ht);
|
||||
static void lam_hash_table_destruct(lam_hash_table_t* ht);
|
||||
|
||||
|
||||
lam_class_t lam_hash_table_t_class = {
|
||||
"lam_hash_table_t",
|
||||
OBJ_CLASS(lam_object_t),
|
||||
(lam_construct_t)lam_hash_table_construct,
|
||||
(lam_destruct_t)lam_hash_table_destruct
|
||||
};
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
lam_hash_table_t,
|
||||
lam_object_t,
|
||||
lam_hash_table_construct,
|
||||
lam_hash_table_destruct
|
||||
);
|
||||
|
||||
|
||||
static void lam_hash_table_construct(lam_hash_table_t* ht)
|
||||
@ -65,8 +64,10 @@ int lam_hash_table_init(lam_hash_table_t* ht, size_t table_size)
|
||||
ht->ht_table = realloc(ht->ht_table, power2 * sizeof(lam_list_t));
|
||||
if(NULL == ht->ht_table)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
for(i=ht->ht_table_size; i<power2; i++)
|
||||
OBJ_CONSTRUCT(ht->ht_table+i, lam_list_t);
|
||||
for(i=ht->ht_table_size; i<power2; i++) {
|
||||
lam_list_t* list = ht->ht_table+i;
|
||||
OBJ_CONSTRUCT(list, lam_list_t);
|
||||
}
|
||||
ht->ht_table_size = power2;
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
@ -21,12 +21,12 @@ lam_class_t lam_list_item_t_class = {
|
||||
static void lam_list_construct(lam_list_t*);
|
||||
static void lam_list_destruct(lam_list_t*);
|
||||
|
||||
lam_class_t lam_list_t_class = {
|
||||
"lam_list_t",
|
||||
OBJ_CLASS(lam_object_t),
|
||||
(lam_construct_t) lam_list_construct,
|
||||
(lam_destruct_t) lam_list_destruct
|
||||
};
|
||||
OBJ_CLASS_INSTANCE(
|
||||
lam_list_t,
|
||||
lam_object_t,
|
||||
lam_list_construct,
|
||||
lam_list_destruct
|
||||
);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -416,7 +416,7 @@ static inline int lam_obj_update(lam_object_t *object, int inc)
|
||||
do {
|
||||
oldval = *addr;
|
||||
newval = oldval + inc;
|
||||
} while (lam_atomic_cmpset_int(addr, oldval, newval) != 0);
|
||||
} while (lam_atomic_cmpset_int(addr, oldval, newval) == 0);
|
||||
|
||||
return newval;
|
||||
}
|
||||
|
@ -27,9 +27,9 @@ lam_class_t lam_pointer_array_t_class = {
|
||||
/**
|
||||
* lam_pointer_array constructor
|
||||
*/
|
||||
void lam_pointer_array_construct(lam_pointer_array_t *array){
|
||||
|
||||
lam_mutex_init(&array->lock);
|
||||
void lam_pointer_array_construct(lam_pointer_array_t *array)
|
||||
{
|
||||
OBJ_CONSTRUCT(&array->lock, lam_mutex_t);
|
||||
array->lowest_free = 0;
|
||||
array->number_free = 0;
|
||||
array->size = 0;
|
||||
@ -45,9 +45,7 @@ void lam_pointer_array_destruct(lam_pointer_array_t *array){
|
||||
if( NULL != array->addr)
|
||||
free(array->addr);
|
||||
|
||||
lam_mutex_destroy(&array->lock);
|
||||
|
||||
/* return */
|
||||
OBJ_DESTRUCT(&array->lock);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ lam_class_t lam_free_lists_t_class = {
|
||||
|
||||
void lam_free_lists_construct(lam_free_lists_t *flist)
|
||||
{
|
||||
lam_mutex_init(&flist->fl_lock);
|
||||
OBJ_CONSTRUCT(&flist->fl_lock, lam_mutex_t);
|
||||
flist->fl_pool = NULL;
|
||||
flist->fl_elt_cls = NULL;
|
||||
flist->fl_description = NULL;
|
||||
|
@ -33,14 +33,14 @@ lam_class_t shmem_pool_t_class = {
|
||||
void lam_mp_construct(lam_mem_pool_t *pool)
|
||||
{
|
||||
pool->mp_private_alloc = OBJ_NEW(lam_allocator_t);
|
||||
lam_mutex_init(&(pool->mp_lock));
|
||||
OBJ_CONSTRUCT(&pool->mp_lock, lam_mutex_t);
|
||||
pool->mp_dev_alloc = NULL;
|
||||
}
|
||||
|
||||
void lam_mp_shared_construct(lam_mem_pool_t *pool)
|
||||
{
|
||||
pool->mp_private_alloc = OBJ_NEW(lam_allocator_t);
|
||||
lam_mutex_init(&(pool->mp_lock));
|
||||
OBJ_CONSTRUCT(&pool->mp_lock, lam_mutex_t);
|
||||
lam_allocator_set_is_shared(pool->mp_private_alloc, 1);
|
||||
lam_allocator_set_mem_prot(pool->mp_private_alloc, MMAP_SHARED_PROT);
|
||||
pool->mp_dev_alloc = NULL;
|
||||
@ -51,7 +51,7 @@ void lam_mp_destruct(lam_mem_pool_t *pool)
|
||||
if ( pool->mp_dev_alloc )
|
||||
OBJ_RELEASE(pool->mp_dev_alloc);
|
||||
OBJ_RELEASE(pool->mp_private_alloc);
|
||||
|
||||
OBJ_DESTRUCT(&pool->mp_lock);
|
||||
}
|
||||
|
||||
int lam_mp_construct_with(lam_mem_pool_t *pool, uint64_t pool_size,
|
||||
|
@ -18,7 +18,7 @@ lam_class_t lam_seg_list_t_class = {
|
||||
void lam_sgl_construct(lam_seg_list_t *slist)
|
||||
{
|
||||
OBJ_CONSTRUCT(&slist->sgl_list, lam_list_t);
|
||||
lam_mutex_init(&slist->sgl_lock);
|
||||
OBJ_CONSTRUCT(&slist->sgl_lock, lam_mutex_t);
|
||||
slist->sgl_min_bytes_pushed = 0;
|
||||
slist->sgl_max_bytes_pushed = 0;
|
||||
slist->sgl_bytes_pushed = 0;
|
||||
@ -28,7 +28,8 @@ void lam_sgl_construct(lam_seg_list_t *slist)
|
||||
|
||||
void lam_sgl_destruct(lam_seg_list_t *slist)
|
||||
{
|
||||
OBJ_DESTRUCT(&(slist->sgl_list));
|
||||
OBJ_DESTRUCT(&slist->sgl_list);
|
||||
OBJ_DESTRUCT(&slist->sgl_lock);
|
||||
}
|
||||
|
||||
|
||||
|
@ -105,7 +105,7 @@ lam_cmd_line_t *lam_cmd_line_create(void)
|
||||
only thread that has this instance), there's no need to lock it
|
||||
right now. */
|
||||
|
||||
lam_mutex_init(&cmd->lcl_mutex);
|
||||
OBJ_CONSTRUCT(&cmd->lcl_mutex, lam_mutex_t);
|
||||
|
||||
/* Initialize the lists */
|
||||
|
||||
|
@ -115,7 +115,7 @@ bool lam_output_init(void)
|
||||
|
||||
/* Initialize the mutex that protects the output */
|
||||
|
||||
lam_mutex_init(&mutex);
|
||||
OBJ_CONSTRUCT(&mutex, lam_mutex_t);
|
||||
initialized = true;
|
||||
|
||||
/* Open the default verbose stream */
|
||||
|
@ -70,7 +70,7 @@ static inline lam_reactor_descriptor_t* lam_reactor_get_descriptor(lam_reactor_t
|
||||
|
||||
void lam_reactor_construct(lam_reactor_t* r)
|
||||
{
|
||||
lam_mutex_init(&r->r_mutex);
|
||||
OBJ_CONSTRUCT(&r->r_mutex, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&r->r_active, lam_list_t);
|
||||
OBJ_CONSTRUCT(&r->r_free, lam_list_t);
|
||||
OBJ_CONSTRUCT(&r->r_pending, lam_list_t);
|
||||
@ -93,6 +93,7 @@ void lam_reactor_destruct(lam_reactor_t* r)
|
||||
OBJ_DESTRUCT(&r->r_free);
|
||||
OBJ_DESTRUCT(&r->r_pending);
|
||||
OBJ_DESTRUCT(&r->r_hash);
|
||||
OBJ_DESTRUCT(&r->r_mutex);
|
||||
}
|
||||
|
||||
|
||||
|
@ -67,8 +67,10 @@ extern "C" {
|
||||
|
||||
/* mca_base_module_compare.c */
|
||||
|
||||
int mca_base_module_compare(mca_base_module_priority_list_item_t *a,
|
||||
int mca_base_module_compare_priority(mca_base_module_priority_list_item_t *a,
|
||||
mca_base_module_priority_list_item_t *b);
|
||||
int mca_base_module_compare(mca_base_module_t *a,
|
||||
mca_base_module_t *b);
|
||||
|
||||
/* mca_base_module_find.c */
|
||||
|
||||
|
@ -22,8 +22,9 @@
|
||||
* may help the gentle reader to consider this an inverse comparison.
|
||||
* :-)
|
||||
*/
|
||||
int mca_base_module_compare(mca_base_module_priority_list_item_t *a,
|
||||
mca_base_module_priority_list_item_t *b)
|
||||
int mca_base_module_compare_priority(
|
||||
mca_base_module_priority_list_item_t *a,
|
||||
mca_base_module_priority_list_item_t *b)
|
||||
{
|
||||
int val;
|
||||
|
||||
@ -33,13 +34,14 @@ int mca_base_module_compare(mca_base_module_priority_list_item_t *a,
|
||||
return -1;
|
||||
else if (a->mpli_priority < b->mpli_priority)
|
||||
return 1;
|
||||
else {
|
||||
mca_base_module_t *aa = a->mpli_module;
|
||||
mca_base_module_t *bb = b->mpli_module;
|
||||
else
|
||||
return mca_base_module_compare(a->mpli_module, b->mpli_module);
|
||||
}
|
||||
|
||||
int mca_base_module_compare(mca_base_module_t* aa, mca_base_module_t* bb)
|
||||
{
|
||||
/* The priorities were equal, so compare the names */
|
||||
|
||||
val = strncmp(aa->mca_module_name, bb->mca_module_name,
|
||||
int val = strncmp(aa->mca_module_name, bb->mca_module_name,
|
||||
MCA_BASE_MAX_MODULE_NAME_LEN);
|
||||
if (val != 0)
|
||||
return -val;
|
||||
@ -60,9 +62,6 @@ int mca_base_module_compare(mca_base_module_priority_list_item_t *a,
|
||||
return -1;
|
||||
else if (aa->mca_module_release_version < bb->mca_module_release_version)
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* They're equal */
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2,23 +2,285 @@
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
#include "lam_config.h"
|
||||
|
||||
#include "lam/lfc/lam_hash_table.h"
|
||||
#include "lam/util/output.h"
|
||||
#include "mpi/proc/proc.h"
|
||||
#include "mca/mca.h"
|
||||
#include "mca/lam/base/base.h"
|
||||
#include "mca/lam/oob/oob.h"
|
||||
#include "mca/lam/base/mca_base_module_exchange.h"
|
||||
|
||||
|
||||
int mca_base_modex_send(mca_base_module_t *source_module,
|
||||
void *buffer, size_t size, size_t count)
|
||||
|
||||
/**
|
||||
* mca_base_modex_module_t
|
||||
*
|
||||
* Data for a specic proc and module.
|
||||
*/
|
||||
|
||||
struct mca_base_modex_module_t {
|
||||
lam_list_item_t super;
|
||||
mca_base_module_t *module;
|
||||
void *module_data;
|
||||
size_t module_data_size;
|
||||
};
|
||||
typedef struct mca_base_modex_module_t mca_base_modex_module_t;
|
||||
|
||||
static void mca_base_modex_module_construct(mca_base_modex_module_t *module)
|
||||
{
|
||||
return LAM_SUCCESS;
|
||||
module->module = NULL;
|
||||
module->module_data = NULL;
|
||||
module->module_data_size = 0;
|
||||
}
|
||||
|
||||
static void mca_base_modex_module_destruct(mca_base_modex_module_t *module)
|
||||
{
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_base_modex_module_t,
|
||||
lam_list_item_t,
|
||||
mca_base_modex_module_construct,
|
||||
mca_base_modex_module_destruct
|
||||
);
|
||||
|
||||
/**
|
||||
* mca_base_modex_t
|
||||
*
|
||||
* List of modules (mca_base_modex_module_t) for which data has been
|
||||
* received from peers.
|
||||
*/
|
||||
struct mca_base_modex_t {
|
||||
lam_object_t super;
|
||||
lam_list_t modex_modules;
|
||||
lam_mutex_t modex_lock;
|
||||
};
|
||||
typedef struct mca_base_modex_t mca_base_modex_t;
|
||||
|
||||
static void mca_base_modex_construct(mca_base_modex_t* modex)
|
||||
{
|
||||
OBJ_CONSTRUCT(&modex->modex_lock, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&modex->modex_modules, lam_list_t);
|
||||
}
|
||||
|
||||
static void mca_base_modex_destruct(mca_base_modex_t* modex)
|
||||
{
|
||||
OBJ_DESTRUCT(&modex->modex_modules);
|
||||
OBJ_DESTRUCT(&modex->modex_lock);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_base_modex_t,
|
||||
lam_object_t,
|
||||
mca_base_modex_construct,
|
||||
mca_base_modex_destruct
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* Look to see if there is any data associated with a specified module.
|
||||
*/
|
||||
|
||||
static inline mca_base_modex_module_t* mca_base_modex_lookup_module(
|
||||
mca_base_modex_t* modex,
|
||||
mca_base_module_t* module)
|
||||
{
|
||||
mca_base_modex_module_t* modex_module;
|
||||
THREAD_LOCK(&modex->modex_lock);
|
||||
for(modex_module = (mca_base_modex_module_t*)lam_list_get_first(&modex->modex_modules);
|
||||
modex_module != (mca_base_modex_module_t*)lam_list_get_end(&modex->modex_modules);
|
||||
modex_module = (mca_base_modex_module_t*)lam_list_get_next(modex_module)) {
|
||||
if(mca_base_module_compare(modex_module->module, module) == 0) {
|
||||
THREAD_UNLOCK(&modex->modex_lock);
|
||||
return modex_module;
|
||||
}
|
||||
}
|
||||
THREAD_UNLOCK(&modex->modex_lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int mca_base_modex_recv(mca_base_module_t *dest_module,
|
||||
lam_proc_t *source_proc,
|
||||
void **buffer, size_t *size, size_t *count)
|
||||
/**
|
||||
* Create a placeholder for data associated with the specified module.
|
||||
*/
|
||||
|
||||
static inline mca_base_modex_module_t* mca_base_modex_create_module(
|
||||
mca_base_modex_t* modex,
|
||||
mca_base_module_t* module)
|
||||
{
|
||||
return LAM_ERR_NOT_IMPLEMENTED;
|
||||
mca_base_modex_module_t* modex_module;
|
||||
THREAD_LOCK(&modex->modex_lock);
|
||||
if(NULL == (modex_module = mca_base_modex_lookup_module(modex, module))) {
|
||||
modex_module = OBJ_NEW(mca_base_modex_module_t);
|
||||
if(NULL != modex_module) {
|
||||
modex_module->module = module;
|
||||
lam_list_append(&modex->modex_modules, (lam_list_item_t*)modex_module);
|
||||
}
|
||||
}
|
||||
THREAD_UNLOCK(&modex->modex_lock);
|
||||
return modex_module;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Store the data associated with the specified module on the
|
||||
* local process, which will be exchanged with all other processes
|
||||
* during mca_base_modex_exchange().
|
||||
*/
|
||||
|
||||
int mca_base_modex_send(mca_base_module_t *source_module, const void *buffer, size_t size)
|
||||
{
|
||||
lam_proc_t *self = lam_proc_local();
|
||||
mca_base_modex_t* modex;
|
||||
mca_base_modex_module_t* modex_module;
|
||||
|
||||
if(NULL == self)
|
||||
return LAM_ERROR;
|
||||
|
||||
if(NULL == (modex = self->proc_modex)) {
|
||||
self->proc_modex = modex = OBJ_NEW(mca_base_modex_t);
|
||||
}
|
||||
|
||||
if(NULL == (modex_module = mca_base_modex_create_module(modex, source_module)))
|
||||
return LAM_ERROR;
|
||||
|
||||
modex_module->module_data = malloc(size);
|
||||
if(NULL == modex_module->module_data)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
memcpy(modex_module->module_data, buffer, size);
|
||||
modex_module->module_data_size = size;
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Retreive the data for the specified module from the source process.
|
||||
* This (data) should have already been cached on the process during
|
||||
* mca_base_modex_exchange().
|
||||
*/
|
||||
|
||||
int mca_base_modex_recv(mca_base_module_t *module, lam_proc_t *source_proc, void **buffer, size_t *size)
|
||||
{
|
||||
mca_base_modex_t* modex;
|
||||
mca_base_modex_module_t* modex_module;
|
||||
void *copy;
|
||||
|
||||
if(NULL == (modex = source_proc->proc_modex) ||
|
||||
NULL == (modex_module = mca_base_modex_lookup_module(modex, module)))
|
||||
return LAM_ERR_NOT_FOUND;
|
||||
|
||||
if(0 == modex_module->module_data_size) {
|
||||
*buffer = NULL;
|
||||
*size = 0;
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
copy = malloc(modex_module->module_data_size);
|
||||
if(NULL == copy)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
|
||||
*buffer = copy;
|
||||
*size = modex_module->module_data_size;
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Iterate over all modules for which the local proc has data associated
|
||||
* with it, and exchange this with all other currently known processes.
|
||||
* Note that we will have to expand this as procs are added/deleted...
|
||||
*/
|
||||
|
||||
int mca_base_modex_exchange(void)
|
||||
{
|
||||
lam_proc_t *self = lam_proc_local();
|
||||
mca_base_modex_t* modex;
|
||||
mca_base_modex_module_t* self_module;
|
||||
size_t nprocs;
|
||||
lam_proc_t **procs = lam_proc_all(&nprocs);
|
||||
|
||||
if(nprocs <= 1) {
|
||||
if(procs) free(procs);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
if(NULL == self) {
|
||||
free(procs);
|
||||
return LAM_ERROR;
|
||||
}
|
||||
|
||||
if(NULL == (modex = self->proc_modex)) {
|
||||
self->proc_modex = modex = OBJ_NEW(mca_base_modex_t);
|
||||
}
|
||||
|
||||
/* loop through all modules with data cached on local process and send to all peers */
|
||||
for(self_module = (mca_base_modex_module_t*)lam_list_get_first(&modex->modex_modules);
|
||||
self_module != (mca_base_modex_module_t*)lam_list_get_end(&modex->modex_modules);
|
||||
self_module = (mca_base_modex_module_t*)lam_list_get_next(self_module)) {
|
||||
size_t i;
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_proc_t *proc = procs[i];
|
||||
int rc;
|
||||
|
||||
if(proc == self)
|
||||
continue;
|
||||
|
||||
rc = mca_oob.oob_send(
|
||||
proc->proc_job,
|
||||
proc->proc_vpid,
|
||||
MCA_OOB_ANY_TAG,
|
||||
self_module->module_data,
|
||||
self_module->module_data_size);
|
||||
if(rc != LAM_SUCCESS) {
|
||||
free(procs);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* loop through the same modules and receive data from all peers */
|
||||
for(self_module = (mca_base_modex_module_t*)lam_list_get_first(&modex->modex_modules);
|
||||
self_module != (mca_base_modex_module_t*)lam_list_get_end(&modex->modex_modules);
|
||||
self_module = (mca_base_modex_module_t*)lam_list_get_next(self_module)) {
|
||||
size_t i;
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_proc_t *proc = procs[i];
|
||||
mca_base_modex_module_t* proc_module;
|
||||
int tag = MCA_OOB_ANY_TAG;
|
||||
int rc;
|
||||
|
||||
if(proc == self)
|
||||
continue;
|
||||
|
||||
if(NULL == proc->proc_modex) {
|
||||
proc->proc_modex = OBJ_NEW(mca_base_modex_t);
|
||||
if(NULL == proc->proc_modex) {
|
||||
free(procs);
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
proc_module = mca_base_modex_create_module(proc->proc_modex, self_module->module);
|
||||
if(NULL == proc_module) {
|
||||
free(procs);
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
rc = mca_oob.oob_recv(
|
||||
proc->proc_job,
|
||||
proc->proc_vpid,
|
||||
&tag,
|
||||
&proc_module->module_data,
|
||||
&proc_module->module_data_size);
|
||||
if(rc != LAM_SUCCESS) {
|
||||
free(procs);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
free(procs);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
@ -57,18 +57,6 @@ extern "C" {
|
||||
* this buffer can therefore open a socket to the indicated IP
|
||||
* address and TCP port.
|
||||
*
|
||||
* The total number of bytes sent is (size * count). The count
|
||||
* argument is currently superfulous, since corresponding modules,
|
||||
* by definition, should be able to deduce how many instances of
|
||||
* meta data are in the received buffer either by dividing the total
|
||||
* bytes received by the size of the module's meta data struct, or
|
||||
* by analyzing the content in the received buffer (if it is
|
||||
* self-describing data). It is included in the interface to
|
||||
* provide flexability for modules that may require sending
|
||||
* multiple, logically separate instances of meta data (e.g., a PTL
|
||||
* module type that sends one meta data struct for each of the three
|
||||
* TCP NICs on its node).
|
||||
*
|
||||
* During the selection process, the MCA framework will effectively
|
||||
* perform an "allgather" operation of all modex buffers; every
|
||||
* buffer will be available to every peer process (see
|
||||
@ -84,7 +72,7 @@ extern "C" {
|
||||
* regardless of pointer sizes or endian bias.
|
||||
*/
|
||||
int mca_base_modex_send(mca_base_module_t *source_module,
|
||||
void *buffer, size_t size, size_t count);
|
||||
const void *buffer, size_t size);
|
||||
|
||||
/**
|
||||
* Receive a module-specific buffer from a corresponding MCA module
|
||||
@ -121,7 +109,12 @@ extern "C" {
|
||||
*/
|
||||
int mca_base_modex_recv(mca_base_module_t *dest_module,
|
||||
lam_proc_t *source_proc,
|
||||
void **buffer, size_t *size, size_t *count);
|
||||
void **buffer, size_t *size);
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
int mca_base_modex_exchange(void);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
|
@ -14,11 +14,11 @@ lam_class_t mca_pml_base_request_t_class = {
|
||||
|
||||
void mca_pml_base_request_construct(mca_pml_base_request_t* req)
|
||||
{
|
||||
lam_mutex_init(&req->req_lock);
|
||||
OBJ_CONSTRUCT(&req->req_lock, lam_mutex_t);
|
||||
}
|
||||
|
||||
void mca_pml_base_request_destruct(mca_pml_base_request_t* req)
|
||||
{
|
||||
lam_mutex_destroy(&req->req_lock);
|
||||
OBJ_DESTRUCT(&req->req_lock);
|
||||
}
|
||||
|
||||
|
@ -49,9 +49,11 @@ typedef struct {
|
||||
mca_pml_base_request_type_t req_type;
|
||||
/* MPI request status */
|
||||
mca_pml_base_request_status_t req_status;
|
||||
/* persistence indicating if the this is a persistent request */
|
||||
/* completion status */
|
||||
lam_status_public_t req_status_public;
|
||||
/* flag indicating if the this is a persistent request */
|
||||
bool req_persistent;
|
||||
/* flag indicating if MPI is done with this request called */
|
||||
/* flag indicating if MPI is done with this request */
|
||||
bool req_mpi_done;
|
||||
/* flag indicating if the pt-2-pt layer is done with this request */
|
||||
bool req_pml_layer_done;
|
||||
|
@ -130,6 +130,7 @@ int mca_pml_base_select(mca_pml_t *selected, bool *allow_multi_user_threads,
|
||||
/* Save the winner */
|
||||
|
||||
mca_pml_base_selected_module = *best_module;
|
||||
mca_pml = *actions;
|
||||
*selected = *actions;
|
||||
*allow_multi_user_threads = best_user_threads;
|
||||
*have_hidden_threads = best_hidden_threads;
|
||||
|
@ -97,6 +97,16 @@ typedef int (*mca_pml_base_irecv_fn_t)(
|
||||
struct lam_request_t **request
|
||||
);
|
||||
|
||||
typedef int (*mca_pml_base_recv_fn_t)(
|
||||
void *buf,
|
||||
size_t size,
|
||||
struct lam_datatype_t *datatype,
|
||||
int src,
|
||||
int tag,
|
||||
struct lam_communicator_t* comm,
|
||||
lam_status_public_t* status
|
||||
);
|
||||
|
||||
typedef int (*mca_pml_base_isend_init_fn_t)(
|
||||
void *buf,
|
||||
size_t size,
|
||||
@ -120,6 +130,16 @@ typedef int (*mca_pml_base_isend_fn_t)(
|
||||
struct lam_request_t **request
|
||||
);
|
||||
|
||||
typedef int (*mca_pml_base_send_fn_t)(
|
||||
void *buf,
|
||||
size_t size,
|
||||
struct lam_datatype_t *datatype,
|
||||
int dst,
|
||||
int tag,
|
||||
mca_pml_base_send_mode_t mode,
|
||||
struct lam_communicator_t* comm
|
||||
);
|
||||
|
||||
typedef int (*mca_pml_base_start_fn_t)(
|
||||
lam_request_t* request
|
||||
);
|
||||
@ -153,14 +173,17 @@ struct mca_pml_1_0_0_t {
|
||||
/* downcalls from MPI to PML */
|
||||
mca_pml_base_irecv_init_fn_t pml_irecv_init;
|
||||
mca_pml_base_irecv_fn_t pml_irecv;
|
||||
mca_pml_base_recv_fn_t pml_recv;
|
||||
mca_pml_base_isend_init_fn_t pml_isend_init;
|
||||
mca_pml_base_isend_fn_t pml_isend;
|
||||
mca_pml_base_send_fn_t pml_send;
|
||||
mca_pml_base_start_fn_t pml_start;
|
||||
mca_pml_base_test_fn_t pml_test;
|
||||
mca_pml_base_wait_fn_t pml_wait;
|
||||
};
|
||||
typedef struct mca_pml_1_0_0_t mca_pml_1_0_0_t;
|
||||
typedef mca_pml_1_0_0_t mca_pml_t;
|
||||
extern mca_pml_t mca_pml;
|
||||
|
||||
/*
|
||||
* Macro for use in modules that are of type pml v1.0.0
|
||||
|
@ -6,15 +6,7 @@
|
||||
#include "pml_ptl_array.h"
|
||||
|
||||
|
||||
lam_class_t mca_pml_teg_array_t_class = {
|
||||
"mca_ptl_array_t",
|
||||
OBJ_CLASS(lam_object_t),
|
||||
(lam_construct_t) mca_ptl_array_construct,
|
||||
(lam_destruct_t) mca_ptl_array_destruct
|
||||
};
|
||||
|
||||
|
||||
void mca_ptl_array_construct(mca_ptl_array_t* array)
|
||||
static void mca_ptl_array_construct(mca_ptl_array_t* array)
|
||||
{
|
||||
array->ptl_procs = 0;
|
||||
array->ptl_size = 0;
|
||||
@ -23,12 +15,18 @@ void mca_ptl_array_construct(mca_ptl_array_t* array)
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_array_destruct(mca_ptl_array_t* array)
|
||||
static void mca_ptl_array_destruct(mca_ptl_array_t* array)
|
||||
{
|
||||
if(array->ptl_procs != 0)
|
||||
free(array->ptl_procs);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_teg_ptl_array_t,
|
||||
lam_object_t,
|
||||
mca_ptl_array_construct,
|
||||
mca_ptl_array_destruct
|
||||
);
|
||||
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t* array, size_t size)
|
||||
{
|
||||
@ -36,16 +34,12 @@ int mca_ptl_array_reserve(mca_ptl_array_t* array, size_t size)
|
||||
if(array->ptl_reserve >= size)
|
||||
return LAM_SUCCESS;
|
||||
|
||||
procs = malloc(sizeof(mca_ptl_array_t)*size);
|
||||
if(array == 0)
|
||||
procs = realloc(array->ptl_procs, sizeof(mca_ptl_proc_t)*size);
|
||||
if(NULL == procs)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
if(array->ptl_size) {
|
||||
memcpy(procs, array->ptl_procs, array->ptl_size * sizeof(mca_ptl_proc_t));
|
||||
free(array->ptl_procs);
|
||||
}
|
||||
array->ptl_procs = procs;
|
||||
array->ptl_reserve = size;
|
||||
memset(array->ptl_procs+(size-array->ptl_size), 0, (size-array->ptl_size)*sizeof(mca_ptl_proc_t));
|
||||
memset(array->ptl_procs+array->ptl_size, 0, (size-array->ptl_size)*sizeof(mca_ptl_proc_t));
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
#include "lam/util/output.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
|
||||
extern lam_class_t mca_ptl_array_t_class;
|
||||
extern lam_class_t mca_pml_teg_ptl_array_t_class;
|
||||
|
||||
struct mca_ptl_proc_t {
|
||||
double ptl_weight; /* PTL weight for scheduling */
|
||||
@ -25,11 +25,10 @@ struct mca_ptl_array_t {
|
||||
size_t ptl_index; /* last used index*/
|
||||
};
|
||||
typedef struct mca_ptl_array_t mca_ptl_array_t;
|
||||
typedef struct mca_ptl_array_t mca_pml_teg_ptl_array_t;
|
||||
|
||||
|
||||
void mca_ptl_array_construct(mca_ptl_array_t*);
|
||||
void mca_ptl_array_destruct(mca_ptl_array_t*);
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t*, size_t);
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t*, size_t);
|
||||
|
||||
static inline size_t mca_ptl_array_get_size(mca_ptl_array_t* array)
|
||||
{
|
||||
|
@ -26,8 +26,10 @@ mca_pml_teg_t mca_pml_teg = {
|
||||
mca_pml_teg_progress,
|
||||
mca_pml_teg_irecv_init,
|
||||
mca_pml_teg_irecv,
|
||||
mca_pml_teg_recv,
|
||||
mca_pml_teg_isend_init,
|
||||
mca_pml_teg_isend,
|
||||
mca_pml_teg_send,
|
||||
mca_pml_teg_start,
|
||||
mca_pml_teg_test,
|
||||
mca_pml_teg_wait,
|
||||
@ -121,7 +123,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
|
||||
|
||||
/* initialize each proc */
|
||||
mca_pml_proc_t* proc_pml = proc->proc_pml;
|
||||
if(proc_pml == 0) {
|
||||
if(NULL == proc_pml) {
|
||||
|
||||
/* allocate pml specific proc data */
|
||||
proc_pml = OBJ_NEW(mca_pml_teg_proc_t);
|
||||
@ -256,9 +258,8 @@ int mca_pml_teg_del_procs(lam_proc_t** procs, size_t nprocs)
|
||||
}
|
||||
|
||||
/* do any required cleanup */
|
||||
mca_pml_teg_proc_destruct(proc_pml);
|
||||
free(proc_pml);
|
||||
proc->proc_pml = 0;
|
||||
OBJ_RELEASE(proc_pml);
|
||||
proc->proc_pml = NULL;
|
||||
}
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
#ifndef MCA_PML_TEG_H
|
||||
#define MCA_PML_TEG_H
|
||||
|
||||
#include "lam/threads/condition.h"
|
||||
#include "lam/mem/free_list.h"
|
||||
#include "lam/util/cmd_line.h"
|
||||
#include "mpi/request/request.h"
|
||||
@ -32,6 +33,8 @@ struct mca_pml_teg_t {
|
||||
|
||||
lam_list_t teg_procs;
|
||||
lam_mutex_t teg_lock;
|
||||
lam_condition_t teg_condition;
|
||||
int teg_reqs_waiting;
|
||||
|
||||
int teg_free_list_num; /* initial size of free list */
|
||||
int teg_free_list_max; /* maximum size of free list */
|
||||
@ -114,6 +117,16 @@ extern int mca_pml_teg_isend(
|
||||
struct lam_request_t **request
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_send(
|
||||
void *buf,
|
||||
size_t size,
|
||||
struct lam_datatype_t *datatype,
|
||||
int dst,
|
||||
int tag,
|
||||
mca_pml_base_send_mode_t mode,
|
||||
struct lam_communicator_t* comm
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_irecv_init(
|
||||
void *buf,
|
||||
size_t size,
|
||||
@ -135,6 +148,16 @@ extern int mca_pml_teg_irecv(
|
||||
struct lam_request_t **request
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_recv(
|
||||
void *buf,
|
||||
size_t size,
|
||||
struct lam_datatype_t *datatype,
|
||||
int src,
|
||||
int tag,
|
||||
struct lam_communicator_t* comm,
|
||||
lam_status_public_t* status
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_progress(void);
|
||||
|
||||
extern int mca_pml_teg_start(
|
||||
|
@ -62,3 +62,35 @@ int mca_pml_teg_irecv(
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mca_pml_teg_recv(
|
||||
void *addr,
|
||||
size_t length,
|
||||
struct lam_datatype_t *datatype,
|
||||
int src,
|
||||
int tag,
|
||||
struct lam_communicator_t* comm,
|
||||
lam_status_public_t* status)
|
||||
{
|
||||
int rc;
|
||||
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
|
||||
if(NULL == recvreq)
|
||||
return rc;
|
||||
|
||||
mca_ptl_base_recv_request_reinit(
|
||||
recvreq,
|
||||
addr,
|
||||
length,
|
||||
datatype,
|
||||
src,
|
||||
tag,
|
||||
comm,
|
||||
false);
|
||||
|
||||
if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) {
|
||||
mca_pml_teg_recv_request_return(recvreq);
|
||||
return rc;
|
||||
}
|
||||
return mca_pml_teg_wait((lam_request_t*)recvreq, status);
|
||||
}
|
||||
|
||||
|
@ -73,3 +73,35 @@ int mca_pml_teg_isend(
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mca_pml_teg_send(
|
||||
void *buf,
|
||||
size_t size,
|
||||
lam_datatype_t *datatype,
|
||||
int dst,
|
||||
int tag,
|
||||
mca_pml_base_send_mode_t sendmode,
|
||||
lam_communicator_t* comm)
|
||||
{
|
||||
int rc;
|
||||
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
|
||||
if(rc != LAM_SUCCESS)
|
||||
return rc;
|
||||
|
||||
mca_ptl_base_send_request_reinit(
|
||||
sendreq,
|
||||
buf,
|
||||
size,
|
||||
datatype,
|
||||
dst,
|
||||
tag,
|
||||
comm,
|
||||
sendmode,
|
||||
false
|
||||
);
|
||||
|
||||
if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS)
|
||||
return rc;
|
||||
return mca_pml_teg_wait(&sendreq->super, NULL);
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ static inline int mca_pml_teg_param_register_int(
|
||||
|
||||
int mca_pml_teg_module_open(void)
|
||||
{
|
||||
lam_mutex_init(&mca_pml_teg.teg_lock);
|
||||
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_teg.teg_recv_requests, lam_free_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, lam_list_t);
|
||||
|
||||
@ -80,7 +80,7 @@ int mca_pml_teg_module_close(void)
|
||||
free(mca_pml_teg.teg_ptls);
|
||||
OBJ_DESTRUCT(&mca_pml_teg.teg_recv_requests);
|
||||
OBJ_DESTRUCT(&mca_pml_teg.teg_procs);
|
||||
lam_mutex_destroy(&mca_pml_teg.teg_lock);
|
||||
OBJ_DESTRUCT(&mca_pml_teg.teg_lock);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -93,6 +93,7 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = false;
|
||||
|
||||
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t);
|
||||
mca_pml_teg.teg_ptl_modules = NULL;
|
||||
mca_pml_teg.teg_num_ptl_modules = 0;
|
||||
mca_pml_teg.teg_ptls = NULL;
|
||||
@ -107,7 +108,6 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
|
||||
mca_pml_teg.teg_free_list_inc,
|
||||
NULL);
|
||||
|
||||
lam_mutex_init(&mca_pml_teg.teg_lock);
|
||||
mca_pml_teg.teg_recv_sequence = 0;
|
||||
return &mca_pml_teg.super;
|
||||
}
|
||||
|
@ -7,18 +7,13 @@
|
||||
#include "pml_teg_proc.h"
|
||||
#include "pml_ptl_array.h"
|
||||
|
||||
lam_class_t mca_pml_teg_proc_t_class = {
|
||||
"mca_pml_teg_proc_t",
|
||||
OBJ_CLASS(lam_list_item_t),
|
||||
(lam_construct_t) mca_pml_teg_proc_construct,
|
||||
(lam_destruct_t) mca_pml_teg_proc_destruct
|
||||
};
|
||||
|
||||
|
||||
void mca_pml_teg_proc_construct(mca_pml_proc_t* proc)
|
||||
static void mca_pml_teg_proc_construct(mca_pml_proc_t* proc)
|
||||
{
|
||||
mca_ptl_array_construct(&proc->proc_ptl_first);
|
||||
mca_ptl_array_construct(&proc->proc_ptl_next);
|
||||
proc->proc_lam = NULL;
|
||||
OBJ_CONSTRUCT(&proc->proc_lock, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&proc->proc_ptl_first, mca_pml_teg_ptl_array_t);
|
||||
OBJ_CONSTRUCT(&proc->proc_ptl_next, mca_pml_teg_ptl_array_t);
|
||||
|
||||
THREAD_LOCK(&mca_pml_teg.teg_lock);
|
||||
lam_list_append(&mca_pml_teg.teg_procs, (lam_list_item_t*)proc);
|
||||
@ -26,10 +21,21 @@ void mca_pml_teg_proc_construct(mca_pml_proc_t* proc)
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_teg_proc_destruct(mca_pml_proc_t* proc)
|
||||
static void mca_pml_teg_proc_destruct(mca_pml_proc_t* proc)
|
||||
{
|
||||
THREAD_LOCK(&mca_pml_teg.teg_lock);
|
||||
lam_list_remove_item(&mca_pml_teg.teg_procs, (lam_list_item_t*)proc);
|
||||
THREAD_UNLOCK(&mca_pml_teg.teg_lock);
|
||||
|
||||
OBJ_DESTRUCT(&proc->proc_lock);
|
||||
OBJ_DESTRUCT(&proc->proc_ptl_first);
|
||||
OBJ_DESTRUCT(&proc->proc_ptl_next);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_teg_proc_t,
|
||||
lam_list_item_t,
|
||||
mca_pml_teg_proc_construct,
|
||||
mca_pml_teg_proc_destruct
|
||||
);
|
||||
|
||||
|
@ -30,9 +30,6 @@ extern lam_class_t mca_pml_teg_proc_t_class;
|
||||
typedef struct mca_pml_proc_t mca_pml_teg_proc_t;
|
||||
|
||||
|
||||
void mca_pml_teg_proc_construct(mca_pml_proc_t*);
|
||||
void mca_pml_teg_proc_destruct(mca_pml_proc_t*);
|
||||
|
||||
static inline mca_pml_proc_t* mca_pml_teg_proc_lookup_local(lam_communicator_t* comm, int rank)
|
||||
{
|
||||
lam_proc_t* proc = comm->c_local_group->grp_proc_pointers[rank];
|
||||
|
@ -1,10 +1,22 @@
|
||||
#include "pml_teg.h"
|
||||
|
||||
|
||||
|
||||
int mca_pml_teg_wait(
|
||||
lam_request_t* request,
|
||||
lam_status_public_t* status)
|
||||
{
|
||||
return LAM_ERROR;
|
||||
#if 0
|
||||
mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)request;
|
||||
if(pml_request->req_mpi_done == false) {
|
||||
lam_mutex_lock(&mca_pml_teg.teg_lock);
|
||||
mca_pml_teg.teg_req_waiting++;
|
||||
while(request->req_mpi_done == false)
|
||||
lam_condition_wait(&mca_pml_teg.teg_condition);
|
||||
mca_pml_teg.teg_req_waiting--;
|
||||
lam_mutex_unlock(&mca_pml_teg.teg_lock);
|
||||
}
|
||||
*status = request->req_status;
|
||||
#endif
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ lam_class_t mca_pml_comm_t_class = {
|
||||
static void mca_pml_ptl_comm_construct(mca_pml_comm_t* comm)
|
||||
{
|
||||
OBJ_CONSTRUCT(&comm->c_wild_receives, lam_list_t);
|
||||
lam_mutex_init(&comm->c_wild_lock);
|
||||
OBJ_CONSTRUCT(&comm->c_wild_lock, lam_mutex_t);
|
||||
}
|
||||
|
||||
static void mca_pml_ptl_comm_destruct(mca_pml_comm_t* comm)
|
||||
@ -54,7 +54,7 @@ int mca_pml_ptl_comm_init_size(mca_pml_comm_t* comm, size_t size)
|
||||
if(NULL == comm->c_matching_lock)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
for(i=0; i<size; i++)
|
||||
lam_mutex_init(comm->c_matching_lock+i);
|
||||
OBJ_CONSTRUCT(comm->c_matching_lock+i, lam_mutex_t);
|
||||
|
||||
/* unexpected fragments queues */
|
||||
comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size);
|
||||
@ -68,7 +68,7 @@ int mca_pml_ptl_comm_init_size(mca_pml_comm_t* comm, size_t size)
|
||||
if(NULL == comm->c_unexpected_frags_lock)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
for(i=0; i<size; i++)
|
||||
lam_mutex_init(comm->c_unexpected_frags_lock+i);
|
||||
OBJ_CONSTRUCT(comm->c_unexpected_frags_lock+i, lam_mutex_t);
|
||||
|
||||
/* out-of-order fragments queues */
|
||||
comm->c_frags_cant_match = malloc(sizeof(lam_list_t) * size);
|
||||
|
@ -97,7 +97,7 @@ int mca_ptl_tcp_module_open(void)
|
||||
mca_ptl_tcp_module.tcp_num_ptls = 0;
|
||||
|
||||
/* initialize objects */
|
||||
lam_mutex_init(&mca_ptl_tcp_module.tcp_lock);
|
||||
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_lock, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_procs, lam_list_t);
|
||||
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_acks, lam_list_t);
|
||||
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_send_requests, lam_free_list_t);
|
||||
@ -139,7 +139,7 @@ int mca_ptl_tcp_module_close(void)
|
||||
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_send_requests);
|
||||
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_send_frags);
|
||||
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_recv_frags);
|
||||
lam_mutex_destroy(&mca_ptl_tcp_module.tcp_lock);
|
||||
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_lock);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -256,16 +256,15 @@ static int mca_ptl_tcp_module_create_listen(void)
|
||||
static int mca_ptl_tcp_module_exchange(void)
|
||||
{
|
||||
size_t i;
|
||||
mca_ptl_tcp_addr_t *addrs = malloc
|
||||
(mca_ptl_tcp_module.tcp_num_ptls * sizeof(mca_ptl_tcp_addr_t));
|
||||
size_t size = mca_ptl_tcp_module.tcp_num_ptls * sizeof(mca_ptl_tcp_addr_t);
|
||||
mca_ptl_tcp_addr_t *addrs = malloc(size);
|
||||
for(i=0; i<mca_ptl_tcp_module.tcp_num_ptls; i++) {
|
||||
mca_ptl_tcp_t* ptl = mca_ptl_tcp_module.tcp_ptls[i];
|
||||
addrs[i].addr_inet = ptl->ptl_ifaddr.sin_addr;
|
||||
addrs[i].addr_port = mca_ptl_tcp_module.tcp_listen;
|
||||
addrs[i].addr_inuse = 0;
|
||||
}
|
||||
int rc = mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version,
|
||||
addrs, sizeof(mca_ptl_tcp_t),mca_ptl_tcp_module.tcp_num_ptls);
|
||||
int rc = mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version, addrs, size);
|
||||
free(addrs);
|
||||
return rc;
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
|
||||
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
|
||||
ptl_peer->peer_retries = 0;
|
||||
OBJ_CONSTRUCT(&ptl_peer->peer_frags, lam_list_t);
|
||||
lam_mutex_init(&ptl_peer->peer_lock);
|
||||
OBJ_CONSTRUCT(&ptl_peer->peer_lock, lam_mutex_t);
|
||||
}
|
||||
|
||||
|
||||
|
@ -29,7 +29,7 @@ void mca_ptl_tcp_proc_construct(mca_ptl_tcp_proc_t* proc)
|
||||
proc->proc_addr_count = 0;
|
||||
proc->proc_peers = 0;
|
||||
proc->proc_peer_count = 0;
|
||||
lam_mutex_init(&proc->proc_lock);
|
||||
OBJ_CONSTRUCT(&proc->proc_lock, lam_mutex_t);
|
||||
|
||||
/* add to list of all proc instance */
|
||||
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
|
||||
@ -88,18 +88,17 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc)
|
||||
&mca_ptl_tcp_module.super.ptlm_version,
|
||||
lam_proc,
|
||||
(void**)&ptl_proc->proc_addrs,
|
||||
&size,
|
||||
&ptl_proc->proc_addr_count);
|
||||
&size);
|
||||
if(rc != LAM_SUCCESS) {
|
||||
lam_output(0, "mca_ptl_tcp_proc_create: mca_base_modex_recv: failed with return value=%d", rc);
|
||||
OBJ_RELEASE(ptl_proc);
|
||||
return NULL;
|
||||
}
|
||||
if(size != sizeof(mca_ptl_tcp_addr_t)) {
|
||||
lam_output(0, "mca_ptl_tcp_proc_create: mca_base_modex_recv: size %d != %d",
|
||||
size, sizeof(mca_ptl_tcp_addr_t));
|
||||
if(0 != (size % sizeof(mca_ptl_tcp_addr_t))) {
|
||||
lam_output(0, "mca_ptl_tcp_proc_create: mca_base_modex_recv: invalid size %d\n", size);
|
||||
return NULL;
|
||||
}
|
||||
ptl_proc->proc_addr_count = size / sizeof(mca_ptl_tcp_addr_t);
|
||||
|
||||
/* allocate space for peer array - one for each exported address */
|
||||
ptl_proc->proc_peers = (mca_ptl_base_peer_t**)
|
||||
|
@ -45,6 +45,14 @@ struct lam_communicator_t {
|
||||
typedef struct lam_communicator_t lam_communicator_t;
|
||||
|
||||
|
||||
/**
|
||||
* rank w/in the communicator
|
||||
*/
|
||||
static inline int lam_comm_rank(lam_communicator_t* comm)
|
||||
{
|
||||
return comm->c_my_rank;
|
||||
}
|
||||
|
||||
/* return pointer to communicator associated with context id cid,
|
||||
* No error checking is done*/
|
||||
static inline lam_communicator_t *lam_comm_lookup(uint32_t cid)
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include "mpi.h"
|
||||
#include "mpi/communicator/communicator.h"
|
||||
#include "mpi/group/group.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -65,14 +66,15 @@ int lam_comm_init(void)
|
||||
OBJ_CONSTRUCT(&lam_mpi_comm_world, lam_communicator_t);
|
||||
group = OBJ_NEW(lam_group_t);
|
||||
group->grp_proc_pointers = lam_proc_world(&size);
|
||||
group->grp_my_rank = 0;
|
||||
group->grp_my_rank = lam_proc_local()->proc_vpid ;
|
||||
group->grp_proc_count = size;
|
||||
OBJ_RETAIN(group); /* bump reference count for remote reference */
|
||||
|
||||
lam_mpi_comm_self.c_contextid = 0;
|
||||
lam_mpi_comm_self.c_my_rank = group->grp_my_rank;
|
||||
lam_mpi_comm_world.c_contextid = 0;
|
||||
lam_mpi_comm_world.c_my_rank = group->grp_my_rank;
|
||||
lam_mpi_comm_world.c_local_group = group;
|
||||
lam_mpi_comm_world.c_remote_group = group;
|
||||
mca_pml.pml_add_comm(&lam_mpi_comm_world);
|
||||
|
||||
/* Setup MPI_COMM_SELF */
|
||||
OBJ_CONSTRUCT(&lam_mpi_comm_self, lam_communicator_t);
|
||||
@ -80,13 +82,13 @@ int lam_comm_init(void)
|
||||
group->grp_proc_pointers = lam_proc_self(&size);
|
||||
group->grp_my_rank = 0;
|
||||
group->grp_proc_count = size;
|
||||
OBJ_RETAIN(group); /* bump reference count for remote reference */
|
||||
|
||||
OBJ_RETAIN(group);
|
||||
lam_mpi_comm_self.c_contextid = 1;
|
||||
lam_mpi_comm_self.c_my_rank = group->grp_my_rank;
|
||||
lam_mpi_comm_self.c_local_group = group;
|
||||
lam_mpi_comm_self.c_remote_group = group;
|
||||
|
||||
mca_pml.pml_add_comm(&lam_mpi_comm_self);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include "mpi.h"
|
||||
#include "mpi/interface/c/bindings.h"
|
||||
#include "mpi/communicator/communicator.h"
|
||||
|
||||
#if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES
|
||||
#pragma weak MPI_Comm_rank = PMPI_Comm_rank
|
||||
@ -13,5 +14,7 @@
|
||||
|
||||
|
||||
int MPI_Comm_rank(MPI_Comm comm, int *rank) {
|
||||
*rank = lam_comm_rank(comm);
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,8 @@
|
||||
|
||||
#include "mpi.h"
|
||||
#include "mpi/interface/c/bindings.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
|
||||
|
||||
#if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES
|
||||
#pragma weak MPI_Recv = PMPI_Recv
|
||||
@ -13,5 +15,6 @@
|
||||
|
||||
int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source,
|
||||
int tag, MPI_Comm comm, MPI_Status *status) {
|
||||
return MPI_SUCCESS;
|
||||
/* FIX - error checking */
|
||||
return mca_pml.pml_recv(buf, count, datatype, source, tag, comm, status);
|
||||
}
|
||||
|
@ -6,6 +6,8 @@
|
||||
|
||||
#include "mpi.h"
|
||||
#include "mpi/interface/c/bindings.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
|
||||
|
||||
#if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES
|
||||
#pragma weak MPI_Send = PMPI_Send
|
||||
@ -13,5 +15,6 @@
|
||||
|
||||
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest,
|
||||
int tag, MPI_Comm comm) {
|
||||
return MPI_SUCCESS;
|
||||
/* FIX - error checking, return value */
|
||||
return mca_pml.pml_send(buf, count, datatype, dest, tag, MCA_PML_BASE_SEND_STANDARD, comm);
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ void lam_proc_construct(lam_proc_t* proc)
|
||||
static int init = 0;
|
||||
if(fetchNset(&init,1) == 0) {
|
||||
OBJ_CONSTRUCT(&lam_proc_list, lam_list_t);
|
||||
lam_mutex_init(&lam_proc_lock);
|
||||
OBJ_CONSTRUCT(&lam_proc_lock, lam_mutex_t);
|
||||
}
|
||||
|
||||
proc->proc_job = NULL;
|
||||
@ -68,7 +68,6 @@ int lam_proc_init(void)
|
||||
lam_output(0, "lam_proc_init: pcm_proc_get_peers failed with errno=%d", rc);
|
||||
return rc;
|
||||
}
|
||||
lam_output(0, "lam_proc_init: pcm_proc_get_peers returned %d peers", nprocs);
|
||||
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_job_handle_t job = procs[i].job_handle;
|
||||
@ -76,8 +75,10 @@ int lam_proc_init(void)
|
||||
lam_proc_t *proc = OBJ_NEW(lam_proc_t);
|
||||
proc->proc_job = strdup(job);
|
||||
proc->proc_vpid = vpid;
|
||||
if(proc->proc_vpid == vpid && strcmp(proc->proc_job, job) == 0)
|
||||
if(proc->proc_vpid == local->vpid && strcmp(proc->proc_job, local->job_handle) == 0) {
|
||||
lam_proc_local_proc = proc;
|
||||
lam_output(0, "myrank=%d\n", local->vpid);
|
||||
}
|
||||
}
|
||||
free(procs);
|
||||
return LAM_SUCCESS;
|
||||
|
@ -41,6 +41,8 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
int ret, param, value;
|
||||
bool allow_multi_user_threads;
|
||||
bool have_hidden_threads;
|
||||
lam_proc_t** procs;
|
||||
size_t nprocs;
|
||||
|
||||
/* Become a LAM process */
|
||||
|
||||
@ -58,11 +60,15 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
}
|
||||
|
||||
/* Join the run-time environment */
|
||||
|
||||
if (LAM_SUCCESS != (ret = lam_rte_init(&allow_multi_user_threads, &have_hidden_threads))) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* initialize lam procs */
|
||||
if (LAM_SUCCESS != (ret = lam_proc_init())) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Open up relevant MCA modules. Do not open io, topo, or one
|
||||
module types here -- they are loaded upon demand (i.e., upon
|
||||
relevant constructors). */
|
||||
@ -90,11 +96,6 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* initialize lam procs */
|
||||
if (LAM_SUCCESS != (ret = lam_proc_init())) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* initialize groups */
|
||||
if (LAM_SUCCESS != (ret = lam_group_init())) {
|
||||
return ret;
|
||||
@ -113,8 +114,18 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
lam_mpi_param_check = (bool) value;
|
||||
|
||||
/* do module exchange */
|
||||
if (LAM_SUCCESS != (ret = mca_base_modex_exchange())) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* add all lam_proc_t's to PML */
|
||||
if (NULL == (procs = lam_proc_world(&nprocs)))
|
||||
return LAM_ERROR;
|
||||
if (LAM_SUCCESS != (ret = mca_pml.pml_add_procs(procs, nprocs))) {
|
||||
free(procs);
|
||||
return ret;
|
||||
}
|
||||
free(procs);
|
||||
|
||||
/* save the resulting thread levels */
|
||||
|
||||
|
@ -5,4 +5,8 @@
|
||||
|
||||
include $(top_srcdir)/config/Makefile.options
|
||||
|
||||
SUBDIRS = lfc
|
||||
SUBDIRS = \
|
||||
lfc \
|
||||
threads \
|
||||
util
|
||||
|
||||
|
7
test/lam/threads/.cvsignore
Обычный файл
7
test/lam/threads/.cvsignore
Обычный файл
@ -0,0 +1,7 @@
|
||||
Makefile
|
||||
Makefile.in
|
||||
*.lo
|
||||
.deps
|
||||
.libs
|
||||
lam_condition
|
||||
lam_thread
|
40
test/lam/threads/Makefile.am
Обычный файл
40
test/lam/threads/Makefile.am
Обычный файл
@ -0,0 +1,40 @@
|
||||
# -*- makefile -*-
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
include $(top_srcdir)/config/Makefile.options
|
||||
AM_CPPFLAGS = -I$(top_srcdir)/test/support -DLAM_ENABLE_DEBUG_OVERRIDE=1
|
||||
|
||||
noinst_PROGRAMS = \
|
||||
lam_thread \
|
||||
lam_condition
|
||||
|
||||
lam_thread_SOURCES = lam_thread.c
|
||||
lam_thread_LDADD = \
|
||||
$(top_builddir)/src/lam/threads/thread.lo \
|
||||
$(top_builddir)/src/lam/lfc/lam_object.lo \
|
||||
$(top_builddir)/src/lam/mem/malloc.lo \
|
||||
$(top_builddir)/src/lam/util/output.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex.lo \
|
||||
$(top_builddir)/test/support/libsupport.la \
|
||||
-lpthread
|
||||
lam_thread_DEPENDENCIES = $(lam_thread_LDADD)
|
||||
|
||||
lam_condition_SOURCES = lam_condition.c
|
||||
lam_condition_LDADD = \
|
||||
$(top_builddir)/src/lam/threads/condition_pthread.lo \
|
||||
$(top_builddir)/src/lam/threads/condition_spinwait.lo \
|
||||
$(top_builddir)/src/lam/threads/condition_spinlock.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex_pthread.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex_spinwait.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex_spinlock.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex.lo \
|
||||
$(top_builddir)/src/lam/threads/thread.lo \
|
||||
$(top_builddir)/src/lam/lfc/lam_object.lo \
|
||||
$(top_builddir)/src/lam/mem/malloc.lo \
|
||||
$(top_builddir)/src/lam/util/output.lo \
|
||||
$(top_builddir)/test/support/libsupport.la \
|
||||
-lpthread
|
||||
lam_condition_DEPENDENCIES = $(lam_condition_LDADD)
|
||||
|
90
test/lam/threads/lam_condition.c
Обычный файл
90
test/lam/threads/lam_condition.c
Обычный файл
@ -0,0 +1,90 @@
|
||||
#include <stdio.h>
|
||||
#include <time.h>
|
||||
#include "support.h"
|
||||
#include "lam/constants.h"
|
||||
#include "lam/threads/thread.h"
|
||||
#include "lam/threads/condition.h"
|
||||
#include "lam/os/atomic.h"
|
||||
|
||||
|
||||
lam_mutex_t mutex;
|
||||
lam_condition_t thr1_cond;
|
||||
lam_condition_t thr2_cond;
|
||||
|
||||
int thr1_count = 0;
|
||||
int thr2_count = 0;
|
||||
|
||||
|
||||
#define TEST_COUNT 1000000
|
||||
|
||||
|
||||
static void* thr1_run(lam_object_t* obj)
|
||||
{
|
||||
int i;
|
||||
lam_mutex_lock(&mutex);
|
||||
clock_t c1 = clock();
|
||||
for(i=0; i<TEST_COUNT; i++) {
|
||||
lam_condition_wait(&thr1_cond, &mutex);
|
||||
lam_condition_signal(&thr2_cond);
|
||||
thr1_count++;
|
||||
}
|
||||
clock_t c2 = clock();
|
||||
lam_mutex_unlock(&mutex);
|
||||
fprintf(stderr, "thr1: time per iteration: %ld usec\n", (c2 - c1) / TEST_COUNT);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void* thr2_run(lam_object_t* obj)
|
||||
{
|
||||
int i;
|
||||
lam_mutex_lock(&mutex);
|
||||
clock_t c1 = clock();
|
||||
for(i=0; i<TEST_COUNT; i++) {
|
||||
lam_condition_signal(&thr1_cond);
|
||||
lam_condition_wait(&thr2_cond, &mutex);
|
||||
thr2_count++;
|
||||
}
|
||||
clock_t c2 = clock();
|
||||
lam_mutex_unlock(&mutex);
|
||||
fprintf(stderr, "thr2: time per iteration: %ld usec\n", (c2 - c1) / TEST_COUNT);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
int rc;
|
||||
lam_thread_t* thr1;
|
||||
lam_thread_t* thr2;
|
||||
|
||||
test_init("lam_condition_t");
|
||||
|
||||
OBJ_CONSTRUCT(&mutex, lam_mutex_t);
|
||||
OBJ_CONSTRUCT(&thr1_cond, lam_condition_t);
|
||||
OBJ_CONSTRUCT(&thr2_cond, lam_condition_t);
|
||||
|
||||
thr1 = OBJ_NEW(lam_thread_t);
|
||||
thr2 = OBJ_NEW(lam_thread_t);
|
||||
thr1->t_run = thr1_run;
|
||||
thr2->t_run = thr2_run;
|
||||
|
||||
rc = lam_thread_start(thr1);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
rc = lam_thread_start(thr2);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
rc = lam_thread_join(thr1, NULL);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
test_verify_int(TEST_COUNT, thr1_count);
|
||||
|
||||
rc = lam_thread_join(thr2, NULL);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
test_verify_int(TEST_COUNT, thr2_count);
|
||||
|
||||
test_finalize();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
53
test/lam/threads/lam_thread.c
Обычный файл
53
test/lam/threads/lam_thread.c
Обычный файл
@ -0,0 +1,53 @@
|
||||
#include "support.h"
|
||||
#include "lam/constants.h"
|
||||
#include "lam/threads/thread.h"
|
||||
#include "lam/os/atomic.h"
|
||||
|
||||
|
||||
static volatile int count = 0;
|
||||
|
||||
|
||||
static void* thr1_run(lam_object_t* obj)
|
||||
{
|
||||
fetchNadd(&count, 1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void* thr2_run(lam_object_t* obj)
|
||||
{
|
||||
fetchNadd(&count, 2);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
int rc;
|
||||
lam_thread_t thr1;
|
||||
lam_thread_t thr2;
|
||||
|
||||
test_init("lam_thread_t");
|
||||
|
||||
OBJ_CONSTRUCT(&thr1, lam_thread_t);
|
||||
OBJ_CONSTRUCT(&thr2, lam_thread_t);
|
||||
|
||||
thr1.t_run = thr1_run;
|
||||
thr2.t_run = thr2_run;
|
||||
|
||||
rc = lam_thread_start(&thr1);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
rc = lam_thread_start(&thr2);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
rc = lam_thread_join(&thr1, NULL);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
rc = lam_thread_join(&thr2, NULL);
|
||||
test_verify_int(LAM_SUCCESS, rc);
|
||||
|
||||
test_verify_int(3, count);
|
||||
return test_finalize();
|
||||
}
|
||||
|
||||
|
||||
|
6
test/lam/util/.cvsignore
Обычный файл
6
test/lam/util/.cvsignore
Обычный файл
@ -0,0 +1,6 @@
|
||||
Makefile
|
||||
Makefile.in
|
||||
*.lo
|
||||
.deps
|
||||
.libs
|
||||
lam_argv
|
20
test/lam/util/Makefile.am
Обычный файл
20
test/lam/util/Makefile.am
Обычный файл
@ -0,0 +1,20 @@
|
||||
# -*- makefile -*-
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
include $(top_srcdir)/config/Makefile.options
|
||||
AM_CPPFLAGS = -I$(top_srcdir)/test/support -DLAM_ENABLE_DEBUG_OVERRIDE=1
|
||||
|
||||
noinst_PROGRAMS = \
|
||||
lam_argv
|
||||
|
||||
lam_argv_SOURCES = lam_argv.c
|
||||
lam_argv_LDADD = \
|
||||
$(top_builddir)/src/lam/lfc/lam_object.lo \
|
||||
$(top_builddir)/src/lam/mem/malloc.lo \
|
||||
$(top_builddir)/src/lam/util/output.lo \
|
||||
$(top_builddir)/src/lam/threads/mutex.lo \
|
||||
$(top_builddir)/test/support/libsupport.la
|
||||
lam_argv_DEPENDENCIES = $(lam_argv_LDADD)
|
||||
|
14
test/lam/util/lam_argv.c
Обычный файл
14
test/lam/util/lam_argv.c
Обычный файл
@ -0,0 +1,14 @@
|
||||
#include "lam_config.h"
|
||||
#include "lam/util/argv.h"
|
||||
#include "support.h"
|
||||
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
test_init("lam_argv");
|
||||
|
||||
/* body of test goes here */
|
||||
|
||||
return test_finalize();
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user