From cf9246f7b966db751903a373e242791d6884301b Mon Sep 17 00:00:00 2001 From: Andrew Friedley Date: Tue, 21 Mar 2006 00:12:55 +0000 Subject: [PATCH] Long overdue commit.. many changes. In short, I'm very close to having connection establishment and eager send/recv working. Part of the connection process involves sending address information from the client to server. For some reason, I am never receiving an event indicating completion of the send on the client side. Otherwise, connection establishment is working and eager send/recv should be trivial from here. Some more detailed changes: - Send partially implemented, just handles starting up new connections. - Several support functions implemented for establishing connection. Client side code went in btl_udapl_endpoint.c, server side in btl_udapl_component.c - Frags list and send/recv locks added to the endpoint structure. - BTL sets up a public service point, which listens for new connections. Steps over ports that are already bound, iterating through a range of ports. - Remove any traces of recv frags, don't think I need them after all. - Pieces of component_progress() implemented for connection establishment. - Frags have two new types for connection establishment - CONN_SEND and CONN_RECV. - Many other minor cleanups not affecting functionality This commit was SVN r9345. 
--- ompi/mca/btl/udapl/btl_udapl.c | 92 ++++++----- ompi/mca/btl/udapl/btl_udapl.h | 10 +- ompi/mca/btl/udapl/btl_udapl_component.c | 196 ++++++++++++++++++++--- ompi/mca/btl/udapl/btl_udapl_endpoint.c | 104 ++++++++++++ ompi/mca/btl/udapl/btl_udapl_endpoint.h | 18 ++- ompi/mca/btl/udapl/btl_udapl_frag.c | 19 +++ ompi/mca/btl/udapl/btl_udapl_frag.h | 7 +- ompi/mca/btl/udapl/btl_udapl_proc.c | 4 +- 8 files changed, 379 insertions(+), 71 deletions(-) diff --git a/ompi/mca/btl/udapl/btl_udapl.c b/ompi/mca/btl/udapl/btl_udapl.c index ebd26c2ed8..80888d1038 100644 --- a/ompi/mca/btl/udapl/btl_udapl.c +++ b/ompi/mca/btl/udapl/btl_udapl.c @@ -30,10 +30,10 @@ #include "ompi/datatype/convertor.h" #include "ompi/datatype/datatype.h" #include "ompi/mca/mpool/base/base.h" -/*#include "ompi/mca/mpool/mpool.h"*/ #include "ompi/mca/mpool/udapl/mpool_udapl.h" #include "ompi/proc/proc.h" + mca_btl_udapl_module_t mca_btl_udapl_module = { { &mca_btl_udapl_component.super, @@ -67,9 +67,10 @@ mca_btl_udapl_module_t mca_btl_udapl_module = { */ int -mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl) +mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t* btl) { mca_mpool_base_resources_t res; + DAT_CONN_QUAL port; DAT_IA_ATTR attr; DAT_RETURN rc; @@ -86,7 +87,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl) rc = dat_pz_create(btl->udapl_ia, &btl->udapl_pz); if(DAT_SUCCESS != rc) { mca_btl_udapl_error(rc, "dat_pz_create"); - return OMPI_ERROR; + goto failure; } /* query to get address information */ @@ -95,8 +96,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl) DAT_IA_FIELD_IA_ADDRESS_PTR, &attr, DAT_IA_FIELD_NONE, NULL); if(DAT_SUCCESS != rc) { mca_btl_udapl_error(rc, "dat_ia_query"); - dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG); - return OMPI_ERROR; + goto failure; } memcpy(&btl->udapl_addr.addr, attr.ia_address_ptr, sizeof(DAT_SOCK_ADDR)); @@ -107,8 +107,7 @@ mca_btl_udapl_init(DAT_NAME_PTR 
ia_name, mca_btl_udapl_module_t * btl) DAT_EVD_DTO_FLAG | DAT_EVD_RMR_BIND_FLAG, &btl->udapl_evd_dto); if(DAT_SUCCESS != rc) { mca_btl_udapl_error(rc, "dat_evd_create (dto)"); - dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG); - return OMPI_ERROR; + goto failure; } rc = dat_evd_create(btl->udapl_ia, @@ -116,11 +115,32 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl) DAT_EVD_CR_FLAG | DAT_EVD_CONNECTION_FLAG, &btl->udapl_evd_conn); if(DAT_SUCCESS != rc) { mca_btl_udapl_error(rc, "dat_evd_create (conn)"); - dat_evd_free(btl->udapl_evd_dto); - dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG); - return OMPI_ERROR; + goto failure; } + /* create our public service point */ + /* We have to specify a port, so we go through a range until we + find a port that works */ + for(port = mca_btl_udapl_component.udapl_port_low; + port <= mca_btl_udapl_component.udapl_port_high; port++) { + + rc = dat_psp_create(btl->udapl_ia, port, btl->udapl_evd_conn, + DAT_PSP_CONSUMER_FLAG, &btl->udapl_psp); + if(DAT_SUCCESS == rc) { + break; + } else if(DAT_CONN_QUAL_IN_USE != rc) { + mca_btl_udapl_error(rc, "dat_psp_create"); + goto failure; + } + } + + if(port == mca_btl_udapl_component.udapl_port_high) { + goto failure; + } + + /* Save the port with the address information */ + btl->udapl_addr.port = port; + /* initialize the memory pool */ res.udapl_ia = btl->udapl_ia; res.udapl_pz = btl->udapl_pz; @@ -165,22 +185,13 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl) mca_btl_udapl_component.udapl_free_list_inc, NULL); - ompi_free_list_init(&btl->udapl_frag_recv, - sizeof(mca_btl_udapl_frag_recv_t), - OBJ_CLASS(mca_btl_udapl_frag_recv_t), - mca_btl_udapl_component.udapl_free_list_num, - mca_btl_udapl_component.udapl_free_list_max, - mca_btl_udapl_component.udapl_free_list_inc, - btl->super.btl_mpool); - - /* Connections are done lazily - the process doing the send acts as a client - when initiating the connect. 
progress should always be checking for - incoming connections, and establishing them when they arrive. When - connection is established, recv's are posted. */ - /* TODO - post receives */ /* TODO - can I always use SRQ, or just on new enough uDAPLs? */ return OMPI_SUCCESS; + +failure: + dat_ia_close(btl->udapl_ia, DAT_CLOSE_ABRUPT_FLAG); + return OMPI_ERROR; } @@ -243,12 +254,12 @@ int mca_btl_udapl_add_procs( /* * Check to make sure that the peer has at least as many interface * addresses exported as we are trying to use. If not, then - * don't bind this PTL instance to the proc. + * don't bind this BTL instance to the proc. */ OPAL_THREAD_LOCK(&udapl_proc->proc_lock); - /* The btl_proc datastructure is shared by all uDAPL PTL + /* The btl_proc datastructure is shared by all uDAPL BTL * instances that are trying to reach this destination. * Cache the peer instance on the btl_proc. */ @@ -265,10 +276,12 @@ int mca_btl_udapl_add_procs( OPAL_THREAD_UNLOCK(&udapl_proc->proc_lock); continue; } + ompi_bitmap_set_bit(reachable, i); OPAL_THREAD_UNLOCK(&udapl_proc->proc_lock); peers[i] = udapl_endpoint; } + return OMPI_SUCCESS; } @@ -323,14 +336,22 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc( MCA_BTL_UDAPL_FRAG_ALLOC_EAGER(udapl_btl, frag, rc); frag->segment.seg_len = size <= btl->btl_eager_limit ? - size : btl->btl_eager_limit ; + size : btl->btl_eager_limit; } else { MCA_BTL_UDAPL_FRAG_ALLOC_MAX(udapl_btl, frag, rc); frag->segment.seg_len = size <= btl->btl_max_send_size ? - size : btl->btl_max_send_size ; + size : btl->btl_max_send_size; } - + + /* TODO - this the right place for this? 
*/ + if(OMPI_SUCCESS != mca_mpool_udapl_register(btl->btl_mpool, + frag->segment.seg_addr.pval, size, 0, &frag->registration)) { + /* TODO - handle this fully */ + return NULL; + } + + frag->btl = udapl_btl; frag->base.des_src = &frag->segment; frag->base.des_src_cnt = 1; frag->base.des_dst = NULL; @@ -416,9 +437,7 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( /* bump reference count as so that the registration * doesn't go away when the operation completes */ - btl->btl_mpool->mpool_retain(btl->btl_mpool, - (mca_mpool_base_registration_t*) registration); - + btl->btl_mpool->mpool_retain(btl->btl_mpool, registration); frag->registration = registration; /* @@ -615,16 +634,7 @@ int mca_btl_udapl_send( frag->hdr->tag = tag; frag->type = MCA_BTL_UDAPL_SEND; - /* Check if we are connected to this peer. - Should be three states we care about - - connected, connecting, disconnected. - If no connection exists, request the connection and queue the send. - If a connection is pending, queue the send - If the connection is established, fire off the send. - need to consider locking around the connection state and queue. 
- */ - - return OMPI_SUCCESS; + return mca_btl_udapl_endpoint_send(endpoint, frag); } diff --git a/ompi/mca/btl/udapl/btl_udapl.h b/ompi/mca/btl/udapl/btl_udapl.h index 7ddafe210d..f5706b8347 100644 --- a/ompi/mca/btl/udapl/btl_udapl.h +++ b/ompi/mca/btl/udapl/btl_udapl.h @@ -56,11 +56,15 @@ struct mca_btl_udapl_component_t { size_t udapl_num_mru; size_t udapl_evd_qlen; int32_t udapl_num_repost; + int32_t udapl_timeout; /**< connection timeout, in microseconds */ int udapl_debug; /**< turn on debug output */ size_t udapl_eager_frag_size; size_t udapl_max_frag_size; + DAT_CONN_QUAL udapl_port_low; /**< first port for binding service point */ + DAT_CONN_QUAL udapl_port_high; /**< last port for binding service point */ + int udapl_free_list_num; /**< initial size of free lists */ int udapl_free_list_max; /**< maximum size of free lists */ int udapl_free_list_inc; /**< number of elements to alloc when growing */ @@ -84,11 +88,12 @@ struct mca_btl_udapl_module_t { mca_btl_base_recv_reg_t udapl_reg[256]; mca_btl_udapl_addr_t udapl_addr; - /* interface handle and protection zone */ + /* uDAPL interface and other handles */ DAT_IA_HANDLE udapl_ia; DAT_PZ_HANDLE udapl_pz; + DAT_PSP_HANDLE udapl_psp; - /* event dispatchers - default, data transfer, connection negotiation */ + /* event dispatchers - async, data transfer, connection negotiation */ DAT_EVD_HANDLE udapl_evd_async; DAT_EVD_HANDLE udapl_evd_dto; DAT_EVD_HANDLE udapl_evd_conn; @@ -141,6 +146,7 @@ extern mca_btl_base_module_t** mca_btl_udapl_component_init( /** * uDAPL component progress. 
*/ + extern int mca_btl_udapl_component_progress(void); diff --git a/ompi/mca/btl/udapl/btl_udapl_component.c b/ompi/mca/btl/udapl/btl_udapl_component.c index b8fb1eb4c0..a312bf0fb3 100644 --- a/ompi/mca/btl/udapl/btl_udapl_component.c +++ b/ompi/mca/btl/udapl/btl_udapl_component.c @@ -29,9 +29,11 @@ #include "opal/mca/base/mca_base_param.h" #include "orte/mca/errmgr/errmgr.h" #include "ompi/mca/mpool/base/base.h" +#include "ompi/mca/mpool/udapl/mpool_udapl.h" #include "btl_udapl.h" #include "btl_udapl_frag.h" #include "btl_udapl_endpoint.h" +#include "btl_udapl_proc.h" #include "ompi/mca/btl/base/base.h" #include "ompi/mca/btl/base/btl_base_error.h" #include "ompi/datatype/convertor.h" @@ -39,6 +41,15 @@ #include "orte/util/proc_info.h" #include "ompi/mca/pml/base/pml_base_module_exchange.h" + +/* + * Local functions + */ + +static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl, + mca_btl_udapl_frag_t* frag, + DAT_EP_HANDLE endpoint); + mca_btl_udapl_component_t mca_btl_udapl_component = { { /* First, the mca_base_component_t struct containing meta information @@ -160,6 +171,12 @@ int mca_btl_udapl_component_open(void) mca_btl_udapl_param_register_int("num_repost", 4); mca_btl_udapl_component.udapl_num_mru = mca_btl_udapl_param_register_int("num_mru", 64); + mca_btl_udapl_component.udapl_port_low = + mca_btl_udapl_param_register_int("port_low", 45000); + mca_btl_udapl_component.udapl_port_high = + mca_btl_udapl_param_register_int("port_high", 47000); + mca_btl_udapl_component.udapl_timeout = + mca_btl_udapl_param_register_int("timeout", 10000000); /* register uDAPL module parameters */ mca_btl_udapl_module.super.btl_exclusivity = @@ -177,7 +194,7 @@ int mca_btl_udapl_component_open(void) mca_btl_udapl_module.super.btl_bandwidth = mca_btl_udapl_param_register_int("bandwidth", 225); - /* TODO - computer udapl_eager_frag_size and udapl_max_frag_size */ + /* cmpute udapl_eager_frag_size and udapl_max_frag_size */ 
mca_btl_udapl_component.udapl_eager_frag_size = mca_btl_udapl_module.super.btl_eager_limit; mca_btl_udapl_component.udapl_max_frag_size = @@ -221,9 +238,6 @@ mca_btl_udapl_modex_send(void) size = sizeof(mca_btl_udapl_addr_t) * mca_btl_udapl_component.udapl_num_btls; - OPAL_OUTPUT((0, "udapl_modex_send %d addrs %d bytes\n", - mca_btl_udapl_component.udapl_num_btls, size)); - if (0 != size) { addrs = (mca_btl_udapl_addr_t *)malloc(size); if (NULL == addrs) { @@ -299,8 +313,6 @@ mca_btl_udapl_component_init (int *num_btl_modules, /* initialize this BTL */ /* TODO - make use of the thread-safety info in datinfo also */ if(OMPI_SUCCESS != mca_btl_udapl_init(datinfo[i].ia_name, btl)) { - opal_output(0, "udapl module init for %s failed\n", - datinfo[i].ia_name); free(btl); continue; } @@ -345,19 +357,101 @@ mca_btl_udapl_component_init (int *num_btl_modules, } +static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl, + mca_btl_udapl_frag_t* frag, + DAT_EP_HANDLE endpoint) +{ + mca_btl_udapl_proc_t* proc; + mca_btl_base_endpoint_t* ep; + mca_btl_udapl_addr_t* addr; + size_t i; + + addr = (mca_btl_udapl_addr_t*)frag->hdr; + + OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock); + for(proc = (mca_btl_udapl_proc_t*) + opal_list_get_first(&mca_btl_udapl_component.udapl_procs); + proc != (mca_btl_udapl_proc_t*) + opal_list_get_end(&mca_btl_udapl_component.udapl_procs); + proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) { + + for(i = 0; i < proc->proc_endpoint_count; i++) { + ep = proc->proc_endpoints[i]; + + /* Does this endpoint match? */ + if(ep->endpoint_btl == btl && + !memcmp(addr, &ep->endpoint_addr, + sizeof(mca_btl_udapl_addr_t))) { + ep->endpoint_ep = endpoint; + OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); + OPAL_OUTPUT((0, "btl_udapl matched endpoint! 
HAPPY DANCE!!!\n")); + return OMPI_SUCCESS; + } + } + } + + /* If this point is reached, no matching endpoint was found */ + OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); + OPAL_OUTPUT((0, "btl_udapl ERROR could not match endpoint\n")); + return OMPI_ERROR; +} + + +static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, + DAT_CR_HANDLE cr_handle) +{ + mca_btl_udapl_frag_t* frag; + DAT_DTO_COOKIE cookie; + DAT_EP_HANDLE endpoint; + int rc; + + rc = dat_ep_create(btl->udapl_ia, btl->udapl_pz, + btl->udapl_evd_dto, btl->udapl_evd_dto, + btl->udapl_evd_conn, NULL, &endpoint); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_ep_create"); + return OMPI_ERROR; + } + + rc = dat_cr_accept(cr_handle, endpoint, 0, NULL); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_cr_accept"); + return OMPI_ERROR; + } + + /* Post a receive to get the address data */ + frag = (mca_btl_udapl_frag_t*)mca_btl_udapl_alloc( + (mca_btl_base_module_t*)btl, sizeof(mca_btl_udapl_addr_t)); + + memcpy(frag->hdr, &btl->udapl_addr, sizeof(mca_btl_udapl_addr_t)); + frag->endpoint = NULL; + frag->type = MCA_BTL_UDAPL_CONN_RECV; + cookie.as_ptr = frag; + + rc = dat_ep_post_recv(endpoint, 1, + &((mca_mpool_udapl_registration_t*)frag->registration)->lmr_triplet, + cookie, DAT_COMPLETION_DEFAULT_FLAG); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_ep_post_send"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + + /* * uDAPL component progress. 
*/ - int mca_btl_udapl_component_progress() { mca_btl_udapl_module_t* btl; + mca_btl_udapl_frag_t* frag; static int32_t inprogress = 0; DAT_EVENT event; int count = 0; size_t i; - int rc; /* prevent deadlock - only one thread should be 'progressing' at a time */ if(OPAL_THREAD_ADD32(&inprogress, 1) > 1) { @@ -365,8 +459,6 @@ int mca_btl_udapl_component_progress() return OMPI_SUCCESS; } - OPAL_OUTPUT((0, "udapl_component_progress\n")); - /* check for work to do on each uDAPL btl */ for(i = 0; i < mca_btl_udapl_component.udapl_num_btls; i++) { btl = mca_btl_udapl_component.udapl_btls[i]; @@ -375,35 +467,93 @@ int mca_btl_udapl_component_progress() /* Check DTO EVD */ while(DAT_SUCCESS == dat_evd_dequeue(btl->udapl_evd_dto, &event)) { + DAT_DTO_COMPLETION_EVENT_DATA* dto; + switch(event.event_number) { case DAT_DTO_COMPLETION_EVENT: + OPAL_OUTPUT((0, "btl_udapl DTO completion\n")); + /* questions to answer: + should i use separate endpoints for eager/max frags? + i need to do this if i only want to post recv's for + the exact eager/max size, and uDAPL won't just pick + a large enough buffer + + how about just worrying about eager frags for now? + */ + dto = &event.event_data.dto_completion_event_data; + + /* Was the DTO successful? 
*/ + if(DAT_DTO_SUCCESS != dto->status) { + OPAL_OUTPUT((0, + "btl_udapl DTO error %d\n", dto->status)); + break; + } + + frag = dto->user_cookie.as_ptr; + + switch(frag->type) { + case MCA_BTL_UDAPL_SEND: + /* TODO - write me */ + break; + case MCA_BTL_UDAPL_CONN_SEND: + /* Set the endpoint state to connected */ + OPAL_OUTPUT((0, + "btl_udapl SEND SIDE CONNECT COMPLETED!!\n")); + frag->endpoint->endpoint_state = + MCA_BTL_UDAPL_CONNECTED; + + /* TODO - fire off any queued sends */ + + /* Retire the fragment */ + MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag); + break; + case MCA_BTL_UDAPL_CONN_RECV: + /* Just got the address data we need for completing + a new connection - match endpoints */ + mca_btl_udapl_finish_connect(btl, frag, dto->ep_handle); + + /* Retire the fragment */ + MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag); + break; +#ifdef OMPI_ENABLE_DEBUG + default: + OPAL_OUTPUT((0, "WARNING unknown frag type: %d\n", + frag->type)); +#endif + } count++; break; +#ifdef OMPI_ENABLE_DEBUG default: OPAL_OUTPUT((0, "WARNING unknown dto event: %d\n", event.event_number)); +#endif } } /* Check connection EVD */ while(DAT_SUCCESS == dat_evd_dequeue(btl->udapl_evd_conn, &event)) { + switch(event.event_number) { case DAT_CONNECTION_REQUEST_EVENT: /* Accept a new connection */ - rc = dat_cr_accept( - event.event_data.cr_arrival_event_data.cr_handle, - DAT_HANDLE_NULL, 0, NULL); - if(DAT_SUCCESS != rc) { - mca_btl_udapl_error(rc, "dat_cr_accept"); - } + OPAL_OUTPUT((0, "btl_udapl accepting connection\n")); + mca_btl_udapl_accept_connect(btl, + event.event_data.cr_arrival_event_data.cr_handle); count++; break; case DAT_CONNECTION_EVENT_ESTABLISHED: - /* TODO - at this point we have a uDPAL enpoint in - event.event_data.connect_event_data.ep_handle, - need to figure out how to tie back into the BTL */ + OPAL_OUTPUT((0, "btl_udapl connection established\n")); + + /* Both the client and server side of a connection generate + this event */ + /* Really shouldn't do anything here, 
as we won't have the + address data we need to match a uDAPL EP to a BTL EP. + Connections are finished when DTOs are completed for + the address transfer */ + count++; break; case DAT_CONNECTION_EVENT_PEER_REJECTED: @@ -412,11 +562,17 @@ int mca_btl_udapl_component_progress() case DAT_CONNECTION_EVENT_DISCONNECTED: case DAT_CONNECTION_EVENT_BROKEN: case DAT_CONNECTION_EVENT_TIMED_OUT: + /* handle this case specially? if we have finite timeout, + we might want to try connecting again here. */ case DAT_CONNECTION_EVENT_UNREACHABLE: + /* Need to set the BTL endpoint to MCA_BTL_UDAPL_FAILED + See dat_ep_connect documentation pdf pg 198 */ break; +#ifdef OMPI_ENABLE_DEBUG default: OPAL_OUTPUT((0, "WARNING unknown conn event: %d\n", event.event_number)); +#endif } } @@ -430,9 +586,11 @@ int mca_btl_udapl_component_progress() case DAT_ASYNC_ERROR_TIMED_OUT: case DAT_ASYNC_ERROR_PROVIDER_INTERNAL_ERROR: break; +#ifdef OMPI_ENABLE_DEBUG default: OPAL_OUTPUT((0, "WARNING unknown async event: %d\n", event.event_number)); +#endif } } } diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.c b/ompi/mca/btl/udapl/btl_udapl_endpoint.c index 0fb0b83327..62e1de30c6 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.c +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.c @@ -26,12 +26,106 @@ #include "orte/mca/rml/rml.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/dss/dss.h" +#include "ompi/mca/mpool/udapl/mpool_udapl.h" #include "btl_udapl.h" #include "btl_udapl_endpoint.h" #include "btl_udapl_proc.h" #include "btl_udapl_frag.h" +static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint); + + +int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, + mca_btl_udapl_frag_t* frag) +{ + int rc = OMPI_SUCCESS; + + OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + switch(endpoint->endpoint_state) { + case MCA_BTL_UDAPL_CONNECTED: + /* just send it already.. 
*/ + break; + case MCA_BTL_UDAPL_CLOSED: + /* Initiate a new connection, add this send to a queue */ + rc = mca_btl_udapl_start_connect(endpoint); + if(OMPI_SUCCESS != rc) { + break; + } + + /* Fall through on purpose to queue the send */ + case MCA_BTL_UDAPL_CONNECTING: + /* Add this send to a queue */ + opal_list_append(&endpoint->endpoint_frags, + (opal_list_item_t*)frag); + break; + case MCA_BTL_UDAPL_FAILED: + rc = OMPI_ERR_UNREACH; + break; + } + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + + return rc; +} + + +static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint) +{ + mca_btl_udapl_module_t* btl = endpoint->endpoint_btl; + mca_btl_udapl_frag_t* frag; + DAT_DTO_COOKIE cookie; + int rc; + + /* Create a new uDAPL endpoint and start the connection process */ + rc = dat_ep_create(btl->udapl_ia, btl->udapl_pz, + btl->udapl_evd_dto, btl->udapl_evd_dto, btl->udapl_evd_conn, + NULL, &endpoint->endpoint_ep); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_ep_create"); + goto failure_create; + } + + rc = dat_ep_connect(endpoint->endpoint_ep, &endpoint->endpoint_addr.addr, + endpoint->endpoint_addr.port, mca_btl_udapl_component.udapl_timeout, + 0, NULL, 0, DAT_CONNECT_DEFAULT_FLAG); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_ep_connect"); + goto failure; + } + + /* Send our local address data over this EP */ + /* Can't use btl_udapl_send here, the send will just get queued */ + frag = (mca_btl_udapl_frag_t*)mca_btl_udapl_alloc( + (mca_btl_base_module_t*)btl, sizeof(mca_btl_udapl_addr_t)); + + memcpy(frag->hdr, &btl->udapl_addr, sizeof(mca_btl_udapl_addr_t)); + frag->endpoint = endpoint; + frag->type = MCA_BTL_UDAPL_CONN_SEND; + cookie.as_ptr = frag; + + /* Do the actual send now.. 
*/ + OPAL_OUTPUT((0, "posting send!\n")); + rc = dat_ep_post_send(endpoint->endpoint_ep, 1, + &((mca_mpool_udapl_registration_t*)frag->registration)->lmr_triplet, + cookie, DAT_COMPLETION_DEFAULT_FLAG); + if(DAT_SUCCESS != rc) { + mca_btl_udapl_error(rc, "dat_ep_post_send"); + goto failure; + } + OPAL_OUTPUT((0, "after post send\n")); + + endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTING; + return OMPI_SUCCESS; + +failure: + dat_ep_free(endpoint->endpoint_ep); +failure_create: + endpoint->endpoint_ep = DAT_HANDLE_NULL; + endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; + return OMPI_ERROR; +} + + /* * Initialize state of the endpoint instance. * @@ -41,8 +135,15 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint) { endpoint->endpoint_btl = 0; endpoint->endpoint_proc = 0; + endpoint->endpoint_state = MCA_BTL_UDAPL_CLOSED; + endpoint->endpoint_ep = DAT_HANDLE_NULL; + + OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t); + OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t); + OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t); } + /* * Destroy a endpoint * @@ -50,6 +151,9 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint) static void mca_btl_udapl_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) { + OBJ_DESTRUCT(&endpoint->endpoint_frags); + OBJ_DESTRUCT(&endpoint->endpoint_send_lock); + OBJ_DESTRUCT(&endpoint->endpoint_recv_lock); } diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.h b/ompi/mca/btl/udapl/btl_udapl_endpoint.h index d169be7d7a..225a64a6f6 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.h +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.h @@ -34,6 +34,7 @@ extern "C" { * Structure used to publish uDAPL id information to peers. 
*/ struct mca_btl_udapl_addr_t { + DAT_CONN_QUAL port; DAT_SOCK_ADDR addr; }; typedef struct mca_btl_udapl_addr_t mca_btl_udapl_addr_t; @@ -56,7 +57,7 @@ typedef enum { * An instance of mca_btl_base_endpoint_t is associated w/ each process * and BTL pair at startup. However, connections to the endpoint * are established dynamically on an as-needed basis: - */ +*/ struct mca_btl_base_endpoint_t { opal_list_item_t super; @@ -70,10 +71,19 @@ struct mca_btl_base_endpoint_t { mca_btl_udapl_endpoint_state_t endpoint_state; /**< current state of the endpoint connection */ - opal_list_t pending_frags; + opal_list_t endpoint_frags; /**< pending send frags on this endpoint */ + opal_mutex_t endpoint_send_lock; + /**< lock for concurrent access to endpoint state */ + + opal_mutex_t endpoint_recv_lock; + /**< lock for concurrent access to endpoint state */ + mca_btl_udapl_addr_t endpoint_addr; + + DAT_EP_HANDLE endpoint_ep; + /**< uDAPL endpoint handle */ }; typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t; @@ -81,6 +91,10 @@ typedef mca_btl_base_endpoint_t mca_btl_udapl_endpoint_t; OBJ_CLASS_DECLARATION(mca_btl_udapl_endpoint_t); + +int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, + mca_btl_udapl_frag_t* frag); + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/ompi/mca/btl/udapl/btl_udapl_frag.c b/ompi/mca/btl/udapl/btl_udapl_frag.c index b786ca6e64..01e4de624b 100644 --- a/ompi/mca/btl/udapl/btl_udapl_frag.c +++ b/ompi/mca/btl/udapl/btl_udapl_frag.c @@ -1,3 +1,22 @@ +/* + * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "btl_udapl.h" #include "btl_udapl_frag.h" diff --git a/ompi/mca/btl/udapl/btl_udapl_frag.h b/ompi/mca/btl/udapl/btl_udapl_frag.h index 34ba12b735..41daedb6d6 100644 --- a/ompi/mca/btl/udapl/btl_udapl_frag.h +++ b/ompi/mca/btl/udapl/btl_udapl_frag.h @@ -22,7 +22,6 @@ #define MCA_BTL_UDAPL_FRAG_ALIGN (8) #include "ompi_config.h" -#include "btl_udapl.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -32,6 +31,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_t); typedef enum { MCA_BTL_UDAPL_SEND, + MCA_BTL_UDAPL_CONN_SEND, + MCA_BTL_UDAPL_CONN_RECV, MCA_BTL_UDAPL_PUT, MCA_BTL_UDAPL_GET } mca_btl_udapl_frag_type_t; @@ -68,10 +69,6 @@ typedef struct mca_btl_udapl_frag_t mca_btl_udapl_frag_user_t; OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_user_t); -typedef struct mca_btl_udapl_frag_t mca_btl_udapl_frag_recv_t; - -OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_recv_t); - /* * Macros to allocate/return descriptors from module specific diff --git a/ompi/mca/btl/udapl/btl_udapl_proc.c b/ompi/mca/btl/udapl/btl_udapl_proc.c index 17edefd5dc..56330829fd 100644 --- a/ompi/mca/btl/udapl/btl_udapl_proc.c +++ b/ompi/mca/btl/udapl/btl_udapl_proc.c @@ -38,12 +38,14 @@ void mca_btl_udapl_proc_construct(mca_btl_udapl_proc_t* proc) proc->proc_endpoints = 0; proc->proc_endpoint_count = 0; OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t); + /* add to list of all proc instance */ OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock); opal_list_append(&mca_btl_udapl_component.udapl_procs, &proc->super); OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); } + /* * Cleanup uDAPL proc instance */ @@ -170,8 +172,6 @@ int mca_btl_udapl_proc_insert( if(udapl_proc->proc_endpoint_count > udapl_proc->proc_addr_count) return OMPI_ERR_OUT_OF_RESOURCE; - opal_output(0, "udapl_proc_insert\n"); - 
udapl_endpoint->endpoint_proc = udapl_proc; udapl_endpoint->endpoint_addr = udapl_proc->proc_addrs[udapl_proc->proc_endpoint_count];