1
1
This commit was SVN r3415.
This commit is contained in:
Tim Woodall 2004-10-28 22:23:22 +00:00
parent c275a54c1f
commit 7b6a879b56
7 changed files with 201 additions and 91 deletions

View file

@ -141,7 +141,7 @@ int mca_ptl_mx_send(
mx_return_t mx_return;
int rc;
if (offset == 0 && sendreq->req_cached) {
if (sendreq->req_cached) {
sendfrag = (mca_ptl_mx_send_frag_t*)(sendreq+1);
} else {
ompi_list_item_t* item;
@ -168,6 +168,7 @@ int mca_ptl_mx_send(
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
/* initialize convertor */
sendfrag->frag_progress = 0;
sendfrag->frag_free = 0;
if(size > 0) {
ompi_convertor_t *convertor;
@ -276,6 +277,7 @@ int mca_ptl_mx_send_continue(
mca_ptl_mx_send_frag_t* sendfrag;
mca_ptl_base_header_t* hdr;
mx_return_t mx_return;
uint64_t match_value;
int rc;
/* allocate fragment */
@ -296,6 +298,7 @@ int mca_ptl_mx_send_continue(
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
/* initialize convertor */
sendfrag->frag_progress = 0;
sendfrag->frag_free = 0;
if(size > 0) {
ompi_convertor_t *convertor;
@ -362,12 +365,13 @@ int mca_ptl_mx_send_continue(
sendreq->req_offset += size;
/* start the fragment */
match_value = ((uint64_t)sendreq << 32) | (uint64_t)offset;
mx_return = mx_isend(
mx_ptl->mx_endpoint,
sendfrag->frag_segments,
sendfrag->frag_segment_count,
sendfrag->frag_send.frag_base.frag_peer->peer_addr,
1,
match_value,
sendfrag,
&sendfrag->frag_request);
if(mx_return != MX_SUCCESS) {

View file

@ -91,7 +91,7 @@ int mca_ptl_mx_component_open(void)
/* register MX module parameters */
mca_ptl_mx_component.mx_filter =
(uint32_t)mca_ptl_mx_param_register_int("filter", 0x12345);
(uint32_t)mca_ptl_mx_param_register_int("filter", 0xdeadbeef);
mca_ptl_mx_component.mx_prepost =
mca_ptl_mx_param_register_int("prepost", 1);
mca_ptl_mx_component.mx_debug =

View file

@ -8,6 +8,11 @@
static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr);
/* Allocator callback handed to the datatype convertor: returns a
 * heap buffer of the requested size (NULL if the allocation fails). */
static void* mca_ptl_mx_mem_alloc( size_t* size )
{
    void* buffer = malloc(*size);
    return buffer;
}
/**
* Initialize MX PTL modules
@ -128,6 +133,84 @@ static void* mca_ptl_mx_thread(ompi_object_t *arg)
#endif
/*
 * Callback invoked by the MX library when an incoming message matches.
 *
 * context     - the mca_ptl_mx_module_t* registered via
 *               mx_register_match_callback().
 * match_value - 0 for a first (match) fragment; otherwise the upper 32
 *               bits carry the sender's request pointer and the lower
 *               32 bits the byte offset into the message (see the
 *               encoding "((uint64_t)sendreq << 32) | offset" on the
 *               send side).
 * size        - total message size in bytes, including the
 *               mca_ptl_base_header_t.
 *
 * Allocates a recv fragment, points its data segment either at the
 * fragment's internal buffer (unmatched first fragment) or at the
 * user/bounce buffer of the already-matched request, then posts the
 * receive with mx_irecv().
 */
static void mca_ptl_mx_match(void* context, uint64_t match_value, int size)
{
mca_ptl_mx_module_t* ptl = (mca_ptl_mx_module_t*)context;
mca_ptl_mx_recv_frag_t *frag;
mx_return_t mx_return;
int rc;
/* fragment descriptor comes from the component free list */
MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_ptl_mx_match: unable to allocate resources.\n");
return;
}
frag->frag_recv.frag_base.frag_owner = &ptl->super;
frag->frag_recv.frag_base.frag_peer = NULL;
/* first fragment - post a buffer: receive the payload into the
 * fragment's internal buffer; matching happens later */
if(match_value == 0) {
frag->frag_segment_count = 2;
frag->frag_segments[1].segment_ptr = frag->frag_data;
frag->frag_segments[1].segment_length = size - sizeof(mca_ptl_base_header_t);
/* fragment has already been matched - decode the request pointer
 * and offset from the match value */
} else {
/* NOTE(review): the pointer round-trips through uint32_t, which is
 * only safe when pointers are 32 bits - confirm target platforms */
mca_pml_base_recv_request_t* request = (mca_pml_base_recv_request_t*)
(uint32_t)(match_value >> 32);
uint32_t offset = (uint32_t)match_value;
ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_ompi.req_status.MPI_SOURCE);
ompi_convertor_t* convertor = &frag->frag_recv.frag_base.frag_convertor;
/* payload size excludes the PTL header carried in segment 0 */
frag->frag_recv.frag_base.frag_size = size - sizeof(mca_ptl_base_header_t);
/* initialize convertor from the peer's master convertor */
ompi_convertor_copy(proc->proc_convertor, convertor);
ompi_convertor_init_for_recv(
convertor,
0, /* flags */
request->req_base.req_datatype, /* datatype */
request->req_base.req_count, /* count elements */
request->req_base.req_addr, /* users buffer */
offset, /* offset in bytes into packed buffer */
mca_ptl_mx_mem_alloc ); /* allocator callback (mallocs on demand) */
/* non-contiguous (or overflowing) - allocate bounce buffer for the
 * receive; the data is unpacked into the user buffer on completion */
if( 1 == ompi_convertor_need_buffers( convertor ) ||
request->req_bytes_packed < offset + frag->frag_recv.frag_base.frag_size) {
frag->frag_recv.frag_base.frag_addr = malloc(frag->frag_recv.frag_base.frag_size);
frag->frag_recv.frag_is_buffered = true;
/* contiguous - receive straight into the user's buffer at offset */
} else {
frag->frag_recv.frag_base.frag_addr = ((unsigned char*)request->req_base.req_addr) + offset;
}
frag->frag_segments[1].segment_ptr = frag->frag_recv.frag_base.frag_addr;
frag->frag_segments[1].segment_length = frag->frag_recv.frag_base.frag_size;
frag->frag_segment_count = 2;
}
/* post the receive for this exact match value */
mx_return = mx_irecv(
ptl->mx_endpoint,
frag->frag_segments,
frag->frag_segment_count,
match_value,
MX_MATCH_MASK_NONE,
frag,
&frag->frag_request);
if(mx_return != MX_SUCCESS) {
/* NOTE(review): "%dn" looks like a lost "\n" escape - confirm */
ompi_output(0, "mca_ptl_mx_match: mx_irecv() failed with status=%dn", mx_return);
}
}
/*
* Create and intialize an MX PTL module, where each module
* represents a specific NIC.
@ -137,7 +220,6 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
{
mca_ptl_mx_module_t* ptl = malloc(sizeof(mca_ptl_mx_module_t));
mx_return_t status;
int i;
if(NULL == ptl)
return NULL;
@ -187,15 +269,8 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
ptl->mx_filter);
}
/* pre-post receive buffers */
for(i=0; i<mca_ptl_mx_component.mx_prepost; i++) {
int rc;
MCA_PTL_MX_POST(ptl, rc);
if(rc != OMPI_SUCCESS) {
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
}
/* register a callback function for matching */
mx_register_match_callback(ptl->mx_endpoint, mca_ptl_mx_match, ptl);
#if OMPI_HAVE_THREADS
/* create a thread to progress requests */

View file

@ -13,45 +13,6 @@
#include "ptl_mx_sendfrag.h"
/**
 * Prepost recv buffers.
 *
 * Allocates a recv fragment from the free list and posts it to the MX
 * endpoint so a buffer is available for the next incoming message.
 * Sets rc to OMPI_SUCCESS on success, OMPI_ERR_OUT_OF_RESOURCE when the
 * fragment allocation fails, or OMPI_ERROR when mx_irecv() fails.
 *
 * Fixes: the mx_irecv() failure path previously set rc = OMPI_ERROR and
 * then unconditionally overwrote it with rc = OMPI_SUCCESS, so callers
 * never saw the error; the log format strings were also missing their
 * "\n" escapes.
 */
#define MCA_PTL_MX_POST(ptl, rc)                                              \
do {                                                                          \
    mca_ptl_mx_recv_frag_t* frag;                                             \
    mx_return_t mx_return;                                                    \
    /* post an additional recv */                                             \
    MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc);                                     \
    if(rc != OMPI_SUCCESS) {                                                  \
        ompi_output(0, "mca_ptl_mx_post: unable to allocate recv frag\n");    \
        rc = OMPI_ERR_OUT_OF_RESOURCE;                                        \
        break;                                                                \
    }                                                                         \
    frag->frag_recv.frag_base.frag_owner = &ptl->super;                       \
    frag->frag_recv.frag_base.frag_peer = NULL;                               \
    frag->frag_segment_count = 2;                                             \
    frag->frag_segments[1].segment_ptr = frag->frag_data;                     \
    frag->frag_segments[1].segment_length = sizeof(frag->frag_data);          \
                                                                              \
    mx_return = mx_irecv(                                                     \
        ptl->mx_endpoint,                                                     \
        frag->frag_segments,                                                  \
        frag->frag_segment_count,                                             \
        1,                                                                    \
        MX_MATCH_MASK_NONE,                                                   \
        frag,                                                                 \
        &frag->frag_request);                                                 \
    if(mx_return != MX_SUCCESS) {                                             \
        ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%d\n", \
            mx_return);                                                       \
        rc = OMPI_ERROR;                                                      \
    } else {                                                                  \
        rc = OMPI_SUCCESS;                                                    \
    }                                                                         \
} while(0)
/**
* Routine to process complete request(s).
*/
@ -79,15 +40,7 @@ do {
case MCA_PTL_FRAGMENT_SEND: \
{ \
mca_ptl_mx_send_frag_t* sendfrag = (mca_ptl_mx_send_frag_t*)frag; \
mca_pml_base_send_request_t* sendreq = \
sendfrag->frag_send.frag_request; \
bool req_cached = sendreq->req_cached; \
ptl->super.ptl_send_progress( \
&ptl->super, \
sendreq, \
sendfrag->frag_send.frag_base.frag_size); \
if(req_cached == false) \
MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag); \
MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag); \
break; \
} \
case MCA_PTL_FRAGMENT_RECV: \
@ -100,7 +53,6 @@ do {
case MCA_PTL_HDR_TYPE_MATCH: \
{ \
MCA_PTL_MX_RECV_FRAG_MATCH(recvfrag,hdr); \
MCA_PTL_MX_POST(ptl, rc); \
break; \
} \
case MCA_PTL_HDR_TYPE_FRAG: \
@ -110,8 +62,14 @@ do {
} \
case MCA_PTL_HDR_TYPE_ACK: \
{ \
MCA_PTL_MX_RECV_FRAG_ACK(recvfrag,hdr); \
MCA_PTL_MX_POST(ptl, rc); \
mca_ptl_mx_send_frag_t* sendfrag; \
mca_pml_base_send_request_t* sendreq; \
sendfrag = (mca_ptl_mx_send_frag_t*) \
hdr->hdr_ack.hdr_src_ptr.pval; \
sendreq = sendfrag->frag_send.frag_request; \
sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match; \
MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag); \
MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag); \
break; \
} \
} \

View file

@ -140,12 +140,11 @@ mca_ptl_mx_proc_t* mca_ptl_mx_proc_lookup(const ompi_process_name_t *name)
*/
int mca_ptl_mx_proc_insert(mca_ptl_mx_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer)
{
/* insert into peer array */
mx_endpoint_addr_t addr;
uint64_t mx_nic_addr;
uint32_t mx_endpoint_id;
uint32_t mx_filter;
/* insert into peer array */
ptl_peer->peer_proc = ptl_proc;
ptl_peer->peer_addr = ptl_proc->proc_addrs[ptl_proc->proc_peer_count];
ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer;

View file

@ -10,6 +10,7 @@
#include "ptl_mx.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_mx_sendfrag.h"
/**
* MX received fragment derived type.
@ -17,59 +18,92 @@
struct mca_ptl_mx_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */
mx_request_t frag_request;
mx_segment_t frag_segments[2];
mx_segment_t frag_segments[3];
uint32_t frag_segment_count;
unsigned char frag_data[32768];
unsigned char frag_data[32*1024];
};
typedef struct mca_ptl_mx_recv_frag_t mca_ptl_mx_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_ptl_mx_recv_frag_t);
#define MCA_PTL_MX_RECV_FRAG_ALLOC(recvfrag, rc) \
#define MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_ptl_mx_component.mx_recv_frags, item, rc); \
recvfrag = (mca_ptl_mx_recv_frag_t*)item; \
frag = (mca_ptl_mx_recv_frag_t*)item; \
}
#define MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag) \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)recvfrag);
#define MCA_PTL_MX_RECV_FRAG_RETURN(frag) \
{ \
if(frag->frag_recv.frag_is_buffered) { \
free(frag->frag_segments[1].segment_ptr); \
} \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)frag); \
}
/**
* Callback on receipt of a match fragment.
*/
#define MCA_PTL_MX_RECV_FRAG_MATCH(recvfrag, hdr) \
do { \
if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { \
MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); \
} \
ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, \
&hdr->hdr_match); \
} while(0)
#define MCA_PTL_MX_RECV_FRAG_MATCH(frag, hdr) \
do { \
if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { \
MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); \
} \
ptl->super.ptl_match(&ptl->super, &frag->frag_recv, &hdr->hdr_match); \
} while(0)
/**
*
* Process a fragment that completed.
*/
static inline void MCA_PTL_MX_RECV_FRAG_FRAG(
mca_ptl_mx_recv_frag_t* recvfrag,
mca_ptl_mx_recv_frag_t* frag,
mca_ptl_base_header_t* hdr)
{
/* copy into user space */
if(recvfrag->frag_recv.frag_is_buffered) {
if(frag->frag_recv.frag_is_buffered) {
struct iovec iov;
unsigned int iov_count;
unsigned int max_data;
int free_after;
iov.iov_base = frag->frag_recv.frag_base.frag_addr;
iov.iov_len = frag->frag_recv.frag_base.frag_size;
iov_count = 1;
max_data = iov.iov_len;
ompi_convertor_unpack( &frag->frag_recv.frag_base.frag_convertor,
&iov, &iov_count, &max_data, &free_after );
}
/* progress the request */
frag->frag_recv.frag_base.frag_owner->ptl_recv_progress(
frag->frag_recv.frag_base.frag_owner,
frag->frag_recv.frag_request,
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length,
frag->frag_recv.frag_base.frag_size);
}
MCA_PTL_MX_RECV_FRAG_RETURN(frag);
}
/**
* Process an acknowledgment.
*/
static inline void MCA_PTL_MX_RECV_FRAG_ACK(
mca_ptl_mx_recv_frag_t* recvfrag,
mca_ptl_mx_recv_frag_t* frag,
mca_ptl_base_header_t* hdr)
{
mca_ptl_mx_send_frag_t* sendfrag;
mca_pml_base_send_request_t* sendreq;
sendfrag = (mca_ptl_mx_send_frag_t*)frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_src_ptr.pval;
sendreq = sendfrag->frag_send.frag_request;
sendreq->req_peer_match = frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_dst_match;
MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag);
MCA_PTL_MX_RECV_FRAG_RETURN(frag);
}

View file

@ -24,9 +24,13 @@ struct mca_ptl_mx_send_frag_t {
mx_request_t frag_request;
mx_segment_t frag_segments[2];
size_t frag_segment_count;
uint32_t frag_progress;
};
typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t;
OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t);
#define MCA_PTL_MX_SEND_FRAG_ALLOC(sendfrag, rc) \
{ \
ompi_list_item_t* item; \
@ -48,17 +52,14 @@ typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t;
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_send_frags, (ompi_list_item_t*)sendfrag); \
}
OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t);
#define MCA_PTL_MX_SEND_FRAG_INIT_ACK(ack,ptl,frag) \
{ \
mca_ptl_base_header_t* hdr = &(ack)->frag_send.frag_base.frag_header; \
mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request; \
mca_pml_base_recv_request_t* request = (frag)->frag_recv.frag_request; \
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; \
hdr->hdr_common.hdr_flags = 0; \
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); \
hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; \
hdr->hdr_ack.hdr_src_ptr = (frag)->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; \
hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ \
hdr->hdr_ack.hdr_dst_match.pval = request; \
hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ \
@ -74,5 +75,44 @@ OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t);
}
/*
 * Progress a send fragment after one of its completion events
 * (local mx_isend() completion, or receipt of the matching ack).
 *
 * Each event invokes this routine once; frag_progress counts how many
 * events have occurred so far.  The fragment is finished when either
 * the send completed and the ack arrived (2 events, ACK flag set), or
 * the send completed and no ack was requested (1 event).  On
 * completion the PML is notified via ptl_send_progress() and the
 * fragment is returned to the free list unless it is the cached
 * fragment embedded in the request (offset 0 of a cached request).
 */
static inline void MCA_PTL_MX_SEND_FRAG_PROGRESS(mca_ptl_mx_send_frag_t* frag)
{
mca_pml_base_send_request_t* request = frag->frag_send.frag_request;
bool frag_ack;
uint32_t frag_progress;
/* if this is an ack - simply return to pool */
if(request == NULL) {
MCA_PTL_MX_SEND_FRAG_RETURN(frag);
return;
}
/* Done when:
* (1) send completes and ack is received
* (2) send completes and ack is not required
*/
frag_ack = (frag->frag_send.frag_base.frag_header.
hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) ? true : false;
/* NOTE(review): assumes ompi_atomic_add_32() returns the updated
* value - confirm against the atomic implementation */
frag_progress = ompi_atomic_add_32(&frag->frag_progress, 1);
if((frag_ack == true && frag_progress == 2) ||
(frag_ack == false && frag_progress == 1)) {
/* update request status */
frag->frag_send.frag_base.frag_owner->ptl_send_progress(
frag->frag_send.frag_base.frag_owner,
request,
frag->frag_send.frag_base.frag_size);
/* return any fragment that didn't come from the request cache */
if (request->req_cached == false ||
frag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0) {
MCA_PTL_MX_SEND_FRAG_RETURN(frag);
}
}
}
#endif