diff --git a/configure.ac b/configure.ac index dd28e98c37..d0bb4d5997 100644 --- a/configure.ac +++ b/configure.ac @@ -457,6 +457,8 @@ AC_CONFIG_FILES([ test/Makefile test/unit/Makefile + test/unit/lam/Makefile + test/unit/lam/oob_cofs/Makefile test/unit/mpi/Makefile test/unit/mpi/communicator/Makefile test/unit/mpi/datatype/Makefile diff --git a/src/lam/constants.h b/src/lam/constants.h index 001c78b355..1f8bddba70 100644 --- a/src/lam/constants.h +++ b/src/lam/constants.h @@ -19,7 +19,8 @@ enum { LAM_ERR_FATAL = -9, LAM_ERR_NOT_IMPLEMENTED = -10, LAM_ERR_NOT_SUPPORTED = -11, - LAM_ERR_INTERUPTED = -12 + LAM_ERR_INTERUPTED = -12, + LAM_ERR_WOULD_BLOCK = -13 }; #endif /* LAM_CONSTANTS_H */ diff --git a/src/lam/lfc/hash_table.h b/src/lam/lfc/hash_table.h index 18041fdc1e..cacba430d4 100644 --- a/src/lam/lfc/hash_table.h +++ b/src/lam/lfc/hash_table.h @@ -54,7 +54,7 @@ int lam_fh_set_value_for_skey(lam_fast_hash_t *htbl, void *val, const char *ke /* returns the number of items in the table */ -uint32_t lam_fh_count(lam_fast_hash_t *htbl); -inline uint32_t lam_fh_count(lam_fast_hash_t *htbl) {return htbl->fh_count;} +static uint32_t lam_fh_count(lam_fast_hash_t *htbl); +static inline uint32_t lam_fh_count(lam_fast_hash_t *htbl) {return htbl->fh_count;} #endif /* LAM_HASH_TABLE_H */ diff --git a/src/lam/lfc/list.h b/src/lam/lfc/list.h index 992af180d0..19aa7ec33f 100644 --- a/src/lam/lfc/list.h +++ b/src/lam/lfc/list.h @@ -65,20 +65,20 @@ void lam_list_destroy(lam_list_t *list); */ #define lam_list_get_type(list) \ - ((lam_list_list_t*)list)->lam_list_type + ((lam_list_t*)list)->lam_list_type #define lam_list_set_type(list, type) \ - (((lam_list_list_t*)list)->lam_list_type = type) + (((lam_list_t*)list)->lam_list_type = type) #define lam_list_get_size(list) \ - ((lam_list_list_t*)list)->lam_list_length + ((lam_list_t*)list)->lam_list_length /* * Returns first item on list, but does not remove it from the list. */ #define lam_list_get_first(list) \ - ((lam_list_list_t*)list)->lam_list_head + ((lam_list_t*)list)->lam_list_head /* diff --git a/src/lam/mem/seg_list.c b/src/lam/mem/seg_list.c index 087bd4ad68..6ed4305f8d 100644 --- a/src/lam/mem/seg_list.c +++ b/src/lam/mem/seg_list.c @@ -3,6 +3,7 @@ */ #include "lam/mem/seg_list.h" +#include "lam/lfc/list.h" lam_class_info_t seg_list_cls = {"lam_seg_list_t", &lam_object_cls, (class_init_t)lam_sgl_init, (class_destroy_t)lam_sgl_destroy}; @@ -21,7 +22,7 @@ void lam_sgl_init(lam_seg_list_t *slist) void lam_sgl_destroy(lam_seg_list_t *slist) { - lam_dbl_empty_list(&(slist->sgl_list)); + lam_list_empty_list(&(slist->sgl_list)); SUPER_DESTROY(slist, seg_list_cls.cls_parent); } @@ -44,7 +45,7 @@ void lam_sgl_append_elt_chunk( slist->sgl_bytes_pushed += chunk_size; for ( i = 0; i < n_elts; i++ ) { - lam_dbl_append(&(slist->sgl_list), (lam_list_item_t *)ptr); + lam_list_append(&(slist->sgl_list), (lam_list_item_t *)ptr); ptr += elt_size; } } diff --git a/src/lam/mem/seg_list.h b/src/lam/mem/seg_list.h index a7c0ffb944..18f735daac 100644 --- a/src/lam/mem/seg_list.h +++ b/src/lam/mem/seg_list.h @@ -41,8 +41,8 @@ void lam_sgl_append_elt_chunk( #define lam_sgl_lock_list(slist) lam_mtx_trylock(&slist->sgl_lock) #define lam_sgl_unlock_list(slist) lam_mtx_unlock(&slist->sgl_lock) -bool lam_sgl_is_locked(lam_seg_list_t *slist); -inline bool lam_sgl_is_locked(lam_seg_list_t *slist) +static bool lam_sgl_is_locked(lam_seg_list_t *slist); +static inline bool lam_sgl_is_locked(lam_seg_list_t *slist) { /* returns 1 if list is currently locked, otherwise 0. */ int ret; diff --git a/src/lam/util/cmd_line.c b/src/lam/util/cmd_line.c index 541256a46c..f55b08f59f 100644 --- a/src/lam/util/cmd_line.c +++ b/src/lam/util/cmd_line.c @@ -117,14 +117,16 @@ lam_cmd_line_t *lam_cmd_line_create(void) */ int lam_cmd_line_free(lam_cmd_line_t *cmd) { +#if 0 /* We don't lock the mutex; there's no point. Just free it. */ lam_mtx_free(&cmd->lcl_mutex); +#endif /* Free the lists */ - lam_list_free(&cmd->lcl_options); - lam_list_free(&cmd->lcl_params); + lam_list_destroy(&cmd->lcl_options); + lam_list_destroy(&cmd->lcl_params); /* Free the argv's */ diff --git a/src/lam/util/path.c b/src/lam/util/path.c index ab4c3c446c..d97a701988 100644 --- a/src/lam/util/path.c +++ b/src/lam/util/path.c @@ -204,14 +204,12 @@ lam_path_env_findv(char *fname, int mode, char **envv, char *wrkdir) char * lam_path_env_find(char *fname, int mode) { - char *cwd; + char cwd[LAM_PATH_MAX]; char *r; - cwd = getworkdir(); + getcwd(cwd, LAM_PATH_MAX); r = lam_path_env_findv(fname, mode, 0, cwd); - if (cwd) - LAM_FREE(cwd); - + return(r); } diff --git a/src/lam/util/reactor.c b/src/lam/util/reactor.c index c192e68ae8..0cbed971f0 100644 --- a/src/lam/util/reactor.c +++ b/src/lam/util/reactor.c @@ -34,22 +34,22 @@ lam_class_info_t lam_reactor_descriptor_cls = { void lam_reactor_descriptor_init(lam_reactor_descriptor_t* rd) { - lam_dbl_item_init(&rd->rd_base); + lam_list_item_init(&rd->rd_base); rd->rd_base.super.obj_class = &lam_reactor_descriptor_cls; } void lam_reactor_descriptor_destroy(lam_reactor_descriptor_t* rd) { - lam_dbl_item_destroy(&rd->rd_base); + lam_list_item_destroy(&rd->rd_base); } static inline lam_reactor_descriptor_t* lam_reactor_get_descriptor(lam_reactor_t* r, int sd) { lam_reactor_descriptor_t *descriptor; - if(lam_dbl_get_size(&r->r_free)) - descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first_item(&r->r_free); + if(lam_list_get_size(&r->r_free)) + descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_free); else { descriptor = (lam_reactor_descriptor_t*)LAM_MALLOC(sizeof(lam_reactor_descriptor_t)); lam_reactor_descriptor_init(descriptor); @@ -73,9 +73,9 @@ void lam_reactor_init(lam_reactor_t* r) r->r_base.obj_class = &lam_reactor_cls; lam_mtx_init(&r->r_mutex); - lam_dbl_init(&r->r_active); - lam_dbl_init(&r->r_free); - lam_dbl_init(&r->r_pending); + lam_list_init(&r->r_active); + lam_list_init(&r->r_free); + lam_list_init(&r->r_pending); lam_fh_init(&r->r_hash); lam_fh_init_with(&r->r_hash, 1024); @@ -91,9 +91,9 @@ void lam_reactor_init(lam_reactor_t* r) void lam_reactor_destroy(lam_reactor_t* r) { - lam_dbl_destroy(&r->r_active); - lam_dbl_destroy(&r->r_free); - lam_dbl_destroy(&r->r_pending); + lam_list_destroy(&r->r_active); + lam_list_destroy(&r->r_free); + lam_list_destroy(&r->r_pending); lam_fh_destroy(&r->r_hash); lam_obj_destroy(&r->r_base); } @@ -109,14 +109,14 @@ bool lam_reactor_insert(lam_reactor_t* r, int sd, lam_reactor_listener_t* listen #endif lam_mtx_lock(&r->r_mutex); - lam_reactor_descriptor_t *descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first(&r->r_free); + lam_reactor_descriptor_t *descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_free); if(descriptor == 0) { descriptor = lam_reactor_get_descriptor(r, sd); if(descriptor == 0) { lam_mtx_unlock(&r->r_mutex); return false; } - lam_dbl_append(&r->r_pending, &descriptor->rd_base); + lam_list_append(&r->r_pending, &descriptor->rd_base); lam_fh_set_value_for_ikey(&r->r_hash,descriptor,sd); } @@ -184,9 +184,9 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_ */ lam_reactor_descriptor_t *descriptor; - for(descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_first(&r->r_active); + for(descriptor = (lam_reactor_descriptor_t*)lam_list_get_first(&r->r_active); descriptor != 0; - descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_next(descriptor)) { + descriptor = (lam_reactor_descriptor_t*)lam_list_get_next(descriptor)) { int rd = descriptor->rd; int flags = 0; if(LAM_FD_ISSET(rd, rset) && descriptor->rd_flags & LAM_NOTIFY_RECV) { @@ -211,14 +211,14 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_ } /* cleanup any pending deletes while holding the lock */ - descriptor = (lam_reactor_descriptor_t*)lam_dbl_get_first(&r->r_active); + descriptor = (lam_reactor_descriptor_t*)lam_list_get_first(&r->r_active); while(descriptor != 0) { - lam_reactor_descriptor_t* next = (lam_reactor_descriptor_t*)lam_dbl_get_next(&r->r_active); + lam_reactor_descriptor_t* next = (lam_reactor_descriptor_t*)lam_list_get_next(&r->r_active); if(descriptor->rd_flags == 0) { lam_fh_remove_value_for_ikey(&r->r_hash, descriptor->rd); - lam_dbl_list_remove(&r->r_active, descriptor); - if(lam_dbl_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) { - lam_dbl_append(&r->r_free, &descriptor->rd_base); + lam_list_remove(&r->r_active, descriptor); + if(lam_list_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) { + lam_list_append(&r->r_free, &descriptor->rd_base); } else { lam_reactor_descriptor_destroy(descriptor); LAM_FREE(descriptor); @@ -228,18 +228,18 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_ } /* add any other pending inserts/deletes */ - while(lam_dbl_get_size(&r->r_pending)) { - lam_reactor_descriptor_t* descriptor = (lam_reactor_descriptor_t*)lam_dbl_remove_first(&r->r_pending); + while(lam_list_get_size(&r->r_pending)) { + lam_reactor_descriptor_t* descriptor = (lam_reactor_descriptor_t*)lam_list_remove_first(&r->r_pending); if(descriptor->rd_flags == 0) { lam_fh_remove_value_for_ikey(&r->r_hash, descriptor->rd); - if(lam_dbl_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) { - lam_dbl_append(&r->r_free, &descriptor->rd_base); + if(lam_list_get_size(&r->r_free) < MAX_DESCRIPTOR_POOL_SIZE) { + lam_list_append(&r->r_free, &descriptor->rd_base); } else { lam_reactor_descriptor_destroy(descriptor); LAM_FREE(descriptor); } } else { - lam_dbl_append(&r->r_active, &descriptor->rd_base); + lam_list_append(&r->r_active, &descriptor->rd_base); if(descriptor->rd > r->r_max) r->r_max = descriptor->rd; } diff --git a/src/lam/util/reactor.h b/src/lam/util/reactor.h index 320f673ba7..0948c0cb27 100644 --- a/src/lam/util/reactor.h +++ b/src/lam/util/reactor.h @@ -70,6 +70,7 @@ bool lam_reactor_insert(lam_reactor_t*, int sd, lam_reactor_listener_t*, int fla bool lam_reactor_remove(lam_reactor_t*, int sd, lam_reactor_listener_t*, int flags); void lam_reactor_poll(lam_reactor_t*); void lam_reactor_run(lam_reactor_t*); +void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_set_t* sset, lam_fd_set_t* eset); #endif /* LAM_REACTOR_H */ diff --git a/src/mca/lam/oob/cofs/src/oob_cofs.c b/src/mca/lam/oob/cofs/src/oob_cofs.c index 93b03d1cdf..b0903a4b58 100644 --- a/src/mca/lam/oob/cofs/src/oob_cofs.c +++ b/src/mca/lam/oob/cofs/src/oob_cofs.c @@ -8,12 +8,60 @@ #include "mca/lam/oob/oob.h" #include "mca/lam/oob/cofs/src/oob_cofs.h" +#include "lam/util/malloc.h" + +#include +#include +#include +#include +#include + +static int blocking_recv_posted = 0; +static int do_recv(lam_job_handle_t job_handle, int* tag, int* vpid, + void** data, size_t* data_len); int mca_oob_cofs_send(lam_job_handle_t job_handle, int vpid, int tag, void* data, size_t data_len) { - return 0; + FILE *fp; + size_t wlen; + char msg_file[LAM_PATH_MAX]; + char msg_file_tmp[LAM_PATH_MAX]; + + /* create the file and open it... */ + snprintf(msg_file, LAM_PATH_MAX, "%s/%s_%d_%d_%d_%d.msg", mca_oob_cofs_comm_loc, + job_handle, mca_oob_cofs_my_vpid, vpid, tag, mca_oob_cofs_serial); + snprintf(msg_file_tmp, LAM_PATH_MAX, "%s/.%s_%d_%d_%d_%d.msg", mca_oob_cofs_comm_loc, + job_handle, mca_oob_cofs_my_vpid, vpid, tag, mca_oob_cofs_serial); + + fp = fopen(msg_file_tmp, "w"); + if (fp == NULL) { + return LAM_ERR_OUT_OF_RESOURCE; + } + + /* BWB - do network byte ordering... */ + /* write size */ + wlen = fwrite(&data_len, sizeof(size_t), 1, fp); + if (wlen != 1) { + fclose(fp); + unlink(msg_file_tmp); + return LAM_ERR_OUT_OF_RESOURCE; + } + + /* write packet */ + wlen = fwrite(data, 1, data_len, fp); + if (wlen != data_len) { + fclose(fp); + unlink(msg_file_tmp); + return LAM_ERR_OUT_OF_RESOURCE; + } + + /* publish the thing... */ + fclose(fp); + rename(msg_file_tmp, msg_file); + + return LAM_SUCCESS; } @@ -21,7 +69,13 @@ int mca_oob_cofs_recv(lam_job_handle_t job_handle, int* tag, int* vpid, void** data, size_t* data_len) { - return 0; + int ret = LAM_ERR_WOULD_BLOCK; + blocking_recv_posted = 1; + while (ret == LAM_ERR_WOULD_BLOCK) { + ret = do_recv(job_handle, tag, vpid, data, data_len); + } + blocking_recv_posted = 0; + return ret; } @@ -29,7 +83,11 @@ int mca_oob_cofs_recv_nb(lam_job_handle_t job_handle, int* tag, int* vpid, void** data, size_t* data_len) { - return 0; + if (blocking_recv_posted != 0) { + return LAM_ERR_WOULD_BLOCK; + } + + return do_recv(job_handle, tag, vpid, data, data_len); } @@ -37,5 +95,103 @@ int mca_oob_cofs_recv_cb(lam_job_handle_t job_handle, int tag, mca_oob_recv_cb_t callback) { - return 0; + return LAM_ERR_NOT_SUPPORTED; +} + + +static char* +find_match(lam_job_handle_t job_handle, int* tag, int* vpid) +{ + DIR* dir; + struct dirent *ent; + char tmp_handle[LAM_PATH_MAX]; + int tmp_tag, tmp_vpid, tmp_myvpid, tmp_serial; + int ret; + bool found = false; + char best_name[LAM_PATH_MAX]; + int best_tag, best_vpid; + + dir = opendir(mca_oob_cofs_comm_loc); + if (dir == NULL) { + return NULL; + } + + while ((ent = readdir(dir)) != NULL) { + if (ent->d_name[0] = '.') continue; + ret = sscanf(ent->d_name, "%s_%d_%d_%d_%d.msg", tmp_handle, &tmp_myvpid, + &tmp_vpid, &tmp_tag, &tmp_serial); + if (ret != 5) continue; + + if (strcmp(tmp_handle, job_handle)) continue; + if (tmp_myvpid != mca_oob_cofs_my_vpid) continue; + if (*tag != MCA_OOB_ANY_TAG && tmp_tag != *tag) continue; + + /* do best one here... */ + found = true; + strcpy(best_name, ent->d_name); + best_tag = tmp_tag; + best_vpid = tmp_vpid; + break; + } + + closedir(dir); + if (found) { + *tag = best_tag; + *vpid = best_vpid; + return strdup(best_name); + } else { + return NULL; + } +} + + +static int +do_recv(lam_job_handle_t job_handle, int* tag, int* vpid, + void** data, size_t* data_len) +{ + char *fname; + FILE *fp; + size_t rlen; + + fname = find_match(job_handle, tag, vpid); + if (fname == NULL) { + return LAM_ERR_WOULD_BLOCK; + } + + fp = fopen(fname, "r"); + if (fp == NULL) { + free(fname); + return LAM_ERROR; + } + + unlink(fname); + + rlen = fread(data_len, sizeof(size_t), 1, fp); + if (rlen != 1) { + free(fname); + fclose(fp); + return LAM_ERROR; + } + + *data = (void*) malloc(*data_len); + if (*data_len == NULL) { + free(fname); + fclose(fp); + *data_len = 0; + return LAM_ERROR; + } + + rlen = fread(*data, 1, *data_len, fp); + if (rlen != *data_len) { + free(fname); + fclose(fp); + free(*data); + *data_len = 0; + return LAM_ERROR; + } + + free(fname); + fclose(fp); + + return LAM_SUCCESS; } diff --git a/src/mca/lam/oob/cofs/src/oob_cofs.h b/src/mca/lam/oob/cofs/src/oob_cofs.h index d5e6c47d10..701871d4d6 100644 --- a/src/mca/lam/oob/cofs/src/oob_cofs.h +++ b/src/mca/lam/oob/cofs/src/oob_cofs.h @@ -3,6 +3,7 @@ * $HEADER$ * */ +#include "lam_config.h" #include "mca/lam/oob/oob.h" #include "lam/types.h" @@ -33,3 +34,8 @@ int mca_oob_cofs_recv_nb(lam_job_handle_t job_handle, int* tag, int* vpid, void** data, size_t* data_len); int mca_oob_cofs_recv_cb(lam_job_handle_t job_handle, int tag, mca_oob_recv_cb_t callback); + + +extern char mca_oob_cofs_comm_loc[LAM_PATH_MAX]; /* location for file drop-off */ +extern int mca_oob_cofs_my_vpid; +extern unsigned int mca_oob_cofs_serial; diff --git a/src/mca/lam/oob/cofs/src/oob_cofs_module.c b/src/mca/lam/oob/cofs/src/oob_cofs_module.c index 4f547ca109..60f06e5bdd 100644 --- a/src/mca/lam/oob/cofs/src/oob_cofs_module.c +++ b/src/mca/lam/oob/cofs/src/oob_cofs_module.c @@ -11,6 +11,10 @@ #include "mca/lam/oob/oob.h" #include "mca/lam/oob/cofs/src/oob_cofs.h" +#include +#include +#include +#include /* * Struct of function pointers and all that to let us be initialized @@ -37,6 +41,16 @@ mca_oob_module_1_0_0_t mca_oob_cofs_module_1_0_0_0 = { mca_oob_cofs_finalize }; +struct mca_oob_1_0_0_t mca_oob_cofs_1_0_0 = { + mca_oob_cofs_send, + mca_oob_cofs_recv, + mca_oob_cofs_recv_nb, + mca_oob_cofs_recv_cb +}; + +char mca_oob_cofs_comm_loc[LAM_PATH_MAX]; +int mca_oob_cofs_my_vpid; +unsigned int mca_oob_cofs_serial; int mca_oob_cofs_open(lam_cmd_line_t *cmd) @@ -63,11 +77,55 @@ mca_oob_cofs_query(int *priority) struct mca_oob_1_0_0* mca_oob_cofs_init(void) { + char *tmp; + FILE *fp; + /* * BWB - fix me, make register the "right" way... */ + tmp = getenv("MCA_common_lam_cofs_comm_dir"); + if (tmp == NULL) { + /* make it $HOME */ + tmp = getenv("HOME"); + if (tmp == NULL) { + printf("oob_cofs can not find communication dir\n"); + return NULL; + } + snprintf(mca_oob_cofs_comm_loc, LAM_PATH_MAX, "%s/cofs", tmp); + } else { + snprintf(mca_oob_cofs_comm_loc, LAM_PATH_MAX, "%s", tmp); + } + + /* + * See if we can write in our directory... + */ + tmp = malloc(strlen(mca_oob_cofs_comm_loc) + 5); + if (tmp == NULL) return NULL; + sprintf(tmp, "%s/me", mca_oob_cofs_comm_loc); + fp = fopen(tmp, "w"); + if (fp == NULL) { + printf("oob_cofs can not write in communication dir\n"); + free(tmp); + return NULL; + } + fclose(fp); + unlink(tmp); + free(tmp); + + /* + * BWB - fix me, make register the "right" way... + */ + /* find our vpid */ + tmp = getenv("MCA_OOB_BASE_VPID"); + if (tmp == NULL) { + printf("oob_cofs can not find vpid\n"); + return NULL; + } + mca_oob_cofs_my_vpid = atoi(tmp); + + mca_oob_cofs_serial = 0; - return NULL; + return &mca_oob_cofs_1_0_0; } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_proc.c b/src/mca/mpi/pml/teg/src/pml_teg_proc.c index c712dc906e..eafdae9025 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_proc.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_proc.c @@ -3,7 +3,8 @@ */ #include "mca/mpi/pml/teg/proc.h" - +#include "lam/atomic.h" +#include "mca/mpi/pml/teg/ptl_array.h" lam_class_info_t mca_pml_teg_proc_cls = { "mca_pml_teg_proc_t", @@ -21,8 +22,8 @@ void mca_pml_teg_proc_init(mca_pml_proc_t* proc) if(fetchNset(&mca_pml_teg_procs_init,1) == 0) lam_list_init(&mca_pml_teg_procs); SUPER_INIT(proc, &lam_list_item_cls); - mca_pml_teg_array_init(&proc->proc_ptl_first); - mca_pml_teg_array_init(&proc->proc_ptl_next); + mca_ptl_array_init(&proc->proc_ptl_first); + mca_ptl_array_init(&proc->proc_ptl_next); lam_list_append(&mca_pml_teg_procs, &proc->super); } diff --git a/test/unit/Makefile.am b/test/unit/Makefile.am index 7e82785d8a..c6aa483466 100644 --- a/test/unit/Makefile.am +++ b/test/unit/Makefile.am @@ -5,4 +5,4 @@ include $(top_srcdir)/config/Makefile.options -SUBDIRS = mpi +SUBDIRS = mpi lam