1
1
This patch adds some much needed comments, reduces the amount of code
wrapping, and rearrges and removes redundant code.

This commit was SVN r18417.
Этот коммит содержится в:
Jon Mason 2008-05-08 21:20:12 +00:00
родитель da2f1c58e2
Коммит 99ab66e131

Просмотреть файл

@ -44,7 +44,6 @@
static void rdmacm_component_register(void); static void rdmacm_component_register(void);
static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl,
ompi_btl_openib_connect_base_module_t **cpc); ompi_btl_openib_connect_base_module_t **cpc);
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc, static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *endpoint); mca_btl_base_endpoint_t *endpoint);
static int rdmacm_component_destroy(void); static int rdmacm_component_destroy(void);
@ -75,7 +74,6 @@ struct message {
struct rdmacm_endpoint_local_cpc_data { struct rdmacm_endpoint_local_cpc_data {
int rdmacm_counter; int rdmacm_counter;
opal_mutex_t endpoint_state_lock;
}; };
struct id_contexts { struct id_contexts {
@ -96,8 +94,7 @@ struct list_item {
}; };
typedef struct list_item list_item_t; typedef struct list_item list_item_t;
static OBJ_CLASS_INSTANCE(list_item_t, opal_list_item_t, static OBJ_CLASS_INSTANCE(list_item_t, opal_list_item_t, NULL, NULL);
NULL, NULL);
static opal_list_t server_list; static opal_list_t server_list;
static opal_list_t client_list; static opal_list_t client_list;
@ -143,7 +140,15 @@ static void rdmacm_component_register(void)
} }
} }
static mca_btl_openib_endpoint_t *rdmacm_find_endpoint(struct rdmacm_contents *local, struct rdma_cm_id *id, uint16_t rem_port) /* This function traverses the list of endpoints associated with the hca
* and determines which of them the remote side is attempting to connect
* to. This is determined based on the local endpoint's modex message
* recevied and the IP address and port associated with the rdma_cm
* event id
*/
static mca_btl_openib_endpoint_t *rdmacm_find_endpoint(struct rdmacm_contents *local,
struct rdma_cm_id *id,
uint16_t rem_port)
{ {
int i; int i;
mca_btl_openib_endpoint_t *ep = NULL; mca_btl_openib_endpoint_t *ep = NULL;
@ -178,10 +183,10 @@ static mca_btl_openib_endpoint_t *rdmacm_find_endpoint(struct rdmacm_contents *l
return ep; return ep;
} }
static void rdmacm_cleanup(struct rdmacm_contents *local, struct rdma_cm_id *id, uint32_t num) static void rdmacm_cleanup(struct rdmacm_contents *local,
struct rdma_cm_id *id,
uint32_t num)
{ {
BTL_VERBOSE(("Calling cleanup for %s, qp = %d", local->server?"server":"client", num));
if (NULL == id) if (NULL == id)
return; return;
@ -194,7 +199,10 @@ static void rdmacm_cleanup(struct rdmacm_contents *local, struct rdma_cm_id *id,
} }
} }
static int rdmacm_setup_qp(struct rdmacm_contents *local, mca_btl_openib_endpoint_t *endpoint, struct rdma_cm_id *id, int qpnum) static int rdmacm_setup_qp(struct rdmacm_contents *local,
mca_btl_openib_endpoint_t *endpoint,
struct rdma_cm_id *id,
int qpnum)
{ {
struct ibv_qp_init_attr attr; struct ibv_qp_init_attr attr;
struct ibv_qp *qp; struct ibv_qp *qp;
@ -219,7 +227,6 @@ static int rdmacm_setup_qp(struct rdmacm_contents *local, mca_btl_openib_endpoin
max_send_wr = mca_btl_openib_component.qp_infos[qpnum].u.srq_qp.sd_max + credits; max_send_wr = mca_btl_openib_component.qp_infos[qpnum].u.srq_qp.sd_max + credits;
} }
/* create the qp via rdma_create_qp() */
memset(&attr, 0, sizeof(attr)); memset(&attr, 0, sizeof(attr));
attr.qp_type = IBV_QPT_RC; attr.qp_type = IBV_QPT_RC;
attr.send_cq = local->openib_btl->hca->ib_cq[BTL_OPENIB_LP_CQ]; attr.send_cq = local->openib_btl->hca->ib_cq[BTL_OPENIB_LP_CQ];
@ -239,57 +246,68 @@ static int rdmacm_setup_qp(struct rdmacm_contents *local, mca_btl_openib_endpoin
endpoint->qps[qpnum].qp->lcl_qp = qp; endpoint->qps[qpnum].qp->lcl_qp = qp;
id->qp = qp; id->qp = qp;
/* After creating the qp, the driver will write the max_inline_data
* in the attributes. Update the btl with this data.
*/
local->openib_btl->ib_inline_max = attr.cap.max_inline_data; local->openib_btl->ib_inline_max = attr.cap.max_inline_data;
BTL_VERBOSE(("QP#%d is %p", qpnum, endpoint->qps[qpnum].qp->lcl_qp));
return 0; return 0;
out: out:
return 1; return 1;
} }
static int rdma_client_connect_one(struct rdmacm_contents *local, struct message *message, int num) static int rdma_client_connect_one(struct rdmacm_contents *local,
struct message *message,
int num)
{ {
struct sockaddr_in din; struct sockaddr_in din;
struct id_contexts *context;
int rc; int rc;
/* create rdma_cm_id */ /* We'll need to access some data in the event handler. We can
rc = rdma_create_id(event_channel, &local->id[num], (void *)(unsigned long)num, RDMA_PS_TCP); * encapsulate it in this data struct and attach it to the id being
if (0 != rc) { * created below. The event->id will contain this same pointer.
BTL_ERROR(("Failed to create a rdma id with %d", rc)); */
goto out; context = malloc(sizeof(struct id_contexts));
} if (NULL == context) {
local->id[num]->context = malloc(sizeof(struct id_contexts));
if (NULL == local->id[num]->context) {
BTL_ERROR(("malloc error")); BTL_ERROR(("malloc error"));
goto out; goto out;
} }
((struct id_contexts *)local->id[num]->context)->local = local; context->local = local;
((struct id_contexts *)local->id[num]->context)->qpnum = num; context->qpnum = num;
((struct id_contexts *)local->id[num]->context)->endpoint = local->endpoint; context->endpoint = local->endpoint;
BTL_VERBOSE(("Attemping to send to remote port %d ipaddr %x", message->port, message->ipaddr)); rc = rdma_create_id(event_channel, &local->id[num], context, RDMA_PS_TCP);
if (0 != rc) {
BTL_ERROR(("Failed to create a rdma id with %d", rc));
goto out1;
}
memset(&din, 0, sizeof(din)); memset(&din, 0, sizeof(din));
din.sin_family = AF_INET; din.sin_family = AF_INET;
din.sin_addr.s_addr = message->ipaddr; din.sin_addr.s_addr = message->ipaddr;
din.sin_port = message->port; din.sin_port = message->port;
/* resolve the remote address via rdma_resolve_addr() */ /* Once the route to the remote system is discovered, a
rc = rdma_resolve_addr(local->id[num], NULL, (struct sockaddr *)&din, RDMA_RESOLVE_ADDR_TIMEOUT); * RDMA_CM_EVENT_ADDR_RESOLVED event will occur on the local event
* handler.
*/
rc = rdma_resolve_addr(local->id[num],
NULL,
(struct sockaddr *)&din,
RDMA_RESOLVE_ADDR_TIMEOUT);
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("Failed to resolve the remote address with %d", rc)); BTL_ERROR(("Failed to resolve the remote address with %d", rc));
goto out; goto out1;
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
out: out1:
rdmacm_cleanup(local, local->id[num], num); rdmacm_cleanup(local, local->id[num], num);
out:
return OMPI_ERROR; return OMPI_ERROR;
} }
@ -320,29 +338,29 @@ out:
return OMPI_ERROR; return OMPI_ERROR;
} }
/* Connect method called by the upper layers to connect the local
* endpoint to the remote endpoint by creating QP(s) to connect the two.
* Already holding endpoint lock when this function is called.
*/
static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc, static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cpc,
mca_btl_base_endpoint_t *endpoint) mca_btl_base_endpoint_t *endpoint)
{ {
int rc;
struct rdmacm_contents *client; struct rdmacm_contents *client;
struct message *message = (struct message *)endpoint->endpoint_remote_cpc_data->cbm_modex_message; struct message *message;
opal_mutex_t *endpoint_state_lock = &((struct rdmacm_endpoint_local_cpc_data *)endpoint->endpoint_local_cpc_data)->endpoint_state_lock; int rc;
/*Already holding endpoint lock when this is called */ message = (struct message *)endpoint->endpoint_remote_cpc_data->cbm_modex_message;
OPAL_THREAD_LOCK(endpoint_state_lock);
BTL_VERBOSE(("remote ip addr = %x, port = %d ep state = %d", message->ipaddr, message->port, endpoint->endpoint_state)); BTL_VERBOSE(("Connecting to remote ip addr = %x, port = %d ep state = %d",
BTL_VERBOSE(("%p local message = %x %d", endpoint->endpoint_local_cpc->data.cbm_modex_message, ((struct message *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->ipaddr, ((struct message *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->port)); message->ipaddr, message->port, endpoint->endpoint_state));
if (MCA_BTL_IB_CONNECTED == endpoint->endpoint_state || if (MCA_BTL_IB_CONNECTED == endpoint->endpoint_state ||
MCA_BTL_IB_CONNECTING == endpoint->endpoint_state || MCA_BTL_IB_CONNECTING == endpoint->endpoint_state ||
MCA_BTL_IB_CONNECT_ACK == endpoint->endpoint_state) { MCA_BTL_IB_CONNECT_ACK == endpoint->endpoint_state) {
OPAL_THREAD_UNLOCK(endpoint_state_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK; endpoint->endpoint_state = MCA_BTL_IB_CONNECT_ACK;
OPAL_THREAD_UNLOCK(endpoint_state_lock);
client = calloc(1, sizeof(struct rdmacm_contents)); client = calloc(1, sizeof(struct rdmacm_contents));
if (NULL == client) { if (NULL == client) {
@ -353,6 +371,12 @@ static int rdmacm_module_start_connect(ompi_btl_openib_connect_base_module_t *cp
client->openib_btl = endpoint->endpoint_btl; client->openib_btl = endpoint->endpoint_btl;
client->endpoint = endpoint; client->endpoint = endpoint;
client->server = false; client->server = false;
/* Populate the port information with the local port the server is
* listening on instead of the ephemerial port this client is
* connecting with. This port is used to determine which endpoint
* is being connected from, in the isntance where there are
* multiple listeners on the local system.
*/
client->port = ((struct message *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->port; client->port = ((struct message *)endpoint->endpoint_local_cpc->data.cbm_modex_message)->port;
rc = rdma_client_connect(client, message); rc = rdma_client_connect(client, message);
@ -367,46 +391,64 @@ out:
return OMPI_ERROR; return OMPI_ERROR;
} }
static int handle_connect_request(struct rdmacm_contents *local, struct rdma_cm_event *event) /* The server thread will handle the incoming connection requests and
* allow them or reject them based on a unidirectional connection
* method. The choonections are allowed based on the IP address and
* port values. This determination is arbitrary, but is uniform in
* allowing the connections only in 1 direction. If the connection in
* the requestion is disallowed by this rule, then the server will
* reject the connection and make its own in the proper direction.
*/
static int handle_connect_request(struct rdmacm_contents *local,
struct rdma_cm_event *event)
{ {
struct rdma_conn_param conn_param;
int rc = -1, qpnum;
struct message *message;
mca_btl_openib_endpoint_t *endpoint; mca_btl_openib_endpoint_t *endpoint;
opal_mutex_t *endpoint_state_lock; struct rdma_conn_param conn_param;
struct message *message;
struct conn_message msg; struct conn_message msg;
int rc = -1, qpnum;
uint32_t rem_index;
uint16_t rem_port; uint16_t rem_port;
qpnum = ((struct conn_message *)event->param.conn.private_data)->qpnum; qpnum = ((struct conn_message *)event->param.conn.private_data)->qpnum;
rem_port = ((struct conn_message *)event->param.conn.private_data)->rem_port; rem_port = ((struct conn_message *)event->param.conn.private_data)->rem_port;
rem_index = ((struct conn_message *)event->param.conn.private_data)->rem_index;
/* Determine which endpoint the remote side is trying to connect to */
endpoint = rdmacm_find_endpoint(local, event->id, rem_port); endpoint = rdmacm_find_endpoint(local, event->id, rem_port);
if (NULL == endpoint) { if (NULL == endpoint) {
BTL_ERROR(("Failed to find endpoint")); BTL_ERROR(("Failed to find endpoint"));
return -1; return -1;
} }
endpoint_state_lock = &((struct rdmacm_endpoint_local_cpc_data *)endpoint->endpoint_local_cpc_data)->endpoint_state_lock;
OPAL_THREAD_LOCK(endpoint_state_lock);
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message; message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x port %d", endpoint->endpoint_state, local->ipaddr, message->ipaddr, rem_port)); BTL_VERBOSE(("ep state = %d, local ipaddr = %x, remote ipaddr = %x port %d",
endpoint->endpoint_state, local->ipaddr, message->ipaddr, rem_port));
/* See if a race is occurring between the two sides attempting to connect to each other at the same time */
if ((local->ipaddr > message->ipaddr && local->port > rem_port) || if ((local->ipaddr > message->ipaddr && local->port > rem_port) ||
local->ipaddr > message->ipaddr) { local->ipaddr > message->ipaddr) {
int race = 1; int race = 1;
OPAL_THREAD_UNLOCK(endpoint_state_lock); BTL_VERBOSE(("Received a connect request from an endpoint in the wrong direction"));
BTL_VERBOSE(("No soup today.....back of line! ep state = %d", endpoint->endpoint_state));
/* This will cause a event on the remote system. By passing in
* a value in the second arg of rdma_reject, the remote side
* can check for this to know if it was an intentional reject or
* a reject based on an error.
*/
rc = rdma_reject(event->id, &race, sizeof(int)); rc = rdma_reject(event->id, &race, sizeof(int));
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("rdma_reject failed %d", rc)); BTL_ERROR(("rdma_reject failed %d", rc));
goto out; goto out;
} }
/* If there are multiple QPs attempting to connect from the
* wrong direction, only make one call to
* rdmacm_module_start_connect to connect in the proper
* direction, as it will connect to the remote side with the
* correct number of QPs.
*/
if (0 == qpnum) { if (0 == qpnum) {
rdmacm_module_start_connect(NULL, endpoint); rdmacm_module_start_connect(NULL, endpoint);
} }
@ -414,13 +456,11 @@ static int handle_connect_request(struct rdmacm_contents *local, struct rdma_cm_
return 0; return 0;
} }
endpoint->endpoint_state = MCA_BTL_IB_CONNECTING; endpoint->rem_info.rem_index = rem_index;
OPAL_THREAD_UNLOCK(endpoint_state_lock);
endpoint->rem_info.rem_index = ((struct conn_message *)event->param.conn.private_data)->rem_index;
/* Setup QP for new connection */ /* Setup QP for new connection */
BTL_VERBOSE(("ACCEPTING src port = %d, dst port = %d, qpnum = %d", rdma_get_src_port(event->id), rdma_get_dst_port(event->id), qpnum)); BTL_VERBOSE(("ACCEPTING src port = %d, dst port = %d, qpnum = %d",
rdma_get_src_port(event->id), rdma_get_dst_port(event->id), qpnum));
rc = rdmacm_setup_qp(local, endpoint, event->id, qpnum); rc = rdmacm_setup_qp(local, endpoint, event->id, qpnum);
if (0 != rc) { if (0 != rc) {
@ -428,6 +468,11 @@ static int handle_connect_request(struct rdmacm_contents *local, struct rdma_cm_
goto out; goto out;
} }
/* Recvs must be posted prior to accepting the rdma connection.
* Otherwise, it is possible to get data before there are recvs to
* put it, which for iWARP will result in tearing down of the
* connection.
*/
if (BTL_OPENIB_QP_TYPE_PP(qpnum)) { if (BTL_OPENIB_QP_TYPE_PP(qpnum)) {
rc = mca_btl_openib_endpoint_post_rr(endpoint, qpnum); rc = mca_btl_openib_endpoint_post_rr(endpoint, qpnum);
} else { } else {
@ -438,15 +483,10 @@ static int handle_connect_request(struct rdmacm_contents *local, struct rdma_cm_
goto out1; goto out1;
} }
memset(&conn_param, 0, sizeof(conn_param)); /* Since the event id is already created, we cannot add this
conn_param.responder_resources = 1; * information in the normal way. Instead we must reference its
conn_param.initiator_depth = 1; * location and put the data there so that it can be access later.
conn_param.private_data = &msg; */
conn_param.private_data_len = sizeof(struct conn_message);
msg.qpnum = qpnum;
msg.rem_index = endpoint->index;
event->id->context = malloc(sizeof(struct id_contexts)); event->id->context = malloc(sizeof(struct id_contexts));
if (NULL == event->id->context) { if (NULL == event->id->context) {
BTL_ERROR(("malloc error")); BTL_ERROR(("malloc error"));
@ -457,13 +497,25 @@ static int handle_connect_request(struct rdmacm_contents *local, struct rdma_cm_
((struct id_contexts *)event->id->context)->qpnum = qpnum; ((struct id_contexts *)event->id->context)->qpnum = qpnum;
((struct id_contexts *)event->id->context)->endpoint = endpoint; ((struct id_contexts *)event->id->context)->endpoint = endpoint;
memset(&conn_param, 0, sizeof(conn_param));
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
conn_param.private_data = &msg;
conn_param.private_data_len = sizeof(struct conn_message);
msg.qpnum = qpnum;
msg.rem_index = endpoint->index;
/* Accepting the connection will result in a
* RDMA_CM_EVENT_ESTABLISHED event on both the client and server
* side.
*/
rc = rdma_accept(event->id, &conn_param); rc = rdma_accept(event->id, &conn_param);
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("rdma_accept error %d", rc)); BTL_ERROR(("rdma_accept error %d", rc));
goto out2; goto out2;
} }
BTL_VERBOSE(("Exiting handle_connect_request, qpnum = %d ", qpnum));
return 0; return 0;
out2: out2:
@ -476,8 +528,6 @@ out:
static void *rdmacm_unmonitor(int fd, int flags, void *context) static void *rdmacm_unmonitor(int fd, int flags, void *context)
{ {
BTL_VERBOSE(("rdmacm_event_dispatch unregistering..."));
if (NULL != event_channel) { if (NULL != event_channel) {
rdma_destroy_event_channel(event_channel); rdma_destroy_event_channel(event_channel);
event_channel = NULL; event_channel = NULL;
@ -503,8 +553,6 @@ static void rdmacm_destroy(struct rdmacm_contents *local)
local->id = NULL; local->id = NULL;
free(local); free(local);
} }
BTL_VERBOSE(("cleaning done"));
} }
static void rdmacm_server_cleanup(struct rdmacm_contents *local) static void rdmacm_server_cleanup(struct rdmacm_contents *local)
@ -526,21 +574,27 @@ static int rdmacm_connection_shutdown(struct mca_btl_base_endpoint_t *endpoint)
return 0; return 0;
} }
for (item = opal_list_get_first(&client_list); item != opal_list_get_end(&client_list); item = opal_list_get_next(item)) { /* Determine which id cooreleates to the endpoint we should now be
struct list_item *temp = (struct list_item *)item; * shutting down. By disconnecting instead of simply destroying the
* QPs,we are shutting down in a more graceful way thus preventing
* errors on the line.
*/
for (item = opal_list_get_first(&client_list);
item != opal_list_get_end(&client_list);
item = opal_list_get_next(item)) {
struct list_item *cli = (struct list_item *)item;
if (endpoint == temp->item->endpoint) { if (endpoint == cli->item->endpoint) {
int i; int i;
for (i = 0; i < mca_btl_openib_component.num_qps; i++) for (i = 0; i < mca_btl_openib_component.num_qps; i++)
if (NULL != temp->item->id[i] && if (NULL != cli->item->id[i] &&
NULL != temp->item->id[i]->qp && NULL != cli->item->id[i]->qp &&
NULL != temp->item->endpoint->qps) { NULL != cli->item->endpoint->qps) {
rdma_disconnect(temp->item->id[i]); rdma_disconnect(cli->item->id[i]);
} }
} }
} }
BTL_VERBOSE(("Finished disconnecting"));
return 0; return 0;
} }
@ -552,48 +606,53 @@ static void *local_endpoint_connected(void *context)
mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context; mca_btl_openib_endpoint_t *endpoint = (mca_btl_openib_endpoint_t *)context;
mca_btl_openib_endpoint_connected(endpoint); mca_btl_openib_endpoint_connected(endpoint);
BTL_VERBOSE(("endpoint connected"));
return NULL; return NULL;
} }
static int rdmacm_connect_endpoint(struct rdmacm_contents *local, struct rdma_cm_event *event) static int rdmacm_connect_endpoint(struct rdmacm_contents *local, struct rdma_cm_event *event)
{ {
struct message *message; struct rdmacm_endpoint_local_cpc_data *data;
mca_btl_openib_endpoint_t *endpoint; mca_btl_openib_endpoint_t *endpoint;
struct message *message;
if (local->server) if (local->server)
endpoint = ((struct id_contexts *)event->id->context)->endpoint; endpoint = ((struct id_contexts *)event->id->context)->endpoint;
else { else {
struct list_item *temp; struct list_item *li;
uint32_t rem_index;
rem_index = ((struct conn_message *)event->param.conn.private_data)->rem_index;
endpoint = local->endpoint; endpoint = local->endpoint;
local->endpoint->rem_info.rem_index = ((struct conn_message *)event->param.conn.private_data)->rem_index; local->endpoint->rem_info.rem_index = rem_index;
temp = OBJ_NEW(list_item_t); li = OBJ_NEW(list_item_t);
if (NULL == temp) { if (NULL == li) {
BTL_ERROR(("malloc error")); BTL_ERROR(("malloc error"));
return -1; return -1;
} }
temp->item = local; li->item = local;
opal_list_append(&client_list, &(temp->super)); opal_list_append(&client_list, &(li->super));
} }
if (NULL == endpoint) { if (NULL == endpoint) {
BTL_ERROR(("Can't find endpoint")); BTL_ERROR(("Can't find endpoint"));
return -1; return -1;
} }
data = (struct rdmacm_endpoint_local_cpc_data *)endpoint->endpoint_local_cpc_data;
if (++((struct rdmacm_endpoint_local_cpc_data *)endpoint->endpoint_local_cpc_data)->rdmacm_counter < mca_btl_openib_component.num_qps) { /* Only notify the upper layers after the last QO has been connected */
BTL_VERBOSE(("%s count == %d", local->server?"server":"client", ((struct rdmacm_endpoint_local_cpc_data *)endpoint->endpoint_local_cpc_data)->rdmacm_counter)); if (++data->rdmacm_counter < mca_btl_openib_component.num_qps) {
BTL_VERBOSE(("%s count == %d", local->server?"server":"client", data->rdmacm_counter));
return 0; return 0;
} }
message = endpoint->endpoint_remote_cpc_data->cbm_modex_message; message = endpoint->endpoint_remote_cpc_data->cbm_modex_message;
if (local->server) { BTL_VERBOSE(("%s connected!!! local %x remote %x state = %d",
BTL_VERBOSE(("zzzzzzzzz! local %x remote %x state = %d", local->ipaddr, message->ipaddr, endpoint->endpoint_state)); local->server?"server":"client",
} else { local->ipaddr,
BTL_VERBOSE(("boiiiiiiing! local %x remote %x state = %d", local->ipaddr, message->ipaddr, endpoint->endpoint_state)); message->ipaddr,
} endpoint->endpoint_state));
ompi_btl_openib_fd_schedule(local_endpoint_connected, endpoint); ompi_btl_openib_fd_schedule(local_endpoint_connected, endpoint);
@ -603,17 +662,10 @@ static int rdmacm_connect_endpoint(struct rdmacm_contents *local, struct rdma_cm
static int start_connect(struct rdmacm_contents *local, int num) static int start_connect(struct rdmacm_contents *local, int num)
{ {
int rc; int rc;
opal_mutex_t *endpoint_state_lock = &((struct rdmacm_endpoint_local_cpc_data *)local->endpoint->endpoint_local_cpc_data)->endpoint_state_lock;
OPAL_THREAD_LOCK(endpoint_state_lock); /* Resolve the route to the remote system. Onced established, the
if (MCA_BTL_IB_CONNECT_ACK != local->endpoint->endpoint_state) { * local system will get a RDMA_CM_EVENT_ROUTE_RESOLVED event.
OPAL_THREAD_UNLOCK(endpoint_state_lock); */
BTL_VERBOSE(("Connection race. EP state = %d", local->endpoint->endpoint_state));
return 0;
}
OPAL_THREAD_UNLOCK(endpoint_state_lock);
/* resolve the remote route via rdma_resolve_route() */
rc = rdma_resolve_route(local->id[num], RDMA_RESOLVE_ADDR_TIMEOUT); rc = rdma_resolve_route(local->id[num], RDMA_RESOLVE_ADDR_TIMEOUT);
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("Failed to resolve the route with %d", rc)); BTL_ERROR(("Failed to resolve the route with %d", rc));
@ -704,6 +756,13 @@ static int finish_connect(struct rdmacm_contents *local, int num)
goto out1; goto out1;
} }
} else { } else {
/* If we are establishing a connection in the "wrong" direction,
* setup a dummy CQ and QP and do NOT post any recvs on them.
* Otherwise this will screwup the recv accounting and will
* result in not posting recvs when you really really wanted to.
* All of the dummy cq and qps will be cleaned up on the reject
* event.
*/
rc = create_dummy_cq(local, local->openib_btl); rc = create_dummy_cq(local, local->openib_btl);
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("create_dummy_cq error %d", rc)); BTL_ERROR(("create_dummy_cq error %d", rc));
@ -730,7 +789,10 @@ static int finish_connect(struct rdmacm_contents *local, int num)
BTL_VERBOSE(("Connecting from %x, port %d to %x", localipaddr, msg.rem_port, remoteipaddr)); BTL_VERBOSE(("Connecting from %x, port %d to %x", localipaddr, msg.rem_port, remoteipaddr));
/* connect via rdma_connect() */ /* Now all of the local setup has been done. The remote system
* should now get a RDMA_CM_EVENT_CONNECT_REQUEST event to further
* the setup of the QP.
*/
rc = rdma_connect(local->id[num], &conn_param); rc = rdma_connect(local->id[num], &conn_param);
if (0 != rc) { if (0 != rc) {
BTL_ERROR(("rdma_connect Failed with %d", rc)); BTL_ERROR(("rdma_connect Failed with %d", rc));
@ -823,8 +885,8 @@ static int rdma_event_handler(struct rdma_cm_event *event)
static void *rdmacm_event_dispatch(int fd, int flags, void *context) static void *rdmacm_event_dispatch(int fd, int flags, void *context)
{ {
struct rdma_cm_event *event, ecopy; struct rdma_cm_event *event, ecopy;
void *data = NULL;
int rc; int rc;
void *temp = NULL;
/* blocks until next cm_event */ /* blocks until next cm_event */
rc = rdma_get_cm_event(event_channel, &event); rc = rdma_get_cm_event(event_channel, &event);
@ -833,15 +895,24 @@ static void *rdmacm_event_dispatch(int fd, int flags, void *context)
return NULL; return NULL;
} }
/* If the incomming event is not acked in a sufficient amount of
* time, there will be a timeout error and the connection will be
* torndown. Also, the act of acking the event destroys the
* included data in the event. In certain circumstances, the time
* it takes to handle a incoming event could approach or exceed this
* time. To prevent this from happening, we will copy the event and
* all of its data, ack the event, and process the copy of the
* event.
*/
memcpy(&ecopy, event, sizeof(struct rdma_cm_event)); memcpy(&ecopy, event, sizeof(struct rdma_cm_event));
if (event->param.conn.private_data_len > 0) { if (event->param.conn.private_data_len > 0) {
temp = malloc(event->param.conn.private_data_len); data = malloc(event->param.conn.private_data_len);
if (NULL == temp) { if (NULL == data) {
BTL_ERROR(("error mallocing memory")); BTL_ERROR(("error mallocing memory"));
return NULL; return NULL;
} }
memcpy(temp, event->param.conn.private_data, event->param.conn.private_data_len); memcpy(data, event->param.conn.private_data, event->param.conn.private_data_len);
ecopy.param.conn.private_data = temp; ecopy.param.conn.private_data = data;
} }
rdma_ack_cm_event(event); rdma_ack_cm_event(event);
@ -852,65 +923,23 @@ static void *rdmacm_event_dispatch(int fd, int flags, void *context)
ecopy.status)); ecopy.status));
} }
if (NULL != temp) if (NULL != data)
free(temp); free(data);
return NULL; return NULL;
} }
static int rdmacm_query(mca_btl_openib_hca_t *hca) /* CPC init function - Setup all globals here */
{
struct in_addr inetaddr;
struct ibv_device_attr attr;
int i, rc;
bool valid_ip = 0;
/* It's not an error if these things fail; perhaps RDMA CM is not
supported on this HCA. So just gracefully return "sorry,
Charlie" */
rc = build_rdma_addr_list();
if (-1 == rc) {
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to find IP address for %s",
ibv_get_device_name(hca->ib_dev));
return OMPI_ERR_NOT_SUPPORTED;
}
rc = ibv_query_device(hca->ib_dev_context, &attr);
if (-1 == rc) {
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC system error (verbs failure)");
return OMPI_ERROR;
}
for (i = 1; i <= attr.phys_port_cnt; i++) {
inetaddr.s_addr = rdma_get_ipv4addr(hca->ib_dev_context, i);
BTL_VERBOSE(("dev %s port %d ipaddr %s", hca->ib_dev->name, i, inet_ntoa(inetaddr)));
if (0 != inetaddr.s_addr) {
valid_ip = 1;
}
}
if (valid_ip == 1) {
return rdmacm_priority;
}
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to find IP address for %s",
ibv_get_device_name(hca->ib_dev));
return OMPI_ERR_NOT_SUPPORTED;
}
static int rdmacm_init(mca_btl_openib_endpoint_t *endpoint) static int rdmacm_init(mca_btl_openib_endpoint_t *endpoint)
{ {
void *temp; void *data;
temp = calloc(1, sizeof(struct rdmacm_endpoint_local_cpc_data)); data = calloc(1, sizeof(struct rdmacm_endpoint_local_cpc_data));
if (NULL == temp) { if (NULL == data) {
BTL_ERROR(("malloc failed")); BTL_ERROR(("malloc failed"));
goto out; goto out;
} }
endpoint->endpoint_local_cpc_data = temp; endpoint->endpoint_local_cpc_data = data;
return 0; return 0;
out: out:
@ -924,33 +953,37 @@ static int ipaddrcheck(struct rdmacm_contents *server, mca_btl_openib_module_t *
rc = ibv_query_device(openib_btl->hca->ib_dev_context, &attr); rc = ibv_query_device(openib_btl->hca->ib_dev_context, &attr);
if (-1 == rc) { if (-1 == rc) {
BTL_ERROR(("ibv_query_device failed")); opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC system error (verbs failure)");
goto out; goto out;
} }
for (i = 0; i < attr.phys_port_cnt; i++) { for (i = 0; i < attr.phys_port_cnt; i++) {
bool found = false; bool found = false;
uint32_t temp = rdma_get_ipv4addr(openib_btl->hca->ib_dev_context, i+1); uint32_t ipaddr = rdma_get_ipv4addr(openib_btl->hca->ib_dev_context, i+1);
opal_list_item_t *item; opal_list_item_t *item;
for (item = opal_list_get_first(&server_list); item != opal_list_get_end(&server_list); item = opal_list_get_next(item)) { for (item = opal_list_get_first(&server_list); item != opal_list_get_end(&server_list); item = opal_list_get_next(item)) {
struct list_item *pitem = (struct list_item *)item; struct list_item *pitem = (struct list_item *)item;
BTL_VERBOSE(("paddr = %x, temp addr = %x", pitem->item->ipaddr, temp)); BTL_VERBOSE(("paddr = %x, ipaddr addr = %x", pitem->item->ipaddr, ipaddr));
if (pitem->item->ipaddr == temp || 0 == temp) { if (pitem->item->ipaddr == ipaddr || 0 == ipaddr) {
BTL_VERBOSE(("addr %x already exists", temp)); BTL_VERBOSE(("addr %x already exists", ipaddr));
found = true; found = true;
break; break;
} }
} }
if (!found) { if (!found) {
server->ipaddr = temp; server->ipaddr = ipaddr;
server->port = rdma_get_src_port(server->id[0]); server->port = rdma_get_src_port(server->id[0]);
break; break;
} }
} }
/* It's not an error if these things fail; perhaps RDMA CM is not
supported on this HCA. So just gracefully return "sorry,
Charlie" */
if (0 == server->ipaddr) { if (0 == server->ipaddr) {
BTL_ERROR(("No IP address found")); opal_output_verbose(5, mca_btl_base_output, "openib BTL: rdmacm CPC unable to find IP address for %s", ibv_get_device_name(openib_btl->hca->ib_dev));
goto out; goto out;
} }
@ -983,27 +1016,24 @@ out:
return OMPI_ERROR; return OMPI_ERROR;
} }
static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, /* This function determines if the RDMACM is a possible cpc method and
ompi_btl_openib_connect_base_module_t **cpc) * sets it up accordingly.
*/
static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, ompi_btl_openib_connect_base_module_t **cpc)
{ {
struct rdmacm_contents *server = NULL; struct rdmacm_contents *server = NULL;
struct sockaddr_in sin; struct sockaddr_in sin;
struct list_item *temp; struct list_item *li;
struct id_contexts *context;
int rc; int rc;
/* RDMACM is not supported if we have any XRC QPs */ /* RDMACM is not supported if we have any XRC QPs */
if (mca_btl_openib_component.num_xrc_qps > 0) { if (mca_btl_openib_component.num_xrc_qps > 0) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output, "openib BTL: rdmacm CPC not supported with XRC receive queues; skipped");
"openib BTL: rdmacm CPC not supported with XRC receive queues; skipped");
rc = OMPI_ERR_NOT_SUPPORTED; rc = OMPI_ERR_NOT_SUPPORTED;
goto out; goto out;
} }
rc = rdmacm_query(openib_btl->hca);
if (rc < 0) {
goto out;
}
BTL_VERBOSE(("rdmacm_component_query")); BTL_VERBOSE(("rdmacm_component_query"));
*cpc = malloc(sizeof(ompi_btl_openib_connect_base_module_t)); *cpc = malloc(sizeof(ompi_btl_openib_connect_base_module_t));
@ -1022,68 +1052,70 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl,
server = malloc(sizeof(struct rdmacm_contents)); server = malloc(sizeof(struct rdmacm_contents));
if (NULL == server) { if (NULL == server) {
rc = OMPI_ERR_OUT_OF_RESOURCE; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out; goto out1;
} }
server->id = malloc(sizeof(struct rdma_cm_id *)); server->id = malloc(sizeof(struct rdma_cm_id *));
if (NULL == server->id) { if (NULL == server->id) {
rc = OMPI_ERR_OUT_OF_RESOURCE; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out; goto out2;
} }
server->server = true; server->server = true;
server->openib_btl = openib_btl; server->openib_btl = openib_btl;
/* create an rdma_cm_id */ context = malloc(sizeof(struct id_contexts));
rc = rdma_create_id(event_channel, &server->id[0], NULL, RDMA_PS_TCP); if (NULL == context) {
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC system error (malloc failed)");
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out3;
}
context->local = server;
context->qpnum = 0;
rc = rdma_create_id(event_channel, &server->id[0], context, RDMA_PS_TCP);
if (0 != rc) { if (0 != rc) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC failed to create ID"); "openib BTL: rdmacm CPC failed to create ID");
rc = OMPI_ERR_OUT_OF_RESOURCE; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out; goto out4;
} }
server->id[0]->context = malloc(sizeof(struct id_contexts));
if (NULL == server->id[0]->context) {
opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC system error (malloc failed)");
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out;
}
((struct id_contexts *)server->id[0]->context)->local = server;
((struct id_contexts *)server->id[0]->context)->qpnum = 0;
/* rdma_bind() */
memset(&sin, 0, sizeof(sin)); memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET; sin.sin_family = AF_INET;
sin.sin_addr.s_addr = rdmacm_addr; sin.sin_addr.s_addr = rdmacm_addr;
sin.sin_port = rdmacm_port; sin.sin_port = rdmacm_port;
/* Bind the rdmacm server to the local IP address and an ephemerial
* port or one specified by a comand arg.
*/
rc = rdma_bind_addr(server->id[0], (struct sockaddr *)&sin); rc = rdma_bind_addr(server->id[0], (struct sockaddr *)&sin);
if (0 != rc) { if (0 != rc) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to bind to address"); "openib BTL: rdmacm CPC unable to bind to address");
rc = OMPI_ERR_UNREACH; rc = OMPI_ERR_UNREACH;
goto out; goto out5;
} }
/* Verify that the HCA has a valid IP address on it, or we cannot use the cpc */
rc = ipaddrcheck(server, openib_btl); rc = ipaddrcheck(server, openib_btl);
if (0 != rc) { if (0 != rc) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm IP address not found on port"); "openib BTL: rdmacm IP address not found on port");
rc = OMPI_ERR_NOT_SUPPORTED; rc = OMPI_ERR_NOT_SUPPORTED;
goto out; goto out5;
} }
/* rdma_listen() */ /* Listen on the specified address/port with the rdmacm, limit the amount of incoming connections to 1024 */
/* FIXME - 1024 should be (num of connectors * mca_btl_openib_component.num_qps) */ /* FIXME - 1024 should be (num of connectors * mca_btl_openib_component.num_qps) */
rc = rdma_listen(server->id[0], 1024); rc = rdma_listen(server->id[0], 1024);
if (0 != rc) { if (0 != rc) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to listen"); "openib BTL: rdmacm CPC unable to listen");
rc = OMPI_ERR_UNREACH; rc = OMPI_ERR_UNREACH;
goto out; goto out5;
} }
rc = create_message(server, openib_btl, &(*cpc)->data); rc = create_message(server, openib_btl, &(*cpc)->data);
@ -1091,23 +1123,36 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl,
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to create message"); "openib BTL: rdmacm CPC unable to create message");
rc = OMPI_ERR_OUT_OF_RESOURCE; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out; goto out5;
} }
temp = OBJ_NEW(list_item_t); li = OBJ_NEW(list_item_t);
if (NULL== temp) { if (NULL== li) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC unable to add to list"); "openib BTL: rdmacm CPC unable to add to list");
goto out; rc = OMPI_ERR_OUT_OF_RESOURCE;
goto out6;
} }
temp->item = server; li->item = server;
opal_list_append(&server_list, &(temp->super)); opal_list_append(&server_list, &(li->super));
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
"openib BTL: rdmacm CPC available for use on %s", "openib BTL: rdmacm CPC available for use on %s",
ibv_get_device_name(openib_btl->hca->ib_dev)); ibv_get_device_name(openib_btl->hca->ib_dev));
return OMPI_SUCCESS; return OMPI_SUCCESS;
out6:
free(&(*cpc)->data.cbm_modex_message);
out5:
rdma_destroy_id(server->id[0]);
out4:
free(context);
out3:
free(server->id);
out2:
free(server);
out1:
free(*cpc);
out: out:
if (OMPI_ERR_NOT_SUPPORTED == rc) { if (OMPI_ERR_NOT_SUPPORTED == rc) {
opal_output_verbose(5, mca_btl_base_output, opal_output_verbose(5, mca_btl_base_output,
@ -1119,9 +1164,6 @@ out:
ibv_get_device_name(openib_btl->hca->ib_dev), rc, ibv_get_device_name(openib_btl->hca->ib_dev), rc,
opal_strerror(rc)); opal_strerror(rc));
} }
if (NULL != server) {
rdmacm_server_cleanup(server);
}
return rc; return rc;
} }
@ -1130,13 +1172,15 @@ static int rdmacm_component_destroy(void)
opal_list_item_t *item; opal_list_item_t *item;
int rc; int rc;
BTL_VERBOSE(("rdmacm_component_destroy"));
if (0 != opal_list_get_size(&client_list)) { if (0 != opal_list_get_size(&client_list)) {
for (item = opal_list_get_first(&client_list); for (item = opal_list_get_first(&client_list);
item != opal_list_get_end(&client_list); item != opal_list_get_end(&client_list);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
struct rdmacm_contents *temp = ((struct list_item *)item)->item; struct rdmacm_contents *local = ((struct list_item *)item)->item;
rdmacm_destroy(temp); rdmacm_destroy(local);
opal_list_remove_item(&client_list, item); opal_list_remove_item(&client_list, item);
} }
} }
@ -1145,9 +1189,9 @@ static int rdmacm_component_destroy(void)
for (item = opal_list_get_first(&server_list); for (item = opal_list_get_first(&server_list);
item != opal_list_get_end(&server_list); item != opal_list_get_end(&server_list);
item = opal_list_get_next(item)) { item = opal_list_get_next(item)) {
struct rdmacm_contents *temp = ((struct list_item *)item)->item; struct rdmacm_contents *local = ((struct list_item *)item)->item;
rdmacm_destroy(temp); rdmacm_server_cleanup(local);
opal_list_remove_item(&server_list, item); opal_list_remove_item(&server_list, item);
} }
} }
@ -1160,7 +1204,6 @@ static int rdmacm_component_destroy(void)
free_rdma_addr_list(); free_rdma_addr_list();
BTL_VERBOSE(("rdmacm_component_destroy"));
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }