This commit was SVN r560.
This commit is contained in:
Tim Woodall 2004-01-28 22:52:51 +00:00
parent 98e4446759
Commit a7005253da
42 changed files with 1731 additions and 221 deletions

View file

@ -5,17 +5,17 @@
#include "lam/mem/allocator.h"
#include "lam/mem/sharedmem_util.h"
void *lam_alc_malloc(lam_allocator_t *allocator, size_t chunk_size);
void lam_alc_default_free(lam_allocator_t *allocator, void *base_ptr);
void *lam_allocator_malloc(lam_allocator_t *allocator, size_t chunk_size);
void lam_allocator_default_free(lam_allocator_t *allocator, void *base_ptr);
lam_class_info_t allocator_cls = {"lam_allocator_t", &lam_object_cls,
(class_init_t)lam_alc_init, (class_destroy_t)lam_obj_destroy};
(class_init_t)lam_allocator_init, (class_destroy_t)lam_obj_destroy};
void lam_alc_init(lam_allocator_t *allocator)
void lam_allocator_init(lam_allocator_t *allocator)
{
SUPER_INIT(allocator, &lam_object_cls);
allocator->alc_alloc_fn = lam_alc_malloc;
allocator->alc_free_fn = lam_alc_free;
allocator->alc_alloc_fn = lam_allocator_malloc;
allocator->alc_free_fn = lam_allocator_free;
allocator->alc_is_shared = 0;
allocator->alc_mem_prot = 0;
allocator->alc_should_pin = 0;
@ -35,23 +35,23 @@ void *lam_alg_get_chunk(size_t chunk_size, int is_shared,
}
void *lam_alc_alloc(lam_allocator_t *allocator, size_t chunk_size)
void *lam_allocator_alloc(lam_allocator_t *allocator, size_t chunk_size)
{
return allocator->alc_alloc_fn(allocator, chunk_size);
}
void lam_alc_free(lam_allocator_t *allocator, void *chunk_ptr)
void lam_allocator_free(lam_allocator_t *allocator, void *chunk_ptr)
{
if ( chunk_ptr )
allocator->alc_free_fn(allocator, chunk_ptr);
}
void *lam_alc_malloc(lam_allocator_t *allocator, size_t chunk_size)
void *lam_allocator_malloc(lam_allocator_t *allocator, size_t chunk_size)
{
return malloc(chunk_size);
}
void lam_alc_default_free(lam_allocator_t *allocator, void *chunk_ptr)
void lam_allocator_default_free(lam_allocator_t *allocator, void *chunk_ptr)
{
if ( chunk_ptr )
free(chunk_ptr);

View file

@ -33,29 +33,29 @@ typedef struct lam_allocator
extern lam_class_info_t allocator_cls;
void lam_alc_init(lam_allocator_t *allocator);
void lam_allocator_init(lam_allocator_t *allocator);
void *lam_alg_get_chunk(size_t chunk_size, int is_shared,
int mem_protect);
void *lam_alc_alloc(lam_allocator_t *allocator, size_t chunk_size);
void lam_alc_free(lam_allocator_t *allocator, void *chunk_ptr);
void *lam_allocator_alloc(lam_allocator_t *allocator, size_t chunk_size);
void lam_allocator_free(lam_allocator_t *allocator, void *chunk_ptr);
static inline int lam_alc_get_is_shared(lam_allocator_t *allocator) {return allocator->alc_is_shared;}
static inline void lam_alc_set_is_shared(lam_allocator_t *allocator, int is_shared) {allocator->alc_is_shared = is_shared;}
static inline int lam_allocator_get_is_shared(lam_allocator_t *allocator) {return allocator->alc_is_shared;}
static inline void lam_allocator_set_is_shared(lam_allocator_t *allocator, int is_shared) {allocator->alc_is_shared = is_shared;}
static inline int lam_alc_get_mem_prot(lam_allocator_t *allocator) {return allocator->alc_mem_prot;}
static inline void lam_alc_set_mem_prot(lam_allocator_t *allocator, int mem_prot) {allocator->alc_mem_prot = mem_prot;}
static inline int lam_allocator_get_mem_prot(lam_allocator_t *allocator) {return allocator->alc_mem_prot;}
static inline void lam_allocator_set_mem_prot(lam_allocator_t *allocator, int mem_prot) {allocator->alc_mem_prot = mem_prot;}
static inline int lam_alc_get_should_pin(lam_allocator_t *allocator) {return allocator->alc_should_pin;}
static inline void lam_alc_set_should_pin(lam_allocator_t *allocator, int pin) {allocator->alc_should_pin = pin;}
static inline int lam_allocator_get_should_pin(lam_allocator_t *allocator) {return allocator->alc_should_pin;}
static inline void lam_allocator_set_should_pin(lam_allocator_t *allocator, int pin) {allocator->alc_should_pin = pin;}
static inline uint64_t lam_alc_get_pin_offset(lam_allocator_t *allocator) {return allocator->alc_pinned_offset;}
static inline void lam_alc_set_pin_offset(lam_allocator_t *allocator, uint64_t pin_offset)
static inline uint64_t lam_allocator_get_pin_offset(lam_allocator_t *allocator) {return allocator->alc_pinned_offset;}
static inline void lam_allocator_set_pin_offset(lam_allocator_t *allocator, uint64_t pin_offset)
{allocator->alc_pinned_offset = pin_offset;}
static inline uint64_t lam_alc_get_pin_size(lam_allocator_t *allocator) {return allocator->alc_pinned_sz;}
static inline void lam_alc_set_pin_size(lam_allocator_t *allocator, uint64_t pin_sz)
static inline uint64_t lam_allocator_get_pin_size(lam_allocator_t *allocator) {return allocator->alc_pinned_sz;}
static inline void lam_allocator_set_pin_size(lam_allocator_t *allocator, uint64_t pin_sz)
{allocator->alc_pinned_sz = pin_sz;}
#endif /* LAM_ALLOCATOR_H */
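For illustration, a minimal usage sketch of the renamed allocator API above, mirroring the mem_pool calls later in this commit; the variable name alloc and the 4096-byte size are hypothetical, not part of the change:
/* illustrative only: create an allocator, mark it shared, allocate and free a chunk */
lam_allocator_t *alloc = OBJ_CREATE(lam_allocator_t, &allocator_cls);
lam_allocator_set_is_shared(alloc, 1);                /* request shared memory */
lam_allocator_set_mem_prot(alloc, MMAP_SHARED_PROT);  /* protection flags for the mapping */
void *chunk = lam_allocator_alloc(alloc, 4096);       /* dispatches to alc_alloc_fn */
if (NULL != chunk)
    lam_allocator_free(alloc, chunk);                 /* dispatches to alc_free_fn */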

View file

@ -1,7 +1,71 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "lam/mem/free_list.h"
lam_class_info_t lam_free_list_cls = {
"lam_free_list_t",
&lam_list_cls,
(class_init_t)lam_free_list_init,
(class_destroy_t)lam_free_list_destroy
};
void lam_free_list_init(lam_free_list_t* fl)
{
SUPER_INIT(fl, &lam_list_cls);
}
void lam_free_list_destroy(lam_free_list_t* fl)
{
SUPER_DESTROY(fl, &lam_list_cls);
}
int lam_free_list_init_with(
lam_free_list_t *flist,
size_t elem_size,
lam_class_info_t* elem_class,
int num_elements_to_alloc,
int max_elements_to_alloc,
int num_elements_per_alloc,
lam_allocator_t* allocator)
{
flist->fl_elem_size = elem_size;
flist->fl_elem_class = elem_class;
flist->fl_max_to_alloc = max_elements_to_alloc;
flist->fl_num_allocated = 0;
flist->fl_num_per_alloc = num_elements_per_alloc;
flist->fl_allocator = allocator;
return lam_free_list_grow(flist, num_elements_to_alloc);
}
int lam_free_list_grow(lam_free_list_t* flist, size_t num_elements)
{
unsigned char* ptr;
size_t i;
if (flist->fl_max_to_alloc > 0 && flist->fl_num_allocated + num_elements > flist->fl_max_to_alloc)
return LAM_ERR_TEMP_OUT_OF_RESOURCE;
if (NULL != flist->fl_allocator)
ptr = (unsigned char*)lam_allocator_alloc(flist->fl_allocator, num_elements * flist->fl_elem_size);
else
ptr = (unsigned char*)LAM_MALLOC(num_elements * flist->fl_elem_size);
if(NULL == ptr)
return LAM_ERR_TEMP_OUT_OF_RESOURCE;
for(i=0; i<num_elements; i++) {
lam_list_item_t* item = (lam_list_item_t*)ptr;
if (NULL != flist->fl_elem_class)
STATIC_INIT(item, flist->fl_elem_class);
lam_list_append(&flist->super, item);
ptr += flist->fl_elem_size;
}
flist->fl_num_allocated += num_elements;
return LAM_SUCCESS;
}

View file

@ -20,32 +20,49 @@ struct lam_free_list_t
int fl_max_to_alloc;
int fl_num_allocated;
int fl_num_per_alloc;
lam_allocator_t fl_allocator;
size_t fl_elem_size;
lam_class_info_t* fl_elem_class;
lam_allocator_t* fl_allocator;
lam_mutex_t fl_lock;
};
typedef struct lam_free_list_t lam_free_list_t;
void lam_free_list_init(lam_free_list_t *flist);
void lam_free_list_destroy(lam_free_list_t *flist);
int lam_free_list_grow(lam_free_list_t* flist, size_t num_elements);
/* lam_free_list_init() must have been called prior to calling this function */
int lam_free_list_init_with(
lam_free_list_t *flist,
size_t element_size,
lam_class_info_t* element_class,
int num_elements_to_alloc,
int max_elements_to_alloc,
int num_elements_per_alloc,
lam_allocator_t*);
static inline lam_list_item_t *lam_free_list_get(lam_free_list_t * list, int *rc)
static inline lam_list_item_t *lam_free_list_get(lam_free_list_t * fl, int *rc)
{
return NULL;
lam_list_item_t* item;
THREAD_LOCK(&fl->fl_lock);
item = lam_list_remove_first(&fl->super);
if(NULL == item) {
lam_free_list_grow(fl, fl->fl_num_per_alloc);
item = lam_list_remove_first(&fl->super);
}
THREAD_UNLOCK(&fl->fl_lock);
*rc = (NULL != item) ? LAM_SUCCESS : LAM_ERR_TEMP_OUT_OF_RESOURCE;
return item;
}
static inline int lam_free_list_return(lam_free_list_t *list, lam_list_item_t *rc)
static inline int lam_free_list_return(lam_free_list_t *fl, lam_list_item_t *item)
{
return LAM_ERROR;
THREAD_LOCK(&fl->fl_lock);
lam_list_append(&fl->super, item);
THREAD_UNLOCK(&fl->fl_lock);
return LAM_SUCCESS;
}
#endif
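For illustration, a minimal sketch of the two-step free list setup and the get/return cycle declared above; my_frag_t and my_frag_cls are hypothetical placeholders, and the TCP module later in this commit follows the same pattern with its real classes:
/* illustrative only */
lam_free_list_t frags;
int rc;
lam_free_list_init(&frags);                      /* construct the list object */
lam_free_list_init_with(&frags,
    sizeof(my_frag_t), &my_frag_cls,             /* element size and class (hypothetical) */
    256,                                         /* elements allocated up front */
    -1,                                          /* no upper bound */
    256,                                         /* elements added per grow */
    NULL);                                       /* NULL allocator => LAM_MALLOC */
lam_list_item_t* item = lam_free_list_get(&frags, &rc);  /* grows the list if empty */
if (LAM_SUCCESS == rc)
    lam_free_list_return(&frags, item);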

View file

@ -38,8 +38,8 @@ void lam_mp_shared_init(lam_mem_pool_t *pool)
pool->mp_private_alloc = OBJ_CREATE(lam_allocator_t, &allocator_cls);
lam_mutex_init(&(pool->mp_lock));
lam_alc_set_is_shared(pool->mp_private_alloc, 1);
lam_alc_set_mem_prot(pool->mp_private_alloc, MMAP_SHARED_PROT);
lam_allocator_set_is_shared(pool->mp_private_alloc, 1);
lam_allocator_set_mem_prot(pool->mp_private_alloc, MMAP_SHARED_PROT);
pool->mp_dev_alloc = NULL;
}
@ -104,15 +104,15 @@ int lam_mp_init_with(lam_mem_pool_t *pool, uint64_t pool_size,
/* add red-zone pages */
/* set up dev allocator to use pinned memory */
lam_alc_set_should_pin(pool->mp_dev_alloc, 1);
lam_alc_set_pin_offset(pool->mp_dev_alloc, page_size);
lam_allocator_set_should_pin(pool->mp_dev_alloc, 1);
lam_allocator_set_pin_offset(pool->mp_dev_alloc, page_size);
while (!ptr && wrk_size) {
to_alloc = wrk_size + 2 * page_size;
/* allocate memory. Reset pinned memory size. */
lam_alc_set_pin_size(pool->mp_dev_alloc, wrk_size);
ptr = lam_alc_alloc(pool->mp_dev_alloc, to_alloc);
lam_allocator_set_pin_size(pool->mp_dev_alloc, wrk_size);
ptr = lam_allocator_alloc(pool->mp_dev_alloc, to_alloc);
if (ptr == 0)
wrk_size /= 2;
else
@ -147,7 +147,7 @@ int lam_mp_init_with(lam_mem_pool_t *pool, uint64_t pool_size,
/* initialize chunk descriptors */
to_alloc = sizeof(lam_chunk_desc_t) * pool->mp_num_chunks;
pool->mp_chunks = lam_alc_alloc(pool->mp_private_alloc, to_alloc);
pool->mp_chunks = lam_allocator_alloc(pool->mp_private_alloc, to_alloc);
if ( !pool->mp_chunks )
{
lam_output(0, "Error: Out of memory");
@ -197,7 +197,7 @@ void *lam_mp_request_chunk(lam_mem_pool_t *pool, int pool_index)
/* allocate larger array of chunk descriptors and
copy old array into new array */
to_alloc = sizeof(lam_chunk_desc_t) * (pool->mp_num_chunks + 1);
chunk_desc = lam_alc_alloc(pool->mp_private_alloc, to_alloc);
chunk_desc = lam_allocator_alloc(pool->mp_private_alloc, to_alloc);
if ( !chunk_desc )
{
lam_output(0, "Error! Out of memory!");
@ -210,16 +210,16 @@ void *lam_mp_request_chunk(lam_mem_pool_t *pool, int pool_index)
}
/* free old array and set old array pointer to point to new array */
lam_alc_free(pool->mp_private_alloc, pool->mp_chunks);
lam_allocator_free(pool->mp_private_alloc, pool->mp_chunks);
pool->mp_chunks = chunk_desc;
/* allocate new memory chunk using device allocator. */
lam_alc_set_should_pin(pool->mp_dev_alloc, 1);
lam_alc_set_pin_offset(pool->mp_dev_alloc, 0);
lam_alc_set_pin_size(pool->mp_dev_alloc, 0);
lam_allocator_set_should_pin(pool->mp_dev_alloc, 1);
lam_allocator_set_pin_offset(pool->mp_dev_alloc, 0);
lam_allocator_set_pin_size(pool->mp_dev_alloc, 0);
pool->mp_chunks[pool->mp_num_chunks].chd_base_ptr =
lam_alc_alloc(pool->mp_dev_alloc, pool->mp_chunk_sz);
lam_allocator_alloc(pool->mp_dev_alloc, pool->mp_chunk_sz);
if ( !pool->mp_chunks[pool->mp_num_chunks].chd_base_ptr )
{
lam_output(0, "Error: Out of memory");
@ -277,8 +277,8 @@ void lam_fmp_init(lam_fixed_mpool_t *pool)
SUPER_INIT(pool, &lam_object_cls);
pool->fmp_private_alloc = OBJ_CREATE(lam_allocator_t, &allocator_cls);
lam_alc_set_is_shared(pool->fmp_private_alloc, 1);
lam_alc_set_mem_prot(pool->fmp_private_alloc, MMAP_SHARED_PROT);
lam_allocator_set_is_shared(pool->fmp_private_alloc, 1);
lam_allocator_set_mem_prot(pool->fmp_private_alloc, MMAP_SHARED_PROT);
pool->fmp_segments = NULL;
pool->fmp_n_segments = NULL;

View file

@ -69,7 +69,7 @@ void *lam_mp_request_chunk(lam_mem_pool_t *pool, int pool_index);
/* returns 1 if pool uses shared memory, 0 otherwise. */
#define lam_mp_uses_shared_mem(pool) \
lam_alc_get_is_shared(pool->mp_private_alloc)
lam_allocator_get_is_shared(pool->mp_private_alloc)
#define lam_mp_get_dev_allocator(pool) \
((pool)->mp_dev_alloc)

View file

@ -45,5 +45,23 @@ typedef union {
void* pval;
} lam_ptr_t;
/*
* handle differences in iovec
*/
#if defined(__APPLE__)
typedef char* lam_iov_base_ptr_t;
#else
typedef void* lam_iov_base_ptr_t;
#endif
/*
* handle differences in socklen_t
*/
#if defined(__linux__)
typedef socklen_t lam_socklen_t;
#else
typedef int lam_socklen_t;
#endif
#endif
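For illustration, a short sketch of why the lam_socklen_t typedef is added: the same accept() call compiles whether the platform declares the length argument as socklen_t or int, which is how the TCP module later in this commit uses it (listen_sd is a hypothetical descriptor):
/* illustrative only */
struct sockaddr_in addr;
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
int sd = accept(listen_sd, (struct sockaddr*)&addr, &addrlen);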

View file

@ -15,10 +15,10 @@
#include "lam/runtime/runtime.h"
const int LAM_NOTIFY_RECV = 1;
const int LAM_NOTIFY_SEND = 2;
const int LAM_NOTIFY_EXCEPT = 4;
const int LAM_NOTIFY_ALL = 7;
const int LAM_REACTOR_NOTIFY_RECV = 1;
const int LAM_REACTOR_NOTIFY_SEND = 2;
const int LAM_REACTOR_NOTIFY_EXCEPT = 4;
const int LAM_REACTOR_NOTIFY_ALL = 7;
#define MAX_DESCRIPTOR_POOL_SIZE 256
@ -79,7 +79,7 @@ void lam_reactor_init(lam_reactor_t* r)
lam_list_init(&r->r_free);
lam_list_init(&r->r_pending);
lam_fh_init(&r->r_hash);
lam_fh_init_with(&r->r_hash, 1024);
lam_fh_resize(&r->r_hash, 1024);
r->r_max = -1;
r->r_run = true;
@ -123,17 +123,17 @@ int lam_reactor_insert(lam_reactor_t* r, int sd, lam_reactor_listener_t* listene
}
descriptor->rd_flags |= flags;
if(flags & LAM_NOTIFY_RECV) {
if(flags & LAM_REACTOR_NOTIFY_RECV) {
descriptor->rd_recv = listener;
descriptor->rd_recv_user = user;
LAM_FD_SET(sd, &r->r_recv_set);
}
if(flags & LAM_NOTIFY_SEND) {
if(flags & LAM_REACTOR_NOTIFY_SEND) {
descriptor->rd_send = listener;
descriptor->rd_send_user = user;
LAM_FD_SET(sd, &r->r_send_set);
}
if(flags & LAM_NOTIFY_EXCEPT) {
if(flags & LAM_REACTOR_NOTIFY_EXCEPT) {
descriptor->rd_except = listener;
descriptor->rd_except_user = user;
LAM_FD_SET(sd, &r->r_except_set);
@ -161,15 +161,15 @@ int lam_reactor_remove(lam_reactor_t* r, int sd, int flags)
return LAM_ERR_BAD_PARAM;
}
descriptor->rd_flags &= ~flags;
if(flags & LAM_NOTIFY_RECV) {
if(flags & LAM_REACTOR_NOTIFY_RECV) {
descriptor->rd_recv = 0;
LAM_FD_CLR(sd, &r->r_recv_set);
}
if(flags & LAM_NOTIFY_SEND) {
if(flags & LAM_REACTOR_NOTIFY_SEND) {
descriptor->rd_send = 0;
LAM_FD_CLR(sd, &r->r_send_set);
}
if(flags & LAM_NOTIFY_EXCEPT) {
if(flags & LAM_REACTOR_NOTIFY_EXCEPT) {
descriptor->rd_except = 0;
LAM_FD_CLR(sd, &r->r_except_set);
}
@ -194,17 +194,17 @@ void lam_reactor_dispatch(lam_reactor_t* r, int cnt, lam_fd_set_t* rset, lam_fd_
descriptor = (lam_reactor_descriptor_t*)lam_list_get_next(descriptor)) {
int rd = descriptor->rd;
int flags = 0;
if(LAM_FD_ISSET(rd, rset) && descriptor->rd_flags & LAM_NOTIFY_RECV) {
if(LAM_FD_ISSET(rd, rset) && descriptor->rd_flags & LAM_REACTOR_NOTIFY_RECV) {
descriptor->rd_recv->rl_recv_handler(descriptor->rd_recv_user, rd);
flags |= LAM_NOTIFY_RECV;
flags |= LAM_REACTOR_NOTIFY_RECV;
}
if(LAM_FD_ISSET(rd, sset) && descriptor->rd_flags & LAM_NOTIFY_SEND) {
if(LAM_FD_ISSET(rd, sset) && descriptor->rd_flags & LAM_REACTOR_NOTIFY_SEND) {
descriptor->rd_send->rl_send_handler(descriptor->rd_send_user, rd);
flags |= LAM_NOTIFY_SEND;
flags |= LAM_REACTOR_NOTIFY_SEND;
}
if(LAM_FD_ISSET(rd, eset) && descriptor->rd_flags & LAM_NOTIFY_EXCEPT) {
if(LAM_FD_ISSET(rd, eset) && descriptor->rd_flags & LAM_REACTOR_NOTIFY_EXCEPT) {
descriptor->rd_except->rl_except_handler(descriptor->rd_except_user, rd);
flags |= LAM_NOTIFY_EXCEPT;
flags |= LAM_REACTOR_NOTIFY_EXCEPT;
}
if(flags) cnt--;
}

View file

@ -10,10 +10,10 @@
#include "lam/lfc/hash_table.h"
#include "lam/threads/mutex.h"
extern const int LAM_NOTIFY_ALL;
extern const int LAM_NOTIFY_RECV;
extern const int LAM_NOTIFY_SEND;
extern const int LAM_NOTIFY_EXCEPT;
extern const int LAM_REACTOR_NOTIFY_ALL;
extern const int LAM_REACTOR_NOTIFY_RECV;
extern const int LAM_REACTOR_NOTIFY_SEND;
extern const int LAM_REACTOR_NOTIFY_EXCEPT;
extern lam_class_info_t lam_reactor_cls;
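For illustration, a sketch of registering a descriptor with the reactor using the renamed flags, based on the lam_reactor_insert() calls elsewhere in this commit; the reactor, socket, and handler names are hypothetical:
/* illustrative only */
static void my_recv_handler(void* user, int sd) { /* consume data on sd */ }

static lam_reactor_listener_t my_listener = {
    my_recv_handler,   /* rl_recv_handler */
    NULL,              /* rl_send_handler */
    NULL               /* rl_except_handler */
};

lam_reactor_insert(&my_reactor, sd, &my_listener, NULL /* user data */,
    LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);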

View file

@ -14,6 +14,7 @@ noinst_LTLIBRARIES = libmca_pml_teg.la
libmca_pml_teg_la_SOURCES = \
pml_teg.c \
pml_teg.h \
pml_teg_irecv.c \
pml_teg_isend.c \
pml_teg_module.c \
pml_teg_proc.c \
@ -21,5 +22,8 @@ libmca_pml_teg_la_SOURCES = \
pml_teg_progress.c \
pml_teg_sendreq.c \
pml_teg_sendreq.h \
pml_teg_start.c \
pml_teg_test.c \
pml_teg_wait.c \
pml_ptl_array.c \
pml_ptl_array.h

View file

@ -21,24 +21,13 @@ mca_pml_teg_t mca_pml_teg = {
mca_pml_teg_del_procs,
mca_pml_teg_fini,
mca_pml_teg_progress,
#if TIM_HASNT_IMPLEMENTED_THIS_YET
mca_pml_teg_irecv_init,
mca_pml_teg_irecv,
#else
NULL,
NULL,
#endif
mca_pml_teg_isend_init,
mca_pml_teg_isend,
#if TIM_HASNT_IMPLEMENTED_YET
mca_pml_teg_start,
mca_pml_teg_test,
mca_pml_teg_wait,
#else
NULL,
NULL,
NULL
#endif
}
};
@ -126,7 +115,8 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
mca_ptl_t* ptl = mca_pml_teg.teg_ptls[p_index];
/* if the ptl can reach the destination proc it will return
* addressing information that will be cached on the proc
* addressing information that will be cached on the proc, if it
* cannot reach the proc - but another peer
*/
struct mca_ptl_peer_t* ptl_peer;
int rc = ptl->ptl_add_proc(ptl, proc, &ptl_peer);

30
src/mca/mpi/pml/teg/src/pml_teg_irecv.c Normal file
View file

@ -0,0 +1,30 @@
#include "pml_teg.h"
extern int mca_pml_teg_irecv_init(
void *buf,
size_t size,
struct lam_datatype_t *datatype,
int src,
int tag,
bool persistent,
struct lam_communicator_t* comm,
struct lam_request_t **request
)
{
return LAM_ERROR;
}
int mca_pml_teg_irecv(
void *buf,
size_t size,
struct lam_datatype_t *datatype,
int src,
int tag,
struct lam_communicator_t* comm,
struct lam_request_t **request
)
{
return LAM_ERROR;
}

View file

@ -23,7 +23,7 @@ int mca_pml_teg_isend_init(
if(rc != LAM_SUCCESS)
return rc;
mca_ptl_base_send_request_rinit(
mca_ptl_base_send_request_reinit(
sendreq,
buf,
size,
@ -55,7 +55,7 @@ int mca_pml_teg_isend(
if(rc != LAM_SUCCESS)
return rc;
mca_ptl_base_send_request_rinit(
mca_ptl_base_send_request_reinit(
sendreq,
buf,
size,

9
src/mca/mpi/pml/teg/src/pml_teg_start.c Normal file
View file

@ -0,0 +1,9 @@
#include "pml_teg.h"
int mca_pml_teg_start(
lam_request_t* request
)
{
return LAM_ERROR;
}

11
src/mca/mpi/pml/teg/src/pml_teg_test.c Normal file
View file

@ -0,0 +1,11 @@
#include "pml_teg.h"
int mca_pml_teg_test(
lam_request_t** request,
int count,
int *completed)
{
return LAM_ERROR;
}

10
src/mca/mpi/pml/teg/src/pml_teg_wait.c Normal file
View file

@ -0,0 +1,10 @@
#include "pml_teg.h"
int mca_pml_teg_wait(
lam_request_t* request,
lam_status_public_t* status)
{
return LAM_ERROR;
}

View file

@ -15,10 +15,10 @@ extern lam_class_info_t mca_ptl_comm_cls;
struct mca_pml_comm_t {
/* send message sequence-number support - sender side */
mca_ptl_base_sequence_t *c_msg_seq_num;
mca_ptl_base_sequence_t *c_msg_seq;
/* send message sequence-number support - receiver side */
mca_ptl_base_sequence_t *c_next_msg_seq_num;
mca_ptl_base_sequence_t *c_next_msg_seq;
/* matching lock */
lam_mutex_t *c_matching_lock;

View file

@ -15,7 +15,7 @@ extern lam_class_info_t mca_ptl_base_frag_cls;
struct mca_ptl_base_frag_t {
lam_list_item_t super;
mca_ptl_base_reliable_hdr_t frag_header;
mca_ptl_base_header_t frag_header;
struct mca_ptl_t* frag_owner; /**< PTL that allocated this fragment */
};
typedef struct mca_ptl_base_frag_t mca_ptl_base_frag_t;

View file

@ -15,29 +15,24 @@ typedef struct {
/* communicator index */
uint32_t hdr_contextid;
/* source rank */
int hdr_src_rank;
int32_t hdr_src_rank;
/* destination rank */
int hdr_dst_rank;
int32_t hdr_dst_rank;
/* user tag */
int hdr_user_tag;
int32_t hdr_user_tag;
/* type of message - send/bsend/ssend/rsend/recv */
int hdr_msg_type;
int32_t hdr_msg_type;
/* message length */
size_t hdr_msg_length;
/* fragment length */
size_t hdr_frag_length;
uint32_t hdr_msg_length;
/* offset into message */
size_t hdr_offset;
} mca_ptl_base_hdr_t;
typedef struct {
/* base header */
mca_ptl_base_hdr_t hdr_base;
uint32_t hdr_msg_offset;
/* fragment length */
uint32_t hdr_frag_length;
/* message sequence number */
mca_ptl_base_sequence_t hdr_msg_seq_num;
mca_ptl_base_sequence_t hdr_msg_seq;
/* fragment sequence number */
mca_ptl_base_sequence_t hdr_frag_seq_num;
} mca_ptl_base_reliable_hdr_t;
mca_ptl_base_sequence_t hdr_frag_seq;
} mca_ptl_base_header_t;
#endif /* MCA_PML_BASE_HEADER_H */

View file

@ -47,12 +47,12 @@
* - fragments may be corrupt
* - this routine may be called simultaneously by more than one thread
*/
int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc, int *match_made,
lam_list_t *additional_matches)
{
/* local variables */
mca_ptl_base_sequence_t frag_msg_seq_num,next_msg_seq_num_expected;
mca_ptl_base_sequence_t frag_msg_seq,next_msg_seq_expected;
lam_communicator_t *comm_ptr;
mca_ptl_base_recv_request_t *matched_receive;
mca_pml_comm_t *pml_comm;
@ -62,14 +62,14 @@ int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
*match_made=0;
/* communicator pointer */
comm_ptr=lam_comm_lookup(frag_header->hdr_base.hdr_contextid);
comm_ptr=lam_comm_lookup(frag_header->hdr_contextid);
pml_comm=(mca_pml_comm_t *)comm_ptr->c_pml_comm;
/* source sequence number */
frag_msg_seq_num = frag_header->hdr_msg_seq_num;
frag_msg_seq = frag_header->hdr_msg_seq;
/* get fragment communicator source rank */
frag_src = frag_header->hdr_frag_seq_num;
frag_src = frag_header->hdr_frag_seq;
/* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing
@ -81,9 +81,9 @@ int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
THREAD_LOCK((pml_comm->c_matching_lock)+frag_src);
/* get sequence number of next message that can be processed */
next_msg_seq_num_expected = *((pml_comm->c_next_msg_seq_num)+frag_src);
next_msg_seq_expected = *((pml_comm->c_next_msg_seq)+frag_src);
if (frag_msg_seq_num == next_msg_seq_num_expected) {
if (frag_msg_seq == next_msg_seq_expected) {
/*
* This is the sequence number we were expecting,
@ -92,7 +92,7 @@ int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
*/
/* We're now expecting the next sequence number. */
(pml_comm->c_next_msg_seq_num[frag_src])++;
(pml_comm->c_next_msg_seq[frag_src])++;
/* see if receive has already been posted */
matched_receive = mca_ptl_base_check_recieves_for_match(frag_header,
@ -105,7 +105,7 @@ int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
*match_made=1;
/* associate the receive descriptor with the fragment
* descriptor */
frag_desc->matched_recv=matched_receive;
frag_desc->frag_match=matched_receive;
/*
* update deliverd sequence number information,
@ -178,7 +178,7 @@ int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
* set by the upper level routine.
*/
mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match
(mca_ptl_base_reliable_hdr_t *frag_header, mca_pml_comm_t *pml_comm)
(mca_ptl_base_header_t *frag_header, mca_pml_comm_t *pml_comm)
{
/* local parameters */
mca_ptl_base_recv_request_t *return_match;
@ -192,7 +192,7 @@ mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
frag_src = frag_header->hdr_frag_seq_num;
frag_src = frag_header->hdr_frag_seq;
if (lam_list_get_size((pml_comm->specific_receives)+frag_src) == 0 ){
/*
@ -232,7 +232,7 @@ mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match
* set by the upper level routine.
*/
mca_ptl_base_recv_request_t *check_wild_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local parameters */
@ -241,7 +241,7 @@ mca_ptl_base_recv_request_t *check_wild_receives_for_match(
/* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_user_tag=frag_header->hdr_base.hdr_user_tag;
frag_user_tag=frag_header->hdr_user_tag;
/*
* Loop over the wild irecvs - no need to lock, the upper level
@ -299,7 +299,7 @@ mca_ptl_base_recv_request_t *check_wild_receives_for_match(
* set by the upper level routine.
*/
mca_ptl_base_recv_request_t *check_specific_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local variables */
@ -309,8 +309,8 @@ mca_ptl_base_recv_request_t *check_specific_receives_for_match(
/* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_frag_seq_num;
frag_user_tag=frag_header->hdr_base.hdr_user_tag;
frag_src = frag_header->hdr_frag_seq;
frag_user_tag=frag_header->hdr_user_tag;
/*
* Loop over the specific irecvs.
@ -361,18 +361,18 @@ mca_ptl_base_recv_request_t *check_specific_receives_for_match(
* set by the upper level routine.
*/
mca_ptl_base_recv_request_t *check_specific_and_wild_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local variables */
mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match;
mca_ptl_base_sequence_t wild_recv_seq_num, specific_recv_seq_num;
mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq;
int frag_src,frag_user_tag, wild_recv_tag, specific_recv_tag;
/* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_frag_seq_num;
frag_user_tag=frag_header->hdr_base.hdr_user_tag;
frag_src = frag_header->hdr_frag_seq;
frag_user_tag=frag_header->hdr_user_tag;
/*
* We know that when this is called, both specific and wild irecvs
@ -383,11 +383,11 @@ mca_ptl_base_recv_request_t *check_specific_and_wild_receives_for_match(
wild_recv = (mca_ptl_base_recv_request_t *)
lam_list_get_first(&(pml_comm->wild_receives));
specific_recv_seq_num = specific_recv->req_sequence;
wild_recv_seq_num = wild_recv->req_sequence;
specific_recv_seq = specific_recv->req_sequence;
wild_recv_seq = wild_recv->req_sequence;
while (true) {
if (wild_recv_seq_num < specific_recv_seq_num) {
if (wild_recv_seq < specific_recv_seq) {
/*
* wild recv is earlier than the specific one.
*/
@ -433,7 +433,7 @@ mca_ptl_base_recv_request_t *check_specific_and_wild_receives_for_match(
* Get the sequence number for this recv, and go
* back to the top of the loop.
*/
wild_recv_seq_num = wild_recv->req_sequence;
wild_recv_seq = wild_recv->req_sequence;
} else {
/*
@ -478,7 +478,7 @@ mca_ptl_base_recv_request_t *check_specific_and_wild_receives_for_match(
* Get the sequence number for this recv, and go
* back to the top of the loop.
*/
specific_recv_seq_num = specific_recv->req_sequence;
specific_recv_seq = specific_recv->req_sequence;
}
}
}
@ -506,7 +506,7 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
{
/* local parameters */
int match_found;
mca_pml_base_sequence_t next_msg_seq_num_expected, frag_seq_number;
mca_pml_base_sequence_t next_msg_seq_expected, frag_seqber;
mca_pml_base_recv_frag_t *frag_desc;
mca_pml_base_recv_request_t *matched_receive;
@ -529,10 +529,10 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
match_found = 0;
/* get sequence number of next message that can be processed */
next_msg_seq_num_expected = *((pml_comm->c_next_msg_seq_num)+frag_src);
next_msg_seq_expected = *((pml_comm->c_next_msg_seq)+frag_src);
/* search the list for a fragment from the send with sequence
* number next_msg_seq_num_expected
* number next_msg_seq_expected
*/
for(frag_desc = (mca_pml_base_recv_frag_t *)
lam_list_get_first((pml_comm->frags_cant_match)+frag_src);
@ -544,8 +544,8 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
/*
* If the message has the next expected seq from that proc...
*/
frag_seq_number=frag_desc->super.frag_header.hdr_msg_seq_num;
if (frag_seq_number == next_msg_seq_num_expected) {
frag_seqber=frag_desc->super.frag_header.hdr_msg_seq;
if (frag_seqber == next_msg_seq_expected) {
/* initialize list on first entry - assume that most
* of the time nothing is found, so initially we just
@ -558,7 +558,7 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
}
/* We're now expecting the next sequence number. */
(pml_comm->c_next_msg_seq_num[frag_src])++;
(pml_comm->c_next_msg_seq[frag_src])++;
/* signal that match was made */
match_found = 1;
@ -580,7 +580,7 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
/* associate the receive descriptor with the fragment
* descriptor */
frag_desc->matched_recv=matched_receive;
frag_desc->frag_match=matched_receive;
/* add this fragment descriptor to the list of
* descriptors to be processed later
@ -604,7 +604,7 @@ void lam_check_cantmatch_for_match(lam_list_t *additional_matches,
* and re-start search for next sequence number */
break;
} /* end if (frag_seq_number == next_msg_seq_num_expected) */
} /* end if (frag_seqber == next_msg_seq_expected) */
} /* end for (frag_desc) loop */

View file

@ -5,24 +5,24 @@
#ifndef MCA_PTL_BASE_MATCH_H
#define MCA_PTL_BASE_MATCH_H
int mca_ptl_base_match(mca_ptl_base_reliable_hdr_t *frag_header,
int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc, int *match_made,
lam_list_t *additional_matches);
mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
mca_ptl_base_recv_request_t *check_wild_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
mca_ptl_base_recv_request_t *check_specific_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
mca_ptl_base_recv_request_t *check_specific_and_wild_receives_for_match(
mca_ptl_base_reliable_hdr_t *frag_header,
mca_ptl_base_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
void lam_check_cantmatch_for_match(lam_list_t *additional_matches,

View file

@ -16,7 +16,7 @@ extern lam_class_info_t mca_ptl_base_recv_frag_cls;
typedef struct {
mca_ptl_base_frag_t super;
/* matched receve request corresponding to this fragment */
mca_ptl_base_recv_request_t *matched_recv;
mca_ptl_base_recv_request_t *frag_match;
} mca_ptl_base_recv_frag_t;

View file

@ -13,6 +13,8 @@ extern lam_class_info_t mca_ptl_base_send_frag_cls;
struct mca_ptl_base_send_frag_t {
mca_ptl_base_frag_t super;
struct mca_ptl_base_send_request_t *frag_request;
unsigned char* frag_data;
size_t frag_size;
};
typedef struct mca_ptl_base_send_frag_t mca_ptl_base_send_frag_t;

View file

@ -11,22 +11,24 @@
#include "mca/mpi/pml/base/pml_base_request.h"
extern lam_class_info_t mca_ptl_base_send_request_cls;
struct mca_ptl_base_send_request_t {
/* request object - common data structure for use by wait/test */
mca_pml_base_request_t super;
/* allow send request to be placed on ack list */
lam_list_item_t req_ack_item;
/* pointer to user data */
void *req_data;
unsigned char *req_data;
/* size of send/recv in bytes */
size_t req_length;
/* number of bytes that have already been assigned to a fragment */
size_t req_bytes_fragmented;
size_t req_offset;
/* number of fragments that have been allocated */
size_t req_frags;
/* number of bytes that have been sent */
size_t req_bytes_sent;
/* number of bytes that have been acked */
size_t req_bytes_acked;
/* number of fragments that have been allocated */
size_t req_frags_allocated;
/* clear to send flag */
bool req_clear_to_send;
/* type of send */
@ -45,7 +47,7 @@ void mca_ptl_base_send_request_init(mca_ptl_base_send_request_t*);
void mca_ptl_base_send_request_destroy(mca_ptl_base_send_request_t*);
static inline void mca_ptl_base_send_request_rinit(
static inline void mca_ptl_base_send_request_reinit(
mca_ptl_base_send_request_t *request,
void *data,
size_t length,
@ -59,9 +61,10 @@ static inline void mca_ptl_base_send_request_rinit(
request->req_data = data;
request->req_length = length;
request->req_clear_to_send = false;
request->req_bytes_fragmented = 0;
request->req_offset = 0;
request->req_frags = 0;
request->req_bytes_sent = 0;
request->req_bytes_acked = 0;
request->req_frags_allocated = 0;
request->req_send_mode = sendmode;
request->super.req_datatype = datatype;
request->super.req_peer = peer;

View file

@ -15,5 +15,12 @@ libmca_ptl_tcp_la_SOURCES = \
ptl_tcp.c \
ptl_tcp.h \
ptl_tcp_peer.c \
ptl_tcp_peer.h \
ptl_tcp_proc.c \
ptl_tcp_proc.h \
ptl_tcp_module.c \
ptl_tcp_send.c
ptl_tcp_recvfrag.c \
ptl_tcp_recvfrag.h \
ptl_tcp_send.c \
ptl_tcp_sendfrag.c \
ptl_tcp_sendfrag.h

View file

@ -7,7 +7,11 @@
#include "lam/util/if.h"
#include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h"
#include "mca/lam/base/mca_base_module_exchange.h"
#include "ptl_tcp.h"
#include "ptl_tcp_addr.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_proc.h"
mca_ptl_tcp_t mca_ptl_tcp = {
@ -37,19 +41,57 @@ int mca_ptl_tcp_create(int if_index)
mca_ptl_tcp_module.tcp_ptls[mca_ptl_tcp_module.tcp_num_ptls++] = ptl;
/* initialize the ptl */
ptl->tcp_ifindex = if_index;
lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->tcp_addr, sizeof(ptl->tcp_addr));
ptl->ptl_ifindex = if_index;
lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->ptl_ifaddr, sizeof(ptl->ptl_ifaddr));
lam_ifindextomask(if_index, (struct sockaddr*)&ptl->ptl_ifmask, sizeof(ptl->ptl_ifmask));
return LAM_SUCCESS;
}
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *proc, struct mca_ptl_peer_t** ptl_peer)
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, struct mca_ptl_peer_t** peer_ret)
{
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(lam_proc);
mca_ptl_peer_t* ptl_peer;
int rc;
if(NULL == ptl_proc)
return LAM_ERR_OUT_OF_RESOURCE;
/*
* Check to make sure that the peer has at least as many interface addresses
* exported as we are trying to use. If not, then don't bind this PTL instance
* to the proc.
*/
THREAD_LOCK(&ptl_proc->proc_lock);
if(ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_UNREACH;
}
/* The ptl_proc datastructure is shared by all TCP PTL instances that are trying
* to reach this destination. Cache the peer instance on the ptl_proc.
*/
ptl_peer = OBJ_CREATE(mca_ptl_peer_t, &mca_ptl_peer_cls);
if(NULL == ptl_peer) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
ptl_peer->peer_ptl = (mca_ptl_tcp_t*)ptl;
rc = mca_ptl_tcp_proc_insert(ptl_proc, ptl_peer);
if(rc != LAM_SUCCESS) {
OBJ_RELEASE(ptl_peer);
THREAD_UNLOCK(&ptl_proc->proc_lock);
return rc;
}
THREAD_UNLOCK(&ptl_proc->proc_lock);
*peer_ret = ptl_peer;
return LAM_SUCCESS;
}
int mca_ptl_tcp_del_proc(struct mca_ptl_t* ptl, struct lam_proc_t *proc, struct mca_ptl_peer_t* ptl_peer)
{
OBJ_RELEASE(ptl_peer);
return LAM_SUCCESS;
}

View file

@ -10,6 +10,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include "lam/util/reactor.h"
#include "lam/mem/free_list.h"
#include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h"
@ -23,11 +24,17 @@ struct mca_ptl_tcp_module_1_0_0_t {
struct mca_ptl_tcp_t** tcp_ptls;
size_t tcp_num_ptls; /**< number of ptls actually used */
size_t tcp_max_ptls; /**< maximum number of ptls - available kernel ifs */
lam_reactor_t tcp_reactor;
int tcp_listen;
unsigned short tcp_port;
char* tcp_if_include; /**< comma separated list of interfaces to include */
char* tcp_if_exclude; /**< comma separated list of interfaces to exclude */
int tcp_free_list_num; /**< initial size of free lists */
int tcp_free_list_max; /**< maximum size of free lists */
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
lam_reactor_t tcp_reactor;
lam_free_list_t tcp_send_requests;
lam_free_list_t tcp_send_frags;
lam_free_list_t tcp_recv_frags;
};
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_t;
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_t;
@ -54,9 +61,10 @@ extern void mca_ptl_tcp_module_progress(
*/
struct mca_ptl_tcp_t {
mca_ptl_t super;
int tcp_ifindex;
struct sockaddr_in tcp_addr;
mca_ptl_t super;
int ptl_ifindex;
struct sockaddr_in ptl_ifaddr;
struct sockaddr_in ptl_ifmask;
};
typedef struct mca_ptl_tcp_t mca_ptl_tcp_t;

22
src/mca/mpi/ptl/tcp/src/ptl_tcp_addr.h Normal file
View file

@ -0,0 +1,22 @@
/* @file
*
* $HEADER$
*/
#ifndef MCA_PTL_TCP_ADDR_H
#define MCA_PTL_TCP_ADDR_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
struct mca_ptl_tcp_addr_t {
struct in_addr addr_inet;
in_port_t addr_port;
unsigned short addr_inuse;
};
typedef struct mca_ptl_tcp_addr_t mca_ptl_tcp_addr_t;
#endif

View file

@ -2,6 +2,7 @@
* $HEADER$
*/
#include <errno.h>
#include <unistd.h>
#include "lam/constants.h"
#include "lam/util/if.h"
@ -10,9 +11,15 @@
#include "lam/mem/malloc.h"
#include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "mca/lam/base/mca_base_param.h"
#include "mca/lam/base/mca_base_module_exchange.h"
#include "ptl_tcp.h"
#include "ptl_tcp_addr.h"
#include "ptl_tcp_proc.h"
#include "ptl_tcp_recvfrag.h"
#include "ptl_tcp_sendfrag.h"
mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
{
@ -51,14 +58,15 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
* data structure for receiving reactor callbacks
*/
static void mca_ptl_tcp_module_recv(int sd, void*);
static void mca_ptl_tcp_module_send(int sd, void*);
static void mca_ptl_tcp_module_except(int sd, void*);
static void mca_ptl_tcp_module_recv_handler(void*, int sd);
static void mca_ptl_tcp_module_send_handler(void*, int sd);
static void mca_ptl_tcp_module_except_handler(void*, int sd);
static lam_reactor_listener_t mca_ptl_tcp_module_listener = {
mca_ptl_tcp_module_recv,
mca_ptl_tcp_module_send,
mca_ptl_tcp_module_except,
mca_ptl_tcp_module_recv_handler,
mca_ptl_tcp_module_send_handler,
mca_ptl_tcp_module_except_handler,
};
@ -98,6 +106,12 @@ int mca_ptl_tcp_module_open(void)
mca_ptl_tcp_param_register_string("if-include", "");
mca_ptl_tcp_module.tcp_if_exclude =
mca_ptl_tcp_param_register_string("if-exclude", "");
mca_ptl_tcp_module.tcp_free_list_num =
mca_ptl_tcp_param_register_int("free-list-num", 256);
mca_ptl_tcp_module.tcp_free_list_max =
mca_ptl_tcp_param_register_int("free-list-max", -1);
mca_ptl_tcp_module.tcp_free_list_inc =
mca_ptl_tcp_param_register_int("free-list-inc", 256);
mca_ptl_tcp.super.ptl_exclusivity =
mca_ptl_tcp_param_register_int("exclusivity", 0);
mca_ptl_tcp.super.ptl_first_frag_size =
@ -201,25 +215,20 @@ static int mca_ptl_tcp_module_create_listen(void)
}
/* resolve system assigned port */
#if defined(__linux__)
socklen_t addrlen = sizeof(struct sockaddr_in);
#else
int addrlen = sizeof(struct sockaddr_in);
#endif
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, &addrlen) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: getsockname() failed with errno=%d", errno);
return LAM_ERROR;
}
mca_ptl_tcp_module.tcp_port = inaddr.sin_port;
/* initialize reactor and register listen port */
lam_reactor_init(&mca_ptl_tcp_module.tcp_reactor);
/* register listen port */
lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
mca_ptl_tcp_module.tcp_listen,
&mca_ptl_tcp_module_listener,
0,
LAM_NOTIFY_RECV|LAM_NOTIFY_EXCEPT);
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
return LAM_SUCCESS;
}
@ -227,22 +236,21 @@ static int mca_ptl_tcp_module_create_listen(void)
/*
* Register TCP module addressing information. The MCA framework
* will make this available to all peers.
*
* FIX: just pass around sockaddr_in for now
*/
static int mca_ptl_tcp_module_exchange(void)
{
size_t i;
struct sockaddr_in* addrs = (struct sockaddr_in*)LAM_MALLOC
(mca_ptl_tcp_module.tcp_num_ptls * sizeof(struct sockaddr_in));
mca_ptl_tcp_addr_t *addrs = (mca_ptl_tcp_addr_t*)LAM_MALLOC
(mca_ptl_tcp_module.tcp_num_ptls * sizeof(mca_ptl_tcp_addr_t));
for(i=0; i<mca_ptl_tcp_module.tcp_num_ptls; i++) {
mca_ptl_tcp_t* ptl = mca_ptl_tcp_module.tcp_ptls[i];
addrs[i] = ptl->tcp_addr;
addrs[i].sin_port = mca_ptl_tcp_module.tcp_listen;
addrs[i].addr_inet = ptl->ptl_ifaddr.sin_addr;
addrs[i].addr_port = mca_ptl_tcp_module.tcp_listen;
addrs[i].addr_inuse = 0;
}
return mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version,
addrs, sizeof(struct sockaddr_in),mca_ptl_tcp_module.tcp_num_ptls);
addrs, sizeof(mca_ptl_tcp_t),mca_ptl_tcp_module.tcp_num_ptls);
}
/*
@ -258,6 +266,37 @@ mca_ptl_t** mca_ptl_tcp_module_init(int* num_ptls, int* thread_min, int* thread_
*thread_min = MPI_THREAD_MULTIPLE;
*thread_max = MPI_THREAD_MULTIPLE;
/* initialize containers */
lam_reactor_init(&mca_ptl_tcp_module.tcp_reactor);
/* initialize free lists */
lam_free_list_init(&mca_ptl_tcp_module.tcp_send_requests);
lam_free_list_init_with(&mca_ptl_tcp_module.tcp_send_requests,
sizeof(mca_ptl_base_send_request_t) + sizeof(mca_ptl_tcp_send_frag_t),
&mca_ptl_base_send_request_cls,
mca_ptl_tcp_module.tcp_free_list_num,
mca_ptl_tcp_module.tcp_free_list_max,
mca_ptl_tcp_module.tcp_free_list_inc,
NULL); /* use default allocator */
lam_free_list_init(&mca_ptl_tcp_module.tcp_send_frags);
lam_free_list_init_with(&mca_ptl_tcp_module.tcp_send_frags,
sizeof(mca_ptl_tcp_send_frag_t),
&mca_ptl_tcp_send_frag_cls,
mca_ptl_tcp_module.tcp_free_list_num,
mca_ptl_tcp_module.tcp_free_list_max,
mca_ptl_tcp_module.tcp_free_list_inc,
NULL); /* use default allocator */
lam_free_list_init(&mca_ptl_tcp_module.tcp_recv_frags);
lam_free_list_init_with(&mca_ptl_tcp_module.tcp_recv_frags,
sizeof(mca_ptl_tcp_recv_frag_t),
&mca_ptl_tcp_recv_frag_cls,
mca_ptl_tcp_module.tcp_free_list_num,
mca_ptl_tcp_module.tcp_free_list_max,
mca_ptl_tcp_module.tcp_free_list_inc,
NULL); /* use default allocator */
/* create a PTL TCP module for selected interfaces */
if(mca_ptl_tcp_module_create_instances() != LAM_SUCCESS)
return 0;
@ -291,17 +330,11 @@ void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp)
* Called by mca_ptl_tcp_module_recv() when the TCP listen
* socket has pending connection requests. Accept incoming
* requests and queue for completion of the connection handshake.
* We wait for the peer to send a 4 byte global process ID(rank)
* to complete the connection.
*/
static void mca_ptl_tcp_module_accept(void)
{
#if defined(__linux__)
socklen_t addrlen = sizeof(struct sockaddr_in);
#else
int addrlen = sizeof(struct sockaddr_in);
#endif
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
while(true) {
struct sockaddr_in addr;
int sd = accept(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&addr, &addrlen);
@ -312,37 +345,96 @@ static void mca_ptl_tcp_module_accept(void)
lam_output(0, "mca_ptl_tcp_module_accept: accept() failed with errno %d.", errno);
return;
}
/* wait for receipt of data to complete the connect */
/* wait for receipt of peers process identifier to complete this connection */
lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
sd,
&mca_ptl_tcp_module_listener,
0,
LAM_NOTIFY_RECV|LAM_NOTIFY_EXCEPT);
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
}
}
/*
* Called by reactor when registered socket is ready to read.
* Called by reactor when there is data available on the registered
* socket to recv.
*/
static void mca_ptl_tcp_module_recv(int sd, void* user)
static void mca_ptl_tcp_module_recv_handler(void* user, int sd)
{
void* guid;
uint32_t size;
struct sockaddr_in addr;
lam_socklen_t addr_len = sizeof(addr);
/* accept new connections on the listen socket */
if(mca_ptl_tcp_module.tcp_listen == sd) {
mca_ptl_tcp_module_accept();
return;
}
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_NOTIFY_ALL);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL);
/* recv the size of the process identifier */
int retval = recv(sd, &size, sizeof(size), 0);
if(retval == 0) {
close(sd);
return;
}
if(retval != sizeof(size)) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: recv() return value %d != %d, errno = %d",
retval, sizeof(size), errno);
close(sd);
return;
}
/* recv the identifier */
size = ntohl(size);
guid = LAM_MALLOC(size);
if(guid == 0) {
close(sd);
return;
}
retval = recv(sd, guid, size, 0);
if(retval != size) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: recv() return value %d != %d, errno = %d",
retval, sizeof(size), errno);
close(sd);
return;
}
/* lookup the corresponding process */
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_lookup(guid, size);
if(NULL == ptl_proc) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: unable to locate process");
close(sd);
return;
}
/* lookup peer address */
if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: getpeername() failed with errno=%d", errno);
close(sd);
return;
}
/* are there any existing peer instances willing to accept this connection */
if(mca_ptl_tcp_proc_accept(ptl_proc, &addr, sd) == false) {
close(sd);
return;
}
}
static void mca_ptl_tcp_module_send(int sd, void* user)
static void mca_ptl_tcp_module_send_handler(void* user, int sd)
{
lam_output(0, "mca_ptl_tcp_module_send: received invalid event for descriptor(%d)", sd);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL);
}
static void mca_ptl_tcp_module_except(int sd, void* user)
static void mca_ptl_tcp_module_except_handler(void* user, int sd)
{
lam_output(0, "mca_ptl_tcp_module_except: received invalid event for descriptor(%d)", sd);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL);
}

View file

@ -1,35 +1,516 @@
/*
* $HEADER$
*/
#include <stdlib.h>
#include <unistd.h>
#include <sys/errno.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#include "lam/types.h"
#include "ptl_tcp.h"
#include "ptl_tcp_addr.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_proc.h"
lam_class_info_t mca_ptl_addr_cls = {
"mca_ptl_addr_t",
lam_class_info_t mca_ptl_peer_cls = {
"mca_ptl_peer_t",
&lam_list_cls,
(class_init_t)mca_ptl_tcp_peer_init,
(class_destroy_t)mca_ptl_tcp_peer_destroy
};
static int mca_ptl_tcp_peer_start_connect(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_close_i(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_connected(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_peer_t*, int sd);
static void mca_ptl_tcp_peer_send_handler(mca_ptl_peer_t*, int sd);
static void mca_ptl_tcp_peer_except_handler(mca_ptl_peer_t*, int sd);
static lam_reactor_listener_t mca_ptl_tcp_peer_listener = {
(lam_rl_recv_handler_fn_t)mca_ptl_tcp_peer_recv_handler,
(lam_rl_send_handler_fn_t)mca_ptl_tcp_peer_send_handler,
(lam_rl_except_handler_fn_t)mca_ptl_tcp_peer_except_handler
};
/*
* Initialize state of the peer instance.
*/
void mca_ptl_tcp_peer_init(mca_ptl_peer_t* ptl_peer)
{
SUPER_INIT(ptl_peer, &lam_list_cls);
ptl_peer->tcp_state = MCA_PTL_TCP_CLOSED;
ptl_peer->tcp_sd = -1;
ptl_peer->peer_ptl = 0;
ptl_peer->peer_proc = 0;
ptl_peer->peer_addr = 0;
ptl_peer->peer_sd = -1;
ptl_peer->peer_send_frag = 0;
ptl_peer->peer_recv_frag = 0;
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
ptl_peer->peer_retries = 0;
lam_list_init(&ptl_peer->peer_frags);
lam_mutex_init(&ptl_peer->peer_lock);
}
/*
* Cleanup any resources held by the peer.
*/
void mca_ptl_tcp_peer_destroy(mca_ptl_peer_t* ptl_peer)
{
mca_ptl_tcp_peer_close(ptl_peer);
mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
mca_ptl_tcp_peer_close_i(ptl_peer);
SUPER_DESTROY(ptl_peer, &lam_list_cls);
}
void mca_ptl_tcp_peer_close(mca_ptl_peer_t* ptl_peer)
/*
* Attempt to send a fragment using a given peer. If the peer is not connected,
* queue the fragment and start the connection as required.
*/
int mca_ptl_tcp_peer_send(mca_ptl_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* frag)
{
if(ptl_peer->tcp_sd >= 0) {
close(ptl_peer->tcp_sd);
ptl_peer->tcp_sd = -1;
int rc = LAM_SUCCESS;
THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECTING:
case MCA_PTL_TCP_CONNECT_ACK:
case MCA_PTL_TCP_CLOSED:
lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
if(ptl_peer->peer_state == MCA_PTL_TCP_CLOSED)
rc = mca_ptl_tcp_peer_start_connect(ptl_peer);
break;
case MCA_PTL_TCP_FAILED:
rc = LAM_ERR_UNREACH;
break;
case MCA_PTL_TCP_CONNECTED:
if (NULL != ptl_peer->peer_send_frag)
lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
else {
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) {
ptl_peer->peer_send_frag = frag;
rc = lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND);
}
}
break;
}
ptl_peer->tcp_state = MCA_PTL_TCP_CLOSED;
THREAD_UNLOCK(&ptl_peer->peer_lock);
return rc;
}
/*
* Check the state of this peer. If the incoming connection request matches
* our peers address, check the state of our connection:
* (1) if a connection has not been attempted, accept the connection
* (2) if a connection has not been established, and the peers process identifier
* is less than the local process, accept the connection
* otherwise, reject the connection and continue with the current connection
*/
bool mca_ptl_tcp_peer_accept(mca_ptl_peer_t* ptl_peer, struct sockaddr_in* addr, int sd)
{
mca_ptl_tcp_addr_t* ptl_addr;
mca_ptl_tcp_proc_t* this_proc = mca_ptl_tcp_proc_local();
THREAD_LOCK(&ptl_peer->peer_lock);
if((ptl_addr = ptl_peer->peer_addr) != NULL &&
ptl_addr->addr_inet.s_addr == addr->sin_addr.s_addr) {
mca_ptl_tcp_proc_t *peer_proc = ptl_peer->peer_proc;
if((ptl_peer->peer_sd < 0) ||
(ptl_peer->peer_state != MCA_PTL_TCP_CONNECTED &&
peer_proc->proc_lam->proc_vpid < this_proc->proc_lam->proc_vpid)) {
mca_ptl_tcp_peer_close_i(ptl_peer);
ptl_peer->peer_sd = sd;
mca_ptl_tcp_peer_connected(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_lock);
return true;
}
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
return false;
}
/*
* An external I/F to close a peer. Called in the event of failure
* on read or write. Note that this must acquire the peer lock
* prior to delegating to the internal routine.
*/
void mca_ptl_tcp_peer_close(mca_ptl_peer_t* ptl_peer)
{
THREAD_LOCK(&ptl_peer->peer_lock);
mca_ptl_tcp_peer_close_i(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_lock);
}
/*
* Remove the socket descriptor from the reactor, close it,
* and update the peer state to reflect the connection has
* been closed.
*/
static void mca_ptl_tcp_peer_close_i(mca_ptl_peer_t* ptl_peer)
{
if(ptl_peer->peer_sd >= 0) {
lam_reactor_remove(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
LAM_REACTOR_NOTIFY_ALL);
close(ptl_peer->peer_sd);
ptl_peer->peer_sd = -1;
}
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
ptl_peer->peer_retries++;
}
/*
*
*/
static void mca_ptl_tcp_peer_connected(mca_ptl_peer_t* ptl_peer)
{
int flags = LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT;
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTED;
ptl_peer->peer_retries = 0;
if(lam_list_get_size(&ptl_peer->peer_frags) > 0) {
if(NULL == ptl_peer->peer_send_frag)
ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)lam_list_remove_first(&ptl_peer->peer_frags);
flags |= LAM_REACTOR_NOTIFY_SEND;
}
lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
flags);
}
/*
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = recv(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0);
/* remote closed connection */
if(retval == 0) {
mca_ptl_tcp_peer_close_i(ptl_peer);
return -1;
}
/* socket is non-blocking so handle errors */
if(retval < 0) {
if(errno == EINTR)
continue;
if(errno != EAGAIN && errno != EWOULDBLOCK) {
lam_output(0, "mca_ptl_tcp_peer_recv_blocking: recv() failed with errno=%d\n",errno);
mca_ptl_tcp_peer_close_i(ptl_peer);
return -1;
}
}
cnt += retval;
}
return cnt;
}
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_send_blocking(mca_ptl_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(errno == EINTR)
continue;
if(errno != EAGAIN && errno != EWOULDBLOCK) {
lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno);
mca_ptl_tcp_peer_close_i(ptl_peer);
return -1;
}
}
cnt += retval;
}
return cnt;
}
/*
* Receive the peers globally unique process identification from a newly
* connected socket and verify the expected response. If so, move the
* socket to a connected state.
*/
static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_peer_t* ptl_peer)
{
uint32_t size_n, size_h;
void* guid;
mca_ptl_tcp_proc_t* ptl_proc = ptl_peer->peer_proc;
if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, &size_n, sizeof(size_n)) != size_n)
return LAM_ERR_UNREACH;
size_h = ntohl(size_n);
guid = LAM_MALLOC(size_h);
if(NULL == guid)
return LAM_ERR_OUT_OF_RESOURCE;
if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, guid, size_h) != size_h) {
LAM_FREE(guid);
return LAM_ERR_UNREACH;
}
/* compare this to the expected values */
if(size_h != ptl_proc->proc_guid_size || memcmp(ptl_proc->proc_guid, guid, size_h) != 0) {
lam_output(0, "mca_ptl_tcp_peer_connect: received unexpected process identifier");
mca_ptl_tcp_peer_close_i(ptl_peer);
return LAM_ERR_UNREACH;
}
/* connected */
mca_ptl_tcp_peer_connected(ptl_peer);
return LAM_SUCCESS;
}
/*
* Send the globally unique identifier for this process to a peer on
* a newly connected socket.
*/
static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_peer_t* ptl_peer)
{
/* send process identifier to remote peer */
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local();
uint32_t size_n = htonl(ptl_proc->proc_guid_size);
if(mca_ptl_tcp_peer_send_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n) ||
mca_ptl_tcp_peer_send_blocking(ptl_peer, ptl_proc->proc_guid, ptl_proc->proc_guid_size) !=
ptl_proc->proc_guid_size) {
return LAM_ERR_UNREACH;
}
return LAM_SUCCESS;
}
/*
* Start a connection to the peer. This will likely not complete,
* as the socket is set to non-blocking, so register with the reactor
* for notification of connect completion. On connection we send
* our globally unique process identifier to the peer and wait for
* the peers response.
*/
static int mca_ptl_tcp_peer_start_connect(mca_ptl_peer_t* ptl_peer)
{
int rc;
ptl_peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (ptl_peer->peer_sd < 0) {
ptl_peer->peer_retries++;
return LAM_ERR_UNREACH;
}
/* setup the socket as non-blocking */
int flags;
if((flags = fcntl(ptl_peer->peer_sd, F_GETFL, 0)) < 0) {
lam_output(0, "mca_ptl_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n", errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(ptl_peer->peer_sd, F_SETFL, flags) < 0)
lam_output(0, "mca_ptl_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno);
}
/* start the connect - will likely fail with EINPROGRESS */
if(connect(ptl_peer->peer_sd, (struct sockaddr*)&ptl_peer->peer_addr->addr_inet,
sizeof(struct sockaddr_in)) < 0) {
/* non-blocking so wait for completion */
if(errno == EINPROGRESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING;
rc = lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND|LAM_REACTOR_NOTIFY_EXCEPT);
return rc;
}
mca_ptl_tcp_peer_close_i(ptl_peer);
ptl_peer->peer_retries++;
return LAM_ERR_UNREACH;
}
/* send our globally unique process identifier to the peer */
if((rc = mca_ptl_tcp_peer_send_connect_ack(ptl_peer)) == LAM_SUCCESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
rc = lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
} else {
mca_ptl_tcp_peer_close_i(ptl_peer);
}
return rc;
}
/*
* Check the status of the connection. If the connection failed, will retry
* later. Otherwise, send this processes identifier to the peer on the
* newly connected socket.
*/
static void mca_ptl_tcp_peer_complete_connect(mca_ptl_peer_t* ptl_peer)
{
int so_error = 0;
lam_socklen_t so_length = sizeof(so_error);
/* unregister for event notifications */
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, ptl_peer->peer_sd, LAM_REACTOR_NOTIFY_ALL);
/* check connect completion status */
if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
lam_output(0, "mca_ptl_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
mca_ptl_tcp_peer_close_i(ptl_peer);
return;
}
if(so_error == EINPROGRESS) {
lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND|LAM_REACTOR_NOTIFY_EXCEPT);
return;
}
if(so_error != 0) {
lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failedd with errno=%d\n", so_error);
mca_ptl_tcp_peer_close_i(ptl_peer);
return;
}
if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) == LAM_SUCCESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
lam_reactor_insert(
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
} else {
mca_ptl_tcp_peer_close_i(ptl_peer);
}
}
/*
* A file descriptor is available/ready for recv. Check the state
* of the socket and take the appropriate action.
*/
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_peer_t* ptl_peer, int sd)
{
THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECT_ACK:
{
mca_ptl_tcp_peer_recv_connect_ack(ptl_peer);
break;
}
case MCA_PTL_TCP_CONNECTED:
{
mca_ptl_tcp_recv_frag_t* recv_frag = ptl_peer->peer_recv_frag;
if(NULL == recv_frag) {
int rc;
recv_frag = (mca_ptl_tcp_recv_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_recv_frags, &rc);
if(recv_frag == 0) {
THREAD_UNLOCK(&ptl_peer->peer_lock);
return;
}
mca_ptl_tcp_recv_frag_reinit(recv_frag, ptl_peer);
}
/* check for completion of non-blocking recv on the current fragment */
if(mca_ptl_tcp_recv_frag_handler(recv_frag, sd) == false)
ptl_peer->peer_recv_frag = recv_frag;
else
ptl_peer->peer_recv_frag = 0;
break;
}
default:
{
lam_output(0, "mca_ptl_tcp_peer_recv_handler: invalid socket state(%d)", ptl_peer->peer_state);
mca_ptl_tcp_peer_close_i(ptl_peer);
break;
}
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
}
/*
* A file descriptor is available/ready for send. Check the state
* of the socket and take the appropriate action.
*/
static void mca_ptl_tcp_peer_send_handler(mca_ptl_peer_t* ptl_peer, int sd)
{
THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECTING:
mca_ptl_tcp_peer_complete_connect(ptl_peer);
break;
case MCA_PTL_TCP_CONNECTED:
/* complete the current send */
do {
if(mca_ptl_tcp_send_frag_handler(ptl_peer->peer_send_frag, ptl_peer->peer_sd) == false)
break;
/* progress any pending sends */
ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)
lam_list_remove_first(&ptl_peer->peer_frags);
} while (NULL != ptl_peer->peer_send_frag);
/* if nothing else to do unregister for send event notifications */
if(NULL == ptl_peer->peer_send_frag) {
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_SEND);
}
break;
default:
lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)",
ptl_peer->peer_state);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_SEND);
break;
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
}
/*
* A file descriptor is in an erroneous state. Close the connection
* and update the peer's state.
*/
static void mca_ptl_tcp_peer_except_handler(mca_ptl_peer_t* ptl_peer, int sd)
{
lam_output(0, "mca_ptl_tcp_peer_except_handler: closing connection");
mca_ptl_tcp_peer_close_i(ptl_peer);
}


@ -13,6 +13,8 @@
#include "lam/util/reactor.h"
#include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h"
#include "ptl_tcp_recvfrag.h"
#include "ptl_tcp_sendfrag.h"
typedef enum {
@ -29,23 +31,31 @@ extern lam_class_info_t mca_ptl_peer_cls;
/**
* An abstraction that represents a connection to a peer process.
* An instance of mca_ptl_peer_t is associated w/ each process
* in the group at startup. However, connections to the peer
* and PTL pair at startup. However, connections to the peer
* are established dynamically on an as-needed basis:
*/
struct mca_ptl_peer_t {
lam_list_item_t super;
struct lam_proc_t *tcp_proc;
struct sockaddr_in tcp_peer;
int tcp_sd;
mca_ptl_tcp_state_t tcp_state;
lam_list_item_t super;
struct mca_ptl_tcp_t* peer_ptl;
struct mca_ptl_tcp_proc_t* peer_proc;
struct mca_ptl_tcp_addr_t* peer_addr;
int peer_sd;
mca_ptl_tcp_send_frag_t* peer_send_frag;
mca_ptl_tcp_recv_frag_t* peer_recv_frag;
mca_ptl_tcp_state_t peer_state;
size_t peer_retries;
lam_list_t peer_frags;
lam_mutex_t peer_lock;
};
typedef struct mca_ptl_peer_t mca_ptl_peer_t;
void mca_ptl_tcp_peer_init(mca_ptl_peer_t*);
void mca_ptl_tcp_peer_destroy(mca_ptl_peer_t*);
void mca_ptl_tcp_peer_close(mca_ptl_peer_t*);
void mca_ptl_tcp_peer_init(mca_ptl_peer_t*);
int mca_ptl_tcp_peer_send(mca_ptl_peer_t*, mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_peer_accept(mca_ptl_peer_t*, struct sockaddr_in*, int);
#endif
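mca_ptl_tcp_peer_send() is declared above but defined elsewhere. A rough sketch of how it could behave, inferred from the fields of mca_ptl_peer_t: write immediately when the connection is up and idle, otherwise queue the fragment and, if necessary, start a non-blocking connect. MCA_PTL_TCP_CLOSED and the _sketch name are assumptions, the reactor/connect helpers are the ones defined in ptl_tcp_peer.c, and completion notification back to the PML is omitted.

int mca_ptl_tcp_peer_send_sketch(mca_ptl_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* frag)
{
    int rc = LAM_SUCCESS;
    THREAD_LOCK(&ptl_peer->peer_lock);
    switch(ptl_peer->peer_state) {
    case MCA_PTL_TCP_CONNECTED:
        if(NULL == ptl_peer->peer_send_frag) {
            /* nothing in progress - try the write now, register for send
             * notifications if it does not complete */
            if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) {
                ptl_peer->peer_send_frag = frag;
                lam_reactor_insert(
                    &mca_ptl_tcp_module.tcp_reactor,
                    ptl_peer->peer_sd,
                    &mca_ptl_tcp_peer_listener,
                    ptl_peer,
                    LAM_REACTOR_NOTIFY_SEND);
            }
        } else {
            lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
        }
        break;
    case MCA_PTL_TCP_CLOSED: /* assumed initial state */
        lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
        rc = mca_ptl_tcp_peer_start_connect(ptl_peer);
        break;
    default:
        /* a connect/handshake is already in flight - just queue */
        lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
        break;
    }
    THREAD_UNLOCK(&ptl_peer->peer_lock);
    return rc;
}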

238
src/mca/mpi/ptl/tcp/src/ptl_tcp_proc.c Normal file

@ -0,0 +1,238 @@
/*
* $HEADER$
*/
#include "lam/lfc/hash_table.h"
#include "mca/lam/base/mca_base_module_exchange.h"
#include "ptl_tcp.h"
#include "ptl_tcp_addr.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_proc.h"
lam_class_info_t mca_ptl_tcp_proc_cls = {
"mca_ptl_tcp_proc_t",
&lam_list_item_cls,
(class_init_t)mca_ptl_tcp_proc_init,
(class_destroy_t)mca_ptl_tcp_proc_destroy
};
static lam_list_t mca_ptl_tcp_procs;
static lam_mutex_t mca_ptl_tcp_proc_mutex;
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc;
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc);
void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t* proc)
{
static int inited = 0;
if(inited++ == 0) {
lam_list_init(&mca_ptl_tcp_procs);
lam_mutex_init(&mca_ptl_tcp_proc_mutex);
}
SUPER_INIT(proc, &lam_list_item_cls);
proc->proc_lam = 0;
proc->proc_addrs = 0;
proc->proc_addr_count = 0;
proc->proc_peers = 0;
proc->proc_peer_count = 0;
lam_mutex_init(&proc->proc_lock);
/* add to list of all proc instances */
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
lam_list_append(&mca_ptl_tcp_procs, &proc->super);
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
}
void mca_ptl_tcp_proc_destroy(mca_ptl_tcp_proc_t* proc)
{
/* remove from list of all proc instances */
lam_list_remove(&mca_ptl_tcp_procs, &proc->super);
/* release resources */
if(NULL != proc->proc_peers)
LAM_FREE(proc->proc_peers);
if(NULL != proc->proc_guid)
LAM_FREE(proc->proc_guid);
SUPER_DESTROY(proc, &lam_list_item_cls);
}
/*
* Create a TCP process structure. There is a one-to-one correspondence
* between a lam_proc_t and a mca_ptl_tcp_proc_t instance. We cache additional
* data (specifically the list of mca_ptl_tcp_peer_t instances, and published
* addresses) associated w/ a given destination on this data structure.
*/
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc)
{
int rc;
size_t size = strlen(lam_proc->proc_job) + 1;
uint32_t vpid = htonl(lam_proc->proc_vpid);
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_lookup_lam(lam_proc);
if(ptl_proc != NULL)
return ptl_proc;
ptl_proc = OBJ_CREATE(mca_ptl_tcp_proc_t, &mca_ptl_tcp_proc_cls);
ptl_proc->proc_lam = lam_proc;
/* build a unique identifier (of arbitrary size) to represent the proc */
ptl_proc->proc_guid_size = size + sizeof(uint32_t);
ptl_proc->proc_guid = LAM_MALLOC(ptl_proc->proc_guid_size);
if(ptl_proc->proc_guid == 0) {
OBJ_RELEASE(ptl_proc);
return 0;
}
memcpy(ptl_proc->proc_guid, lam_proc->proc_job, size);
memcpy(ptl_proc->proc_guid+size, &vpid, sizeof(uint32_t));
/* lookup tcp parameters exported by this proc */
rc = mca_base_modex_recv(
&mca_ptl_tcp_module.super.ptlm_version,
lam_proc,
(void**)&ptl_proc->proc_addrs,
&size,
&ptl_proc->proc_addr_count);
if(rc != LAM_SUCCESS) {
lam_output(0, "mca_ptl_tcp_proc_create: mca_base_modex_recv: failed with return value=%d", rc);
OBJ_RELEASE(ptl_proc);
return NULL;
}
if(size != sizeof(mca_ptl_tcp_addr_t)) {
lam_output(0, "mca_ptl_tcp_proc_create: mca_base_modex_recv: size %d != %d",
size, sizeof(mca_ptl_tcp_addr_t));
OBJ_RELEASE(ptl_proc);
return NULL;
}
/* allocate space for peer array - one for each exported address */
ptl_proc->proc_peers = (mca_ptl_peer_t**)
LAM_MALLOC(ptl_proc->proc_addr_count * sizeof(mca_ptl_peer_t*));
if(NULL == ptl_proc->proc_peers) {
OBJ_RELEASE(ptl_proc);
return NULL;
}
if(lam_proc == lam_proc_local())
mca_ptl_tcp_proc = ptl_proc;
return ptl_proc;
}
/*
* Look for an existing TCP process instance based on the associated
* lam_proc_t instance.
*/
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc)
{
mca_ptl_tcp_proc_t* tcp_proc;
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_procs);
tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_next(tcp_proc)) {
if(tcp_proc->proc_lam == lam_proc) {
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
return tcp_proc;
}
}
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
return NULL;
}
/*
* Look for an existing TCP process instance based on the globally unique
* process identifier.
*/
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size)
{
mca_ptl_tcp_proc_t* tcp_proc;
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_procs);
tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_next(tcp_proc)) {
if(tcp_proc->proc_guid_size == size && memcmp(tcp_proc->proc_guid, guid, size) == 0) {
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
return tcp_proc;
}
}
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
return NULL;
}
/*
* Insert a peer instance into the proc array and assign it an address.
* Note that this routine must be called with the lock on the process already
*/
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_peer_t* ptl_peer)
{
struct mca_ptl_tcp_t *ptl_tcp = ptl_peer->peer_ptl;
size_t i;
/* insert into peer array */
ptl_peer->peer_proc = ptl_proc;
ptl_proc->proc_peers[ptl_proc->proc_peer_count++] = ptl_peer;
/*
* Look through the proc instance for an address that is on the
* directly attached network. If we don't find one, pick the first
* unused address.
*/
for(i=0; i<ptl_proc->proc_addr_count; i++) {
mca_ptl_tcp_addr_t* peer_addr = ptl_proc->proc_addrs + i;
unsigned long net1 = ptl_tcp->ptl_ifaddr.sin_addr.s_addr & ptl_tcp->ptl_ifmask.sin_addr.s_addr;
unsigned long net2 = peer_addr->addr_inet.s_addr & ptl_tcp->ptl_ifmask.sin_addr.s_addr;
if(peer_addr->addr_inuse != 0)
continue;
if(net1 == net2) {
ptl_peer->peer_addr = peer_addr;
break;
} else if(ptl_peer->peer_addr == 0)
ptl_peer->peer_addr = peer_addr;
}
ptl_peer->peer_addr->addr_inuse++;
return LAM_SUCCESS;
}
/*
* Remove a peer from the proc array and indicate the address is
* no longer in use.
*/
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_peer_t* ptl_peer)
{
size_t i;
THREAD_LOCK(&ptl_proc->proc_lock);
for(i=0; i<ptl_proc->proc_peer_count; i++) {
if(ptl_proc->proc_peers[i] == ptl_peer) {
memmove(ptl_proc->proc_peers+i,ptl_proc->proc_peers+i+1,
(ptl_proc->proc_peer_count-i-1)*sizeof(mca_ptl_peer_t*));
}
}
ptl_proc->proc_peer_count--;
ptl_peer->peer_addr->addr_inuse--;
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_SUCCESS;
}
/*
* loop through all available PTLs for one matching the source address
* of the request.
*/
bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t* ptl_proc, struct sockaddr_in* addr, int sd)
{
size_t i;
THREAD_LOCK(&ptl_proc->proc_lock);
for(i=0; i<ptl_proc->proc_peer_count; i++) {
mca_ptl_peer_t* ptl_peer = ptl_proc->proc_peers[i];
if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd)) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return true;
}
}
THREAD_UNLOCK(&ptl_proc->proc_lock);
return false;
}
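The listen/accept path that feeds this routine is not included in this section. A hypothetical sketch of how the module might hand off a newly accepted socket once the remote guid has been read (the _sketch name is an assumption):

static void mca_ptl_tcp_accept_sketch(void* guid, size_t guid_size, struct sockaddr_in* addr, int sd)
{
    mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_lookup(guid, guid_size);
    if(NULL == ptl_proc || mca_ptl_tcp_proc_accept(ptl_proc, addr, sd) == false) {
        /* unknown peer, or no mca_ptl_peer_t adopted the socket - drop it */
        close(sd);
    }
}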

50
src/mca/mpi/ptl/tcp/src/ptl_tcp_proc.h Normal file

@ -0,0 +1,50 @@
/* @file
*
* $HEADER$
*/
#ifndef MCA_PTL_TCP_PROC_H
#define MCA_PTL_TCP_PROC_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "lam/lfc/object.h"
#include "mpi/proc/proc.h"
#include "ptl_tcp_peer.h"
extern lam_class_info_t mca_ptl_tcp_proc_cls;
/**
* Represents the state of a remote process and the set of addresses
* that it exports. Also caches an instance of mca_ptl_peer_t for each
* PTL instance that attempts to open a connection to the process.
*/
struct mca_ptl_tcp_proc_t {
lam_list_item_t super;
lam_proc_t *proc_lam;
void* proc_guid;
size_t proc_guid_size;
struct mca_ptl_tcp_addr_t *proc_addrs;
size_t proc_addr_count;
struct mca_ptl_peer_t **proc_peers;
size_t proc_peer_count;
lam_mutex_t proc_lock;
};
typedef struct mca_ptl_tcp_proc_t mca_ptl_tcp_proc_t;
void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t*);
void mca_ptl_tcp_proc_destroy(mca_ptl_tcp_proc_t*);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_local(void);
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t*, mca_ptl_peer_t*);
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t*, mca_ptl_peer_t*);
bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t*, struct sockaddr_in*, int sd);
#endif

176
src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c Normal file

@ -0,0 +1,176 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "ptl_tcp.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_recvfrag.h"
lam_class_info_t mca_ptl_tcp_recv_frag_cls = {
"mca_ptl_tcp_recv_frag_t",
&mca_ptl_base_recv_frag_cls,
(class_init_t)mca_ptl_tcp_recv_frag_init,
(class_destroy_t)mca_ptl_tcp_recv_frag_destroy
};
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd);
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag)
{
SUPER_INIT(frag, &mca_ptl_base_recv_frag_cls);
}
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag)
{
SUPER_DESTROY(frag, &mca_ptl_base_recv_frag_cls);
}
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_peer_t* peer)
{
frag->frag_owner = &peer->peer_ptl->super;
frag->frag_match = 0;
frag->frag_peer = peer;
frag->frag_addr = 0;
frag->frag_size = 0;
frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0;
}
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd) == false)
return false;
if(frag->frag_msg_cnt < frag->frag_size)
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false;
/* done - do something */
return true;
}
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
/* non-blocking read - continue if interrupted, otherwise wait until data available */
unsigned char* ptr = (unsigned char*)&frag->frag_header;
int cnt = -1;
while(cnt < 0) {
cnt = recv(sd, ptr + frag->frag_hdr_cnt, sizeof(mca_ptl_base_header_t) - frag->frag_hdr_cnt, 0);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
if(cnt < 0) {
switch(errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
default:
lam_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
}
}
/* is the entire header available? */
frag->frag_hdr_cnt += cnt;
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
return false;
/* attempt to match a posted recv */
/* ????? */
/* match was not made - so allocate buffer for eager send */
if(NULL == frag->frag_match) {
frag->frag_addr = (unsigned char*)LAM_MALLOC(frag->frag_header.hdr_frag_length);
frag->frag_size = frag->frag_header.hdr_frag_length;
}
return true;
}
/*
* Continue with non-blocking recv() calls until the entire
* fragment is received.
*/
static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
int cnt = -1;
while(cnt < 0) {
cnt = recv(sd, (unsigned char*)frag->frag_addr+frag->frag_msg_cnt, frag->frag_size-frag->frag_msg_cnt, 0);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
if(cnt < 0) {
switch(errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
default:
lam_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
}
}
frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->frag_size);
}
/*
* If the app posted a receive buffer smaller than the
* fragment, receive and discard remaining bytes.
*/
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
/* scratch buffer for the bytes to be discarded - released on every exit path */
size_t discard = frag->frag_header.hdr_frag_length - frag->frag_msg_cnt;
void *rbuf = LAM_MALLOC(discard);
int cnt = -1;
if(NULL == rbuf)
return false;
while(cnt < 0) {
cnt = recv(sd, rbuf, discard, 0);
if(cnt == 0) {
LAM_FREE(rbuf);
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
if(cnt < 0) {
switch(errno) {
case EINTR:
continue;
case EWOULDBLOCK:
LAM_FREE(rbuf);
return false;
default:
lam_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno);
LAM_FREE(rbuf);
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false;
}
}
}
LAM_FREE(rbuf);
frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->frag_header.hdr_frag_length);
}

39
src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h Normal file

@ -0,0 +1,39 @@
/* @file
*
* $HEADER$
*/
#ifndef MCA_PTL_TCP_RECV_FRAG_H
#define MCA_PTL_TCP_RECV_FRAG_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_recvfrag.h"
extern lam_class_info_t mca_ptl_tcp_recv_frag_cls;
struct mca_ptl_tcp_recv_frag_t {
mca_ptl_base_recv_frag_t super;
struct mca_ptl_peer_t* frag_peer;
unsigned char* frag_addr;
size_t frag_size;
size_t frag_hdr_cnt;
size_t frag_msg_cnt;
#define frag_match super.frag_match
#define frag_owner super.super.frag_owner
#define frag_header super.super.frag_header
};
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t*);
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t*);
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd);
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t*, struct mca_ptl_peer_t*);
#endif


@ -1,15 +1,30 @@
/*
* $HEADER$
*/
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
#include "ptl_tcp.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_sendfrag.h"
int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_peer_t* ptl_addr,
struct mca_ptl_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t size,
bool* complete)
{
return LAM_SUCCESS;
mca_ptl_tcp_send_frag_t* sendfrag;
if (sendreq->req_frags == 0) {
sendfrag = (mca_ptl_tcp_send_frag_t*)(sendreq+1);
} else {
int rc;
sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc);
if(sendfrag == 0)
return rc;
}
mca_ptl_tcp_send_frag_reinit(sendfrag, ptl_peer, sendreq, size);
return mca_ptl_tcp_peer_send(ptl_peer, sendfrag);
}

127
src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c Normal file

@ -0,0 +1,127 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "lam/types.h"
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "ptl_tcp.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_sendfrag.h"
lam_class_info_t mca_ptl_tcp_send_frag_cls = {
"mca_ptl_tcp_send_frag_t",
&mca_ptl_base_send_frag_cls,
(class_init_t)mca_ptl_tcp_send_frag_init,
(class_destroy_t)mca_ptl_tcp_send_frag_destroy
};
void mca_ptl_tcp_send_frag_init(mca_ptl_tcp_send_frag_t* frag)
{
SUPER_INIT(frag, &mca_ptl_base_send_frag_cls);
}
void mca_ptl_tcp_send_frag_destroy(mca_ptl_tcp_send_frag_t* frag)
{
SUPER_DESTROY(frag, &mca_ptl_base_send_frag_cls);
}
/*
* Initialize the fragment based on the current offset into the user's
* data buffer, and the indicated size.
*/
void mca_ptl_tcp_send_frag_reinit(
mca_ptl_tcp_send_frag_t* sendfrag,
mca_ptl_peer_t* ptl_peer,
mca_ptl_base_send_request_t* sendreq,
size_t size)
{
/* message header */
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
hdr->hdr_contextid = sendreq->super.req_communicator->c_contextid;
hdr->hdr_src_rank = sendreq->super.req_communicator->c_rank;
hdr->hdr_dst_rank = sendreq->super.req_peer;
hdr->hdr_user_tag = sendreq->super.req_tag;
hdr->hdr_msg_type = sendreq->req_send_mode;
hdr->hdr_msg_length = sendreq->req_length;
hdr->hdr_msg_offset = sendreq->req_offset;
hdr->hdr_msg_seq = 0;
hdr->hdr_frag_seq = 0;
/* update request */
if(sendreq->req_offset + size > sendreq->req_length)
size = sendreq->req_length - sendreq->req_offset;
hdr->hdr_frag_length = size;
sendreq->req_offset += size;
sendreq->req_frags++;
/* fragment state */
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->super.frag_request = sendreq;
sendfrag->super.frag_data = sendreq->req_data + hdr->hdr_msg_offset;
sendfrag->super.frag_size = size;
sendfrag->frag_peer = ptl_peer;
sendfrag->frag_vec_ptr = sendfrag->frag_vec;
sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
sendfrag->frag_vec_cnt = 1;
if(size > 0) {
sendfrag->frag_vec[1].iov_base = (lam_iov_base_ptr_t)sendfrag->super.frag_data;
sendfrag->frag_vec[1].iov_len = sendfrag->super.frag_size;
sendfrag->frag_vec_cnt++;
}
}
/*
* The socket is set up as non-blocking; writes are handled asynchronously,
* with callbacks from the reactor when the socket is ready for writes.
*/
bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)
{
int cnt=-1;
size_t i;
/* non-blocking write, but continue if interrupted */
while(cnt < 0) {
cnt = writev(sd, frag->frag_vec_ptr, frag->frag_vec_cnt);
if(cnt < 0) {
switch(errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
default:
{
lam_output(0, "mca_ptl_tcp_send_frag_handler: writev failedd with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer);
return false;
}
}
}
}
/* if the write didn't complete - update the iovec state */
size_t num_vecs = frag->frag_vec_cnt;
for(i=0; i<num_vecs; i++) {
if(cnt >= (int)frag->frag_vec_ptr->iov_len) {
cnt -= frag->frag_vec_ptr->iov_len;
frag->frag_vec_ptr++;
frag->frag_vec_cnt--;
} else {
frag->frag_vec_ptr->iov_base = (lam_iov_base_ptr_t)
(((unsigned char*)frag->frag_vec_ptr->iov_base) + cnt);
frag->frag_vec_ptr->iov_len -= cnt;
break;
}
}
return (frag->frag_vec_cnt == 0);
}

42
src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h Normal file

@ -0,0 +1,42 @@
/* @file
*
* $HEADER$
*/
#ifndef MCA_PTL_TCP_SEND_FRAG_H
#define MCA_PTL_TCP_SEND_FRAG_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "lam_config.h"
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
extern lam_class_info_t mca_ptl_tcp_send_frag_cls;
struct mca_ptl_tcp_send_frag_t {
mca_ptl_base_send_frag_t super;
struct mca_ptl_peer_t* frag_peer;
struct iovec *frag_vec_ptr;
size_t frag_vec_cnt;
struct iovec frag_vec[2];
#define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner
};
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;
void mca_ptl_tcp_send_frag_init(mca_ptl_tcp_send_frag_t*);
void mca_ptl_tcp_send_frag_destroy(mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd);
void mca_ptl_tcp_send_frag_reinit(
mca_ptl_tcp_send_frag_t*,
struct mca_ptl_peer_t*,
struct mca_ptl_base_send_request_t*,
size_t);
#endif


@ -16,6 +16,7 @@ struct lam_communicator_t {
uint32_t c_contextid;
int c_refcount;
int c_flags;
size_t c_rank; /* local rank */
lam_group_t *c_local_group;
lam_group_t *c_remote_group;
@ -46,20 +47,24 @@ typedef struct lam_communicator_t lam_communicator_t;
static inline lam_communicator_t *lam_comm_lookup(uint32_t cid)
{
/* array of pointers to communicators, indexed by context ID */
extern lam_communicator_t **lam_cummunicator_ptrs;
extern uint32_t len_lam_cummunicator_ptrs;
extern lam_communicator_t **lam_communicator_array;
#ifdef LAM_ENABLE_DEBUG
if(cid >= len_lam_cummunicator_ptrs)
extern uint32_t lam_communicator_array_len;
if(cid >= lam_communicator_array_len) {
lam_output(0, "lam_comm_lookup: invalid communicator index (%d)", cid);
return (lam_communicator_t *) NULL;
}
#endif
return lam_cummunicator_ptrs[cid];
return lam_communicator_array[cid];
}
static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, int peer_id)
{
#ifdef LAM_ENABLE_DEBUG
if(peer_id >= comm->c_remote_group->g_proc_count)
if(peer_id >= comm->c_remote_group->g_proc_count) {
lam_output(0, "lam_comm_lookup_peer: invalid peer index (%d)", peer_id);
return (lam_proc_t *) NULL;
}
#endif
return comm->c_remote_group->g_procs[peer_id];
}
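Taken together, these two lookups let a PTL map an incoming fragment header to its destination communicator and source process. A small illustration, assuming the mca_ptl_base_header_t fields used by the TCP PTL above (the _sketch name is hypothetical):

static inline lam_proc_t* lam_comm_resolve_src_sketch(const mca_ptl_base_header_t* hdr)
{
    lam_communicator_t* comm = lam_comm_lookup(hdr->hdr_contextid);
    if(NULL == comm)
        return NULL;
    return lam_comm_lookup_peer(comm, hdr->hdr_src_rank);
}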


@ -421,12 +421,14 @@ static inline void *lam_memcpy(void *dst, const void *src, size_t size,
return memcpy(dst, src, size);
}
#if 0
uint32_t lam_crc32(const void *restrict buffer, size_t size, uint32_t initial_crc);
uint32_t lam_sum32(const void *restrict buffer, size_t size, uint32_t initial_crc);
void *lam_memcpy_sum32(void *dst, const void *src, size_t size,
lam_memcpy_state_t *state);
void *lam_memcpy_crc32(void *dst, const void *src, size_t size,
lam_memcpy_state_t *state);
#endif
/**
* Copy data from one buffer to another and calculate a 32-bit checksum


@ -12,7 +12,6 @@ struct lam_group_t {
char g_name[MPI_MAX_OBJECT_NAME];
/* Processes */
lam_proc_t **g_procs;
size_t g_proc_count;


@ -5,6 +5,7 @@
#ifndef LAM_PROC
#define LAM_PROC
#include "lam/types.h"
#include "lam/lfc/list.h"
@ -14,8 +15,8 @@ extern lam_list_t lam_procs;
struct lam_proc_t {
lam_list_item_t super; /* allow proc to be placed on a list */
char* proc_jobid; /* identifies a unique job */
int proc_vpid; /* process identifier w/in the job */
lam_job_handle_t proc_job; /* identifies a unique job */
uint32_t proc_vpid; /* process identifier w/in the job */
struct mca_pml_proc_t* proc_pml; /* PML specific proc data */
/* JMS: need to have the following information:
@ -32,6 +33,7 @@ typedef struct lam_proc_t lam_proc_t;
void lam_proc_init(lam_proc_t*);
void lam_proc_destroy(lam_proc_t*);
lam_proc_t* lam_proc_local(void);
#endif /* LAM_PROC */