rdma protocol - a work in progress
This commit was SVN r6005.
Этот коммит содержится в:
родитель
ce553cfec8
Коммит
b73fb5dcf2
@ -404,32 +404,6 @@ typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_fn_t)(
|
||||
size_t* size
|
||||
);
|
||||
|
||||
/**
|
||||
* Pack data and return a descriptor that can be
|
||||
* used for send/put.
|
||||
*
|
||||
* @param bmi (IN) BMI module
|
||||
* @param endpoint (IN) BMI peer addressing
|
||||
*/
|
||||
|
||||
typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_dst_fn_t)(
|
||||
struct mca_bmi_base_module_t* bmi,
|
||||
struct mca_bmi_base_endpoint_t* endpoint,
|
||||
struct ompi_convertor_t* convertor,
|
||||
size_t reserve,
|
||||
size_t* size
|
||||
);
|
||||
|
||||
|
||||
typedef int (*mca_bmi_base_module_unpack_fn_t)(
|
||||
struct mca_bmi_base_module_t* bmi,
|
||||
struct mca_bmi_base_endpoint_t* peer,
|
||||
struct ompi_convertor_t* convertor,
|
||||
struct mca_bmi_base_descriptor_t* descriptor
|
||||
);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Initiate a send to the peer.
|
||||
*
|
||||
@ -480,8 +454,10 @@ struct mca_bmi_base_module_t {
|
||||
/* BMI common attributes */
|
||||
mca_bmi_base_component_t* bmi_component; /**< pointer back to the BMI component structure */
|
||||
size_t bmi_eager_limit; /**< maximum size of first fragment -- eager send */
|
||||
size_t bmi_min_frag_size; /**< threshold below which the BMI will not fragment */
|
||||
size_t bmi_max_frag_size; /**< maximum fragment size supported by the BMI */
|
||||
size_t bmi_min_send_size; /**< threshold below which the BMI should not fragment */
|
||||
size_t bmi_max_send_size; /**< maximum send fragment size supported by the BMI */
|
||||
size_t bmi_min_rdma_size; /**< threshold below which the BMI should not fragment */
|
||||
size_t bmi_max_rdma_size; /**< maximum rdma fragment size supported by the BMI */
|
||||
uint32_t bmi_exclusivity; /**< indicates this BMI should be used exclusively */
|
||||
uint32_t bmi_latency; /**< relative ranking of latency used to prioritize bmis */
|
||||
uint32_t bmi_bandwidth; /**< bandwidth (Mbytes/sec) supported by each endpoint */
|
||||
|
@ -35,8 +35,10 @@ mca_bmi_ib_module_t mca_bmi_ib_module = {
|
||||
{
|
||||
&mca_bmi_ib_component.super,
|
||||
0, /* max size of first fragment */
|
||||
0, /* min fragment size */
|
||||
0, /* max fragment size */
|
||||
0, /* min send fragment size */
|
||||
0, /* max send fragment size */
|
||||
0, /* min rdma fragment size */
|
||||
0, /* max rdma fragment size */
|
||||
0, /* exclusivity */
|
||||
0, /* latency */
|
||||
0, /* bandwidth */
|
||||
|
@ -127,12 +127,12 @@ int mca_bmi_ib_component_open(void)
|
||||
mca_bmi_ib_param_register_int ("first_frag_size",
|
||||
(MCA_BMI_IB_FIRST_FRAG_SIZE
|
||||
- sizeof(mca_bmi_ib_header_t)));
|
||||
mca_bmi_ib_module.super.bmi_min_frag_size =
|
||||
mca_bmi_ib_param_register_int ("min_frag_size",
|
||||
mca_bmi_ib_module.super.bmi_min_send_size =
|
||||
mca_bmi_ib_param_register_int ("min_send_size",
|
||||
(MCA_BMI_IB_FIRST_FRAG_SIZE
|
||||
- sizeof(mca_bmi_ib_header_t)));
|
||||
mca_bmi_ib_module.super.bmi_max_frag_size =
|
||||
mca_bmi_ib_param_register_int ("max_frag_size", 2<<30);
|
||||
mca_bmi_ib_module.super.bmi_max_send_size =
|
||||
mca_bmi_ib_param_register_int ("max_send_size", 2<<30);
|
||||
|
||||
|
||||
|
||||
|
@ -48,8 +48,10 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
|
||||
{
|
||||
&mca_bmi_sm_component.super,
|
||||
0, /* bmi_eager_limit */
|
||||
0, /* bmi_min_frag_size */
|
||||
0, /* bmi_max_frag_size */
|
||||
0, /* bmi_min_send_size */
|
||||
0, /* bmi_max_send_size */
|
||||
0, /* bmi_min_rdma_size */
|
||||
0, /* bmi_max_rdma_size */
|
||||
0, /* bmi_exclusivity */
|
||||
0, /* bmi_latency */
|
||||
0, /* bmi_bandwidth */
|
||||
@ -71,8 +73,10 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
|
||||
{
|
||||
&mca_bmi_sm_component.super,
|
||||
0, /* bmi_eager_limit */
|
||||
0, /* bmi_min_frag_size */
|
||||
0, /* bmi_max_frag_size */
|
||||
0, /* bmi_min_send_size */
|
||||
0, /* bmi_max_send_size */
|
||||
0, /* bmi_min_rdma_size */
|
||||
0, /* bmi_max_rdma_size */
|
||||
0, /* bmi_exclusivity */
|
||||
0, /* bmi_latency */
|
||||
0, /* bmi_bandwidth */
|
||||
|
@ -269,8 +269,10 @@ mca_bmi_base_module_t** mca_bmi_sm_component_init(
|
||||
/* set scheduling parameters */
|
||||
for( i=0 ; i < 2 ; i++ ) {
|
||||
mca_bmi_sm[i].super.bmi_eager_limit=mca_bmi_sm_component.eager_limit;
|
||||
mca_bmi_sm[i].super.bmi_min_frag_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_max_frag_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_min_send_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_max_send_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_min_rdma_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_max_rdma_size=mca_bmi_sm_component.max_frag_size;
|
||||
mca_bmi_sm[i].super.bmi_exclusivity=100; /* always use this ptl */
|
||||
mca_bmi_sm[i].super.bmi_latency=100; /* lowest latency */
|
||||
mca_bmi_sm[i].super.bmi_bandwidth=900; /* not really used now since exclusivity is set to 100 */
|
||||
|
@ -1 +1,2 @@
|
||||
gshipman
|
||||
twoodall
|
||||
|
@ -273,8 +273,10 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
endpoint = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_send);
|
||||
endpoint->bmi = bmi;
|
||||
endpoint->bmi_eager_limit = bmi->bmi_eager_limit;
|
||||
endpoint->bmi_min_frag_size = bmi->bmi_min_frag_size;
|
||||
endpoint->bmi_max_frag_size = bmi->bmi_max_frag_size;
|
||||
endpoint->bmi_min_send_size = bmi->bmi_min_send_size;
|
||||
endpoint->bmi_max_send_size = bmi->bmi_max_send_size;
|
||||
endpoint->bmi_min_rdma_size = bmi->bmi_min_rdma_size;
|
||||
endpoint->bmi_max_rdma_size = bmi->bmi_max_rdma_size;
|
||||
endpoint->bmi_cache = NULL;
|
||||
endpoint->bmi_endpoint = bmi_endpoints[p];
|
||||
endpoint->bmi_weight = 0;
|
||||
|
@ -72,6 +72,7 @@ struct mca_pml_ob1_t {
|
||||
|
||||
/* list of pending send requests */
|
||||
ompi_list_t send_pending;
|
||||
ompi_list_t recv_pending;
|
||||
ompi_list_t acks_pending;
|
||||
};
|
||||
typedef struct mca_pml_ob1_t mca_pml_ob1_t;
|
||||
|
@ -88,6 +88,7 @@ int mca_pml_ob1_component_open(void)
|
||||
|
||||
/* pending operations */
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, ompi_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, ompi_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.acks_pending, ompi_list_t);
|
||||
|
||||
mca_pml_ob1.bmi_components = NULL;
|
||||
@ -148,6 +149,7 @@ int mca_pml_ob1_component_close(void)
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.acks_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.send_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.recv_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.send_requests);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.recv_requests);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.lock);
|
||||
|
@ -35,8 +35,10 @@ struct mca_pml_ob1_endpoint_t {
|
||||
int bmi_weight; /**< BMI weight for scheduling */
|
||||
int bmi_flags; /**< support for put/get? */
|
||||
size_t bmi_eager_limit; /**< BMI eager limit */
|
||||
size_t bmi_min_frag_size; /**< BMI min fragment size */
|
||||
size_t bmi_max_frag_size; /**< BMI max fragment size */
|
||||
size_t bmi_min_send_size; /**< BMI min send size */
|
||||
size_t bmi_max_send_size; /**< BMI max send size */
|
||||
size_t bmi_min_rdma_size; /**< BMI min rdma size */
|
||||
size_t bmi_max_rdma_size; /**< BMI max rdma size */
|
||||
struct mca_bmi_base_module_t *bmi; /**< BMI module */
|
||||
struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< BMI addressing info */
|
||||
struct mca_bmi_base_descriptor_t* bmi_cache;
|
||||
|
@ -214,11 +214,11 @@ typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
|
||||
|
||||
struct mca_pml_ob1_rdma_hdr_t {
|
||||
mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
|
||||
ompi_ptr_t hdr_src_req; /**< source request */
|
||||
ompi_ptr_t hdr_dst_req; /**< matched receive request */
|
||||
ompi_ptr_t hdr_src; /**< source request/descriptor */
|
||||
ompi_ptr_t hdr_dst; /**< receive request/descriptor */
|
||||
uint64_t hdr_offset; /**< current offset into user buffer */
|
||||
uint32_t hdr_num_segments; /**< number of segments for rdma */
|
||||
mca_bmi_base_segment_t segments[1]; /**< list of segments for rdma */
|
||||
uint32_t hdr_seg_cnt; /**< number of segments for rdma */
|
||||
mca_bmi_base_segment_t hdr_segs[1]; /**< list of segments for rdma */
|
||||
};
|
||||
typedef struct mca_pml_ob1_rdma_hdr_t mca_pml_ob1_rdma_hdr_t;
|
||||
|
||||
|
@ -77,7 +77,7 @@ void mca_pml_ob1_recv_frag_callback(
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
|
||||
hdr->hdr_ack.hdr_src_req.pval;
|
||||
sendreq->req_state = MCA_PML_OB1_SR_SEND;
|
||||
sendreq->req_state = MCA_PML_OB1_SR_ACKED;
|
||||
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
|
||||
sendreq->req_rdma_offset = hdr->hdr_ack.hdr_rdma_offset;
|
||||
mca_pml_ob1_send_request_schedule(sendreq);
|
||||
@ -90,6 +90,17 @@ void mca_pml_ob1_recv_frag_callback(
|
||||
mca_pml_ob1_recv_request_progress(recvreq,bmi,segments,des->des_src_cnt);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_PUT:
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
|
||||
hdr->hdr_rdma.hdr_src.pval;
|
||||
mca_pml_ob1_send_request_put(sendreq,bmi,&hdr->hdr_rdma);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_FIN:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ OBJ_CLASS_INSTANCE(
|
||||
* Release resources.
|
||||
*/
|
||||
|
||||
static void mca_pml_ob1_recv_request_acked(
|
||||
static void mca_pml_ob1_send_ctl_complete(
|
||||
mca_bmi_base_module_t* bmi,
|
||||
struct mca_bmi_base_endpoint_t* ep,
|
||||
struct mca_bmi_base_descriptor_t* des,
|
||||
@ -123,6 +123,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
mca_bmi_base_descriptor_t* des;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
mca_pml_ob1_ack_hdr_t* ack;
|
||||
bool schedule;
|
||||
int rc;
|
||||
|
||||
/* allocate descriptor */
|
||||
@ -139,18 +140,20 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
* - size is larger than the rdma threshold
|
||||
* - rdma devices are available
|
||||
*/
|
||||
if(recvreq->req_recv.req_bytes_packed > mca_pml_ob1.rdma_threshold &&
|
||||
if(recvreq->req_recv.req_bytes_packed >= mca_pml_ob1.rdma_threshold &&
|
||||
mca_pml_ob1_ep_array_get_size(&proc->bmi_rdma)) {
|
||||
|
||||
/* use convertor to figure out the rdma offset for this request */
|
||||
recvreq->req_rdma_offset = mca_pml_ob1.rdma_offset;
|
||||
ompi_convertor_set_position(
|
||||
&recvreq->req_convertor, /* convertor */
|
||||
&recvreq->req_recv.req_convertor,
|
||||
&recvreq->req_rdma_offset);
|
||||
ack->hdr_rdma_offset = recvreq->req_rdma_offset;
|
||||
schedule = true;
|
||||
} else {
|
||||
recvreq->req_rdma_offset = recvreq->req_recv.req_bytes_packed;
|
||||
ack->hdr_rdma_offset = recvreq->req_recv.req_bytes_packed;
|
||||
schedule = false;
|
||||
}
|
||||
|
||||
ack->hdr_common.hdr_flags = 0;
|
||||
@ -158,7 +161,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
ack->hdr_dst_req.pval = recvreq;
|
||||
|
||||
/* initialize descriptor */
|
||||
des->des_cbfunc = mca_pml_ob1_recv_request_acked;
|
||||
des->des_cbfunc = mca_pml_ob1_send_ctl_complete;
|
||||
des->des_cbdata = recvreq;
|
||||
|
||||
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
|
||||
@ -167,6 +170,9 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
goto retry;
|
||||
}
|
||||
|
||||
/* after sending ack - attempt to schedule rdma */
|
||||
if(schedule)
|
||||
mca_pml_ob1_recv_request_schedule(recvreq);
|
||||
return;
|
||||
|
||||
/* queue request to retry later */
|
||||
@ -212,6 +218,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
|
||||
recvreq->req_send = hdr->hdr_rndv.hdr_src_req;
|
||||
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv);
|
||||
bytes_received = hdr->hdr_rndv.hdr_frag_length;
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK(
|
||||
@ -258,6 +265,112 @@ void mca_pml_ob1_recv_request_progress(
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Schedule RDMA protocol.
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_schedule(mca_pml_ob1_recv_request_t* recvreq)
|
||||
{
|
||||
if(OMPI_THREAD_ADD32(&recvreq->req_lock,1) == 1) {
|
||||
mca_pml_ob1_proc_t* proc = recvreq->req_proc;
|
||||
size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_rdma);
|
||||
do {
|
||||
size_t bytes_remaining = recvreq->req_recv.req_bytes_packed - recvreq->req_rdma_offset;
|
||||
while(bytes_remaining > 0 && recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_send);
|
||||
size_t hdr_size;
|
||||
mca_pml_ob1_rdma_hdr_t* hdr;
|
||||
mca_bmi_base_descriptor_t* dst;
|
||||
mca_bmi_base_descriptor_t* ctl;
|
||||
int rc;
|
||||
|
||||
/* if there is only one bmi available or the size is less than
|
||||
* than the min fragment size, schedule the rest via this bmi
|
||||
*/
|
||||
size_t size;
|
||||
if(num_bmi_avail == 1 || bytes_remaining < ep->bmi_min_rdma_size) {
|
||||
size = bytes_remaining;
|
||||
|
||||
/* otherwise attempt to give the BMI a percentage of the message
|
||||
* based on a weighting factor. for simplicity calculate this as
|
||||
* a percentage of the overall message length (regardless of amount
|
||||
* previously assigned)
|
||||
*/
|
||||
} else {
|
||||
size = (ep->bmi_weight * bytes_remaining) / 100;
|
||||
}
|
||||
|
||||
/* makes sure that we don't exceed BMI max rdma size */
|
||||
if (ep->bmi_max_rdma_size != 0 && size > ep->bmi_max_rdma_size) {
|
||||
size = ep->bmi_max_rdma_size;
|
||||
}
|
||||
|
||||
/* prepare a descriptor for RDMA */
|
||||
ompi_convertor_set_position(&recvreq->req_recv.req_convertor, &recvreq->req_rdma_offset);
|
||||
dst = ep->bmi_prepare_dst(
|
||||
ep->bmi,
|
||||
ep->bmi_endpoint,
|
||||
&recvreq->req_recv.req_convertor,
|
||||
0,
|
||||
&size);
|
||||
if(dst == NULL) {
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.recv_pending, (ompi_list_item_t*)recvreq);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
|
||||
break;
|
||||
}
|
||||
dst->des_cbdata = recvreq;
|
||||
|
||||
/* prepare a descriptor for rdma control message */
|
||||
hdr_size = sizeof(mca_pml_ob1_rdma_hdr_t);
|
||||
if(dst->des_dst_cnt > 1) {
|
||||
hdr_size += (sizeof(mca_bmi_base_segment_t) * (dst->des_dst_cnt-1));
|
||||
}
|
||||
ctl = ep->bmi_alloc(ep->bmi, hdr_size);
|
||||
if(ctl == NULL) {
|
||||
ep->bmi_free(ep->bmi,dst);
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.recv_pending, (ompi_list_item_t*)recvreq);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
|
||||
break;
|
||||
}
|
||||
ctl->des_cbfunc = mca_pml_ob1_send_ctl_complete;
|
||||
ctl->des_cbdata = recvreq;
|
||||
|
||||
/* fill in rdma header */
|
||||
hdr = (mca_pml_ob1_rdma_hdr_t*)ctl->des_src->seg_addr.pval;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_PUT;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_src = recvreq->req_send;
|
||||
hdr->hdr_dst.pval = dst;
|
||||
hdr->hdr_offset = recvreq->req_rdma_offset;
|
||||
hdr->hdr_seg_cnt = dst->des_dst_cnt;
|
||||
memcpy(hdr->hdr_segs, dst->des_dst, dst->des_dst_cnt * sizeof(mca_bmi_base_segment_t));
|
||||
|
||||
/* update request state */
|
||||
recvreq->req_rdma_offset += size;
|
||||
OMPI_THREAD_ADD32(&recvreq->req_pipeline_depth,1);
|
||||
|
||||
/* send rdma request to peer */
|
||||
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, ctl, MCA_BMI_TAG_PML);
|
||||
if(rc == OMPI_SUCCESS) {
|
||||
bytes_remaining = recvreq->req_recv.req_bytes_packed - recvreq->req_rdma_offset;
|
||||
} else {
|
||||
ep->bmi_free(ep->bmi,ctl);
|
||||
ep->bmi_free(ep->bmi,dst);
|
||||
recvreq->req_rdma_offset -= size;
|
||||
OMPI_THREAD_ADD32(&recvreq->req_pipeline_depth,-1);
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.recv_pending, (ompi_list_item_t*)recvreq);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while(OMPI_THREAD_ADD32(&recvreq->req_lock,-1) > 0);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This routine is used to match a posted receive when the source process
|
||||
* is specified.
|
||||
|
@ -29,18 +29,13 @@ extern "C" {
|
||||
|
||||
struct mca_pml_ob1_recv_request_t {
|
||||
mca_pml_base_recv_request_t req_recv;
|
||||
size_t req_bytes_received;
|
||||
size_t req_bytes_delivered;
|
||||
size_t req_rdma_offset;
|
||||
|
||||
ompi_convertor_t req_convertor;
|
||||
/* note that we allocate additional space for the recv
|
||||
* request to increase the array size based on run-time
|
||||
* parameters for the pipeline depth. So... this MUST be
|
||||
* the last element of this struct.
|
||||
*/
|
||||
mca_bmi_base_descriptor_t *req_pipeline[1];
|
||||
struct mca_pml_proc_t *req_proc;
|
||||
ompi_ptr_t req_send;
|
||||
int32_t req_lock;
|
||||
size_t req_pipeline_depth;
|
||||
size_t req_bytes_received;
|
||||
size_t req_bytes_delivered;
|
||||
size_t req_rdma_offset;
|
||||
};
|
||||
typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
|
||||
|
||||
@ -134,6 +129,8 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
/* init/re-init the request */ \
|
||||
(request)->req_bytes_received = 0; \
|
||||
(request)->req_bytes_delivered = 0; \
|
||||
(request)->req_lock = 0; \
|
||||
(request)->req_pipeline_depth = 0; \
|
||||
(request)->req_recv.req_base.req_pml_complete = false; \
|
||||
(request)->req_recv.req_base.req_ompi.req_complete = false; \
|
||||
(request)->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
|
||||
@ -177,7 +174,7 @@ do {
|
||||
(request)->req_recv.req_base.req_datatype, \
|
||||
(request)->req_recv.req_base.req_count, \
|
||||
(request)->req_recv.req_base.req_addr, \
|
||||
&(request)->req_convertor ); \
|
||||
&(request)->req_recv.req_convertor ); \
|
||||
} else { \
|
||||
(request)->req_proc = NULL; \
|
||||
} \
|
||||
@ -215,10 +212,10 @@ do {
|
||||
} \
|
||||
} \
|
||||
ompi_convertor_set_position( \
|
||||
&(request->req_convertor), \
|
||||
&(request->req_recv.req_convertor), \
|
||||
&data_offset); \
|
||||
ompi_convertor_unpack( \
|
||||
&(request)->req_convertor, \
|
||||
&(request)->req_recv.req_convertor, \
|
||||
iov, \
|
||||
&iov_count, \
|
||||
&max_data, \
|
||||
@ -239,6 +236,14 @@ void mca_pml_ob1_recv_request_progress(
|
||||
size_t num_segments);
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_schedule(
|
||||
mca_pml_ob1_recv_request_t* req);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -149,7 +149,7 @@ static void mca_pml_ob1_send_completion(
|
||||
bmi_ep->bmi_free(bmi_ep->bmi, descriptor);
|
||||
|
||||
/* check for request completion */
|
||||
if (OMPI_THREAD_ADD32(&sendreq->req_send_pending,-1) == 0 &&
|
||||
if (OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,-1) == 0 &&
|
||||
sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
|
||||
sendreq->req_send.req_base.req_pml_complete = true;
|
||||
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
|
||||
@ -174,7 +174,7 @@ static void mca_pml_ob1_send_completion(
|
||||
/* advance pending requests */
|
||||
while(NULL != sendreq) {
|
||||
switch(sendreq->req_state) {
|
||||
case MCA_PML_OB1_SR_SEND:
|
||||
case MCA_PML_OB1_SR_ACKED:
|
||||
mca_pml_ob1_send_request_schedule(sendreq);
|
||||
break;
|
||||
default:
|
||||
@ -342,7 +342,7 @@ int mca_pml_ob1_send_request_start(
|
||||
}
|
||||
}
|
||||
descriptor->des_cbdata = sendreq;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,1);
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,1);
|
||||
|
||||
/* send */
|
||||
rc = endpoint->bmi_send(
|
||||
@ -373,37 +373,34 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
|
||||
mca_pml_ob1_proc_t* proc = sendreq->req_proc;
|
||||
size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_send);
|
||||
do {
|
||||
/* allocate remaining bytes to PTLs */
|
||||
/* allocate remaining bytes to BMIs */
|
||||
size_t bytes_remaining = sendreq->req_rdma_offset - sendreq->req_send_offset;
|
||||
while(bytes_remaining > 0 && sendreq->req_send_pending < mca_pml_ob1.send_pipeline_depth) {
|
||||
while(bytes_remaining > 0 && sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth) {
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_send);
|
||||
mca_pml_ob1_frag_hdr_t* hdr;
|
||||
mca_bmi_base_descriptor_t* des;
|
||||
int rc;
|
||||
|
||||
/* if this is the last PTL that is available to use, or the number of
|
||||
* bytes remaining in the message is less than the PTLs minimum fragment
|
||||
* size, then go ahead and give the rest of the message to this PTL.
|
||||
/* if there is only one bmi available or the size is less than
|
||||
* than the min fragment size, schedule the rest via this bmi
|
||||
*/
|
||||
size_t size;
|
||||
if(bytes_remaining < ep->bmi_min_frag_size) {
|
||||
if(num_bmi_avail == 1 || bytes_remaining < ep->bmi_min_send_size) {
|
||||
size = bytes_remaining;
|
||||
|
||||
/* otherwise attempt to give the PTL a percentage of the message
|
||||
/* otherwise attempt to give the BMI a percentage of the message
|
||||
* based on a weighting factor. for simplicity calculate this as
|
||||
* a percentage of the overall message length (regardless of amount
|
||||
* previously assigned)
|
||||
*/
|
||||
} else if (num_bmi_avail > 1) {
|
||||
size = (ep->bmi_weight * bytes_remaining) / 100;
|
||||
} else {
|
||||
size = ep->bmi_min_frag_size;
|
||||
}
|
||||
size = (ep->bmi_weight * bytes_remaining) / 100;
|
||||
}
|
||||
|
||||
/* makes sure that we don't exceed ptl_max_frag_size */
|
||||
if (ep->bmi_max_frag_size != 0 &&
|
||||
size > ep->bmi_max_frag_size - sizeof(mca_pml_ob1_frag_hdr_t)) {
|
||||
size = ep->bmi_max_frag_size - sizeof(mca_pml_ob1_frag_hdr_t);
|
||||
/* makes sure that we don't exceed BMI max send size */
|
||||
if (ep->bmi_max_send_size != 0 &&
|
||||
size > ep->bmi_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t)) {
|
||||
size = ep->bmi_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t);
|
||||
}
|
||||
|
||||
/* pack into a descriptor */
|
||||
@ -433,7 +430,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
|
||||
|
||||
/* update state */
|
||||
sendreq->req_send_offset += size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,1);
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,1);
|
||||
|
||||
/* initiate send - note that this may complete before the call returns */
|
||||
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
|
||||
@ -441,7 +438,8 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
|
||||
bytes_remaining = sendreq->req_rdma_offset - sendreq->req_send_offset;
|
||||
} else {
|
||||
sendreq->req_send_offset -= size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,-1);
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
|
||||
ep->bmi_free(ep->bmi,des);
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.send_pending, (ompi_list_item_t*)sendreq);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
|
||||
|
@ -33,8 +33,7 @@ extern "C" {
|
||||
typedef enum {
|
||||
MCA_PML_OB1_SR_INIT,
|
||||
MCA_PML_OB1_SR_START,
|
||||
MCA_PML_OB1_SR_SEND,
|
||||
MCA_PML_OB1_SR_PUT,
|
||||
MCA_PML_OB1_SR_ACKED,
|
||||
MCA_PML_OB1_SR_COMPLETE
|
||||
} mca_pml_ob1_send_request_state_t;
|
||||
|
||||
@ -46,8 +45,8 @@ struct mca_pml_ob1_send_request_t {
|
||||
mca_pml_ob1_send_request_state_t req_state;
|
||||
ompi_ptr_t req_recv;
|
||||
int32_t req_lock;
|
||||
size_t req_pipeline_depth;
|
||||
size_t req_bytes_delivered;
|
||||
size_t req_send_pending;
|
||||
size_t req_send_offset;
|
||||
size_t req_rdma_offset;
|
||||
};
|
||||
@ -112,9 +111,9 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
|
||||
/* select next endpoint */ \
|
||||
endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_eager); \
|
||||
sendreq->req_lock = 0; \
|
||||
sendreq->req_pipeline_depth = 0; \
|
||||
sendreq->req_bytes_delivered = 0; \
|
||||
sendreq->req_send_offset = 0; \
|
||||
sendreq->req_send_pending = 0; \
|
||||
sendreq->req_state = MCA_PML_OB1_SR_START; \
|
||||
sendreq->req_send.req_base.req_ompi.req_complete = false; \
|
||||
sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
|
||||
@ -156,6 +155,16 @@ int mca_pml_ob1_send_request_start(
|
||||
int mca_pml_ob1_send_request_schedule(
|
||||
mca_pml_ob1_send_request_t* sendreq);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
int mca_pml_ob1_send_request_put(
|
||||
mca_pml_ob1_send_request_t* sendreq,
|
||||
mca_bmi_base_module_t* bmi,
|
||||
mca_pml_ob1_rdma_hdr_t* hdr);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -1,5 +1,4 @@
|
||||
mitch
|
||||
spoole
|
||||
mlleinin
|
||||
twoodall
|
||||
rlgraham
|
||||
|
@ -327,6 +327,7 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
|
||||
/* Process incoming receives */
|
||||
mca_ptl_ib_process_recv(ib_ptl, comp_addr);
|
||||
#if 0
|
||||
/* Re post recv buffers */
|
||||
if(ompi_list_get_size(&ib_ptl->repost) <= 1) {
|
||||
ompi_list_append(&ib_ptl->repost, (ompi_list_item_t*)comp_addr);
|
||||
@ -337,6 +338,9 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
}
|
||||
mca_ptl_ib_buffer_repost(ib_ptl->nic, comp_addr);
|
||||
}
|
||||
#else
|
||||
mca_ptl_ib_buffer_repost(ib_ptl->nic, comp_addr);
|
||||
#endif
|
||||
count++;
|
||||
break;
|
||||
|
||||
|
@ -200,6 +200,7 @@ static int mca_ptl_ib_peer_start_connect(mca_ptl_base_peer_t* peer)
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name), __FILE__,__LINE__,rc);
|
||||
return rc;
|
||||
}
|
||||
D_PRINT("mca_ptl_ib_peer_start_connect: qp_num=%d lid=%d", peer->lcl_qp_prop.qp_num, ib_ptl->port.lid);
|
||||
|
||||
/* Send connection info over to remote peer */
|
||||
peer->peer_state = MCA_PTL_IB_CONNECTING;
|
||||
@ -232,6 +233,7 @@ static int mca_ptl_ib_peer_reply_start_connect(mca_ptl_ib_peer_t *peer, orte_buf
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name), __FILE__,__LINE__,rc);
|
||||
return rc;
|
||||
}
|
||||
D_PRINT("mca_ptl_ib_peer_reply_start_connect: qp_num=%d lid=%d", peer->lcl_qp_prop.qp_num, ib_ptl->port.lid);
|
||||
|
||||
/* Set the remote side info */
|
||||
mca_ptl_ib_peer_set_remote_info(peer, buffer);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user