1
1
- when recv is posted check for matching frags

This commit was SVN r651.
Этот коммит содержится в:
Tim Woodall 2004-02-05 00:50:37 +00:00
родитель 11fe4769aa
Коммит 9d0b36c34d
23 изменённых файлов: 378 добавлений и 249 удалений

Просмотреть файл

@ -7,3 +7,6 @@ config.status
configure
libtool
doxygen
bin
lib
include

Просмотреть файл

@ -12,7 +12,7 @@ extern lam_class_info_t mca_ptl_array_cls;
struct mca_ptl_proc_t {
double ptl_weight; /* PTL weight for scheduling */
struct mca_ptl_peer_t* ptl_peer; /* PTL addressing info */
struct mca_ptl_base_peer_t* ptl_peer; /* PTL addressing info */
mca_ptl_t *ptl; /* PTL implementation */
};
typedef struct mca_ptl_proc_t mca_ptl_proc_t;

Просмотреть файл

@ -126,7 +126,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
* addressing information that will be cached on the proc, if it
* cannot reach the proc - but another peer
*/
struct mca_ptl_peer_t* ptl_peer;
struct mca_ptl_base_peer_t* ptl_peer;
int rc = ptl->ptl_add_proc(ptl, proc, &ptl_peer);
if(rc == LAM_SUCCESS) {

Просмотреть файл

@ -17,7 +17,7 @@ struct mca_ptl_base_frag_t {
lam_list_item_t super;
mca_ptl_base_header_t frag_header;
struct mca_ptl_t* frag_owner; /**< PTL that allocated this fragment */
struct mca_ptl_peer_t* frag_peer; /**< PTL specific addressing info */
struct mca_ptl_base_peer_t* frag_peer; /**< PTL specific addressing info */
void *frag_addr; /* pointer into request buffer at fragment offset */
size_t frag_size; /* number of bytes available in request buffer */
};

Просмотреть файл

@ -2,37 +2,108 @@
* $HEADER$
*/
#ifndef MCA_PML_BASE_HEADER_H
#define MCA_PML_BASE_HEADER_H
#ifndef MCA_PTL_BASE_HEADER_H
#define MCA_PTL_BASE_HEADER_H
#include "mca/mpi/ptl/ptl.h"
/* define a common set of parameters included in all
* point-to-point headers
#define MCA_PTL_HDR_TYPE_MATCH 0
#define MCA_PTL_HDR_TYPE_FRAG 1
#define MCA_PTL_HDR_TYPE_ACK 2
#define MCA_PTL_HDR_TYPE_NACK 3
#define MCA_PTL_FLAGS_ACK_IMMEDIATE 1
#define MCA_PTL_FLAGS_ACK_AGGREGATE 2
/* Defines the common header attributes - must be first element in each header type */
struct mca_ptl_base_common_header_t {
/* type of envelope */
uint8_t hdr_type;
/* flags indicating how fragment should be processed */
uint8_t hdr_flags;
/* size of header - allow for variable length */
uint16_t hdr_size;
};
typedef struct mca_ptl_base_common_header_t mca_ptl_base_common_header_t;
/*
* Common header definition for all fragments.
*/
typedef struct {
/* communicator index */
uint32_t hdr_contextid;
/* source rank */
int32_t hdr_src_rank;
/* destination rank */
int32_t hdr_dst_rank;
/* user tag */
int32_t hdr_user_tag;
/* type of message - send/bsend/ssend/rsend/recv */
int32_t hdr_msg_type;
/* message length */
uint32_t hdr_msg_length;
/* offset into message */
uint32_t hdr_msg_offset;
/* fragment length */
uint32_t hdr_frag_length;
/* message sequence number */
mca_ptl_base_sequence_t hdr_msg_seq;
/* fragment sequence number */
mca_ptl_base_sequence_t hdr_frag_seq;
} mca_ptl_base_header_t;
struct mca_ptl_base_frag_header_t {
/* common header */
mca_ptl_base_common_header_t hdr_common;
/* fragment length */
uint32_t hdr_frag_length;
/* offset into message */
uint32_t hdr_frag_offset;
/* fragment sequence number */
mca_ptl_base_sequence_t hdr_frag_seq;
/* pointer to source fragment */
lam_ptr_t hdr_src_ptr;
/* pointer to matched receive */
lam_ptr_t hdr_dst_ptr;
};
typedef struct mca_ptl_base_frag_header_t mca_ptl_base_frag_header_t;
#endif /* MCA_PML_BASE_HEADER_H */
/*
* Header definition for the first fragment, contains the additional
* attributes required to match the corresponding posted receive.
*/
struct mca_ptl_base_match_header_t {
/* fragment info */
mca_ptl_base_frag_header_t hdr_frag;
/* communicator index */
uint32_t hdr_contextid;
/* source rank */
int32_t hdr_src_rank;
/* destination rank */
int32_t hdr_dst_rank;
/* user tag */
int32_t hdr_user_tag;
/* message length */
uint32_t hdr_msg_length;
/* message sequence number */
mca_ptl_base_sequence_t hdr_msg_seq;
};
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
/*
* Header used to acknowledgment outstanding fragment(s).
*/
struct mca_ptl_base_ack_header_t {
/* common header */
mca_ptl_base_common_header_t hdr_common;
/* source fragment */
lam_ptr_t hdr_src_ptr;
/* matched receive request */
lam_ptr_t hdr_dst_ptr;
/* sequence range */
};
typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t;
/*
* Union of defined header types.
*/
union mca_ptl_base_header_t {
mca_ptl_base_common_header_t hdr_common;
mca_ptl_base_match_header_t hdr_match;
mca_ptl_base_frag_header_t hdr_frag;
mca_ptl_base_ack_header_t hdr_ack;
};
typedef union mca_ptl_base_header_t mca_ptl_base_header_t;
#define hdr_type hdr_common.hdr_type
#define hdr_flags hdr_common.hdr_flags
#define hdr_size hdr_common.hdr_size
#endif

Просмотреть файл

@ -23,19 +23,19 @@
*/
static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *ptl_comm);
static void mca_ptl_base_check_cantmatch_for_match(
@ -73,7 +73,7 @@ static void mca_ptl_base_check_cantmatch_for_match(
* - fragments may be corrupt
* - this routine may be called simoultaneously by more than one thread
*/
int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc, bool* match_made,
lam_list_t *additional_matches)
{
@ -95,7 +95,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
frag_msg_seq = frag_header->hdr_msg_seq;
/* get fragment communicator source rank */
frag_src = frag_header->hdr_frag_seq;
frag_src = frag_header->hdr_src_rank;
/* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing
@ -204,7 +204,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
* set by the upper level routine.
*/
static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
(mca_ptl_base_header_t *frag_header, mca_pml_comm_t *pml_comm)
(mca_ptl_base_match_header_t *frag_header, mca_pml_comm_t *pml_comm)
{
/* local parameters */
mca_ptl_base_recv_request_t *return_match;
@ -218,7 +218,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
frag_src = frag_header->hdr_frag_seq;
frag_src = frag_header->hdr_src_rank;
if (lam_list_get_size((pml_comm->c_specific_receives)+frag_src) == 0 ){
/*
@ -263,7 +263,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
* set by the upper level routine.
*/
static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local parameters */
@ -330,7 +330,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
* set by the upper level routine.
*/
static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local variables */
@ -340,7 +340,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat
/* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_frag_seq;
frag_src = frag_header->hdr_src_rank;
frag_user_tag=frag_header->hdr_user_tag;
/*
@ -392,7 +392,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat
* set by the upper level routine.
*/
static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receives_for_match(
mca_ptl_base_header_t *frag_header,
mca_ptl_base_match_header_t *frag_header,
mca_pml_comm_t *pml_comm)
{
/* local variables */
@ -402,7 +402,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
/* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_frag_seq;
frag_src = frag_header->hdr_src_rank;
frag_user_tag=frag_header->hdr_user_tag;
/*
@ -574,7 +574,7 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
/*
* If the message has the next expected seq from that proc...
*/
frag_seqber=frag_desc->super.frag_header.hdr_msg_seq;
frag_seqber=frag_desc->super.frag_header.hdr_match.hdr_msg_seq;
if (frag_seqber == next_msg_seq_expected) {
/* initialize list on first entry - assume that most
@ -603,7 +603,7 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
* check to see if this frag matches a posted message
*/
matched_receive = mca_ptl_base_check_receives_for_match(
&frag_desc->super.frag_header, pml_comm);
&frag_desc->super.frag_header.hdr_match, pml_comm);
/* if match found, process data */
if (matched_receive) {

Просмотреть файл

@ -5,8 +5,11 @@
#ifndef MCA_PTL_BASE_MATCH_H
#define MCA_PTL_BASE_MATCH_H
int mca_ptl_base_match(mca_ptl_base_header_t *frag_header,
mca_ptl_base_recv_frag_t *frag_desc, bool *match_made,
struct mca_ptl_base_recv_frag_t;
int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
struct mca_ptl_base_recv_frag_t *frag_desc, bool *match_made,
lam_list_t *additional_matches);
#endif /* MCA_PTL_BASE_MATCH_H */

Просмотреть файл

@ -29,51 +29,3 @@ void mca_ptl_base_recv_frag_destroy(mca_ptl_base_recv_frag_t* frag)
SUPER_DESTROY(frag, &mca_ptl_base_frag_cls);
}
int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t* frag, mca_ptl_base_header_t* header)
{
lam_list_t matched_frags;
bool matched;
int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags);
if(rc != LAM_SUCCESS)
return rc;
if(matched) {
do {
mca_ptl_base_recv_request_t* request = frag->frag_request;
mca_ptl_t* ptl = frag->super.frag_owner;
/* determine the offset and size of posted buffer */
if (request->super.req_length < frag->super.frag_header.hdr_msg_offset) {
/* user buffer is to small - discard entire fragment */
frag->super.frag_addr = 0;
frag->super.frag_size = 0;
} else if (request->super.req_length < frag->super.frag_header.hdr_msg_offset +
frag->super.frag_header.hdr_frag_length) {
/* user buffer is to small - discard part of fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr +
frag->super.frag_header.hdr_msg_offset);
frag->super.frag_size = request->super.req_length - frag->super.frag_header.hdr_msg_offset;
} else {
/* user buffer is large enough for this fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr +
frag->super.frag_header.hdr_msg_offset);
frag->super.frag_size = frag->super.frag_header.hdr_frag_length;
}
/* send cts acknowledgment back to peer */
ptl->ptl_cts(ptl, frag);
/* process any fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
} while(NULL != frag);
}
return LAM_SUCCESS;
}

Просмотреть файл

@ -9,6 +9,7 @@
#include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_fragment.h"
#include "mca/mpi/ptl/base/ptl_base_recvreq.h"
#include "mca/mpi/ptl/base/ptl_base_match.h"
extern lam_class_info_t mca_ptl_base_recv_frag_cls;
@ -16,12 +17,63 @@ extern lam_class_info_t mca_ptl_base_recv_frag_cls;
struct mca_ptl_base_recv_frag_t {
mca_ptl_base_frag_t super;
mca_ptl_base_recv_request_t *frag_request; /* matched posted receive */
struct mca_ptl_peer_t* frag_peer; /* peer received from */
struct mca_ptl_base_peer_t* frag_peer; /* peer received from */
};
typedef struct mca_ptl_base_recv_frag_t mca_ptl_base_recv_frag_t;
int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t*, mca_ptl_base_header_t*);
static inline void mca_ptl_base_recv_frag_process(mca_ptl_base_recv_frag_t* frag)
{
mca_ptl_base_recv_request_t* request = frag->frag_request;
mca_ptl_base_frag_header_t* header = &frag->super.frag_header.hdr_frag;
mca_ptl_t* ptl = frag->super.frag_owner;
/* determine the offset and size of posted buffer */
if (request->super.req_length < header->hdr_frag_offset) {
/* user buffer is to small - discard entire fragment */
frag->super.frag_addr = 0;
frag->super.frag_size = 0;
} else if (request->super.req_length < header->hdr_frag_offset + header->hdr_frag_length) {
/* user buffer is to small - discard part of fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset);
frag->super.frag_size = request->super.req_length - header->hdr_frag_offset;
} else {
/* user buffer is large enough for this fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset);
frag->super.frag_size = header->hdr_frag_length;
}
/* indicate to the ptl that the fragment can be delivered */
ptl->ptl_recv(ptl, frag);
}
static inline int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t* frag, mca_ptl_base_match_header_t* header)
{
bool matched;
lam_list_t matched_frags;
int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags);
if(rc != LAM_SUCCESS)
return rc;
if(matched) {
do {
/* process current fragment */
mca_ptl_base_recv_frag_process(frag);
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
} while(NULL != frag);
}
return LAM_SUCCESS;
}
#endif

Просмотреть файл

@ -118,23 +118,24 @@ static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_requ
for (frag = (mca_ptl_base_recv_frag_t*)lam_list_get_first(unexpected_frags);
frag != (mca_ptl_base_recv_frag_t*)lam_list_get_end(unexpected_frags);
frag = (mca_ptl_base_recv_frag_t*)lam_list_get_next(frag)) {
mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match;
/* check first frag - we assume that process matching has been done already */
if (((tag == LAM_ANY_TAG) || (tag == frag->super.frag_header.hdr_user_tag))) {
if (((tag == LAM_ANY_TAG) || (tag == header->hdr_user_tag))) {
mca_ptl_t* ptl = frag->super.frag_owner;
if (tag == LAM_ANY_TAG && frag->super.frag_header.hdr_user_tag < 0) {
if (tag == LAM_ANY_TAG && header->hdr_user_tag < 0) {
continue;
}
request->req_sequence = header->hdr_msg_seq;
request->super.req_tag = tag = header->hdr_user_tag;
request->super.req_peer = header->hdr_src_rank;
frag->frag_request = request;
request->req_sequence = frag->super.frag_header.hdr_msg_seq;
request->super.req_tag = tag = frag->super.frag_header.hdr_user_tag;
request->super.req_peer = frag->super.frag_header.hdr_src_rank;
/* notify ptl fragment has been matched - send cts to peer */
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
ptl->ptl_cts(ptl, frag);
mca_ptl_base_recv_frag_process(frag);
return true;
}
}

Просмотреть файл

@ -40,7 +40,9 @@ struct mca_ptl_base_send_request_t {
/* PTL that allocated this descriptor */
struct mca_ptl_t* req_owner;
/* PTL peer instance that will be used for first fragment */
struct mca_ptl_peer_t* req_peer;
struct mca_ptl_base_peer_t* req_peer;
/* peer matched receive */
lam_ptr_t req_peer_request;
};
typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
@ -62,6 +64,7 @@ static inline void mca_ptl_base_send_request_reinit(
request->req_bytes_sent = 0;
request->req_bytes_acked = 0;
request->req_send_mode = sendmode;
request->req_peer_request.lval = 0;
request->super.req_addr = addr;
request->super.req_length = length;
request->super.req_datatype = datatype;

Просмотреть файл

@ -20,7 +20,7 @@
*/
struct mca_ptl_t;
struct mca_ptl_peer_t;
struct mca_ptl_base_peer_t;
struct mca_ptl_base_fragment_t;
struct mca_ptl_base_send_request_t;
struct mca_ptl_base_recv_frag_t;
@ -91,7 +91,7 @@ typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_t;
typedef int (*mca_ptl_base_add_proc_fn_t)(
struct mca_ptl_t* ptl,
struct lam_proc_t* proc,
struct mca_ptl_peer_t**
struct mca_ptl_base_peer_t**
);
/**
@ -105,7 +105,7 @@ typedef int (*mca_ptl_base_add_proc_fn_t)(
typedef int (*mca_ptl_base_del_proc_fn_t)(
struct mca_ptl_t* ptl,
struct lam_proc_t* proc,
struct mca_ptl_peer_t*
struct mca_ptl_base_peer_t*
);
/**
@ -134,13 +134,13 @@ typedef void (*mca_ptl_base_frag_return_fn_t)(
typedef int (*mca_ptl_base_send_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_peer_t* ptl_peer,
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_send_request_t* send_request,
size_t size,
bool* complete
);
typedef int (*mca_ptl_base_cts_fn_t)(
typedef int (*mca_ptl_base_recv_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* recv_frag
);
@ -165,7 +165,7 @@ struct mca_ptl_t {
mca_ptl_base_del_proc_fn_t ptl_del_proc;
mca_ptl_base_finalize_fn_t ptl_finalize;
mca_ptl_base_send_fn_t ptl_send;
mca_ptl_base_cts_fn_t ptl_cts;
mca_ptl_base_recv_fn_t ptl_recv;
mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
mca_ptl_base_request_return_fn_t ptl_request_return;
mca_ptl_base_frag_return_fn_t ptl_frag_return;

Просмотреть файл

@ -30,7 +30,7 @@ mca_ptl_tcp_t mca_ptl_tcp = {
mca_ptl_tcp_del_proc,
mca_ptl_tcp_finalize,
mca_ptl_tcp_send,
mca_ptl_tcp_cts,
mca_ptl_tcp_recv,
mca_ptl_tcp_request_alloc,
mca_ptl_tcp_request_return,
mca_ptl_tcp_frag_return
@ -54,10 +54,10 @@ int mca_ptl_tcp_create(int if_index)
}
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, struct mca_ptl_peer_t** peer_ret)
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, struct mca_ptl_base_peer_t** peer_ret)
{
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(lam_proc);
mca_ptl_peer_t* ptl_peer;
mca_ptl_base_peer_t* ptl_peer;
int rc;
if(NULL == ptl_proc)
@ -77,7 +77,7 @@ int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, str
/* The ptl_proc datastructure is shared by all TCP PTL instances that are trying
* to reach this destination. Cache the peer instance on the ptl_proc.
*/
ptl_peer = OBJ_CREATE(mca_ptl_peer_t, &mca_ptl_tcp_peer_cls);
ptl_peer = OBJ_CREATE(mca_ptl_base_peer_t, &mca_ptl_tcp_peer_cls);
if(NULL == ptl_peer) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
@ -95,7 +95,7 @@ int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, str
}
int mca_ptl_tcp_del_proc(struct mca_ptl_t* ptl, struct lam_proc_t *proc, struct mca_ptl_peer_t* ptl_peer)
int mca_ptl_tcp_del_proc(struct mca_ptl_t* ptl, struct lam_proc_t *proc, struct mca_ptl_base_peer_t* ptl_peer)
{
OBJ_RELEASE(ptl_peer);
return LAM_SUCCESS;
@ -133,7 +133,7 @@ void mca_ptl_tcp_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_base_recv_fra
int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_peer_t* ptl_peer,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t size,
bool* complete)
@ -152,7 +152,7 @@ int mca_ptl_tcp_send(
}
int mca_ptl_tcp_cts(
int mca_ptl_tcp_recv(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* frag)
{

Просмотреть файл

@ -83,13 +83,13 @@ extern int mca_ptl_tcp_finalize(
extern int mca_ptl_tcp_add_proc(
struct mca_ptl_t* ptl,
struct lam_proc_t *procs,
struct mca_ptl_peer_t** addr
struct mca_ptl_base_peer_t** addr
);
extern int mca_ptl_tcp_del_proc(
struct mca_ptl_t* ptl,
struct lam_proc_t *procs,
struct mca_ptl_peer_t* addr
struct mca_ptl_base_peer_t* addr
);
extern int mca_ptl_tcp_request_alloc(
@ -109,13 +109,13 @@ extern void mca_ptl_tcp_frag_return(
extern int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_peer_t* ptl_peer,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t*,
size_t size,
bool* complete
);
extern int mca_ptl_tcp_cts(
extern int mca_ptl_tcp_recv(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* frag
);

Просмотреть файл

@ -21,12 +21,12 @@ lam_class_info_t mca_ptl_tcp_peer_cls = {
};
static int mca_ptl_tcp_peer_start_connect(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_close_i(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_connected(mca_ptl_peer_t*);
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_peer_t*, int sd);
static void mca_ptl_tcp_peer_send_handler(mca_ptl_peer_t*, int sd);
static void mca_ptl_tcp_peer_except_handler(mca_ptl_peer_t*, int sd);
static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_base_peer_t*, int sd);
static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t*, int sd);
static void mca_ptl_tcp_peer_except_handler(mca_ptl_base_peer_t*, int sd);
@ -41,7 +41,7 @@ static lam_reactor_listener_t mca_ptl_tcp_peer_listener = {
* Initialize state of the peer instance.
*/
void mca_ptl_tcp_peer_init(mca_ptl_peer_t* ptl_peer)
void mca_ptl_tcp_peer_init(mca_ptl_base_peer_t* ptl_peer)
{
SUPER_INIT(ptl_peer, &lam_list_cls);
ptl_peer->peer_ptl = 0;
@ -61,7 +61,7 @@ void mca_ptl_tcp_peer_init(mca_ptl_peer_t* ptl_peer)
* Cleanup any resources held by the peer.
*/
void mca_ptl_tcp_peer_destroy(mca_ptl_peer_t* ptl_peer)
void mca_ptl_tcp_peer_destroy(mca_ptl_base_peer_t* ptl_peer)
{
mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
mca_ptl_tcp_peer_close_i(ptl_peer);
@ -74,7 +74,7 @@ void mca_ptl_tcp_peer_destroy(mca_ptl_peer_t* ptl_peer)
* queue the fragment and start the connection as required.
*/
int mca_ptl_tcp_peer_send(mca_ptl_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* frag)
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* frag)
{
int rc = LAM_SUCCESS;
THREAD_LOCK(&ptl_peer->peer_lock);
@ -118,7 +118,7 @@ int mca_ptl_tcp_peer_send(mca_ptl_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t* fra
* otherwise, reject the connection and continue with the current connection
*/
bool mca_ptl_tcp_peer_accept(mca_ptl_peer_t* ptl_peer, struct sockaddr_in* addr, int sd)
bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in* addr, int sd)
{
mca_ptl_tcp_addr_t* ptl_addr;
mca_ptl_tcp_proc_t* this_proc = mca_ptl_tcp_proc_local();
@ -146,7 +146,7 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_peer_t* ptl_peer, struct sockaddr_in* addr,
* prior to delegating to the internal routine.
*/
void mca_ptl_tcp_peer_close(mca_ptl_peer_t* ptl_peer)
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
{
THREAD_LOCK(&ptl_peer->peer_lock);
mca_ptl_tcp_peer_close_i(ptl_peer);
@ -159,7 +159,7 @@ void mca_ptl_tcp_peer_close(mca_ptl_peer_t* ptl_peer)
* been closed.
*/
static void mca_ptl_tcp_peer_close_i(mca_ptl_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t* ptl_peer)
{
if(ptl_peer->peer_sd >= 0) {
lam_reactor_remove(
@ -177,7 +177,7 @@ static void mca_ptl_tcp_peer_close_i(mca_ptl_peer_t* ptl_peer)
*
*/
static void mca_ptl_tcp_peer_connected(mca_ptl_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer)
{
int flags = LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT;
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTED;
@ -200,7 +200,7 @@ static void mca_ptl_tcp_peer_connected(mca_ptl_peer_t* ptl_peer)
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_peer_t* ptl_peer, void* data, size_t size)
static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
@ -233,7 +233,7 @@ static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_peer_t* ptl_peer, void* data,
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_send_blocking(mca_ptl_peer_t* ptl_peer, void* data, size_t size)
static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
@ -260,7 +260,7 @@ static int mca_ptl_tcp_peer_send_blocking(mca_ptl_peer_t* ptl_peer, void* data,
* socket to a connected state.
*/
static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_peer_t* ptl_peer)
static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
{
uint32_t size_n, size_h;
void* guid;
@ -296,7 +296,7 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_peer_t* ptl_peer)
* a newly connected socket.
*/
static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_peer_t* ptl_peer)
static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer)
{
/* send process identifier to remote peer */
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local();
@ -318,7 +318,7 @@ static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_peer_t* ptl_peer)
* the peers response.
*/
static int mca_ptl_tcp_peer_start_connect(mca_ptl_peer_t* ptl_peer)
static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
{
int rc;
ptl_peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
@ -378,7 +378,7 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_peer_t* ptl_peer)
* newly connected socket.
*/
static void mca_ptl_tcp_peer_complete_connect(mca_ptl_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
{
int so_error = 0;
lam_socklen_t so_length = sizeof(so_error);
@ -426,7 +426,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_peer_t* ptl_peer)
* of the socket and take the appropriate action.
*/
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_peer_t* ptl_peer, int sd)
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
{
THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) {
@ -471,7 +471,7 @@ static void mca_ptl_tcp_peer_recv_handler(mca_ptl_peer_t* ptl_peer, int sd)
* of the socket and take the appropriate action.
*/
static void mca_ptl_tcp_peer_send_handler(mca_ptl_peer_t* ptl_peer, int sd)
static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
{
THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) {
@ -507,7 +507,7 @@ static void mca_ptl_tcp_peer_send_handler(mca_ptl_peer_t* ptl_peer, int sd)
* A file descriptor is in an erroneous state. Close the connection
* and update the peers state.
*/
static void mca_ptl_tcp_peer_except_handler(mca_ptl_peer_t* ptl_peer, int sd)
static void mca_ptl_tcp_peer_except_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
{
lam_output(0, "mca_ptl_tcp_peer_except_handler: closing connection");
mca_ptl_tcp_peer_close_i(ptl_peer);

Просмотреть файл

@ -30,12 +30,12 @@ extern lam_class_info_t mca_ptl_tcp_peer_cls;
/**
* An abstraction that represents a connection to a peer process.
* An instance of mca_ptl_peer_t is associated w/ each process
* An instance of mca_ptl_base_peer_t is associated w/ each process
* and PTL pair at startup. However, connections to the peer
* are established dynamically on an as-needed basis:
*/
struct mca_ptl_peer_t {
struct mca_ptl_base_peer_t {
lam_list_item_t super;
struct mca_ptl_tcp_t* peer_ptl;
struct mca_ptl_tcp_proc_t* peer_proc;
@ -48,14 +48,14 @@ struct mca_ptl_peer_t {
lam_list_t peer_frags;
lam_mutex_t peer_lock;
};
typedef struct mca_ptl_peer_t mca_ptl_peer_t;
typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;
void mca_ptl_tcp_peer_destroy(mca_ptl_peer_t*);
void mca_ptl_tcp_peer_close(mca_ptl_peer_t*);
void mca_ptl_tcp_peer_init(mca_ptl_peer_t*);
int mca_ptl_tcp_peer_send(mca_ptl_peer_t*, mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_peer_accept(mca_ptl_peer_t*, struct sockaddr_in*, int);
void mca_ptl_tcp_peer_destroy(mca_ptl_base_peer_t*);
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t*);
void mca_ptl_tcp_peer_init(mca_ptl_base_peer_t*);
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t*, mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t*, struct sockaddr_in*, int);
#endif

Просмотреть файл

@ -110,8 +110,8 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc)
}
/* allocate space for peer array - one for each exported address */
ptl_proc->proc_peers = (mca_ptl_peer_t**)
LAM_MALLOC(ptl_proc->proc_addr_count * sizeof(mca_ptl_peer_t*));
ptl_proc->proc_peers = (mca_ptl_base_peer_t**)
LAM_MALLOC(ptl_proc->proc_addr_count * sizeof(mca_ptl_base_peer_t*));
if(NULL == ptl_proc->proc_peers) {
OBJ_RELEASE(ptl_proc);
return NULL;
@ -167,7 +167,7 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size)
* Note that this routine must be called with the lock on the process already
* held. Insert a ptl instance into the proc array and assign it an address.
*/
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_peer_t* ptl_peer)
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer)
{
struct mca_ptl_tcp_t *ptl_tcp = ptl_peer->peer_ptl;
size_t i;
@ -202,14 +202,14 @@ int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_peer_t* ptl_pe
* no longer in use.
*/
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_peer_t* ptl_peer)
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer)
{
size_t i;
THREAD_LOCK(&ptl_proc->proc_lock);
for(i=0; i<ptl_proc->proc_peer_count; i++) {
if(ptl_proc->proc_peers[i] == ptl_peer) {
memmove(&ptl_proc->proc_peers+i,ptl_proc->proc_peers+i+1,
(ptl_proc->proc_peer_count-i)*sizeof(mca_ptl_peer_t*));
(ptl_proc->proc_peer_count-i)*sizeof(mca_ptl_base_peer_t*));
}
}
ptl_proc->proc_peer_count--;
@ -228,7 +228,7 @@ bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t* ptl_proc, struct sockaddr_in* a
size_t i;
THREAD_LOCK(&ptl_proc->proc_lock);
for(i=0; i<ptl_proc->proc_peer_count; i++) {
mca_ptl_peer_t* ptl_peer = ptl_proc->proc_peers[i];
mca_ptl_base_peer_t* ptl_peer = ptl_proc->proc_peers[i];
if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd))
return true;
}

Просмотреть файл

@ -18,7 +18,7 @@ extern lam_class_info_t mca_ptl_tcp_proc_cls;
/**
* Represents the state of a remote process and the set of addresses
* that it exports. Also cache an instance or mca_ptl_peer_t for each
* that it exports. Also cache an instance or mca_ptl_base_peer_t for each
* PTL instance that attempts to open a connection to the process.
*/
struct mca_ptl_tcp_proc_t {
@ -28,7 +28,7 @@ struct mca_ptl_tcp_proc_t {
size_t proc_guid_size;
struct mca_ptl_tcp_addr_t *proc_addrs;
size_t proc_addr_count;
struct mca_ptl_peer_t **proc_peers;
struct mca_ptl_base_peer_t **proc_peers;
size_t proc_peer_count;
lam_mutex_t proc_lock;
};
@ -49,8 +49,8 @@ static inline mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_local(void)
return mca_ptl_tcp_proc_self;
}
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t*, mca_ptl_peer_t*);
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t*, mca_ptl_peer_t*);
int mca_ptl_tcp_proc_insert(mca_ptl_tcp_proc_t*, mca_ptl_base_peer_t*);
int mca_ptl_tcp_proc_remove(mca_ptl_tcp_proc_t*, mca_ptl_base_peer_t*);
bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t*, struct sockaddr_in*, int sd);
#endif

Просмотреть файл

@ -9,6 +9,16 @@
#include "ptl_tcp_recvfrag.h"
static void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag);
static void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag);
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t);
static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd);
lam_class_info_t mca_ptl_tcp_recv_frag_cls = {
"mca_ptl_tcp_recv_frag_t",
&mca_ptl_base_recv_frag_cls,
@ -16,24 +26,20 @@ lam_class_info_t mca_ptl_tcp_recv_frag_cls = {
(class_destroy_t)mca_ptl_tcp_recv_frag_destroy
};
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd);
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd);
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag)
static void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag)
{
SUPER_INIT(frag, &mca_ptl_base_recv_frag_cls);
}
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag)
static void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag)
{
SUPER_DESTROY(frag, &mca_ptl_base_recv_frag_cls);
}
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_peer_t* peer)
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer)
{
frag->frag_owner = &peer->peer_ptl->super;
frag->super.frag_request = 0;
@ -47,33 +53,32 @@ void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_peer_t*
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd) == false)
/* read common header */
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_common_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_common_header_t)) == false)
return false;
if(frag->frag_msg_cnt < frag->frag_size)
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false;
if(NULL != frag->super.frag_request) {
/* indicate completion status */
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super);
switch(frag->frag_header.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
return mca_ptl_tcp_recv_frag_match(frag, sd);
case MCA_PTL_HDR_TYPE_FRAG:
return mca_ptl_tcp_recv_frag_frag(frag, sd);
case MCA_PTL_HDR_TYPE_ACK:
case MCA_PTL_HDR_TYPE_NACK:
return mca_ptl_tcp_recv_frag_ack(frag, sd);
default:
return true;
}
return true;
}
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size)
{
/* non-blocking read - continue if interrupted, otherwise wait until data available */
unsigned char* ptr = (unsigned char*)&frag->frag_header;
int cnt = -1;
while(cnt < 0) {
cnt = recv(sd, ptr + frag->frag_hdr_cnt, sizeof(mca_ptl_base_header_t) - frag->frag_hdr_cnt, 0);
cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
@ -93,26 +98,84 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
}
}
/* is the entire header available? */
/* is the entire common header available? */
frag->frag_hdr_cnt += cnt;
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
return false;
return (frag->frag_hdr_cnt == size);
}
/* attempt to match a posted recv */
mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header);
/* match was not made - so allocate buffer for eager send */
if(NULL == frag->super.frag_request) {
frag->frag_addr = (unsigned char*)LAM_MALLOC(frag->frag_header.hdr_frag_length);
frag->frag_size = frag->frag_header.hdr_frag_length;
} else {
frag->frag_addr = (unsigned char*)frag->super.super.frag_addr;
frag->frag_size = frag->super.super.frag_size;
static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if (frag->frag_hdr_cnt < sizeof(mca_ptl_base_ack_header_t))
if (mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_ack_header_t)) == false)
return false;
return true;
}
static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_match_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_match_header_t)) == false)
return false;
if(frag->frag_msg_cnt == 0) {
/* attempt to match a posted recv */
mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match);
/* match was not made - so allocate buffer for eager send */
if(NULL == frag->super.frag_request) {
frag->frag_addr = (unsigned char*)LAM_MALLOC(frag->frag_header.hdr_frag.hdr_frag_length);
frag->frag_size = frag->frag_header.hdr_frag.hdr_frag_length;
} else {
frag->frag_addr = (unsigned char*)frag->super.super.frag_addr;
frag->frag_size = frag->super.super.frag_size;
}
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
} else if(frag->frag_msg_cnt < frag->super.super.frag_size) {
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
}
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false;
if(NULL != frag->super.frag_request) {
/* indicate completion status */
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super);
}
return true;
}
static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_match_header_t))
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_match_header_t)) == false)
return false;
if(frag->frag_msg_cnt == 0) {
/* determine offset into user buffer or allocate buffer for non-contig data */
} else if(frag->frag_msg_cnt < frag->super.super.frag_size) {
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
}
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false;
/* indicate completion status */
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super);
return true;
}
/*
* Continue with non-blocking recv() calls until the entire
* fragment is received.
@ -156,8 +219,8 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
int cnt = -1;
while(cnt < 0) {
void *rbuf = LAM_MALLOC(frag->frag_header.hdr_frag_length - frag->frag_msg_cnt);
cnt = recv(sd, rbuf, frag->frag_header.hdr_frag_length - frag->frag_msg_cnt, 0);
void *rbuf = LAM_MALLOC(frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt);
cnt = recv(sd, rbuf, frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt, 0);
LAM_FREE(rbuf);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
@ -179,8 +242,7 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
}
frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->frag_header.hdr_frag_length);
return (frag->frag_msg_cnt >= frag->frag_header.hdr_frag.hdr_frag_length);
}

Просмотреть файл

@ -29,10 +29,8 @@ struct mca_ptl_tcp_recv_frag_t {
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t*);
void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t*);
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd);
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_peer_t* peer);
void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer);
#endif

Просмотреть файл

@ -1,30 +0,0 @@
/*
* $HEADER$
*/
#include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
#include "ptl_tcp.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp_sendfrag.h"
int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t size,
bool* complete)
{
mca_ptl_tcp_send_frag_t* sendfrag;
if (sendreq->req_frags == 0) {
sendfrag = (mca_ptl_tcp_send_frag_t*)(sendreq+1);
} else {
int rc;
sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc);
if(sendfrag == 0)
return rc;
}
mca_ptl_tcp_send_frag_reinit(sendfrag, ptl_peer, sendreq, size);
return mca_ptl_tcp_peer_send(ptl_peer, sendfrag);
}

Просмотреть файл

@ -37,33 +37,47 @@ void mca_ptl_tcp_send_frag_destroy(mca_ptl_tcp_send_frag_t* frag)
void mca_ptl_tcp_send_frag_reinit(
mca_ptl_tcp_send_frag_t* sendfrag,
mca_ptl_peer_t* ptl_peer,
mca_ptl_base_peer_t* ptl_peer,
mca_ptl_base_send_request_t* sendreq,
size_t size)
{
/* message header */
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
hdr->hdr_contextid = sendreq->super.req_communicator->c_contextid;
hdr->hdr_src_rank = sendreq->super.req_communicator->c_rank;
hdr->hdr_dst_rank = sendreq->super.req_peer;
hdr->hdr_user_tag = sendreq->super.req_tag;
hdr->hdr_msg_type = sendreq->req_send_mode;
hdr->hdr_msg_length = sendreq->req_length;
hdr->hdr_msg_offset = sendreq->req_offset;
hdr->hdr_msg_seq = 0;
hdr->hdr_frag_seq = 0;
if(sendreq->req_frags == 0) {
hdr->hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_flags = MCA_PTL_FLAGS_ACK_IMMEDIATE;
hdr->hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr.pval = 0;
hdr->hdr_match.hdr_contextid = sendreq->super.req_communicator->c_contextid;
hdr->hdr_match.hdr_src_rank = sendreq->super.req_communicator->c_rank;
hdr->hdr_match.hdr_dst_rank = sendreq->super.req_peer;
hdr->hdr_match.hdr_user_tag = sendreq->super.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_length;
hdr->hdr_match.hdr_msg_seq = 0;
} else {
hdr->hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_flags = 0;
hdr->hdr_size = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_request;
}
/* update request */
if(sendreq->req_offset + size > sendreq->req_length)
size = sendreq->req_length = sendreq->req_offset;
hdr->hdr_frag_length = size;
hdr->hdr_frag.hdr_frag_length = size;
sendreq->req_offset += size;
sendreq->req_frags++;
/* fragment state */
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->super.frag_request = sendreq;
sendfrag->super.super.frag_addr = sendreq->super.req_addr + hdr->hdr_msg_offset;
sendfrag->super.super.frag_addr = sendreq->super.req_addr + hdr->hdr_frag.hdr_frag_offset;
sendfrag->super.super.frag_size = size;
sendfrag->frag_peer = ptl_peer;

Просмотреть файл

@ -34,7 +34,7 @@ bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd);
void mca_ptl_tcp_send_frag_reinit(
mca_ptl_tcp_send_frag_t*,
struct mca_ptl_peer_t*,
struct mca_ptl_base_peer_t*,
struct mca_ptl_base_send_request_t*,
size_t);