1
1

Fixed a race in connection establishment.

This commit was SVN r7110.
Этот коммит содержится в:
Galen Shipman 2005-08-31 19:43:22 +00:00
родитель c6054662d5
Коммит 09873f299f
4 изменённых файлов: 93 добавлений и 62 удалений

Просмотреть файл

@ -117,8 +117,8 @@ struct mca_btl_mvapi_component_t {
uint32_t ib_service_level;
uint32_t ib_static_rate;
uint32_t ib_src_path_bits;
/* number of send tokens available */
uint32_t max_send_tokens;
}; typedef struct mca_btl_mvapi_component_t mca_btl_mvapi_component_t;
@ -176,6 +176,13 @@ struct mca_btl_mvapi_module_t {
/**< an array to allow posting of rr in one swoop */
size_t ib_inline_max; /**< max size of inline send*/
/* number of outstanding sends */
uint32_t send_tokens;
opal_list_t pending_send_frags;
/**< list of send frags queued on this BTL module while no send tokens are available */
}; typedef struct mca_btl_mvapi_module_t mca_btl_mvapi_module_t;

Просмотреть файл

@ -35,6 +35,7 @@
#include <vapi_common.h>
#include "datatype/convertor.h"
#include "mca/mpool/mvapi/mpool_mvapi.h"
#include "btl_mvapi_endpoint.h"
mca_btl_mvapi_component_t mca_btl_mvapi_component = {
{
@ -138,7 +139,7 @@ int mca_btl_mvapi_component_open(void)
10000);
mca_btl_mvapi_component.ib_sg_list_size =
mca_btl_mvapi_param_register_int("ib_sg_list_size",
1);
16);
mca_btl_mvapi_component.ib_pkey_ix =
mca_btl_mvapi_param_register_int("ib_pkey_ix",
0);
@ -147,16 +148,16 @@ int mca_btl_mvapi_component_open(void)
0);
mca_btl_mvapi_component.ib_qp_ous_rd_atom =
mca_btl_mvapi_param_register_int("ib_qp_ous_rd_atom",
1);
4);
mca_btl_mvapi_component.ib_mtu =
mca_btl_mvapi_param_register_int("ib_mtu",
MTU1024);
mca_btl_mvapi_component.ib_min_rnr_timer =
mca_btl_mvapi_param_register_int("ib_min_rnr_timer",
5);
24);
mca_btl_mvapi_component.ib_timeout =
mca_btl_mvapi_param_register_int("ib_timeout",
10);
10);
mca_btl_mvapi_component.ib_retry_count =
mca_btl_mvapi_param_register_int("ib_retry_count",
7);
@ -206,7 +207,14 @@ int mca_btl_mvapi_component_open(void)
param = mca_base_param_find("mpi", NULL, "leave_pinned");
mca_base_param_lookup_int(param, &value);
mca_btl_mvapi_component.leave_pinned = value;
mca_base_param_reg_int(&mca_btl_mvapi_component.super.btl_version,
"max_send_tokens",
"Maximum number of send tokens",
false,
false,
16,
&(mca_btl_mvapi_component.max_send_tokens));
mca_btl_mvapi_component.max_send_size = mca_btl_mvapi_module.super.btl_max_send_size;
mca_btl_mvapi_component.eager_limit = mca_btl_mvapi_module.super.btl_eager_limit;
@ -378,9 +386,6 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
for(i = 0; i < mca_btl_mvapi_component.ib_num_btls; i++){
/* uint16_t tbl_len_in = 0; */
/* uint16_t tbl_len_out = 0; */
/* IB_gid_t *gid_tbl_p = NULL; */
item = opal_list_remove_first(&btl_list);
ib_selected = (mca_btl_base_selected_module_t*)item;
@ -408,42 +413,13 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
OBJ_CONSTRUCT(&mvapi_btl->repost, opal_list_t);
OBJ_CONSTRUCT(&mvapi_btl->reg_mru_list, opal_list_t);
OBJ_CONSTRUCT(&mvapi_btl->pending_send_frags, opal_list_t);
if(mca_btl_mvapi_module_init(mvapi_btl) != OMPI_SUCCESS) {
free(hca_ids);
return NULL;
}
/* vapi_ret = VAPI_query_hca_gid_tbl(mvapi_btl->nic, */
/* mvapi_btl->port_id, */
/* tbl_len_in, */
/* &tbl_len_out, */
/* gid_tbl_p); */
/* if(OMPI_SUCCESS != vapi_ret) { */
/* BTL_ERROR(("error querying gid table to obtain subnet mask")); */
/* return NULL; */
/* } */
/* if(tbl_len_out == 0) { */
/* BTL_ERROR(("error querying gid table, table length 0!")); */
/* return NULL; */
/* } */
/* tbl_len_in = tbl_len_out; */
/* gid_tbl_p = (IB_gid_t*) malloc(tbl_len_out * sizeof(IB_gid_t*)); */
/* vapi_ret = VAPI_query_hca_gid_tbl(mvapi_btl->nic, */
/* mvapi_btl->port_id, */
/* tbl_len_in, */
/* &tbl_len_out, */
/* gid_tbl_p); */
/* if(OMPI_SUCCESS != vapi_ret) { */
/* BTL_ERROR(("error querying gid table to obtain subnet mask")); */
/* return NULL; */
/* } */
/* /\* first 64 bits of the first gid entry should be the subnet mask *\/ */
/* memcpy(&mvapi_btl->mvapi_addr.subnet, &gid_tbl_p[0], 8); */
hca_pd.hca = mvapi_btl->nic;
hca_pd.pd_tag = mvapi_btl->ptag;
@ -527,6 +503,8 @@ mca_btl_base_module_t** mca_btl_mvapi_component_init(int *num_btl_modules,
/* Initialize the rr_desc_post array for posting of rr*/
mvapi_btl->rr_desc_post = (VAPI_rr_desc_t*) malloc((mca_btl_mvapi_component.ib_rr_buf_max * sizeof(VAPI_rr_desc_t)));
mvapi_btl->send_tokens = mca_btl_mvapi_component.max_send_tokens;
btls[i] = &mvapi_btl->super;
}
@ -574,16 +552,31 @@ int mca_btl_mvapi_component_progress()
case VAPI_CQE_RQ_RDMA_WITH_IMM:
BTL_ERROR(("Got an RDMA with Immediate data!, not supported!"));
return OMPI_ERROR;
case VAPI_CQE_SQ_SEND_DATA :
mvapi_btl->send_tokens++;
/* fall through */
case VAPI_CQE_SQ_RDMA_READ:
case VAPI_CQE_SQ_RDMA_WRITE:
case VAPI_CQE_SQ_SEND_DATA :
/* Process a completed send or an rdma write */
frag = (mca_btl_mvapi_frag_t*) comp.id;
frag->rc = OMPI_SUCCESS;
frag->base.des_cbfunc(&mvapi_btl->super, frag->endpoint, &frag->base, frag->rc);
count++;
/* check and see if we need to progress pending sends */
if(mvapi_btl->send_tokens && !opal_list_is_empty(&(mvapi_btl->pending_send_frags))) {
opal_list_item_t *frag_item;
frag_item = opal_list_remove_first(&(mvapi_btl->pending_send_frags));
frag = (mca_btl_mvapi_frag_t *) frag_item;
if(OMPI_SUCCESS != mca_btl_mvapi_endpoint_send(frag->endpoint, frag)) {
BTL_ERROR(("error in posting pending send\n"));
}
}
break;
case VAPI_CQE_RQ_SEND_DATA:

Просмотреть файл

@ -389,6 +389,9 @@ static int mca_btl_mvapi_endpoint_reply_start_connect(mca_btl_mvapi_endpoint_t *
return OMPI_SUCCESS;
}
/*
 * Transition an endpoint into the WAITING_ACK state: the local side has
 * finished its half of connection setup and is now waiting for the final
 * connection ACK from the peer before the endpoint is marked CONNECTED.
 */
static void mca_btl_mvapi_endpoint_waiting_ack(mca_btl_mvapi_endpoint_t *ep)
{
    ep->endpoint_state = MCA_BTL_IB_WAITING_ACK;
}
/*
*
*/
@ -453,26 +456,30 @@ static void mca_btl_mvapi_endpoint_recv(
/* Setup state as connected */
ib_endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK;
break;
case MCA_BTL_IB_CONNECTING :
mca_btl_mvapi_endpoint_set_remote_info(ib_endpoint, buffer);
if(OMPI_SUCCESS != (rc = mca_btl_mvapi_endpoint_connect(ib_endpoint))) {
BTL_ERROR(("endpoint connect error: %d", rc));
break;
}
/* Setup state as connected */
mca_btl_mvapi_endpoint_connected(ib_endpoint);
/* Setup state as awaiting ack from peer */
mca_btl_mvapi_endpoint_waiting_ack(ib_endpoint);
/* Send him an ack */
mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint);
break;
case MCA_BTL_IB_WAITING_ACK:
mca_btl_mvapi_endpoint_connected(ib_endpoint);
break;
case MCA_BTL_IB_CONNECT_ACK:
mca_btl_mvapi_endpoint_connected(ib_endpoint);
mca_btl_mvapi_endpoint_send_connect_ack(ib_endpoint);
break;
case MCA_BTL_IB_CONNECTED :
@ -526,7 +533,7 @@ int mca_btl_mvapi_endpoint_send(
rc = OMPI_SUCCESS;
break;
case MCA_BTL_IB_WAITING_ACK:
case MCA_BTL_IB_CONNECT_ACK:
BTL_VERBOSE(("Queuing because waiting for ack"));
@ -536,6 +543,9 @@ int mca_btl_mvapi_endpoint_send(
rc = OMPI_SUCCESS;
break;
case MCA_BTL_IB_CLOSED:
@ -555,19 +565,30 @@ int mca_btl_mvapi_endpoint_send(
case MCA_BTL_IB_CONNECTED:
{
mvapi_btl = endpoint->endpoint_btl;
if(0 == mvapi_btl->send_tokens) {
BTL_VERBOSE(("Queing because no send tokens \n"));
opal_list_append(&mvapi_btl->pending_send_frags,
(opal_list_item_t *)frag);
rc = OMPI_SUCCESS;
} else {
BTL_VERBOSE(("Send to : %d, len : %d, frag : %p",
endpoint->endpoint_proc->proc_guid.vpid,
frag->sg_entry.len,
frag));
rc = mca_btl_mvapi_endpoint_post_send(mvapi_btl, endpoint, frag);
mvapi_btl->send_tokens--;
BTL_VERBOSE(("Send to : %d, len : %d, frag : %p",
endpoint->endpoint_proc->proc_guid.vpid,
frag->sg_entry.len,
frag));
rc = mca_btl_mvapi_endpoint_post_send(mvapi_btl, endpoint, frag);
}
break;
}
default:
rc = OMPI_ERR_UNREACH;
}
@ -694,14 +715,21 @@ int mca_btl_mvapi_endpoint_create_qp(
return OMPI_ERR_NOT_IMPLEMENTED;
}
qp_init_attr_ext.srq_hndl = srq_hndl;
ret = VAPI_create_qp_ext(nic,
if(mca_btl_mvapi_component.use_srq) {
qp_init_attr_ext.srq_hndl = srq_hndl;
ret = VAPI_create_qp_ext(nic,
&qp_init_attr,
&qp_init_attr_ext,
qp_hndl,
qp_prop);
}
else {
ret = VAPI_create_qp(nic,
&qp_init_attr,
&qp_init_attr_ext,
qp_hndl,
qp_prop);
qp_prop);
}
if(VAPI_OK != ret) {
BTL_ERROR(("error creating the queue pair: %s", VAPI_strerror(ret)));
return OMPI_ERROR;

Просмотреть файл

@ -55,6 +55,9 @@ typedef enum {
/* Waiting for ack from endpoint */
MCA_BTL_IB_CONNECT_ACK,
/*Waiting for final connection ACK from endpoint */
MCA_BTL_IB_WAITING_ACK,
/* Connected ... both sender & receiver have
* buffers associated with this connection */