From 7b6a879b56865a3151781de9a611b2031955033c Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Thu, 28 Oct 2004 22:23:22 +0000 Subject: [PATCH] work in progress This commit was SVN r3415. --- src/mca/ptl/mx/ptl_mx.c | 8 ++- src/mca/ptl/mx/ptl_mx_component.c | 2 +- src/mca/ptl/mx/ptl_mx_module.c | 95 +++++++++++++++++++++++++++---- src/mca/ptl/mx/ptl_mx_module.h | 60 +++---------------- src/mca/ptl/mx/ptl_mx_proc.c | 3 +- src/mca/ptl/mx/ptl_mx_recvfrag.h | 74 +++++++++++++++++------- src/mca/ptl/mx/ptl_mx_sendfrag.h | 50 ++++++++++++++-- 7 files changed, 201 insertions(+), 91 deletions(-) diff --git a/src/mca/ptl/mx/ptl_mx.c b/src/mca/ptl/mx/ptl_mx.c index 32d3cb5806..8721a40949 100644 --- a/src/mca/ptl/mx/ptl_mx.c +++ b/src/mca/ptl/mx/ptl_mx.c @@ -141,7 +141,7 @@ int mca_ptl_mx_send( mx_return_t mx_return; int rc; - if (offset == 0 && sendreq->req_cached) { + if (sendreq->req_cached) { sendfrag = (mca_ptl_mx_send_frag_t*)(sendreq+1); } else { ompi_list_item_t* item; @@ -168,6 +168,7 @@ int mca_ptl_mx_send( hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; /* initialize convertor */ + sendfrag->frag_progress = 0; sendfrag->frag_free = 0; if(size > 0) { ompi_convertor_t *convertor; @@ -276,6 +277,7 @@ int mca_ptl_mx_send_continue( mca_ptl_mx_send_frag_t* sendfrag; mca_ptl_base_header_t* hdr; mx_return_t mx_return; + uint64_t match_value; int rc; /* allocate fragment */ @@ -296,6 +298,7 @@ int mca_ptl_mx_send_continue( hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; /* initialize convertor */ + sendfrag->frag_progress = 0; sendfrag->frag_free = 0; if(size > 0) { ompi_convertor_t *convertor; @@ -362,12 +365,13 @@ int mca_ptl_mx_send_continue( sendreq->req_offset += size; /* start the fragment */ + match_value = ((uint64_t)sendreq << 32) | (uint64_t)offset; mx_return = mx_isend( mx_ptl->mx_endpoint, sendfrag->frag_segments, sendfrag->frag_segment_count, sendfrag->frag_send.frag_base.frag_peer->peer_addr, - 1, + match_value, sendfrag, &sendfrag->frag_request); if(mx_return != MX_SUCCESS) { diff --git a/src/mca/ptl/mx/ptl_mx_component.c b/src/mca/ptl/mx/ptl_mx_component.c index 3cca87fd69..b070673c25 100644 --- a/src/mca/ptl/mx/ptl_mx_component.c +++ b/src/mca/ptl/mx/ptl_mx_component.c @@ -91,7 +91,7 @@ int mca_ptl_mx_component_open(void) /* register MX module parameters */ mca_ptl_mx_component.mx_filter = - (uint32_t)mca_ptl_mx_param_register_int("filter", 0x12345); + (uint32_t)mca_ptl_mx_param_register_int("filter", 0xdeadbeef); mca_ptl_mx_component.mx_prepost = mca_ptl_mx_param_register_int("prepost", 1); mca_ptl_mx_component.mx_debug = diff --git a/src/mca/ptl/mx/ptl_mx_module.c b/src/mca/ptl/mx/ptl_mx_module.c index 636d919756..c6d778404b 100644 --- a/src/mca/ptl/mx/ptl_mx_module.c +++ b/src/mca/ptl/mx/ptl_mx_module.c @@ -8,6 +8,11 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr); +static void* mca_ptl_mx_mem_alloc( size_t* size ) +{ + return malloc(*size); +} + /** * Initialize MX PTL modules @@ -128,6 +133,84 @@ static void* mca_ptl_mx_thread(ompi_object_t *arg) #endif +/* + * Callback on a match. + * + */ + +static void mca_ptl_mx_match(void* context, uint64_t match_value, int size) +{ + mca_ptl_mx_module_t* ptl = (mca_ptl_mx_module_t*)context; + mca_ptl_mx_recv_frag_t *frag; + mx_return_t mx_return; + int rc; + + MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc); + if(rc != OMPI_SUCCESS) { + ompi_output(0, "mca_ptl_mx_match: unable to allocate resources.\n"); + return; + } + frag->frag_recv.frag_base.frag_owner = &ptl->super; + frag->frag_recv.frag_base.frag_peer = NULL; + + /* first fragment - post a buffer */ + if(match_value == 0) { + + frag->frag_segment_count = 2; + frag->frag_segments[1].segment_ptr = frag->frag_data; + frag->frag_segments[1].segment_length = size - sizeof(mca_ptl_base_header_t); + + /* fragment has already been matched */ + } else { + + mca_pml_base_recv_request_t* request = (mca_pml_base_recv_request_t*) + (uint32_t)(match_value >> 32); + uint32_t offset = (uint32_t)match_value; + ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_base.req_comm, + request->req_base.req_ompi.req_status.MPI_SOURCE); + ompi_convertor_t* convertor = &frag->frag_recv.frag_base.frag_convertor; + frag->frag_recv.frag_base.frag_size = size - sizeof(mca_ptl_base_header_t); + + /* initialize convertor */ + ompi_convertor_copy(proc->proc_convertor, convertor); + ompi_convertor_init_for_recv( + convertor, + 0, /* flags */ + request->req_base.req_datatype, /* datatype */ + request->req_base.req_count, /* count elements */ + request->req_base.req_addr, /* users buffer */ + offset, /* offset in bytes into packed buffer */ + mca_ptl_mx_mem_alloc ); /* not allocating memory */ + + /* non-contiguous - allocate buffer for receive */ + if( 1 == ompi_convertor_need_buffers( convertor ) || + request->req_bytes_packed < offset + frag->frag_recv.frag_base.frag_size) { + frag->frag_recv.frag_base.frag_addr = malloc(frag->frag_recv.frag_base.frag_size); + frag->frag_recv.frag_is_buffered = true; + /* calculate offset into users buffer */ + } else { + frag->frag_recv.frag_base.frag_addr = ((unsigned char*)request->req_base.req_addr) + offset; + } + + frag->frag_segments[1].segment_ptr = frag->frag_recv.frag_base.frag_addr; + frag->frag_segments[1].segment_length = frag->frag_recv.frag_base.frag_size; + frag->frag_segment_count = 2; + } + + mx_return = mx_irecv( + ptl->mx_endpoint, + frag->frag_segments, + frag->frag_segment_count, + match_value, + MX_MATCH_MASK_NONE, + frag, + &frag->frag_request); + if(mx_return != MX_SUCCESS) { + ompi_output(0, "mca_ptl_mx_match: mx_irecv() failed with status=%dn", mx_return); + } +} + + /* * Create and intialize an MX PTL module, where each module * represents a specific NIC. @@ -137,7 +220,6 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) { mca_ptl_mx_module_t* ptl = malloc(sizeof(mca_ptl_mx_module_t)); mx_return_t status; - int i; if(NULL == ptl) return NULL; @@ -187,15 +269,8 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) ptl->mx_filter); } - /* pre-post receive buffers */ - for(i=0; isuper); - return NULL; - } - } + /* register a callback function for matching */ + mx_register_match_callback(ptl->mx_endpoint, mca_ptl_mx_match, ptl); #if OMPI_HAVE_THREADS /* create a thread to progress requests */ diff --git a/src/mca/ptl/mx/ptl_mx_module.h b/src/mca/ptl/mx/ptl_mx_module.h index b6f9f7eb6c..950c4acaae 100644 --- a/src/mca/ptl/mx/ptl_mx_module.h +++ b/src/mca/ptl/mx/ptl_mx_module.h @@ -13,45 +13,6 @@ #include "ptl_mx_sendfrag.h" - -/** - * Prepost recv buffers - */ - -#define MCA_PTL_MX_POST(ptl, rc) \ -do { \ - mca_ptl_mx_recv_frag_t* frag; \ - mx_return_t mx_return; \ - /* post an additional recv */ \ - MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc); \ - if(rc != OMPI_SUCCESS) { \ - ompi_output(0, "mca_ptl_mx_post: unable to allocate recv fragn"); \ - rc = OMPI_ERR_OUT_OF_RESOURCE; \ - break; \ - } \ - frag->frag_recv.frag_base.frag_owner = &ptl->super; \ - frag->frag_recv.frag_base.frag_peer = NULL; \ - frag->frag_segment_count = 2; \ - frag->frag_segments[1].segment_ptr = frag->frag_data; \ - frag->frag_segments[1].segment_length = sizeof(frag->frag_data); \ - \ - mx_return = mx_irecv( \ - ptl->mx_endpoint, \ - frag->frag_segments, \ - frag->frag_segment_count, \ - 1, \ - MX_MATCH_MASK_NONE, \ - frag, \ - &frag->frag_request); \ - if(mx_return != MX_SUCCESS) { \ - ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%dn", \ - mx_return); \ - rc = OMPI_ERROR; \ - } \ - rc = OMPI_SUCCESS; \ -} while(0) - - /** * Routine to process complete request(s). */ @@ -79,15 +40,7 @@ do { case MCA_PTL_FRAGMENT_SEND: \ { \ mca_ptl_mx_send_frag_t* sendfrag = (mca_ptl_mx_send_frag_t*)frag; \ - mca_pml_base_send_request_t* sendreq = \ - sendfrag->frag_send.frag_request; \ - bool req_cached = sendreq->req_cached; \ - ptl->super.ptl_send_progress( \ - &ptl->super, \ - sendreq, \ - sendfrag->frag_send.frag_base.frag_size); \ - if(req_cached == false) \ - MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag); \ + MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag); \ break; \ } \ case MCA_PTL_FRAGMENT_RECV: \ @@ -100,7 +53,6 @@ do { case MCA_PTL_HDR_TYPE_MATCH: \ { \ MCA_PTL_MX_RECV_FRAG_MATCH(recvfrag,hdr); \ - MCA_PTL_MX_POST(ptl, rc); \ break; \ } \ case MCA_PTL_HDR_TYPE_FRAG: \ @@ -110,8 +62,14 @@ do { } \ case MCA_PTL_HDR_TYPE_ACK: \ { \ - MCA_PTL_MX_RECV_FRAG_ACK(recvfrag,hdr); \ - MCA_PTL_MX_POST(ptl, rc); \ + mca_ptl_mx_send_frag_t* sendfrag; \ + mca_pml_base_send_request_t* sendreq; \ + sendfrag = (mca_ptl_mx_send_frag_t*) \ + hdr->hdr_ack.hdr_src_ptr.pval; \ + sendreq = sendfrag->frag_send.frag_request; \ + sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match; \ + MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag); \ + MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag); \ break; \ } \ } \ diff --git a/src/mca/ptl/mx/ptl_mx_proc.c b/src/mca/ptl/mx/ptl_mx_proc.c index 8a1ebd6376..de792521d2 100644 --- a/src/mca/ptl/mx/ptl_mx_proc.c +++ b/src/mca/ptl/mx/ptl_mx_proc.c @@ -140,12 +140,11 @@ mca_ptl_mx_proc_t* mca_ptl_mx_proc_lookup(const ompi_process_name_t *name) */ int mca_ptl_mx_proc_insert(mca_ptl_mx_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer) { - /* insert into peer array */ - mx_endpoint_addr_t addr; uint64_t mx_nic_addr; uint32_t mx_endpoint_id; uint32_t mx_filter; + /* insert into peer array */ ptl_peer->peer_proc = ptl_proc; ptl_peer->peer_addr = ptl_proc->proc_addrs[ptl_proc->proc_peer_count]; ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer; diff --git a/src/mca/ptl/mx/ptl_mx_recvfrag.h b/src/mca/ptl/mx/ptl_mx_recvfrag.h index 252b3f7833..0a5bc2b8ab 100644 --- a/src/mca/ptl/mx/ptl_mx_recvfrag.h +++ b/src/mca/ptl/mx/ptl_mx_recvfrag.h @@ -10,6 +10,7 @@ #include "ptl_mx.h" #include "mca/ptl/base/ptl_base_recvfrag.h" +#include "ptl_mx_sendfrag.h" /** * MX received fragment derived type. @@ -17,59 +18,92 @@ struct mca_ptl_mx_recv_frag_t { mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */ mx_request_t frag_request; - mx_segment_t frag_segments[2]; + mx_segment_t frag_segments[3]; uint32_t frag_segment_count; - unsigned char frag_data[32768]; + unsigned char frag_data[32*1024]; }; typedef struct mca_ptl_mx_recv_frag_t mca_ptl_mx_recv_frag_t; OBJ_CLASS_DECLARATION(mca_ptl_mx_recv_frag_t); -#define MCA_PTL_MX_RECV_FRAG_ALLOC(recvfrag, rc) \ +#define MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc) \ { \ ompi_list_item_t* item; \ OMPI_FREE_LIST_GET(&mca_ptl_mx_component.mx_recv_frags, item, rc); \ - recvfrag = (mca_ptl_mx_recv_frag_t*)item; \ + frag = (mca_ptl_mx_recv_frag_t*)item; \ } -#define MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag) \ - OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)recvfrag); +#define MCA_PTL_MX_RECV_FRAG_RETURN(frag) \ +{ \ + if(frag->frag_recv.frag_is_buffered) { \ + free(frag->frag_segments[1].segment_ptr); \ + } \ + OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)frag); \ +} /** * Callback on receipt of a match fragment. */ -#define MCA_PTL_MX_RECV_FRAG_MATCH(recvfrag, hdr) \ -do { \ - if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { \ - MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); \ - } \ - ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, \ - &hdr->hdr_match); \ -} while(0) +#define MCA_PTL_MX_RECV_FRAG_MATCH(frag, hdr) \ +do { \ + if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { \ + MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); \ + } \ + ptl->super.ptl_match(&ptl->super, &frag->frag_recv, &hdr->hdr_match); \ +} while(0) + /** - * + * Process a fragment that completed. */ static inline void MCA_PTL_MX_RECV_FRAG_FRAG( - mca_ptl_mx_recv_frag_t* recvfrag, + mca_ptl_mx_recv_frag_t* frag, mca_ptl_base_header_t* hdr) { /* copy into user space */ - if(recvfrag->frag_recv.frag_is_buffered) { + if(frag->frag_recv.frag_is_buffered) { + struct iovec iov; + unsigned int iov_count; + unsigned int max_data; + int free_after; + + iov.iov_base = frag->frag_recv.frag_base.frag_addr; + iov.iov_len = frag->frag_recv.frag_base.frag_size; + iov_count = 1; + max_data = iov.iov_len; + ompi_convertor_unpack( &frag->frag_recv.frag_base.frag_convertor, + &iov, &iov_count, &max_data, &free_after ); + } + + /* progress the request */ + frag->frag_recv.frag_base.frag_owner->ptl_recv_progress( + frag->frag_recv.frag_base.frag_owner, + frag->frag_recv.frag_request, + frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length, + frag->frag_recv.frag_base.frag_size); - - } + MCA_PTL_MX_RECV_FRAG_RETURN(frag); } +/** + * Process an acknowledgment. + */ static inline void MCA_PTL_MX_RECV_FRAG_ACK( - mca_ptl_mx_recv_frag_t* recvfrag, + mca_ptl_mx_recv_frag_t* frag, mca_ptl_base_header_t* hdr) { + mca_ptl_mx_send_frag_t* sendfrag; + mca_pml_base_send_request_t* sendreq; + sendfrag = (mca_ptl_mx_send_frag_t*)frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_src_ptr.pval; + sendreq = sendfrag->frag_send.frag_request; + sendreq->req_peer_match = frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_dst_match; + MCA_PTL_MX_SEND_FRAG_PROGRESS(sendfrag); + MCA_PTL_MX_RECV_FRAG_RETURN(frag); } diff --git a/src/mca/ptl/mx/ptl_mx_sendfrag.h b/src/mca/ptl/mx/ptl_mx_sendfrag.h index cc495c55aa..078f3c3c5d 100644 --- a/src/mca/ptl/mx/ptl_mx_sendfrag.h +++ b/src/mca/ptl/mx/ptl_mx_sendfrag.h @@ -24,9 +24,13 @@ struct mca_ptl_mx_send_frag_t { mx_request_t frag_request; mx_segment_t frag_segments[2]; size_t frag_segment_count; + uint32_t frag_progress; }; typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t; +OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t); + + #define MCA_PTL_MX_SEND_FRAG_ALLOC(sendfrag, rc) \ { \ ompi_list_item_t* item; \ @@ -48,17 +52,14 @@ typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t; OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_send_frags, (ompi_list_item_t*)sendfrag); \ } -OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t); - - #define MCA_PTL_MX_SEND_FRAG_INIT_ACK(ack,ptl,frag) \ { \ mca_ptl_base_header_t* hdr = &(ack)->frag_send.frag_base.frag_header; \ - mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request; \ + mca_pml_base_recv_request_t* request = (frag)->frag_recv.frag_request; \ hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; \ hdr->hdr_common.hdr_flags = 0; \ hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); \ - hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; \ + hdr->hdr_ack.hdr_src_ptr = (frag)->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; \ hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ \ hdr->hdr_ack.hdr_dst_match.pval = request; \ hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ \ @@ -74,5 +75,44 @@ OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t); } +static inline void MCA_PTL_MX_SEND_FRAG_PROGRESS(mca_ptl_mx_send_frag_t* frag) +{ + mca_pml_base_send_request_t* request = frag->frag_send.frag_request; + bool frag_ack; + uint32_t frag_progress; + + /* if this is an ack - simply return to pool */ + if(request == NULL) { + MCA_PTL_MX_SEND_FRAG_RETURN(frag); + return; + } + + /* Done when: + * (1) send completes and ack is received + * (2) send completes and ack is not required + */ + frag_ack = (frag->frag_send.frag_base.frag_header. + hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) ? true : false; + frag_progress = ompi_atomic_add_32(&frag->frag_progress, 1); + + if((frag_ack == true && frag_progress == 2) || + (frag_ack == false && frag_progress == 1)) { + + /* update request status */ + frag->frag_send.frag_base.frag_owner->ptl_send_progress( + frag->frag_send.frag_base.frag_owner, + request, + frag->frag_send.frag_base.frag_size); + + /* return any fragment that didnt come from the cache */ + if (request->req_cached == false || + frag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0) { + MCA_PTL_MX_SEND_FRAG_RETURN(frag); + } + } +} + + + #endif