diff --git a/src/mca/bmi/bmi.h b/src/mca/bmi/bmi.h index 421f5bfe99..6c91fcc7b5 100644 --- a/src/mca/bmi/bmi.h +++ b/src/mca/bmi/bmi.h @@ -154,7 +154,7 @@ typedef struct mca_bmi_base_segment_t mca_bmi_base_segment_t; /** * A descriptor that holds the parameters to a send/put/get - * operation along w/ a callback function that is called on + * operation along w/ a callback routine that is called on * completion of the request. */ @@ -373,7 +373,7 @@ typedef int (*mca_bmi_base_module_free_fn_t)( * @param bmi (IN) BMI module * @param peer (IN) BMI peer addressing */ -typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_pack_fn_t)( +typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_fn_t)( struct mca_bmi_base_module_t* bmi, struct mca_bmi_base_endpoint_t* peer, struct ompi_convertor_t* convertor, @@ -381,6 +381,31 @@ typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_pack_fn_t)( size_t* size ); +/** + * Pack data and return a descriptor that can be + * used for send/put. + * + * @param bmi (IN) BMI module + * @param endpoint (IN) BMI peer addressing + */ + +typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_dst_fn_t)( + struct mca_bmi_base_module_t* bmi, + struct mca_bmi_base_endpoint_t* endpoint, + struct ompi_convertor_t* convertor, + size_t reserve, + size_t* size +); + + +typedef int (*mca_bmi_base_module_unpack_fn_t)( + struct mca_bmi_base_module_t* bmi, + struct mca_bmi_base_endpoint_t* peer, + struct ompi_convertor_t* convertor, + struct mca_bmi_base_descriptor_t* descriptor +); + + /** * Initiate a send to the peer. @@ -405,7 +430,7 @@ typedef int (*mca_bmi_base_module_send_fn_t)( typedef int (*mca_bmi_base_module_put_fn_t)( struct mca_bmi_base_module_t* bmi, - struct mca_bmi_base_endpoint_t* peer, + struct mca_bmi_base_endpoint_t* endpoint, struct mca_bmi_base_descriptor_t* descriptor ); @@ -447,7 +472,8 @@ struct mca_bmi_base_module_t { mca_bmi_base_module_alloc_fn_t bmi_alloc; mca_bmi_base_module_free_fn_t bmi_free; - mca_bmi_base_module_pack_fn_t bmi_pack; + mca_bmi_base_module_prepare_fn_t bmi_prepare_src; + mca_bmi_base_module_prepare_fn_t bmi_prepare_dst; mca_bmi_base_module_send_fn_t bmi_send; mca_bmi_base_module_put_fn_t bmi_put; mca_bmi_base_module_get_fn_t bmi_get; diff --git a/src/mca/bmi/ib/Makefile.am b/src/mca/bmi/ib/Makefile.am index 41fd7adcb6..4dd7f87e17 100644 --- a/src/mca/bmi/ib/Makefile.am +++ b/src/mca/bmi/ib/Makefile.am @@ -44,7 +44,7 @@ libmca_bmi_ib_la_SOURCES = \ # mca__.la (for DSO builds) or libmca__.la # (for static builds). -if OMPI_BUILD_bmi_sm_DSO +if OMPI_BUILD_bmi_ib_DSO component_noinst = component_install = mca_bmi_ib.la else diff --git a/src/mca/bmi/sm/bmi_sm.c b/src/mca/bmi/sm/bmi_sm.c index 62dce6396b..cbc2c2e408 100644 --- a/src/mca/bmi/sm/bmi_sm.c +++ b/src/mca/bmi/sm/bmi_sm.c @@ -60,7 +60,8 @@ mca_bmi_sm_t mca_bmi_sm[2] = { mca_bmi_sm_finalize, mca_bmi_sm_alloc, mca_bmi_sm_free, - mca_bmi_sm_pack, + mca_bmi_sm_prepare_src, + NULL, mca_bmi_sm_send, NULL, /* put */ NULL /* get */ @@ -82,7 +83,8 @@ mca_bmi_sm_t mca_bmi_sm[2] = { mca_bmi_sm_finalize, mca_bmi_sm_alloc, mca_bmi_sm_free, - mca_bmi_sm_pack, + mca_bmi_sm_prepare_src, + NULL, mca_bmi_sm_send, NULL, /* get function */ NULL /* put function */ @@ -763,9 +765,9 @@ extern int mca_bmi_sm_free( { mca_bmi_sm_frag_t* frag = (mca_bmi_sm_frag_t*)des; if(frag->size <= mca_bmi_sm_component.first_fragment_size) { - MCA_BMI_SM_FRAG_RETURN1(des); + MCA_BMI_SM_FRAG_RETURN1(frag); } else { - MCA_BMI_SM_FRAG_RETURN2(des); + MCA_BMI_SM_FRAG_RETURN2(frag); } return OMPI_SUCCESS; } @@ -777,14 +779,37 @@ extern int mca_bmi_sm_free( * @param bmi (IN) BMI module * @param peer (IN) BMI peer addressing */ -struct mca_bmi_base_descriptor_t* mca_bmi_sm_pack( +struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src( struct mca_bmi_base_module_t* bmi, struct mca_bmi_base_endpoint_t* peer, struct ompi_convertor_t* convertor, size_t reserve, size_t* size) { - return NULL; + mca_bmi_sm_frag_t* frag; + struct iovec iov; + uint32_t iov_count = 1; + uint32_t max_data = *size; + int rc; + + MCA_BMI_SM_FRAG_ALLOC2(frag, rc); + if(NULL == frag) { + return NULL; + } + + if(max_data + reserve > frag->size) { + max_data = *size - reserve; + } + iov.iov_len = max_data; + iov.iov_base = (unsigned char*)(frag+1) + reserve; + + rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, NULL); + if(rc < 0) { + MCA_BMI_SM_FRAG_RETURN2(frag); + return NULL; + } + *size = max_data; + return &frag->base; } diff --git a/src/mca/bmi/sm/bmi_sm.h b/src/mca/bmi/sm/bmi_sm.h index c2db0aed94..c0284cb6ff 100644 --- a/src/mca/bmi/sm/bmi_sm.h +++ b/src/mca/bmi/sm/bmi_sm.h @@ -300,7 +300,7 @@ extern int mca_bmi_sm_free( * @param bmi (IN) BMI module * @param peer (IN) BMI peer addressing */ -struct mca_bmi_base_descriptor_t* mca_bmi_sm_pack( +struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src( struct mca_bmi_base_module_t* bmi, struct mca_bmi_base_endpoint_t* peer, struct ompi_convertor_t* convertor, diff --git a/src/mca/bmi/sm/bmi_sm_frag.h b/src/mca/bmi/sm/bmi_sm_frag.h index bdca8234dd..8601635139 100644 --- a/src/mca/bmi/sm/bmi_sm_frag.h +++ b/src/mca/bmi/sm/bmi_sm_frag.h @@ -70,12 +70,12 @@ OBJ_CLASS_DECLARATION(mca_bmi_sm_frag2_t); #define MCA_BMI_SM_FRAG_RETURN1(frag) \ { \ - OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags1, &frag->super); \ + OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags1, (ompi_list_item_t*)(frag)); \ } #define MCA_BMI_SM_FRAG_RETURN2(frag) \ { \ - OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags2, &frag->super); \ + OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags2, (ompi_list_item_t*)(frag)); \ } #endif diff --git a/src/mca/pml/ob1/pml_ob1.h b/src/mca/pml/ob1/pml_ob1.h index 53c2984395..480409c4d6 100644 --- a/src/mca/pml/ob1/pml_ob1.h +++ b/src/mca/pml/ob1/pml_ob1.h @@ -55,7 +55,8 @@ struct mca_pml_ob1_t { int free_list_num; /* initial size of free list */ int free_list_max; /* maximum size of free list */ int free_list_inc; /* number of elements to grow free list */ - int poll_iterations; /* number of iterations to poll for completion */ + size_t send_pipeline_depth; + size_t recv_pipeline_depth; /* lock queue access */ ompi_mutex_t lock; @@ -66,6 +67,7 @@ struct mca_pml_ob1_t { /* list of pending send requests */ ompi_list_t send_pending; + ompi_list_t acks_pending; }; typedef struct mca_pml_ob1_t mca_pml_ob1_t; @@ -203,7 +205,6 @@ extern void mca_pml_ob1_recv_callback( mca_bmi_base_descriptor_t* descriptor, void* cbdata ); - extern int mca_pml_ob1_progress(void); diff --git a/src/mca/pml/ob1/pml_ob1_component.c b/src/mca/pml/ob1/pml_ob1_component.c index 6883c370ab..3e3193ee67 100644 --- a/src/mca/pml/ob1/pml_ob1_component.c +++ b/src/mca/pml/ob1/pml_ob1_component.c @@ -93,8 +93,6 @@ int mca_pml_ob1_component_open(void) mca_pml_ob1_param_register_int("free_list_max", -1); mca_pml_ob1.free_list_inc = mca_pml_ob1_param_register_int("free_list_inc", 256); - mca_pml_ob1.poll_iterations = - mca_pml_ob1_param_register_int("poll_iterations", 100000); mca_pml_ob1.priority = mca_pml_ob1_param_register_int("priority", 0); diff --git a/src/mca/pml/ob1/pml_ob1_endpoint.h b/src/mca/pml/ob1/pml_ob1_endpoint.h index c535da4530..caf12541bf 100644 --- a/src/mca/pml/ob1/pml_ob1_endpoint.h +++ b/src/mca/pml/ob1/pml_ob1_endpoint.h @@ -27,23 +27,28 @@ extern "C" { /** * A data structure associated with a ompi_proc_t that caches - * addressing/scheduling attributes for a specific NTL instance + * addressing/scheduling attributes for a specific BMI instance * that can be used to reach the process. */ struct mca_pml_ob1_endpoint_t { - int bmi_weight; /**< NTL weight for scheduling */ - size_t bmi_eager_limit; /**< NTL eager limit */ - struct mca_bmi_base_module_t *bmi; /**< NTL module */ - struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< NTL addressing info */ + int bmi_weight; /**< BMI weight for scheduling */ + int bmi_flags; /**< support for put/get? */ + size_t bmi_eager_limit; /**< BMI eager limit */ + size_t bmi_min_seg_size; /**< BMI min segment size */ + size_t bmi_max_seg_size; /**< BMI max segment size */ + struct mca_bmi_base_module_t *bmi; /**< BMI module */ + struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< BMI addressing info */ struct mca_bmi_base_descriptor_t* bmi_cache; - /* NTL function table */ - mca_bmi_base_module_alloc_fn_t bmi_alloc; - mca_bmi_base_module_free_fn_t bmi_free; - mca_bmi_base_module_send_fn_t bmi_send; - mca_bmi_base_module_put_fn_t bmi_put; - mca_bmi_base_module_get_fn_t bmi_get; + /* BMI function table */ + mca_bmi_base_module_alloc_fn_t bmi_alloc; + mca_bmi_base_module_free_fn_t bmi_free; + mca_bmi_base_module_send_fn_t bmi_send; + mca_bmi_base_module_prepare_fn_t bmi_prepare_src; + mca_bmi_base_module_prepare_fn_t bmi_prepare_dst; + mca_bmi_base_module_put_fn_t bmi_put; + mca_bmi_base_module_get_fn_t bmi_get; }; typedef struct mca_pml_ob1_endpoint_t mca_pml_ob1_endpoint_t; diff --git a/src/mca/pml/ob1/pml_ob1_hdr.h b/src/mca/pml/ob1/pml_ob1_hdr.h index bb89d8cf1e..cf6e40713f 100644 --- a/src/mca/pml/ob1/pml_ob1_hdr.h +++ b/src/mca/pml/ob1/pml_ob1_hdr.h @@ -30,15 +30,17 @@ #define MCA_PML_OB1_HDR_TYPE_MATCH 1 #define MCA_PML_OB1_HDR_TYPE_RNDV 2 -#define MCA_PML_OB1_HDR_TYPE_FRAG 3 -#define MCA_PML_OB1_HDR_TYPE_ACK 4 -#define MCA_PML_OB1_HDR_TYPE_NACK 5 -#define MCA_PML_OB1_HDR_TYPE_GET 6 -#define MCA_PML_OB1_HDR_TYPE_FIN 7 -#define MCA_PML_OB1_HDR_TYPE_MAX 8 +#define MCA_PML_OB1_HDR_TYPE_ACK 3 +#define MCA_PML_OB1_HDR_TYPE_NACK 4 +#define MCA_PML_OB1_HDR_TYPE_FRAG 5 +#define MCA_PML_OB1_HDR_TYPE_SEG 6 +#define MCA_PML_OB1_HDR_TYPE_GET 7 +#define MCA_PML_OB1_HDR_TYPE_FIN 8 +#define MCA_PML_OB1_HDR_TYPE_MAX 9 #define MCA_PML_OB1_HDR_FLAGS_ACK 1 /* is an ack required */ #define MCA_PML_OB1_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ +#define MCA_PML_OB1_HDR_FLAGS_PUT 3 /* @@ -138,8 +140,8 @@ typedef struct mca_pml_ob1_match_hdr_t mca_pml_ob1_match_hdr_t; */ struct mca_pml_ob1_rendezvous_hdr_t { mca_pml_ob1_match_hdr_t hdr_match; - uint64_t hdr_frag_length; /**< fragment length */ - ompi_ptr_t hdr_src_ptr; /**< pointer to source fragment - returned in ack */ + uint64_t hdr_frag_length; /**< fragment length */ + ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */ }; typedef struct mca_pml_ob1_rendezvous_hdr_t mca_pml_ob1_rendezvous_hdr_t; @@ -162,8 +164,8 @@ struct mca_pml_ob1_frag_hdr_t { mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */ uint64_t hdr_frag_length; /**< fragment length */ uint64_t hdr_frag_offset; /**< offset into message */ - ompi_ptr_t hdr_src_ptr; /**< pointer to source fragment */ - ompi_ptr_t hdr_dst_ptr; /**< pointer to matched receive */ + ompi_ptr_t hdr_src_req; /**< pointer to source request */ + ompi_ptr_t hdr_dst_req; /**< pointer to matched receive */ }; typedef struct mca_pml_ob1_frag_hdr_t mca_pml_ob1_frag_hdr_t; @@ -185,13 +187,12 @@ typedef struct mca_pml_ob1_frag_hdr_t mca_pml_ob1_frag_hdr_t; /** * Header used to acknowledgment outstanding fragment(s). */ + struct mca_pml_ob1_ack_hdr_t { - mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */ - ompi_ptr_t hdr_src_ptr; /**< source fragment */ - ompi_ptr_t hdr_dst_match; /**< matched receive request */ - ompi_ptr_t hdr_dst_addr; /**< posted receive buffer */ - uint64_t hdr_dst_size; /**< size of posted buffer */ - /* sequence range? */ + mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */ + ompi_ptr_t hdr_src_req; /**< source request */ + ompi_ptr_t hdr_dst_req; /**< matched receive request */ + mca_bmi_base_segment_t hdr_seg; /**< segment */ }; typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; diff --git a/src/mca/pml/ob1/pml_ob1_proc.h b/src/mca/pml/ob1/pml_ob1_proc.h index e964c39698..815e7de88a 100644 --- a/src/mca/pml/ob1/pml_ob1_proc.h +++ b/src/mca/pml/ob1/pml_ob1_proc.h @@ -34,9 +34,10 @@ extern "C" { */ struct mca_pml_proc_t { ompi_object_t super; - ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */ - ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */ - volatile uint32_t proc_sequence; /**< sequence number for send */ + ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */ + ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */ + int proc_flags; /**< prefered method of accessing this peer */ + volatile uint32_t proc_sequence; /**< sequence number for send */ mca_pml_ob1_ep_array_t bmi_first; /**< array of endpoints to use for first fragments */ mca_pml_ob1_ep_array_t bmi_next; /**< array of endpoints to use for remaining fragments */ }; diff --git a/src/mca/pml/ob1/pml_ob1_recvfrag.c b/src/mca/pml/ob1/pml_ob1_recvfrag.c index 453291756a..bc7b17bada 100644 --- a/src/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/src/mca/pml/ob1/pml_ob1_recvfrag.c @@ -23,6 +23,7 @@ #include "mca/pml/pml.h" #include "pml_ob1_recvfrag.h" #include "pml_ob1_recvreq.h" +#include "pml_ob1_sendreq.h" #include "pml_ob1_proc.h" #include "pml_ob1_match.h" @@ -36,7 +37,6 @@ OBJ_CLASS_INSTANCE( - void mca_pml_ob1_recv_callback( mca_bmi_base_module_t* bmi, mca_bmi_base_tag_t tag, @@ -56,6 +56,9 @@ void mca_pml_ob1_recv_callback( case MCA_PML_OB1_HDR_TYPE_RNDV: mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt); break; + case MCA_PML_OB1_HDR_TYPE_ACK: + mca_pml_ob1_send_request_acked(bmi,&hdr->hdr_ack); + break; default: break; } diff --git a/src/mca/pml/ob1/pml_ob1_recvreq.c b/src/mca/pml/ob1/pml_ob1_recvreq.c index 087cc133da..6982599925 100644 --- a/src/mca/pml/ob1/pml_ob1_recvreq.c +++ b/src/mca/pml/ob1/pml_ob1_recvreq.c @@ -95,41 +95,143 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_request_construct, mca_pml_ob1_recv_request_destruct); + +/* + * Release resources. + */ + +static void mca_pml_ob1_recv_request_acked( + mca_bmi_base_module_t* bmi, + struct mca_bmi_base_endpoint_t* ep, + struct mca_bmi_base_descriptor_t* des, + int status) +{ + bmi->bmi_free(bmi,des); +} + + +/* + * + */ + +static void mca_pml_ob1_recv_request_ack( + mca_pml_ob1_recv_request_t* recvreq, + mca_pml_ob1_rendezvous_hdr_t* hdr) +{ + mca_pml_ob1_proc_t* proc = mca_pml_ob1_proc_lookup_remote( + recvreq->req_recv.req_base.req_comm, + hdr->hdr_match.hdr_src); + mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); + mca_bmi_base_descriptor_t* des; + mca_pml_ob1_recv_frag_t* frag; + mca_pml_ob1_ack_hdr_t* ack; + int rc; + + /* allocate descriptor */ + des = ep->bmi_alloc(ep->bmi, sizeof(mca_pml_ob1_ack_hdr_t)); + if(NULL == des) { + goto retry; + } + + /* fill out header */ + ack = (mca_pml_ob1_ack_hdr_t*)des->des_src->seg_addr.pval; + ack->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK; + ack->hdr_common.hdr_flags = 0; + ack->hdr_src_req = hdr->hdr_src_req; + ack->hdr_dst_req.pval = recvreq; + + /* initialize descriptor */ + des->des_cbfunc = mca_pml_ob1_recv_request_acked; + des->des_cbdata = recvreq; + + rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML); + if(rc != OMPI_SUCCESS) { + ep->bmi_free(ep->bmi,des); + goto retry; + } + return; + + /* queue request to retry later */ +retry: + MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc); + frag->bmi = NULL; + frag->hdr.hdr_rndv = *hdr; + frag->segments = NULL; + frag->num_segments = 0; + frag->request = recvreq; + ompi_list_append(&mca_pml_ob1.acks_pending, (ompi_list_item_t*)frag); +} + + /* * Update the recv request status to reflect the number of bytes * received and actually delivered to the application. */ void mca_pml_ob1_recv_request_progress( - mca_pml_ob1_recv_request_t* req, + mca_pml_ob1_recv_request_t* recvreq, mca_bmi_base_module_t* bmi, mca_bmi_base_segment_t* segments, size_t num_segments) { - size_t bytes_received = 0; - size_t bytes_delivered = 0; + uint64_t bytes_received = 0; + uint64_t bytes_delivered = 0; mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval; switch(hdr->hdr_common.hdr_type) { case MCA_PML_OB1_HDR_TYPE_MATCH: + bytes_received = hdr->hdr_match.hdr_msg_length; + MCA_PML_OB1_RECV_REQUEST_UNPACK( + recvreq, + segments, + num_segments, + sizeof(mca_pml_ob1_match_hdr_t), + 0, + bytes_received, + bytes_delivered); break; + case MCA_PML_OB1_HDR_TYPE_RNDV: - bytes_received = hdr->hdr_frag.hdr_frag_length; + + mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv); + bytes_received = hdr->hdr_rndv.hdr_frag_length; + MCA_PML_OB1_RECV_REQUEST_UNPACK( + recvreq, + segments, + num_segments, + sizeof(mca_pml_ob1_rendezvous_hdr_t), + 0, + bytes_received, + bytes_delivered); break; + + case MCA_PML_OB1_HDR_TYPE_FRAG: + + bytes_received = hdr->hdr_frag.hdr_frag_length; + MCA_PML_OB1_RECV_REQUEST_UNPACK( + recvreq, + segments, + num_segments, + sizeof(mca_pml_ob1_rendezvous_hdr_t), + hdr->hdr_frag.frag_offset, + bytes_received, + bytes_delivered); + break; + default: break; } - bytes_delivered = bytes_received; + /* check completion status */ OMPI_THREAD_LOCK(&ompi_request_lock); - req->req_bytes_received += bytes_received; - req->req_bytes_delivered += bytes_delivered; - if (req->req_bytes_received >= req->req_recv.req_bytes_packed) { + recvreq->req_bytes_received += bytes_received; + recvreq->req_bytes_delivered += bytes_delivered; + if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) { /* initialize request status */ - req->req_recv.req_base.req_ompi.req_status._count = req->req_bytes_delivered; - req->req_recv.req_base.req_pml_complete = true; - req->req_recv.req_base.req_ompi.req_complete = true; + recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered; + recvreq->req_recv.req_base.req_pml_complete = true; + recvreq->req_recv.req_base.req_ompi.req_complete = true; if(ompi_request_waiting) { ompi_condition_broadcast(&ompi_request_cond); } diff --git a/src/mca/pml/ob1/pml_ob1_recvreq.h b/src/mca/pml/ob1/pml_ob1_recvreq.h index 90e2328cb7..f21d3de4e5 100644 --- a/src/mca/pml/ob1/pml_ob1_recvreq.h +++ b/src/mca/pml/ob1/pml_ob1_recvreq.h @@ -31,6 +31,13 @@ struct mca_pml_ob1_recv_request_t { mca_pml_base_recv_request_t req_recv; size_t req_bytes_received; size_t req_bytes_delivered; + + /* note that we allocate additional space for the recv + * request to increase the array size based on run-time + * parameters for the pipeline depth. So... this MUST be + * the last element of this struct. + */ + mca_bmi_base_descriptor_t *req_pipeline[1]; }; typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t; @@ -119,7 +126,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request * @param request Receive request. * @return OMPI_SUCESS or error status on failure. */ -#define MCA_PML_OB1_RECV_REQUEST_START(request) \ +#define MCA_PML_OB1_RECV_REQUEST_START(request) \ { \ /* init/re-init the request */ \ (request)->req_bytes_received = 0; \ @@ -138,13 +145,29 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request \ /* attempt to match posted recv */ \ if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \ - mca_pml_ob1_recv_request_match_wild(request); \ + mca_pml_ob1_recv_request_match_wild(request); \ } else { \ - mca_pml_ob1_recv_request_match_specific(request); \ + mca_pml_ob1_recv_request_match_specific(request); \ } \ } +/** + * + */ + +#define MCA_PML_OB1_RECV_REQUEST_UNPACK( \ + request, \ + segments, \ + num_segments, \ + seg_offset, \ + data_offset, \ + bytes_received, \ + bytes_delivered) \ +{ \ +} + + /** * */ diff --git a/src/mca/pml/ob1/pml_ob1_sendreq.c b/src/mca/pml/ob1/pml_ob1_sendreq.c index 03affaac33..d13be7818b 100644 --- a/src/mca/pml/ob1/pml_ob1_sendreq.c +++ b/src/mca/pml/ob1/pml_ob1_sendreq.c @@ -78,8 +78,8 @@ static void mca_pml_ob1_send_completion( int status) { mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata; - mca_pml_ob1_endpoint_t* ob1_ep = sendreq->req_endpoint; + /* check for request completion */ OMPI_THREAD_LOCK(&ompi_request_lock); if (sendreq->req_offset == sendreq->req_send.req_bytes_packed) { sendreq->req_send.req_base.req_pml_complete = true; @@ -97,30 +97,27 @@ static void mca_pml_ob1_send_completion( } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); } + sendreq->req_state = MCA_PML_OB1_SR_COMPLETE; } OMPI_THREAD_UNLOCK(&ompi_request_lock); + /* advance based on request state */ + MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq); + /* check for pending requests that need to be progressed */ while(ompi_list_get_size(&mca_pml_ob1.send_pending) != 0) { OMPI_THREAD_LOCK(&mca_pml_ob1.ob1_lock); sendreq = (mca_pml_ob1_send_request_t*)ompi_list_remove_first(&mca_pml_ob1.send_pending); OMPI_THREAD_UNLOCK(&mca_pml_ob1.ob1_lock); } - - /* release NTL resources */ - if(ob1_ep->bmi_cache == NULL) { - ob1_ep->bmi_cache = descriptor; - } else { - ob1_ep->bmi_free(bmi,descriptor); - } } /** - * NTL can send directly from user allocated memory. + * BMI can send directly from user allocated memory. */ -int mca_pml_ob1_send_user( +int mca_pml_ob1_send_request_start_user( mca_pml_ob1_send_request_t* sendreq, mca_pml_ob1_endpoint_t* endpoint) { @@ -129,11 +126,11 @@ int mca_pml_ob1_send_user( /** - * NTL requires "specially" allocated memory. Request a segment that + * BMI requires "specially" allocated memory. Request a segment that * is used for initial hdr and any eager data. */ -int mca_pml_ob1_send_copy( +int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq, mca_pml_ob1_endpoint_t* endpoint) { @@ -152,7 +149,6 @@ int mca_pml_ob1_send_copy( } else { descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); if(NULL == descriptor) { - OBJ_RELEASE(sendreq); return OMPI_ERR_OUT_OF_RESOURCE; } descriptor->des_cbfunc = mca_pml_ob1_send_completion; @@ -173,31 +169,34 @@ int mca_pml_ob1_send_copy( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH; segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t); + ompi_request_complete((ompi_request_t*)sendreq); } else { hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV; hdr->hdr_rndv.hdr_frag_length = 0; - hdr->hdr_rndv.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_rndv.hdr_src_ptr.pval = sendreq; + hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_rndv.hdr_src_req.pval = sendreq; segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t); } - ompi_request_complete((ompi_request_t*)sendreq); } else { struct iovec iov; unsigned int iov_count; unsigned int max_data; + int flags = 0; /* determine first fragment size */ if(size > endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t)) { size = endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t); - } + flags = MCA_PML_OB1_HDR_FLAGS_ACK; + } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) { + flags = MCA_PML_OB1_HDR_FLAGS_ACK; + } /* allocate space for hdr + first fragment */ descriptor = endpoint->bmi_alloc(endpoint->bmi, size + sizeof(mca_pml_ob1_hdr_t)); if(NULL == descriptor) { - OBJ_RELEASE(sendreq); return OMPI_ERR_OUT_OF_RESOURCE; } descriptor->des_cbfunc = mca_pml_ob1_send_completion; @@ -205,6 +204,7 @@ int mca_pml_ob1_send_copy( /* build hdr */ hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval; + hdr->hdr_common.hdr_flags = flags; hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer; @@ -213,8 +213,7 @@ int mca_pml_ob1_send_copy( hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence; /* if an acknowledgment is not required - can get by w/ shorter hdr */ - if (sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) { - hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK; + if (flags == 0) { hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH; /* pack the data into the supplied buffer */ @@ -229,19 +228,21 @@ int mca_pml_ob1_send_copy( &max_data, NULL)) < 0) { endpoint->bmi_free(endpoint->bmi, descriptor); - OBJ_RELEASE(sendreq); return rc; } /* update length w/ number of bytes actually packed */ segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data; + sendreq->req_offset = max_data; + if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) { + ompi_request_complete((ompi_request_t*)sendreq); + } /* rendezvous header is required */ } else { - hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV; - hdr->hdr_rndv.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_rndv.hdr_src_ptr.pval = sendreq; + hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_rndv.hdr_src_req.pval = sendreq; /* pack the data into the supplied buffer */ iov.iov_base = (unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_ob1_rendezvous_hdr_t); @@ -255,16 +256,12 @@ int mca_pml_ob1_send_copy( &max_data, NULL)) < 0) { endpoint->bmi_free(endpoint->bmi, descriptor); - OBJ_RELEASE(sendreq); return rc; } hdr->hdr_rndv.hdr_frag_length = max_data; segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data; - } - sendreq->req_offset = max_data; - if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) { - ompi_request_complete((ompi_request_t*)sendreq); + sendreq->req_offset = max_data; } } descriptor->des_cbdata = sendreq; @@ -277,9 +274,126 @@ int mca_pml_ob1_send_copy( MCA_BMI_TAG_PML); if(OMPI_SUCCESS != rc) { endpoint->bmi_free(endpoint->bmi,descriptor); - OBJ_RELEASE(sendreq); } return rc; } +/** + * + */ + +int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) +{ + /* + * Only allow one thread in this routine for a given request. + * However, we cannot block callers on a mutex, so simply keep track + * of the number of times the routine has been called and run through + * the scheduling logic once for every call. + */ + if(OMPI_THREAD_ADD32(&sendreq->req_lock,1) == 1) { + mca_pml_ob1_proc_t* proc = sendreq->req_proc; + do { + /* allocate remaining bytes to PTLs */ + size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset; + while(bytes_remaining > 0 && sendreq->req_pending < mca_pml_ob1.send_pipeline_depth) { + mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_next); + mca_pml_ob1_frag_hdr_t* hdr; + mca_bmi_base_descriptor_t* des; + int rc; + + /* if this is the last PTL that is available to use, or the number of + * bytes remaining in the message is less than the PTLs minimum fragment + * size, then go ahead and give the rest of the message to this PTL. + */ + size_t size; + if(bytes_remaining < ep->bmi_min_seg_size) { + size = bytes_remaining; + + /* otherwise attempt to give the PTL a percentage of the message + * based on a weighting factor. for simplicity calculate this as + * a percentage of the overall message length (regardless of amount + * previously assigned) + */ + } else { + size = (ep->bmi_weight * bytes_remaining) / 100; + } + + /* makes sure that we don't exceed ptl_max_frag_size */ + if(ep->bmi_max_seg_size != 0 && size > ep->bmi_max_seg_size) + size = ep->bmi_max_seg_size; + + /* pack into a descriptor */ + des = ep->bmi_prepare_src( + ep->bmi, + ep->bmi_endpoint, + &sendreq->req_send.req_convertor, + sizeof(mca_pml_ob1_frag_hdr_t), + &size); + if(des == NULL) { + break; + } + + /* prepare header */ + hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval; + hdr->hdr_frag_length = size; + hdr->hdr_frag_offset = sendreq->req_offset; + hdr->hdr_src_req.pval = sendreq; + hdr->hdr_dst_req.pval = sendreq->req_peer.pval; + + /* update state */ + sendreq->req_offset += size; + sendreq->req_pending++; + + /* initiate send - note that this may complete before the call returns */ + rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML); + if(rc == OMPI_SUCCESS) { + bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset; + } else { + sendreq->req_offset -= size; + sendreq->req_pending--; + break; + } + } + } while (OMPI_THREAD_ADD32(&sendreq->req_lock,-1) > 0); + } + return OMPI_SUCCESS; +} + + + +/** + * + */ + +int mca_pml_ob1_send_request_acked( + mca_bmi_base_module_t* bmi, + mca_pml_ob1_ack_hdr_t* hdr) +{ + mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*) + hdr->hdr_src_req.pval; + if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PUT) { + sendreq->req_state = MCA_PML_OB1_SR_PUT; + mca_pml_ob1_send_request_put(sendreq); + } else { + sendreq->req_state = MCA_PML_OB1_SR_SEND; + mca_pml_ob1_send_request_send(sendreq); + } + return OMPI_SUCCESS; +} + + +/* + * + */ + +int mca_pml_ob1_send_request_put( + mca_pml_ob1_send_request_t* sendreq) +{ + return OMPI_SUCCESS; +} + + + + + diff --git a/src/mca/pml/ob1/pml_ob1_sendreq.h b/src/mca/pml/ob1/pml_ob1_sendreq.h index 529ca9bc95..acf01ec60a 100644 --- a/src/mca/pml/ob1/pml_ob1_sendreq.h +++ b/src/mca/pml/ob1/pml_ob1_sendreq.h @@ -23,16 +23,29 @@ #include "mca/pml/base/pml_base_sendreq.h" #include "pml_ob1_proc.h" #include "pml_ob1_comm.h" +#include "pml_ob1_hdr.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif +typedef enum { + MCA_PML_OB1_SR_INIT, + MCA_PML_OB1_SR_WAIT, + MCA_PML_OB1_SR_SEND, + MCA_PML_OB1_SR_PUT, + MCA_PML_OB1_SR_COMPLETE +} mca_pml_ob1_send_request_state_t; + struct mca_pml_ob1_send_request_t { mca_pml_base_send_request_t req_send; mca_pml_ob1_proc_t* req_proc; mca_pml_ob1_endpoint_t* req_endpoint; + mca_pml_ob1_send_request_state_t req_state; + ompi_ptr_t req_peer; + int32_t req_lock; + size_t req_pending; size_t req_offset; }; typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t; @@ -41,27 +54,27 @@ typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t; OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); -#define MCA_PML_OB1_SEND_REQUEST_ALLOC( \ +#define MCA_PML_OB1_SEND_REQUEST_ALLOC( \ comm, \ dst, \ sendreq, \ rc) \ { \ - mca_pml_ob1_proc_t *proc = comm->c_pml_procs[dst]; \ + mca_pml_ob1_proc_t *proc = comm->c_pml_procs[dst]; \ ompi_list_item_t* item; \ \ if(NULL == proc) { \ rc = OMPI_ERR_OUT_OF_RESOURCE; \ } else { \ rc = OMPI_SUCCESS; \ - OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_requests, item, rc); \ - sendreq = (mca_pml_ob1_send_request_t*)item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_requests, item, rc); \ + sendreq = (mca_pml_ob1_send_request_t*)item; \ sendreq->req_proc = proc; \ } \ } -#define MCA_PML_OB1_SEND_REQUEST_INIT( \ +#define MCA_PML_OB1_SEND_REQUEST_INIT( \ sendreq, \ buf, \ count, \ @@ -83,38 +96,17 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); persistent); \ } - -/** - * NTL doesn't require pre-pinned or "specially" allocated memory. - * Can try to directly send from the users buffer if contigous. - */ - -int mca_pml_ob1_send_user( - mca_pml_ob1_send_request_t* sendreq, - mca_pml_ob1_endpoint_t* endpoint); - - -/** - * NTL requires "specially" allocated memory. Request a segment that - * is used for initial hdr and any eager data. - */ - -int mca_pml_ob1_send_copy( - mca_pml_ob1_send_request_t* sendreq, - mca_pml_ob1_endpoint_t* endpoint); - - /** * Start a send request. */ -#define MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc) \ +#define MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc) \ { \ - mca_pml_ob1_endpoint_t* endpoint; \ - mca_pml_ob1_proc_t* proc = sendreq->req_proc; \ + mca_pml_ob1_endpoint_t* endpoint; \ + mca_pml_ob1_proc_t* proc = sendreq->req_proc; \ \ /* select next endpoint */ \ - endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \ + endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \ sendreq->req_offset = 0; \ sendreq->req_send.req_base.req_ompi.req_complete = false; \ sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ @@ -127,22 +119,84 @@ int mca_pml_ob1_send_copy( } \ \ if(NULL != endpoint->bmi_alloc) { \ - rc = mca_pml_ob1_send_copy(sendreq, endpoint); \ + rc = mca_pml_ob1_send_request_start_copy(sendreq, endpoint); \ } else { \ - rc = mca_pml_ob1_send_user(sendreq, endpoint); \ + rc = mca_pml_ob1_send_request_start_user(sendreq, endpoint); \ } \ } +/* + * Advance a request + */ -#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \ +#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq) + +#if 0 + switch(sendreq->req_state) { + case MCA_PML_OB1_SR_WAIT: + mca_pml_ob1_send_request_wait(sendreq); + break; + case MCA_PML_OB1_SR_SEND: + mca_pml_ob1_send_request_send(sendreq); + break; + case MCA_PML_OB1_SR_PUT: + mca_pml_ob1_send_request_put(sendreq); + break; + default: + break; + } +#endif + + +#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \ { \ /* Let the base handle the reference counts */ \ MCA_PML_BASE_SEND_REQUEST_FINI((&sendreq->req_send)); \ OMPI_FREE_LIST_RETURN( \ - &mca_pml_ob1.send_requests, (ompi_list_item_t*)sendreq); \ + &mca_pml_ob1.send_requests, (ompi_list_item_t*)sendreq); \ } +/** + * BMI doesn't require pre-pinned or "specially" allocated memory. + * Can try to directly send from the users buffer if contigous. + */ + +int mca_pml_ob1_send_request_start_user( + mca_pml_ob1_send_request_t* sendreq, + mca_pml_ob1_endpoint_t* endpoint); + + +/** + * BMI requires "specially" allocated memory. Request a segment that + * is used for initial hdr and any eager data. + */ + +int mca_pml_ob1_send_request_start_copy( + mca_pml_ob1_send_request_t* sendreq, + mca_pml_ob1_endpoint_t* endpoint); + + +/** + * + */ + +int mca_pml_ob1_send_request_acked( + mca_bmi_base_module_t* bmi, + struct mca_pml_ob1_ack_hdr_t* hdr); + + +/** + * + */ +int mca_pml_ob1_send_request_send( + mca_pml_ob1_send_request_t* sendreq); + +/** + * + */ +int mca_pml_ob1_send_request_put( + mca_pml_ob1_send_request_t* sendreq); #if defined(c_plusplus) || defined(__cplusplus) }