1
1

consolidate credit management and CQ polling code.

This commit was SVN r11622.
Этот коммит содержится в:
Gleb Natapov 2006-09-12 09:17:59 +00:00
родитель 8667648a1b
Коммит 7999c08107
6 изменённых файлов: 227 добавлений и 390 удалений

Просмотреть файл

@ -177,12 +177,13 @@ int mca_btl_openib_size_queues( struct mca_btl_openib_module_t* openib_btl, size
openib_btl->hca->ib_dev_attr.max_cq : min_cq_size;
#if OMPI_MCA_BTL_OPENIB_HAVE_RESIZE_CQ
if(!first_time) {
rc = ibv_resize_cq(openib_btl->ib_cq_lp, mca_btl_openib_component.ib_cq_size);
rc = ibv_resize_cq(openib_btl->ib_cq[BTL_OPENIB_LP_QP], mca_btl_openib_component.ib_cq_size);
if(rc) {
BTL_ERROR(("cannot resize low priority completion queue, error: %d", rc));
return OMPI_ERROR;
}
rc = ibv_resize_cq(openib_btl->ib_cq_hp, mca_btl_openib_component.ib_cq_size);
rc = ibv_resize_cq(openib_btl->ib_cq[BTL_OPENIB_HP_QP],
mca_btl_openib_component.ib_cq_size);
if(rc) {
BTL_ERROR(("cannot resize high priority completion queue, error: %d", rc));
return OMPI_ERROR;
@ -797,16 +798,16 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
/* Create the low and high priority queue pairs */
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
openib_btl->ib_cq_lp =
openib_btl->ib_cq[BTL_OPENIB_LP_QP] =
ibv_create_cq(openib_btl->hca->ib_dev_context,
mca_btl_openib_component.ib_cq_size, NULL);
#else
openib_btl->ib_cq_lp =
openib_btl->ib_cq[BTL_OPENIB_LP_QP] =
ibv_create_cq(openib_btl->hca->ib_dev_context,
mca_btl_openib_component.ib_cq_size, NULL, NULL, 0);
#endif
if(NULL == openib_btl->ib_cq_lp) {
if(NULL == openib_btl->ib_cq[BTL_OPENIB_LP_QP]) {
BTL_ERROR(("error creating low priority cq for %s errno says %s\n",
ibv_get_device_name(openib_btl->hca->ib_dev),
strerror(errno)));
@ -814,16 +815,16 @@ int mca_btl_openib_create_cq_srq(mca_btl_openib_module_t *openib_btl)
}
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
openib_btl->ib_cq_hp =
openib_btl->ib_cq[BTL_OPENIB_HP_QP] =
ibv_create_cq(openib_btl->hca->ib_dev_context,
mca_btl_openib_component.ib_cq_size, NULL);
#else
openib_btl->ib_cq_hp =
openib_btl->ib_cq[BTL_OPENIB_HP_QP] =
ibv_create_cq(openib_btl->hca->ib_dev_context,
mca_btl_openib_component.ib_cq_size, NULL, NULL, 0);
#endif
if(NULL == openib_btl->ib_cq_hp) {
if(NULL == openib_btl->ib_cq[BTL_OPENIB_HP_QP]) {
BTL_ERROR(("error creating high priority cq for %s errno says %s\n",
ibv_get_device_name(openib_btl->hca->ib_dev),
strerror(errno)));

Просмотреть файл

@ -166,8 +166,7 @@ struct mca_btl_openib_module_t {
mca_btl_openib_port_info_t port_info; /* contains only the subnet right now */
mca_btl_openib_hca_t *hca;
uint8_t port_num; /**< ID of the PORT */
struct ibv_cq *ib_cq_hp;
struct ibv_cq *ib_cq_lp;
struct ibv_cq *ib_cq[2];
struct ibv_port_attr ib_port_attr;
uint16_t lid; /**< lid that is actually used (for LMC) */
uint8_t src_path_bits; /**< offset from base lid (for LMC) */
@ -433,9 +432,6 @@ static inline int mca_btl_openib_post_srr(mca_btl_openib_module_t* openib_btl,
for(i = 0; i < num_post; i++) {
OMPI_FREE_LIST_WAIT(free_list, item, rc);
frag = (mca_btl_openib_frag_t*)item;
frag->sg_entry.length = frag->size +
((unsigned char*)frag->segment.seg_addr.pval -
(unsigned char*)frag->hdr);
if(ibv_post_srq_recv(openib_btl->srq[prio], &frag->wr_desc.rd_desc,
&bad_wr)) {
BTL_ERROR(("error posting receive descriptors to shared "

Просмотреть файл

@ -70,10 +70,10 @@ static mca_btl_base_module_t **btl_openib_component_init(
bool enable_mpi_threads);
static void merge_values(ompi_btl_openib_ini_values_t *target,
ompi_btl_openib_ini_values_t *src);
static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl,
static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl,
mca_btl_openib_endpoint_t *endpoint,
mca_btl_openib_frag_t *frag,
size_t byte_len);
size_t byte_len, const int prio);
static char* btl_openib_component_status_to_string(enum ibv_wc_status status);
static int btl_openib_component_progress(void);
static void btl_openib_frag_progress_pending(
@ -691,15 +691,21 @@ static void merge_values(ompi_btl_openib_ini_values_t *target,
}
static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl,
static int btl_openib_handle_incoming(mca_btl_openib_module_t *openib_btl,
mca_btl_openib_endpoint_t *endpoint,
mca_btl_openib_frag_t *frag,
size_t byte_len)
size_t byte_len, const int prio)
{
/* advance the segment address past the header and subtract from the length..*/
frag->segment.seg_len = byte_len -
((unsigned char*)frag->segment.seg_addr.pval -
(unsigned char*) frag->hdr);
ompi_free_list_t *free_list;
if(BTL_OPENIB_HP_QP == prio)
free_list = &openib_btl->recv_free_eager;
else
free_list = &openib_btl->recv_free_max;
/* advance the segment address past the header and subtract from the
* length..*/
frag->segment.seg_len = byte_len - sizeof(mca_btl_openib_header_t);
/* call registered callback */
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super,
@ -712,12 +718,11 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl,
BTL_OPENIB_CREDITS(frag->hdr->credits));
else
if(!mca_btl_openib_component.use_srq && frag->hdr->credits > 0)
OPAL_THREAD_ADD32(&endpoint->sd_tokens[BTL_OPENIB_HP_QP],
OPAL_THREAD_ADD32(&endpoint->sd_tokens[prio],
frag->hdr->credits);
if (!MCA_BTL_OPENIB_RDMA_FRAG(frag)) {
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_eager),
(ompi_free_list_item_t*) frag);
OMPI_FREE_LIST_RETURN(free_list, (ompi_free_list_item_t*) frag);
} else {
mca_btl_openib_frag_t *tf;
OPAL_THREAD_LOCK(&endpoint->eager_rdma_local.lock);
@ -734,8 +739,9 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl,
OPAL_THREAD_UNLOCK(&endpoint->eager_rdma_local.lock);
}
if (mca_btl_openib_component.use_eager_rdma &&
!endpoint->eager_rdma_local.base.pval &&
if (!endpoint->eager_rdma_local.base.pval &&
mca_btl_openib_component.use_eager_rdma &&
BTL_OPENIB_HP_QP == prio &&
openib_btl->eager_rdma_buffers_count <
mca_btl_openib_component.max_eager_rdma &&
OPAL_THREAD_ADD32(&endpoint->eager_recv_count, 1) ==
@ -743,32 +749,25 @@ static int btl_openib_handle_incoming_hp(mca_btl_openib_module_t *openib_btl,
mca_btl_openib_endpoint_connect_eager_rdma(endpoint);
}
/* check to see if we need to return credits */
if((endpoint->rd_credits[BTL_OPENIB_HP_QP] >=
mca_btl_openib_component.rd_win ||
endpoint->eager_rdma_local.credits >=
mca_btl_openib_component.rd_win) &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],1) == 1) {
mca_btl_openib_endpoint_send_credits_hp(endpoint);
}
/* repost receive descriptors if receive not by RDMA */
if(!MCA_BTL_OPENIB_RDMA_FRAG(frag)) {
if(mca_btl_openib_component.use_srq) {
OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[BTL_OPENIB_HP_QP], -1);
mca_btl_openib_post_srr(openib_btl, 0, BTL_OPENIB_HP_QP);
OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[prio], -1);
mca_btl_openib_post_srr(openib_btl, 0, prio);
} else {
OPAL_THREAD_ADD32((int32_t*)&endpoint->rd_posted[BTL_OPENIB_HP_QP],
-1);
btl_openib_endpoint_post_rr(endpoint, 0, BTL_OPENIB_HP_QP);
OPAL_THREAD_ADD32((int32_t*)&endpoint->rd_posted[prio], -1);
btl_openib_endpoint_post_rr(endpoint, 0, prio);
}
}
/* nothing to progress for SRQ case */
if(!mca_btl_openib_component.use_srq) {
btl_openib_frag_progress_pending(openib_btl, endpoint,
BTL_OPENIB_HP_QP);
btl_openib_frag_progress_pending(openib_btl, endpoint, prio);
}
/* check to see if we need to return credits */
if(btl_openib_check_send_credits(endpoint, prio)) {
mca_btl_openib_endpoint_send_credits(endpoint, prio);
}
return OMPI_SUCCESS;
@ -927,15 +926,16 @@ static void btl_openib_frag_progress_pending(
static int btl_openib_component_progress(void)
{
static char *qp_name[] = {"HP", "LP"};
int i, j, c, qp = 0;
int i, j, c, qp;
int count = 0,ne = 0, ret;
int32_t credits;
mca_btl_openib_frag_t* frag;
mca_btl_openib_endpoint_t* endpoint;
struct ibv_wc wc;
mca_btl_openib_module_t* openib_btl;
/* Poll for RDMA completions - if any succeed, we don't process the slower queues */
/* Poll for RDMA completions - if any succeed, we don't process the slower
* queues.
*/
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) {
mca_btl_openib_module_t* openib_btl = &mca_btl_openib_component.openib_btls[i];
@ -971,9 +971,10 @@ static int btl_openib_component_progress(void)
frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) +
sizeof(mca_btl_openib_header_t);
ret = btl_openib_handle_incoming_hp(openib_btl,
ret = btl_openib_handle_incoming(openib_btl,
frag->endpoint, frag,
size - sizeof(mca_btl_openib_footer_t));
size - sizeof(mca_btl_openib_footer_t),
BTL_OPENIB_HP_QP);
if (ret != MPI_SUCCESS) {
openib_btl->error_cb(&openib_btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
@ -989,45 +990,57 @@ static int btl_openib_component_progress(void)
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) {
openib_btl = &mca_btl_openib_component.openib_btls[i];
/* we have two completion queues, one for "high" priority and one for "low".
* we will check the high priority and process them until there are none left.
* note that low priority messages are only processed one per progress call.
*/
/* We have two completion queues, one for "high" priority and one for
* "low". Check high priority before low priority */
for(qp = 0; qp < 2; qp++) {
ne = ibv_poll_cq(openib_btl->ib_cq[qp], 1, &wc);
ne=ibv_poll_cq(openib_btl->ib_cq_hp, 1, &wc );
if(0 == ne)
continue;
if(ne != 0) {
if(ne < 0 || wc.status != IBV_WC_SUCCESS)
goto error_hp;
goto error;
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
endpoint = frag->endpoint;
/* Handle work completions */
switch(wc.opcode) {
case IBV_WC_RDMA_READ:
assert(BTL_OPENIB_LP_QP == qp);
OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1);
/* fall through */
case IBV_WC_RDMA_WRITE:
if(BTL_OPENIB_LP_QP == qp) {
/* process a completed write */
frag->base.des_cbfunc(&openib_btl->super, endpoint,
&frag->base, OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[qp], 1);
/* check for pending frags */
btl_openib_frag_progress_pending(openib_btl, endpoint, qp);
count++;
break;
}
/* fall through for high prio QP */
case IBV_WC_SEND:
/* Process a completed send */
frag->base.des_cbfunc(&openib_btl->super, endpoint, &frag->base,
OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_HP_QP], 1);
OPAL_THREAD_ADD32(&endpoint->sd_wqe[qp], 1);
if(mca_btl_openib_component.use_srq)
OPAL_THREAD_ADD32(&openib_btl->sd_tokens[BTL_OPENIB_HP_QP], 1);
OPAL_THREAD_ADD32(&openib_btl->sd_tokens[qp], 1);
/* check to see if we need to progress any pending descriptors */
btl_openib_frag_progress_pending(openib_btl, endpoint,
BTL_OPENIB_HP_QP);
if(!mca_btl_openib_component.use_srq) {
/* check to see if we need to return credits */
if((endpoint->rd_credits[BTL_OPENIB_HP_QP] >=
mca_btl_openib_component.rd_win ||
endpoint->eager_rdma_local.credits >=
mca_btl_openib_component.rd_win) &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP], 1) == 1) {
mca_btl_openib_endpoint_send_credits_hp(endpoint);
}
btl_openib_frag_progress_pending(openib_btl, endpoint, qp);
/* check to see if we need to return credits */
if(btl_openib_check_send_credits(endpoint, qp)) {
mca_btl_openib_endpoint_send_credits(endpoint, qp);
}
count++;
@ -1035,131 +1048,33 @@ static int btl_openib_component_progress(void)
case IBV_WC_RECV:
if(wc.wc_flags & IBV_WC_WITH_IMM) {
endpoint = (mca_btl_openib_endpoint_t*)orte_pointer_array_get_item(openib_btl->endpoints, wc.imm_data);
endpoint = (mca_btl_openib_endpoint_t*)
orte_pointer_array_get_item(openib_btl->endpoints,
wc.imm_data);
frag->endpoint = endpoint;
}
/* Process a RECV */
ret = btl_openib_handle_incoming_hp(openib_btl, endpoint, frag,
wc.byte_len);
ret = btl_openib_handle_incoming(openib_btl, endpoint, frag,
wc.byte_len, qp);
if (ret != OMPI_SUCCESS) {
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
openib_btl->error_cb(&openib_btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
count++;
break;
default:
BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode));
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
break;
}
}
ne=ibv_poll_cq(openib_btl->ib_cq_lp, 1, &wc);
if(ne != 0) {
if(ne < 0 || wc.status != IBV_WC_SUCCESS)
goto error_lp;
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
endpoint = frag->endpoint;
/* Handle n/w completions */
switch(wc.opcode) {
case IBV_WC_SEND:
/* Process a completed send - receiver must return tokens */
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP], 1);
if(mca_btl_openib_component.use_srq)
OPAL_THREAD_ADD32(&openib_btl->sd_tokens[BTL_OPENIB_LP_QP], 1);
/* check to see if we need to progress any pending desciptors */
btl_openib_frag_progress_pending(openib_btl, endpoint,
BTL_OPENIB_LP_QP);
if(!mca_btl_openib_component.use_srq) {
/* check to see if we need to return credits */
if( endpoint->rd_credits[BTL_OPENIB_LP_QP] >=
mca_btl_openib_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], 1) == 1) {
mca_btl_openib_endpoint_send_credits_lp(endpoint);
}
}
count++;
break;
case IBV_WC_RDMA_READ:
OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1);
/* fall through */
case IBV_WC_RDMA_WRITE:
/* process a completed write */
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
/* return send wqe */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP], 1);
/* check for pending frags */
btl_openib_frag_progress_pending(openib_btl, endpoint,
BTL_OPENIB_LP_QP);
count++;
break;
case IBV_WC_RECV:
/* Process a RECV */
credits = frag->hdr->credits;
/* advance the segment address past the header and subtract from the length..*/
frag->segment.seg_len = wc.byte_len-
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* call registered callback */
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super,
frag->hdr->tag,
&frag->base,
openib_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_max), (ompi_free_list_item_t*) frag);
if(mca_btl_openib_component.use_srq) {
/* repost receive descriptors */
OPAL_THREAD_ADD32((int32_t*)&openib_btl->srd_posted[BTL_OPENIB_LP_QP], -1);
mca_btl_openib_post_srr(openib_btl, 0, BTL_OPENIB_LP_QP);
} else {
/* repost receive descriptors */
OPAL_THREAD_ADD32((int32_t*)
&endpoint->rd_posted[BTL_OPENIB_LP_QP], -1);
btl_openib_endpoint_post_rr(endpoint, 0, BTL_OPENIB_LP_QP);
OPAL_THREAD_ADD32(&endpoint->sd_tokens[BTL_OPENIB_LP_QP],
credits);
/* check to see if we need to progress any pending desciptors */
btl_openib_frag_progress_pending(openib_btl, endpoint,
BTL_OPENIB_LP_QP);
/* check to see if we need to return credits */
if(endpoint->rd_credits[BTL_OPENIB_LP_QP] >=
mca_btl_openib_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], 1) == 1) {
mca_btl_openib_endpoint_send_credits_lp(endpoint);
}
}
count++;
break;
default:
BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode));
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
BTL_ERROR(("Unhandled work completion opcode is %d",
wc.opcode));
openib_btl->error_cb(&openib_btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
break;
}
}
}
return count;
error_lp:
qp = 1;
error_hp:
error:
if(ne < 0){
BTL_ERROR(("error polling %s CQ with %d errno says %s\n",
qp_name[qp], ne, strerror(errno)));

Просмотреть файл

@ -114,43 +114,28 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope
mca_btl_openib_endpoint_t * endpoint,
mca_btl_openib_frag_t * frag)
{
int do_rdma = 0;
struct ibv_qp* ib_qp;
int do_rdma = 0, prio;
struct ibv_send_wr* bad_wr;
frag->sg_entry.addr = (unsigned long) frag->hdr;
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) {
assert(frag->size <= openib_btl->super.btl_eager_limit);
if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag,
BTL_OPENIB_HP_QP, &do_rdma) == OMPI_ERR_OUT_OF_RESOURCE)
return MPI_SUCCESS;
prio = (frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ?
BTL_OPENIB_HP_QP : BTL_OPENIB_LP_QP;
if(endpoint->eager_rdma_local.credits > 0) {
frag->hdr->credits = endpoint->eager_rdma_local.credits;
OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
-frag->hdr->credits);
frag->hdr->credits |= BTL_OPENIB_RDMA_CREDITS_FLAG;
} else if(endpoint->rd_credits[BTL_OPENIB_HP_QP] > 0) {
frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_HP_QP];
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP],
-frag->hdr->credits);
} else {
frag->hdr->credits = 0;
}
ib_qp = endpoint->lcl_qp[BTL_OPENIB_HP_QP];
if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag,
prio, &do_rdma) == OMPI_ERR_OUT_OF_RESOURCE)
return MPI_SUCCESS;
if(BTL_OPENIB_HP_QP == prio && endpoint->eager_rdma_local.credits > 0) {
frag->hdr->credits = endpoint->eager_rdma_local.credits;
OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
-frag->hdr->credits);
frag->hdr->credits |= BTL_OPENIB_RDMA_CREDITS_FLAG;
} else if(endpoint->rd_credits[prio] > 0) {
frag->hdr->credits = endpoint->rd_credits[prio];
OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], -frag->hdr->credits);
} else {
if(btl_openib_acquire_send_resources(openib_btl, endpoint, frag,
BTL_OPENIB_LP_QP, NULL) == OMPI_ERR_OUT_OF_RESOURCE)
return MPI_SUCCESS;
if(endpoint->rd_credits[BTL_OPENIB_LP_QP] > 0) {
frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_LP_QP];
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP],
-frag->hdr->credits);
} else {
frag->hdr->credits = 0;
}
ib_qp = endpoint->lcl_qp[BTL_OPENIB_LP_QP];
frag->hdr->credits = 0;
}
frag->sg_entry.length =
@ -184,7 +169,7 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope
mca_btl_openib_component.eager_limit +
sizeof(mca_btl_openib_footer_t);
frag->wr_desc.sr_desc.wr.rdma.remote_addr -= frag->sg_entry.length;
MCA_BTL_OPENIB_RDMA_NEXT_INDEX (endpoint->eager_rdma_remote.head);
MCA_BTL_OPENIB_RDMA_NEXT_INDEX(endpoint->eager_rdma_remote.head);
} else {
if(mca_btl_openib_component.use_srq) {
frag->wr_desc.sr_desc.opcode = IBV_WR_SEND_WITH_IMM;
@ -194,9 +179,24 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope
}
}
if(ibv_post_send(ib_qp,
&frag->wr_desc.sr_desc,
&bad_wr)) {
if(ibv_post_send(endpoint->lcl_qp[prio], &frag->wr_desc.sr_desc,
&bad_wr)) {
if(BTL_OPENIB_IS_RDMA_CREDITS(frag->hdr->credits)) {
OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
BTL_OPENIB_CREDITS(frag->hdr->credits));
} else {
OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], frag->hdr->credits);
}
OPAL_THREAD_ADD32(&endpoint->sd_wqe[prio], 1);
if(do_rdma) {
OPAL_THREAD_ADD32(&endpoint->eager_rdma_remote.tokens, 1);
} else {
if(mca_btl_openib_component.use_srq) {
OPAL_THREAD_ADD32(&openib_btl->sd_tokens[prio], 1);
} else {
OPAL_THREAD_ADD32(&endpoint->sd_tokens[prio], 1);
}
}
BTL_ERROR(("error posting send request errno says %s\n",
strerror(errno)));
return OMPI_ERROR;
@ -417,11 +417,11 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi
/* Create the High Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->hca->ib_pd,
openib_btl->ib_cq_hp,
openib_btl->srq[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) {
openib_btl->hca->ib_pd,
openib_btl->ib_cq[BTL_OPENIB_HP_QP],
openib_btl->srq[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
@ -430,11 +430,11 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi
/* Create the Low Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->hca->ib_pd,
openib_btl->ib_cq_lp,
openib_btl->srq[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) {
openib_btl->hca->ib_pd,
openib_btl->ib_cq[BTL_OPENIB_LP_QP],
openib_btl->srq[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
@ -467,12 +467,11 @@ static int mca_btl_openib_endpoint_reply_start_connect(mca_btl_openib_endpoint_t
/* Create the High Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->hca->ib_pd,
openib_btl->ib_cq_hp,
openib_btl->srq[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) {
openib_btl->hca->ib_pd,
openib_btl->ib_cq[BTL_OPENIB_HP_QP],
openib_btl->srq[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp[BTL_OPENIB_HP_QP]))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
@ -481,12 +480,11 @@ static int mca_btl_openib_endpoint_reply_start_connect(mca_btl_openib_endpoint_t
/* Create the Low Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->hca->ib_pd,
openib_btl->ib_cq_lp,
openib_btl->srq[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) {
openib_btl->hca->ib_pd,
openib_btl->ib_cq[BTL_OPENIB_LP_QP],
openib_btl->srq[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp[BTL_OPENIB_LP_QP]))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
@ -870,34 +868,30 @@ int mca_btl_openib_endpoint_connect(
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) endpoint->endpoint_btl;
/* Connection establishment RC */
rc = mca_btl_openib_endpoint_qp_init_query(
openib_btl,
endpoint->lcl_qp[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
endpoint->lcl_psn_hp,
endpoint->rem_info.rem_qp_num_hp,
endpoint->rem_info.rem_psn_hp,
endpoint->rem_info.rem_lid,
endpoint->rem_info.rem_mtu,
openib_btl->port_num
);
rc = mca_btl_openib_endpoint_qp_init_query(openib_btl,
endpoint->lcl_qp[BTL_OPENIB_HP_QP],
endpoint->lcl_qp_attr_hp,
endpoint->lcl_psn_hp,
endpoint->rem_info.rem_qp_num_hp,
endpoint->rem_info.rem_psn_hp,
endpoint->rem_info.rem_lid,
endpoint->rem_info.rem_mtu,
openib_btl->port_num);
if(rc != OMPI_SUCCESS) {
return rc;
}
rc = mca_btl_openib_endpoint_qp_init_query(
openib_btl,
endpoint->lcl_qp[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
endpoint->lcl_psn_lp,
endpoint->rem_info.rem_qp_num_lp,
endpoint->rem_info.rem_psn_lp,
endpoint->rem_info.rem_lid,
endpoint->rem_info.rem_mtu,
openib_btl->port_num
);
rc = mca_btl_openib_endpoint_qp_init_query(openib_btl,
endpoint->lcl_qp[BTL_OPENIB_LP_QP],
endpoint->lcl_qp_attr_lp,
endpoint->lcl_psn_lp,
endpoint->rem_info.rem_qp_num_lp,
endpoint->rem_info.rem_psn_lp,
endpoint->rem_info.rem_lid,
endpoint->rem_info.rem_mtu,
openib_btl->port_num);
@ -905,8 +899,10 @@ int mca_btl_openib_endpoint_connect(
return rc;
}
MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, endpoint->hp_credit_frag, rc);
MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl, endpoint->lp_credit_frag, rc);
MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl,
endpoint->credit_frag[BTL_OPENIB_HP_QP], rc);
MCA_BTL_IB_FRAG_ALLOC_CREDIT_WAIT(openib_btl,
endpoint->credit_frag[BTL_OPENIB_LP_QP], rc);
if(mca_btl_openib_component.use_srq) {
mca_btl_openib_post_srr(openib_btl, 1, BTL_OPENIB_HP_QP);
@ -1055,24 +1051,27 @@ int mca_btl_openib_endpoint_qp_init_query(
* Return control fragment.
*/
static void mca_btl_openib_endpoint_credits_lp(
static void mca_btl_openib_endpoint_credits(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
int32_t credits;
int32_t credits, prio;
if((void*)descriptor == (void*)endpoint->credit_frag[BTL_OPENIB_LP_QP])
prio = BTL_OPENIB_LP_QP;
else
prio = BTL_OPENIB_HP_QP;
/* we don't acquire a wqe or token for credit message - so decrement */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_LP_QP],-1);
OPAL_THREAD_ADD32(&endpoint->sd_wqe[prio],-1);
/* check to see if there are additional credits to return */
if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],-1)) > 0) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],-credits);
if (endpoint->rd_credits[BTL_OPENIB_LP_QP] >=
mca_btl_openib_component.rd_win &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP],1) == 1) {
mca_btl_openib_endpoint_send_credits_lp(endpoint);
if((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[prio],-1)) > 0) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], -credits);
if(btl_openib_check_send_credits(endpoint, prio)) {
mca_btl_openib_endpoint_send_credits(endpoint, prio);
}
}
}
@ -1081,120 +1080,38 @@ static void mca_btl_openib_endpoint_credits_lp(
* Return credits to peer
*/
void mca_btl_openib_endpoint_send_credits_lp(
mca_btl_openib_endpoint_t* endpoint)
void mca_btl_openib_endpoint_send_credits(mca_btl_openib_endpoint_t* endpoint,
const int prio)
{
mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl;
mca_btl_openib_frag_t* frag;
struct ibv_send_wr* bad_wr;
mca_btl_openib_rdma_credits_header_t *credits_hdr;
frag = endpoint->lp_credit_frag;
credits_hdr = (mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval;
frag = endpoint->credit_frag[prio];
credits_hdr =
(mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval;
frag->base.des_cbfunc = mca_btl_openib_endpoint_credits_lp;
frag->base.des_cbfunc = mca_btl_openib_endpoint_credits;
frag->base.des_cbdata = NULL;
frag->endpoint = endpoint;
frag->hdr->tag = MCA_BTL_TAG_BTL;
if(endpoint->rd_credits[BTL_OPENIB_LP_QP] > 0) {
frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_LP_QP];
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP],
-frag->hdr->credits);
/* send credits for high/low prios */
if(endpoint->rd_credits[prio] > 0) {
frag->hdr->credits = endpoint->rd_credits[prio];
OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], -frag->hdr->credits);
} else {
frag->hdr->credits = 0;
}
credits_hdr->control.type = MCA_BTL_OPENIB_CONTROL_CREDITS;
credits_hdr->rdma_credits = 0;
if(mca_btl_openib_component.use_srq) {
frag->wr_desc.sr_desc.opcode = IBV_WR_SEND_WITH_IMM;
frag->wr_desc.sr_desc.imm_data = endpoint->rem_info.rem_index;
} else {
frag->wr_desc.sr_desc.opcode = IBV_WR_SEND;
}
frag->sg_entry.length = sizeof(mca_btl_openib_header_t) +
sizeof(mca_btl_openib_rdma_credits_header_t);
frag->sg_entry.addr = (unsigned long) frag->hdr;
if(frag->sg_entry.length <= openib_btl->ib_inline_max) {
frag->wr_desc.sr_desc.send_flags = IBV_SEND_INLINE | IBV_SEND_SIGNALED;
} else {
frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED;
}
if(ibv_post_send(endpoint->lcl_qp[BTL_OPENIB_LP_QP],
&frag->wr_desc.sr_desc,
&bad_wr)) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_LP_QP], -1);
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_LP_QP],
frag->hdr->credits);
MCA_BTL_IB_FRAG_RETURN(openib_btl, frag);
BTL_ERROR(("error posting send request errno %d says %s", strerror(errno)));
return;
}
}
/**
* Return control fragment.
*/
static void mca_btl_openib_endpoint_credits_hp(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
int32_t credits;
/* we don't acquire a wqe or token for credit message - so decrement */
OPAL_THREAD_ADD32(&endpoint->sd_wqe[BTL_OPENIB_HP_QP],-1);
/* check to see if there are addditional credits to return */
if ((credits = OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],-1)) > 0) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],-credits);
if ((endpoint->rd_credits[BTL_OPENIB_HP_QP] >= mca_btl_openib_component.rd_win ||
endpoint->eager_rdma_local.credits >= mca_btl_openib_component.rd_win) &&
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP],1) == 1) {
mca_btl_openib_endpoint_send_credits_hp(endpoint);
}
}
}
/**
* Return credits to peer
*/
void mca_btl_openib_endpoint_send_credits_hp(
mca_btl_openib_endpoint_t* endpoint)
{
mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl;
mca_btl_openib_frag_t* frag;
struct ibv_send_wr* bad_wr;
mca_btl_openib_rdma_credits_header_t *credits_hdr;
frag = endpoint->hp_credit_frag;
credits_hdr = (mca_btl_openib_rdma_credits_header_t*)frag->segment.seg_addr.pval;
frag->base.des_cbfunc = mca_btl_openib_endpoint_credits_hp;
frag->base.des_cbdata = NULL;
frag->endpoint = endpoint;
frag->hdr->tag = MCA_BTL_TAG_BTL;
if(endpoint->rd_credits[BTL_OPENIB_HP_QP] > 0) {
frag->hdr->credits = endpoint->rd_credits[BTL_OPENIB_HP_QP];
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP],
-frag->hdr->credits);
} else
frag->hdr->credits = 0;
if(endpoint->eager_rdma_local.credits > 0) {
/* send eager RDMA credits only for high prio */
if(BTL_OPENIB_HP_QP == prio && endpoint->eager_rdma_local.credits > 0) {
credits_hdr->rdma_credits = endpoint->eager_rdma_local.credits;
OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
-credits_hdr->rdma_credits);
} else
} else {
credits_hdr->rdma_credits = 0;
}
credits_hdr->control.type = MCA_BTL_OPENIB_CONTROL_CREDITS;
if(mca_btl_openib_component.use_srq) {
@ -1213,16 +1130,13 @@ void mca_btl_openib_endpoint_send_credits_hp(
frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED;
}
if(ibv_post_send(endpoint->lcl_qp[BTL_OPENIB_HP_QP],
&frag->wr_desc.sr_desc,
&bad_wr)) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[BTL_OPENIB_HP_QP], -1);
OPAL_THREAD_ADD32(&endpoint->rd_credits[BTL_OPENIB_HP_QP],
frag->hdr->credits);
MCA_BTL_IB_FRAG_RETURN(openib_btl, frag);
BTL_ERROR(("error posting send request errno %d says %s", errno,
if(ibv_post_send(endpoint->lcl_qp[prio], &frag->wr_desc.sr_desc, &bad_wr)) {
OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], -1);
OPAL_THREAD_ADD32(&endpoint->rd_credits[prio], frag->hdr->credits);
OPAL_THREAD_ADD32(&endpoint->eager_rdma_local.credits,
credits_hdr->rdma_credits);
BTL_ERROR(("error posting send request errno %d says %s",
strerror(errno)));
return;
}
}
@ -1260,8 +1174,7 @@ static int mca_btl_openib_endpoint_send_eager_rdma(
rdma_hdr->rkey = endpoint->eager_rdma_local.reg->mr->rkey;
rdma_hdr->rdma_start.pval = endpoint->eager_rdma_local.base.pval;
frag->segment.seg_len = sizeof(mca_btl_openib_eager_rdma_header_t);
if (mca_btl_openib_endpoint_send(endpoint, frag) !=
OMPI_SUCCESS) {
if (mca_btl_openib_endpoint_send(endpoint, frag) != OMPI_SUCCESS) {
MCA_BTL_IB_FRAG_RETURN(openib_btl, frag);
BTL_ERROR(("Error sending RDMA buffer", strerror(errno)));
return -1;

Просмотреть файл

@ -154,8 +154,7 @@ struct mca_btl_base_endpoint_t {
/**< info about local RDMA buffer */
int32_t eager_rdma_index; /**< index into RDMA buffers pointer array */
uint32_t index; /**< index of the endpoint in endpoints array */
struct mca_btl_openib_frag_t *hp_credit_frag; /**< frag for sending explicit high priority credits */
struct mca_btl_openib_frag_t *lp_credit_frag; /**< frag for sending explicit low priority credits */
struct mca_btl_openib_frag_t *credit_frag[2]; /**< frags for sending explicit credits, one per QP priority (high/low) */
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
@ -166,8 +165,7 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_endpoint_t);
int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_openib_frag_t* frag);
int mca_btl_openib_endpoint_connect(mca_btl_base_endpoint_t*);
void mca_btl_openib_post_recv(void);
void mca_btl_openib_endpoint_send_credits_hp(mca_btl_base_endpoint_t*);
void mca_btl_openib_endpoint_send_credits_lp(mca_btl_base_endpoint_t*);
void mca_btl_openib_endpoint_send_credits(mca_btl_base_endpoint_t*, const int);
void mca_btl_openib_endpoint_connect_eager_rdma(mca_btl_openib_endpoint_t*);
static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint,
@ -195,9 +193,6 @@ static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint,
OMPI_FREE_LIST_WAIT(free_list, item, rc);
frag = (mca_btl_openib_frag_t*)item;
frag->endpoint = endpoint;
frag->sg_entry.length = frag->size +
((unsigned char*)frag->segment.seg_addr.pval -
(unsigned char*)frag->hdr);
if(ibv_post_recv(endpoint->lcl_qp[prio], &frag->wr_desc.rd_desc,
&bad_wr)) {
BTL_ERROR(("error posting receive errno says %s\n",
@ -212,6 +207,23 @@ static inline int btl_openib_endpoint_post_rr(mca_btl_base_endpoint_t *endpoint,
return OMPI_SUCCESS;
}
/**
 * Check whether the credits accumulated on an endpoint have reached the
 * rd_win threshold for the given priority QP.
 *
 * Side effect: when a threshold is reached, sd_credits[prio] is bumped by
 * one through OPAL_THREAD_ADD32.
 *
 * @param endpoint  endpoint whose credit counters are examined
 * @param prio      BTL_OPENIB_HP_QP or BTL_OPENIB_LP_QP
 * @return non-zero only when a threshold was reached and the increment of
 *         sd_credits[prio] produced the value 1; 0 otherwise
 */
static inline int btl_openib_check_send_credits(
        mca_btl_openib_endpoint_t *endpoint, const int prio)
{
    int threshold_reached;

    if(!mca_btl_openib_component.use_srq &&
            endpoint->rd_credits[prio] >= mca_btl_openib_component.rd_win) {
        /* rd_credits are consulted only when SRQ is not in use */
        threshold_reached = 1;
    } else if(BTL_OPENIB_LP_QP == prio) {
        /* low priority QP has no further credit source to check */
        threshold_reached = 0;
    } else {
        /* high priority QP: also look at the eager RDMA credits */
        threshold_reached = (endpoint->eager_rdma_local.credits >=
                mca_btl_openib_component.rd_win);
    }

    if(!threshold_reached)
        return 0;

    return OPAL_THREAD_ADD32(&endpoint->sd_credits[prio], 1) == 1;
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -38,7 +38,7 @@ static void mca_btl_openib_frag_common_constructor( mca_btl_openib_frag_t* frag)
}
frag->segment.seg_len = frag->size;
frag->sg_entry.addr = (unsigned long) frag->hdr;
frag->sg_entry.length = frag->size;
frag->sg_entry.length = frag->size + sizeof(mca_btl_openib_header_t);
frag->base.des_flags = 0;
}