1
1

- resolved multiple threading issues

This commit was SVN r1009.
Этот коммит содержится в:
Tim Woodall 2004-03-31 17:00:38 +00:00
родитель 1f2b72b610
Коммит 14974727a4
28 изменённых файлов: 291 добавлений и 270 удалений

Просмотреть файл

@ -65,6 +65,8 @@ int mca_base_init_select_modules(int requested,
/* JMS ...Do more here with the thread level, etc.... */
*provided = requested;
if(have_hidden_threads)
lam_set_using_threads(true);
/* Tell the selected pml module about all the selected ptl
modules */

Просмотреть файл

@ -55,20 +55,17 @@ OBJ_CLASS_INSTANCE(
struct mca_base_modex_t {
lam_object_t super;
lam_list_t modex_modules;
lam_mutex_t modex_lock;
};
typedef struct mca_base_modex_t mca_base_modex_t;
static void mca_base_modex_construct(mca_base_modex_t* modex)
{
OBJ_CONSTRUCT(&modex->modex_lock, lam_mutex_t);
OBJ_CONSTRUCT(&modex->modex_modules, lam_list_t);
}
static void mca_base_modex_destruct(mca_base_modex_t* modex)
{
OBJ_DESTRUCT(&modex->modex_modules);
OBJ_DESTRUCT(&modex->modex_lock);
}
OBJ_CLASS_INSTANCE(
@ -88,16 +85,13 @@ static inline mca_base_modex_module_t* mca_base_modex_lookup_module(
mca_base_module_t* module)
{
mca_base_modex_module_t* modex_module;
THREAD_LOCK(&modex->modex_lock);
for(modex_module = (mca_base_modex_module_t*)lam_list_get_first(&modex->modex_modules);
modex_module != (mca_base_modex_module_t*)lam_list_get_end(&modex->modex_modules);
modex_module = (mca_base_modex_module_t*)lam_list_get_next(modex_module)) {
if(mca_base_module_compare(modex_module->module, module) == 0) {
THREAD_UNLOCK(&modex->modex_lock);
return modex_module;
}
}
THREAD_UNLOCK(&modex->modex_lock);
return NULL;
}
@ -111,7 +105,6 @@ static inline mca_base_modex_module_t* mca_base_modex_create_module(
mca_base_module_t* module)
{
mca_base_modex_module_t* modex_module;
THREAD_LOCK(&modex->modex_lock);
if(NULL == (modex_module = mca_base_modex_lookup_module(modex, module))) {
modex_module = OBJ_NEW(mca_base_modex_module_t);
if(NULL != modex_module) {
@ -119,7 +112,6 @@ static inline mca_base_modex_module_t* mca_base_modex_create_module(
lam_list_append(&modex->modex_modules, (lam_list_item_t*)modex_module);
}
}
THREAD_UNLOCK(&modex->modex_lock);
return modex_module;
}
@ -139,18 +131,24 @@ int mca_base_modex_send(mca_base_module_t *source_module, const void *buffer, si
if(NULL == self)
return LAM_ERROR;
THREAD_LOCK(&self->proc_lock);
if(NULL == (modex = self->proc_modex)) {
self->proc_modex = modex = OBJ_NEW(mca_base_modex_t);
}
if(NULL == (modex_module = mca_base_modex_create_module(modex, source_module)))
if(NULL == (modex_module = mca_base_modex_create_module(modex, source_module))) {
THREAD_UNLOCK(&self->proc_lock);
return LAM_ERROR;
}
modex_module->module_data = malloc(size);
if(NULL == modex_module->module_data)
if(NULL == modex_module->module_data) {
THREAD_UNLOCK(&self->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
memcpy(modex_module->module_data, buffer, size);
modex_module->module_data_size = size;
THREAD_UNLOCK(&self->proc_lock);
return LAM_SUCCESS;
}
@ -167,22 +165,29 @@ int mca_base_modex_recv(mca_base_module_t *module, lam_proc_t *source_proc, void
mca_base_modex_module_t* modex_module;
void *copy;
THREAD_LOCK(&source_proc->proc_lock);
if(NULL == (modex = source_proc->proc_modex) ||
NULL == (modex_module = mca_base_modex_lookup_module(modex, module)))
NULL == (modex_module = mca_base_modex_lookup_module(modex, module))) {
THREAD_UNLOCK(&source_proc->proc_lock);
return LAM_ERR_NOT_FOUND;
}
if(0 == modex_module->module_data_size) {
*buffer = NULL;
*size = 0;
THREAD_UNLOCK(&source_proc->proc_lock);
return LAM_SUCCESS;
}
copy = malloc(modex_module->module_data_size);
if(NULL == copy)
if(NULL == copy) {
THREAD_UNLOCK(&source_proc->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
*buffer = copy;
*size = modex_module->module_data_size;
THREAD_UNLOCK(&source_proc->proc_lock);
return LAM_SUCCESS;
}
@ -216,6 +221,7 @@ int mca_base_modex_exchange(void)
}
/* loop through all modules with data cached on local process and send to all peers */
THREAD_LOCK(&self->proc_lock);
for(self_module = (mca_base_modex_module_t*)lam_list_get_first(&modex->modex_modules);
self_module != (mca_base_modex_module_t*)lam_list_get_end(&modex->modex_modules);
self_module = (mca_base_modex_module_t*)lam_list_get_next(self_module)) {
@ -235,6 +241,7 @@ int mca_base_modex_exchange(void)
self_module->module_data_size);
if(rc != LAM_SUCCESS) {
free(procs);
THREAD_UNLOCK(&self->proc_lock);
return rc;
}
}
@ -254,16 +261,20 @@ int mca_base_modex_exchange(void)
if(proc == self)
continue;
THREAD_LOCK(&proc->proc_lock);
if(NULL == proc->proc_modex) {
proc->proc_modex = OBJ_NEW(mca_base_modex_t);
if(NULL == proc->proc_modex) {
free(procs);
THREAD_UNLOCK(&self->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
}
proc_module = mca_base_modex_create_module(proc->proc_modex, self_module->module);
if(NULL == proc_module) {
free(procs);
THREAD_UNLOCK(&proc->proc_lock);
THREAD_UNLOCK(&self->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
@ -275,11 +286,15 @@ int mca_base_modex_exchange(void)
&proc_module->module_data_size);
if(rc != LAM_SUCCESS) {
free(procs);
THREAD_UNLOCK(&proc->proc_lock);
THREAD_UNLOCK(&self->proc_lock);
return rc;
}
THREAD_UNLOCK(&proc->proc_lock);
}
}
free(procs);
THREAD_UNLOCK(&self->proc_lock);
return LAM_SUCCESS;
}

Просмотреть файл

@ -11,6 +11,7 @@
#include "request/request.h"
#include "datatype/datatype.h"
#include "communicator/communicator.h"
#include "mca/ptl/ptl.h"
extern lam_class_t mca_pml_base_request_t_class;
@ -35,6 +36,7 @@ struct mca_pml_base_request_t {
int32_t req_tag; /**< user defined tag */
lam_communicator_t *req_comm; /**< communicator pointer */
lam_proc_t* req_proc; /**< peer process */
mca_ptl_base_sequence_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
lam_datatype_t *req_datatype; /**< pointer to data type */
mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
lam_status_public_t req_status; /**< completion status */

Просмотреть файл

@ -42,9 +42,6 @@ struct mca_pml_teg_t {
/* free list of recv requests */
lam_free_list_t teg_recv_requests;
/* next recv sequence */
mca_ptl_base_sequence_t teg_recv_sequence;
/* request completion */
lam_mutex_t teg_request_lock;
lam_condition_t teg_request_cond;

Просмотреть файл

@ -102,7 +102,7 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
{
*priority = 0;
*allow_multi_user_threads = true;
*have_hidden_threads = true;
*have_hidden_threads = false;
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t);
mca_pml_teg.teg_ptl_modules = NULL;
@ -120,9 +120,6 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
mca_pml_teg.teg_free_list_inc,
NULL);
/* recv sequence */
mca_pml_teg.teg_recv_sequence = 0;
/* request completion */
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_lock, lam_mutex_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_cond, lam_condition_t);

Просмотреть файл

@ -7,8 +7,14 @@ void mca_pml_teg_recv_request_progress(
mca_ptl_base_recv_frag_t* frag)
{
lam_mutex_lock(&mca_pml_teg.teg_request_lock);
req->req_bytes_recvd += frag->super.frag_size;
if (req->req_bytes_recvd >= req->super.req_status._count) {
req->req_bytes_delivered += frag->super.frag_size;
req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length;
if (req->req_bytes_received >= req->req_bytes_msg) {
/* initialize request status */
req->super.req_status.MPI_SOURCE = req->super.req_peer;
req->super.req_status.MPI_TAG = req->super.req_tag;
req->super.req_status.MPI_ERROR = LAM_SUCCESS;
req->super.req_status._count = req->req_bytes_delivered;
req->super.req_mpi_done = true;
req->super.req_pml_done = true;
if(mca_pml_teg.teg_request_waiting) {

Просмотреть файл

@ -42,9 +42,6 @@ static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t*
*/
static inline int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* request)
{
THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock,
(request->req_sequence = mca_pml_teg.teg_recv_sequence++));
if(request->super.req_peer == LAM_ANY_SOURCE) {
mca_ptl_base_recv_request_match_wild(request);
} else {

Просмотреть файл

@ -25,7 +25,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
mca_pml_proc_t* proc_pml = proc->proc_pml;
/* allocate remaining bytes to PTLs */
size_t bytes_remaining = req->req_packed_size - req->req_offset;
size_t bytes_remaining = req->req_bytes_msg - req->req_offset;
size_t num_ptl_avail = proc_pml->proc_ptl_next.ptl_size;
size_t num_ptl = 0;
while(bytes_remaining > 0 && num_ptl++ < num_ptl_avail) {
@ -47,14 +47,14 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
* previously assigned)
*/
else {
bytes_to_frag = ptl_proc->ptl_weight * req->req_packed_size;
bytes_to_frag = ptl_proc->ptl_weight * req->req_bytes_msg;
if(bytes_to_frag > bytes_remaining)
bytes_to_frag = bytes_remaining;
}
rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0);
if(rc == LAM_SUCCESS)
bytes_remaining = req->req_packed_size - req->req_offset;
bytes_remaining = req->req_bytes_msg - req->req_offset;
}
/* unable to complete send - signal request failed */
@ -73,21 +73,28 @@ void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag)
{
bool complete = false;
lam_mutex_lock(&mca_pml_teg.teg_request_lock);
req->req_bytes_sent += frag->super.frag_size;
if (req->req_bytes_sent >= req->req_packed_size) {
req->super.req_mpi_done = true;
if (req->req_bytes_sent >= req->req_bytes_msg) {
req->super.req_pml_done = true;
if(mca_pml_teg.teg_request_waiting) {
lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
}
complete = true;
if (req->super.req_mpi_done == false) {
req->super.req_status.MPI_SOURCE = req->super.req_comm->c_my_rank;
req->super.req_status.MPI_TAG = req->super.req_tag;
req->super.req_status.MPI_ERROR = LAM_SUCCESS;
req->super.req_status._count = req->req_bytes_sent;
req->super.req_mpi_done = true;
if(mca_pml_teg.teg_request_waiting) {
lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
}
} else if (req->super.req_free_called)
mca_pml_teg_free((lam_request_t**)&req);
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
return;
}
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
/* if first fragment - schedule remaining fragments */
if(false == complete && req->req_frags == 1) {
if(req->req_frags == 1) {
mca_pml_teg_send_request_schedule(req);
}
}

Просмотреть файл

@ -49,8 +49,8 @@ static inline int mca_pml_teg_send_request_start(
int flags, rc;
/* start the first fragment */
if(req->req_packed_size <= first_fragment_size) {
first_fragment_size = req->req_packed_size;
if(req->req_bytes_msg <= first_fragment_size) {
first_fragment_size = req->req_bytes_msg;
flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? MCA_PTL_FLAGS_ACK_MATCHED : 0;
} else {
/* require match for first fragment of a multi-fragment message or if synchronous send */

Просмотреть файл

@ -20,7 +20,8 @@ lam_class_t mca_pml_ptl_comm_t_class = {
static void mca_pml_ptl_comm_construct(mca_pml_ptl_comm_t* comm)
{
OBJ_CONSTRUCT(&comm->c_wild_receives, lam_list_t);
OBJ_CONSTRUCT(&comm->c_wild_lock, lam_mutex_t);
OBJ_CONSTRUCT(&comm->c_matching_lock, lam_mutex_t);
comm->c_recv_seq = 0;
}
@ -28,12 +29,11 @@ static void mca_pml_ptl_comm_destruct(mca_pml_ptl_comm_t* comm)
{
free(comm->c_msg_seq);
free(comm->c_next_msg_seq);
free(comm->c_matching_lock);
free(comm->c_unexpected_frags);
free(comm->c_unexpected_frags_lock);
free(comm->c_frags_cant_match);
free(comm->c_specific_receives);
OBJ_DESTRUCT(&comm->c_wild_receives);
OBJ_DESTRUCT(&comm->c_matching_lock);
}
@ -53,15 +53,6 @@ int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size)
return LAM_ERR_OUT_OF_RESOURCE;
memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size);
/* matching lock */
comm->c_matching_lock = malloc(sizeof(lam_mutex_t) * size);
if(NULL == comm->c_matching_lock)
return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) {
lam_mutex_t *object = comm->c_matching_lock+i;
OBJ_CONSTRUCT(object, lam_mutex_t);
}
/* unexpected fragments queues */
comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size);
if(NULL == comm->c_unexpected_frags)
@ -71,15 +62,6 @@ int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size)
OBJ_CONSTRUCT(object, lam_list_t);
}
/* these locks are needed to avoid a probe interfering with a match */
comm->c_unexpected_frags_lock = malloc(sizeof(lam_mutex_t) * size);
if(NULL == comm->c_unexpected_frags_lock)
return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) {
lam_mutex_t* object = comm->c_unexpected_frags_lock+i;
OBJ_CONSTRUCT(object, lam_mutex_t);
}
/* out-of-order fragments queues */
comm->c_frags_cant_match = malloc(sizeof(lam_list_t) * size);
if(NULL == comm->c_frags_cant_match)

Просмотреть файл

@ -22,13 +22,12 @@ struct mca_pml_comm_t {
lam_object_t super;
mca_ptl_base_sequence_t *c_msg_seq; /**< send message sequence number - sender side */
mca_ptl_base_sequence_t *c_next_msg_seq; /**< send message sequence number - receiver side */
lam_mutex_t *c_matching_lock; /**< matching lock */
mca_ptl_base_sequence_t c_recv_seq; /**< recv request sequence number - receiver side */
lam_mutex_t c_matching_lock; /**< matching lock */
lam_list_t *c_unexpected_frags; /**< unexpected fragment queues */
lam_mutex_t *c_unexpected_frags_lock; /**< unexpected fragment locks */
lam_list_t *c_frags_cant_match; /**< out-of-order fragment queues */
lam_list_t *c_specific_receives; /**< queues of unmatched specific (source process specified) receives */
lam_list_t c_wild_receives; /**< queue of unmatched wild (source process not specified) receives */
lam_mutex_t c_wild_lock; /**< lock to protect access to wild receives */
};
typedef struct mca_pml_comm_t mca_pml_ptl_comm_t;
@ -54,9 +53,9 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size);
static inline mca_ptl_base_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst)
{
mca_ptl_base_sequence_t sequence;
lam_mutex_lock(comm->c_matching_lock+dst);
lam_mutex_lock(&comm->c_matching_lock);
sequence = comm->c_msg_seq[dst]++;
lam_mutex_unlock(comm->c_matching_lock+dst);
lam_mutex_unlock(&comm->c_matching_lock);
return sequence;
}

Просмотреть файл

@ -19,6 +19,7 @@ lam_class_t mca_ptl_base_frag_t_class = {
static void mca_ptl_base_frag_construct(mca_ptl_base_frag_t* frag)
{
OBJ_CONSTRUCT(&frag->frag_convertor, lam_convertor_t);
}
static void mca_ptl_base_frag_destruct(mca_ptl_base_frag_t* frag)

Просмотреть файл

@ -73,9 +73,10 @@ static void mca_ptl_base_check_cantmatch_for_match(
* - fragments may be corrupt
* - this routine may be called simultaneously by more than one thread
*/
int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc, bool* match_made,
lam_list_t *additional_matches)
bool mca_ptl_base_match(
mca_ptl_base_match_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc,
lam_list_t *additional_matches)
{
/* local variables */
mca_ptl_base_sequence_t frag_msg_seq,next_msg_seq_expected;
@ -83,9 +84,7 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
mca_ptl_base_recv_request_t *matched_receive;
mca_pml_ptl_comm_t *pml_comm;
int frag_src;
/* initialization */
*match_made=false;
bool match_made=false;
/* communicator pointer */
comm_ptr=lam_comm_lookup(frag_header->hdr_contextid);
@ -104,7 +103,7 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
* end points) from being processed, and potentially "losing"
* the fragment.
*/
THREAD_LOCK((pml_comm->c_matching_lock)+frag_src);
THREAD_LOCK(&pml_comm->c_matching_lock);
/* get sequence number of next message that can be processed */
next_msg_seq_expected = *((pml_comm->c_next_msg_seq)+frag_src);
@ -128,7 +127,7 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
if (matched_receive) {
/* set flag indicating the input fragment was matched */
*match_made=true;
match_made=true;
/* associate the receive descriptor with the fragment
* descriptor */
frag_desc->frag_request=matched_receive;
@ -139,13 +138,9 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
*/
} else {
/* if no match found, place on unexpected queue - need to
* lock to prevent probe from interfering with updating
* the list */
THREAD_LOCK((pml_comm->c_unexpected_frags_lock)+frag_src);
/* if no match found, place on unexpected queue */
lam_list_append( ((pml_comm->c_unexpected_frags)+frag_src),
(lam_list_item_t *)frag_desc);
THREAD_UNLOCK((pml_comm->c_unexpected_frags_lock)+frag_src);
}
@ -160,16 +155,6 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
}
/*
* if threaded, ok to release lock, since the posted
* receive is not on any queue, so it won't be
* matched again, and the fragment can be processed
* w/o any conflict from other threads - locks will
* be used where concurrent access needs to be managed.
*/
THREAD_UNLOCK((pml_comm->c_matching_lock)+frag_src);
} else {
/*
* This message comes after the next expected, so it
@ -177,13 +162,10 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
*/
lam_list_append( ((pml_comm->c_frags_cant_match)+frag_src),
(lam_list_item_t *)frag_desc);
/* now that the fragment is on the list, ok to
* release match - other matches may be attempted */
THREAD_UNLOCK((pml_comm->c_matching_lock)+frag_src);
}
return LAM_SUCCESS;
THREAD_UNLOCK(&pml_comm->c_matching_lock);
return match_made;
}
/**
@ -224,9 +206,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
/*
* There are only wild irecvs, so specialize the algorithm.
*/
THREAD_LOCK(&pml_comm->c_wild_lock);
return_match = mca_ptl_base_check_wild_receives_for_match(frag_header, pml_comm);
THREAD_UNLOCK(&pml_comm->c_wild_lock);
} else if (lam_list_get_size(&(pml_comm->c_wild_receives)) == 0 ) {
/*
@ -238,10 +218,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
/*
* There are some of each.
*/
THREAD_LOCK(&pml_comm->c_wild_lock);
return_match = mca_ptl_base_check_specific_and_wild_receives_for_match(frag_header,
pml_comm);
THREAD_UNLOCK(&pml_comm->c_wild_lock);
}
return return_match;
@ -413,8 +391,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
wild_recv = (mca_ptl_base_recv_request_t *)
lam_list_get_first(&(pml_comm->c_wild_receives));
specific_recv_seq = specific_recv->req_sequence;
wild_recv_seq = wild_recv->req_sequence;
specific_recv_seq = specific_recv->super.req_sequence;
wild_recv_seq = wild_recv->super.req_sequence;
while (true) {
if (wild_recv_seq < specific_recv_seq) {
@ -463,7 +441,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
* Get the sequence number for this recv, and go
* back to the top of the loop.
*/
wild_recv_seq = wild_recv->req_sequence;
wild_recv_seq = wild_recv->super.req_sequence;
} else {
/*
@ -508,7 +486,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
* Get the sequence number for this recv, and go
* back to the top of the loop.
*/
specific_recv_seq = specific_recv->req_sequence;
specific_recv_seq = specific_recv->super.req_sequence;
}
}
}
@ -603,13 +581,9 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
} else {
/* if no match found, place on unexpected queue - need to
* lock to prevent probe from interfering with updating
* the list */
THREAD_LOCK((pml_comm->c_unexpected_frags_lock)+frag_src);
/* if no match found, place on unexpected queue */
lam_list_append( ((pml_comm->c_unexpected_frags)+frag_src),
(lam_list_item_t *)frag_desc);
THREAD_UNLOCK((pml_comm->c_unexpected_frags_lock)+frag_src);
}

Просмотреть файл

@ -15,12 +15,11 @@ struct mca_ptl_base_recv_frag_t;
* @param frag_header (IN) Header of received fragment.
* @param frag_desc (IN) Received fragment descriptor.
* @param match_made (OUT) Flag indicating whether a match was made.
* @param additional_matches (OUT) List of additional matches if a match was made.
* @param additional_matches (OUT) List of additional matches
* @return LAM_SUCCESS or error status on failure.
*/
int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
struct mca_ptl_base_recv_frag_t *frag_desc, bool *match_made,
lam_list_t *additional_matches);
bool mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
struct mca_ptl_base_recv_frag_t *frag_desc, lam_list_t *additional_matches);
#endif /* MCA_PTL_BASE_MATCH_H */

Просмотреть файл

@ -24,67 +24,6 @@ struct mca_ptl_base_recv_frag_t {
};
typedef struct mca_ptl_base_recv_frag_t mca_ptl_base_recv_frag_t;
/**
* Initialize the receive fragment after a match has been made.
*
* @param frag (IN) Receive fragment descriptor.
*
* If a buffer has not already been allocated, determine the
* offset into the users buffer (if contiguous data), or allocate
* a buffer for the non-contiguous case.
*
* TODO: may need to pass in an allocator....
*/
static inline void mca_ptl_base_recv_frag_init(mca_ptl_base_recv_frag_t* frag)
{
mca_ptl_base_recv_request_t* request = frag->frag_request;
mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match;
/* initialize status */
request->super.req_status.MPI_SOURCE = header->hdr_src;
request->super.req_status.MPI_TAG = header->hdr_tag;
request->super.req_status.MPI_ERROR = LAM_SUCCESS;
request->super.req_status._count = header->hdr_msg_length;
if(header->hdr_frag.hdr_frag_length > 0) {
/* initialize receive convertor */
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &frag->super.frag_convertor);
lam_convertor_init_for_recv(
&frag->super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
/* if buffer has not already been allocated for eager
* send - go ahead and figure out offset into users
* buffer (for contiguous data) - or allocate a buffer
* for the receive if required.
*/
if(NULL == frag->super.frag_addr) {
struct iovec iov;
iov.iov_base = NULL;
iov.iov_len = header->hdr_frag.hdr_frag_length;
lam_convertor_unpack(&frag->super.frag_convertor, &iov, 1);
/* non-contiguous - allocate buffer for receive */
if(NULL == iov.iov_base) {
frag->super.frag_addr = malloc(iov.iov_len);
frag->frag_is_buffered = true;
/* we now have correct offset into users buffer */
} else {
frag->super.frag_addr = iov.iov_base;
frag->frag_is_buffered = false;
}
frag->super.frag_size = header->hdr_frag.hdr_frag_length;
}
}
}
/**
* Called by the PTL to match attempt a match for new fragments.
@ -93,33 +32,35 @@ static inline void mca_ptl_base_recv_frag_init(mca_ptl_base_recv_frag_t* frag)
* @param header (IN) Header corresponding to the receive fragment.
* @return LAM_SUCCESS or error status on failure.
*/
static inline int mca_ptl_base_recv_frag_match(
static inline bool mca_ptl_base_recv_frag_match(
mca_ptl_base_recv_frag_t* frag,
mca_ptl_base_match_header_t* header)
{
bool matched;
lam_list_t matched_frags;
int rc;
OBJ_CONSTRUCT(&matched_frags, lam_list_t);
if((rc = mca_ptl_base_match(header, frag, &matched, &matched_frags)) != LAM_SUCCESS)
return rc;
if((matched = mca_ptl_base_match(header, frag, &matched_frags)) == false)
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
if(matched) {
do {
mca_ptl_t* ptl = frag->super.frag_owner;
while(NULL != frag) {
mca_ptl_t* ptl = frag->super.frag_owner;
mca_ptl_base_recv_request_t *request = frag->frag_request;
mca_ptl_base_match_header_t *header = &frag->super.frag_header.hdr_match;
/* initialize current fragment */
mca_ptl_base_recv_frag_init(frag);
/* notify ptl of match */
ptl->ptl_recv(ptl, frag);
/*
* Initialize request status.
*/
request->req_bytes_msg = header->hdr_msg_length;
request->super.req_peer = header->hdr_src;
request->super.req_tag = header->hdr_tag;
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
} while(NULL != frag);
}
return LAM_SUCCESS;
/* notify ptl of match */
ptl->ptl_recv(ptl, frag);
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
};
return matched;
}

Просмотреть файл

@ -24,6 +24,8 @@ lam_class_t mca_ptl_base_recv_request_t_class = {
static void mca_ptl_base_recv_request_construct(mca_ptl_base_recv_request_t* request)
{
/* no need to reinit for every recv -- never changes */
request->super.req_type = MCA_PML_REQUEST_RECV;
}
@ -45,12 +47,15 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque
mca_ptl_base_recv_frag_t* frag;
/* check for a specific match */
THREAD_LOCK(pml_comm->c_matching_lock+req_peer);
THREAD_LOCK(&pml_comm->c_matching_lock);
/* assign sequence number */
request->super.req_sequence = pml_comm->c_recv_seq++;
if (lam_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 &&
(frag = mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) != NULL) {
mca_ptl_t* ptl = frag->super.frag_owner;
THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer);
mca_ptl_base_recv_frag_init(frag);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
ptl->ptl_recv(ptl, frag);
return; /* match found */
}
@ -59,7 +64,7 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque
* it when the message comes in.
*/
lam_list_append(pml_comm->c_specific_receives+req_peer, (lam_list_item_t*)request);
THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
@ -81,34 +86,33 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
* process, then an inner loop over the messages from the
* process.
*/
THREAD_LOCK(&pml_comm->c_matching_lock);
/* assign sequence number */
request->super.req_sequence = pml_comm->c_recv_seq++;
for (proc = 0; proc < proc_count; proc++) {
mca_ptl_base_recv_frag_t* frag;
/* continue if no frags to match */
THREAD_LOCK(pml_comm->c_matching_lock+proc);
if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) {
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0)
continue;
}
/* loop over messages from the current proc */
if ((frag = mca_ptl_base_recv_request_match_specific_proc(request, proc)) != NULL) {
mca_ptl_t* ptl = frag->super.frag_owner;
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
mca_ptl_base_recv_frag_init(frag);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
ptl->ptl_recv(ptl, frag);
return; /* match found */
}
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
}
/* We didn't find any matches. Record this irecv so we can match to
* it when the message comes in.
*/
THREAD_LOCK(&pml_comm->c_wild_lock);
lam_list_append(&pml_comm->c_wild_receives, (lam_list_item_t*)request);
THREAD_UNLOCK(&pml_comm->c_wild_lock);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
@ -137,8 +141,8 @@ static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc(
continue;
}
lam_list_remove_item(unexpected_frags, (lam_list_item_t*)frag);
request->req_sequence = header->hdr_msg_seq;
request->super.req_tag = tag = header->hdr_tag;
request->req_bytes_msg = header->hdr_msg_length;
request->super.req_tag = header->hdr_tag;
request->super.req_peer = header->hdr_src;
frag->frag_request = request;
return frag;

Просмотреть файл

@ -17,9 +17,10 @@ struct mca_ptl_base_recv_frag_t;
* Base type for receive requests.
*/
struct mca_ptl_base_recv_request_t {
mca_pml_base_request_t super; /**< base request */
mca_ptl_base_sequence_t req_sequence; /**< request sequence number */
size_t req_bytes_recvd; /**< number of bytes delivered to user */
mca_pml_base_request_t super; /**< base request */
size_t req_bytes_msg; /**< size of message being received */
size_t req_bytes_received; /**< number of bytes received from network */
size_t req_bytes_delivered; /**< number of bytes delivered to user */
};
typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
@ -46,8 +47,10 @@ static inline void mca_ptl_base_recv_request_init(
lam_communicator_t* comm,
bool persistent)
{
request->req_sequence = 0;
request->req_bytes_recvd = 0;
request->req_bytes_msg = 0;
request->req_bytes_received = 0;
request->req_bytes_delivered = 0;
request->super.req_sequence = 0;
request->super.req_addr = addr;
request->super.req_count = count;
request->super.req_datatype = datatype;
@ -55,7 +58,6 @@ static inline void mca_ptl_base_recv_request_init(
request->super.req_tag = tag;
request->super.req_comm = comm;
request->super.req_proc = NULL;
request->super.req_type = MCA_PML_REQUEST_RECV;
request->super.req_persistent = persistent;
request->super.req_mpi_done = false;
request->super.req_pml_done = false;

Просмотреть файл

@ -17,8 +17,11 @@ lam_class_t mca_ptl_base_send_request_t_class = {
};
static void mca_ptl_base_send_request_construct(mca_ptl_base_send_request_t* req)
static void mca_ptl_base_send_request_construct(mca_ptl_base_send_request_t* request)
{
/* no need to reinit for every send -- never changes */
request->super.req_type = MCA_PML_REQUEST_SEND;
OBJ_CONSTRUCT(&request->req_convertor, lam_convertor_t);
}
static void mca_ptl_base_send_request_destruct(mca_ptl_base_send_request_t* req)

Просмотреть файл

@ -25,14 +25,13 @@ struct mca_ptl_base_send_request_t {
mca_pml_base_request_t super; /** base request type - common data structure for use by wait/test */
size_t req_offset; /**< number of bytes that have already been assigned to a fragment */
size_t req_frags; /**< number of fragments that have been allocated */
size_t req_bytes_msg; /**< packed size of a message given the datatype and count */
size_t req_bytes_sent; /**< number of bytes that have been sent */
mca_pml_base_send_mode_t req_send_mode; /**< type of send */
mca_ptl_base_sequence_t req_msg_seq; /**< sequence number for MPI pt-2-pt ordering */
struct mca_ptl_t* req_owner; /**< PTL that allocated this descriptor */
struct mca_ptl_base_peer_t* req_peer; /**< PTL peer instance that will be used for first fragment */
lam_ptr_t req_peer_request; /**< matched receive at peer */
lam_convertor_t req_convertor; /**< convertor that describes this datatype */
size_t req_packed_size; /**< packed size of a message given the datatype and count */
};
typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
@ -67,7 +66,7 @@ static inline void mca_ptl_base_send_request_init(
request->req_bytes_sent = 0;
request->req_send_mode = mode;
request->req_peer_request.lval = 0;
request->req_msg_seq = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer);
request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer);
request->super.req_addr = addr;
request->super.req_count = count;
request->super.req_datatype = datatype;
@ -75,7 +74,6 @@ static inline void mca_ptl_base_send_request_init(
request->super.req_tag = tag;
request->super.req_comm = comm;
request->super.req_proc = lam_comm_peer_lookup(comm,peer);
request->super.req_type = MCA_PML_REQUEST_SEND;
request->super.req_persistent = persistent;
request->super.req_mpi_done = false;
request->super.req_pml_done = false;
@ -93,10 +91,10 @@ static inline void mca_ptl_base_send_request_init(
request->super.req_addr,
0);
lam_convertor_get_packed_size(&request->req_convertor, &packed_size);
request->req_packed_size = packed_size;
} else {
request->req_packed_size = 0;
}
request->req_bytes_msg = packed_size;
} else {
request->req_bytes_msg = 0;
}
}

Просмотреть файл

@ -349,7 +349,7 @@ typedef void (*mca_ptl_base_recv_fn_t)(
* not made, the matching code will copy the supplied header into the
* recv fragment so that the match can be made when the receive is posted.
*/
typedef int (*mca_ptl_base_match_fn_t)(
typedef bool (*mca_ptl_base_match_fn_t)(
struct mca_ptl_base_recv_frag_t* recv_frag,
struct mca_ptl_base_match_header_t* header
);

Просмотреть файл

@ -97,8 +97,8 @@ int mca_ptl_tcp_finalize(struct mca_ptl_t* ptl)
int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t** request)
{
int rc;
mca_ptl_base_send_request_t* sendreq =
(mca_ptl_base_send_request_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_requests, &rc);
mca_ptl_base_send_request_t* sendreq;
sendreq = (mca_ptl_base_send_request_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_requests, &rc);
if(NULL != sendreq)
sendreq->req_owner = ptl;
*request = sendreq;
@ -108,32 +108,35 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re
void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request)
{
/* OBJ_DESTRUCT(&request->req_convertor); */
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request);
}
void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv_frag_t* frag)
{
/* FIX - need to cleanup convertor */
if(frag->super.frag_is_buffered)
free(frag->super.super.frag_addr);
/* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
}
void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send_frag_t* frag)
{
/* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
if(lam_list_get_size(&mca_ptl_tcp_module.tcp_pending_acks)) {
mca_ptl_tcp_recv_frag_t* pending;
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
pending = (mca_ptl_tcp_recv_frag_t*)lam_list_remove_first(&mca_ptl_tcp_module.tcp_pending_acks);
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
if(NULL == pending) {
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag);
return;
}
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->super.super.frag_peer, pending);
mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag);
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
mca_ptl_tcp_recv_frag_return(ptl, pending);
} else {
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag);

Просмотреть файл

@ -334,7 +334,7 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
*num_ptls = 0;
*allow_multi_user_threads = true;
*have_hidden_threads = false;
*have_hidden_threads = true;
if((rc = lam_event_init()) != LAM_SUCCESS) {
lam_output(0, "mca_ptl_tcp_module_init: unable to initialize event dispatch thread: %d\n", rc);

Просмотреть файл

@ -25,6 +25,10 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user);
#define PROGRESS_THREAD_LOCK THREAD_LOCK
#define PROGRESS_THREAD_UNLOCK THREAD_UNLOCK
lam_class_t mca_ptl_tcp_peer_t_class = {
"mca_tcp_ptl_peer_t",
OBJ_CLASS(lam_list_item_t),
@ -49,7 +53,8 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
ptl_peer->peer_retries = 0;
OBJ_CONSTRUCT(&ptl_peer->peer_frags, lam_list_t);
OBJ_CONSTRUCT(&ptl_peer->peer_lock, lam_mutex_t);
OBJ_CONSTRUCT(&ptl_peer->peer_send_lock, lam_mutex_t);
OBJ_CONSTRUCT(&ptl_peer->peer_recv_lock, lam_mutex_t);
}
@ -88,7 +93,7 @@ static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer)
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* frag)
{
int rc = LAM_SUCCESS;
THREAD_LOCK(&ptl_peer->peer_lock);
THREAD_LOCK(&ptl_peer->peer_send_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECTING:
case MCA_PTL_TCP_CONNECT_ACK:
@ -113,7 +118,7 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
}
break;
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
return rc;
}
@ -174,7 +179,8 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
{
mca_ptl_tcp_addr_t* ptl_addr;
mca_ptl_tcp_proc_t* this_proc = mca_ptl_tcp_proc_local();
THREAD_LOCK(&ptl_peer->peer_lock);
PROGRESS_THREAD_LOCK(&ptl_peer->peer_recv_lock);
THREAD_LOCK(&ptl_peer->peer_send_lock);
if((ptl_addr = ptl_peer->peer_addr) != NULL &&
ptl_addr->addr_inet.s_addr == addr->sin_addr.s_addr) {
mca_ptl_tcp_proc_t *peer_proc = ptl_peer->peer_proc;
@ -185,16 +191,20 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
ptl_peer->peer_sd = sd;
if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) != LAM_SUCCESS) {
mca_ptl_tcp_peer_close_i(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return false;
}
mca_ptl_tcp_peer_event_init(ptl_peer, sd);
lam_event_add(&ptl_peer->peer_recv_event, 0);
mca_ptl_tcp_peer_connected(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_lock);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return true;
}
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return false;
}
@ -206,9 +216,11 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
{
THREAD_LOCK(&ptl_peer->peer_lock);
THREAD_LOCK(&ptl_peer->peer_recv_lock);
THREAD_LOCK(&ptl_peer->peer_send_lock);
mca_ptl_tcp_peer_close_i(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_lock);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
}
/*
@ -427,7 +439,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
{
mca_ptl_base_peer_t* ptl_peer = user;
THREAD_LOCK(&ptl_peer->peer_lock);
PROGRESS_THREAD_LOCK(&ptl_peer->peer_recv_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECT_ACK:
{
@ -440,8 +452,8 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
if(NULL == recv_frag) {
int rc;
recv_frag = mca_ptl_tcp_recv_frag_alloc(&rc);
if(recv_frag == 0) {
THREAD_UNLOCK(&ptl_peer->peer_lock);
if(NULL == recv_frag) {
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return;
}
mca_ptl_tcp_recv_frag_init(recv_frag, ptl_peer);
@ -461,7 +473,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
break;
}
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
}
@ -473,7 +485,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
{
mca_ptl_tcp_peer_t* ptl_peer = user;
THREAD_LOCK(&ptl_peer->peer_lock);
THREAD_LOCK(&ptl_peer->peer_send_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECTING:
mca_ptl_tcp_peer_complete_connect(ptl_peer);
@ -506,7 +518,7 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
lam_event_del(&ptl_peer->peer_send_event);
break;
}
THREAD_UNLOCK(&ptl_peer->peer_lock);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
}

Просмотреть файл

@ -45,7 +45,8 @@ struct mca_ptl_base_peer_t {
mca_ptl_tcp_state_t peer_state; /**< current state of the connection */
size_t peer_retries; /**< number of connection retries attempted */
lam_list_t peer_frags; /**< list of pending frags to send */
lam_mutex_t peer_lock; /**< lock for concurrent access to peer state */
lam_mutex_t peer_send_lock; /**< lock for concurrent access to peer state */
lam_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */
lam_event_t peer_send_event; /**< event for async processing of send frags */
lam_event_t peer_recv_event; /**< event for async processing of recv frags */
};

Просмотреть файл

@ -55,6 +55,7 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer
frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0;
frag->frag_ack_pending = false;
frag->frag_progressed = 0;
}
@ -67,14 +68,19 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
switch(frag->frag_header.hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_match_header_t));
return mca_ptl_tcp_recv_frag_match(frag, sd);
case MCA_PTL_HDR_TYPE_FRAG:
assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_frag_header_t));
return mca_ptl_tcp_recv_frag_frag(frag, sd);
case MCA_PTL_HDR_TYPE_ACK:
case MCA_PTL_HDR_TYPE_NACK:
assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_ack_header_t));
return mca_ptl_tcp_recv_frag_ack(frag, sd);
default:
return true;
lam_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
*(unsigned long*)&frag->frag_header);
return false;
}
}
@ -135,13 +141,13 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_match_header_t)) == false)
return false;
if(frag->frag_msg_cnt == 0) {
/* first pass through - attempt a match */
if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) {
/* attempt to match a posted recv */
mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match);
/* match was not made - so allocate buffer for eager send */
if (NULL == frag->super.frag_request) {
if(mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match)) {
mca_ptl_tcp_recv_frag_matched(frag);
} else {
/* match was not made - so allocate buffer for eager send */
if(frag->frag_header.hdr_frag.hdr_frag_length > 0) {
frag->super.super.frag_addr = malloc(frag->frag_header.hdr_frag.hdr_frag_length);
frag->super.super.frag_size = frag->frag_header.hdr_frag.hdr_frag_length;
@ -150,17 +156,21 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
}
/* receive fragment data */
if(frag->frag_msg_cnt < frag->super.super.frag_size) {
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) {
return false;
}
}
/* discard any data that exceeds the posted receive */
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) {
return false;
}
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false;
/* if fragment has already been matched - go ahead and process */
if (frag->super.frag_request != NULL)
if (NULL != frag->super.frag_request)
mca_ptl_tcp_recv_frag_progress(frag);
return true;
}
@ -168,14 +178,14 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_match_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_match_header_t)) == false)
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_frag_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_frag_header_t)) == false)
return false;
/* get request from header */
if(frag->frag_msg_cnt == 0) {
frag->super.frag_request = frag->frag_header.hdr_frag.hdr_dst_ptr.pval;
mca_ptl_base_recv_frag_init(&frag->super);
mca_ptl_tcp_recv_frag_matched(frag);
}
/* continue to receive user data */
@ -194,7 +204,6 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
/*
* Continue with non-blocking recv() calls until the entire
* fragment is received.

Просмотреть файл

@ -12,6 +12,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "os/atomic.h"
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_tcp.h"
@ -26,8 +27,9 @@ extern lam_class_t mca_ptl_tcp_recv_frag_t_class;
struct mca_ptl_tcp_recv_frag_t {
mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */
size_t frag_hdr_cnt; /**< number of header bytes received */
size_t frag_msg_cnt; /**< number of msg bytes received */
size_t frag_msg_cnt; /**< number of message bytes received */
bool frag_ack_pending; /**< is an ack pending for this fragment */
volatile int frag_progressed; /**< flag used to atomically progress fragment */
};
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
@ -42,18 +44,85 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_ba
bool mca_ptl_tcp_recv_frag_send_ack(mca_ptl_tcp_recv_frag_t* frag);
static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
{
mca_ptl_base_recv_request_t* request = frag->super.frag_request;
mca_ptl_base_frag_header_t* header = &frag->super.super.frag_header.hdr_frag;
/* if there is data associated with the fragment -- setup to receive */
if(header->hdr_frag_length > 0) {
/* initialize receive convertor */
struct iovec iov;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor);
lam_convertor_init_for_recv(
&frag->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag_offset); /* offset in bytes into packed buffer */
/*
* determine offset into users buffer (for contigous data) -
* or allocate a buffer for the receive if required.
*/
iov.iov_base = NULL;
iov.iov_len = header->hdr_frag_length;
lam_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1);
/* non-contiguous - allocate buffer for receive */
if(NULL == iov.iov_base) {
frag->super.super.frag_addr = malloc(iov.iov_len);
frag->super.super.frag_size = header->hdr_frag_length;
frag->super.frag_is_buffered = true;
/* we now have correct offset into users buffer */
} else {
frag->super.super.frag_addr = iov.iov_base;
frag->super.super.frag_size = iov.iov_len;
}
}
}
static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{
if(frag->frag_msg_cnt >= frag->super.super.frag_header.hdr_frag.hdr_frag_length) {
if(frag->super.frag_is_buffered) {
struct iovec iov;
iov.iov_base = frag->super.super.frag_addr;
iov.iov_len = frag->super.super.frag_size;
lam_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1);
}
frag->super.super.frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super);
if(frag->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
if(fetchNset(&frag->frag_progressed, 1) == 0) {
mca_ptl_base_recv_request_t* request = frag->super.frag_request;
if(frag->super.frag_is_buffered) {
mca_ptl_base_match_header_t* header = &frag->super.super.frag_header.hdr_match;
/*
* Initialize convertor and use it to unpack data
*/
struct iovec iov;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor);
lam_convertor_init_for_recv(
&frag->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
iov.iov_base = frag->super.super.frag_addr;
iov.iov_len = frag->super.super.frag_size;
lam_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1);
}
/* progress the request */
frag->super.super.frag_owner->ptl_recv_progress(request, &frag->super);
if(frag->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
}
}
}
}

Просмотреть файл

@ -67,14 +67,15 @@ int mca_ptl_tcp_send_frag_init(
hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->super.req_peer;
hdr->hdr_match.hdr_tag = sendreq->super.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_packed_size;
hdr->hdr_match.hdr_msg_seq = sendreq->req_msg_seq;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_msg;
hdr->hdr_match.hdr_msg_seq = sendreq->super.req_sequence;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_request;
}

Просмотреть файл

@ -91,7 +91,7 @@ static inline void mca_ptl_tcp_send_frag_init_ack(
ack->super.super.frag_size = 0;
ack->frag_vec_ptr = ack->frag_vec;
ack->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
ack->frag_vec[0].iov_len = sizeof(hdr->hdr_ack);
ack->frag_vec[0].iov_len = sizeof(mca_ptl_base_ack_header_t);
ack->frag_vec_cnt = 1;
}