diff --git a/ompi/mca/btl/mvapi/btl_mvapi.c b/ompi/mca/btl/mvapi/btl_mvapi.c index 3b7ab884b1..0a70767fa8 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi.c +++ b/ompi/mca/btl/mvapi/btl_mvapi.c @@ -102,6 +102,7 @@ int mca_btl_mvapi_add_procs( } ib_peer->endpoint_btl = mvapi_btl; + ib_peer->subnet = mvapi_btl->port_info.subnet; rc = mca_btl_mvapi_proc_insert(ib_proc, ib_peer); if(rc != OMPI_SUCCESS) { OBJ_RELEASE(ib_peer); @@ -772,32 +773,45 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl, mca_btl_base_endpoint_t* endpoint, mca_btl_base_descriptor_t* descriptor) { + int rc; mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl; mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor; - frag->endpoint = endpoint; frag->sr_desc.opcode = VAPI_RDMA_WRITE; - - frag->sr_desc.remote_qp = endpoint->rem_qp_num_low; - frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; - frag->sr_desc.r_key = frag->base.des_dst->seg_key.key32[0]; - frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; - frag->sg_entry.len = frag->base.des_src->seg_len; - - frag->ret = VAPI_post_sr(mvapi_btl->nic, - endpoint->lcl_qp_hndl_low, - &frag->sr_desc); - if(VAPI_OK != frag->ret){ - return OMPI_ERROR; - } - if(mca_btl_mvapi_component.use_srq) { - MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1); - MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 1); + OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + /* atomically test and acquire a token */ + if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) { + BTL_VERBOSE(("Queing because no rdma write tokens \n")); + BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc); } else { - MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1); - MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1); + + frag->endpoint = endpoint; + + + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; + frag->sr_desc.remote_addr = (VAPI_virt_addr_t) 
(MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; + frag->sr_desc.r_key = frag->base.des_dst->seg_key.key32[0]; + frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; + frag->sg_entry.len = frag->base.des_src->seg_len; + + frag->ret = VAPI_post_sr(mvapi_btl->nic, + endpoint->lcl_qp_hndl_low, + &frag->sr_desc); + if(VAPI_OK != frag->ret){ + rc = OMPI_ERROR; + } else { + rc = OMPI_SUCCESS; + } + if(mca_btl_mvapi_component.use_srq) { + MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1); + MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 1); + } else { + MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1); + MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1); + } } - return OMPI_SUCCESS; - + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + return rc; + } /* @@ -808,33 +822,45 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl, mca_btl_base_endpoint_t* endpoint, mca_btl_base_descriptor_t* descriptor) { + int rc; mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl; mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor; - frag->endpoint = endpoint; - frag->sr_desc.opcode = VAPI_RDMA_READ; - frag->sr_desc.remote_qp = endpoint->rem_qp_num_low; - frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; - frag->sr_desc.r_key = frag->base.des_src->seg_key.key32[0]; - frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; - frag->sg_entry.len = frag->base.des_dst->seg_len; - - frag->ret = VAPI_post_sr(mvapi_btl->nic, - endpoint->lcl_qp_hndl_low, - &frag->sr_desc); - if(VAPI_OK != frag->ret){ - return OMPI_ERROR; - } - if(mca_btl_mvapi_component.use_srq) { - MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1); - MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 1); + frag->sr_desc.opcode = VAPI_RDMA_READ; + OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + /* atomically test and acquire a token */ + if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) { + 
BTL_VERBOSE(("Queing because no rdma write tokens \n")); + BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc); } else { - MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1); - MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1); - } - return OMPI_SUCCESS; + frag->endpoint = endpoint; + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; + frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval; + frag->sr_desc.r_key = frag->base.des_src->seg_key.key32[0]; + frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval; + frag->sg_entry.len = frag->base.des_dst->seg_len; + frag->ret = VAPI_post_sr(mvapi_btl->nic, + endpoint->lcl_qp_hndl_low, + &frag->sr_desc); + if(VAPI_OK != frag->ret){ + rc = OMPI_ERROR; + } else { + rc = OMPI_SUCCESS; + } + if(mca_btl_mvapi_component.use_srq) { + MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1); + MCA_BTL_MVAPI_POST_SRR_LOW(mvapi_btl, 1); + } else { + MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1); + MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1); + } + } + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + return rc; + } + /* diff --git a/ompi/mca/btl/mvapi/btl_mvapi.h b/ompi/mca/btl/mvapi/btl_mvapi.h index 6bf1b844d7..6a1e41f5ed 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi.h +++ b/ompi/mca/btl/mvapi/btl_mvapi.h @@ -135,7 +135,7 @@ struct mca_btl_mvapi_module_t { mca_btl_base_module_t super; /**< base PTL interface */ bool btl_inited; mca_btl_mvapi_recv_reg_t ib_reg[256]; - mca_btl_mvapi_addr_t mvapi_addr; /* contains only the subnet right now */ + mca_btl_mvapi_port_info_t port_info; /* contains only the subnet right now */ VAPI_hca_id_t hca_id; /**< ID of HCA */ IB_port_t port_id; /**< ID of the PORT */ VAPI_hca_port_t port; /**< IB port of this PTL */ diff --git a/ompi/mca/btl/mvapi/btl_mvapi_component.c b/ompi/mca/btl/mvapi/btl_mvapi_component.c index d598126e2e..09a3d5db29 100644 --- 
a/ompi/mca/btl/mvapi/btl_mvapi_component.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_component.c @@ -233,7 +233,7 @@ int mca_btl_mvapi_component_close(void) /* - * Register GM component addressing information. The MCA framework + * Register MVAPI port information. The MCA framework * will make this available to all peers. */ @@ -243,20 +243,20 @@ mca_btl_mvapi_modex_send(void) int rc; size_t i; size_t size; - mca_btl_mvapi_addr_t *addrs; + mca_btl_mvapi_port_info_t *ports; - size = mca_btl_mvapi_component.ib_num_btls * sizeof (mca_btl_mvapi_addr_t); - addrs = (mca_btl_mvapi_addr_t *)malloc (size); - if (NULL == addrs) { + size = mca_btl_mvapi_component.ib_num_btls * sizeof (mca_btl_mvapi_port_info_t); + ports = (mca_btl_mvapi_port_info_t *)malloc (size); + if (NULL == ports) { return OMPI_ERR_OUT_OF_RESOURCE; } for (i = 0; i < mca_btl_mvapi_component.ib_num_btls; i++) { mca_btl_mvapi_module_t *btl = &mca_btl_mvapi_component.mvapi_btls[i]; - addrs[i] = btl->mvapi_addr; + ports[i] = btl->port_info; } - rc = mca_pml_base_modex_send (&mca_btl_mvapi_component.super.btl_version, addrs, size); - free (addrs); + rc = mca_pml_base_modex_send (&mca_btl_mvapi_component.super.btl_version, ports, size); + free (ports); return rc; } @@ -353,7 +353,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, mvapi_btl->nic = hca_hndl; mvapi_btl->port_id = (IB_port_t) j; mvapi_btl->port = hca_port; - mvapi_btl->mvapi_addr.subnet = hca_port.sm_lid; + mvapi_btl->port_info.subnet = hca_port.sm_lid; opal_list_append(&btl_list, (opal_list_item_t*) ib_selected); mca_btl_mvapi_component.ib_num_btls ++; @@ -507,6 +507,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules, /* Post OOB receive to support dynamic connection setup */ mca_btl_mvapi_post_recv(); + mca_btl_mvapi_modex_send(); *num_btl_modules = mca_btl_mvapi_component.ib_num_btls; free(hca_ids); @@ -554,13 +555,13 @@ int mca_btl_mvapi_component_progress() case VAPI_CQE_SQ_RDMA_READ: case 
VAPI_CQE_SQ_RDMA_WRITE: frag = (mca_btl_mvapi_frag_t*) comp.id; - OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1); /* Process a completed send or an rdma write */ frag->rc = OMPI_SUCCESS; frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc); count++; /* check and see if we need to progress pending sends */ - if(frag->endpoint->wr_sq_tokens_hp && !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) { + if(OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1) > 0 + && !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) { opal_list_item_t *frag_item; frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_hp)); frag = (mca_btl_mvapi_frag_t *) frag_item; @@ -624,18 +625,37 @@ int mca_btl_mvapi_component_progress() /* Process a completed send */ frag = (mca_btl_mvapi_frag_t*) comp.id; - OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1); frag->rc = OMPI_SUCCESS; frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc); count++; /* check and see if we need to progress pending sends */ - if(frag->endpoint->wr_sq_tokens_lp && !opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) { + if(OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1) > 0 + && !opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) { opal_list_item_t *frag_item; frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp)); frag = (mca_btl_mvapi_frag_t *) frag_item; - - if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { - BTL_ERROR(("error in posting pending send\n")); + switch(frag->sr_desc.opcode){ + case VAPI_SEND: + if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) { + BTL_ERROR(("error in posting pending send\n")); + } + break; + case VAPI_RDMA_WRITE: + if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t*) mvapi_btl, + frag->endpoint, + (mca_btl_base_descriptor_t*) frag)) { + BTL_ERROR(("error in posting pending rdma write\n")); + } + break; + case 
VAPI_RDMA_READ: /* a queued read must be re-posted through get(): put() would overwrite the opcode with VAPI_RDMA_WRITE */ + if(OMPI_SUCCESS != mca_btl_mvapi_get((mca_btl_base_module_t *) mvapi_btl, + frag->endpoint, + (mca_btl_base_descriptor_t*) frag)) { + BTL_ERROR(("error in posting pending rdma read\n")); + } + break; + default: + BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->sr_desc.opcode)); } } diff --git a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c index 493eba6347..03e8fe325c 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.c @@ -58,21 +58,21 @@ int mca_btl_mvapi_endpoint_qp_init_query( static inline int mca_btl_mvapi_endpoint_post_send(mca_btl_mvapi_module_t* mvapi_btl, mca_btl_mvapi_endpoint_t * endpoint, mca_btl_mvapi_frag_t * frag) { - + int rc; + VAPI_qp_hndl_t qp_hndl; frag->sr_desc.remote_qkey = 0; frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr; + frag->sr_desc.opcode = VAPI_SEND; - VAPI_qp_hndl_t qp_hndl; if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY && frag->size <= mvapi_btl->super.btl_eager_limit){ /* atomically test and acquire a token */ if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,-1) < 0) { BTL_VERBOSE(("Queing because no send tokens \n")); - opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag); - OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp, 1); + BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_hp, endpoint->wr_sq_tokens_hp, rc); return OMPI_SUCCESS; } else { - frag->sr_desc.remote_qp = endpoint->rem_qp_num_high; + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_high; qp_hndl = endpoint->lcl_qp_hndl_high; } @@ -81,15 +81,14 @@ static inline int mca_btl_mvapi_endpoint_post_send(mca_btl_mvapi_module_t* mvapi /* atomically test and acquire a token */ if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) { BTL_VERBOSE(("Queing because no send tokens \n")); - opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag); -
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,1); + BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc); /* low-priority path: queue on the lp list and give back the lp token taken above */ return OMPI_SUCCESS; } else { - frag->sr_desc.remote_qp = endpoint->rem_qp_num_low; + frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low; qp_hndl = endpoint->lcl_qp_hndl_low; } } - frag->sr_desc.opcode = VAPI_SEND; + frag->sg_entry.len = frag->segment.seg_len + ((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr); @@ -149,7 +148,10 @@ static void mca_btl_mvapi_endpoint_construct(mca_btl_base_endpoint_t* endpoint) /* initialize the high and low priority tokens */ endpoint->wr_sq_tokens_hp = mca_btl_mvapi_component.max_wr_sq_tokens; endpoint->wr_sq_tokens_lp = mca_btl_mvapi_component.max_wr_sq_tokens; - + endpoint->rem_info.rem_qp_num_high = 0; + endpoint->rem_info.rem_qp_num_low = 0; + endpoint->rem_info.rem_lid = 0; + endpoint->rem_info.rem_subnet = 0; } /* @@ -177,7 +179,7 @@ static void mca_btl_mvapi_endpoint_send_cb( } -static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endpoint) +static int mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint) { orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t); int rc; @@ -204,6 +206,13 @@ static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endp ORTE_ERROR_LOG(rc); return rc; } + + rc = orte_dps.pack(buffer, &((mca_btl_mvapi_endpoint_t*)endpoint)->subnet, 1, ORTE_UINT32); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* send to endpoint */ rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0, @@ -227,34 +236,34 @@ static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endp * */ -static int mca_btl_mvapi_endpoint_send_connect_ack(mca_btl_base_endpoint_t* endpoint) -{ - orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t); - int rc; - uint32_t zero = 0; +/* static int
mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint) */ +/* { */ +/* orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t); */ +/* int rc; */ +/* uint32_t zero = 0; */ - /* pack the info in the send buffer */ - if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } - if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } - if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } +/* /\* pack the info in the send buffer *\/ */ +/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */ +/* ORTE_ERROR_LOG(rc); */ +/* return rc; */ +/* } */ +/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */ +/* ORTE_ERROR_LOG(rc); */ +/* return rc; */ +/* } */ +/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */ +/* ORTE_ERROR_LOG(rc); */ +/* return rc; */ +/* } */ - /* send to endpoint */ - rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0, - mca_btl_mvapi_endpoint_send_cb, NULL); - if(rc < 0) { - ORTE_ERROR_LOG(rc); - } - return rc; -} +/* /\* send to endpoint *\/ */ +/* rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0, */ +/* mca_btl_mvapi_endpoint_send_cb, NULL); */ +/* if(rc < 0) { */ +/* ORTE_ERROR_LOG(rc); */ +/* } */ +/* return rc; */ +/* } */ /* * Set remote connection info @@ -264,32 +273,15 @@ static int mca_btl_mvapi_endpoint_send_connect_ack(mca_btl_base_endpoint_t* endp * setup. 
* */ -static int mca_btl_mvapi_endpoint_set_remote_info(mca_btl_base_endpoint_t* endpoint, orte_buffer_t* buffer) +static int mca_btl_mvapi_endpoint_set_remote_info(mca_btl_base_endpoint_t* endpoint, mca_btl_mvapi_rem_info_t* rem_info) { - int rc; - - - - size_t cnt = 1; - rc = orte_dps.unpack(buffer, &endpoint->rem_qp_num_high, &cnt, ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - rc = orte_dps.unpack(buffer, &endpoint->rem_qp_num_low, &cnt, ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - rc = orte_dps.unpack(buffer, &endpoint->rem_lid, &cnt, ORTE_UINT32); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - return rc; - } - BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d", - endpoint->rem_qp_num_high, - endpoint->rem_qp_num_low, - endpoint->rem_lid)); + + memcpy(&((mca_btl_mvapi_endpoint_t*) endpoint)->rem_info, rem_info, sizeof(mca_btl_mvapi_rem_info_t)); + + BTL_VERBOSE(("Setting High Priority QP num = %d, Low Priority QP num %d, LID = %d", + endpoint->rem_info.rem_qp_num_high, + endpoint->rem_info.rem_qp_num_low, + endpoint->rem_info.rem_lid)); return ORTE_SUCCESS; } @@ -345,7 +337,7 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin /* Send connection info over to remote endpoint */ endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; - if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_req(endpoint))) { + if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_data(endpoint))) { BTL_ERROR(("error sending connect request, error code %d", rc)); return rc; } @@ -356,7 +348,8 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin * Reply to a `start - connect' message * */ -static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *endpoint, orte_buffer_t* buffer) +static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *endpoint, + 
mca_btl_mvapi_rem_info_t* rem_info) { int rc; @@ -397,8 +390,8 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t * /* Set the remote side info */ - mca_btl_mvapi_endpoint_set_remote_info(endpoint, buffer); - + mca_btl_mvapi_endpoint_set_remote_info(endpoint, rem_info); + /* Connect to endpoint */ rc = mca_btl_mvapi_endpoint_connect(endpoint); @@ -408,7 +401,7 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t * } /* Send connection info over to remote endpoint */ - if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_req(endpoint))) { + if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_data(endpoint))) { BTL_ERROR(("error in endpoint send connect request error code is %d", rc)); return rc; } @@ -447,8 +440,43 @@ static void mca_btl_mvapi_endpoint_recv( mca_btl_mvapi_proc_t *ib_proc; mca_btl_mvapi_endpoint_t *ib_endpoint; int endpoint_state; - int rc; + int rc; + uint32_t i; + size_t cnt = 1; + mca_btl_mvapi_rem_info_t rem_info; + + /* start by unpacking data first so we know who is knocking at + our door */ + rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_high, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_low, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dps.unpack(buffer, &rem_info.rem_lid, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + rc = orte_dps.unpack(buffer, &rem_info.rem_subnet, &cnt, ORTE_UINT32); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + + BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d", + rem_info.rem_qp_num_high, + rem_info.rem_qp_num_low, + rem_info.rem_lid)); + + + + for(ib_proc = (mca_btl_mvapi_proc_t*) opal_list_get_first(&mca_btl_mvapi_component.ib_procs); ib_proc != (mca_btl_mvapi_proc_t*) @@ -456,16 +484,57 @@ static void 
mca_btl_mvapi_endpoint_recv( ib_proc = (mca_btl_mvapi_proc_t*)opal_list_get_next(ib_proc)) { if(ib_proc->proc_guid.vpid == endpoint->vpid) { + bool found = false; + + /* ib_endpoint = ib_proc->proc_endpoints[0] */ /* Try to get the endpoint instance of this proc */ - - /* Limitation: Right now, we have only 1 endpoint - * for every process. Need several changes, some - * in PML/BTL interface to set this right */ - ib_endpoint = ib_proc->proc_endpoints[0]; - + /* first match the endpoint based on lid meaning we've seen */ + /* this endpoint before.. */ + for(i = 0; i < ib_proc->proc_endpoint_count; i++) { + mca_btl_mvapi_port_info_t port_info; + port_info = ib_proc->proc_ports[i]; + ib_endpoint = ib_proc->proc_endpoints[i]; + if(ib_endpoint->rem_info.rem_lid && + ib_endpoint->rem_info.rem_lid == rem_info.rem_lid) { + /* we've seen them before! */ + found = true; + break; + } + } + /* If we haven't seen this remote lid before then try to match on + endpoint */ + for(i = 0; !found && i < ib_proc->proc_endpoint_count; i++) { + mca_btl_mvapi_port_info_t port_info; + port_info = ib_proc->proc_ports[i]; + ib_endpoint = ib_proc->proc_endpoints[i]; + if(!ib_endpoint->rem_info.rem_lid && + ib_endpoint->rem_info.rem_subnet == rem_info.rem_subnet) { + /* found a match based on subnet! 
*/ + found = true; + break; + } + } + /* try finding an open port, even if subnets + don't match + */ + for(i = 0; !found && i < ib_proc->proc_endpoint_count; i++) { + mca_btl_mvapi_port_info_t port_info; + port_info = ib_proc->proc_ports[i]; + ib_endpoint = ib_proc->proc_endpoints[i]; + if(!ib_endpoint->rem_info.rem_lid) { + /* found an unused end-point */ + found = true; + break; + } + } + + if(!found) { + BTL_ERROR(("can't find suitable endpoint for this peer\n")); + return; + } endpoint_state = ib_endpoint->endpoint_state; - + /* Update status */ switch(endpoint_state) { case MCA_BTL_IB_CLOSED : @@ -474,18 +543,18 @@ static void mca_btl_mvapi_endpoint_recv( * status of this connection to CONNECTING, * and then reply with our QP information */ - if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_reply_start_connect(ib_endpoint, buffer))) { + if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_reply_start_connect(ib_endpoint, &rem_info))) { BTL_ERROR(("error in endpoint reply start connect")); break; } - + /* Setup state as connected */ ib_endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK; break; case MCA_BTL_IB_CONNECTING : - mca_btl_mvapi_endpoint_set_remote_info(ib_endpoint, buffer); + mca_btl_mvapi_endpoint_set_remote_info(ib_endpoint, &rem_info); if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_connect(ib_endpoint))) { BTL_ERROR(("endpoint connect error: %d", rc)); break; @@ -495,7 +564,7 @@ static void mca_btl_mvapi_endpoint_recv( mca_btl_mvapi_endpoint_waiting_ack(ib_endpoint); /* Send him an ack */ - mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint); + mca_btl_mvapi_endpoint_send_connect_data(ib_endpoint); break; case MCA_BTL_IB_WAITING_ACK: @@ -505,7 +574,7 @@ static void mca_btl_mvapi_endpoint_recv( case MCA_BTL_IB_CONNECT_ACK: mca_btl_mvapi_endpoint_connected(ib_endpoint); - mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint); + mca_btl_mvapi_endpoint_send_connect_data(ib_endpoint); break; case MCA_BTL_IB_CONNECTED : @@ -649,18 +718,18 @@ int 
mca_btl_mvapi_endpoint_connect( int rc; /* Connection establishment RC */ rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl, - endpoint->endpoint_btl->nic, - endpoint->lcl_qp_hndl_high, - endpoint->rem_qp_num_high, - endpoint->rem_lid, - endpoint->endpoint_btl->port_id); + endpoint->endpoint_btl->nic, + endpoint->lcl_qp_hndl_high, + endpoint->rem_info.rem_qp_num_high, + endpoint->rem_info.rem_lid, + endpoint->endpoint_btl->port_id); rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl, - endpoint->endpoint_btl->nic, - endpoint->lcl_qp_hndl_low, - endpoint->rem_qp_num_low, - endpoint->rem_lid, - endpoint->endpoint_btl->port_id); + endpoint->endpoint_btl->nic, + endpoint->lcl_qp_hndl_low, + endpoint->rem_info.rem_qp_num_low, + endpoint->rem_info.rem_lid, + endpoint->endpoint_btl->port_id); @@ -699,6 +768,7 @@ int mca_btl_mvapi_endpoint_create_qp( VAPI_ret_t ret; VAPI_qp_init_attr_t qp_init_attr; VAPI_qp_init_attr_ext_t qp_init_attr_ext; + switch(transport_type) { case VAPI_TS_RC: /* Set up RC qp parameters */ diff --git a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h index 392438de08..ba481c764f 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h +++ b/ompi/mca/btl/mvapi/btl_mvapi_endpoint.h @@ -38,10 +38,10 @@ OBJ_CLASS_DECLARATION(mca_btl_mvapi_endpoint_t); struct mca_btl_mvapi_frag_t; -struct mca_btl_mvapi_addr_t { +struct mca_btl_mvapi_port_info_t { uint32_t subnet; }; -typedef struct mca_btl_mvapi_addr_t mca_btl_mvapi_addr_t; +typedef struct mca_btl_mvapi_port_info_t mca_btl_mvapi_port_info_t; /** @@ -72,6 +72,24 @@ typedef enum { MCA_BTL_IB_FAILED } mca_btl_mvapi_endpoint_state_t; + +struct mca_btl_mvapi_rem_info_t { + + VAPI_qp_num_t rem_qp_num_high; + /* High priority remote side QP number */ + + VAPI_qp_num_t rem_qp_num_low; + /* Low prioirty remote size QP number */ + + IB_lid_t rem_lid; + /* Local identifier of the remote process */ + + uint32_t rem_subnet; + /* subnet of remote process */ +} 
; +typedef struct mca_btl_mvapi_rem_info_t mca_btl_mvapi_rem_info_t; + + /** * An abstraction that represents a connection to a endpoint process. * An instance of mca_btl_base_endpoint_t is associated w/ each process @@ -119,14 +137,7 @@ struct mca_btl_base_endpoint_t { uint32_t wr_sq_tokens_lp; /**< number of low priority frags that can be outstanding (down counter) */ - VAPI_qp_num_t rem_qp_num_high; - /* High priority remote side QP number */ - - VAPI_qp_num_t rem_qp_num_low; - /* Low prioirty remote size QP number */ - - IB_lid_t rem_lid; - /* Local identifier of the remote process */ + mca_btl_mvapi_rem_info_t rem_info; VAPI_qp_hndl_t lcl_qp_hndl_high; /* High priority local QP handle */ @@ -144,13 +155,12 @@ struct mca_btl_base_endpoint_t { uint32_t rr_posted_high; /**< number of high priority rr posted to the nic*/ uint32_t rr_posted_low; /**< number of low priority rr posted to the nic*/ - mca_btl_mvapi_addr_t endpoint_addr; + uint32_t subnet; }; typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t; typedef mca_btl_base_endpoint_t mca_btl_mvapi_endpoint_t; - int mca_btl_mvapi_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_mvapi_frag_t* frag); int mca_btl_mvapi_endpoint_connect(mca_btl_base_endpoint_t*); void mca_btl_mvapi_post_recv(void); @@ -229,84 +239,14 @@ void mca_btl_mvapi_progress_send_frags(mca_btl_mvapi_endpoint_t*); }\ } -/* static inline int mca_btl_mvapi_endpoint_post_rr_sub(int cnt, */ -/* mca_btl_mvapi_endpoint_t* endpoint, */ -/* ompi_free_list_t* frag_list, */ -/* uint32_t* rr_posted, */ -/* VAPI_hca_hndl_t nic, */ -/* VAPI_qp_hndl_t qp_hndl */ -/* ) */ -/* { */ - -/* int rc, i; */ -/* opal_list_item_t* item; */ -/* mca_btl_mvapi_frag_t* frag; */ - -/* mca_btl_mvapi_module_t *mvapi_btl = endpoint->endpoint_btl; */ -/* VAPI_rr_desc_t* rr_desc_post = mvapi_btl->rr_desc_post; */ - -/* /\* prepare frags and post receive requests *\/ */ -/* for(i = 0; i < cnt; i++) { */ -/* OMPI_FREE_LIST_WAIT(frag_list, item, rc); */ 
-/* frag = (mca_btl_mvapi_frag_t*) item; */ -/* frag->endpoint = endpoint; */ -/* frag->sg_entry.len = frag->size + ((unsigned char*) frag->segment.seg_addr.pval- (unsigned char*) frag->hdr); /\* sizeof(mca_btl_mvapi_header_t); *\/ */ -/* rr_desc_post[i] = frag->rr_desc; */ - -/* } */ - -/* frag->ret = EVAPI_post_rr_list(nic, */ -/* qp_hndl, */ -/* cnt, */ -/* rr_desc_post); */ -/* if(VAPI_OK != frag->ret) { */ -/* BTL_ERROR(("error posting receive descriptors: %s", VAPI_strerror(frag->ret))); */ -/* return OMPI_ERROR; */ -/* } */ -/* OPAL_THREAD_ADD32(rr_posted, cnt); */ -/* return OMPI_SUCCESS; */ -/* } */ - -/* static inline int mca_btl_mvapi_endpoint_post_rr( mca_btl_mvapi_endpoint_t * endpoint, int additional){ */ -/* mca_btl_mvapi_module_t * mvapi_btl = endpoint->endpoint_btl; */ -/* int rc; */ -/* OPAL_THREAD_LOCK(&endpoint->ib_lock); */ - -/* if(endpoint->rr_posted_high <= mca_btl_mvapi_component.ib_rr_buf_min+additional && endpoint->rr_posted_high < mca_btl_mvapi_component.ib_rr_buf_max){ */ - -/* rc = mca_btl_mvapi_endpoint_post_rr_sub(mca_btl_mvapi_component.ib_rr_buf_max - endpoint->rr_posted_high, */ -/* endpoint, */ -/* &mvapi_btl->recv_free_eager, */ -/* &endpoint->rr_posted_high, */ -/* mvapi_btl->nic, */ -/* endpoint->lcl_qp_hndl_high */ -/* ); */ -/* if(rc != OMPI_SUCCESS){ */ -/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */ -/* return rc; */ -/* } */ -/* } */ -/* if(endpoint->rr_posted_low <= mca_btl_mvapi_component.ib_rr_buf_min+additional && endpoint->rr_posted_low < mca_btl_mvapi_component.ib_rr_buf_max){ */ - -/* rc = mca_btl_mvapi_endpoint_post_rr_sub(mca_btl_mvapi_component.ib_rr_buf_max - endpoint->rr_posted_low, */ -/* endpoint, */ -/* &mvapi_btl->recv_free_max, */ -/* &endpoint->rr_posted_low, */ -/* mvapi_btl->nic, */ -/* endpoint->lcl_qp_hndl_low */ -/* ); */ -/* if(rc != OMPI_SUCCESS) { */ -/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */ -/* return rc; */ -/* } */ - -/* } */ -/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */ -/* return 
OMPI_SUCCESS; */ - - -/* } */ - +#define BTL_MVAPI_INSERT_PENDING(frag, frag_list, tokens, rc) \ +{ \ + do{ \ + opal_list_append(&frag_list, (opal_list_item_t *)frag); \ + OPAL_THREAD_ADD32(&tokens, 1); \ + rc = OMPI_SUCCESS; \ + } while(0); \ +} #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/btl/mvapi/btl_mvapi_proc.c b/ompi/mca/btl/mvapi/btl_mvapi_proc.c index 7d275be04e..3337e8bdcd 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_proc.c +++ b/ompi/mca/btl/mvapi/btl_mvapi_proc.c @@ -32,7 +32,7 @@ OBJ_CLASS_INSTANCE(mca_btl_mvapi_proc_t, void mca_btl_mvapi_proc_construct(mca_btl_mvapi_proc_t* proc) { proc->proc_ompi = 0; - proc->proc_addr_count = 0; + proc->proc_port_count = 0; proc->proc_endpoints = 0; proc->proc_endpoint_count = 0; OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t); @@ -131,7 +131,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc) rc = mca_pml_base_modex_recv( &mca_btl_mvapi_component.super.btl_version, ompi_proc, - (void*)&mvapi_proc->proc_addrs, + (void*)&mvapi_proc->proc_ports, &size ); @@ -144,7 +144,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc) return NULL; } - if((size % sizeof(mca_btl_mvapi_addr_t)) != 0) { + if((size % sizeof(mca_btl_mvapi_port_info_t)) != 0) { opal_output(0, "[%s:%d] invalid mvapi address for peer [%d,%d,%d]", __FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name)); OBJ_RELEASE(mvapi_proc); @@ -152,16 +152,11 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc) } - mvapi_proc->proc_addr_count = size/sizeof(mca_btl_mvapi_addr_t); + mvapi_proc->proc_port_count = size/sizeof(mca_btl_mvapi_port_info_t); - /* XXX: Right now, there can be only 1 peer associated - * with a proc. 
Needs a little bit change in - * mca_btl_mvapi_proc_t to allow on demand increasing of - * number of endpoints for this proc */ - mvapi_proc->proc_endpoints = (mca_btl_base_endpoint_t**) - malloc(mvapi_proc->proc_addr_count * sizeof(mca_btl_base_endpoint_t*)); + malloc(mvapi_proc->proc_port_count * sizeof(mca_btl_base_endpoint_t*)); if(NULL == mvapi_proc->proc_endpoints) { OBJ_RELEASE(mvapi_proc); @@ -179,14 +174,12 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc) int mca_btl_mvapi_proc_insert(mca_btl_mvapi_proc_t* mvapi_proc, mca_btl_base_endpoint_t* mvapi_endpoint) { - mca_btl_mvapi_module_t* mvapi_btl = mvapi_endpoint->endpoint_btl; - + /* insert into endpoint array */ - if(mvapi_proc->proc_addr_count <= mvapi_proc->proc_endpoint_count) + if(mvapi_proc->proc_port_count <= mvapi_proc->proc_endpoint_count) return OMPI_ERR_OUT_OF_RESOURCE; mvapi_endpoint->endpoint_proc = mvapi_proc; - mvapi_endpoint->endpoint_addr = mvapi_proc->proc_addrs[mvapi_proc->proc_endpoint_count]; mvapi_proc->proc_endpoints[mvapi_proc->proc_endpoint_count++] = mvapi_endpoint; return OMPI_SUCCESS; diff --git a/ompi/mca/btl/mvapi/btl_mvapi_proc.h b/ompi/mca/btl/mvapi/btl_mvapi_proc.h index 48b3ae2dad..b409561d07 100644 --- a/ompi/mca/btl/mvapi/btl_mvapi_proc.h +++ b/ompi/mca/btl/mvapi/btl_mvapi_proc.h @@ -44,9 +44,9 @@ struct mca_btl_mvapi_proc_t { orte_process_name_t proc_guid; /**< globally unique identifier for the process */ - struct mca_btl_mvapi_addr_t* proc_addrs; - size_t proc_addr_count; - /**< number of addresses published by endpoint */ + struct mca_btl_mvapi_port_info_t* proc_ports; + size_t proc_port_count; + /**< number of ports published by endpoint */ struct mca_btl_base_endpoint_t **proc_endpoints; /**< array of endpoints that have been created to access this proc */