1
1

port of flow control from mvapi

This commit was SVN r8102.
Этот коммит содержится в:
Tim Woodall 2005-11-10 20:15:02 +00:00
родитель 7f20198d49
Коммит 4a06e8463c
7 изменённых файлов: 570 добавлений и 436 удалений

Просмотреть файл

@ -124,10 +124,10 @@ int mca_btl_openib_add_procs(
if( 0 == openib_btl->num_peers ) {
openib_btl->num_peers += nprocs;
if(mca_btl_openib_component.use_srq) {
openib_btl->rd_buf_max = mca_btl_openib_component.ib_rr_buf_max + log2(nprocs) * mca_btl_openib_component.rd_per_peer;
free(openib_btl->rr_desc_post);
openib_btl->rr_desc_post = (struct ibv_recv_wr*) malloc((openib_btl->rd_buf_max * sizeof(struct ibv_recv_wr)));
openib_btl->rd_buf_min = openib_btl->rd_buf_max / 2;
openib_btl->rd_num = mca_btl_openib_component.rd_num + log2(nprocs) * mca_btl_openib_component.srq_rd_per_peer;
free(openib_btl->rd_desc_post);
openib_btl->rd_desc_post = (struct ibv_recv_wr*) malloc((openib_btl->rd_num * sizeof(struct ibv_recv_wr)));
openib_btl->rd_low = openib_btl->rd_num * 0.75;
}
}
@ -562,12 +562,9 @@ int mca_btl_openib_send(
mca_btl_openib_frag_t* frag = (mca_btl_openib_frag_t*)descriptor;
frag->endpoint = endpoint;
frag->hdr->tag = tag;
frag->type = MCA_BTL_IB_FRAG_SEND;
frag->rc = mca_btl_openib_endpoint_send(endpoint, frag);
return frag->rc;
return mca_btl_openib_endpoint_send(endpoint, frag);
}
/*
@ -578,23 +575,30 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor)
{
int rc;
struct ibv_send_wr* bad_wr;
mca_btl_openib_frag_t* frag = (mca_btl_openib_frag_t*) descriptor;
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) btl;
frag->endpoint = endpoint;
frag->wr_desc.sr_desc.opcode = IBV_WR_RDMA_WRITE;
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) {
BTL_VERBOSE(("Queing because no rdma write tokens \n"));
BTL_OPENIB_INSERT_PENDING(frag, endpoint->pending_frags_lp,
endpoint->wr_sq_tokens_lp, endpoint->endpoint_lock);
return OMPI_SUCCESS;
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,-1) < 0) {
opal_list_append(&openib_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,1);
return OMPI_SUCCESS;
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&openib_btl->ib_lock);
opal_list_append(&openib_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock);
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else {
frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED;
@ -609,12 +613,13 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
, frag->sg_entry.addr
, frag->sg_entry.length));
if(ibv_post_send(endpoint->lcl_qp_low,
if(ibv_post_send(endpoint->lcl_qp_lp,
&frag->wr_desc.sr_desc,
&bad_wr)){
BTL_ERROR(("error posting send request errno says %s", strerror(errno)));
return OMPI_ERROR;
}
rc = OMPI_ERROR;
} else {
rc = OMPI_SUCCESS;
}
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
if(mca_btl_openib_component.use_srq) {
@ -628,7 +633,7 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
}
#endif
}
return OMPI_SUCCESS;
return rc;
}
@ -640,24 +645,40 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor)
{
int rc;
struct ibv_send_wr* bad_wr;
mca_btl_openib_frag_t* frag = (mca_btl_openib_frag_t*) descriptor;
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) btl;
frag->endpoint = endpoint;
frag->wr_desc.sr_desc.opcode = IBV_WR_RDMA_READ;
/* atomically test and acquire a token */
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) {
BTL_VERBOSE(("Queing because no rdma write tokens \n"));
BTL_OPENIB_INSERT_PENDING(frag, endpoint->pending_frags_lp,
endpoint->wr_sq_tokens_lp, endpoint->endpoint_lock);
return OMPI_SUCCESS;
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,-1) < 0) {
opal_list_append(&openib_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,1);
return OMPI_SUCCESS;
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_LOCK(&openib_btl->ib_lock);
opal_list_append(&openib_btl->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock);
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,1);
rc = OMPI_SUCCESS;
} else if(OPAL_THREAD_ADD32(&endpoint->get_tokens,-1) < 0) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&endpoint->get_tokens,1);
rc = OMPI_SUCCESS;
} else {
frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED;
@ -672,13 +693,14 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
, frag->sg_entry.addr
, frag->sg_entry.length));
if(ibv_post_send(endpoint->lcl_qp_low,
if(ibv_post_send(endpoint->lcl_qp_lp,
&frag->wr_desc.sr_desc,
&bad_wr)){
BTL_ERROR(("error posting send request errno says %s", strerror(errno)));
return OMPI_ERROR;
}
rc = ORTE_ERROR;
} else {
rc = ORTE_SUCCESS;
}
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
if(mca_btl_openib_component.use_srq) {
@ -692,8 +714,7 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
}
#endif
}
return OMPI_SUCCESS;
return rc;
}
/*
@ -722,41 +743,41 @@ int mca_btl_openib_module_init(mca_btl_openib_module_t *openib_btl)
if(mca_btl_openib_component.use_srq) {
struct ibv_srq_init_attr attr;
attr.attr.max_wr = mca_btl_openib_component.ib_wq_size;
attr.attr.max_wr = openib_btl->rd_num;
attr.attr.max_sge = mca_btl_openib_component.ib_sg_list_size;
openib_btl->srr_posted_high = 0;
openib_btl->srr_posted_low = 0;
openib_btl->srd_posted_hp = 0;
openib_btl->srd_posted_lp = 0;
openib_btl->srq_high = ibv_create_srq(openib_btl->ib_pd, &attr);
if(NULL == openib_btl->srq_high) {
openib_btl->srq_hp = ibv_create_srq(openib_btl->ib_pd, &attr);
if(NULL == openib_btl->srq_hp) {
BTL_ERROR(("error in ibv_create_srq\n"));
return OMPI_ERROR;
}
openib_btl->srq_low = ibv_create_srq(openib_btl->ib_pd, &attr);
if(NULL == openib_btl->srq_high) {
openib_btl->srq_lp = ibv_create_srq(openib_btl->ib_pd, &attr);
if(NULL == openib_btl->srq_hp) {
BTL_ERROR(("error in ibv_create_srq\n"));
return OMPI_ERROR;
}
} else {
openib_btl->srq_high = NULL;
openib_btl->srq_low = NULL;
openib_btl->srq_hp = NULL;
openib_btl->srq_lp = NULL;
}
#endif
/* Create the low and high priority completion queues */
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
openib_btl->ib_cq_low =
openib_btl->ib_cq_lp =
ibv_create_cq(ctx, mca_btl_openib_component.ib_cq_size, NULL);
#else
openib_btl->ib_cq_low =
openib_btl->ib_cq_lp =
ibv_create_cq(ctx, mca_btl_openib_component.ib_cq_size,
NULL, NULL, 0);
#endif
if(NULL == openib_btl->ib_cq_low) {
if(NULL == openib_btl->ib_cq_lp) {
BTL_ERROR(("error creating low priority cq for %s errno says %s\n",
ibv_get_device_name(openib_btl->ib_dev),
strerror(errno)));
@ -764,15 +785,15 @@ int mca_btl_openib_module_init(mca_btl_openib_module_t *openib_btl)
}
#if OMPI_MCA_BTL_OPENIB_IBV_CREATE_CQ_ARGS == 3
openib_btl->ib_cq_high =
openib_btl->ib_cq_hp =
ibv_create_cq(ctx, mca_btl_openib_component.ib_cq_size, NULL);
#else
openib_btl->ib_cq_high =
openib_btl->ib_cq_hp =
ibv_create_cq(ctx, mca_btl_openib_component.ib_cq_size,
NULL, NULL, 0);
#endif
if(NULL == openib_btl->ib_cq_high) {
if(NULL == openib_btl->ib_cq_hp) {
BTL_ERROR(("error creating high priority cq for %s errno says %s\n",
ibv_get_device_name(openib_btl->ib_dev),
strerror(errno)));

Просмотреть файл

@ -83,20 +83,20 @@ struct mca_btl_openib_component_t {
char* ib_mpool_name;
/**< name of ib memory pool */
uint32_t ib_rr_buf_max;
/**< the maximum number of posted rr */
uint32_t ib_rr_buf_min;
/**< the minimum number of posted rr */
int32_t rd_num; /**< the number of receive descriptors to post to each queue pair */
int32_t rd_low; /**< low water mark to reach before posting additional receive descriptors */
int32_t rd_win; /**< ack credits when window size exceeded */
int32_t rd_rsv; /**< descriptors held in reserve for control messages */
int32_t srq_rd_per_peer; /* number of receive descriptors to post per log2(peers) in SRQ mode */
int32_t srq_sd_per_proc; /* maximum number of send descriptors posted */
size_t eager_limit;
size_t max_send_size;
uint32_t leave_pinned;
uint32_t reg_mru_len;
uint32_t use_srq;
uint32_t ib_cq_size; /**< Max outstanding CQE on the CQ */
uint32_t ib_wq_size; /**< Max outstanding WR on the WQ */
uint32_t ib_sg_list_size; /**< Max scatter/gather descriptor entries on the WQ*/
uint32_t ib_pkey_ix;
uint32_t ib_psn;
@ -110,10 +110,6 @@ struct mca_btl_openib_component_t {
uint32_t ib_service_level;
uint32_t ib_static_rate;
uint32_t ib_src_path_bits;
uint32_t rd_per_peer; /* number of receive descriptors to post per log2(peers)
in SRQ mode */
uint32_t max_wr_sq_tokens;
uint32_t max_total_wr_sq_tokens;
}; typedef struct mca_btl_openib_component_t mca_btl_openib_component_t;
@ -136,23 +132,20 @@ struct mca_btl_openib_module_t {
struct ibv_device *ib_dev; /* the ib device */
struct ibv_context *ib_dev_context;
struct ibv_pd *ib_pd;
struct ibv_cq *ib_cq_high;
struct ibv_cq *ib_cq_low;
struct ibv_cq *ib_cq_hp;
struct ibv_cq *ib_cq_lp;
struct ibv_port_attr* ib_port_attr;
struct ibv_recv_wr* rr_desc_post;
struct ibv_recv_wr* rd_desc_post;
ompi_free_list_t send_free_eager; /**< free list of eager buffer descriptors */
ompi_free_list_t send_free_max; /**< free list of max buffer descriptors */
ompi_free_list_t send_free_frag; /**< free list of frags only... used for pining memory */
ompi_free_list_t send_free_eager; /**< free list of eager buffer descriptors */
ompi_free_list_t send_free_max; /**< free list of max buffer descriptors */
ompi_free_list_t send_free_frag; /**< free list of frags only... used for pining memory */
ompi_free_list_t recv_free_eager; /**< High priority free list of buffer descriptors */
ompi_free_list_t recv_free_max; /**< Low priority free list of buffer descriptors */
ompi_free_list_t recv_free_eager; /**< High priority free list of buffer descriptors */
ompi_free_list_t recv_free_max; /**< Low priority free list of buffer descriptors */
opal_list_t reg_mru_list; /**< a most recently used list of mca_mpool_openib_registration_t
opal_list_t reg_mru_list; /**< a most recently used list of mca_mpool_openib_registration_t
entries, this allows us to keep a working set of memory pinned */
opal_list_t repost; /**< list of buffers to repost */
opal_mutex_t ib_lock; /**< module level lock */
@ -162,18 +155,18 @@ struct mca_btl_openib_module_t {
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
struct ibv_srq *srq_high;
struct ibv_srq *srq_low;
uint32_t srr_posted_high;
uint32_t srr_posted_low;
struct ibv_srq *srq_hp;
struct ibv_srq *srq_lp;
int32_t srd_posted_hp;
int32_t srd_posted_lp;
#endif
uint32_t num_peers;
uint32_t rd_buf_max;
uint32_t rd_buf_min;
int32_t num_peers;
int32_t rd_num;
int32_t rd_low;
int32_t wr_sq_tokens_hp;
int32_t sd_tokens_hp;
/**< number of high priority frags that can be outstanding (down counter) */
int32_t wr_sq_tokens_lp;
int32_t sd_tokens_lp;
/**< number of low priority frags that can be outstanding (down counter) */
opal_list_t pending_frags_hp;
@ -420,14 +413,14 @@ int mca_btl_openib_module_init(mca_btl_openib_module_t* openib_btl);
{ \
do{ \
OPAL_THREAD_LOCK(&openib_btl->ib_lock); \
if(openib_btl->srr_posted_high <= openib_btl->rd_buf_min+additional && \
openib_btl->srr_posted_high < openib_btl->rd_buf_max){ \
MCA_BTL_OPENIB_POST_SRR_SUB(openib_btl->rd_buf_max - \
openib_btl->srr_posted_high, \
if(openib_btl->srd_posted_hp <= openib_btl->rd_low+additional && \
openib_btl->srd_posted_hp < openib_btl->rd_num){ \
MCA_BTL_OPENIB_POST_SRR_SUB(openib_btl->rd_num - \
openib_btl->srd_posted_hp, \
openib_btl, \
&openib_btl->recv_free_eager, \
&openib_btl->srr_posted_high, \
openib_btl->srq_high); \
&openib_btl->srd_posted_hp, \
openib_btl->srq_hp); \
} \
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock); \
} while(0); \
@ -437,14 +430,14 @@ int mca_btl_openib_module_init(mca_btl_openib_module_t* openib_btl);
{ \
do { \
OPAL_THREAD_LOCK(&openib_btl->ib_lock); \
if(openib_btl->srr_posted_low <= openib_btl->rd_buf_min+additional && \
openib_btl->srr_posted_low < openib_btl->rd_buf_max){ \
MCA_BTL_OPENIB_POST_SRR_SUB(openib_btl->rd_buf_max - \
openib_btl->srr_posted_low, \
if(openib_btl->srd_posted_lp <= openib_btl->rd_low+additional && \
openib_btl->srd_posted_lp < openib_btl->rd_num){ \
MCA_BTL_OPENIB_POST_SRR_SUB(openib_btl->rd_num - \
openib_btl->srd_posted_lp, \
openib_btl, \
&openib_btl->recv_free_max, \
&openib_btl->srr_posted_low, \
openib_btl->srq_low); \
&openib_btl->srd_posted_lp, \
openib_btl->srq_lp); \
} \
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock); \
} while(0); \
@ -454,28 +447,29 @@ int mca_btl_openib_module_init(mca_btl_openib_module_t* openib_btl);
#define MCA_BTL_OPENIB_POST_SRR_SUB(cnt, \
openib_btl, \
frag_list, \
srr_posted, \
srd_posted, \
srq) \
{\
do { \
uint32_t i; \
int32_t i; \
int32_t num_post = cnt; \
opal_list_item_t* item = NULL; \
mca_btl_openib_frag_t* frag = NULL; \
struct ibv_recv_wr *bad_wr; \
int32_t rc; \
for(i = 0; i < cnt; i++) { \
for(i = 0; i < num_post; i++) { \
OMPI_FREE_LIST_WAIT(frag_list, item, rc); \
frag = (mca_btl_openib_frag_t*) item; \
frag->sg_entry.length = frag->size + \
((unsigned char*) frag->segment.seg_addr.pval- \
(unsigned char*) frag->hdr); \
if(ibv_post_srq_recv(srq, &frag->wr_desc.rr_desc, &bad_wr)) { \
if(ibv_post_srq_recv(srq, &frag->wr_desc.rd_desc, &bad_wr)) { \
BTL_ERROR(("error posting receive descriptors to shared receive queue: %s",\
strerror(errno))); \
return OMPI_ERROR; \
}\
}\
OPAL_THREAD_ADD32((int32_t*) srr_posted, cnt); \
OPAL_THREAD_ADD32(srd_posted, num_post); \
} while(0);\
}

Просмотреть файл

@ -119,9 +119,6 @@ static inline void mca_btl_openib_param_register_int(
int mca_btl_openib_component_open(void)
{
int param, value;
/* initialize state */
mca_btl_openib_component.ib_num_btls=0;
mca_btl_openib_component.openib_btls=NULL;
@ -133,23 +130,17 @@ int mca_btl_openib_component_open(void)
mca_btl_openib_param_register_int ("free_list_num", "intial size of free lists",
8, &mca_btl_openib_component.ib_free_list_num);
mca_btl_openib_param_register_int ("free_list_max", "maximum size of free lists",
1024, &mca_btl_openib_component.ib_free_list_max);
-1, &mca_btl_openib_component.ib_free_list_max);
mca_btl_openib_param_register_int ("free_list_inc", "increment size of free lists",
32, &mca_btl_openib_component.ib_free_list_inc);
mca_btl_openib_param_register_string("mpool", "name of the memory pool to be used",
"openib", &mca_btl_openib_component.ib_mpool_name);
mca_btl_openib_param_register_int("rd_max", "maximum number of receive descriptors to post to a QP",
16, (int*) &mca_btl_openib_component.ib_rr_buf_max);
mca_btl_openib_param_register_int("rd_min", "minimum number of receive descriptors before reposting occurs",
8, (int*) &mca_btl_openib_component.ib_rr_buf_min);
mca_btl_openib_param_register_int("reg_mru_len", "length of the registration cache most recently used list",
16, (int*) &mca_btl_openib_component.reg_mru_len);
mca_btl_openib_param_register_int("use_srq", "if 1 use the IB shared receive queue to post receive descriptors",
0, (int*) &mca_btl_openib_component.use_srq);
mca_btl_openib_param_register_int("ib_cq_size", "size of the IB completion queue",
500, (int*) &mca_btl_openib_component.ib_cq_size);
mca_btl_openib_param_register_int("ib_wq_size", "size of the IB work queue",
500, (int*) &mca_btl_openib_component.ib_wq_size);
1000, (int*) &mca_btl_openib_component.ib_cq_size);
mca_btl_openib_param_register_int("ib_sg_list_size", "size of IB segment list",
1, (int*) &mca_btl_openib_component.ib_sg_list_size);
mca_btl_openib_param_register_int("ib_pkey_ix", "IB pkey index",
@ -157,7 +148,7 @@ int mca_btl_openib_component_open(void)
mca_btl_openib_param_register_int("ib_psn", "IB Packet sequence starting number",
0, (int*) &mca_btl_openib_component.ib_psn);
mca_btl_openib_param_register_int("ib_qp_ous_rd_atom", "IB outstanding atomic reads",
1, (int*) &mca_btl_openib_component.ib_qp_ous_rd_atom);
4, (int*) &mca_btl_openib_component.ib_qp_ous_rd_atom);
mca_btl_openib_param_register_int("ib_mtu", "IB MTU",
IBV_MTU_1024, (int*) &mca_btl_openib_component.ib_mtu);
mca_btl_openib_param_register_int("ib_min_rnr_timer", "IB min rnr timer",
@ -176,10 +167,21 @@ int mca_btl_openib_component_open(void)
0, (int*) &mca_btl_openib_component.ib_static_rate);
mca_btl_openib_param_register_int("ib_src_path_bits", "IB source path bits",
0, (int*) &mca_btl_openib_component.ib_src_path_bits);
mca_btl_openib_param_register_int("rd_per_peer", "receive descriptors posted per peer, SRQ mode only",
16, (int*) &mca_btl_openib_component.rd_per_peer);
mca_btl_openib_param_register_int ("exclusivity", "BTL exclusivity",
MCA_BTL_EXCLUSIVITY_DEFAULT, (int*) &mca_btl_openib_module.super.btl_exclusivity);
mca_btl_openib_param_register_int("rd_num", "number of receive descriptors to post to a QP",
16, (int*) &mca_btl_openib_component.rd_num);
mca_btl_openib_param_register_int("rd_low", "low water mark before reposting occurs",
12, (int*) &mca_btl_openib_component.rd_low);
mca_btl_openib_param_register_int("rd_win", "window size at which generate explicity credit message",
8, (int*) &mca_btl_openib_component.rd_win);
mca_btl_openib_component.rd_rsv = ((mca_btl_openib_component.rd_num<<1)-1) / mca_btl_openib_component.rd_win;
mca_btl_openib_param_register_int("srq_rd_per_peer", "Number of receive descriptors posted per peer. (SRQ)",
16, (int*) &mca_btl_openib_component.srq_rd_per_peer);
mca_btl_openib_param_register_int("srq_sd_per_proc", "Maximum number of send descriptors posted. (SRQ)",
16, &mca_btl_openib_component.srq_sd_per_proc);
mca_btl_openib_param_register_int ("eager_limit", "eager send limit",
(64*1024),(int*) &mca_btl_openib_module.super.btl_eager_limit);
mca_btl_openib_module.super.btl_eager_limit -= sizeof(mca_btl_openib_header_t);
@ -198,16 +200,7 @@ int mca_btl_openib_component_open(void)
mca_btl_openib_param_register_int("bandwidth", "Approximate maximum bandwidth of interconnect",
800, (int*) &mca_btl_openib_module.super.btl_bandwidth);
mca_btl_openib_param_register_int("max_wr_sq_tokens", "Maximum number of send/rdma work request tokens",
16, (int*) &mca_btl_openib_component.max_wr_sq_tokens);
mca_btl_openib_param_register_int("max_total_wr_sq_tokens", "Maximum number of send/rdma work request tokens peer btl",
32, (int*) &mca_btl_openib_component.max_total_wr_sq_tokens);
param = mca_base_param_find("mpi", NULL, "leave_pinned");
mca_base_param_lookup_int(param, &value);
mca_btl_openib_component.leave_pinned = value;
mca_btl_openib_component.max_send_size = mca_btl_openib_module.super.btl_max_send_size;
mca_btl_openib_component.eager_limit = mca_btl_openib_module.super.btl_eager_limit;
@ -256,6 +249,27 @@ mca_btl_openib_modex_send(void)
return rc;
}
/*
 * Receive callback installed for BTL control messages (it is registered
 * under MCA_BTL_TAG_BTL during component initialization).
 *
 * The descriptor is treated as an openib fragment; one receive credit is
 * subtracted from the owning endpoint so that the credit consumed by the
 * control message itself is not handed back to the sender.
 *
 * btl, tag and cbdata are part of the callback signature and are not used.
 */
static void mca_btl_openib_control(
    struct mca_btl_base_module_t* btl,
    mca_btl_base_tag_t tag,
    mca_btl_base_descriptor_t* descriptor,
    void* cbdata)
{
    mca_btl_openib_frag_t* ctl_frag = (mca_btl_openib_frag_t*)descriptor;
    mca_btl_openib_endpoint_t* peer = ctl_frag->endpoint;

    /* A fragment whose size equals the eager limit is charged against the
     * high priority credit counter; anything else against the low priority
     * one. */
    if(ctl_frag->size != mca_btl_openib_component.eager_limit) {
        OPAL_THREAD_ADD32(&peer->rd_credits_lp, -1);
    } else {
        OPAL_THREAD_ADD32(&peer->rd_credits_hp, -1);
    }
}
/*
* IB component initialization:
@ -371,6 +385,9 @@ mca_btl_base_module_t** mca_btl_openib_component_init(int *num_btl_modules,
openib_btl->port_num = (uint8_t) j;
openib_btl->ib_port_attr = ib_port_attr;
openib_btl->port_info.subnet = ib_port_attr->sm_lid; /* store the sm_lid for multi-nic support */
openib_btl->ib_reg[MCA_BTL_TAG_BTL].cbfunc = mca_btl_openib_control;
openib_btl->ib_reg[MCA_BTL_TAG_BTL].cbdata = NULL;
opal_list_append(&btl_list, (opal_list_item_t*) ib_selected);
mca_btl_openib_component.ib_num_btls ++;
@ -409,11 +426,10 @@ mca_btl_base_module_t** mca_btl_openib_component_init(int *num_btl_modules,
free(openib_btl);
openib_btl = &mca_btl_openib_component.openib_btls[i];
openib_btl->rd_buf_max = mca_btl_openib_component.ib_rr_buf_max;
openib_btl->rd_buf_min = mca_btl_openib_component.ib_rr_buf_min;
openib_btl->rd_num = mca_btl_openib_component.rd_num + mca_btl_openib_component.rd_rsv;
openib_btl->rd_low = mca_btl_openib_component.rd_low;
openib_btl->num_peers = 0;
openib_btl->wr_sq_tokens_hp =
openib_btl->wr_sq_tokens_lp = mca_btl_openib_component.max_total_wr_sq_tokens;
openib_btl->sd_tokens_hp = openib_btl->sd_tokens_lp = mca_btl_openib_component.srq_sd_per_proc;
/* Initialize module state */
@ -428,12 +444,6 @@ mca_btl_base_module_t** mca_btl_openib_component_init(int *num_btl_modules,
OBJ_CONSTRUCT(&openib_btl->recv_free_eager, ompi_free_list_t);
OBJ_CONSTRUCT(&openib_btl->recv_free_max, ompi_free_list_t);
OBJ_CONSTRUCT(&openib_btl->repost, opal_list_t);
OBJ_CONSTRUCT(&openib_btl->reg_mru_list, opal_list_t);
if(mca_btl_openib_module_init(openib_btl) != OMPI_SUCCESS) {
free(ib_devs);
return NULL;
@ -516,9 +526,9 @@ mca_btl_base_module_t** mca_btl_openib_component_init(int *num_btl_modules,
openib_btl->super.btl_mpool);
/* Initialize the rr_desc_post array for posting of rr*/
openib_btl->rr_desc_post = (struct ibv_recv_wr *)
malloc((mca_btl_openib_component.ib_rr_buf_max * sizeof(struct ibv_recv_wr)));
/* Initialize the rd_desc_post array for posting of rr*/
openib_btl->rd_desc_post = (struct ibv_recv_wr *)
malloc((mca_btl_openib_component.rd_num * sizeof(struct ibv_recv_wr)));
btls[i] = &openib_btl->super;
}
@ -541,8 +551,10 @@ int mca_btl_openib_component_progress()
{
uint32_t i;
int count = 0,ne;
int32_t credits;
mca_btl_openib_frag_t* frag;
mca_btl_openib_endpoint_t* endpoint;
/* Poll for completions */
for(i = 0; i < mca_btl_openib_component.ib_num_btls; i++) {
@ -554,7 +566,7 @@ int mca_btl_openib_component_progress()
* we will check the high priority and process them until there are none left.
* note that low priority messages are only processed one per progress call.
*/
ne=ibv_poll_cq(openib_btl->ib_cq_high, 1, &wc );
ne=ibv_poll_cq(openib_btl->ib_cq_hp, 1, &wc );
if(ne < 0 ){
BTL_ERROR(("error polling CQ with %d errno says %s\n", ne, strerror(errno)));
return OMPI_ERROR;
@ -573,65 +585,16 @@ int mca_btl_openib_component_progress()
BTL_ERROR(("Got an RDMA with Immediate data Not supported!"));
return OMPI_ERROR;
case IBV_WC_RECV:
/* Process a RECV */
BTL_VERBOSE(("Got an recv on the completion queue"));
frag = (mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id;
endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint;
frag->rc=OMPI_SUCCESS;
frag->segment.seg_len =
wc.byte_len-
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* advance the segment address past the header and subtract from the length..*/
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super,
frag->hdr->tag,
&frag->base,
openib_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_eager), (opal_list_item_t*) frag);
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
if(mca_btl_openib_component.use_srq) {
OPAL_THREAD_ADD32((int32_t*) &openib_btl->srr_posted_high, -1);
MCA_BTL_OPENIB_POST_SRR_HIGH(openib_btl, 0);
} else {
#endif
OPAL_THREAD_ADD32((int32_t*) &endpoint->rr_posted_high, -1);
MCA_BTL_OPENIB_ENDPOINT_POST_RR_HIGH(((mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id)->endpoint, 0);
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
}
#endif
count++;
break;
case IBV_WC_RDMA_READ:
case IBV_WC_RDMA_WRITE:
case IBV_WC_SEND :
/* Process a completed send or rdma write*/
frag = (mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id;
frag->rc = OMPI_SUCCESS;
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, frag->rc);
count++;
/* check and see if we need to progress pending sends */
if( !mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1) > 0
&& !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_hp));
OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock);
frag = (mca_btl_openib_frag_t *) frag_item;
endpoint = frag->endpoint;
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
if(OMPI_SUCCESS != mca_btl_openib_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
}
/* check and see if we need to progress pending sends */
if( mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_hp, 1) > 0
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_hp, 1) > 0
&& !opal_list_is_empty(&openib_btl->pending_frags_hp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&openib_btl->pending_frags_hp);
@ -640,17 +603,79 @@ int mca_btl_openib_component_progress()
BTL_ERROR(("error in posting pending send\n"));
}
}
count++;
break;
case IBV_WC_RECV:
/* Process a RECV */
frag = (mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id;
endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint;
credits = frag->hdr->credits;
/* advance the segment address past the header and subtract from the length..*/
frag->segment.seg_len = wc.byte_len-
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* call registered callback */
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super,
frag->hdr->tag,
&frag->base,
openib_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_eager), (opal_list_item_t*) frag);
/* repost receive descriptors */
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
if(mca_btl_openib_component.use_srq) {
OPAL_THREAD_ADD32((int32_t*) &openib_btl->srd_posted_hp, -1);
MCA_BTL_OPENIB_POST_SRR_HIGH(openib_btl, 0);
} else {
#endif
OPAL_THREAD_ADD32((int32_t*) &endpoint->rd_posted_hp, -1);
MCA_BTL_OPENIB_ENDPOINT_POST_RR_HIGH(endpoint, 0);
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
}
#endif
/* check to see if we need to progress any pending descriptors */
if( !mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp, credits) > 0
&& !opal_list_is_empty(&(endpoint->pending_frags_hp))) {
do {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_hp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_openib_frag_t *) frag_item))
break;
if(OMPI_SUCCESS != mca_btl_openib_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
break;
}
} while(endpoint->sd_tokens_hp > 0);
}
/* check to see if we need to return credits */
if( !mca_btl_openib_component.use_srq &&
endpoint->rd_credits_hp >= mca_btl_openib_component.rd_win) {
mca_btl_openib_endpoint_send_credits(
endpoint,
endpoint->lcl_qp_hp,
&endpoint->rd_credits_hp);
}
count++;
break;
case IBV_WC_RDMA_READ:
case IBV_WC_RDMA_WRITE:
default:
BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode));
break;
}
}
ne=ibv_poll_cq(openib_btl->ib_cq_low, 1, &wc );
ne=ibv_poll_cq(openib_btl->ib_cq_lp, 1, &wc );
if(ne < 0){
BTL_ERROR(("error polling CQ with %d errno says %s", ne, strerror(errno)));
return OMPI_ERROR;
@ -664,118 +689,115 @@ int mca_btl_openib_component_progress()
/* Handle n/w completions */
switch(wc.opcode) {
case IBV_WC_RECV_RDMA_WITH_IMM:
BTL_ERROR(("Got an RDMA with Immediate data Not supported!"));
return OMPI_ERROR;
case IBV_WC_RECV:
/* process a recv completion (this should only occur for a send not an rdma) */
BTL_VERBOSE(( "Got a recv completion"));
frag = (mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id;
endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint;
frag->rc=OMPI_SUCCESS;
/* advance the segment address past the header and subtract from the length..*/
frag->segment.seg_len =
wc.byte_len-
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super,
frag->hdr->tag,
&frag->base,
openib_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_max), (opal_list_item_t*) frag);
case IBV_WC_SEND:
/* Process a completed send - receiver must return tokens */
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
/* if we have tokens, process pending sends */
if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp, 1) > 0
&& !opal_list_is_empty(&openib_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&openib_btl->pending_frags_lp);
frag = (mca_btl_openib_frag_t *) frag_item;
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
count++;
break;
case IBV_WC_RDMA_READ:
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
OPAL_THREAD_ADD32(&frag->endpoint->get_tokens, 1);
/* fall through */
case IBV_WC_RDMA_WRITE:
/* Process a completed write - returns tokens immediately */
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
endpoint = frag->endpoint;
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp, 1) > 0
&& !opal_list_is_empty(&openib_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&openib_btl->pending_frags_lp);
frag = (mca_btl_openib_frag_t *) frag_item;
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, 1) > 0 &&
!opal_list_is_empty(&(endpoint->pending_frags_lp))) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock);
frag = (mca_btl_openib_frag_t *) frag_item;
MCA_BTL_IB_FRAG_PROGRESS(frag);
}
count++;
break;
case IBV_WC_RECV:
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
endpoint = (mca_btl_openib_endpoint_t*) frag->endpoint;
credits = frag->hdr->credits;
/* process received frag */
frag->rc=OMPI_SUCCESS;
frag->segment.seg_len = wc.byte_len-((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
/* call registered callback */
openib_btl->ib_reg[frag->hdr->tag].cbfunc(&openib_btl->super, frag->hdr->tag, &frag->base, openib_btl->ib_reg[frag->hdr->tag].cbdata);
OMPI_FREE_LIST_RETURN(&(openib_btl->recv_free_max), (opal_list_item_t*) frag);
/* post descriptors */
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
if(mca_btl_openib_component.use_srq) {
OPAL_THREAD_ADD32((int32_t*) &openib_btl->srr_posted_low, -1);
OPAL_THREAD_ADD32((int32_t*) &openib_btl->srd_posted_lp, -1);
MCA_BTL_OPENIB_POST_SRR_LOW(openib_btl, 0);
} else {
#endif
OPAL_THREAD_ADD32((int32_t*) &endpoint->rr_posted_low, -1);
OPAL_THREAD_ADD32((int32_t*) &endpoint->rd_posted_lp, -1);
MCA_BTL_OPENIB_ENDPOINT_POST_RR_LOW(((mca_btl_openib_frag_t*) (void*)
(unsigned long)wc.wr_id)->endpoint, 0);
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
}
#endif
/* check to see if we need to progress pending descriptors */
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp, credits) > 0 &&
!opal_list_is_empty(&(endpoint->pending_frags_lp))) {
do {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if(NULL == (frag = (mca_btl_openib_frag_t *) frag_item))
break;
MCA_BTL_IB_FRAG_PROGRESS(frag);
} while(endpoint->sd_tokens_lp > 0);
}
/* check to see if we need to return credits */
if( !mca_btl_openib_component.use_srq &&
endpoint->rd_credits_lp >= mca_btl_openib_component.rd_win) {
mca_btl_openib_endpoint_send_credits(
endpoint,
endpoint->lcl_qp_lp,
&endpoint->rd_credits_lp);
}
count++;
break;
case IBV_WC_RDMA_READ:
case IBV_WC_RDMA_WRITE:
case IBV_WC_SEND :
/* Process a completed send */
frag = (mca_btl_openib_frag_t*) (void*) (unsigned long) wc.wr_id;
frag->rc = OMPI_SUCCESS;
frag->base.des_cbfunc(&openib_btl->super, frag->endpoint, &frag->base, frag->rc);
count++;
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1) > 0 &&
!opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) {
opal_list_item_t *frag_item;
OPAL_THREAD_LOCK(&frag->endpoint->endpoint_lock);
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp));
OPAL_THREAD_UNLOCK(&frag->endpoint->endpoint_lock);
frag = (mca_btl_openib_frag_t *) frag_item;
switch(frag->wr_desc.sr_desc.opcode){
case IBV_WR_SEND:
if(OMPI_SUCCESS != mca_btl_openib_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
break;
case IBV_WR_RDMA_WRITE:
if(OMPI_SUCCESS != mca_btl_openib_put((mca_btl_base_module_t*) openib_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma write\n"));
}
break;
case IBV_WR_RDMA_READ:
if(OMPI_SUCCESS != mca_btl_openib_put((mca_btl_base_module_t *) openib_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma read\n"));
}
break;
default:
BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->wr_desc.sr_desc.opcode));
}
}
if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp, 1) > 0
&& !opal_list_is_empty(&openib_btl->pending_frags_lp)) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&openib_btl->pending_frags_lp);
frag = (mca_btl_openib_frag_t *) frag_item;
switch(frag->wr_desc.sr_desc.opcode){
case IBV_WR_SEND:
if(OMPI_SUCCESS != mca_btl_openib_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
break;
case IBV_WR_RDMA_WRITE:
if(OMPI_SUCCESS != mca_btl_openib_put((mca_btl_base_module_t*) openib_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma write\n"));
}
break;
case IBV_WR_RDMA_READ:
if(OMPI_SUCCESS != mca_btl_openib_put((mca_btl_base_module_t *) openib_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma read\n"));
}
break;
default:
BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->wr_desc.sr_desc.opcode));
}
}
break;
default:
BTL_ERROR(("Unhandled work completion opcode is %d", wc.opcode));
break;

Просмотреть файл

@ -79,36 +79,40 @@ static inline int mca_btl_openib_endpoint_post_send(mca_btl_openib_module_t* ope
/* atomically test and acquire a token */
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,-1) < 0) {
BTL_VERBOSE(("Queing because no send tokens \n"));
opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,1);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_hp,1);
return OMPI_SUCCESS;
} else if( mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_hp,1);
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_hp,-1) < 0) {
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_hp,1);
opal_list_append(&openib_btl->pending_frags_hp, (opal_list_item_t *)frag);
return OMPI_SUCCESS;
} else {
ib_qp = endpoint->lcl_qp_high;
frag->hdr->credits = endpoint->rd_credits_hp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_hp, -frag->hdr->credits);
ib_qp = endpoint->lcl_qp_hp;
}
} else {
/* atomically test and acquire a token */
if(!mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0 ) {
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,-1) < 0 ) {
BTL_VERBOSE(("Queing because no send tokens \n"));
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,1);
OPAL_THREAD_ADD32(&endpoint->sd_tokens_lp,1);
return OMPI_SUCCESS;
} else if(mca_btl_openib_component.use_srq &&
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,-1) < 0) {
OPAL_THREAD_ADD32(&openib_btl->wr_sq_tokens_lp,1);
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,-1) < 0) {
OPAL_THREAD_ADD32(&openib_btl->sd_tokens_lp,1);
opal_list_append(&openib_btl->pending_frags_lp, (opal_list_item_t *)frag);
return OMPI_SUCCESS;
} else {
ib_qp = endpoint->lcl_qp_low;
frag->hdr->credits = endpoint->rd_credits_lp;
OPAL_THREAD_ADD32(&endpoint->rd_credits_lp, -frag->hdr->credits);
ib_qp = endpoint->lcl_qp_lp;
}
}
@ -170,21 +174,30 @@ static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
OBJ_CONSTRUCT(&endpoint->pending_frags_hp, opal_list_t);
OBJ_CONSTRUCT(&endpoint->pending_frags_lp, opal_list_t);
endpoint->lcl_qp_attr_high = (struct ibv_qp_attr *) malloc(sizeof(struct ibv_qp_attr));
endpoint->lcl_qp_attr_low = (struct ibv_qp_attr *) malloc(sizeof(struct ibv_qp_attr));
memset(endpoint->lcl_qp_attr_high, 0, sizeof(struct ibv_qp_attr));
memset(endpoint->lcl_qp_attr_low, 0, sizeof(struct ibv_qp_attr));
endpoint->rr_posted_high = 0;
endpoint->rr_posted_low = 0;
endpoint->wr_sq_tokens_hp = mca_btl_openib_component.max_wr_sq_tokens;
endpoint->wr_sq_tokens_lp = mca_btl_openib_component.max_wr_sq_tokens;
endpoint->rem_info.rem_qp_num_high = 0;
endpoint->rem_info.rem_qp_num_low = 0;
endpoint->lcl_qp_attr_hp = (struct ibv_qp_attr *) malloc(sizeof(struct ibv_qp_attr));
endpoint->lcl_qp_attr_lp = (struct ibv_qp_attr *) malloc(sizeof(struct ibv_qp_attr));
memset(endpoint->lcl_qp_attr_hp, 0, sizeof(struct ibv_qp_attr));
memset(endpoint->lcl_qp_attr_lp, 0, sizeof(struct ibv_qp_attr));
endpoint->rd_posted_hp = 0;
endpoint->rd_posted_lp = 0;
/* zero these out w/ initial posting, so that we start out w/
* zero credits to return to peer
*/
endpoint->rd_credits_hp = -(mca_btl_openib_component.rd_num + mca_btl_openib_component.rd_rsv);
endpoint->rd_credits_lp = -(mca_btl_openib_component.rd_num + mca_btl_openib_component.rd_rsv);
/* initialize the high and low priority tokens */
endpoint->sd_tokens_hp = mca_btl_openib_component.rd_num;
endpoint->sd_tokens_lp = mca_btl_openib_component.rd_num;
endpoint->get_tokens = mca_btl_openib_component.ib_qp_ous_rd_atom;
endpoint->rem_info.rem_qp_num_hp = 0;
endpoint->rem_info.rem_qp_num_lp = 0;
endpoint->rem_info.rem_lid = 0;
endpoint->rem_info.rem_psn_high = 0;
endpoint->rem_info.rem_psn_low = 0;
endpoint->rem_info.rem_psn_hp = 0;
endpoint->rem_info.rem_psn_lp = 0;
endpoint->rem_info.rem_subnet = 0;
}
@ -224,25 +237,25 @@ static int mca_btl_openib_endpoint_send_connect_data(mca_btl_base_endpoint_t* en
/* pack the info in the send buffer */
rc = orte_dps.pack(buffer, &endpoint->lcl_qp_high->qp_num, 1, ORTE_UINT32);
rc = orte_dps.pack(buffer, &endpoint->lcl_qp_hp->qp_num, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &endpoint->lcl_qp_low->qp_num, 1, ORTE_UINT32);
rc = orte_dps.pack(buffer, &endpoint->lcl_qp_lp->qp_num, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &endpoint->lcl_psn_high, 1, ORTE_UINT32);
rc = orte_dps.pack(buffer, &endpoint->lcl_psn_hp, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &endpoint->lcl_psn_low, 1, ORTE_UINT32);
rc = orte_dps.pack(buffer, &endpoint->lcl_psn_lp, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
@ -269,8 +282,8 @@ static int mca_btl_openib_endpoint_send_connect_data(mca_btl_base_endpoint_t* en
BTL_VERBOSE(("Sending High Priority QP num = %d, Low Priority QP num = %d, LID = %d",
endpoint->lcl_qp_high->qp_num,
endpoint->lcl_qp_low->qp_num,
endpoint->lcl_qp_hp->qp_num,
endpoint->lcl_qp_lp->qp_num,
endpoint->endpoint_btl->ib_port_attr->lid));
if(rc < 0) {
@ -291,8 +304,8 @@ static int mca_btl_openib_endpoint_set_remote_info(mca_btl_base_endpoint_t* endp
memcpy(&((mca_btl_openib_endpoint_t*) endpoint)->rem_info, rem_info, sizeof(mca_btl_openib_rem_info_t));
BTL_VERBOSE(("Setting High Priority QP num = %d, Low Priority QP num %d, LID = %d",
endpoint->rem_info.rem_qp_num_high,
endpoint->rem_info.rem_qp_num_low,
endpoint->rem_info.rem_qp_num_hp,
endpoint->rem_info.rem_qp_num_lp,
endpoint->rem_info.rem_lid));
return ORTE_SUCCESS;
@ -319,35 +332,35 @@ static int mca_btl_openib_endpoint_start_connect(mca_btl_base_endpoint_t* endpoi
/* Create the High Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->ib_pd,
openib_btl->ib_cq_high,
openib_btl->ib_cq_hp,
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
openib_btl->srq_high,
openib_btl->srq_hp,
#endif
endpoint->lcl_qp_attr_high,
&endpoint->lcl_qp_high))) {
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp_hp))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
srand48(getpid() * time(NULL));
endpoint->lcl_psn_high = lrand48() & 0xffffff;
endpoint->lcl_psn_hp = lrand48() & 0xffffff;
/* Create the Low Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->ib_pd,
openib_btl->ib_cq_low,
openib_btl->ib_cq_lp,
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
openib_btl->srq_low,
openib_btl->srq_lp,
#endif
endpoint->lcl_qp_attr_low,
&endpoint->lcl_qp_low))) {
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp_lp))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
endpoint->lcl_psn_low = lrand48() & 0xffffff;
endpoint->lcl_psn_lp = lrand48() & 0xffffff;
BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d",
endpoint->lcl_qp_high->qp_num,
endpoint->lcl_qp_low->qp_num,
endpoint->lcl_qp_hp->qp_num,
endpoint->lcl_qp_lp->qp_num,
openib_btl->ib_port_attr->lid));
/* Send connection info over to remote endpoint */
@ -373,37 +386,37 @@ static int mca_btl_openib_endpoint_reply_start_connect(mca_btl_openib_endpoint_t
/* Create the High Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->ib_pd,
openib_btl->ib_cq_high,
openib_btl->ib_cq_hp,
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
openib_btl->srq_high,
openib_btl->srq_hp,
#endif
endpoint->lcl_qp_attr_high,
&endpoint->lcl_qp_high))) {
endpoint->lcl_qp_attr_hp,
&endpoint->lcl_qp_hp))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
srand48(getpid() * time(NULL));
endpoint->lcl_psn_high = lrand48() & 0xffffff;
endpoint->lcl_psn_hp = lrand48() & 0xffffff;
/* Create the Low Priority Queue Pair */
if(OMPI_SUCCESS != (rc = mca_btl_openib_endpoint_create_qp(openib_btl,
openib_btl->ib_pd,
openib_btl->ib_cq_low,
openib_btl->ib_cq_lp,
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
openib_btl->srq_low,
openib_btl->srq_lp,
#endif
endpoint->lcl_qp_attr_low,
&endpoint->lcl_qp_low))) {
endpoint->lcl_qp_attr_lp,
&endpoint->lcl_qp_lp))) {
BTL_ERROR(("error creating queue pair, error code %d", rc));
return rc;
}
endpoint->lcl_psn_low = lrand48() & 0xffffff;
endpoint->lcl_psn_lp = lrand48() & 0xffffff;
BTL_VERBOSE(("Initialized High Priority QP num = %d, Low Priority QP num = %d, LID = %d",
endpoint->lcl_qp_high->qp_num,
endpoint->lcl_qp_low->qp_num,
endpoint->lcl_qp_hp->qp_num,
endpoint->lcl_qp_lp->qp_num,
openib_btl->ib_port_attr->lid));
@ -491,21 +504,21 @@ static void mca_btl_openib_endpoint_recv(
/* start by unpacking data first so we know who is knocking at
our door */
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_high, &cnt, ORTE_UINT32);
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_hp, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_low, &cnt, ORTE_UINT32);
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_lp, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
rc = orte_dps.unpack(buffer, &rem_info.rem_psn_high, &cnt, ORTE_UINT32);
rc = orte_dps.unpack(buffer, &rem_info.rem_psn_hp, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}rc = orte_dps.unpack(buffer, &rem_info.rem_psn_low, &cnt, ORTE_UINT32);
}rc = orte_dps.unpack(buffer, &rem_info.rem_psn_lp, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
@ -548,8 +561,8 @@ static void mca_btl_openib_endpoint_recv(
BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d",
rem_info.rem_qp_num_high,
rem_info.rem_qp_num_low,
rem_info.rem_qp_num_hp,
rem_info.rem_qp_num_lp,
rem_info.rem_lid));
@ -764,11 +777,11 @@ int mca_btl_openib_endpoint_connect(
/* Connection establishment RC */
rc = mca_btl_openib_endpoint_qp_init_query(
openib_btl,
endpoint->lcl_qp_high,
endpoint->lcl_qp_attr_high,
endpoint->lcl_psn_high,
endpoint->rem_info.rem_qp_num_high,
endpoint->rem_info.rem_psn_high,
endpoint->lcl_qp_hp,
endpoint->lcl_qp_attr_hp,
endpoint->lcl_psn_hp,
endpoint->rem_info.rem_qp_num_hp,
endpoint->rem_info.rem_psn_hp,
endpoint->rem_info.rem_lid,
openib_btl->port_num
);
@ -780,11 +793,11 @@ int mca_btl_openib_endpoint_connect(
}
rc = mca_btl_openib_endpoint_qp_init_query(
openib_btl,
endpoint->lcl_qp_low,
endpoint->lcl_qp_attr_low,
endpoint->lcl_psn_low,
endpoint->rem_info.rem_qp_num_low,
endpoint->rem_info.rem_psn_low,
endpoint->lcl_qp_lp,
endpoint->lcl_qp_attr_lp,
endpoint->lcl_psn_lp,
endpoint->rem_info.rem_qp_num_lp,
endpoint->rem_info.rem_psn_lp,
endpoint->rem_info.rem_lid,
openib_btl->port_num
);
@ -835,9 +848,9 @@ int mca_btl_openib_endpoint_create_qp(
qp_init_attr.send_cq = cq;
qp_init_attr.recv_cq = cq;
qp_init_attr.cap.max_send_wr = mca_btl_openib_component.max_wr_sq_tokens;
qp_init_attr.cap.max_recv_wr = mca_btl_openib_component.ib_rr_buf_max;
qp_init_attr.cap.max_send_sge = mca_btl_openib_component.ib_sg_list_size;
qp_init_attr.cap.max_send_wr = mca_btl_openib_component.rd_num + mca_btl_openib_component.rd_rsv;
qp_init_attr.cap.max_recv_wr = mca_btl_openib_component.rd_num + mca_btl_openib_component.rd_rsv;
qp_init_attr.cap.max_send_sge = mca_btl_openib_component.ib_sg_list_size;
qp_init_attr.cap.max_recv_sge = mca_btl_openib_component.ib_sg_list_size;
qp_init_attr.qp_type = IBV_QPT_RC;
#ifdef OMPI_MCA_BTL_OPENIB_HAVE_SRQ
@ -935,3 +948,63 @@ int mca_btl_openib_endpoint_qp_init_query(
return OMPI_SUCCESS;
}
/**
 * Completion callback installed on control fragments.
 *
 * Releases the completed descriptor through MCA_BTL_IB_FRAG_RETURN_EAGER.
 *
 * @param btl         BTL module the fragment was sent on
 * @param ep          peer endpoint (not used here)
 * @param descriptor  the completed control fragment
 * @param status      completion status (not used here)
 */
static void mca_btl_openib_endpoint_control_cb(
    mca_btl_base_module_t* btl,
    struct mca_btl_base_endpoint_t* ep,
    struct mca_btl_base_descriptor_t* descriptor,
    int status)
{
    mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) btl;
    mca_btl_openib_frag_t* frag = (mca_btl_openib_frag_t*) descriptor;

    MCA_BTL_IB_FRAG_RETURN_EAGER(openib_btl, frag);
}
/**
 * Return receive credits to the peer with an explicit control message.
 *
 * Allocates an eager fragment, tags it MCA_BTL_TAG_BTL, copies the current
 * value of *credits into the fragment header, subtracts that amount from
 * *credits, and posts the fragment as an IBV_WR_SEND on ib_qp.  If the
 * post fails, the credits are added back and the fragment is released so
 * that nothing is lost.
 *
 * @param endpoint  peer endpoint the credits are returned to
 * @param ib_qp     queue pair the control message is posted on
 * @param credits   counter of credits owed to the peer; reduced by the
 *                  amount carried in the message on success
 */
void mca_btl_openib_endpoint_send_credits(
    mca_btl_openib_endpoint_t* endpoint,
    struct ibv_qp* ib_qp,
    int32_t* credits)
{
    mca_btl_openib_module_t* openib_btl = endpoint->endpoint_btl;
    mca_btl_openib_frag_t* frag;
    struct ibv_send_wr* bad_wr;
    int rc;

    MCA_BTL_IB_FRAG_ALLOC_EAGER(openib_btl, frag, rc);
    if(NULL == frag) {
        BTL_ERROR(("error allocating fragment"));
        return;
    }

    frag->base.des_cbfunc = mca_btl_openib_endpoint_control_cb;
    frag->base.des_cbdata = NULL;
    /* The send-completion path reads frag->endpoint (it is passed to
     * des_cbfunc and used to replenish send tokens), so it must refer to
     * this peer rather than whatever a previous use of the fragment left. */
    frag->endpoint = endpoint;

    frag->hdr->tag = MCA_BTL_TAG_BTL;
    frag->hdr->credits = *credits;
    /* subtract exactly what the header carries, not a re-read of *credits */
    OPAL_THREAD_ADD32(credits, -frag->hdr->credits);

    frag->wr_desc.sr_desc.opcode = IBV_WR_SEND;
    /* length covers the BTL header plus the payload segment */
    frag->sg_entry.length =
        frag->segment.seg_len +
        ((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);

    if(frag->sg_entry.length <= openib_btl->ib_inline_max) {
        frag->wr_desc.sr_desc.send_flags |= IBV_SEND_INLINE;
    } else {
        frag->wr_desc.sr_desc.send_flags = IBV_SEND_SIGNALED;
    }

    if(ibv_post_send(ib_qp,
                     &frag->wr_desc.sr_desc,
                     &bad_wr)) {
        BTL_ERROR(("error posting send request errno says %s", strerror(errno)));
        /* Nothing reached the peer: restore the credits so a later call
         * can return them, and release the fragment instead of leaking it. */
        OPAL_THREAD_ADD32(credits, frag->hdr->credits);
        MCA_BTL_IB_FRAG_RETURN_EAGER(openib_btl, frag);
        return;
    }
}

Просмотреть файл

@ -74,16 +74,16 @@ typedef enum {
struct mca_btl_openib_rem_info_t {
uint32_t rem_qp_num_high;
uint32_t rem_qp_num_low;
uint32_t rem_qp_num_hp;
uint32_t rem_qp_num_lp;
/* Remote QP number (Low and High priority) */
uint16_t rem_lid;
/* Local identifier of the remote process */
uint32_t rem_psn_high;
uint32_t rem_psn_low;
uint32_t rem_psn_hp;
uint32_t rem_psn_lp;
/* Remote process's packet sequence number (Low and High) */
uint16_t rem_subnet;
@ -133,29 +133,29 @@ struct mca_btl_base_endpoint_t {
/**< list of pending low priority frags */
int32_t wr_sq_tokens_hp;
/**< number of high priority frags that can be outstanding (down counter) */
int32_t wr_sq_tokens_lp;
/**< number of low priority frags that can be outstanding (down counter) */
mca_btl_openib_rem_info_t rem_info;
uint32_t lcl_psn_high;
uint32_t lcl_psn_low;
uint32_t lcl_psn_hp;
uint32_t lcl_psn_lp;
/* Local process's packet sequence number (Low and High) */
struct ibv_qp* lcl_qp_high;
struct ibv_qp* lcl_qp_low;
struct ibv_qp* lcl_qp_hp;
struct ibv_qp* lcl_qp_lp;
/* Local QP (Low and High) */
struct ibv_qp_attr* lcl_qp_attr_high;
struct ibv_qp_attr* lcl_qp_attr_low;
struct ibv_qp_attr* lcl_qp_attr_hp;
struct ibv_qp_attr* lcl_qp_attr_lp;
/* Local QP attributes (Low and High) */
int32_t sd_tokens_hp; /**< number of high priority send tokens */
int32_t sd_tokens_lp; /**< number of low priority send tokens */
int32_t get_tokens; /**< number of available get tokens */
uint32_t rr_posted_high; /**< number of high priority rr posted to the nic*/
uint32_t rr_posted_low; /**< number of low priority rr posted to the nic*/
int32_t rd_posted_hp; /**< number of high priority descriptors posted to the nic*/
int32_t rd_posted_lp; /**< number of low priority descriptors posted to the nic*/
int32_t rd_credits_hp; /**< number of high priority credits to return to peer */
int32_t rd_credits_lp; /**< number of low priority credits to return to peer */
uint16_t subnet; /**< subnet of this endpoint*/
};
@ -166,7 +166,10 @@ typedef mca_btl_base_endpoint_t mca_btl_openib_endpoint_t;
int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_openib_frag_t* frag);
int mca_btl_openib_endpoint_connect(mca_btl_base_endpoint_t*);
void mca_btl_openib_post_recv(void);
void mca_btl_openib_endpoint_send_credits(
mca_btl_base_endpoint_t*,
struct ibv_qp* qp,
int32_t* credits);
#define MCA_BTL_OPENIB_ENDPOINT_POST_RR_HIGH(endpoint, \
@ -175,14 +178,15 @@ void mca_btl_openib_post_recv(void);
do { \
mca_btl_openib_module_t * openib_btl = endpoint->endpoint_btl; \
OPAL_THREAD_LOCK(&openib_btl->ib_lock); \
if(endpoint->rr_posted_high <= mca_btl_openib_component.ib_rr_buf_min+additional && \
endpoint->rr_posted_high < mca_btl_openib_component.ib_rr_buf_max){ \
MCA_BTL_OPENIB_ENDPOINT_POST_RR_SUB(mca_btl_openib_component.ib_rr_buf_max - \
endpoint->rr_posted_high, \
if(endpoint->rd_posted_hp <= mca_btl_openib_component.rd_low+additional && \
endpoint->rd_posted_hp < openib_btl->rd_num){ \
MCA_BTL_OPENIB_ENDPOINT_POST_RR_SUB(openib_btl->rd_num - \
endpoint->rd_posted_hp, \
endpoint, \
&openib_btl->recv_free_eager, \
&endpoint->rr_posted_high, \
endpoint->lcl_qp_high); \
endpoint->rd_posted_hp, \
endpoint->rd_credits_hp, \
endpoint->lcl_qp_hp); \
} \
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock); \
} while(0); \
@ -193,14 +197,15 @@ void mca_btl_openib_post_recv(void);
do { \
mca_btl_openib_module_t * openib_btl = endpoint->endpoint_btl; \
OPAL_THREAD_LOCK(&openib_btl->ib_lock); \
if(endpoint->rr_posted_low <= mca_btl_openib_component.ib_rr_buf_min+additional && \
endpoint->rr_posted_low < mca_btl_openib_component.ib_rr_buf_max){ \
MCA_BTL_OPENIB_ENDPOINT_POST_RR_SUB(mca_btl_openib_component.ib_rr_buf_max - \
endpoint->rr_posted_low, \
if(endpoint->rd_posted_lp <= mca_btl_openib_component.rd_low+additional && \
endpoint->rd_posted_lp < openib_btl->rd_num){ \
MCA_BTL_OPENIB_ENDPOINT_POST_RR_SUB(openib_btl->rd_num - \
endpoint->rd_posted_lp, \
endpoint, \
&openib_btl->recv_free_max, \
&endpoint->rr_posted_low, \
endpoint->lcl_qp_low \
endpoint->rd_posted_lp, \
endpoint->rd_credits_lp, \
endpoint->lcl_qp_lp \
); } \
OPAL_THREAD_UNLOCK(&openib_btl->ib_lock); \
} while(0); \
@ -209,16 +214,17 @@ void mca_btl_openib_post_recv(void);
#define MCA_BTL_OPENIB_ENDPOINT_POST_RR_SUB(cnt, \
my_endpoint, \
frag_list, \
rr_posted, \
rd_posted, \
rd_credits, \
qp ) \
{\
do { \
uint32_t i; \
do { \
int32_t i; \
int rc; \
opal_list_item_t* item; \
mca_btl_openib_frag_t* frag; \
int32_t num_post = cnt; \
struct ibv_recv_wr* bad_wr; \
for(i = 0; i < cnt; i++) { \
for(i = 0; i < num_post; i++) { \
opal_list_item_t* item; \
mca_btl_openib_frag_t* frag; \
OMPI_FREE_LIST_WAIT(frag_list, item, rc); \
frag = (mca_btl_openib_frag_t*) item; \
frag->endpoint = my_endpoint; \
@ -226,25 +232,23 @@ void mca_btl_openib_post_recv(void);
((unsigned char*) frag->segment.seg_addr.pval- \
(unsigned char*) frag->hdr); \
if(ibv_post_recv(qp, \
&frag->wr_desc.rr_desc, \
&frag->wr_desc.rd_desc, \
&bad_wr)) { \
BTL_ERROR(("error posting receive errno says %s\n", strerror(errno))); \
return OMPI_ERROR; \
}\
}\
OPAL_THREAD_ADD32((int32_t*) rr_posted, cnt); \
} while(0); \
}
OPAL_THREAD_ADD32(&(rd_posted), num_post); \
OPAL_THREAD_ADD32(&(rd_credits), num_post); \
} while(0);
#define BTL_OPENIB_INSERT_PENDING(frag, frag_list, tokens, lock) \
{ \
do{ \
do{ \
OPAL_THREAD_LOCK(&lock); \
opal_list_append(&frag_list, (opal_list_item_t *)frag); \
OPAL_THREAD_UNLOCK(&lock); \
OPAL_THREAD_ADD32(&tokens, 1); \
} while(0); \
}
} while(0);
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -26,23 +26,9 @@ static void mca_btl_openib_frag_common_constructor( mca_btl_openib_frag_t* frag)
mca_mpool_openib_registration_t* registration =
(mca_mpool_openib_registration_t*) frag->base.super.user_data;
frag->hdr = (mca_btl_openib_header_t*) (frag+1); /* initialize the btl header to point to start at end of frag */
#if 0
mod = (unsigned long) frag->hdr % MCA_BTL_IB_FRAG_ALIGN;
if(mod != 0) {
frag->hdr = (mca_btl_openib_header_t*) ((unsigned char*) frag->hdr + (MCA_BTL_IB_FRAG_ALIGN - mod));
}
#endif
frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) + sizeof(mca_btl_openib_header_t); /* init the segment address to start after the btl header */
#if 0
mod = (frag->segment.seg_addr.lval) % MCA_BTL_IB_FRAG_ALIGN;
if(mod != 0) {
frag->segment.seg_addr.lval += (MCA_BTL_IB_FRAG_ALIGN - mod);
}
#endif
frag->hdr = (mca_btl_openib_header_t*) (frag+1); /* initialize the btl header to start at end of frag */
frag->segment.seg_addr.pval = ((unsigned char* )frag->hdr) + sizeof(mca_btl_openib_header_t);
/* init the segment address to start after the btl header */
frag->mr = registration->mr;
frag->segment.seg_len = frag->size;
@ -80,10 +66,10 @@ static void mca_btl_openib_recv_frag_common_constructor(mca_btl_openib_frag_t* f
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->wr_desc.rr_desc.wr_id = (unsigned long) frag;
frag->wr_desc.rr_desc.sg_list = &frag->sg_entry;
frag->wr_desc.rr_desc.num_sge = 1;
frag->wr_desc.rr_desc.next = NULL;
frag->wr_desc.rd_desc.wr_id = (unsigned long) frag;
frag->wr_desc.rd_desc.sg_list = &frag->sg_entry;
frag->wr_desc.rd_desc.num_sge = 1;
frag->wr_desc.rd_desc.next = NULL;
}
static void mca_btl_openib_send_frag_eager_constructor(mca_btl_openib_frag_t* frag)

Просмотреть файл

@ -32,7 +32,11 @@ extern "C" {
#endif
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_openib_frag_t);
typedef mca_btl_base_header_t mca_btl_openib_header_t;
/**
 * Per-fragment openib BTL header; the fragment's payload segment starts
 * immediately after it.
 */
typedef struct mca_btl_openib_header_t {
    mca_btl_base_tag_t tag;   /* selects the registered receive callback */
    int16_t credits;          /* credits carried by this fragment; the receiver adds them to its send tokens */
} mca_btl_openib_header_t;
typedef enum {
@ -54,7 +58,7 @@ struct mca_btl_openib_frag_t {
size_t size;
int rc;
union{
struct ibv_recv_wr rr_desc;
struct ibv_recv_wr rd_desc;
struct ibv_send_wr sr_desc;
} wr_desc;
struct ibv_sge sg_entry;
@ -136,6 +140,36 @@ OBJ_CLASS_DECLARATION(mca_btl_openib_recv_frag_max_t);
}
/*
 * Re-issue a fragment that was previously queued as pending.
 *
 * Dispatches on the fragment's send work-request opcode:
 *   IBV_WR_SEND       -> mca_btl_openib_endpoint_send
 *   IBV_WR_RDMA_WRITE -> mca_btl_openib_put
 *   IBV_WR_RDMA_READ  -> mca_btl_openib_get
 * A failure to re-post, or an unknown opcode, is reported with BTL_ERROR.
 *
 * NOTE: the expansion refers to a variable named `openib_btl`, which must
 * be in scope at the call site.  `frag` is evaluated more than once, so
 * pass an expression without side effects.
 */
#define MCA_BTL_IB_FRAG_PROGRESS(frag) \
do { \
    switch((frag)->wr_desc.sr_desc.opcode) { \
    case IBV_WR_SEND: \
        if(OMPI_SUCCESS != mca_btl_openib_endpoint_send((frag)->endpoint, (frag))) { \
            BTL_ERROR(("error in posting pending send\n")); \
        } \
        break; \
    case IBV_WR_RDMA_WRITE: \
        if(OMPI_SUCCESS != mca_btl_openib_put((mca_btl_base_module_t*) openib_btl, \
                                              (frag)->endpoint, \
                                              (mca_btl_base_descriptor_t*) (frag))) { \
            BTL_ERROR(("error in posting pending rdma write\n")); \
        } \
        break; \
    case IBV_WR_RDMA_READ: \
        if(OMPI_SUCCESS != mca_btl_openib_get((mca_btl_base_module_t *) openib_btl, \
                                              (frag)->endpoint, \
                                              (mca_btl_base_descriptor_t*) (frag))) { \
            BTL_ERROR(("error in posting pending rdma read\n")); \
        } \
        break; \
    default: \
        BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", (frag)->wr_desc.sr_desc.opcode)); \
        break; \
    } \
} while (0)