1
1

fix #1828; rework the private data connection establishment process; reviewed by terry d.

This commit was SVN r20889.
Этот коммит содержится в:
Donald Kerr 2009-03-26 17:54:44 +00:00
родитель 27ed29a0a1
Коммит 47dc1bd493
3 изменённых файлов: 505 добавлений и 121 удалений

Просмотреть файл

@ -587,17 +587,51 @@ mca_btl_udapl_component_init (int *num_btl_modules,
static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
DAT_CR_HANDLE cr_handle)
{
DAT_EP_HANDLE endpoint;
DAT_EP_HANDLE ep;
int rc;
mca_btl_base_endpoint_t* proc_ep;
mca_btl_udapl_addr_t priv_data_in_addr;
int32_t priv_data_in_conn_type; /* incoming endpoint type */
rc = mca_btl_udapl_endpoint_create(btl, &endpoint);
if (mca_btl_udapl_component.udapl_conn_priv_data) {
DAT_CR_PARAM cr_param;
/* query the connection request for incoming private data */
rc = dat_cr_query(cr_handle,
DAT_CR_FIELD_ALL,
&cr_param);
if (rc != DAT_SUCCESS) {
char* major;
char* minor;
dat_strerror(rc, (const char**)&major,
(const char**)&minor);
BTL_ERROR(("ERROR: %s %s %s\n", "dat_cr_query",
major, minor));
return OMPI_ERROR;
}
/* retrieve data from connection request event;
* cr_param contains remote_port_qual but we need to
* match on the psp port and address of remote
* so we get this from the private data.
*/
memcpy(&priv_data_in_addr,
(mca_btl_udapl_addr_t *)cr_param.private_data,
sizeof(mca_btl_udapl_addr_t));
priv_data_in_conn_type = *(int32_t *)
((char *)cr_param.private_data + sizeof(mca_btl_udapl_addr_t));
}
/* create the endpoint for the incoming connection */
rc = mca_btl_udapl_endpoint_create(btl, &ep);
if(OMPI_SUCCESS != rc) {
BTL_ERROR(("ERROR: mca_btl_udapl_endpoint_create"));
return OMPI_ERROR;
}
rc = dat_cr_accept(cr_handle, endpoint, sizeof(mca_btl_udapl_addr_t),
&btl->udapl_addr);
/* cr_param no longer valid once dat_cr_accept called */
rc = dat_cr_accept(cr_handle, ep, 0, NULL);
if(DAT_SUCCESS != rc) {
char* major;
char* minor;
@ -609,6 +643,28 @@ static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
return OMPI_ERROR;
}
if (mca_btl_udapl_component.udapl_conn_priv_data) {
/* With accept now in process find a home for the DAT ep by
* matching against the private data that came in on the
* connection request event
*/
/* find the endpoint which matches the address in data received */
proc_ep =
mca_btl_udapl_find_endpoint_address_match(btl, priv_data_in_addr);
if (proc_ep == NULL) {
return OMPI_ERROR;
}
if (BTL_UDAPL_EAGER_CONNECTION == priv_data_in_conn_type) {
proc_ep->endpoint_eager = ep;
} else {
assert(BTL_UDAPL_MAX_CONNECTION == priv_data_in_conn_type);
proc_ep->endpoint_max = ep;
}
}
return OMPI_SUCCESS;
}
@ -1031,10 +1087,10 @@ int mca_btl_udapl_component_progress()
/* Both the client and server side of a connection generate
this event */
if (mca_btl_udapl_component.udapl_conn_priv_data) {
/* use dat private data to exchange process data */
mca_btl_udapl_endpoint_finish_connect(btl,
event.event_data.connect_event_data.private_data,
NULL,
/* private data is only valid at this point if this
* event is from a dat_ep_connect call, not an accept
*/
mca_btl_udapl_endpoint_pd_established_conn(btl,
event.event_data.connect_event_data.ep_handle);
} else {
/* explicitly exchange process data */

Просмотреть файл

@ -44,26 +44,50 @@
#include "btl_udapl_mca.h"
#include "btl_udapl_proc.h"
static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void mca_btl_udapl_endpoint_send_cb(
int status,
orte_process_name_t* endpoint,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint,
size_t size);
static int mca_btl_udapl_endpoint_post_recv(
mca_btl_udapl_endpoint_t* endpoint,
size_t size);
void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint);
void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
void mca_btl_udapl_endpoint_recv(
int status,
orte_process_name_t* endpoint,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
static int mca_btl_udapl_endpoint_finish_eager(mca_btl_udapl_endpoint_t*);
static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t*);
static void mca_btl_udapl_endpoint_connect_eager_rdma(mca_btl_udapl_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_write_eager(mca_btl_base_endpoint_t* endpoint,
mca_btl_udapl_frag_t* frag);
static void mca_btl_udapl_endpoint_control_send_cb(mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor,
int status);
static int mca_btl_udapl_endpoint_send_eager_rdma(mca_btl_base_endpoint_t* endpoint);
static mca_btl_base_endpoint_t* mca_btl_udapl_find_endpoint_connection_match(
struct mca_btl_udapl_module_t* btl,
DAT_EP_HANDLE ep);
static int mca_btl_udapl_endpoint_pd_finish_eager(
mca_btl_udapl_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_pd_finish_max(
mca_btl_udapl_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_pd_connections_completed(
mca_btl_udapl_endpoint_t* endpoint);
static void mca_btl_udapl_endpoint_connect_eager_rdma(
mca_btl_udapl_endpoint_t* endpoint);
static int mca_btl_udapl_endpoint_write_eager(
mca_btl_base_endpoint_t* endpoint,
mca_btl_udapl_frag_t* frag);
static void
mca_btl_udapl_endpoint_control_send_cb(mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor,
int status);
static int mca_btl_udapl_endpoint_send_eager_rdma(
mca_btl_base_endpoint_t* endpoint);
extern void mca_btl_udapl_frag_progress_pending(
mca_btl_udapl_module_t* udapl_btl,
mca_btl_base_endpoint_t* endpoint,
const int connection);
/*
@ -598,6 +622,8 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint)
{
mca_btl_udapl_module_t* btl = endpoint->endpoint_btl;
int rc;
char *priv_data_ptr = NULL;
DAT_COUNT priv_data_size = 0;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&(btl->udapl_connect_inprogress), 1);
@ -621,9 +647,30 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint)
goto failure_create;
}
/* create private data as required */
if (mca_btl_udapl_component.udapl_conn_priv_data) {
int32_t priv_data_conn_type = BTL_UDAPL_EAGER_CONNECTION;
priv_data_size = sizeof(mca_btl_udapl_addr_t) + sizeof(int32_t);
priv_data_ptr = (char *)malloc(priv_data_size);
if (NULL == priv_data_ptr) {
BTL_ERROR(("ERROR: %s %s\n", "mca_btl_udapl_endpoint_connect",
"out of resources"));
goto failure_create;
}
/* private data consists of local btl address, listen port (psp),
* and endpoint state to indicate EAGER or MAX endpoint
*/
memcpy(priv_data_ptr, &btl->udapl_addr, sizeof(mca_btl_udapl_addr_t));
memcpy((priv_data_ptr + sizeof(mca_btl_udapl_addr_t)),
&priv_data_conn_type, sizeof(int32_t));
}
rc = dat_ep_connect(endpoint->endpoint_eager, &endpoint->endpoint_addr.addr,
endpoint->endpoint_addr.port, mca_btl_udapl_component.udapl_timeout,
sizeof(mca_btl_udapl_addr_t), &btl->udapl_addr, 0, DAT_CONNECT_DEFAULT_FLAG);
priv_data_size, priv_data_ptr, 0, DAT_CONNECT_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
char* major;
char* minor;
@ -637,6 +684,11 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint)
endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_EAGER;
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
if (mca_btl_udapl_component.udapl_conn_priv_data) {
free(priv_data_ptr);
}
return;
failure:
@ -660,66 +712,52 @@ int mca_btl_udapl_endpoint_finish_connect(struct mca_btl_udapl_module_t* btl,
int32_t* connection_seq,
DAT_EP_HANDLE endpoint)
{
mca_btl_udapl_proc_t* proc;
mca_btl_base_endpoint_t* ep;
size_t i;
int rc;
int rc = OMPI_SUCCESS;
/* Search for the matching BTL EP */
for(proc = (mca_btl_udapl_proc_t*)
opal_list_get_first(&mca_btl_udapl_component.udapl_procs);
proc != (mca_btl_udapl_proc_t*)
opal_list_get_end(&mca_btl_udapl_component.udapl_procs);
proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) {
for(i = 0; i < proc->proc_endpoint_count; i++) {
ep = proc->proc_endpoints[i];
/* find the endpoint which matches the address in data received */
ep = mca_btl_udapl_find_endpoint_address_match(btl, *addr);
/* Does this endpoint match? */
/* TODO - Check that the DAT_CONN_QUAL's match too */
if(ep->endpoint_btl == btl &&
!memcmp(addr, &ep->endpoint_addr, sizeof(DAT_SOCK_ADDR))) {
OPAL_THREAD_LOCK(&ep->endpoint_lock);
if(MCA_BTL_UDAPL_CONN_EAGER == ep->endpoint_state) {
ep->endpoint_connection_seq = (NULL != connection_seq) ?
*connection_seq:0;
ep->endpoint_eager = endpoint;
rc = mca_btl_udapl_endpoint_finish_eager(ep);
} else if(MCA_BTL_UDAPL_CONN_MAX == ep->endpoint_state) {
/* Check to see order of messages received are in
* the same order the actual connections are made.
* If they are not we need to swap the eager and
* max connections. This inversion is possible due
* to a race condition that one process may actually
* receive the sendrecv messages from the max connection
* before the eager connection.
*/
if (NULL == connection_seq ||
ep->endpoint_connection_seq < *connection_seq) {
/* normal order connection matching */
ep->endpoint_max = endpoint;
} else {
/* inverted order connection matching */
ep->endpoint_max = ep->endpoint_eager;
ep->endpoint_eager = endpoint;
}
rc = mca_btl_udapl_endpoint_finish_max(ep);
} else {
BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE,
("ERROR: invalid EP state %d\n",
ep->endpoint_state));
return OMPI_ERROR;
}
return rc;
}
}
if (ep == NULL) {
/* If this point is reached, no matching endpoint was found */
BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_CRITICAL,
("ERROR: could not match endpoint\n"));
return OMPI_ERROR;
}
/* If this point is reached, no matching endpoint was found */
BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE,
("btl_udapl ERROR could not match endpoint\n"));
return OMPI_ERROR;
if(MCA_BTL_UDAPL_CONN_EAGER == ep->endpoint_state) {
ep->endpoint_connection_seq = (NULL != connection_seq) ?
*connection_seq:0;
ep->endpoint_eager = endpoint;
rc = mca_btl_udapl_endpoint_finish_eager(ep);
} else if(MCA_BTL_UDAPL_CONN_MAX == ep->endpoint_state) {
/* Check to see order of messages received are in
* the same order the actual connections are made.
* If they are not we need to swap the eager and
* max connections. This inversion is possible due
* to a race condition that one process may actually
* receive the sendrecv messages from the max connection
* before the eager connection.
*/
if (NULL == connection_seq ||
ep->endpoint_connection_seq < *connection_seq) {
/* normal order connection matching */
ep->endpoint_max = endpoint;
} else {
/* inverted order connection matching */
ep->endpoint_max = ep->endpoint_eager;
ep->endpoint_eager = endpoint;
}
rc = mca_btl_udapl_endpoint_finish_max(ep);
} else {
BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE,
("ERROR: invalid EP state %d\n",
ep->endpoint_state));
return OMPI_ERROR;
}
return rc;
}
@ -738,7 +776,7 @@ static int mca_btl_udapl_endpoint_finish_eager(
/* establish eager rdma connection */
if ((1 == mca_btl_udapl_component.udapl_use_eager_rdma) &&
(btl->udapl_eager_rdma_endpoint_count <
(btl->udapl_eager_rdma_endpoint_count <
mca_btl_udapl_component.udapl_max_eager_rdma_peers)) {
mca_btl_udapl_endpoint_connect_eager_rdma(endpoint);
}
@ -758,7 +796,7 @@ static int mca_btl_udapl_endpoint_finish_eager(
rc = dat_ep_connect(endpoint->endpoint_max,
&endpoint->endpoint_addr.addr, endpoint->endpoint_addr.port,
mca_btl_udapl_component.udapl_timeout,
sizeof(mca_btl_udapl_addr_t),&btl->udapl_addr , 0,
0, NULL, 0,
DAT_CONNECT_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
char* major;
@ -779,22 +817,20 @@ static int mca_btl_udapl_endpoint_finish_eager(
static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t* endpoint)
{
mca_btl_udapl_frag_t* frag;
int ret = OMPI_SUCCESS;
int token_avail;
int queue_len;
int i;
mca_btl_udapl_module_t* udapl_btl = endpoint->endpoint_btl;
endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTED;
OPAL_THREAD_ADD32(&(endpoint->endpoint_btl->udapl_connect_inprogress), -1);
/* post eager/max recv buffers */
/* post eager recv buffers */
ret = mca_btl_udapl_endpoint_post_recv(endpoint,
mca_btl_udapl_component.udapl_eager_frag_size);
if (OMPI_SUCCESS != ret) {
return ret;
}
/* post max recv buffers */
ret = mca_btl_udapl_endpoint_post_recv(endpoint,
mca_btl_udapl_component.udapl_max_frag_size);
if (OMPI_SUCCESS != ret) {
@ -802,44 +838,317 @@ static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t* endpoint)
}
/* progress eager frag queue as allowed */
queue_len = opal_list_get_size(&(endpoint->endpoint_eager_frags));
BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_EAGER_CONNECTION, token_avail);
for(i = 0; i < queue_len && token_avail > 0; i++) {
frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&(endpoint->endpoint_eager_frags));
if(NULL == frag) {
break;
}
mca_btl_udapl_endpoint_send(frag->endpoint, frag);
BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_EAGER_CONNECTION,
token_avail);
}
mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint,
BTL_UDAPL_EAGER_CONNECTION);
/* progress max frag queue as allowed */
queue_len = opal_list_get_size(&(endpoint->endpoint_max_frags));
BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_MAX_CONNECTION, token_avail);
for(i = 0; i < queue_len && token_avail > 0; i++) {
frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&(endpoint->endpoint_max_frags));
if(NULL == frag) {
break;
}
mca_btl_udapl_endpoint_send(frag->endpoint, frag);
BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_MAX_CONNECTION, token_avail);
}
mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint,
BTL_UDAPL_MAX_CONNECTION);
return ret;
}
/*
* Utility routine. Search list of endpoints to find one that matches
* the given address.
*
* @param btl (IN) BTL module
* @param addr (IN) Address used to find endpoint to be returned
*
* @return Pointer to the base endpoint matching addr or NULL
*/
mca_btl_base_endpoint_t*
mca_btl_udapl_find_endpoint_address_match(struct mca_btl_udapl_module_t* btl,
mca_btl_udapl_addr_t addr)
{
size_t i;
mca_btl_udapl_proc_t *proc;
mca_btl_base_endpoint_t *proc_ep;
mca_btl_base_endpoint_t *endpoint = NULL;
for(proc = (mca_btl_udapl_proc_t*)
opal_list_get_first(&mca_btl_udapl_component.udapl_procs);
proc != (mca_btl_udapl_proc_t*)
opal_list_get_end(&mca_btl_udapl_component.udapl_procs);
proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) {
for(i = 0; i < proc->proc_endpoint_count; i++) {
proc_ep = proc->proc_endpoints[i];
if(proc_ep->endpoint_btl == btl &&
!memcmp(&addr, &proc_ep->endpoint_addr,
(sizeof(DAT_CONN_QUAL) + sizeof(DAT_SOCK_ADDR)))) {
/* match found */
endpoint = proc_ep;
return endpoint;
}
}
}
return endpoint;
}
/*
* Utility routine. Search list of endpoints to find one that matches
* the given DAT endpoint handle, this could either be the eager or
* max ep.
*
* @param btl (IN) BTL module
* @param ep (IN) EP handle used to find endpoint to be returned
*
* @return Pointer to the base endpoint matching addr or NULL
*/
static mca_btl_base_endpoint_t*
mca_btl_udapl_find_endpoint_connection_match(struct mca_btl_udapl_module_t* btl,
DAT_EP_HANDLE ep)
{
size_t i;
mca_btl_udapl_proc_t *proc;
mca_btl_base_endpoint_t *proc_ep;
mca_btl_base_endpoint_t *endpoint = NULL;
for(proc = (mca_btl_udapl_proc_t*)
opal_list_get_first(&mca_btl_udapl_component.udapl_procs);
proc != (mca_btl_udapl_proc_t*)
opal_list_get_end(&mca_btl_udapl_component.udapl_procs);
proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) {
for(i = 0; i < proc->proc_endpoint_count; i++) {
proc_ep = proc->proc_endpoints[i];
if(proc_ep->endpoint_btl == btl) {
if (ep == proc_ep->endpoint_eager ||
ep == proc_ep->endpoint_max) {
/* match found */
endpoint = proc_ep;
return endpoint;
} else {
continue;
}
}
}
}
return endpoint;
}
/*
* Private Data connection establishment process. Operations to be
* performed once the eager connection of the given endpoint has
* completed.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
*
* @return OMPI_SUCCESS or error status on failure
*/
static int mca_btl_udapl_endpoint_pd_finish_eager(
mca_btl_udapl_endpoint_t* endpoint)
{
mca_btl_udapl_module_t* btl = endpoint->endpoint_btl;
int rc = OMPI_SUCCESS;
char *priv_data_ptr = NULL;
DAT_COUNT priv_data_size = 0;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_MAX;
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
/* initiate the eager rdma connection */
if ((1 == mca_btl_udapl_component.udapl_use_eager_rdma) &&
(btl->udapl_eager_rdma_endpoint_count <
mca_btl_udapl_component.udapl_max_eager_rdma_peers)) {
mca_btl_udapl_endpoint_connect_eager_rdma(endpoint);
}
/* Only one side does dat_ep_connect() and if by chance the
* connection is already established we don't need to bother
* with this.
*/
if((BTL_UDAPL_NUM_CONNECTION != endpoint->endpoint_connections_completed)
&& (0 < orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&endpoint->endpoint_proc->proc_guid,
&ompi_proc_local()->proc_name))) {
rc = mca_btl_udapl_endpoint_create(btl, &endpoint->endpoint_max);
if(DAT_SUCCESS != rc) {
endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED;
return OMPI_ERROR;
}
if (mca_btl_udapl_component.udapl_conn_priv_data) {
int32_t priv_data_conn_type = BTL_UDAPL_MAX_CONNECTION;
priv_data_size = (sizeof(mca_btl_udapl_addr_t) + sizeof(int32_t));
priv_data_ptr = (char *)malloc(priv_data_size);
if (NULL == priv_data_ptr) {
BTL_ERROR(("ERROR: %s %s\n",
"mca_btl_udapl_endpoint_pd_finish_eager",
"out of resources"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* private data consists of local btl address, listen port (psp),
* and endpoint state to indicate EAGER or MAX endpoint
*/
memcpy(priv_data_ptr, &btl->udapl_addr,
sizeof(mca_btl_udapl_addr_t));
memcpy((priv_data_ptr + sizeof(mca_btl_udapl_addr_t)),
&priv_data_conn_type, sizeof(int32_t));
}
rc = dat_ep_connect(endpoint->endpoint_max,
&endpoint->endpoint_addr.addr, endpoint->endpoint_addr.port,
mca_btl_udapl_component.udapl_timeout,
priv_data_size, priv_data_ptr, 0,
DAT_CONNECT_DEFAULT_FLAG);
if (mca_btl_udapl_component.udapl_conn_priv_data) {
free(priv_data_ptr);
}
if(DAT_SUCCESS != rc) {
char* major;
char* minor;
dat_strerror(rc, (const char**)&major,
(const char**)&minor);
BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_connect",
major, minor));
dat_ep_free(endpoint->endpoint_max);
return OMPI_ERROR;
}
}
/* post eager recv buffers */
rc = mca_btl_udapl_endpoint_post_recv(endpoint,
mca_btl_udapl_component.udapl_eager_frag_size);
if (OMPI_SUCCESS != rc) {
return rc;
}
/* Not progressing here because the entire endpoint needs to be
* marked MCA_BTL_UDAPL_CONNECTED, otherwise
* mca_btl_udapl_endpoint_send() will just put queued sends back on
* the queue.
*/
return OMPI_SUCCESS;
}
/*
* Private Data connection establishment process. Operations to be
* performed once the max connection of the given endpoint has
* completed.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
*
* @return OMPI_SUCCESS or error status on failure
*/
static int
mca_btl_udapl_endpoint_pd_finish_max(mca_btl_udapl_endpoint_t* endpoint)
{
int rc = OMPI_SUCCESS;
/* post max recv buffers */
rc = mca_btl_udapl_endpoint_post_recv(endpoint,
mca_btl_udapl_component.udapl_max_frag_size);
/* Not progressing here because the entire endpoint needs to be
* marked MCA_BTL_UDAPL_CONNECTED otherwise
* mca_btl_udapl_endpoint_send() will just put queued sends back on
* the queue.
*/
return rc;
}
/*
* Private Data connection establishment process. Operations to be
* performed once both the eager and max max connections of the given
* endpoint has completed.
*
* @param endpoint (IN) BTL addressing information
*
* @return OMPI_SUCCESS or error status on failure */
static int
mca_btl_udapl_endpoint_pd_connections_completed(mca_btl_udapl_endpoint_t* endpoint)
{
int rc = OMPI_SUCCESS;
mca_btl_udapl_module_t* udapl_btl = endpoint->endpoint_btl;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTED;
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
OPAL_THREAD_ADD32(&(endpoint->endpoint_btl->udapl_connect_inprogress), -1);
/* progress eager frag queue */
mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint,
BTL_UDAPL_EAGER_CONNECTION);
/* progress max frag queue */
mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint,
BTL_UDAPL_MAX_CONNECTION);
return rc;
}
/*
* Private Data connection establishment process. Called once the
* DAT_CONNECTION_EVENT_ESTABLISHED is dequeued from the connecton
* event dispatcher (evd). This event is the local completion event
* for both the dat_ep_connect and dat_cr_accpept calls.
*
* @param btl (IN) BTL module
* @param ep (IN) EP handle used to find endpoint to be returned
*
* @return Pointer to the base endpoint matching addr
*/
int
mca_btl_udapl_endpoint_pd_established_conn(struct mca_btl_udapl_module_t* btl,
DAT_EP_HANDLE established_ep)
{
int rc = OMPI_SUCCESS;
mca_btl_base_endpoint_t* proc_ep = NULL;
/* search for ep and decide what to do next */
proc_ep =
mca_btl_udapl_find_endpoint_connection_match(btl, established_ep);
if (proc_ep == NULL) {
/* If this point is reached, no matching endpoint was found */
BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_CRITICAL,
("ERROR: could not match endpoint\n"));
return OMPI_ERROR;
}
proc_ep->endpoint_connections_completed++;
if (established_ep == proc_ep->endpoint_eager) {
rc = mca_btl_udapl_endpoint_pd_finish_eager(proc_ep);
} else if (established_ep == proc_ep->endpoint_max) {
rc = mca_btl_udapl_endpoint_pd_finish_max(proc_ep);
}
if (rc == OMPI_SUCCESS && BTL_UDAPL_NUM_CONNECTION ==
proc_ep->endpoint_connections_completed) {
rc = mca_btl_udapl_endpoint_pd_connections_completed(proc_ep);
}
return rc;
}
/*
* Post receive buffers for a newly established endpoint connection.
*/
@ -916,6 +1225,7 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_proc = 0;
endpoint->endpoint_connection_seq = 0;
endpoint->endpoint_connections_completed = 0;;
endpoint->endpoint_eager_sends = mca_btl_udapl_component.udapl_num_sends;
endpoint->endpoint_max_sends = mca_btl_udapl_component.udapl_num_sends;
@ -997,7 +1307,7 @@ static void mca_btl_udapl_endpoint_control_send_cb(
mca_btl_udapl_frag_t* frag = (mca_btl_udapl_frag_t*)descriptor;
if(frag->size != mca_btl_udapl_component.udapl_eager_frag_size) {
connection = BTL_UDAPL_MAX_CONNECTION;
connection = BTL_UDAPL_MAX_CONNECTION;
}
/* control messages are not part of the regular accounting
@ -1111,7 +1421,7 @@ static int mca_btl_udapl_endpoint_send_eager_rdma(
OPAL_THREAD_LOCK(&endpoint->endpoint_lock);
opal_list_append(&endpoint->endpoint_eager_frags,
(opal_list_item_t*)data_frag);
(opal_list_item_t*)data_frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock);
return rc;

Просмотреть файл

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2006 Sandia National Laboratories. All rights
* reserved.
* Copyright (c) 2006-2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2006-2009 Sun Microsystems, Inc. All rights reserved.
*
* $COPYRIGHT$
*
@ -130,6 +130,9 @@ struct mca_btl_base_endpoint_t {
int32_t endpoint_connection_seq;
/**< sequence number of sendrecv message for the connection est */
int32_t endpoint_connections_completed;
/**< count of completed connections for priv data connection est. */
opal_mutex_t endpoint_lock;
/**< lock for concurrent access to endpoint state */
@ -199,6 +202,21 @@ int mca_btl_udapl_endpoint_create(struct mca_btl_udapl_module_t* btl,
int mca_btl_udapl_endpoint_send_sr_credits(mca_btl_base_endpoint_t* endpoint,
const int connection);
/*
* Handle the established DAT endpoint when private data is in use
*/
int mca_btl_udapl_endpoint_pd_established_conn(
struct mca_btl_udapl_module_t* btl,
DAT_EP_HANDLE established_ep);
/*
* Utility routine. Search list of endpoints to find one that matches
* the given address.
*/
mca_btl_udapl_endpoint_t* mca_btl_udapl_find_endpoint_address_match(
struct mca_btl_udapl_module_t* btl,
mca_btl_udapl_addr_t addr);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif