1
1

* Fix a number of compiler errors that crop up with undefined function

errors
* Add some basic unit test gorp for COFS OOB module

This commit was SVN r234.
Этот коммит содержится в:
Brian Barrett 2004-01-11 00:02:06 +00:00
родитель a6612f3664
Коммит 6f7811f9d5
15 изменённых файлов: 277 добавлений и 51 удалений

Просмотреть файл

@ -457,6 +457,8 @@ AC_CONFIG_FILES([
test/Makefile
test/unit/Makefile
test/unit/lam/Makefile
test/unit/lam/oob_cofs/Makefile
test/unit/mpi/Makefile
test/unit/mpi/communicator/Makefile
test/unit/mpi/datatype/Makefile

Просмотреть файл

@ -19,7 +19,8 @@ enum {
LAM_ERR_FATAL = -9,
LAM_ERR_NOT_IMPLEMENTED = -10,
LAM_ERR_NOT_SUPPORTED = -11,
LAM_ERR_INTERUPTED = -12
LAM_ERR_INTERUPTED = -12,
LAM_ERR_WOULD_BLOCK = -13
};
#endif /* LAM_CONSTANTS_H */

Просмотреть файл

@ -54,7 +54,7 @@ int lam_fh_set_value_for_skey(lam_fast_hash_t *htbl, void *val, const char *ke
/* returns the number of items in the table */
uint32_t lam_fh_count(lam_fast_hash_t *htbl);
inline uint32_t lam_fh_count(lam_fast_hash_t *htbl) {return htbl->fh_count;}
static uint32_t lam_fh_count(lam_fast_hash_t *htbl);
static inline uint32_t lam_fh_count(lam_fast_hash_t *htbl) {return htbl->fh_count;}
#endif /* LAM_HASH_TABLE_H */

Просмотреть файл

@ -65,20 +65,20 @@ void lam_list_destroy(lam_list_t *list);
*/
#define lam_list_get_type(list) \
((lam_list_list_t*)list)->lam_list_type
((lam_list_t*)list)->lam_list_type
#define lam_list_set_type(list, type) \
(((lam_list_list_t*)list)->lam_list_type = type)
(((lam_list_t*)list)->lam_list_type = type)
#define lam_list_get_size(list) \
((lam_list_list_t*)list)->lam_list_length
((lam_list_t*)list)->lam_list_length
/*
* Returns first item on list, but does not remove it from the list.
*/
#define lam_list_get_first(list) \
((lam_list_list_t*)list)->lam_list_head
((lam_list_t*)list)->lam_list_head
/*

Просмотреть файл

@ -3,6 +3,7 @@
*/
#include "lam/mem/seg_list.h"
#include "lam/lfc/list.h"
lam_class_info_t seg_list_cls = {"lam_seg_list_t", &lam_object_cls,
(class_init_t)lam_sgl_init, (class_destroy_t)lam_sgl_destroy};
@ -21,7 +22,7 @@ void lam_sgl_init(lam_seg_list_t *slist)
void lam_sgl_destroy(lam_seg_list_t *slist)
{
lam_dbl_empty_list(&(slist->sgl_list));
lam_list_empty_list(&(slist->sgl_list));
SUPER_DESTROY(slist, seg_list_cls.cls_parent);
}
@ -44,7 +45,7 @@ void lam_sgl_append_elt_chunk(
slist->sgl_bytes_pushed += chunk_size;
for ( i = 0; i < n_elts; i++ )
{
lam_dbl_append(&(slist->sgl_list), (lam_list_item_t *)ptr);
lam_list_append(&(slist->sgl_list), (lam_list_item_t *)ptr);
ptr += elt_size;
}
}

Просмотреть файл

@ -41,8 +41,8 @@ void lam_sgl_append_elt_chunk(
#define lam_sgl_lock_list(slist) lam_mtx_trylock(&slist->sgl_lock)
#define lam_sgl_unlock_list(slist) lam_mtx_unlock(&slist->sgl_lock)
bool lam_sgl_is_locked(lam_seg_list_t *slist);
inline bool lam_sgl_is_locked(lam_seg_list_t *slist)
static bool lam_sgl_is_locked(lam_seg_list_t *slist);
static inline bool lam_sgl_is_locked(lam_seg_list_t *slist)
{
/* returns 1 if list is currently locked, otherwise 0. */
int ret;

Просмотреть файл

@ -117,14 +117,16 @@ lam_cmd_line_t *lam_cmd_line_create(void)
*/
int lam_cmd_line_free(lam_cmd_line_t *cmd)
{
#if 0
/* We don't lock the mutex; there's no point. Just free it. */
lam_mtx_free(&cmd->lcl_mutex);
#endif
/* Free the lists */
lam_list_free(&cmd->lcl_options);
lam_list_free(&cmd->lcl_params);
lam_list_destroy(&cmd->lcl_options);
lam_list_destroy(&cmd->lcl_params);
/* Free the argv's */

Просмотреть файл

@ -204,13 +204,11 @@ lam_path_env_findv(char *fname, int mode, char **envv, char *wrkdir)
char *
lam_path_env_find(char *fname, int mode)
{
char *cwd;
char cwd[LAM_PATH_MAX];
char *r;
cwd = getworkdir();
getcwd(cwd, LAM_PATH_MAX);
r = lam_path_env_findv(fname, mode, 0, cwd);
if (cwd)
LAM_FREE(cwd);
return(r);
}

Просмотреть файл

@ -34,22 +34,22 @@ lam_class_info_t lam_reactor_descriptor_cls = {
void lam_reactor_descriptor_init(lam_reactor_descriptor_t* rd)
{
lam_dbl_item_init(&rd->rd_base);
lam_list_item_init(&rd->rd_base);
rd->rd_base.super.obj_class = &lam_reactor_descriptor_cls;
}
void lam_reactor_descriptor_destroy(lam_reactor_descriptor_t* rd)
{
lam_dbl_item_destroy(&rd->rd_base);
lam_list_item_destroy(&rd->rd_base);
}
static inline lam_reactor_descriptor_t* lam_reactor_get_descriptor(lam_reactor_t* r, int sd)
{
lam_reactor_descriptor_t *descriptor;
if(lam_dbl_get_size(&r->r_free))
descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first_item(&r->r_free);
if(lam_list_get_size(&r->r_free))
descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_free);
else {
descriptor = (lam_reactor_descriptor_t*)LAM_MALLOC(sizeof(lam_reactor_descriptor_t));
lam_reactor_descriptor_init(descriptor);
@ -73,9 +73,9 @@ void lam_reactor_init(lam_reactor_t* r)
r->r_base.obj_class = &lam_reactor_cls;
lam_mtx_init(&r->r_mutex);
lam_dbl_init(&r->r_active);
lam_dbl_init(&r->r_free);
lam_dbl_init(&r->r_pending);
lam_list_init(&r->r_active);
lam_list_init(&r->r_free);
lam_list_init(&r->r_pending);
lam_fh_init(&r->r_hash);
lam_fh_init_with(&r->r_hash, 1024);
@ -91,9 +91,9 @@ void lam_reactor_init(lam_reactor_t* r)
void lam_reactor_destroy(lam_reactor_t* r)
{
lam_dbl_destroy(&r->r_active);
lam_dbl_destroy(&r->r_free);
lam_dbl_destroy(&r->r_pending);
lam_list_destroy(&r->r_active);
lam_list_destroy(&r->r_free);
lam_list_destroy(&r->r_pending);
lam_fh_destroy(&r->r_hash);
lam_obj_destroy(&r->r_base);
}
@ -109,14 +109,14 @@ bool lam_reactor_insert(lam_reactor_t* r, int sd, lam_reactor_listener_t* listen
#endif
lam_mtx_lock(&r->r_mutex);
lam_reactor_descriptor_t *descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first(&r->r_free);
lam_reactor_descriptor_t *descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_free);
if(descriptor == 0) {
descriptor = lam_reactor_get_descriptor(r, sd);
if(descriptor == 0) {
lam_mtx_unlock(&r->r_mutex);
return false;
}
lam_dbl_append(&r->r_pending, &descriptor->rd_base);
lam_list_append(&r->r_pending, &descriptor->rd_base);
lam_fh_set_value_for_ikey(&r->r_hash,descriptor,sd);
}
@ -184,9 +184,9 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_
*/
lam_reactor_descriptor_t *descriptor;
for(descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_first(&r->r_active);
for(descriptor = (lam_reactor_descriptor_t*)lam_list_get_first(&r->r_active);
descriptor != 0;
descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_next(descriptor)) {
descriptor = (lam_reactor_descriptor_t*)lam_list_get_next(descriptor)) {
int rd = descriptor->rd;
int flags = 0;
if(LAM_FD_ISSET(rd, rset) && descriptor->rd_flags & LAM_NOTIFY_RECV) {
@ -211,14 +211,14 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_
}
/* cleanup any pending deletes while holding the lock */
descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_first(&r->r_active);
descriptor = (lam_reactor_descriptor_t*)lam_list_get_first(&r->r_active);
while(descriptor != 0) {
lam_reactor_descriptor_t* next = (lam_reactor_descriptor_t*)lam_dbl_get_next(&r->r_active);
lam_reactor_descriptor_t* next = (lam_reactor_descriptor_t*)lam_list_get_next(&r->r_active);
if(descriptor->rd_flags == 0) {
lam_fh_remove_value_for_ikey(&r->r_hash, descriptor->rd);
lam_dbl_list_remove(&r->r_active, descriptor);
if(lam_dbl_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) {
lam_dbl_append(&r->r_free, &descriptor->rd_base);
lam_list_remove(&r->r_active, descriptor);
if(lam_list_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) {
lam_list_append(&r->r_free, &descriptor->rd_base);
} else {
lam_reactor_descriptor_destroy(descriptor);
LAM_FREE(descriptor);
@ -228,18 +228,18 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_
}
/* add any other pending inserts/deletes */
while(lam_dbl_get_size(&r->r_pending)) {
lam_reactor_descriptor_t* descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first(&r->r_pending);
while(lam_list_get_size(&r->r_pending)) {
lam_reactor_descriptor_t* descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_pending);
if(descriptor->rd_flags == 0) {
lam_fh_remove_value_for_ikey(&r->r_hash, descriptor->rd);
if(lam_dbl_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) {
lam_dbl_append(&r->r_free, &descriptor->rd_base);
if(lam_list_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) {
lam_list_append(&r->r_free, &descriptor->rd_base);
} else {
lam_reactor_descriptor_destroy(descriptor);
LAM_FREE(descriptor);
}
} else {
lam_dbl_append(&r->r_active, &descriptor->rd_base);
lam_list_append(&r->r_active, &descriptor->rd_base);
if(descriptor->rd > r->r_max)
r->r_max = descriptor->rd;
}

Просмотреть файл

@ -70,6 +70,7 @@ bool lam_reactor_insert(lam_reactor_t*, int sd, lam_reactor_listener_t*, int fla
bool lam_reactor_remove(lam_reactor_t*, int sd, lam_reactor_listener_t*, int flags);
void lam_reactor_poll(lam_reactor_t*);
void lam_reactor_run(lam_reactor_t*);
void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_set_t* sset, lam_fd_set_t* eset);
#endif /* LAM_REACTOR_H */

Просмотреть файл

@ -8,12 +8,60 @@
#include "mca/lam/oob/oob.h"
#include "mca/lam/oob/cofs/src/oob_cofs.h"
#include "lam/util/malloc.h"
#include <stdio.h>
#include <sys/types.h>
#include <dirent.h>
#include <string.h>
#include <unistd.h>
static int blocking_recv_posted = 0;
static int do_recv(lam_job_handle_t job_handle, int* tag, int* vpid,
void** data, size_t* data_len);
int
mca_oob_cofs_send(lam_job_handle_t job_handle, int vpid, int tag,
void* data, size_t data_len)
{
return 0;
FILE *fp;
size_t wlen;
char msg_file[LAM_PATH_MAX];
char msg_file_tmp[LAM_PATH_MAX];
/* create the file and open it... */
snprintf(msg_file, LAM_PATH_MAX, "%s/%s_%d_%d_%d_%d.msg", mca_oob_cofs_comm_loc,
job_handle, mca_oob_cofs_my_vpid, vpid, tag, mca_oob_cofs_serial);
snprintf(msg_file_tmp, LAM_PATH_MAX, "%s/.%s_%d_%d_%d_%d.msg", mca_oob_cofs_comm_loc,
job_handle, mca_oob_cofs_my_vpid, vpid, tag, mca_oob_cofs_serial);
fp = fopen(msg_file_tmp, "w");
if (fp == NULL) {
return LAM_ERR_OUT_OF_RESOURCE;
}
/* BWB - do network byte ordering... */
/* write size */
wlen = fwrite(&data_len, sizeof(size_t), 1, fp);
if (wlen != 1) {
fclose(fp);
unlink(msg_file_tmp);
return LAM_ERR_OUT_OF_RESOURCE;
}
/* write packet */
wlen = fwrite(data, 1, data_len, fp);
if (wlen != data_len) {
fclose(fp);
unlink(msg_file_tmp);
return LAM_ERR_OUT_OF_RESOURCE;
}
/* publish the thing... */
fclose(fp);
rename(msg_file_tmp, msg_file);
return LAM_SUCCESS;
}
@ -21,7 +69,13 @@ int
mca_oob_cofs_recv(lam_job_handle_t job_handle, int* tag, int* vpid,
void** data, size_t* data_len)
{
return 0;
int ret = LAM_ERR_WOULD_BLOCK;
blocking_recv_posted = 1;
while (ret == LAM_ERR_WOULD_BLOCK) {
ret = do_recv(job_handle, tag, vpid, data, data_len);
}
blocking_recv_posted = 0;
return ret;
}
@ -29,7 +83,11 @@ int
mca_oob_cofs_recv_nb(lam_job_handle_t job_handle, int* tag, int* vpid,
void** data, size_t* data_len)
{
return 0;
if (blocking_recv_posted != 0) {
return LAM_ERR_WOULD_BLOCK;
}
return do_recv(job_handle, tag, vpid, data, data_len);
}
@ -37,5 +95,103 @@ int
mca_oob_cofs_recv_cb(lam_job_handle_t job_handle, int tag,
mca_oob_recv_cb_t callback)
{
return 0;
return LAM_ERR_NOT_SUPPORTED;
}
static char*
find_match(lam_job_handle_t job_handle, int* tag, int* vpid)
{
DIR* dir;
struct dirent *ent;
char tmp_handle[LAM_PATH_MAX];
int tmp_tag, tmp_vpid, tmp_myvpid, tmp_serial;
int ret;
bool found = false;
char best_name[LAM_PATH_MAX];
int best_tag, best_vpid;
dir = opendir(mca_oob_cofs_comm_loc);
if (dir == NULL) {
return NULL;
}
while ((ent = readdir(dir)) != NULL) {
if (ent->d_name[0] = '.') continue;
ret = sscanf(ent->d_name, "%s_%d_%d_%d_%d.msg", tmp_handle, &tmp_myvpid,
&tmp_vpid, &tmp_tag, &tmp_serial);
if (ret != 5) continue;
if (strcmp(tmp_handle, job_handle)) continue;
if (tmp_myvpid != mca_oob_cofs_my_vpid) continue;
if (*tag != MCA_OOB_ANY_TAG && tmp_tag != *tag) continue;
/* do best one here... */
found = true;
strcpy(best_name, ent->d_name);
best_tag = tmp_tag;
best_vpid = tmp_vpid;
break;
}
closedir(dir);
if (found) {
*tag = best_tag;
*vpid = best_vpid;
return strdup(best_name);
} else {
return NULL;
}
}
static int
do_recv(lam_job_handle_t job_handle, int* tag, int* vpid,
void** data, size_t* data_len)
{
char *fname;
FILE *fp;
size_t rlen;
fname = find_match(job_handle, tag, vpid);
if (fname == NULL) {
return LAM_ERR_WOULD_BLOCK;
}
fp = fopen(fname, "r");
if (fp == NULL) {
free(fname);
return LAM_ERROR;
}
unlink(fname);
rlen = fread(data_len, sizeof(size_t), 1, fp);
if (rlen != 1) {
free(fname);
fclose(fp);
return LAM_ERROR;
}
*data = (void*) malloc(*data_len);
if (*data_len == NULL) {
free(fname);
fclose(fp);
*data_len = 0;
return LAM_ERROR;
}
rlen = fread(*data, 1, *data_len, fp);
if (rlen != *data_len) {
free(fname);
fclose(fp);
free(*data);
*data_len = 0;
return LAM_ERROR;
}
free(fname);
fclose(fp);
return LAM_SUCCESS;
}

Просмотреть файл

@ -3,6 +3,7 @@
* $HEADER$
*
*/
#include "lam_config.h"
#include "mca/lam/oob/oob.h"
#include "lam/types.h"
@ -33,3 +34,8 @@ int mca_oob_cofs_recv_nb(lam_job_handle_t job_handle, int* tag, int* vpid,
void** data, size_t* data_len);
int mca_oob_cofs_recv_cb(lam_job_handle_t job_handle, int tag,
mca_oob_recv_cb_t callback);
extern char mca_oob_cofs_comm_loc[LAM_PATH_MAX]; /* location for file drop-off */
extern int mca_oob_cofs_my_vpid;
extern unsigned int mca_oob_cofs_serial;

Просмотреть файл

@ -11,6 +11,10 @@
#include "mca/lam/oob/oob.h"
#include "mca/lam/oob/cofs/src/oob_cofs.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/*
* Struct of function pointers and all that to let us be initialized
@ -37,6 +41,16 @@ mca_oob_module_1_0_0_t mca_oob_cofs_module_1_0_0_0 = {
mca_oob_cofs_finalize
};
struct mca_oob_1_0_0_t mca_oob_cofs_1_0_0 = {
mca_oob_cofs_send,
mca_oob_cofs_recv,
mca_oob_cofs_recv_nb,
mca_oob_cofs_recv_cb
};
char mca_oob_cofs_comm_loc[LAM_PATH_MAX];
int mca_oob_cofs_my_vpid;
unsigned int mca_oob_cofs_serial;
int
mca_oob_cofs_open(lam_cmd_line_t *cmd)
@ -63,11 +77,55 @@ mca_oob_cofs_query(int *priority)
struct mca_oob_1_0_0*
mca_oob_cofs_init(void)
{
char *tmp;
FILE *fp;
/*
* BWB - fix me, make register the "right" way...
*/
tmp = getenv("MCA_common_lam_cofs_comm_dir");
if (tmp == NULL) {
/* make it $HOME */
tmp = getenv("HOME");
if (tmp == NULL) {
printf("oob_cofs can not find communication dir\n");
return NULL;
}
snprintf(mca_oob_cofs_comm_loc, LAM_PATH_MAX, "%s/cofs", tmp);
} else {
snprintf(mca_oob_cofs_comm_loc, LAM_PATH_MAX, "%s", tmp);
}
return NULL;
/*
* See if we can write in our directory...
*/
tmp = malloc(strlen(mca_oob_cofs_comm_loc) + 5);
if (tmp == NULL) return NULL;
sprintf(tmp, "%s/me", mca_oob_cofs_comm_loc);
fp = fopen(tmp, "w");
if (fp == NULL) {
printf("oob_cofs can not write in communication dir\n");
free(tmp);
return NULL;
}
fclose(fp);
unlink(tmp);
free(tmp);
/*
* BWB - fix me, make register the "right" way...
*/
/* find our vpid */
tmp = getenv("MCA_OOB_BASE_VPID");
if (tmp == NULL) {
printf("oob_cofs can not find vpid\n");
return NULL;
}
mca_oob_cofs_my_vpid = atoi(tmp);
mca_oob_cofs_serial = 0;
return &mca_oob_cofs_1_0_0;
}

Просмотреть файл

@ -3,7 +3,8 @@
*/
#include "mca/mpi/pml/teg/proc.h"
#include "lam/atomic.h"
#include "mca/mpi/pml/teg/ptl_array.h"
lam_class_info_t mca_pml_teg_proc_cls = {
"mca_pml_teg_proc_t",
@ -21,8 +22,8 @@ void mca_pml_teg_proc_init(mca_pml_proc_t* proc)
if(fetchNset(&mca_pml_teg_procs_init,1) == 0)
lam_list_init(&mca_pml_teg_procs);
SUPER_INIT(proc, &lam_list_item_cls);
mca_pml_teg_array_init(&proc->proc_ptl_first);
mca_pml_teg_array_init(&proc->proc_ptl_next);
mca_ptl_array_init(&proc->proc_ptl_first);
mca_ptl_array_init(&proc->proc_ptl_next);
lam_list_append(&mca_pml_teg_procs, &proc->super);
}

Просмотреть файл

@ -5,4 +5,4 @@
include $(top_srcdir)/config/Makefile.options
SUBDIRS = mpi
SUBDIRS = mpi lam