diff --git a/ompi/mca/btl/udapl/btl_udapl_component.c b/ompi/mca/btl/udapl/btl_udapl_component.c index 4321b59b2e..8943832d65 100644 --- a/ompi/mca/btl/udapl/btl_udapl_component.c +++ b/ompi/mca/btl/udapl/btl_udapl_component.c @@ -587,17 +587,51 @@ mca_btl_udapl_component_init (int *num_btl_modules, static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, DAT_CR_HANDLE cr_handle) { - DAT_EP_HANDLE endpoint; + DAT_EP_HANDLE ep; int rc; + mca_btl_base_endpoint_t* proc_ep; + mca_btl_udapl_addr_t priv_data_in_addr; + int32_t priv_data_in_conn_type; /* incoming endpoint type */ - rc = mca_btl_udapl_endpoint_create(btl, &endpoint); + if (mca_btl_udapl_component.udapl_conn_priv_data) { + DAT_CR_PARAM cr_param; + + /* query the connection request for incoming private data */ + rc = dat_cr_query(cr_handle, + DAT_CR_FIELD_ALL, + &cr_param); + if (rc != DAT_SUCCESS) { + char* major; + char* minor; + + dat_strerror(rc, (const char**)&major, + (const char**)&minor); + BTL_ERROR(("ERROR: %s %s %s\n", "dat_cr_query", + major, minor)); + return OMPI_ERROR; + } + + /* retrieve data from connection request event; + * cr_param contains remote_port_qual but we need to + * match on the psp port and address of remote + * so we get this from the private data. 
+ */ + memcpy(&priv_data_in_addr, + (mca_btl_udapl_addr_t *)cr_param.private_data, + sizeof(mca_btl_udapl_addr_t)); + priv_data_in_conn_type = *(int32_t *) + ((char *)cr_param.private_data + sizeof(mca_btl_udapl_addr_t)); + } + + /* create the endpoint for the incoming connection */ + rc = mca_btl_udapl_endpoint_create(btl, &ep); if(OMPI_SUCCESS != rc) { BTL_ERROR(("ERROR: mca_btl_udapl_endpoint_create")); return OMPI_ERROR; } - rc = dat_cr_accept(cr_handle, endpoint, sizeof(mca_btl_udapl_addr_t), - &btl->udapl_addr); + /* cr_param no longer valid once dat_cr_accept called */ + rc = dat_cr_accept(cr_handle, ep, 0, NULL); if(DAT_SUCCESS != rc) { char* major; char* minor; @@ -609,6 +643,28 @@ static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, return OMPI_ERROR; } + if (mca_btl_udapl_component.udapl_conn_priv_data) { + /* With accept now in process find a home for the DAT ep by + * matching against the private data that came in on the + * connection request event + */ + + /* find the endpoint which matches the address in data received */ + proc_ep = + mca_btl_udapl_find_endpoint_address_match(btl, priv_data_in_addr); + + if (proc_ep == NULL) { + return OMPI_ERROR; + } + + if (BTL_UDAPL_EAGER_CONNECTION == priv_data_in_conn_type) { + proc_ep->endpoint_eager = ep; + } else { + assert(BTL_UDAPL_MAX_CONNECTION == priv_data_in_conn_type); + proc_ep->endpoint_max = ep; + } + } + return OMPI_SUCCESS; } @@ -1031,10 +1087,10 @@ int mca_btl_udapl_component_progress() /* Both the client and server side of a connection generate this event */ if (mca_btl_udapl_component.udapl_conn_priv_data) { - /* use dat private data to exchange process data */ - mca_btl_udapl_endpoint_finish_connect(btl, - event.event_data.connect_event_data.private_data, - NULL, + /* private data is only valid at this point if this + * event is from a dat_ep_connect call, not an accept + */ + mca_btl_udapl_endpoint_pd_established_conn(btl, event.event_data.connect_event_data.ep_handle); } 
else { /* explicitly exchange process data */ diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.c b/ompi/mca/btl/udapl/btl_udapl_endpoint.c index 19ccdbdfad..f196d5ad75 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.c +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.c @@ -44,26 +44,50 @@ #include "btl_udapl_mca.h" #include "btl_udapl_proc.h" -static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); +static void mca_btl_udapl_endpoint_send_cb( + int status, + orte_process_name_t* endpoint, + opal_buffer_t* buffer, + orte_rml_tag_t tag, + void* cbdata); static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint); -static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint, - size_t size); +static int mca_btl_udapl_endpoint_post_recv( + mca_btl_udapl_endpoint_t* endpoint, + size_t size); void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint); -void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata); +void mca_btl_udapl_endpoint_recv( + int status, + orte_process_name_t* endpoint, + opal_buffer_t* buffer, + orte_rml_tag_t tag, + void* cbdata); static int mca_btl_udapl_endpoint_finish_eager(mca_btl_udapl_endpoint_t*); static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t*); -static void mca_btl_udapl_endpoint_connect_eager_rdma(mca_btl_udapl_endpoint_t* endpoint); -static int mca_btl_udapl_endpoint_write_eager(mca_btl_base_endpoint_t* endpoint, - mca_btl_udapl_frag_t* frag); -static void mca_btl_udapl_endpoint_control_send_cb(mca_btl_base_module_t* btl, - mca_btl_base_endpoint_t* endpoint, - mca_btl_base_descriptor_t* descriptor, - int status); -static int mca_btl_udapl_endpoint_send_eager_rdma(mca_btl_base_endpoint_t* endpoint); +static mca_btl_base_endpoint_t* mca_btl_udapl_find_endpoint_connection_match( + struct 
mca_btl_udapl_module_t* btl, + DAT_EP_HANDLE ep); +static int mca_btl_udapl_endpoint_pd_finish_eager( + mca_btl_udapl_endpoint_t* endpoint); +static int mca_btl_udapl_endpoint_pd_finish_max( + mca_btl_udapl_endpoint_t* endpoint); +static int mca_btl_udapl_endpoint_pd_connections_completed( + mca_btl_udapl_endpoint_t* endpoint); +static void mca_btl_udapl_endpoint_connect_eager_rdma( + mca_btl_udapl_endpoint_t* endpoint); +static int mca_btl_udapl_endpoint_write_eager( + mca_btl_base_endpoint_t* endpoint, + mca_btl_udapl_frag_t* frag); +static void +mca_btl_udapl_endpoint_control_send_cb(mca_btl_base_module_t* btl, + mca_btl_base_endpoint_t* endpoint, + mca_btl_base_descriptor_t* descriptor, + int status); +static int mca_btl_udapl_endpoint_send_eager_rdma( + mca_btl_base_endpoint_t* endpoint); +extern void mca_btl_udapl_frag_progress_pending( + mca_btl_udapl_module_t* udapl_btl, + mca_btl_base_endpoint_t* endpoint, + const int connection); /* @@ -598,6 +622,8 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) { mca_btl_udapl_module_t* btl = endpoint->endpoint_btl; int rc; + char *priv_data_ptr = NULL; + DAT_COUNT priv_data_size = 0; OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_ADD32(&(btl->udapl_connect_inprogress), 1); @@ -621,9 +647,30 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) goto failure_create; } + /* create private data as required */ + if (mca_btl_udapl_component.udapl_conn_priv_data) { + int32_t priv_data_conn_type = BTL_UDAPL_EAGER_CONNECTION; + + priv_data_size = sizeof(mca_btl_udapl_addr_t) + sizeof(int32_t); + priv_data_ptr = (char *)malloc(priv_data_size); + + if (NULL == priv_data_ptr) { + BTL_ERROR(("ERROR: %s %s\n", "mca_btl_udapl_endpoint_connect", + "out of resources")); + goto failure_create; + } + + /* private data consists of local btl address, listen port (psp), + * and endpoint state to indicate EAGER or MAX endpoint + */ + memcpy(priv_data_ptr, &btl->udapl_addr, 
sizeof(mca_btl_udapl_addr_t)); + memcpy((priv_data_ptr + sizeof(mca_btl_udapl_addr_t)), + &priv_data_conn_type, sizeof(int32_t)); + } + rc = dat_ep_connect(endpoint->endpoint_eager, &endpoint->endpoint_addr.addr, endpoint->endpoint_addr.port, mca_btl_udapl_component.udapl_timeout, - sizeof(mca_btl_udapl_addr_t), &btl->udapl_addr, 0, DAT_CONNECT_DEFAULT_FLAG); + priv_data_size, priv_data_ptr, 0, DAT_CONNECT_DEFAULT_FLAG); if(DAT_SUCCESS != rc) { char* major; char* minor; @@ -637,6 +684,11 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_EAGER; OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + + if (mca_btl_udapl_component.udapl_conn_priv_data) { + free(priv_data_ptr); + } + return; failure: @@ -660,66 +712,52 @@ int mca_btl_udapl_endpoint_finish_connect(struct mca_btl_udapl_module_t* btl, int32_t* connection_seq, DAT_EP_HANDLE endpoint) { - mca_btl_udapl_proc_t* proc; mca_btl_base_endpoint_t* ep; - size_t i; - int rc; + int rc = OMPI_SUCCESS; - /* Search for the matching BTL EP */ - for(proc = (mca_btl_udapl_proc_t*) - opal_list_get_first(&mca_btl_udapl_component.udapl_procs); - proc != (mca_btl_udapl_proc_t*) - opal_list_get_end(&mca_btl_udapl_component.udapl_procs); - proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) { - - for(i = 0; i < proc->proc_endpoint_count; i++) { - ep = proc->proc_endpoints[i]; + /* find the endpoint which matches the address in data received */ + ep = mca_btl_udapl_find_endpoint_address_match(btl, *addr); - /* Does this endpoint match? */ - /* TODO - Check that the DAT_CONN_QUAL's match too */ - if(ep->endpoint_btl == btl && - !memcmp(addr, &ep->endpoint_addr, sizeof(DAT_SOCK_ADDR))) { - OPAL_THREAD_LOCK(&ep->endpoint_lock); - if(MCA_BTL_UDAPL_CONN_EAGER == ep->endpoint_state) { - ep->endpoint_connection_seq = (NULL != connection_seq) ? 
- *connection_seq:0; - ep->endpoint_eager = endpoint; - rc = mca_btl_udapl_endpoint_finish_eager(ep); - } else if(MCA_BTL_UDAPL_CONN_MAX == ep->endpoint_state) { - /* Check to see order of messages received are in - * the same order the actual connections are made. - * If they are not we need to swap the eager and - * max connections. This inversion is possible due - * to a race condition that one process may actually - * receive the sendrecv messages from the max connection - * before the eager connection. - */ - if (NULL == connection_seq || - ep->endpoint_connection_seq < *connection_seq) { - /* normal order connection matching */ - ep->endpoint_max = endpoint; - } else { - /* inverted order connection matching */ - ep->endpoint_max = ep->endpoint_eager; - ep->endpoint_eager = endpoint; - } - - rc = mca_btl_udapl_endpoint_finish_max(ep); - } else { - BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE, - ("ERROR: invalid EP state %d\n", - ep->endpoint_state)); - return OMPI_ERROR; - } - return rc; - } - } + if (ep == NULL) { + /* If this point is reached, no matching endpoint was found */ + BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_CRITICAL, + ("ERROR: could not match endpoint\n")); + return OMPI_ERROR; } - /* If this point is reached, no matching endpoint was found */ - BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE, - ("btl_udapl ERROR could not match endpoint\n")); - return OMPI_ERROR; + if(MCA_BTL_UDAPL_CONN_EAGER == ep->endpoint_state) { + ep->endpoint_connection_seq = (NULL != connection_seq) ? + *connection_seq:0; + ep->endpoint_eager = endpoint; + rc = mca_btl_udapl_endpoint_finish_eager(ep); + } else if(MCA_BTL_UDAPL_CONN_MAX == ep->endpoint_state) { + /* Check to see order of messages received are in + * the same order the actual connections are made. + * If they are not we need to swap the eager and + * max connections. 
This inversion is possible due + * to a race condition that one process may actually + * receive the sendrecv messages from the max connection + * before the eager connection. + */ + if (NULL == connection_seq || + ep->endpoint_connection_seq < *connection_seq) { + /* normal order connection matching */ + ep->endpoint_max = endpoint; + } else { + /* inverted order connection matching */ + ep->endpoint_max = ep->endpoint_eager; + ep->endpoint_eager = endpoint; + } + + rc = mca_btl_udapl_endpoint_finish_max(ep); + } else { + BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_DIAGNOSE, + ("ERROR: invalid EP state %d\n", + ep->endpoint_state)); + return OMPI_ERROR; + } + + return rc; } @@ -738,7 +776,7 @@ static int mca_btl_udapl_endpoint_finish_eager( /* establish eager rdma connection */ if ((1 == mca_btl_udapl_component.udapl_use_eager_rdma) && - (btl->udapl_eager_rdma_endpoint_count < + (btl->udapl_eager_rdma_endpoint_count < mca_btl_udapl_component.udapl_max_eager_rdma_peers)) { mca_btl_udapl_endpoint_connect_eager_rdma(endpoint); } @@ -758,7 +796,7 @@ static int mca_btl_udapl_endpoint_finish_eager( rc = dat_ep_connect(endpoint->endpoint_max, &endpoint->endpoint_addr.addr, endpoint->endpoint_addr.port, mca_btl_udapl_component.udapl_timeout, - sizeof(mca_btl_udapl_addr_t),&btl->udapl_addr , 0, + 0, NULL, 0, DAT_CONNECT_DEFAULT_FLAG); if(DAT_SUCCESS != rc) { char* major; @@ -779,22 +817,20 @@ static int mca_btl_udapl_endpoint_finish_eager( static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t* endpoint) { - mca_btl_udapl_frag_t* frag; int ret = OMPI_SUCCESS; - int token_avail; - int queue_len; - int i; + mca_btl_udapl_module_t* udapl_btl = endpoint->endpoint_btl; endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTED; OPAL_THREAD_ADD32(&(endpoint->endpoint_btl->udapl_connect_inprogress), -1); - /* post eager/max recv buffers */ + /* post eager recv buffers */ ret = mca_btl_udapl_endpoint_post_recv(endpoint, mca_btl_udapl_component.udapl_eager_frag_size); if (OMPI_SUCCESS 
!= ret) { return ret; } + /* post max recv buffers */ ret = mca_btl_udapl_endpoint_post_recv(endpoint, mca_btl_udapl_component.udapl_max_frag_size); if (OMPI_SUCCESS != ret) { @@ -802,44 +838,317 @@ static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t* endpoint) } /* progress eager frag queue as allowed */ - queue_len = opal_list_get_size(&(endpoint->endpoint_eager_frags)); - BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_EAGER_CONNECTION, token_avail); - - for(i = 0; i < queue_len && token_avail > 0; i++) { - - frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&(endpoint->endpoint_eager_frags)); - - if(NULL == frag) { - break; - } - - mca_btl_udapl_endpoint_send(frag->endpoint, frag); - - BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_EAGER_CONNECTION, - token_avail); - } + mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint, + BTL_UDAPL_EAGER_CONNECTION); /* progress max frag queue as allowed */ - queue_len = opal_list_get_size(&(endpoint->endpoint_max_frags)); - BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_MAX_CONNECTION, token_avail); - - for(i = 0; i < queue_len && token_avail > 0; i++) { - - frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&(endpoint->endpoint_max_frags)); - - if(NULL == frag) { - break; - } - - mca_btl_udapl_endpoint_send(frag->endpoint, frag); - - BTL_UDAPL_TOKEN_AVAIL(endpoint, BTL_UDAPL_MAX_CONNECTION, token_avail); - } + mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint, + BTL_UDAPL_MAX_CONNECTION); return ret; } +/* + * Utility routine. Search list of endpoints to find one that matches + * the given address. 
+ * + * @param btl (IN) BTL module + * @param addr (IN) Address used to find endpoint to be returned + * + * @return Pointer to the base endpoint matching addr or NULL + */ +mca_btl_base_endpoint_t* +mca_btl_udapl_find_endpoint_address_match(struct mca_btl_udapl_module_t* btl, + mca_btl_udapl_addr_t addr) +{ + size_t i; + mca_btl_udapl_proc_t *proc; + mca_btl_base_endpoint_t *proc_ep; + mca_btl_base_endpoint_t *endpoint = NULL; + + for(proc = (mca_btl_udapl_proc_t*) + opal_list_get_first(&mca_btl_udapl_component.udapl_procs); + proc != (mca_btl_udapl_proc_t*) + opal_list_get_end(&mca_btl_udapl_component.udapl_procs); + proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) { + + for(i = 0; i < proc->proc_endpoint_count; i++) { + proc_ep = proc->proc_endpoints[i]; + + if(proc_ep->endpoint_btl == btl && + !memcmp(&addr, &proc_ep->endpoint_addr, + (sizeof(DAT_CONN_QUAL) + sizeof(DAT_SOCK_ADDR)))) { + + /* match found */ + endpoint = proc_ep; + return endpoint; + } + } + } + + return endpoint; +} + + +/* + * Utility routine. Search list of endpoints to find one that matches + * the given DAT endpoint handle, this could either be the eager or + * max ep. 
+ * + * @param btl (IN) BTL module + * @param ep (IN) EP handle used to find endpoint to be returned + * + * @return Pointer to the base endpoint matching ep or NULL + */ +static mca_btl_base_endpoint_t* +mca_btl_udapl_find_endpoint_connection_match(struct mca_btl_udapl_module_t* btl, + DAT_EP_HANDLE ep) +{ + size_t i; + mca_btl_udapl_proc_t *proc; + mca_btl_base_endpoint_t *proc_ep; + mca_btl_base_endpoint_t *endpoint = NULL; + + for(proc = (mca_btl_udapl_proc_t*) + opal_list_get_first(&mca_btl_udapl_component.udapl_procs); + proc != (mca_btl_udapl_proc_t*) + opal_list_get_end(&mca_btl_udapl_component.udapl_procs); + proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) { + + for(i = 0; i < proc->proc_endpoint_count; i++) { + proc_ep = proc->proc_endpoints[i]; + + if(proc_ep->endpoint_btl == btl) { + if (ep == proc_ep->endpoint_eager || + ep == proc_ep->endpoint_max) { + /* match found */ + endpoint = proc_ep; + return endpoint; + } else { + continue; + } + } + } + } + + return endpoint; +} + + +/* + * Private Data connection establishment process. Operations to be + * performed once the eager connection of the given endpoint has + * completed. 
+ * + * @param btl (IN) BTL module + * @param endpoint (IN) BTL addressing information + * + * @return OMPI_SUCCESS or error status on failure + */ +static int mca_btl_udapl_endpoint_pd_finish_eager( + mca_btl_udapl_endpoint_t* endpoint) +{ + mca_btl_udapl_module_t* btl = endpoint->endpoint_btl; + int rc = OMPI_SUCCESS; + char *priv_data_ptr = NULL; + DAT_COUNT priv_data_size = 0; + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_MAX; + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + + /* initiate the eager rdma connection */ + if ((1 == mca_btl_udapl_component.udapl_use_eager_rdma) && + (btl->udapl_eager_rdma_endpoint_count < + mca_btl_udapl_component.udapl_max_eager_rdma_peers)) { + mca_btl_udapl_endpoint_connect_eager_rdma(endpoint); + } + + /* Only one side does dat_ep_connect() and if by chance the + * connection is already established we don't need to bother + * with this. + */ + if((BTL_UDAPL_NUM_CONNECTION != endpoint->endpoint_connections_completed) + && (0 < orte_util_compare_name_fields(ORTE_NS_CMP_ALL, + &endpoint->endpoint_proc->proc_guid, + &ompi_proc_local()->proc_name))) { + + rc = mca_btl_udapl_endpoint_create(btl, &endpoint->endpoint_max); + if(DAT_SUCCESS != rc) { + endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; + return OMPI_ERROR; + } + + if (mca_btl_udapl_component.udapl_conn_priv_data) { + int32_t priv_data_conn_type = BTL_UDAPL_MAX_CONNECTION; + + priv_data_size = (sizeof(mca_btl_udapl_addr_t) + sizeof(int32_t)); + priv_data_ptr = (char *)malloc(priv_data_size); + + if (NULL == priv_data_ptr) { + BTL_ERROR(("ERROR: %s %s\n", + "mca_btl_udapl_endpoint_pd_finish_eager", + "out of resources")); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* private data consists of local btl address, listen port (psp), + * and endpoint state to indicate EAGER or MAX endpoint + */ + memcpy(priv_data_ptr, &btl->udapl_addr, + sizeof(mca_btl_udapl_addr_t)); + memcpy((priv_data_ptr + sizeof(mca_btl_udapl_addr_t)), + 
&priv_data_conn_type, sizeof(int32_t)); + } + + rc = dat_ep_connect(endpoint->endpoint_max, + &endpoint->endpoint_addr.addr, endpoint->endpoint_addr.port, + mca_btl_udapl_component.udapl_timeout, + priv_data_size, priv_data_ptr, 0, + DAT_CONNECT_DEFAULT_FLAG); + + if (mca_btl_udapl_component.udapl_conn_priv_data) { + free(priv_data_ptr); + } + + if(DAT_SUCCESS != rc) { + char* major; + char* minor; + + dat_strerror(rc, (const char**)&major, + (const char**)&minor); + BTL_ERROR(("ERROR: %s %s %s\n", "dat_ep_connect", + major, minor)); + dat_ep_free(endpoint->endpoint_max); + return OMPI_ERROR; + } + } + + /* post eager recv buffers */ + rc = mca_btl_udapl_endpoint_post_recv(endpoint, + mca_btl_udapl_component.udapl_eager_frag_size); + if (OMPI_SUCCESS != rc) { + return rc; + } + + /* Not progressing here because the entire endpoint needs to be + * marked MCA_BTL_UDAPL_CONNECTED, otherwise + * mca_btl_udapl_endpoint_send() will just put queued sends back on + * the queue. + */ + + return OMPI_SUCCESS; +} + + +/* + * Private Data connection establishment process. Operations to be + * performed once the max connection of the given endpoint has + * completed. + * + * @param btl (IN) BTL module + * @param endpoint (IN) BTL addressing information + * + * @return OMPI_SUCCESS or error status on failure + */ +static int +mca_btl_udapl_endpoint_pd_finish_max(mca_btl_udapl_endpoint_t* endpoint) +{ + int rc = OMPI_SUCCESS; + + /* post max recv buffers */ + rc = mca_btl_udapl_endpoint_post_recv(endpoint, + mca_btl_udapl_component.udapl_max_frag_size); + + /* Not progressing here because the entire endpoint needs to be + * marked MCA_BTL_UDAPL_CONNECTED otherwise + * mca_btl_udapl_endpoint_send() will just put queued sends back on + * the queue. + */ + + return rc; +} + + +/* + * Private Data connection establishment process. Operations to be + * performed once both the eager and max connections of the given + * endpoint have completed. 
+ * + * @param endpoint (IN) BTL addressing information + * + * @return OMPI_SUCCESS or error status on failure */ +static int +mca_btl_udapl_endpoint_pd_connections_completed(mca_btl_udapl_endpoint_t* endpoint) +{ + int rc = OMPI_SUCCESS; + mca_btl_udapl_module_t* udapl_btl = endpoint->endpoint_btl; + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); + endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTED; + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); + + OPAL_THREAD_ADD32(&(endpoint->endpoint_btl->udapl_connect_inprogress), -1); + + /* progress eager frag queue */ + mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint, + BTL_UDAPL_EAGER_CONNECTION); + + /* progress max frag queue */ + mca_btl_udapl_frag_progress_pending(udapl_btl, endpoint, + BTL_UDAPL_MAX_CONNECTION); + + return rc; +} + + +/* + * Private Data connection establishment process. Called once the + * DAT_CONNECTION_EVENT_ESTABLISHED is dequeued from the connection + * event dispatcher (evd). This event is the local completion event + * for both the dat_ep_connect and dat_cr_accept calls. 
+ * + * @param btl (IN) BTL module + * @param established_ep (IN) EP handle used to find the matching endpoint + * + * @return OMPI_SUCCESS or error status on failure + */ +int +mca_btl_udapl_endpoint_pd_established_conn(struct mca_btl_udapl_module_t* btl, + DAT_EP_HANDLE established_ep) +{ + int rc = OMPI_SUCCESS; + mca_btl_base_endpoint_t* proc_ep = NULL; + + /* search for ep and decide what to do next */ + proc_ep = + mca_btl_udapl_find_endpoint_connection_match(btl, established_ep); + + if (proc_ep == NULL) { + /* If this point is reached, no matching endpoint was found */ + BTL_UDAPL_VERBOSE_OUTPUT(VERBOSE_CRITICAL, + ("ERROR: could not match endpoint\n")); + return OMPI_ERROR; + } + + proc_ep->endpoint_connections_completed++; + + if (established_ep == proc_ep->endpoint_eager) { + rc = mca_btl_udapl_endpoint_pd_finish_eager(proc_ep); + } else if (established_ep == proc_ep->endpoint_max) { + rc = mca_btl_udapl_endpoint_pd_finish_max(proc_ep); + } + + if (rc == OMPI_SUCCESS && BTL_UDAPL_NUM_CONNECTION == + proc_ep->endpoint_connections_completed) { + rc = mca_btl_udapl_endpoint_pd_connections_completed(proc_ep); + } + + return rc; +} + + /* * Post receive buffers for a newly established endpoint connection. 
*/ @@ -916,6 +1225,7 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint) endpoint->endpoint_proc = 0; endpoint->endpoint_connection_seq = 0; + endpoint->endpoint_connections_completed = 0;; endpoint->endpoint_eager_sends = mca_btl_udapl_component.udapl_num_sends; endpoint->endpoint_max_sends = mca_btl_udapl_component.udapl_num_sends; @@ -997,7 +1307,7 @@ static void mca_btl_udapl_endpoint_control_send_cb( mca_btl_udapl_frag_t* frag = (mca_btl_udapl_frag_t*)descriptor; if(frag->size != mca_btl_udapl_component.udapl_eager_frag_size) { - connection = BTL_UDAPL_MAX_CONNECTION; + connection = BTL_UDAPL_MAX_CONNECTION; } /* control messages are not part of the regular accounting @@ -1111,7 +1421,7 @@ static int mca_btl_udapl_endpoint_send_eager_rdma( OPAL_THREAD_LOCK(&endpoint->endpoint_lock); opal_list_append(&endpoint->endpoint_eager_frags, - (opal_list_item_t*)data_frag); + (opal_list_item_t*)data_frag); OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return rc; diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.h b/ompi/mca/btl/udapl/btl_udapl_endpoint.h index f3db2a8eaf..2701c13a00 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.h +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.h @@ -11,7 +11,7 @@ * All rights reserved. * Copyright (c) 2006 Sandia National Laboratories. All rights * reserved. - * Copyright (c) 2006-2008 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2006-2009 Sun Microsystems, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -130,6 +130,9 @@ struct mca_btl_base_endpoint_t { int32_t endpoint_connection_seq; /**< sequence number of sendrecv message for the connection est */ + int32_t endpoint_connections_completed; + /**< count of completed connections for priv data connection est. 
*/ + opal_mutex_t endpoint_lock; /**< lock for concurrent access to endpoint state */ @@ -199,6 +202,21 @@ int mca_btl_udapl_endpoint_create(struct mca_btl_udapl_module_t* btl, int mca_btl_udapl_endpoint_send_sr_credits(mca_btl_base_endpoint_t* endpoint, const int connection); +/* + * Handle the established DAT endpoint when private data is in use + */ +int mca_btl_udapl_endpoint_pd_established_conn( + struct mca_btl_udapl_module_t* btl, + DAT_EP_HANDLE established_ep); + +/* + * Utility routine. Search list of endpoints to find one that matches + * the given address. + */ +mca_btl_udapl_endpoint_t* mca_btl_udapl_find_endpoint_address_match( + struct mca_btl_udapl_module_t* btl, + mca_btl_udapl_addr_t addr); + #if defined(c_plusplus) || defined(__cplusplus) } #endif