diff --git a/src/mca/pml/ob1/pml_ob1.c b/src/mca/pml/ob1/pml_ob1.c index 527c03dde5..61b6da8cf0 100644 --- a/src/mca/pml/ob1/pml_ob1.c +++ b/src/mca/pml/ob1/pml_ob1.c @@ -109,12 +109,14 @@ int mca_pml_ob1_add_bmis() ompi_list_t* bmis = &mca_bmi_base_modules_initialized; mca_bmi_base_selected_module_t* selected_bmi; size_t num_bmis = ompi_list_get_size(bmis); + mca_pml_ob1.num_bmi_modules = 0; mca_pml_ob1.num_bmi_progress = 0; mca_pml_ob1.num_bmi_components = 0; mca_pml_ob1.bmi_modules = (mca_bmi_base_module_t **)malloc(sizeof(mca_bmi_base_module_t*) * num_bmis); mca_pml_ob1.bmi_progress = (mca_bmi_base_component_progress_fn_t*)malloc(sizeof(mca_bmi_base_component_progress_fn_t) * num_bmis); mca_pml_ob1.bmi_components = (mca_bmi_base_component_t **)malloc(sizeof(mca_bmi_base_component_t*) * num_bmis); + if (NULL == mca_pml_ob1.bmi_modules || NULL == mca_pml_ob1.bmi_progress || NULL == mca_pml_ob1.bmi_components) { @@ -135,6 +137,11 @@ int mca_pml_ob1_add_bmis() } } + /* override eager limit larger than our max */ + if(bmi->bmi_eager_limit > mca_pml_ob1.eager_limit) { + bmi->bmi_eager_limit = mca_pml_ob1.eager_limit; + } + /* setup callback for receive */ rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_frag_callback, NULL); if(OMPI_SUCCESS != rc) @@ -145,6 +152,16 @@ int mca_pml_ob1_add_bmis() } } + /* initialize free list of receive buffers */ + ompi_free_list_init( + &mca_pml_ob1.buffers, + sizeof(mca_pml_ob1_buffer_t) + mca_pml_ob1.eager_limit, + OBJ_CLASS(mca_pml_ob1_buffer_t), + mca_pml_ob1.free_list_num, + mca_pml_ob1.free_list_max, + mca_pml_ob1.free_list_inc, + NULL); + /* sort ob1 list by exclusivity */ qsort(mca_pml_ob1.bmi_modules, mca_pml_ob1.num_bmi_modules, diff --git a/src/mca/pml/ob1/pml_ob1.h b/src/mca/pml/ob1/pml_ob1.h index 5bf907e849..8ce4e0fa5f 100644 --- a/src/mca/pml/ob1/pml_ob1.h +++ b/src/mca/pml/ob1/pml_ob1.h @@ -55,15 +55,18 @@ struct mca_pml_ob1_t { int free_list_num; /* initial size of free list */ int free_list_max; /* maximum size of free list */ int free_list_inc; /* number of elements to grow free list */ + size_t eager_limit; size_t send_pipeline_depth; size_t recv_pipeline_depth; /* lock queue access */ ompi_mutex_t lock; - /* free list of requests */ + /* free lists */ ompi_free_list_t send_requests; ompi_free_list_t recv_requests; + ompi_free_list_t buffers; + ompi_free_list_t fragments; /* list of pending send requests */ ompi_list_t send_pending; diff --git a/src/mca/pml/ob1/pml_ob1_component.c b/src/mca/pml/ob1/pml_ob1_component.c index 9ba0738652..881acea0c2 100644 --- a/src/mca/pml/ob1/pml_ob1_component.c +++ b/src/mca/pml/ob1/pml_ob1_component.c @@ -28,6 +28,7 @@ #include "pml_ob1_hdr.h" #include "pml_ob1_sendreq.h" #include "pml_ob1_recvreq.h" +#include "pml_ob1_recvfrag.h" mca_pml_base_component_1_0_0_t mca_pml_ob1_component = { @@ -76,9 +77,18 @@ static inline int mca_pml_ob1_param_register_int( int mca_pml_ob1_component_open(void) { OBJ_CONSTRUCT(&mca_pml_ob1.lock, ompi_mutex_t); + + /* requests */ OBJ_CONSTRUCT(&mca_pml_ob1.send_requests, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.recv_requests, ompi_free_list_t); + + /* fragments */ + OBJ_CONSTRUCT(&mca_pml_ob1.fragments, ompi_free_list_t); + OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t); + + /* pending operations */ OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, ompi_list_t); + OBJ_CONSTRUCT(&mca_pml_ob1.acks_pending, ompi_list_t); mca_pml_ob1.bmi_components = NULL; mca_pml_ob1.num_bmi_components = 0; @@ -95,6 +105,8 @@ int mca_pml_ob1_component_open(void) mca_pml_ob1_param_register_int("free_list_inc", 256); mca_pml_ob1.priority = mca_pml_ob1_param_register_int("priority", 0); + mca_pml_ob1.eager_limit = + mca_pml_ob1_param_register_int("eager_limit", 256 * 1024); mca_pml_ob1.send_pipeline_depth = mca_pml_ob1_param_register_int("send_pipeline_depth", 3); mca_pml_ob1.recv_pipeline_depth = @@ -134,6 +146,7 @@ int mca_pml_ob1_component_close(void) if(NULL != mca_pml_ob1.bmi_progress) { free(mca_pml_ob1.bmi_progress); } + OBJ_DESTRUCT(&mca_pml_ob1.acks_pending); OBJ_DESTRUCT(&mca_pml_ob1.send_pending); OBJ_DESTRUCT(&mca_pml_ob1.send_requests); OBJ_DESTRUCT(&mca_pml_ob1.recv_requests); @@ -169,6 +182,16 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority, mca_pml_ob1.free_list_inc, NULL); + /* recv fragments */ + ompi_free_list_init( + &mca_pml_ob1.fragments, + sizeof(mca_pml_ob1_fragment_t), + OBJ_CLASS(mca_pml_ob1_fragment_t), + mca_pml_ob1.free_list_num, + mca_pml_ob1.free_list_max, + mca_pml_ob1.free_list_inc, + NULL); + /* buffered send */ if(OMPI_SUCCESS != mca_pml_base_bsend_init(enable_mpi_threads)) { ompi_output(0, "mca_pml_ob1_component_init: mca_pml_bsend_init failed\n"); diff --git a/src/mca/pml/ob1/pml_ob1_endpoint.h b/src/mca/pml/ob1/pml_ob1_endpoint.h index 281241fd7d..6b7be393b4 100644 --- a/src/mca/pml/ob1/pml_ob1_endpoint.h +++ b/src/mca/pml/ob1/pml_ob1_endpoint.h @@ -153,6 +153,61 @@ static inline mca_pml_ob1_endpoint_t* mca_pml_ob1_ep_array_get_next(mca_pml_ob1_ return endpoint; } +/** + * Allocate a descriptor + */ + +#if OMPI_HAVE_THREAD_SUPPORT +#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \ +do { \ + if(NULL != (descriptor = endpoint->bmi_cache)) { \ + /* atomically acquire the cached descriptor */ \ + if(ompi_atomic_cmpset_ptr(&endpoint->bmi_cache, descriptor, NULL) == 0) { \ + endpoint->bmi_cache = NULL; \ + } else { \ + descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \ + } \ + } else { \ + descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \ + } \ +} while(0) +#else +#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \ +do { \ + if(NULL != (descriptor = endpoint->bmi_cache)) { \ + endpoint->bmi_cache = NULL; \ + } else { \ + descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \ + } \ +} while(0) +#endif + +/** + * Return a descriptor + */ + +#if OMPI_HAVE_THREAD_SUPPORT +#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \ +do { \ + if(NULL == bmi_ep->bmi_cache) { \ + if(ompi_atomic_cmpset_ptr(&endpoint->bmi_cache,NULL,descriptor) == 0) { \ + bmi->bmi_free(bmi,descriptor); \ + } \ + } else { \ + bmi->bmi_free(bmi,descriptor); \ + } +} while(0) +#else +#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \ +do { \ + if(NULL == bmi_ep->bmi_cache) { \ + bmi_ep->bmi_cache = descriptor; \ + } else { \ + bmi->bmi_free(bmi,descriptor); \ + } \ +} while(0) +#endif + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/src/mca/pml/ob1/pml_ob1_recvfrag.c b/src/mca/pml/ob1/pml_ob1_recvfrag.c index 677f5c8d26..0bafd8ad4a 100644 --- a/src/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/src/mca/pml/ob1/pml_ob1_recvfrag.c @@ -35,7 +35,14 @@ OBJ_CLASS_INSTANCE( - mca_pml_ob1_recv_frag_t, + mca_pml_ob1_buffer_t, + ompi_list_item_t, + NULL, + NULL +); + +OBJ_CLASS_INSTANCE( + mca_pml_ob1_fragment_t, ompi_list_item_t, NULL, NULL @@ -348,15 +355,15 @@ static bool mca_pml_ob1_check_cantmatch_for_match( * RCS/CTS receive side matching * * @param hdr list of parameters needed for matching - * This list is also embeded in frag_desc, + * This list is also embeded in frag, * but this allows to save a memory copy when * a match is made in this routine. (IN) - * @param frag_desc pointer to receive fragment which we want + * @param frag pointer to receive fragment which we want * to match (IN/OUT). If a match is not made, - * hdr is copied to frag_desc. - * @param match_made parameter indicating if we matched frag_desc/ + * hdr is copied to frag. + * @param match_made parameter indicating if we matched frag/ * hdr (OUT) - * @param additional_matches if a match is made with frag_desc, we + * @param additional_matches if a match is made with frag, we * may be able to match fragments that previously * have arrived out-of-order. If this is the * case, the associated fragment descriptors are @@ -459,13 +466,13 @@ int mca_pml_ob1_recv_frag_match( } else { /* if no match found, place on unexpected queue */ - mca_pml_ob1_recv_frag_t* frag; - MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); + mca_pml_ob1_fragment_t* frag; + MCA_PML_OB1_FRAG_ALLOC(frag, rc); if(OMPI_SUCCESS != rc) { OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); return rc; } - MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); + MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments); ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag ); } @@ -484,13 +491,13 @@ int mca_pml_ob1_recv_frag_match( * This message comes after the next expected, so it * is ahead of sequence. Save it for later. */ - mca_pml_ob1_recv_frag_t* frag; - MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); + mca_pml_ob1_fragment_t* frag; + MCA_PML_OB1_FRAG_ALLOC(frag, rc); if(OMPI_SUCCESS != rc) { OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); return rc; } - MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); + MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments); ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag); } @@ -507,10 +514,10 @@ int mca_pml_ob1_recv_frag_match( if(additional_match) { ompi_list_item_t* item; while(NULL != (item = ompi_list_remove_first(&additional_matches))) { - mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item; + mca_pml_ob1_fragment_t* frag = (mca_pml_ob1_fragment_t*)item; MCA_PML_OB1_RECV_REQUEST_MATCHED(frag->request, hdr); mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments); - MCA_PML_OB1_RECV_FRAG_RETURN(frag); + MCA_PML_OB1_FRAG_RETURN(frag); } } return OMPI_SUCCESS; @@ -540,7 +547,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match( /* local parameters */ int match_found; uint16_t next_msg_seq_expected, frag_seq; - mca_pml_ob1_recv_frag_t *frag_desc; + mca_pml_ob1_fragment_t *frag; mca_pml_ob1_recv_request_t *match = NULL; bool match_made = false; @@ -561,19 +568,19 @@ static bool mca_pml_ob1_check_cantmatch_for_match( /* search the list for a fragment from the send with sequence * number next_msg_seq_expected */ - for(frag_desc = (mca_pml_ob1_recv_frag_t *) + for(frag = (mca_pml_ob1_fragment_t *) ompi_list_get_first(&proc->frags_cant_match); - frag_desc != (mca_pml_ob1_recv_frag_t *) + frag != (mca_pml_ob1_fragment_t *) ompi_list_get_end(&proc->frags_cant_match); - frag_desc = (mca_pml_ob1_recv_frag_t *) - ompi_list_get_next(frag_desc)) + frag = (mca_pml_ob1_fragment_t *) + ompi_list_get_next(frag)) { /* * If the message has the next expected seq from that proc... */ - frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq; + frag_seq=frag->hdr.hdr_match.hdr_msg_seq; if (frag_seq == next_msg_seq_expected) { - mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match; + mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match; /* We're now expecting the next sequence number. */ (proc->expected_sequence)++; @@ -582,10 +589,10 @@ static bool mca_pml_ob1_check_cantmatch_for_match( match_found = 1; /* - * remove frag_desc from list + * remove frag from list */ ompi_list_remove_item(&proc->frags_cant_match, - (ompi_list_item_t *)frag_desc); + (ompi_list_item_t *)frag); /* * figure out what sort of matching logic to use, if need to @@ -616,7 +623,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match( /* associate the receive descriptor with the fragment * descriptor */ - frag_desc->request=match; + frag->request=match; /* add this fragment descriptor to the list of * descriptors to be processed later @@ -625,12 +632,12 @@ static bool mca_pml_ob1_check_cantmatch_for_match( match_made = true; OBJ_CONSTRUCT(additional_matches, ompi_list_t); } - ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc); + ompi_list_append(additional_matches, (ompi_list_item_t *)frag); } else { /* if no match found, place on unexpected queue */ - ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc); + ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag); } @@ -640,7 +647,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match( } /* end if (frag_seq == next_msg_seq_expected) */ - } /* end for (frag_desc) loop */ + } /* end for (frag) loop */ } /* end while loop */ diff --git a/src/mca/pml/ob1/pml_ob1_recvfrag.h b/src/mca/pml/ob1/pml_ob1_recvfrag.h index d8fe225e53..3bb3daf491 100644 --- a/src/mca/pml/ob1/pml_ob1_recvfrag.h +++ b/src/mca/pml/ob1/pml_ob1_recvfrag.h @@ -23,31 +23,78 @@ #include "mca/bmi/bmi.h" #include "pml_ob1_hdr.h" -struct mca_pml_ob1_recv_frag_t { +struct mca_pml_ob1_buffer_t { + ompi_list_item_t super; + unsigned char addr[1]; +}; +typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t; + +OBJ_CLASS_DECLARATION(mca_pml_ob1_buffer_t); + + +struct mca_pml_ob1_fragment_t { ompi_list_item_t super; mca_bmi_base_module_t* bmi; mca_pml_ob1_hdr_t hdr; - mca_bmi_base_segment_t* segments; - size_t num_segments; struct mca_pml_ob1_recv_request_t* request; + size_t num_segments; + mca_bmi_base_segment_t segments[MCA_BMI_DES_MAX_SEGMENTS]; + mca_pml_ob1_buffer_t* buffers[MCA_BMI_DES_MAX_SEGMENTS]; }; -typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t; +typedef struct mca_pml_ob1_fragment_t mca_pml_ob1_fragment_t; + +OBJ_CLASS_DECLARATION(mca_pml_ob1_fragment_t); -#define MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc) \ -{ \ - \ -} +#define MCA_PML_OB1_FRAG_ALLOC(frag,rc) \ +do { \ + ompi_list_item_t* item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_ob1.fragments, item, rc); \ + frag = (mca_pml_ob1_fragment_t*)item; \ +} while(0) -#define MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segs,cnt) \ -{ \ - \ -} -#define MCA_PML_OB1_RECV_FRAG_RETURN(frag) \ -{ \ - \ -} +#define MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segs,cnt) \ +do { \ + size_t i; \ + mca_bmi_base_segment_t* segments = frag->segments; \ + mca_pml_ob1_buffer_t** buffers = frag->buffers; \ + \ + /* init fragment */ \ + frag->bmi = bmi; \ + frag->hdr = *(mca_pml_ob1_hdr_t*)hdr; \ + frag->num_segments = cnt; \ + \ + /* copy over data */ \ + for(i=0; i<cnt; i++) { \ + ompi_list_item_t* item; \ + mca_pml_ob1_buffer_t* buff; \ + OMPI_FREE_LIST_WAIT(&mca_pml_ob1.buffers, item, rc); \ + buff = (mca_pml_ob1_buffer_t*)item; \ + buffers[i] = buff; \ + segments[i].seg_addr.pval = buff->addr; \ + segments[i].seg_len = segs[i].seg_len; \ + memcpy(segments[i].seg_addr.pval, \ + segs[i].seg_addr.pval, \ + segs[i].seg_len); \ + } \ +} while(0) + +#define MCA_PML_OB1_FRAG_RETURN(frag) \ +do { \ + size_t i; \ + \ + /* return buffers */ \ + for(i=0; i<frag->num_segments; i++) { \ + OMPI_FREE_LIST_RETURN(&mca_pml_ob1.buffers, \ + (ompi_list_item_t*)frag->buffers[i]); \ + } \ + frag->num_segments = 0; \ + \ + /* return fragment */ \ + OMPI_FREE_LIST_RETURN(&mca_pml_ob1.fragments, \ + (ompi_list_item_t*)frag); \ +} while(0) /** diff --git a/src/mca/pml/ob1/pml_ob1_recvreq.c b/src/mca/pml/ob1/pml_ob1_recvreq.c index b0851298a4..0038aa9220 100644 --- a/src/mca/pml/ob1/pml_ob1_recvreq.c +++ b/src/mca/pml/ob1/pml_ob1_recvreq.c @@ -24,7 +24,7 @@ #include "pml_ob1_sendreq.h" -static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc( +static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc( mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc); @@ -123,7 +123,7 @@ static void mca_pml_ob1_recv_request_ack( hdr->hdr_match.hdr_src); mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); mca_bmi_base_descriptor_t* des; - mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_fragment_t* frag; mca_pml_ob1_ack_hdr_t* ack; int rc; @@ -153,10 +153,9 @@ static void mca_pml_ob1_recv_request_ack( /* queue request to retry later */ retry: - MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc); + MCA_PML_OB1_FRAG_ALLOC(frag,rc); frag->bmi = NULL; frag->hdr.hdr_rndv = *hdr; - frag->segments = NULL; frag->num_segments = 0; frag->request = recvreq; ompi_list_append(&mca_pml_ob1.acks_pending, (ompi_list_item_t*)frag); @@ -249,7 +248,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request { mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm; mca_pml_ob1_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer; - mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_fragment_t* frag; /* check for a specific match */ OMPI_THREAD_LOCK(&comm->matching_lock); @@ -264,7 +263,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments); if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) || (MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) { - MCA_PML_OB1_RECV_FRAG_RETURN(frag); + MCA_PML_OB1_FRAG_RETURN(frag); } return; /* match found */ } @@ -303,7 +302,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request) request->req_recv.req_base.req_sequence = comm->recv_sequence++; for (i = 0; i < proc_count; i++) { - mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_fragment_t* frag; /* continue if no frags to match */ if (ompi_list_get_size(&proc->unexpected_frags) == 0) { @@ -318,7 +317,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request) mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments); if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) || (MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) { - MCA_PML_OB1_RECV_FRAG_RETURN(frag); + MCA_PML_OB1_FRAG_RETURN(frag); } return; /* match found */ } @@ -340,19 +339,19 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request) * it places the request in the appropriate matched receive list. */ -static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc( +static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc( mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc) { ompi_list_t* unexpected_frags = &proc->unexpected_frags; - mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_fragment_t* frag; mca_pml_ob1_match_hdr_t* hdr; int tag = request->req_recv.req_base.req_tag; if( OMPI_ANY_TAG == tag ) { - for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags); - frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags); - frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) { + for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags); + frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags); + frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) { hdr = &(frag->hdr.hdr_match); /* check first frag - we assume that process matching has been done already */ @@ -361,9 +360,9 @@ static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc( } } } else { - for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags); - frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags); - frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) { + for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags); + frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags); + frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) { hdr = &(frag->hdr.hdr_match); /* check first frag - we assume that process matching has been done already */ diff --git a/src/mca/pml/ob1/pml_ob1_recvreq.h b/src/mca/pml/ob1/pml_ob1_recvreq.h index d078dd8934..f4292c4866 100644 --- a/src/mca/pml/ob1/pml_ob1_recvreq.h +++ b/src/mca/pml/ob1/pml_ob1_recvreq.h @@ -52,13 +52,13 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t); * @param rc (OUT) OMPI_SUCCESS or error status on failure. * @return Receive request. */ -#define MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq, rc) \ - do { \ - ompi_list_item_t* item; \ - rc = OMPI_SUCCESS; \ - OMPI_FREE_LIST_GET(&mca_pml_ob1.recv_requests, item, rc); \ - recvreq = (mca_pml_ob1_recv_request_t*)item; \ - } while(0) +#define MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq, rc) \ +do { \ + ompi_list_item_t* item; \ + rc = OMPI_SUCCESS; \ + OMPI_FREE_LIST_GET(&mca_pml_ob1.recv_requests, item, rc); \ + recvreq = (mca_pml_ob1_recv_request_t*)item; \ +} while(0) /** @@ -82,7 +82,7 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t); tag, \ comm, \ persistent) \ -{ \ +do { \ MCA_PML_BASE_RECV_REQUEST_INIT( \ &(request)->req_recv, \ addr, \ @@ -92,18 +92,18 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t); tag, \ comm, \ persistent); \ -} +} while(0) /** * Return a recv request to the modules free list. * * @param request (IN) Receive request. */ -#define MCA_PML_OB1_RECV_REQUEST_RETURN(request) \ - do { \ - MCA_PML_BASE_RECV_REQUEST_FINI(&request->req_recv); \ - OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (ompi_list_item_t*)request); \ - } while(0) +#define MCA_PML_OB1_RECV_REQUEST_RETURN(request) \ +do { \ + MCA_PML_BASE_RECV_REQUEST_FINI(&request->req_recv); \ + OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (ompi_list_item_t*)request); \ +} while(0) /** * Attempt to match the request against the unexpected fragment list @@ -160,7 +160,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request #define MCA_PML_OB1_RECV_REQUEST_MATCHED( \ request, \ hdr) \ -{ \ +do { \ (request)->req_recv.req_bytes_packed = (hdr)->hdr_msg_length; \ (request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \ (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = (hdr)->hdr_src; \ @@ -181,7 +181,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request 0, /* offset in bytes into packed buffer */ \ NULL ); /* not allocating memory */ \ } \ -} +} while (0) /** @@ -196,17 +196,22 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request data_offset, \ bytes_received, \ bytes_delivered) \ -{ \ +do { \ if(request->req_recv.req_base.req_count > 0) { \ struct iovec iov[MCA_BMI_DES_MAX_SEGMENTS]; \ - uint32_t iov_count = num_segments; \ + uint32_t iov_count = 0; \ uint32_t max_data = bytes_received; \ int32_t free_after = 0; \ - size_t i; \ - for(i=0; i<num_segments; i++) { \ - if(i == 0) { \ - iov[i].iov_base = (unsigned char*)segments[i].seg_addr.pval + seg_offset; \ - iov[i].iov_len = segments[i].seg_len - seg_offset; \ + size_t n, offset = seg_offset; \ + \ + for(n=0; n<num_segments; n++) { \ + mca_bmi_base_segment_t* segment = segments+n; \ + if(offset >= segment->seg_len) { \ + offset -= segment->seg_len; \ + } else { \ + iov[iov_count].iov_len = segment->seg_len - seg_offset; \ + iov[iov_count].iov_base = (unsigned char*)segment->seg_addr.pval + seg_offset; \ + iov_count++; \ } \ } \ ompi_convertor_unpack( \ @@ -217,7 +222,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request &free_after); \ bytes_delivered = max_data; \ } \ -} +} while (0) /** diff --git a/src/mca/pml/ob1/pml_ob1_sendreq.c b/src/mca/pml/ob1/pml_ob1_sendreq.c index f96811e0e4..a408d5e9ac 100644 --- a/src/mca/pml/ob1/pml_ob1_sendreq.c +++ b/src/mca/pml/ob1/pml_ob1_sendreq.c @@ -81,11 +81,7 @@ static void mca_pml_ob1_send_completion( mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint; /* for now - return the descriptor - may cache these at some point */ - if(NULL == bmi_ep->bmi_cache) { - bmi_ep->bmi_cache = descriptor; - } else { - bmi->bmi_free(bmi,descriptor); - } + MCA_PML_OB1_ENDPOINT_DES_RETURN(bmi_ep,descriptor); /* check for request completion */ OMPI_THREAD_LOCK(&ompi_request_lock); @@ -158,13 +154,9 @@ int mca_pml_ob1_send_request_start_copy( if(size == 0) { /* allocate a descriptor */ - if(NULL != (descriptor = endpoint->bmi_cache)) { - endpoint->bmi_cache = NULL; - } else { - descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); - if(NULL == descriptor) { - return OMPI_ERR_OUT_OF_RESOURCE; - } + MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor); + if(NULL == descriptor) { + return OMPI_ERR_OUT_OF_RESOURCE; } descriptor->des_cbfunc = mca_pml_ob1_send_completion; segment = descriptor->des_src; @@ -208,7 +200,7 @@ int mca_pml_ob1_send_request_start_copy( } /* allocate space for hdr + first fragment */ - descriptor = endpoint->bmi_alloc(endpoint->bmi, size + sizeof(mca_pml_ob1_hdr_t)); + MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor); if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; }