- factor common elements into fragment base class
- tcp cleanup This commit was SVN r642.
Этот коммит содержится в:
родитель
103cb62480
Коммит
3836a7bf33
@ -17,6 +17,9 @@ struct mca_ptl_base_frag_t {
|
||||
lam_list_item_t super;
|
||||
mca_ptl_base_header_t frag_header;
|
||||
struct mca_ptl_t* frag_owner; /**< PTL that allocated this fragment */
|
||||
struct mca_ptl_peer_t* frag_peer; /**< PTL specific addressing info */
|
||||
void *frag_addr; /* pointer into request buffer at fragment offset */
|
||||
size_t frag_size; /* number of bytes available in request buffer */
|
||||
};
|
||||
typedef struct mca_ptl_base_frag_t mca_ptl_base_frag_t;
|
||||
|
||||
|
@ -22,7 +22,7 @@
|
||||
* Specialized matching routines for internal use only.
|
||||
*/
|
||||
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match(
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match(
|
||||
mca_ptl_base_header_t *frag_header,
|
||||
mca_pml_comm_t *ptl_comm);
|
||||
|
||||
@ -121,7 +121,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
|
||||
(pml_comm->c_next_msg_seq[frag_src])++;
|
||||
|
||||
/* see if receive has already been posted */
|
||||
matched_receive = mca_ptl_base_check_recieves_for_match(frag_header,
|
||||
matched_receive = mca_ptl_base_check_receives_for_match(frag_header,
|
||||
pml_comm);
|
||||
|
||||
/* if match found, process data */
|
||||
@ -131,7 +131,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
|
||||
*match_made=true;
|
||||
/* associate the receive descriptor with the fragment
|
||||
* descriptor */
|
||||
frag_desc->frag_match=matched_receive;
|
||||
frag_desc->frag_request=matched_receive;
|
||||
|
||||
/*
|
||||
* update deliverd sequence number information,
|
||||
@ -203,7 +203,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
|
||||
(mca_ptl_base_header_t *frag_header, mca_pml_comm_t *pml_comm)
|
||||
{
|
||||
/* local parameters */
|
||||
@ -215,7 +215,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match
|
||||
|
||||
/*
|
||||
* figure out what sort of matching logic to use, if need to
|
||||
* look only at "specific" recieves, or "wild" receives,
|
||||
* look only at "specific" receives, or "wild" receives,
|
||||
* or if we need to traverse both sets at the same time.
|
||||
*/
|
||||
frag_src = frag_header->hdr_frag_seq;
|
||||
@ -329,7 +329,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_c_specific_receives_for_match(
|
||||
static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_match(
|
||||
mca_ptl_base_header_t *frag_header,
|
||||
mca_pml_comm_t *pml_comm)
|
||||
{
|
||||
@ -454,7 +454,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
|
||||
if (wild_recv == (mca_ptl_base_recv_request_t *)
|
||||
lam_list_get_end(&(pml_comm->c_wild_receives)) )
|
||||
{
|
||||
return_match = mca_ptl_base_check_c_specific_receives_for_match(frag_header,
|
||||
return_match = mca_ptl_base_check_specific_receives_for_match(frag_header,
|
||||
pml_comm);
|
||||
|
||||
return return_match;
|
||||
@ -514,8 +514,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
|
||||
}
|
||||
}
|
||||
|
||||
#if (0)
|
||||
/* need to handle this -- lam_check_cantmatch_for_match();
|
||||
/* need to handle this -- mca_ptl_base_check_cantmatch_for_match();
|
||||
*/
|
||||
/**
|
||||
* Scan the list of frags that came in ahead of time to see if any
|
||||
@ -537,9 +536,9 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
|
||||
{
|
||||
/* local parameters */
|
||||
int match_found;
|
||||
mca_pml_base_sequence_t next_msg_seq_expected, frag_seqber;
|
||||
mca_pml_base_recv_frag_t *frag_desc;
|
||||
mca_pml_base_recv_request_t *matched_receive;
|
||||
mca_ptl_base_sequence_t next_msg_seq_expected, frag_seqber;
|
||||
mca_ptl_base_recv_frag_t *frag_desc;
|
||||
mca_ptl_base_recv_request_t *matched_receive;
|
||||
|
||||
/*
|
||||
* Initialize list size - assume that most of the time this search
|
||||
@ -565,12 +564,12 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
|
||||
/* search the list for a fragment from the send with sequence
|
||||
* number next_msg_seq_expected
|
||||
*/
|
||||
for(frag_desc = (mca_pml_base_recv_frag_t *)
|
||||
for(frag_desc = (mca_ptl_base_recv_frag_t *)
|
||||
lam_list_get_first((pml_comm->c_frags_cant_match)+frag_src);
|
||||
frag_desc != (mca_pml_base_recv_frag_t *)
|
||||
frag_desc != (mca_ptl_base_recv_frag_t *)
|
||||
lam_list_get_end((pml_comm->c_frags_cant_match)+frag_src);
|
||||
frag_desc = (mca_pml_base_recv_frag_t *)
|
||||
((lam_list_item_t *)c_frags_cant_match)->lam_list_next)
|
||||
frag_desc = (mca_ptl_base_recv_frag_t *)
|
||||
lam_list_get_next(frag_desc))
|
||||
{
|
||||
/*
|
||||
* If the message has the next expected seq from that proc...
|
||||
@ -603,15 +602,15 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
|
||||
/*
|
||||
* check to see if this frag matches a posted message
|
||||
*/
|
||||
matched_receive = mca_ptl_base_check_recieves_for_match(
|
||||
frag_desc, pml_comm);
|
||||
matched_receive = mca_ptl_base_check_receives_for_match(
|
||||
&frag_desc->super.frag_header, pml_comm);
|
||||
|
||||
/* if match found, process data */
|
||||
if (matched_receive) {
|
||||
|
||||
/* associate the receive descriptor with the fragment
|
||||
* descriptor */
|
||||
frag_desc->frag_match=matched_receive;
|
||||
frag_desc->frag_request=matched_receive;
|
||||
|
||||
/* add this fragment descriptor to the list of
|
||||
* descriptors to be processed later
|
||||
@ -643,4 +642,4 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
|
||||
|
||||
return;
|
||||
}
|
||||
#endif /* if (0) */
|
||||
|
||||
|
@ -3,7 +3,10 @@
|
||||
*/
|
||||
/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
|
||||
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_recvfrag.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_match.h"
|
||||
|
||||
|
||||
lam_class_info_t mca_ptl_base_recv_frag_cls = {
|
||||
"mca_ptl_base_recv_frag_t",
|
||||
@ -23,3 +26,51 @@ void mca_ptl_base_recv_frag_destroy(mca_ptl_base_recv_frag_t* frag)
|
||||
SUPER_DESTROY(frag, &mca_ptl_base_frag_cls);
|
||||
}
|
||||
|
||||
int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t* frag, mca_ptl_base_header_t* header)
|
||||
{
|
||||
lam_list_t matched_frags;
|
||||
bool matched;
|
||||
int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags);
|
||||
if(rc != LAM_SUCCESS)
|
||||
return rc;
|
||||
|
||||
if(matched) {
|
||||
do {
|
||||
mca_ptl_base_recv_request_t* request = frag->frag_request;
|
||||
mca_ptl_t* ptl = frag->super.frag_owner;
|
||||
|
||||
/* determine the offset and size of posted buffer */
|
||||
if (request->super.req_length < frag->super.frag_header.hdr_msg_offset) {
|
||||
|
||||
/* user buffer is to small - discard entire fragment */
|
||||
frag->super.frag_addr = 0;
|
||||
frag->super.frag_size = 0;
|
||||
|
||||
} else if (request->super.req_length < frag->super.frag_header.hdr_msg_offset +
|
||||
frag->super.frag_header.hdr_frag_length) {
|
||||
|
||||
/* user buffer is to small - discard part of fragment */
|
||||
frag->super.frag_addr = ((unsigned char*)request->super.req_addr +
|
||||
frag->super.frag_header.hdr_msg_offset);
|
||||
frag->super.frag_size = request->super.req_length - frag->super.frag_header.hdr_msg_offset;
|
||||
|
||||
} else {
|
||||
|
||||
/* user buffer is large enough for this fragment */
|
||||
frag->super.frag_addr = ((unsigned char*)request->super.req_addr +
|
||||
frag->super.frag_header.hdr_msg_offset);
|
||||
frag->super.frag_size = frag->super.frag_header.hdr_frag_length;
|
||||
|
||||
}
|
||||
|
||||
/* send cts acknowledgment back to peer */
|
||||
ptl->ptl_cts(ptl, frag);
|
||||
|
||||
/* process any fragments that arrived out of order */
|
||||
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
|
||||
} while(NULL != frag);
|
||||
}
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
@ -13,15 +13,17 @@
|
||||
extern lam_class_info_t mca_ptl_base_recv_frag_cls;
|
||||
|
||||
|
||||
typedef struct {
|
||||
struct mca_ptl_base_recv_frag_t {
|
||||
mca_ptl_base_frag_t super;
|
||||
/* matched receve request corresponding to this fragment */
|
||||
mca_ptl_base_recv_request_t *frag_match;
|
||||
} mca_ptl_base_recv_frag_t;
|
||||
mca_ptl_base_recv_request_t *frag_request; /* matched posted receive */
|
||||
struct mca_ptl_peer_t* frag_peer; /* peer received from */
|
||||
};
|
||||
typedef struct mca_ptl_base_recv_frag_t mca_ptl_base_recv_frag_t;
|
||||
|
||||
|
||||
void mca_ptl_base_recv_frag_init(mca_ptl_base_recv_frag_t*);
|
||||
void mca_ptl_base_recv_frag_destroy(mca_ptl_base_recv_frag_t*);
|
||||
int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t*, mca_ptl_base_header_t*);
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
#include "mca/mpi/ptl/base/ptl_base_comm.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_recvreq.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_recvfrag.h"
|
||||
|
||||
|
||||
lam_class_info_t mca_ptl_base_recv_request_cls = {
|
||||
@ -48,4 +49,8 @@ int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* req)
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
void mca_ptl_base_recv_request_progress(mca_ptl_base_recv_request_t* req, mca_ptl_base_recv_frag_t* frag)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "mca/mpi/pml/base/pml_base_request.h"
|
||||
|
||||
extern lam_class_info_t mca_ptl_base_recv_request_cls;;
|
||||
struct mca_ptl_base_recv_frag_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
@ -19,8 +20,9 @@ typedef struct {
|
||||
|
||||
void mca_ptl_base_recv_request_init(mca_ptl_base_recv_request_t*);
|
||||
void mca_ptl_base_recv_request_destroy(mca_ptl_base_recv_request_t*);
|
||||
int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*);
|
||||
int mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*);
|
||||
int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*);
|
||||
int mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*);
|
||||
void mca_ptl_base_recv_request_progress(mca_ptl_base_recv_request_t*, struct mca_ptl_base_recv_frag_t*);
|
||||
|
||||
|
||||
static inline void mca_ptl_base_recv_request_reinit(
|
||||
|
@ -13,8 +13,6 @@ extern lam_class_info_t mca_ptl_base_send_frag_cls;
|
||||
struct mca_ptl_base_send_frag_t {
|
||||
mca_ptl_base_frag_t super;
|
||||
struct mca_ptl_base_send_request_t *frag_request;
|
||||
unsigned char* frag_data;
|
||||
size_t frag_size;
|
||||
};
|
||||
typedef struct mca_ptl_base_send_frag_t mca_ptl_base_send_frag_t;
|
||||
|
||||
|
@ -23,6 +23,7 @@ struct mca_ptl_t;
|
||||
struct mca_ptl_peer_t;
|
||||
struct mca_ptl_base_fragment_t;
|
||||
struct mca_ptl_base_send_request_t;
|
||||
struct mca_ptl_base_recv_frag_t;
|
||||
|
||||
typedef uint64_t mca_ptl_base_sequence_t;
|
||||
typedef uint64_t mca_ptl_base_tstamp_t;
|
||||
@ -126,6 +127,11 @@ typedef void (*mca_ptl_base_request_return_fn_t)(
|
||||
struct mca_ptl_base_send_request_t* request
|
||||
);
|
||||
|
||||
typedef void (*mca_ptl_base_frag_return_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_recv_frag_t* frag
|
||||
);
|
||||
|
||||
typedef int (*mca_ptl_base_send_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_peer_t* ptl_peer,
|
||||
@ -134,6 +140,11 @@ typedef int (*mca_ptl_base_send_fn_t)(
|
||||
bool* complete
|
||||
);
|
||||
|
||||
typedef int (*mca_ptl_base_cts_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_recv_frag_t* recv_frag
|
||||
);
|
||||
|
||||
/**
|
||||
* PTL instance interface functions and common state.
|
||||
*/
|
||||
@ -154,8 +165,10 @@ struct mca_ptl_t {
|
||||
mca_ptl_base_del_proc_fn_t ptl_del_proc;
|
||||
mca_ptl_base_finalize_fn_t ptl_finalize;
|
||||
mca_ptl_base_send_fn_t ptl_send;
|
||||
mca_ptl_base_cts_fn_t ptl_cts;
|
||||
mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
|
||||
mca_ptl_base_request_return_fn_t ptl_request_return;
|
||||
mca_ptl_base_frag_return_fn_t ptl_frag_return;
|
||||
};
|
||||
typedef struct mca_ptl_t mca_ptl_t;
|
||||
|
||||
|
@ -21,6 +21,7 @@ libmca_ptl_tcp_la_SOURCES = \
|
||||
ptl_tcp_module.c \
|
||||
ptl_tcp_recvfrag.c \
|
||||
ptl_tcp_recvfrag.h \
|
||||
ptl_tcp_send.c \
|
||||
ptl_tcp_sendfrag.c \
|
||||
ptl_tcp_sendfrag.h
|
||||
ptl_tcp_sendfrag.h \
|
||||
ptl_tcp_sendreq.c \
|
||||
ptl_tcp_sendreq.h
|
||||
|
@ -8,11 +8,13 @@
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
|
||||
#include "mca/lam/base/mca_base_module_exchange.h"
|
||||
#include "ptl_tcp.h"
|
||||
#include "ptl_tcp_addr.h"
|
||||
#include "ptl_tcp_peer.h"
|
||||
#include "ptl_tcp_proc.h"
|
||||
#include "ptl_tcp_sendreq.h"
|
||||
|
||||
|
||||
mca_ptl_tcp_t mca_ptl_tcp = {
|
||||
@ -28,8 +30,10 @@ mca_ptl_tcp_t mca_ptl_tcp = {
|
||||
mca_ptl_tcp_del_proc,
|
||||
mca_ptl_tcp_finalize,
|
||||
mca_ptl_tcp_send,
|
||||
mca_ptl_tcp_cts,
|
||||
mca_ptl_tcp_request_alloc,
|
||||
mca_ptl_tcp_request_return
|
||||
mca_ptl_tcp_request_return,
|
||||
mca_ptl_tcp_frag_return
|
||||
}
|
||||
};
|
||||
|
||||
@ -114,8 +118,44 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request)
|
||||
{
|
||||
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request);
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_tcp_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_base_recv_frag_t* frag)
|
||||
{
|
||||
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_tcp_send(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_peer_t* ptl_peer,
|
||||
struct mca_ptl_base_send_request_t* sendreq,
|
||||
size_t size,
|
||||
bool* complete)
|
||||
{
|
||||
mca_ptl_tcp_send_frag_t* sendfrag;
|
||||
if (sendreq->req_frags == 0) {
|
||||
sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag;
|
||||
} else {
|
||||
int rc;
|
||||
sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc);
|
||||
if(sendfrag == 0)
|
||||
return rc;
|
||||
}
|
||||
mca_ptl_tcp_send_frag_reinit(sendfrag, ptl_peer, sendreq, size);
|
||||
return mca_ptl_tcp_peer_send(ptl_peer, sendfrag);
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_tcp_cts(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_recv_frag_t* frag)
|
||||
{
|
||||
return LAM_ERROR;
|
||||
}
|
||||
|
||||
|
@ -102,6 +102,11 @@ extern void mca_ptl_tcp_request_return(
|
||||
struct mca_ptl_base_send_request_t*
|
||||
);
|
||||
|
||||
extern void mca_ptl_tcp_frag_return(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_recv_frag_t*
|
||||
);
|
||||
|
||||
extern int mca_ptl_tcp_send(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_peer_t* ptl_peer,
|
||||
@ -110,6 +115,11 @@ extern int mca_ptl_tcp_send(
|
||||
bool* complete
|
||||
);
|
||||
|
||||
extern int mca_ptl_tcp_cts(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_recv_frag_t* frag
|
||||
);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "ptl_tcp_proc.h"
|
||||
#include "ptl_tcp_recvfrag.h"
|
||||
#include "ptl_tcp_sendfrag.h"
|
||||
#include "ptl_tcp_sendreq.h"
|
||||
|
||||
|
||||
mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
|
||||
@ -30,7 +31,7 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
|
||||
/* Indicate that we are a pml v1.0.0 module (which also implies a
|
||||
specific MCA version) */
|
||||
|
||||
MCA_PML_BASE_VERSION_1_0_0,
|
||||
MCA_PTL_BASE_VERSION_1_0_0,
|
||||
|
||||
"tcp", /* MCA module name */
|
||||
1, /* MCA module major version */
|
||||
@ -280,8 +281,8 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
|
||||
/* initialize free lists */
|
||||
STATIC_INIT(mca_ptl_tcp_module.tcp_send_requests, &lam_free_list_cls);
|
||||
lam_free_list_init_with(&mca_ptl_tcp_module.tcp_send_requests,
|
||||
sizeof(mca_ptl_base_send_request_t) + sizeof(mca_ptl_tcp_send_frag_t),
|
||||
&mca_ptl_base_send_request_cls,
|
||||
sizeof(mca_ptl_tcp_send_request_t),
|
||||
&mca_ptl_tcp_send_request_cls,
|
||||
mca_ptl_tcp_module.tcp_free_list_num,
|
||||
mca_ptl_tcp_module.tcp_free_list_max,
|
||||
mca_ptl_tcp_module.tcp_free_list_inc,
|
||||
|
@ -21,27 +21,29 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd);
|
||||
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd);
|
||||
|
||||
|
||||
|
||||
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag)
|
||||
{
|
||||
SUPER_INIT(frag, &mca_ptl_base_recv_frag_cls);
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag)
|
||||
{
|
||||
SUPER_DESTROY(frag, &mca_ptl_base_recv_frag_cls);
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_peer_t* peer)
|
||||
{
|
||||
frag->frag_owner = &peer->peer_ptl->super;
|
||||
frag->frag_match = 0;
|
||||
frag->super.frag_request = 0;
|
||||
frag->frag_peer = peer;
|
||||
frag->frag_addr = 0;
|
||||
frag->frag_size = 0;
|
||||
frag->frag_hdr_cnt = 0;
|
||||
frag->frag_msg_cnt = 0;
|
||||
}
|
||||
|
||||
|
||||
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
|
||||
{
|
||||
@ -57,10 +59,14 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
|
||||
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
|
||||
return false;
|
||||
|
||||
/* done - do something */
|
||||
if(NULL != frag->super.frag_request) {
|
||||
/* indicate completion status */
|
||||
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd)
|
||||
{
|
||||
/* non-blocking read - continue if interrupted, otherwise wait until data available */
|
||||
@ -94,12 +100,15 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd)
|
||||
return false;
|
||||
|
||||
/* attempt to match a posted recv */
|
||||
/* FIX */
|
||||
mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header);
|
||||
|
||||
/* match was not made - so allocate buffer for eager send */
|
||||
if(NULL == frag->frag_match) {
|
||||
if(NULL == frag->super.frag_request) {
|
||||
frag->frag_addr = (unsigned char*)LAM_MALLOC(frag->frag_header.hdr_frag_length);
|
||||
frag->frag_size = frag->frag_header.hdr_frag_length;
|
||||
} else {
|
||||
frag->frag_addr = (unsigned char*)frag->super.super.frag_addr;
|
||||
frag->frag_size = frag->super.super.frag_size;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -18,12 +18,11 @@ extern lam_class_info_t mca_ptl_tcp_recv_frag_cls;
|
||||
|
||||
struct mca_ptl_tcp_recv_frag_t {
|
||||
mca_ptl_base_recv_frag_t super;
|
||||
struct mca_ptl_peer_t* frag_peer;
|
||||
unsigned char* frag_addr;
|
||||
size_t frag_size;
|
||||
size_t frag_hdr_cnt;
|
||||
size_t frag_msg_cnt;
|
||||
#define frag_match super.frag_match
|
||||
#define frag_peer super.super.frag_peer
|
||||
#define frag_owner super.super.frag_owner
|
||||
#define frag_header super.super.frag_header
|
||||
};
|
||||
@ -33,7 +32,8 @@ typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
|
||||
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t*);
|
||||
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t*);
|
||||
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd);
|
||||
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t*, struct mca_ptl_peer_t*);
|
||||
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_peer_t* peer);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -63,8 +63,8 @@ void mca_ptl_tcp_send_frag_reinit(
|
||||
/* fragment state */
|
||||
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
|
||||
sendfrag->super.frag_request = sendreq;
|
||||
sendfrag->super.frag_data = sendreq->req_data + hdr->hdr_msg_offset;
|
||||
sendfrag->super.frag_size = size;
|
||||
sendfrag->super.super.frag_addr = sendreq->super.req_addr + hdr->hdr_msg_offset;
|
||||
sendfrag->super.super.frag_size = size;
|
||||
|
||||
sendfrag->frag_peer = ptl_peer;
|
||||
sendfrag->frag_vec_ptr = sendfrag->frag_vec;
|
||||
@ -72,8 +72,8 @@ void mca_ptl_tcp_send_frag_reinit(
|
||||
sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
|
||||
sendfrag->frag_vec_cnt = 1;
|
||||
if(size > 0) {
|
||||
sendfrag->frag_vec[1].iov_base = (lam_iov_base_ptr_t)sendfrag->super.frag_data;
|
||||
sendfrag->frag_vec[1].iov_len = sendfrag->super.frag_size;
|
||||
sendfrag->frag_vec[1].iov_base = (lam_iov_base_ptr_t)sendfrag->super.super.frag_addr;
|
||||
sendfrag->frag_vec[1].iov_len = sendfrag->super.super.frag_size;
|
||||
sendfrag->frag_vec_cnt++;
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,10 @@ extern lam_class_info_t mca_ptl_tcp_send_frag_cls;
|
||||
|
||||
struct mca_ptl_tcp_send_frag_t {
|
||||
mca_ptl_base_send_frag_t super;
|
||||
struct mca_ptl_peer_t* frag_peer;
|
||||
struct iovec *frag_vec_ptr;
|
||||
size_t frag_vec_cnt;
|
||||
struct iovec frag_vec[2];
|
||||
#define frag_peer super.super.frag_peer
|
||||
#define frag_header super.super.frag_header
|
||||
#define frag_owner super.super.frag_owner
|
||||
};
|
||||
|
@ -47,15 +47,15 @@ typedef struct lam_communicator_t lam_communicator_t;
|
||||
static inline lam_communicator_t *lam_comm_lookup(uint32_t cid)
|
||||
{
|
||||
/* array of pointers to communicators, indexed by context ID */
|
||||
extern lam_communicator_t **lam_communicator_array;
|
||||
extern lam_communicator_t **lam_mpi_comm_array;
|
||||
#ifdef LAM_ENABLE_DEBUG
|
||||
extern uint32_t lam_communicator_array_len;
|
||||
if(cid >= lam_communicator_array_len) {
|
||||
extern uint32_t lam_mpi_comm_array_size;
|
||||
if(cid >= lam_mpi_comm_array_size) {
|
||||
lam_output(0, "lam_comm_lookup: invalid communicator index (%d)", cid);
|
||||
return (lam_communicator_t *) NULL;
|
||||
}
|
||||
#endif
|
||||
return lam_communicator_array[cid];
|
||||
return lam_mpi_comm_array[cid];
|
||||
}
|
||||
|
||||
static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, size_t peer_id)
|
||||
|
@ -14,5 +14,8 @@
|
||||
* Global variables
|
||||
*/
|
||||
|
||||
lam_communicator_t *lam_mpi_comm_array;
|
||||
size_t lam_mpi_comm_array_size;
|
||||
|
||||
lam_communicator_t lam_mpi_comm_world;
|
||||
lam_communicator_t lam_mpi_comm_self;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user