- fix scheduling logic
- use cached descriptor for control messages This commit was SVN r6053.
Этот коммит содержится в:
родитель
83cba7f7cf
Коммит
6e919f9fb1
@ -115,9 +115,9 @@ int mca_pml_ob1_component_open(void)
|
||||
mca_pml_ob1.recv_pipeline_depth =
|
||||
mca_pml_ob1_param_register_int("recv_pipeline_depth", 3);
|
||||
mca_pml_ob1.rdma_threshold =
|
||||
mca_pml_ob1_param_register_int("rdma_threshold", 512*1024);
|
||||
mca_pml_ob1_param_register_int("rdma_threshold", 256*1024);
|
||||
mca_pml_ob1.rdma_offset =
|
||||
mca_pml_ob1_param_register_int("rdma_offset", 256*1024);
|
||||
mca_pml_ob1_param_register_int("rdma_offset", 128*1024);
|
||||
|
||||
return mca_bmi_base_open();
|
||||
}
|
||||
|
@ -174,31 +174,36 @@ static inline mca_pml_ob1_endpoint_t* mca_pml_ob1_ep_array_find(
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a descriptor
|
||||
* Allocate a descriptor for control message
|
||||
*/
|
||||
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor, size) \
|
||||
do { \
|
||||
if(NULL != (descriptor = endpoint->bmi_cache)) { \
|
||||
/* atomically acquire the cached descriptor */ \
|
||||
if(ompi_atomic_cmpset_ptr(&endpoint->bmi_cache, descriptor, NULL) == 0) { \
|
||||
endpoint->bmi_cache = NULL; \
|
||||
} else { \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t) + \
|
||||
MCA_BMI_DES_MAX_SEGMENTS * sizeof(mca_bmi_base_segment_t)); \
|
||||
} \
|
||||
} else { \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t) + \
|
||||
MCA_BMI_DES_MAX_SEGMENTS * sizeof(mca_bmi_base_segment_t)); \
|
||||
} \
|
||||
descriptor->des_src->seg_len = size; \
|
||||
} while(0)
|
||||
#else
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor, size) \
|
||||
do { \
|
||||
if(NULL != (descriptor = endpoint->bmi_cache)) { \
|
||||
endpoint->bmi_cache = NULL; \
|
||||
} else { \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t)); \
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t) + \
|
||||
MCA_BMI_DES_MAX_SEGMENTS * sizeof(mca_bmi_base_segment_t)); \
|
||||
} \
|
||||
descriptor->des_src->seg_len = size; \
|
||||
} while(0)
|
||||
#endif
|
||||
|
||||
@ -209,21 +214,21 @@ do {
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \
|
||||
do { \
|
||||
if(NULL == bmi_ep->bmi_cache) { \
|
||||
if(NULL == endpoint->bmi_cache) { \
|
||||
if(ompi_atomic_cmpset_ptr(&endpoint->bmi_cache,NULL,descriptor) == 0) { \
|
||||
bmi->bmi_free(bmi,descriptor); \
|
||||
endpoint->bmi_free(endpoint->bmi,descriptor); \
|
||||
} \
|
||||
} else { \
|
||||
bmi->bmi_free(bmi,descriptor); \
|
||||
endpoint->bmi_free(endpoint->bmi,descriptor); \
|
||||
}
|
||||
} while(0)
|
||||
#else
|
||||
#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \
|
||||
do { \
|
||||
if(NULL == bmi_ep->bmi_cache) { \
|
||||
bmi_ep->bmi_cache = descriptor; \
|
||||
if(NULL == endpoint->bmi_cache) { \
|
||||
endpoint->bmi_cache = descriptor; \
|
||||
} else { \
|
||||
bmi->bmi_free(bmi,descriptor); \
|
||||
endpoint->bmi_free(endpoint->bmi,descriptor); \
|
||||
} \
|
||||
} while(0)
|
||||
#endif
|
||||
|
@ -37,6 +37,7 @@ struct mca_pml_ob1_rdma_frag_t {
|
||||
mca_pml_ob1_rdma_state_t rdma_state;
|
||||
size_t rdma_length;
|
||||
mca_bmi_base_segment_t rdma_segs[MCA_BMI_DES_MAX_SEGMENTS];
|
||||
struct mca_pml_ob1_endpoint_t* rdma_ep;
|
||||
struct mca_pml_ob1_send_request_t* rdma_req;
|
||||
};
|
||||
typedef struct mca_pml_ob1_rdma_frag_t mca_pml_ob1_rdma_frag_t;
|
||||
|
@ -106,7 +106,8 @@ static void mca_pml_ob1_send_ctl_complete(
|
||||
struct mca_bmi_base_descriptor_t* des,
|
||||
int status)
|
||||
{
|
||||
bmi->bmi_free(bmi,des);
|
||||
mca_pml_ob1_endpoint_t* endpoint = (mca_pml_ob1_endpoint_t*)des->des_cbdata;
|
||||
MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, des);
|
||||
}
|
||||
|
||||
|
||||
@ -127,7 +128,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
int rc;
|
||||
|
||||
/* allocate descriptor */
|
||||
des = ep->bmi_alloc(ep->bmi, sizeof(mca_pml_ob1_ack_hdr_t));
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(ep, des, sizeof(mca_pml_ob1_ack_hdr_t));
|
||||
if(NULL == des) {
|
||||
goto retry;
|
||||
}
|
||||
@ -140,7 +141,8 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
* - size is larger than the rdma threshold
|
||||
* - rdma devices are available
|
||||
*/
|
||||
if(recvreq->req_recv.req_bytes_packed >= mca_pml_ob1.rdma_threshold &&
|
||||
if(mca_pml_ob1.rdma_threshold != 0 &&
|
||||
recvreq->req_recv.req_bytes_packed >= mca_pml_ob1.rdma_threshold &&
|
||||
mca_pml_ob1_ep_array_get_size(&proc->bmi_rdma) &&
|
||||
ompi_convertor_need_buffers(&recvreq->req_recv.req_convertor) == 0) {
|
||||
|
||||
@ -163,7 +165,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
|
||||
/* initialize descriptor */
|
||||
des->des_cbfunc = mca_pml_ob1_send_ctl_complete;
|
||||
des->des_cbdata = recvreq;
|
||||
des->des_cbdata = ep;
|
||||
|
||||
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
@ -250,6 +252,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
case MCA_PML_OB1_HDR_TYPE_FIN:
|
||||
|
||||
bytes_delivered = bytes_received = hdr->hdr_fin.hdr_rdma_length;
|
||||
OMPI_THREAD_ADD32(&recvreq->req_pipeline_depth,-1);
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -343,7 +346,8 @@ void mca_pml_ob1_recv_request_schedule(mca_pml_ob1_recv_request_t* recvreq)
|
||||
if(dst->des_dst_cnt > 1) {
|
||||
hdr_size += (sizeof(mca_bmi_base_segment_t) * (dst->des_dst_cnt-1));
|
||||
}
|
||||
ctl = ep->bmi_alloc(ep->bmi, hdr_size);
|
||||
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(ep, ctl, hdr_size);
|
||||
if(ctl == NULL) {
|
||||
ep->bmi_free(ep->bmi,dst);
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
@ -352,7 +356,7 @@ void mca_pml_ob1_recv_request_schedule(mca_pml_ob1_recv_request_t* recvreq)
|
||||
break;
|
||||
}
|
||||
ctl->des_cbfunc = mca_pml_ob1_send_ctl_complete;
|
||||
ctl->des_cbdata = recvreq;
|
||||
ctl->des_cbdata = ep;
|
||||
|
||||
/* fill in rdma header */
|
||||
hdr = (mca_pml_ob1_rdma_hdr_t*)ctl->des_src->seg_addr.pval;
|
||||
|
@ -189,12 +189,11 @@ int mca_pml_ob1_send_request_start(
|
||||
if(size == 0 && sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||
|
||||
/* allocate a descriptor */
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor, sizeof(mca_pml_ob1_match_hdr_t));
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
segment = descriptor->des_src;
|
||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
|
||||
|
||||
/* build hdr */
|
||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||
@ -233,7 +232,7 @@ int mca_pml_ob1_send_request_start(
|
||||
int32_t free_after;
|
||||
|
||||
/* allocate descriptor */
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_match_hdr_t) + size);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
@ -281,7 +280,7 @@ int mca_pml_ob1_send_request_start(
|
||||
int32_t free_after;
|
||||
|
||||
/* allocate space for hdr + first fragment */
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, size);
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_rendezvous_hdr_t) + size);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
@ -447,8 +446,9 @@ static void mca_pml_ob1_fin_completion(
|
||||
{
|
||||
|
||||
mca_pml_ob1_rdma_frag_t* frag = (mca_pml_ob1_rdma_frag_t*)des->des_cbdata;
|
||||
mca_pml_ob1_endpoint_t* endpoint = frag->rdma_ep;
|
||||
MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
|
||||
bmi->bmi_free(bmi,des);
|
||||
MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, des);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -489,7 +489,8 @@ static void mca_pml_ob1_put_completion(
|
||||
* at the user buffer
|
||||
*/
|
||||
frag->rdma_state = MCA_PML_OB1_RDMA_FIN;
|
||||
fin = bmi->bmi_alloc(bmi,sizeof(mca_pml_ob1_fin_hdr_t));
|
||||
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(frag->rdma_ep, fin, sizeof(mca_pml_ob1_fin_hdr_t));
|
||||
if(NULL == fin) {
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.rdma_pending, (ompi_list_item_t*)frag);
|
||||
@ -573,6 +574,8 @@ void mca_pml_ob1_send_request_put(
|
||||
frag->rdma_segs[i] = hdr->hdr_segs[i];
|
||||
}
|
||||
frag->rdma_hdr.hdr_rdma = *hdr;
|
||||
frag->rdma_req = sendreq;
|
||||
frag->rdma_ep = ep;
|
||||
frag->rdma_state = MCA_PML_OB1_RDMA_PREPARE;
|
||||
|
||||
/* setup descriptor */
|
||||
@ -590,7 +593,6 @@ void mca_pml_ob1_send_request_put(
|
||||
}
|
||||
frag->rdma_state = MCA_PML_OB1_RDMA_PUT;
|
||||
frag->rdma_length = size;
|
||||
frag->rdma_req = sendreq;
|
||||
|
||||
des->des_dst = frag->rdma_segs;
|
||||
des->des_dst_cnt = hdr->hdr_seg_cnt;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user