1
1
This commit was SVN r653.
Этот коммит содержится в:
Tim Woodall 2004-02-05 17:12:59 +00:00
родитель 056485c6da
Коммит 086fa5b7cb
20 изменённых файлов: 107 добавлений и 115 удалений

Просмотреть файл

@ -9,6 +9,7 @@
typedef lam_lock_data_t lam_mutex_t;
#define lam_mutex_init(m) spinunlock(m)
#define lam_mutex_destroy(m)
#define lam_mutex_lock(m) spinlock(m)
#define lam_mutex_trylock(m) spintrylock(m)
#define lam_mutex_unlock(m) spinunlock(m)

Просмотреть файл

@ -30,6 +30,11 @@ static inline void lam_mutex_init(lam_mutex_t* m)
pthread_cond_init(&m->mutex_cond, 0);
}
static inline void lam_mutex_destroy(lam_mutex_t* m)
{
}
static inline void lam_mutex_lock(lam_mutex_t* m)
{

Просмотреть файл

@ -29,6 +29,7 @@ struct lam_if_t {
char if_name[IF_NAMESIZE];
int if_index;
int if_flags;
int if_speed;
struct sockaddr_in if_addr;
struct sockaddr_in if_mask;
uint32_t if_bandwidth;

Просмотреть файл

@ -6,6 +6,7 @@
#include "lam/mem/malloc.h"
#include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/base.h"
#include "mca/mpi/ptl/base/ptl_base_comm.h"
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "mca/mpi/ptl/base/ptl_base_recvreq.h"
@ -66,20 +67,32 @@ static int ptl_exclusivity_compare(const void* arg1, const void* arg2)
int mca_pml_teg_add_ptls(lam_list_t *ptls)
{
/* sort the ptls by exclusivity */
#if TIM_HASNT_IMPLEMENTED_THIS_YET
/* build an array of ptls and ptl modules */
mca_ptl_base_selected_module_t* selected_ptl;
size_t num_ptls = lam_list_get_size(ptls);
mca_pml_teg.teg_num_ptls = 0;
mca_pml_teg.teg_num_ptl_modules = 0;
mca_pml_teg.teg_ptls = (mca_ptl_t**)LAM_MALLOC(sizeof(mca_ptl_t*) * num_ptls);
mca_pml_teg.teg_ptl_modules = (mca_ptl_base_module_t**)LAM_MALLOC(sizeof(mca_ptl_base_module_t*) * num_ptls);
if (NULL == mca_pml_teg.teg_ptls || NULL == mca_pml_teg.teg_ptl_modules)
return LAM_ERR_OUT_OF_RESOURCE;
/* Tim: you now get a lam_list_t of
(mca_ptl_base_selected_module_t*)'s (see
mca/mpi/ptl/base/base.h).
for(selected_ptl = (mca_ptl_base_selected_module_t*)lam_list_get_first(ptls);
selected_ptl != (mca_ptl_base_selected_module_t*)lam_list_get_last(ptls);
selected_ptl = (mca_ptl_base_selected_module_t*)lam_list_get_next(selected_ptl)) {
mca_ptl_t *ptl = selected_ptl->pbsm_actions;
size_t i;
You do not own this memory, and therefore do not need to free
anything in the lam_list_t that you receive here. */
mca_pml_teg.teg_ptls[mca_pml_teg.teg_num_ptls++] = ptl;
for(i=0; i<mca_pml_teg.teg_num_ptl_modules; i++)
if(mca_pml_teg.teg_ptl_modules[i] == ptl->ptl_module)
break;
if(i == mca_pml_teg.teg_num_ptl_modules)
mca_pml_teg.teg_ptl_modules[mca_pml_teg.teg_num_ptl_modules++] = ptl->ptl_module;
}
qsort(ptls, nptls, sizeof(struct mca_ptl_t*), ptl_exclusivity_compare);
mca_pml_teg.teg_ptls = ptls;
mca_pml_teg.teg_num_ptls = nptls;
#endif
/* sort ptl list by exclusivity */
qsort(mca_pml_teg.teg_ptls, mca_pml_teg.teg_num_ptls, sizeof(struct mca_ptl_t*), ptl_exclusivity_compare);
return LAM_SUCCESS;
}

Просмотреть файл

@ -31,7 +31,6 @@ struct mca_pml_teg_t {
size_t teg_num_ptls;
lam_list_t teg_procs;
lam_list_t teg_incomplete_sends;
lam_mutex_t teg_lock;
int teg_free_list_num; /* initial size of free list */

Просмотреть файл

@ -71,6 +71,12 @@ int mca_pml_teg_module_open(void)
int mca_pml_teg_module_close(void)
{
if(NULL != mca_pml_teg.teg_ptl_modules)
LAM_FREE(mca_pml_teg.teg_ptl_modules);
if(NULL != mca_pml_teg.teg_ptls)
LAM_FREE(mca_pml_teg.teg_ptls);
STATIC_DESTROY(mca_pml_teg.teg_recv_requests);
STATIC_DESTROY(mca_pml_teg.teg_procs);
return LAM_SUCCESS;
}
@ -83,12 +89,12 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
*allow_multi_user_threads = true;
*have_hidden_threads = false;
mca_pml_teg.teg_ptl_modules = 0;
mca_pml_teg.teg_ptl_modules = NULL;
mca_pml_teg.teg_num_ptl_modules = 0;
mca_pml_teg.teg_ptls = 0;
mca_pml_teg.teg_ptls = NULL;
mca_pml_teg.teg_num_ptls = 0;
lam_free_list_init(&mca_pml_teg.teg_recv_requests);
STATIC_INIT(mca_pml_teg.teg_recv_requests, &lam_free_list_cls);
lam_free_list_init_with(
&mca_pml_teg.teg_recv_requests,
sizeof(mca_ptl_base_recv_request_t),
@ -98,7 +104,6 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
mca_pml_teg.teg_free_list_inc,
NULL);
STATIC_INIT(mca_pml_teg.teg_incomplete_sends, &lam_list_cls);
STATIC_INIT(mca_pml_teg.teg_procs, &lam_list_cls);
lam_mutex_init(&mca_pml_teg.teg_lock);
mca_pml_teg.teg_recv_sequence = 0;

Просмотреть файл

@ -12,27 +12,6 @@ int mca_pml_teg_progress(void)
*/
for(i=0; i<mca_pml_teg.teg_num_ptl_modules; i++)
mca_pml_teg.teg_ptl_modules[i]->ptlm_progress(tstamp);
/*
* Complete any pending send requests.
*/
THREAD_LOCK(&mca_pml_teg.teg_lock);
mca_ptl_base_send_request_t* req;
for(req = (mca_ptl_base_send_request_t*)lam_list_get_first(&mca_pml_teg.teg_incomplete_sends);
req != (mca_ptl_base_send_request_t*)lam_list_get_end(&mca_pml_teg.teg_incomplete_sends);
req = (mca_ptl_base_send_request_t*)lam_list_get_next(req)) {
bool complete;
int rc = mca_pml_teg_send_request_schedule(req, &complete);
if(rc != LAM_SUCCESS) {
continue;
}
if(complete) {
req = (mca_ptl_base_send_request_t*)lam_list_remove_item(
&mca_pml_teg.teg_incomplete_sends, (lam_list_item_t*)req);
}
}
THREAD_UNLOCK(&mca_pml_teg.teg_lock);
return LAM_SUCCESS;
}

Просмотреть файл

@ -1,18 +1,2 @@
#include "mca/mpi/ptl/base/ptl_base_comm.h"
#include "pml_teg_recvreq.h"
int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* req)
{
THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock,
(req->req_sequence = mca_pml_teg.teg_recv_sequence++));
req->super.req_status = MCA_PML_STATUS_INCOMPLETE;
if(req->super.req_peer == LAM_ANY_TAG) {
mca_ptl_base_recv_request_match_wild(req);
} else {
mca_ptl_base_recv_request_match_specific(req);
}
return LAM_SUCCESS;
}

Просмотреть файл

@ -28,7 +28,19 @@ static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t*
/*
* Progress an initialized request.
*/
int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t*);
static inline int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* req)
{
THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock,
(req->req_sequence = mca_pml_teg.teg_recv_sequence++));
req->super.req_status = MCA_PML_STATUS_INCOMPLETE;
if(req->super.req_peer == LAM_ANY_TAG) {
mca_ptl_base_recv_request_match_wild(req);
} else {
mca_ptl_base_recv_request_match_specific(req);
}
return LAM_SUCCESS;
}
#endif

Просмотреть файл

@ -46,20 +46,13 @@ static inline int mca_pml_teg_send_request_start(
mca_ptl_t* ptl = req->req_owner;
size_t first_fragment_size = ptl->ptl_first_frag_size;
int rc;
bool complete;
// start the first fragment
if(req->req_length < first_fragment_size)
first_fragment_size = req->req_length;
rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size, &complete);
rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size);
if(rc != LAM_SUCCESS)
return rc;
// if incomplete queue to retry later
if(complete == false) {
THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock,
lam_list_append(&mca_pml_teg.teg_incomplete_sends, (lam_list_item_t*)req));
}
return LAM_SUCCESS;
}

Просмотреть файл

@ -136,8 +136,7 @@ typedef int (*mca_ptl_base_send_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_send_request_t* send_request,
size_t size,
bool* complete
size_t size
);
typedef int (*mca_ptl_base_recv_fn_t)(

Просмотреть файл

@ -135,8 +135,7 @@ int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t size,
bool* complete)
size_t size)
{
mca_ptl_tcp_send_frag_t* sendfrag;
if (sendreq->req_frags == 0) {

Просмотреть файл

@ -3,7 +3,7 @@
* $HEADER$
*/
#ifndef MCA_PTL_TCP_H_
#ifndef MCA_PTL_TCP_H
#define MCA_PTL_TCP_H
#include <sys/types.h>
@ -36,6 +36,8 @@ struct mca_ptl_tcp_module_1_0_0_t {
lam_free_list_t tcp_send_frags;
lam_free_list_t tcp_recv_frags;
lam_list_t tcp_procs;
struct mca_ptl_tcp_proc_t* tcp_local;
lam_mutex_t tcp_lock;
};
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_t;
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_t;
@ -111,8 +113,7 @@ extern int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t*,
size_t size,
bool* complete
size_t size
);
extern int mca_ptl_tcp_recv(

Просмотреть файл

@ -130,6 +130,13 @@ int mca_ptl_tcp_module_close(void)
LAM_FREE(mca_ptl_tcp_module.tcp_if_exclude);
if (NULL != mca_ptl_tcp_module.tcp_ptls)
LAM_FREE(mca_ptl_tcp_module.tcp_ptls);
STATIC_DESTROY(mca_ptl_tcp_module.tcp_reactor);
STATIC_DESTROY(mca_ptl_tcp_module.tcp_procs);
STATIC_DESTROY(mca_ptl_tcp_module.tcp_send_requests);
STATIC_DESTROY(mca_ptl_tcp_module.tcp_send_frags);
STATIC_DESTROY(mca_ptl_tcp_module.tcp_recv_frags);
lam_mutex_destroy(&mca_ptl_tcp_module.tcp_lock);
return LAM_SUCCESS;
}
@ -277,6 +284,8 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
/* initialize containers */
STATIC_INIT(mca_ptl_tcp_module.tcp_reactor, &lam_reactor_cls);
STATIC_INIT(mca_ptl_tcp_module.tcp_procs, &lam_list_cls);
lam_mutex_init(&mca_ptl_tcp_module.tcp_lock);
/* initialize free lists */
STATIC_INIT(mca_ptl_tcp_module.tcp_send_requests, &lam_free_list_cls);

Просмотреть файл

@ -13,14 +13,8 @@
#include "ptl_tcp_proc.h"
lam_class_info_t mca_ptl_tcp_peer_cls = {
"mca_tcp_ptl_peer_t",
&lam_list_cls,
(class_init_t)mca_ptl_tcp_peer_init,
(class_destroy_t)mca_ptl_tcp_peer_destroy
};
static void mca_ptl_tcp_peer_init(mca_ptl_base_peer_t* ptl_peer);
static void mca_ptl_tcp_peer_destroy(mca_ptl_base_peer_t* ptl_peer);
static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t*);
@ -29,6 +23,13 @@ static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t*, int sd);
static void mca_ptl_tcp_peer_except_handler(mca_ptl_base_peer_t*, int sd);
lam_class_info_t mca_ptl_tcp_peer_cls = {
"mca_tcp_ptl_peer_t",
&lam_list_cls,
(class_init_t)mca_ptl_tcp_peer_init,
(class_destroy_t)mca_ptl_tcp_peer_destroy
};
static lam_reactor_listener_t mca_ptl_tcp_peer_listener = {
(lam_rl_recv_handler_fn_t)mca_ptl_tcp_peer_recv_handler,

Просмотреть файл

@ -51,9 +51,7 @@ struct mca_ptl_base_peer_t {
typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;
void mca_ptl_tcp_peer_destroy(mca_ptl_base_peer_t*);
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t*);
void mca_ptl_tcp_peer_init(mca_ptl_base_peer_t*);
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t*, mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t*, struct sockaddr_in*, int);

Просмотреть файл

@ -10,6 +10,10 @@
#include "ptl_tcp_proc.h"
static void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t* proc);
static void mca_ptl_tcp_proc_destroy(mca_ptl_tcp_proc_t* proc);
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc);
lam_class_info_t mca_ptl_tcp_proc_cls = {
"mca_ptl_tcp_proc_t",
&lam_list_item_cls,
@ -17,21 +21,9 @@ lam_class_info_t mca_ptl_tcp_proc_cls = {
(class_destroy_t)mca_ptl_tcp_proc_destroy
};
static lam_list_t mca_ptl_tcp_procs;
static lam_mutex_t mca_ptl_tcp_proc_mutex;
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_self = 0;
void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t* proc)
{
static int inited = 0;
if(fetchNset(&inited, 1) == 0) {
lam_list_init(&mca_ptl_tcp_procs);
lam_mutex_init(&mca_ptl_tcp_proc_mutex);
}
SUPER_INIT(proc, &lam_list_item_cls);
proc->proc_lam = 0;
proc->proc_addrs = 0;
@ -41,16 +33,18 @@ void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t* proc)
lam_mutex_init(&proc->proc_lock);
/* add to list of all proc instance */
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
lam_list_append(&mca_ptl_tcp_procs, &proc->super);
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
lam_list_append(&mca_ptl_tcp_module.tcp_procs, &proc->super);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
}
void mca_ptl_tcp_proc_destroy(mca_ptl_tcp_proc_t* proc)
{
/* remove from list of all proc instances */
lam_list_remove_item(&mca_ptl_tcp_procs, &proc->super);
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
lam_list_remove_item(&mca_ptl_tcp_module.tcp_procs, &proc->super);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
/* release resources */
if(NULL != proc->proc_peers)
@ -116,8 +110,8 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc)
OBJ_RELEASE(ptl_proc);
return NULL;
}
if(NULL == mca_ptl_tcp_proc_self && lam_proc == lam_proc_local())
mca_ptl_tcp_proc_self = ptl_proc;
if(NULL == mca_ptl_tcp_module.tcp_local && lam_proc == lam_proc_local())
mca_ptl_tcp_module.tcp_local = ptl_proc;
return ptl_proc;
}
@ -128,16 +122,16 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc)
static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc)
{
mca_ptl_tcp_proc_t* tcp_proc;
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_procs);
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_module.tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_module.tcp_procs);
tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_next(tcp_proc)) {
if(tcp_proc->proc_lam == lam_proc) {
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
return tcp_proc;
}
}
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
return NULL;
}
@ -149,16 +143,16 @@ static mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup_lam(lam_proc_t* lam_proc)
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size)
{
mca_ptl_tcp_proc_t* tcp_proc;
THREAD_LOCK(&mca_ptl_tcp_proc_mutex);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_procs);
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
for(tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_first(&mca_ptl_tcp_module.tcp_procs);
tcp_proc != (mca_ptl_tcp_proc_t*)lam_list_get_end(&mca_ptl_tcp_module.tcp_procs);
tcp_proc = (mca_ptl_tcp_proc_t*)lam_list_get_next(tcp_proc)) {
if(tcp_proc->proc_guid_size == size && memcmp(tcp_proc->proc_guid, guid, size) == 0) {
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
return tcp_proc;
}
}
THREAD_UNLOCK(&mca_ptl_tcp_proc_mutex);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
return NULL;
}

Просмотреть файл

@ -11,6 +11,7 @@
#include <netinet/in.h>
#include "lam/lfc/object.h"
#include "mpi/proc/proc.h"
#include "ptl_tcp.h"
#include "ptl_tcp_peer.h"
extern lam_class_info_t mca_ptl_tcp_proc_cls;
@ -35,18 +36,15 @@ struct mca_ptl_tcp_proc_t {
typedef struct mca_ptl_tcp_proc_t mca_ptl_tcp_proc_t;
void mca_ptl_tcp_proc_init(mca_ptl_tcp_proc_t*);
void mca_ptl_tcp_proc_destroy(mca_ptl_tcp_proc_t*);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size);
static inline mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_local(void)
{
extern mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_self;
if(NULL == mca_ptl_tcp_proc_self)
mca_ptl_tcp_proc_self = mca_ptl_tcp_proc_create(lam_proc_local());
return mca_ptl_tcp_proc_self;
if(NULL == mca_ptl_tcp_module.tcp_local)
mca_ptl_tcp_module.tcp_local = mca_ptl_tcp_proc_create(lam_proc_local());
return mca_ptl_tcp_module.tcp_local;
}
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t*, mca_ptl_base_peer_t*);

Просмотреть файл

@ -246,3 +246,4 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
}

Просмотреть файл

@ -25,7 +25,7 @@ lam_class_info_t mca_ptl_tcp_send_request_cls = {
void mca_ptl_tcp_send_request_init(mca_ptl_tcp_send_request_t* request)
{
SUPER_INIT(request, &mca_ptl_base_send_request_cls);
STATIC_INIT(request->req_frag, &mca_ptl_tcp_send_request_cls);
STATIC_INIT(request->req_frag, &mca_ptl_tcp_send_frag_cls);
}