diff --git a/ompi/mca/btl/udapl/btl_udapl.c b/ompi/mca/btl/udapl/btl_udapl.c index 53d16484b4..9bcac66240 100644 --- a/ompi/mca/btl/udapl/btl_udapl.c +++ b/ompi/mca/btl/udapl/btl_udapl.c @@ -93,7 +93,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t* btl) /* query to get address information */ /* TODO - we only get the address, but there's other useful stuff here */ rc = dat_ia_query(btl->udapl_ia, &btl->udapl_evd_async, - DAT_IA_FIELD_IA_ADDRESS_PTR, &attr, DAT_IA_FIELD_NONE, NULL); + DAT_IA_FIELD_IA_ADDRESS_PTR, &attr, 0, NULL); if(DAT_SUCCESS != rc) { MCA_BTL_UDAPL_ERROR(rc, "dat_ia_query"); goto failure; diff --git a/ompi/mca/btl/udapl/btl_udapl_component.c b/ompi/mca/btl/udapl/btl_udapl_component.c index cba3b5eb3a..e27dca7588 100644 --- a/ompi/mca/btl/udapl/btl_udapl_component.c +++ b/ompi/mca/btl/udapl/btl_udapl_component.c @@ -328,6 +328,9 @@ mca_btl_udapl_component_init (int *num_btl_modules, return NULL; } + /* Post OOB receive */ + mca_btl_udapl_endpoint_post_oob_recv(); + /* return array of BTLs */ btls = (mca_btl_base_module_t**) malloc(sizeof(mca_btl_base_module_t *) * mca_btl_udapl_component.udapl_num_btls); @@ -344,8 +347,7 @@ mca_btl_udapl_component_init (int *num_btl_modules, } -static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, - DAT_CR_HANDLE cr_handle) +static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, DAT_CR_HANDLE cr_handle) { mca_btl_udapl_frag_t* frag; DAT_EP_HANDLE endpoint; @@ -421,8 +423,6 @@ int mca_btl_udapl_component_progress() a large enough buffer */ dto = &event.event_data.dto_completion_event_data; - OPAL_OUTPUT((0, "btl_udapl DTO transferred %d bytes\n", - dto->transfered_length)); /* Was the DTO successful? 
*/ if(DAT_DTO_SUCCESS != dto->status) { @@ -435,7 +435,8 @@ int mca_btl_udapl_component_progress() switch(frag->type) { case MCA_BTL_UDAPL_SEND: - OPAL_OUTPUT((0, "btl_udapl UDAPL_SEND")); + OPAL_OUTPUT((0, "btl_udapl UDAPL_SEND %d", + dto->transfered_length)); frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS); @@ -446,7 +447,8 @@ int mca_btl_udapl_component_progress() mca_btl_base_recv_reg_t* reg = &btl->udapl_reg[frag->hdr->tag]; - OPAL_OUTPUT((0, "btl_udapl UDAPL_RECV\n")); + OPAL_OUTPUT((0, "btl_udapl UDAPL_RECV %d", + dto->transfered_length)); frag->segment.seg_addr.pval = frag->hdr + 1; frag->segment.seg_len = dto->transfered_length - @@ -460,6 +462,7 @@ int mca_btl_udapl_component_progress() /* Repost the frag */ frag->segment.seg_addr.pval = frag->hdr; frag->segment.seg_len = frag->size; + dat_ep_post_recv(frag->endpoint->endpoint_ep, 1, &frag->triplet, (DAT_DTO_COOKIE)(void*)frag, DAT_COMPLETION_DEFAULT_FLAG); @@ -467,11 +470,6 @@ int mca_btl_udapl_component_progress() } case MCA_BTL_UDAPL_CONN_SEND: /* Client (send) side connection established */ - OPAL_OUTPUT((0, - "btl_udapl SEND SIDE CONNECT COMPLETED!!\n")); - frag->endpoint->endpoint_state = - MCA_BTL_UDAPL_CONNECTED; - mca_btl_udapl_endpoint_post_queue(frag->endpoint); MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag); diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.c b/ompi/mca/btl/udapl/btl_udapl_endpoint.c index 5a94c53e1d..534e575570 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.c +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.c @@ -33,8 +33,13 @@ #include "btl_udapl_frag.h" +static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint, + orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata); static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint); static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint); +void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint); +void 
mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint, + orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata); int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, @@ -58,6 +63,7 @@ int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, /* Initiate a new connection, add this send to a queue */ rc = mca_btl_udapl_start_connect(endpoint); if(OMPI_SUCCESS != rc) { + endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; break; } @@ -77,12 +83,132 @@ int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, } +static void mca_btl_udapl_endpoint_send_cb(int status, orte_process_name_t* endpoint, + orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) +{ + OBJ_RELEASE(buffer); +} + + static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint) +{ + mca_btl_udapl_addr_t* addr = &endpoint->endpoint_btl->udapl_addr; + orte_buffer_t* buf = OBJ_NEW(orte_buffer_t); + int rc; + + if(NULL == buf) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* Pack our address information */ + rc = orte_dss.pack(buf, &addr->port, 1, ORTE_UINT64); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + rc = orte_dss.pack(buf, &addr->addr, sizeof(DAT_SOCK_ADDR), ORTE_UINT8); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* Send the buffer */ + rc = orte_rml.send_buffer_nb(&endpoint->endpoint_proc->proc_guid, buf, + ORTE_RML_TAG_DYNAMIC - 1, 0, mca_btl_udapl_endpoint_send_cb, NULL); + if(0 > rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTING; + return OMPI_SUCCESS; +} + + +void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint, + orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) +{ + mca_btl_udapl_addr_t addr; + mca_btl_udapl_proc_t* proc; + mca_btl_base_endpoint_t* ep; + size_t cnt = 1; + size_t i; + int rc; + + /* Unpack data */ + rc = orte_dss.unpack(buffer, &addr.port, &cnt, 
ORTE_UINT64); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + + cnt = sizeof(mca_btl_udapl_addr_t); + rc = orte_dss.unpack(buffer, &addr.addr, &cnt, ORTE_UINT8); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return; + } + + /* Match the endpoint and handle it */ + OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock); + for(proc = (mca_btl_udapl_proc_t*) + opal_list_get_first(&mca_btl_udapl_component.udapl_procs); + proc != (mca_btl_udapl_proc_t*) + opal_list_get_end(&mca_btl_udapl_component.udapl_procs); + proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) { + + if(0 == orte_ns.compare(ORTE_NS_CMP_ALL, &proc->proc_guid, endpoint)) { + for(i = 0; i < proc->proc_endpoint_count; i++) { + ep = proc->proc_endpoints[i]; + + /* Does this endpoint match? */ + if(!memcmp(&addr, &ep->endpoint_addr, + sizeof(mca_btl_udapl_addr_t))) { + OPAL_OUTPUT((0, "btl_udapl found endpoint!\n")); + OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); + mca_btl_udapl_endpoint_connect(ep); + return; + } + } + } + } + OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); +} + + +/* + * Set up OOB recv callback. 
+ */ + +void mca_btl_udapl_endpoint_post_oob_recv(void) +{ + orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DYNAMIC-1, + ORTE_RML_PERSISTENT, mca_btl_udapl_endpoint_recv, NULL); +} + + +void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) { mca_btl_udapl_module_t* btl = endpoint->endpoint_btl; mca_btl_udapl_frag_t* frag; int rc; + OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + + /* Nasty test to prevent deadlock and unwanted connection attempts */ + /* This right here is the whole point of using the ORTE/RML handshake */ + if((MCA_BTL_UDAPL_CONNECTING == endpoint->endpoint_state && + 0 > orte_ns.compare(ORTE_NS_CMP_ALL, + &endpoint->endpoint_proc->proc_guid, + &ompi_proc_local()->proc_name)) || + (MCA_BTL_UDAPL_CLOSED != endpoint->endpoint_state && + MCA_BTL_UDAPL_CONNECTING != endpoint->endpoint_state)) { + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + return; + } + /* Create a new uDAPL endpoint and start the connection process */ rc = dat_ep_create(btl->udapl_ia, btl->udapl_pz, btl->udapl_evd_dto, btl->udapl_evd_dto, btl->udapl_evd_conn, @@ -119,14 +245,16 @@ static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint) } endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTING; - return OMPI_SUCCESS; + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + return; failure: dat_ep_free(endpoint->endpoint_ep); failure_create: endpoint->endpoint_ep = DAT_HANDLE_NULL; endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; - return OMPI_ERROR; + OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + return; } @@ -140,6 +268,7 @@ int mca_btl_udapl_endpoint_post_queue(mca_btl_udapl_endpoint_t* endpoint) int rc = OMPI_SUCCESS; OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTED; while(NULL != (frag = (mca_btl_udapl_frag_t*) opal_list_remove_first(&endpoint->endpoint_frags))) { rc = dat_ep_post_send(endpoint->endpoint_ep, 1, &frag->triplet, @@ -155,6 +284,7 @@ int 
mca_btl_udapl_endpoint_post_queue(mca_btl_udapl_endpoint_t* endpoint) return mca_btl_udapl_endpoint_post_recv(endpoint); } + /* * Match a uDAPL endpoint to a BTL endpoint. */ @@ -180,11 +310,10 @@ int mca_btl_udapl_endpoint_match(struct mca_btl_udapl_module_t* btl, /* Does this endpoint match? */ if(ep->endpoint_btl == btl && !memcmp(addr, &ep->endpoint_addr, - sizeof(mca_btl_udapl_addr_t))) { + sizeof(mca_btl_udapl_addr_t))) { OPAL_OUTPUT((0, "btl_udapl matched endpoint!\n")); ep->endpoint_ep = endpoint; - ep->endpoint_state = MCA_BTL_UDAPL_CONNECTED; - mca_btl_udapl_endpoint_post_recv(ep); + mca_btl_udapl_endpoint_post_queue(ep); OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); return OMPI_SUCCESS; } @@ -217,7 +346,7 @@ static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint) /* Note that this triplet defines a sub-region of a registered LMR */ frag->triplet.virtual_address = (DAT_VADDR)frag->hdr; frag->triplet.segment_length = - frag->segment.seg_len + sizeof(mca_btl_base_header_t); + mca_btl_udapl_module.super.btl_eager_limit; frag->btl = endpoint->endpoint_btl; frag->endpoint = endpoint; diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.h b/ompi/mca/btl/udapl/btl_udapl_endpoint.h index c88c62088d..8f10726ca0 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.h +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.h @@ -99,6 +99,12 @@ OBJ_CLASS_DECLARATION(mca_btl_udapl_endpoint_t); int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, mca_btl_udapl_frag_t* frag); +/* + * Set up OOB recv callback. + */ + +void mca_btl_udapl_endpoint_post_oob_recv(void); + /* * Post queued sends. 
*/ diff --git a/ompi/mca/btl/udapl/btl_udapl_frag.c b/ompi/mca/btl/udapl/btl_udapl_frag.c index d621b2e576..7d63f8b4cd 100644 --- a/ompi/mca/btl/udapl/btl_udapl_frag.c +++ b/ompi/mca/btl/udapl/btl_udapl_frag.c @@ -48,7 +48,7 @@ static void mca_btl_udapl_frag_eager_constructor(mca_btl_udapl_frag_t* frag) { frag->segment.seg_len = mca_btl_udapl_module.super.btl_eager_limit - sizeof(mca_btl_base_header_t); - frag->size = mca_btl_udapl_component.udapl_eager_frag_size; + frag->size = mca_btl_udapl_component.udapl_eager_frag_size; mca_btl_udapl_frag_common_constructor(frag); } @@ -68,6 +68,23 @@ static void mca_btl_udapl_frag_user_constructor(mca_btl_udapl_frag_t* frag) frag->size = 0; } +static void mca_btl_udapl_frag_common_destructor(mca_btl_udapl_frag_t* frag) +{ +#if OMPI_ENABLE_DEBUG + frag->hdr = NULL; + frag->size = 0; + frag->registration = NULL; + frag->segment.seg_len = 0; + frag->segment.seg_addr.pval = NULL; + + frag->base.des_src = NULL; + frag->base.des_src_cnt = 0; + frag->base.des_dst = NULL; + frag->base.des_dst_cnt = 0; + frag->base.des_flags = 0; +#endif +} + OBJ_CLASS_INSTANCE( mca_btl_udapl_frag_t, @@ -79,13 +96,13 @@ OBJ_CLASS_INSTANCE( mca_btl_udapl_frag_eager_t, mca_btl_base_descriptor_t, mca_btl_udapl_frag_eager_constructor, - NULL); + mca_btl_udapl_frag_common_destructor); OBJ_CLASS_INSTANCE( mca_btl_udapl_frag_max_t, mca_btl_base_descriptor_t, mca_btl_udapl_frag_max_constructor, - NULL); + mca_btl_udapl_frag_common_destructor); OBJ_CLASS_INSTANCE( mca_btl_udapl_frag_user_t, diff --git a/ompi/mca/mpool/udapl/mpool_udapl.h b/ompi/mca/mpool/udapl/mpool_udapl.h index 8e4cae5021..62631f3ea8 100644 --- a/ompi/mca/mpool/udapl/mpool_udapl.h +++ b/ompi/mca/mpool/udapl/mpool_udapl.h @@ -62,6 +62,21 @@ typedef struct mca_mpool_udapl_registration_t mca_mpool_udapl_registration_t; OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_mpool_udapl_registration_t); +/** + * Report a uDAPL error - for debugging + */ + +#if OMPI_ENABLE_DEBUG +extern void 
mca_mpool_udapl_error(DAT_RETURN ret, char* str); + +#define MCA_MPOOL_UDAPL_ERROR(ret, str) \ + mca_mpool_udapl_error((ret), (str)); + +#else +#define MCA_MPOOL_UDAPL_ERROR(ret, str) +#endif + + /* * Initializes the mpool module. */ diff --git a/ompi/mca/mpool/udapl/mpool_udapl_component.c b/ompi/mca/mpool/udapl/mpool_udapl_component.c index 8a37cee6cb..8ecd66c373 100644 --- a/ompi/mca/mpool/udapl/mpool_udapl_component.c +++ b/ompi/mca/mpool/udapl/mpool_udapl_component.c @@ -64,6 +64,29 @@ mca_mpool_udapl_component_t mca_mpool_udapl_component = { }; +/** + * Report a uDAPL error - for debugging + */ + +#if OMPI_ENABLE_DEBUG +void +mca_mpool_udapl_error(DAT_RETURN ret, char* str) +{ + char* major; + char* minor; + + if(DAT_SUCCESS != dat_strerror(ret, + (const char**)&major, (const char**)&minor)) + { + printf("dat_strerror failed! ret is %d\n", ret); + exit(-1); + } + + OPAL_OUTPUT((0, "ERROR: %s %s %s\n", str, major, minor)); +} +#endif + + /** * udapl_registration_t constructor/destructor */ diff --git a/ompi/mca/mpool/udapl/mpool_udapl_module.c b/ompi/mca/mpool/udapl/mpool_udapl_module.c index f30926d570..b14e034ce7 100644 --- a/ompi/mca/mpool/udapl/mpool_udapl_module.c +++ b/ompi/mca/mpool/udapl/mpool_udapl_module.c @@ -65,8 +65,6 @@ void* mca_mpool_udapl_alloc( { void* addr = malloc(size+align); - OPAL_OUTPUT((0, "mpool_udapl_alloc\n")); - /* TODO - align addr to dat_optimal_alignment */ if(NULL == addr) return NULL; @@ -121,8 +119,7 @@ int mca_mpool_udapl_register( &reg->lmr, &reg->lmr_triplet.lmr_context, &reg->rmr_context, &dat_size, &dat_addr); if(DAT_SUCCESS != rc) { - /* TODO - bring in error reporting function from the BTL */ - OPAL_OUTPUT((0, "mpool_udapl_register failed: %d\n", rc)); + MCA_MPOOL_UDAPL_ERROR(rc, "dat_lmr_create"); OBJ_RELEASE(reg); return OMPI_ERR_OUT_OF_RESOURCE; @@ -192,8 +189,6 @@ void* mca_mpool_udapl_realloc( void mca_mpool_udapl_free(mca_mpool_base_module_t* mpool, void * addr, mca_mpool_base_registration_t* registration) { -
OPAL_OUTPUT((0, "mpool_udapl_free\n")); - mpool->mpool_deregister(mpool, registration); free(addr); } @@ -218,27 +213,19 @@ int mca_mpool_udapl_release( struct mca_mpool_base_module_t* mpool, mca_mpool_base_registration_t* reg) { - mca_mpool_udapl_module_t * mpool_udapl = (mca_mpool_udapl_module_t*) mpool; - - OPAL_OUTPUT((0, "mpool_udapl_release\n")); - if(0 == OPAL_THREAD_ADD32(&reg->ref_count, -1)) { mca_mpool_udapl_registration_t *udapl_reg = (mca_mpool_udapl_registration_t*)reg; int rc = dat_lmr_free(udapl_reg->lmr); if(DAT_SUCCESS != rc) { - /* TODO - use error reporting function from the BTL */ - OPAL_OUTPUT((0, "[%s:%d] error(%d) deregistering udapl memory\n", __FILE__, __LINE__, rc)); + MCA_MPOOL_UDAPL_ERROR(rc, "dat_lmr_free"); return OMPI_ERROR; } OBJ_RELEASE(reg); } - else { - /* OPAL_OUTPUT((0, "release says ref_count is %d\n", reg->ref_count)); */ - } - + return OMPI_SUCCESS; }