Additional work on multi-rail support. 

This commit was SVN r7139.
This commit is contained in:
Galen Shipman 2005-09-02 03:04:28 +00:00
parent 25787c4ca7
commit c8a23106c0
7 changed files with 305 additions and 256 deletions
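Most of what follows generalizes one flow-control pattern to the RDMA put/get paths: atomically decrement a per-endpoint send-queue token counter, and if it goes negative, hand the token back and queue the fragment on a pending list; otherwise post the descriptor. A minimal standalone sketch of that acquire step, with hypothetical names standing in for OPAL_THREAD_ADD32, the endpoint's pending list, and the VAPI post call:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_int sq_tokens = 2;  /* available send-queue slots (down counter) */

static bool queue_pending(int frag) { printf("queued frag %d\n", frag); return true; }
static bool post_to_hca(int frag)   { printf("posted frag %d\n", frag); return true; }

/* Atomically test-and-acquire a token; fetch_sub returns the old value, so
 * (old - 1) mirrors OPAL_THREAD_ADD32's return-the-new-value semantics. */
static bool send_with_token(int frag)
{
    if (atomic_fetch_sub(&sq_tokens, 1) - 1 < 0) {
        atomic_fetch_add(&sq_tokens, 1);   /* give the token back */
        return queue_pending(frag);        /* reposted on a later completion */
    }
    return post_to_hca(frag);
}

int main(void)
{
    for (int frag = 0; frag < 4; frag++)
        send_with_token(frag);             /* frags 2 and 3 are queued */
    return 0;
}

On a completion the token is returned with a matching atomic increment and any queued fragment is reposted; that half of the pattern appears in mca_btl_mvapi_component_progress below.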

ompi/mca/btl/mvapi/btl_mvapi.c

@@ -102,6 +102,7 @@ int mca_btl_mvapi_add_procs(
}
ib_peer->endpoint_btl = mvapi_btl;
ib_peer->subnet = mvapi_btl->port_info.subnet;
rc = mca_btl_mvapi_proc_insert(ib_proc, ib_peer);
if(rc != OMPI_SUCCESS) {
OBJ_RELEASE(ib_peer);
@@ -772,12 +773,21 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor)
{
int rc;
mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl;
mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor;
frag->endpoint = endpoint;
frag->sr_desc.opcode = VAPI_RDMA_WRITE;
OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock);
/* atomically test and acquire a token */
if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) {
BTL_VERBOSE(("Queing because no rdma write tokens \n"));
BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc);
} else {
frag->sr_desc.remote_qp = endpoint->rem_qp_num_low;
frag->endpoint = endpoint;
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low;
frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval;
frag->sr_desc.r_key = frag->base.des_dst->seg_key.key32[0];
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval;
@@ -787,7 +797,9 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl,
endpoint->lcl_qp_hndl_low,
&frag->sr_desc);
if(VAPI_OK != frag->ret){
return OMPI_ERROR;
rc = OMPI_ERROR;
} else {
rc = OMPI_SUCCESS;
}
if(mca_btl_mvapi_component.use_srq) {
MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1);
@@ -796,7 +808,9 @@ int mca_btl_mvapi_put( mca_btl_base_module_t* btl,
MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1);
}
return OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock);
return rc;
}
@@ -808,12 +822,19 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor)
{
int rc;
mca_btl_mvapi_module_t* mvapi_btl = (mca_btl_mvapi_module_t*) btl;
mca_btl_mvapi_frag_t* frag = (mca_btl_mvapi_frag_t*) descriptor;
frag->endpoint = endpoint;
frag->sr_desc.opcode = VAPI_RDMA_READ;
frag->sr_desc.remote_qp = endpoint->rem_qp_num_low;
frag->sr_desc.opcode = VAPI_RDMA_READ;
OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock);
/* atomically test and acquire a token */
if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) {
BTL_VERBOSE(("Queing because no rdma write tokens \n"));
BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc);
} else {
frag->endpoint = endpoint;
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low;
frag->sr_desc.remote_addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_src->seg_addr.pval;
frag->sr_desc.r_key = frag->base.des_src->seg_key.key32[0];
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->base.des_dst->seg_addr.pval;
@@ -823,7 +844,9 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl,
endpoint->lcl_qp_hndl_low,
&frag->sr_desc);
if(VAPI_OK != frag->ret){
return OMPI_ERROR;
rc = OMPI_ERROR;
} else {
rc = OMPI_SUCCESS;
}
if(mca_btl_mvapi_component.use_srq) {
MCA_BTL_MVAPI_POST_SRR_HIGH(mvapi_btl, 1);
@@ -832,11 +855,14 @@ int mca_btl_mvapi_get( mca_btl_base_module_t* btl,
MCA_BTL_MVAPI_ENDPOINT_POST_RR_HIGH(endpoint, 1);
MCA_BTL_MVAPI_ENDPOINT_POST_RR_LOW(endpoint, 1);
}
return OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock);
return rc;
}
/*
* Asynchronous event handler to detect unforeseen
* events. Usually, such events are catastrophic.

ompi/mca/btl/mvapi/btl_mvapi.h

@@ -135,7 +135,7 @@ struct mca_btl_mvapi_module_t {
mca_btl_base_module_t super; /**< base PTL interface */
bool btl_inited;
mca_btl_mvapi_recv_reg_t ib_reg[256];
mca_btl_mvapi_addr_t mvapi_addr; /* contains only the subnet right now */
mca_btl_mvapi_port_info_t port_info; /* contains only the subnet right now */
VAPI_hca_id_t hca_id; /**< ID of HCA */
IB_port_t port_id; /**< ID of the PORT */
VAPI_hca_port_t port; /**< IB port of this PTL */

ompi/mca/btl/mvapi/btl_mvapi_component.c

@@ -233,7 +233,7 @@ int mca_btl_mvapi_component_close(void)
/*
* Register GM component addressing information. The MCA framework
* Register MVAPI port information. The MCA framework
* will make this available to all peers.
*/
@@ -243,20 +243,20 @@ mca_btl_mvapi_modex_send(void)
int rc;
size_t i;
size_t size;
mca_btl_mvapi_addr_t *addrs;
mca_btl_mvapi_port_info_t *ports;
size = mca_btl_mvapi_component.ib_num_btls * sizeof (mca_btl_mvapi_addr_t);
addrs = (mca_btl_mvapi_addr_t *)malloc (size);
if (NULL == addrs) {
size = mca_btl_mvapi_component.ib_num_btls * sizeof (mca_btl_mvapi_port_info_t);
ports = (mca_btl_mvapi_port_info_t *)malloc (size);
if (NULL == ports) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < mca_btl_mvapi_component.ib_num_btls; i++) {
mca_btl_mvapi_module_t *btl = &mca_btl_mvapi_component.mvapi_btls[i];
addrs[i] = btl->mvapi_addr;
ports[i] = btl->port_info;
}
rc = mca_pml_base_modex_send (&mca_btl_mvapi_component.super.btl_version, addrs, size);
free (addrs);
rc = mca_pml_base_modex_send (&mca_btl_mvapi_component.super.btl_version, ports, size);
free (ports);
return rc;
}
@@ -353,7 +353,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
mvapi_btl->nic = hca_hndl;
mvapi_btl->port_id = (IB_port_t) j;
mvapi_btl->port = hca_port;
mvapi_btl->mvapi_addr.subnet = hca_port.sm_lid;
mvapi_btl->port_info.subnet = hca_port.sm_lid;
opal_list_append(&btl_list, (opal_list_item_t*) ib_selected);
mca_btl_mvapi_component.ib_num_btls ++;
@@ -507,6 +507,7 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
/* Post OOB receive to support dynamic connection setup */
mca_btl_mvapi_post_recv();
mca_btl_mvapi_modex_send();
*num_btl_modules = mca_btl_mvapi_component.ib_num_btls;
free(hca_ids);
@@ -554,13 +555,13 @@ int mca_btl_mvapi_component_progress()
case VAPI_CQE_SQ_RDMA_READ:
case VAPI_CQE_SQ_RDMA_WRITE:
frag = (mca_btl_mvapi_frag_t*) comp.id;
OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1);
/* Process a completed send or an rdma write */
frag->rc = OMPI_SUCCESS;
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc);
count++;
/* check and see if we need to progress pending sends */
if(frag->endpoint->wr_sq_tokens_hp && !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) {
if(OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_hp, 1) > 0
&& !opal_list_is_empty(&(frag->endpoint->pending_frags_hp))) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_hp));
frag = (mca_btl_mvapi_frag_t *) frag_item;
@@ -624,19 +625,38 @@ int mca_btl_mvapi_component_progress()
/* Process a completed send */
frag = (mca_btl_mvapi_frag_t*) comp.id;
OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1);
frag->rc = OMPI_SUCCESS;
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc);
count++;
/* check and see if we need to progress pending sends */
if(frag->endpoint->wr_sq_tokens_lp && !opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) {
if(OPAL_THREAD_ADD32(&frag->endpoint->wr_sq_tokens_lp, 1) > 0
&& !opal_list_is_empty(&(frag->endpoint->pending_frags_lp))) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&(frag->endpoint->pending_frags_lp));
frag = (mca_btl_mvapi_frag_t *) frag_item;
switch(frag->sr_desc.opcode){
case VAPI_SEND:
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
break;
case VAPI_RDMA_WRITE:
if(OMPI_SUCCESS != mca_btl_mvapi_put((mca_btl_base_module_t*) mvapi_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma write\n"));
}
break;
case VAPI_RDMA_READ:
if(OMPI_SUCCESS != mca_btl_mvapi_get((mca_btl_base_module_t *) mvapi_btl,
frag->endpoint,
(mca_btl_base_descriptor_t*) frag)) {
BTL_ERROR(("error in posting pending rdma read\n"));
}
break;
default:
BTL_ERROR(("error in posting pending operation, invalide opcode %d\n", frag->sr_desc.opcode));
}
}
break;
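The hunk above is the drain half of the token pattern: each polled send/RDMA completion returns a token, and when the new count is positive a queued fragment is pulled off the pending list and reposted according to its stored opcode. A condensed, self-contained sketch of that dispatch (hypothetical flattened types, not the real OPAL/VAPI API):

#include <stdatomic.h>
#include <stddef.h>

enum op { OP_SEND, OP_RDMA_WRITE, OP_RDMA_READ };
struct frag { enum op opcode; struct frag *next; };
struct endpoint {
    atomic_int   sq_tokens;   /* send-queue slots (down counter) */
    struct frag *pending;     /* queued fragments (simplified list) */
};

static void post_send(struct endpoint *ep, struct frag *f) { (void)ep; (void)f; }
static void post_put (struct endpoint *ep, struct frag *f) { (void)ep; (void)f; }
static void post_get (struct endpoint *ep, struct frag *f) { (void)ep; (void)f; }

/* On a send/RDMA completion: return the token, then repost one queued
 * fragment if the new count allows, dispatching on its stored opcode. */
static void on_completion(struct endpoint *ep)
{
    if (atomic_fetch_add(&ep->sq_tokens, 1) + 1 > 0 && ep->pending != NULL) {
        struct frag *f = ep->pending;
        ep->pending = f->next;
        switch (f->opcode) {
        case OP_SEND:       post_send(ep, f); break;
        case OP_RDMA_WRITE: post_put(ep, f);  break;
        case OP_RDMA_READ:  post_get(ep, f);  break;
        }
    }
}

int main(void)
{
    struct frag f = { OP_RDMA_WRITE, NULL };
    struct endpoint ep = { .pending = &f };   /* tokens at 0: one frag queued */
    on_completion(&ep);                       /* token returned, frag reposted */
    return ep.pending == NULL ? 0 : 1;
}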

ompi/mca/btl/mvapi/btl_mvapi_endpoint.c

@@ -58,21 +58,21 @@ int mca_btl_mvapi_endpoint_qp_init_query(
static inline int mca_btl_mvapi_endpoint_post_send(mca_btl_mvapi_module_t* mvapi_btl, mca_btl_mvapi_endpoint_t * endpoint, mca_btl_mvapi_frag_t * frag)
{
int rc;
VAPI_qp_hndl_t qp_hndl;
frag->sr_desc.remote_qkey = 0;
frag->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) frag->hdr;
frag->sr_desc.opcode = VAPI_SEND;
VAPI_qp_hndl_t qp_hndl;
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY && frag->size <= mvapi_btl->super.btl_eager_limit){
/* atomically test and acquire a token */
if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp,-1) < 0) {
BTL_VERBOSE(("Queing because no send tokens \n"));
opal_list_append(&endpoint->pending_frags_hp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_hp, 1);
BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_hp, endpoint->wr_sq_tokens_hp, rc);
return OMPI_SUCCESS;
} else {
frag->sr_desc.remote_qp = endpoint->rem_qp_num_high;
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_high;
qp_hndl = endpoint->lcl_qp_hndl_high;
}
@@ -81,15 +81,14 @@ static inline int mca_btl_mvapi_endpoint_post_send(mca_btl_mvapi_module_t* mvapi
/* atomically test and acquire a token */
if(OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,-1) < 0) {
BTL_VERBOSE(("Queing because no send tokens \n"));
opal_list_append(&endpoint->pending_frags_lp, (opal_list_item_t *)frag);
OPAL_THREAD_ADD32(&endpoint->wr_sq_tokens_lp,1);
BTL_MVAPI_INSERT_PENDING(frag, endpoint->pending_frags_lp, endpoint->wr_sq_tokens_lp, rc);
return OMPI_SUCCESS;
} else {
frag->sr_desc.remote_qp = endpoint->rem_qp_num_low;
frag->sr_desc.remote_qp = endpoint->rem_info.rem_qp_num_low;
qp_hndl = endpoint->lcl_qp_hndl_low;
}
}
frag->sr_desc.opcode = VAPI_SEND;
frag->sg_entry.len = frag->segment.seg_len +
((unsigned char*) frag->segment.seg_addr.pval - (unsigned char*) frag->hdr);
@@ -149,7 +148,10 @@ static void mca_btl_mvapi_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
/* initialize the high and low priority tokens */
endpoint->wr_sq_tokens_hp = mca_btl_mvapi_component.max_wr_sq_tokens;
endpoint->wr_sq_tokens_lp = mca_btl_mvapi_component.max_wr_sq_tokens;
endpoint->rem_info.rem_qp_num_high = 0;
endpoint->rem_info.rem_qp_num_low = 0;
endpoint->rem_info.rem_lid = 0;
endpoint->rem_info.rem_subnet = 0;
}
/*
@@ -177,7 +179,7 @@ static void mca_btl_mvapi_endpoint_send_cb(
}
static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endpoint)
static int mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint)
{
orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t);
int rc;
@@ -205,6 +207,13 @@ static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endp
return rc;
}
rc = orte_dps.pack(buffer, &((mca_btl_mvapi_endpoint_t*)endpoint)->subnet, 1, ORTE_UINT32);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* send to endpoint */
rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0,
mca_btl_mvapi_endpoint_send_cb, NULL);
@@ -227,34 +236,34 @@ static int mca_btl_mvapi_endpoint_send_connect_req(mca_btl_base_endpoint_t* endp
*
*/
static int mca_btl_mvapi_endpoint_send_connect_ack(mca_btl_base_endpoint_t* endpoint)
{
orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t);
int rc;
uint32_t zero = 0;
/* static int mca_btl_mvapi_endpoint_send_connect_data(mca_btl_base_endpoint_t* endpoint) */
/* { */
/* orte_buffer_t* buffer = OBJ_NEW(orte_buffer_t); */
/* int rc; */
/* uint32_t zero = 0; */
/* pack the info in the send buffer */
if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* /\* pack the info in the send buffer *\/ */
/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */
/* ORTE_ERROR_LOG(rc); */
/* return rc; */
/* } */
/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */
/* ORTE_ERROR_LOG(rc); */
/* return rc; */
/* } */
/* if(ORTE_SUCCESS != (rc = orte_dps.pack(buffer, &zero, 1, ORTE_UINT32))) { */
/* ORTE_ERROR_LOG(rc); */
/* return rc; */
/* } */
/* send to endpoint */
rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0,
mca_btl_mvapi_endpoint_send_cb, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* /\* send to endpoint *\/ */
/* rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buffer, ORTE_RML_TAG_DYNAMIC-1, 0, */
/* mca_btl_mvapi_endpoint_send_cb, NULL); */
/* if(rc < 0) { */
/* ORTE_ERROR_LOG(rc); */
/* } */
/* return rc; */
/* } */
/*
* Set remote connection info
@@ -264,32 +273,15 @@ static int mca_btl_mvapi_endpoint_send_connect_ack(mca_btl_base_endpoint_t* endp
* setup.
*
*/
static int mca_btl_mvapi_endpoint_set_remote_info(mca_btl_base_endpoint_t* endpoint, orte_buffer_t* buffer)
static int mca_btl_mvapi_endpoint_set_remote_info(mca_btl_base_endpoint_t* endpoint, mca_btl_mvapi_rem_info_t* rem_info)
{
int rc;
memcpy(&((mca_btl_mvapi_endpoint_t*) endpoint)->rem_info, rem_info, sizeof(mca_btl_mvapi_rem_info_t));
size_t cnt = 1;
rc = orte_dps.unpack(buffer, &endpoint->rem_qp_num_high, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.unpack(buffer, &endpoint->rem_qp_num_low, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.unpack(buffer, &endpoint->rem_lid, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d",
endpoint->rem_qp_num_high,
endpoint->rem_qp_num_low,
endpoint->rem_lid));
BTL_VERBOSE(("Setting High Priority QP num = %d, Low Priority QP num %d, LID = %d",
endpoint->rem_info.rem_qp_num_high,
endpoint->rem_info.rem_qp_num_low,
endpoint->rem_info.rem_lid));
return ORTE_SUCCESS;
}
@@ -345,7 +337,7 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin
/* Send connection info over to remote endpoint */
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING;
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_req(endpoint))) {
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_data(endpoint))) {
BTL_ERROR(("error sending connect request, error code %d", rc));
return rc;
}
@@ -356,7 +348,8 @@ static int mca_btl_mvapi_endpoint_start_connect(mca_btl_base_endpoint_t* endpoin
* Reply to a `start - connect' message
*
*/
static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *endpoint, orte_buffer_t* buffer)
static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *endpoint,
mca_btl_mvapi_rem_info_t* rem_info)
{
int rc;
@@ -397,7 +390,7 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *
/* Set the remote side info */
mca_btl_mvapi_endpoint_set_remote_info(endpoint, buffer);
mca_btl_mvapi_endpoint_set_remote_info(endpoint, rem_info);
/* Connect to endpoint */
@@ -408,7 +401,7 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *
}
/* Send connection info over to remote endpoint */
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_req(endpoint))) {
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_send_connect_data(endpoint))) {
BTL_ERROR(("error in endpoint send connect request error code is %d", rc));
return rc;
}
@@ -448,6 +441,41 @@ static void mca_btl_mvapi_endpoint_recv(
mca_btl_mvapi_endpoint_t *ib_endpoint;
int endpoint_state;
int rc;
uint32_t i;
size_t cnt = 1;
mca_btl_mvapi_rem_info_t rem_info;
/* start by unpacking data first so we know who is knocking at
our door */
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_high, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
rc = orte_dps.unpack(buffer, &rem_info.rem_qp_num_low, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
rc = orte_dps.unpack(buffer, &rem_info.rem_lid, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
rc = orte_dps.unpack(buffer, &rem_info.rem_subnet, &cnt, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return;
}
BTL_VERBOSE(("Received High Priority QP num = %d, Low Priority QP num %d, LID = %d",
rem_info.rem_qp_num_high,
rem_info.rem_qp_num_low,
rem_info.rem_lid));
for(ib_proc = (mca_btl_mvapi_proc_t*)
opal_list_get_first(&mca_btl_mvapi_component.ib_procs);
@@ -456,14 +484,55 @@ static void mca_btl_mvapi_endpoint_recv(
ib_proc = (mca_btl_mvapi_proc_t*)opal_list_get_next(ib_proc)) {
if(ib_proc->proc_guid.vpid == endpoint->vpid) {
bool found = false;
/* ib_endpoint = ib_proc->proc_endpoints[0] */
/* Try to get the endpoint instance of this proc */
/* first match the endpoint based on lid, meaning we've seen */
/* this endpoint before */
for(i = 0; i < ib_proc->proc_endpoint_count; i++) {
mca_btl_mvapi_port_info_t port_info;
port_info = ib_proc->proc_ports[i];
ib_endpoint = ib_proc->proc_endpoints[i];
if(ib_endpoint->rem_info.rem_lid &&
ib_endpoint->rem_info.rem_lid == rem_info.rem_lid) {
/* we've seen them before! */
found = true;
break;
}
}
/* If we haven't seen this remote lid before then try to match on
endpoint */
for(i = 0; !found && i < ib_proc->proc_endpoint_count; i++) {
mca_btl_mvapi_port_info_t port_info;
port_info = ib_proc->proc_ports[i];
ib_endpoint = ib_proc->proc_endpoints[i];
if(!ib_endpoint->rem_info.rem_lid &&
ib_endpoint->rem_info.rem_subnet == rem_info.rem_subnet) {
/* found a match based on subnet! */
found = true;
break;
}
}
/* try finding an open port, even if subnets
don't match
*/
for(i = 0; !found && i < ib_proc->proc_endpoint_count; i++) {
mca_btl_mvapi_port_info_t port_info;
port_info = ib_proc->proc_ports[i];
ib_endpoint = ib_proc->proc_endpoints[i];
if(!ib_endpoint->rem_info.rem_lid) {
/* found an unused end-point */
found = true;
break;
}
}
/* Limitation: Right now, we have only 1 endpoint
* for every process. Need several changes, some
* in PML/BTL interface to set this right */
ib_endpoint = ib_proc->proc_endpoints[0];
if(!found) {
BTL_ERROR(("can't find suitable endpoint for this peer\n"));
return;
}
endpoint_state = ib_endpoint->endpoint_state;
/* Update status */
@@ -474,7 +543,7 @@ static void mca_btl_mvapi_endpoint_recv(
* status of this connection to CONNECTING,
* and then reply with our QP information */
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_reply_start_connect(ib_endpoint, buffer))) {
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_reply_start_connect(ib_endpoint, &rem_info))) {
BTL_ERROR(("error in endpoint reply start connect"));
break;
}
@@ -485,7 +554,7 @@ static void mca_btl_mvapi_endpoint_recv(
case MCA_BTL_IB_CONNECTING :
mca_btl_mvapi_endpoint_set_remote_info(ib_endpoint, buffer);
mca_btl_mvapi_endpoint_set_remote_info(ib_endpoint, &rem_info);
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_connect(ib_endpoint))) {
BTL_ERROR(("endpoint connect error: %d", rc));
break;
@@ -495,7 +564,7 @@ static void mca_btl_mvapi_endpoint_recv(
mca_btl_mvapi_endpoint_waiting_ack(ib_endpoint);
/* Send him an ack */
mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint);
mca_btl_mvapi_endpoint_send_connect_data(ib_endpoint);
break;
case MCA_BTL_IB_WAITING_ACK:
@@ -505,7 +574,7 @@ static void mca_btl_mvapi_endpoint_recv(
case MCA_BTL_IB_CONNECT_ACK:
mca_btl_mvapi_endpoint_connected(ib_endpoint);
mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint);
mca_btl_mvapi_endpoint_send_connect_data(ib_endpoint);
break;
case MCA_BTL_IB_CONNECTED :
@@ -651,15 +720,15 @@ int mca_btl_mvapi_endpoint_connect(
rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl,
endpoint->endpoint_btl->nic,
endpoint->lcl_qp_hndl_high,
endpoint->rem_qp_num_high,
endpoint->rem_lid,
endpoint->rem_info.rem_qp_num_high,
endpoint->rem_info.rem_lid,
endpoint->endpoint_btl->port_id);
rc = mca_btl_mvapi_endpoint_qp_init_query(endpoint->endpoint_btl,
endpoint->endpoint_btl->nic,
endpoint->lcl_qp_hndl_low,
endpoint->rem_qp_num_low,
endpoint->rem_lid,
endpoint->rem_info.rem_qp_num_low,
endpoint->rem_info.rem_lid,
endpoint->endpoint_btl->port_id);
@@ -699,6 +768,7 @@ int mca_btl_mvapi_endpoint_create_qp(
VAPI_ret_t ret;
VAPI_qp_init_attr_t qp_init_attr;
VAPI_qp_init_attr_ext_t qp_init_attr_ext;
switch(transport_type) {
case VAPI_TS_RC: /* Set up RC qp parameters */
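The connection matching added to mca_btl_mvapi_endpoint_recv above is a three-pass search: prefer an endpoint already bound to the sender's LID, then an unbound endpoint whose recorded subnet matches, then any unbound endpoint at all. A condensed sketch mirroring those checks as written (hypothetical flattened types):

#include <stdint.h>
#include <stddef.h>

struct ep { uint32_t rem_lid, rem_subnet; };  /* zero LID == not yet bound */

/* Three-pass endpoint match, in priority order. */
static struct ep *match_endpoint(struct ep *eps, size_t n,
                                 uint32_t lid, uint32_t subnet)
{
    for (size_t i = 0; i < n; i++)        /* 1: bound to this LID already */
        if (eps[i].rem_lid && eps[i].rem_lid == lid)
            return &eps[i];
    for (size_t i = 0; i < n; i++)        /* 2: unbound, matching subnet */
        if (!eps[i].rem_lid && eps[i].rem_subnet == subnet)
            return &eps[i];
    for (size_t i = 0; i < n; i++)        /* 3: any unbound endpoint */
        if (!eps[i].rem_lid)
            return &eps[i];
    return NULL;                          /* no suitable endpoint */
}

int main(void)
{
    struct ep eps[2] = { { 0, 7 }, { 0, 9 } };
    return match_endpoint(eps, 2, 42, 9) == &eps[1] ? 0 : 1;
}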

ompi/mca/btl/mvapi/btl_mvapi_endpoint.h

@@ -38,10 +38,10 @@ OBJ_CLASS_DECLARATION(mca_btl_mvapi_endpoint_t);
struct mca_btl_mvapi_frag_t;
struct mca_btl_mvapi_addr_t {
struct mca_btl_mvapi_port_info_t {
uint32_t subnet;
};
typedef struct mca_btl_mvapi_addr_t mca_btl_mvapi_addr_t;
typedef struct mca_btl_mvapi_port_info_t mca_btl_mvapi_port_info_t;
/**
@@ -72,6 +72,24 @@ typedef enum {
MCA_BTL_IB_FAILED
} mca_btl_mvapi_endpoint_state_t;
struct mca_btl_mvapi_rem_info_t {
VAPI_qp_num_t rem_qp_num_high;
/* High priority remote side QP number */
VAPI_qp_num_t rem_qp_num_low;
/* Low priority remote side QP number */
IB_lid_t rem_lid;
/* Local identifier of the remote process */
uint32_t rem_subnet;
/* subnet of remote process */
};
typedef struct mca_btl_mvapi_rem_info_t mca_btl_mvapi_rem_info_t;
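The four fields above are what the connect message carries on the wire: the sender packs them as four ORTE_UINT32 values and mca_btl_mvapi_endpoint_recv unpacks them in the same order into a mca_btl_mvapi_rem_info_t. A simplified sketch of that ordering contract over a flat buffer (the real code goes through orte_dps pack/unpack):

#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Flattened mirror of mca_btl_mvapi_rem_info_t. */
struct rem_info { uint32_t qp_high, qp_low, lid, subnet; };

/* Pack and unpack must agree on field order; a mismatch silently
 * cross-wires QP numbers, LIDs, and subnets. (Host byte order only.) */
static void pack(uint8_t *buf, const struct rem_info *ri)
{
    uint32_t f[4] = { ri->qp_high, ri->qp_low, ri->lid, ri->subnet };
    memcpy(buf, f, sizeof f);
}

static void unpack(const uint8_t *buf, struct rem_info *ri)
{
    uint32_t f[4];
    memcpy(f, buf, sizeof f);
    ri->qp_high = f[0]; ri->qp_low = f[1];
    ri->lid     = f[2]; ri->subnet = f[3];
}

int main(void)
{
    struct rem_info in = { 11, 22, 33, 44 }, out;
    uint8_t buf[16];
    pack(buf, &in);
    unpack(buf, &out);
    assert(out.lid == 33 && out.subnet == 44);  /* round-trip keeps order */
    return 0;
}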
/**
* An abstraction that represents a connection to an endpoint process.
* An instance of mca_btl_base_endpoint_t is associated w/ each process
@@ -119,14 +137,7 @@ struct mca_btl_base_endpoint_t {
uint32_t wr_sq_tokens_lp;
/**< number of low priority frags that can be outstanding (down counter) */
VAPI_qp_num_t rem_qp_num_high;
/* High priority remote side QP number */
VAPI_qp_num_t rem_qp_num_low;
/* Low priority remote side QP number */
IB_lid_t rem_lid;
/* Local identifier of the remote process */
mca_btl_mvapi_rem_info_t rem_info;
VAPI_qp_hndl_t lcl_qp_hndl_high;
/* High priority local QP handle */
@@ -144,13 +155,12 @@ struct mca_btl_base_endpoint_t {
uint32_t rr_posted_high; /**< number of high priority rr posted to the nic*/
uint32_t rr_posted_low; /**< number of low priority rr posted to the nic*/
mca_btl_mvapi_addr_t endpoint_addr;
uint32_t subnet;
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t mca_btl_mvapi_endpoint_t;
int mca_btl_mvapi_endpoint_send(mca_btl_base_endpoint_t* endpoint, struct mca_btl_mvapi_frag_t* frag);
int mca_btl_mvapi_endpoint_connect(mca_btl_base_endpoint_t*);
void mca_btl_mvapi_post_recv(void);
@@ -229,84 +239,14 @@ void mca_btl_mvapi_progress_send_frags(mca_btl_mvapi_endpoint_t*);
}\
}
/* static inline int mca_btl_mvapi_endpoint_post_rr_sub(int cnt, */
/* mca_btl_mvapi_endpoint_t* endpoint, */
/* ompi_free_list_t* frag_list, */
/* uint32_t* rr_posted, */
/* VAPI_hca_hndl_t nic, */
/* VAPI_qp_hndl_t qp_hndl */
/* ) */
/* { */
/* int rc, i; */
/* opal_list_item_t* item; */
/* mca_btl_mvapi_frag_t* frag; */
/* mca_btl_mvapi_module_t *mvapi_btl = endpoint->endpoint_btl; */
/* VAPI_rr_desc_t* rr_desc_post = mvapi_btl->rr_desc_post; */
/* /\* prepare frags and post receive requests *\/ */
/* for(i = 0; i < cnt; i++) { */
/* OMPI_FREE_LIST_WAIT(frag_list, item, rc); */
/* frag = (mca_btl_mvapi_frag_t*) item; */
/* frag->endpoint = endpoint; */
/* frag->sg_entry.len = frag->size + ((unsigned char*) frag->segment.seg_addr.pval- (unsigned char*) frag->hdr); /\* sizeof(mca_btl_mvapi_header_t); *\/ */
/* rr_desc_post[i] = frag->rr_desc; */
/* } */
/* frag->ret = EVAPI_post_rr_list(nic, */
/* qp_hndl, */
/* cnt, */
/* rr_desc_post); */
/* if(VAPI_OK != frag->ret) { */
/* BTL_ERROR(("error posting receive descriptors: %s", VAPI_strerror(frag->ret))); */
/* return OMPI_ERROR; */
/* } */
/* OPAL_THREAD_ADD32(rr_posted, cnt); */
/* return OMPI_SUCCESS; */
/* } */
/* static inline int mca_btl_mvapi_endpoint_post_rr( mca_btl_mvapi_endpoint_t * endpoint, int additional){ */
/* mca_btl_mvapi_module_t * mvapi_btl = endpoint->endpoint_btl; */
/* int rc; */
/* OPAL_THREAD_LOCK(&endpoint->ib_lock); */
/* if(endpoint->rr_posted_high <= mca_btl_mvapi_component.ib_rr_buf_min+additional && endpoint->rr_posted_high < mca_btl_mvapi_component.ib_rr_buf_max){ */
/* rc = mca_btl_mvapi_endpoint_post_rr_sub(mca_btl_mvapi_component.ib_rr_buf_max - endpoint->rr_posted_high, */
/* endpoint, */
/* &mvapi_btl->recv_free_eager, */
/* &endpoint->rr_posted_high, */
/* mvapi_btl->nic, */
/* endpoint->lcl_qp_hndl_high */
/* ); */
/* if(rc != OMPI_SUCCESS){ */
/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */
/* return rc; */
/* } */
/* } */
/* if(endpoint->rr_posted_low <= mca_btl_mvapi_component.ib_rr_buf_min+additional && endpoint->rr_posted_low < mca_btl_mvapi_component.ib_rr_buf_max){ */
/* rc = mca_btl_mvapi_endpoint_post_rr_sub(mca_btl_mvapi_component.ib_rr_buf_max - endpoint->rr_posted_low, */
/* endpoint, */
/* &mvapi_btl->recv_free_max, */
/* &endpoint->rr_posted_low, */
/* mvapi_btl->nic, */
/* endpoint->lcl_qp_hndl_low */
/* ); */
/* if(rc != OMPI_SUCCESS) { */
/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */
/* return rc; */
/* } */
/* } */
/* OPAL_THREAD_UNLOCK(&mvapi_btl->ib_lock); */
/* return OMPI_SUCCESS; */
/* } */
#define BTL_MVAPI_INSERT_PENDING(frag, frag_list, tokens, rc) \
{ \
do{ \
opal_list_append(&frag_list, (opal_list_item_t *)frag); \
OPAL_THREAD_ADD32(&tokens, 1); \
rc = OMPI_SUCCESS; \
} while(0); \
}
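The do { } while(0) wrapper in BTL_MVAPI_INSERT_PENDING is the standard trick for making a multi-statement macro expand to a single statement, so it nests safely under an if/else; the extra outer braces above are harmless but unnecessary. A standalone illustration with a hypothetical macro:

#include <stdio.h>

/* Multi-statement macro wrapped in do { } while (0): the expansion is one
 * statement, so the trailing semicolon and if/else nesting both work. */
#define LOG_AND_COUNT(msg, counter) \
    do {                            \
        printf("%s\n", (msg));      \
        (counter)++;                \
    } while (0)

int main(void)
{
    int events = 0;
    if (events == 0)
        LOG_AND_COUNT("first event", events);   /* no dangling-else trouble */
    else
        LOG_AND_COUNT("later event", events);
    return events == 1 ? 0 : 1;
}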
#if defined(c_plusplus) || defined(__cplusplus)
}

ompi/mca/btl/mvapi/btl_mvapi_proc.c

@@ -32,7 +32,7 @@ OBJ_CLASS_INSTANCE(mca_btl_mvapi_proc_t,
void mca_btl_mvapi_proc_construct(mca_btl_mvapi_proc_t* proc)
{
proc->proc_ompi = 0;
proc->proc_addr_count = 0;
proc->proc_port_count = 0;
proc->proc_endpoints = 0;
proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t);
@@ -131,7 +131,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
rc = mca_pml_base_modex_recv(
&mca_btl_mvapi_component.super.btl_version,
ompi_proc,
(void*)&mvapi_proc->proc_addrs,
(void*)&mvapi_proc->proc_ports,
&size
);
@@ -144,7 +144,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
return NULL;
}
if((size % sizeof(mca_btl_mvapi_addr_t)) != 0) {
if((size % sizeof(mca_btl_mvapi_port_info_t)) != 0) {
opal_output(0, "[%s:%d] invalid mvapi address for peer [%d,%d,%d]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(mvapi_proc);
@@ -152,16 +152,11 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
}
mvapi_proc->proc_addr_count = size/sizeof(mca_btl_mvapi_addr_t);
mvapi_proc->proc_port_count = size/sizeof(mca_btl_mvapi_port_info_t);
/* XXX: Right now, there can be only 1 peer associated
* with a proc. Needs a little bit change in
* mca_btl_mvapi_proc_t to allow on demand increasing of
* number of endpoints for this proc */
mvapi_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc(mvapi_proc->proc_addr_count * sizeof(mca_btl_base_endpoint_t*));
malloc(mvapi_proc->proc_port_count * sizeof(mca_btl_base_endpoint_t*));
if(NULL == mvapi_proc->proc_endpoints) {
OBJ_RELEASE(mvapi_proc);
@@ -179,14 +174,12 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
int mca_btl_mvapi_proc_insert(mca_btl_mvapi_proc_t* mvapi_proc,
mca_btl_base_endpoint_t* mvapi_endpoint)
{
mca_btl_mvapi_module_t* mvapi_btl = mvapi_endpoint->endpoint_btl;
/* insert into endpoint array */
if(mvapi_proc->proc_addr_count <= mvapi_proc->proc_endpoint_count)
if(mvapi_proc->proc_port_count <= mvapi_proc->proc_endpoint_count)
return OMPI_ERR_OUT_OF_RESOURCE;
mvapi_endpoint->endpoint_proc = mvapi_proc;
mvapi_endpoint->endpoint_addr = mvapi_proc->proc_addrs[mvapi_proc->proc_endpoint_count];
mvapi_proc->proc_endpoints[mvapi_proc->proc_endpoint_count++] = mvapi_endpoint;
return OMPI_SUCCESS;
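In mca_btl_mvapi_proc_create above, the peer's modex blob is validated to be a whole number of mca_btl_mvapi_port_info_t records, and the endpoint array is sized from that count. A minimal sketch of that size-sanity step (simplified error handling, hypothetical names):

#include <stdint.h>
#include <stdlib.h>

struct port_info { uint32_t subnet; };   /* mirrors mca_btl_mvapi_port_info_t */
struct endpoint;                         /* opaque stand-in */

/* Derive the port count from the modex blob size, rejecting blobs that are
 * not a whole number of records, then size the endpoint array to match. */
static struct endpoint **alloc_endpoints(size_t blob_bytes, size_t *nports)
{
    if (blob_bytes % sizeof(struct port_info) != 0)
        return NULL;                     /* malformed peer data */
    *nports = blob_bytes / sizeof(struct port_info);
    return calloc(*nports, sizeof(struct endpoint *));
}

int main(void)
{
    size_t n = 0;
    struct endpoint **eps = alloc_endpoints(3 * sizeof(struct port_info), &n);
    int ok = (eps != NULL && n == 3);
    free(eps);
    return ok ? 0 : 1;
}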

ompi/mca/btl/mvapi/btl_mvapi_proc.h

@@ -44,9 +44,9 @@ struct mca_btl_mvapi_proc_t {
orte_process_name_t proc_guid;
/**< globally unique identifier for the process */
struct mca_btl_mvapi_addr_t* proc_addrs;
size_t proc_addr_count;
/**< number of addresses published by endpoint */
struct mca_btl_mvapi_port_info_t* proc_ports;
size_t proc_port_count;
/**< number of ports published by endpoint */
struct mca_btl_base_endpoint_t **proc_endpoints;
/**< array of endpoints that have been created to access this proc */