From c68c6ac122db4763210610bcc2554974fc284f29 Mon Sep 17 00:00:00 2001 From: Andrew Friedley Date: Mon, 12 Jun 2006 22:42:01 +0000 Subject: [PATCH] A number of fixes and the usual cleanup.. - Added some basic flow control to limit number of posted sends. - Merged endpoint send/recv lock into single endpoint lock. - Set the LMR triplet length in the send path, not at allocation time. This has to be done because upper layers might send less than the amount allocated. - Alter the tie-breaker if statement protecting the second call to dat_ep_connect(). The logic was reversed compared to the tie- breaker for the first dat_ep_connect(), making it possible for 3 or more processes to form a deadlock loop. - Some asserts were added for debugging purposes.. leaving them in place for now. This commit was SVN r10317. --- ompi/mca/btl/udapl/btl_udapl.c | 32 ++--- ompi/mca/btl/udapl/btl_udapl.h | 52 +------ ompi/mca/btl/udapl/btl_udapl_component.c | 116 +++++++++++---- ompi/mca/btl/udapl/btl_udapl_endpoint.c | 172 +++++++++++++++-------- ompi/mca/btl/udapl/btl_udapl_endpoint.h | 11 +- ompi/mca/btl/udapl/btl_udapl_frag.h | 18 +-- 6 files changed, 223 insertions(+), 178 deletions(-) diff --git a/ompi/mca/btl/udapl/btl_udapl.c b/ompi/mca/btl/udapl/btl_udapl.c index 4b8327154c..b05f5c388b 100644 --- a/ompi/mca/btl/udapl/btl_udapl.c +++ b/ompi/mca/btl/udapl/btl_udapl.c @@ -171,10 +171,6 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t* btl) OBJ_CONSTRUCT(&btl->udapl_frag_eager, ompi_free_list_t); OBJ_CONSTRUCT(&btl->udapl_frag_max, ompi_free_list_t); OBJ_CONSTRUCT(&btl->udapl_frag_user, ompi_free_list_t); - OBJ_CONSTRUCT(&btl->udapl_frag_recv, ompi_free_list_t); - OBJ_CONSTRUCT(&btl->udapl_pending, opal_list_t); - OBJ_CONSTRUCT(&btl->udapl_repost, opal_list_t); - OBJ_CONSTRUCT(&btl->udapl_mru_reg, opal_list_t); OBJ_CONSTRUCT(&btl->udapl_lock, opal_mutex_t); /* initialize free lists */ @@ -221,8 +217,6 @@ int mca_btl_udapl_finalize(struct mca_btl_base_module_t* base_btl) { mca_btl_udapl_module_t* udapl_btl = (mca_btl_udapl_module_t*) base_btl; - OPAL_OUTPUT((0, "udapl_finalize\n")); - /* release uDAPL resources */ dat_evd_free(udapl_btl->udapl_evd_dto); dat_evd_free(udapl_btl->udapl_evd_conn); @@ -254,8 +248,6 @@ int mca_btl_udapl_add_procs( mca_btl_udapl_module_t* udapl_btl = (mca_btl_udapl_module_t*)btl; int i, rc; - OPAL_OUTPUT((0, "udapl_add_procs\n")); - for(i = 0; i < (int) nprocs; i++) { struct ompi_proc_t* ompi_proc = ompi_procs[i]; @@ -309,7 +301,6 @@ int mca_btl_udapl_del_procs(struct mca_btl_base_module_t* btl, struct ompi_proc_t **procs, struct mca_btl_base_endpoint_t ** peers) { - OPAL_OUTPUT((0, "udapl_del_procs\n")); /* TODO */ return OMPI_SUCCESS; } @@ -329,7 +320,6 @@ int mca_btl_udapl_register( udapl_btl->udapl_reg[tag].cbfunc = cbfunc; udapl_btl->udapl_reg[tag].cbdata = cbdata; - OPAL_OUTPUT((0, "udapl_register %d %p\n", tag, cbfunc)); return OMPI_SUCCESS; } @@ -368,6 +358,8 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc( frag->triplet.virtual_address = (DAT_VADDR)frag->hdr; frag->triplet.segment_length = frag->segment.seg_len + sizeof(mca_btl_base_header_t); + assert(frag->triplet.lmr_context == + ((mca_mpool_udapl_registration_t*)frag->registration)->lmr_triplet.lmr_context); frag->btl = udapl_btl; frag->base.des_src = &frag->segment; @@ -375,6 +367,7 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc( frag->base.des_dst = NULL; frag->base.des_dst_cnt = 0; frag->base.des_flags = 0; + return &frag->base; } @@ -426,13 +419,13 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( int32_t free_after; int rc; +#if 0 /* * If the data has already been pinned and is contigous than we can * use it in place. */ -#if 0 if (NULL != registration && 0 == ompi_convertor_need_buffers(convertor)) { - //size_t reg_len; + size_t reg_len; OPAL_OUTPUT((0, "udapl_prepare_src 1\n")); MCA_BTL_UDAPL_FRAG_ALLOC_USER(btl, frag, rc); @@ -459,6 +452,8 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( */ btl->btl_mpool->mpool_retain(btl->btl_mpool, registration); frag->registration = registration; + frag->triplet.lmr_context = + ((mca_mpool_udapl_registration_t*)registration)->lmr_triplet.lmr_context; /* * if the data is not already pinned - but the leave pinned option is set, @@ -483,10 +478,6 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, &free_after); - frag->segment.seg_len = max_data; - frag->segment.seg_addr.pval = iov.iov_base; - frag->triplet.segment_length = max_data; - frag->triplet.virtual_address = (DAT_VADDR)iov.iov_base; rc = mpool->mpool_register( mpool, @@ -503,6 +494,11 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( frag->registration = registration; frag->triplet.lmr_context = ((mca_mpool_udapl_registration_t*)registration)->lmr_triplet.lmr_context; + /* TODO - should our base addr be frag->hdr? */ + frag->segment.seg_len = max_data; + frag->segment.seg_addr.pval = iov.iov_base; + frag->triplet.segment_length = max_data; + frag->triplet.virtual_address = (DAT_VADDR)iov.iov_base; } /* @@ -517,7 +513,6 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( return NULL; } - //OPAL_OUTPUT((0, "udapl_prepare_src 3\n")); iov.iov_len = max_data; iov.iov_base = (unsigned char*) frag->segment.seg_addr.pval + reserve; @@ -540,7 +535,6 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src( * that is the max send size. */ else { - //OPAL_OUTPUT((0, "udapl_prepare_src 4\n")); MCA_BTL_UDAPL_FRAG_ALLOC_MAX(btl, frag, rc); if(NULL == frag) { return NULL; @@ -668,8 +662,6 @@ int mca_btl_udapl_send( { mca_btl_udapl_frag_t* frag = (mca_btl_udapl_frag_t*)des; - OPAL_OUTPUT((0, "udapl_send %d\n", tag)); - frag->btl = (mca_btl_udapl_module_t*)btl; frag->endpoint = endpoint; frag->hdr->tag = tag; diff --git a/ompi/mca/btl/udapl/btl_udapl.h b/ompi/mca/btl/udapl/btl_udapl.h index 71428d22f4..3553eb125f 100644 --- a/ompi/mca/btl/udapl/btl_udapl.h +++ b/ompi/mca/btl/udapl/btl_udapl.h @@ -53,9 +53,9 @@ struct mca_btl_udapl_component_t { size_t udapl_num_btls; /**< number of hcas available to the uDAPL component */ size_t udapl_max_btls; /**< maximum number of supported hcas */ struct mca_btl_udapl_module_t **udapl_btls; /**< array of available BTL modules */ - size_t udapl_num_mru; size_t udapl_evd_qlen; - int32_t udapl_num_repost; + int32_t udapl_num_recvs; /**< number of recv buffers to keep posted */ + int32_t udapl_num_sends; /**< number of sends to post on endpoint */ int32_t udapl_timeout; /**< connection timeout, in microseconds */ size_t udapl_eager_frag_size; @@ -101,11 +101,7 @@ struct mca_btl_udapl_module_t { ompi_free_list_t udapl_frag_eager; ompi_free_list_t udapl_frag_max; ompi_free_list_t udapl_frag_user; - ompi_free_list_t udapl_frag_recv; - opal_list_t udapl_pending; /**< list of pending send fragments */ - opal_list_t udapl_repost; /**< list of pending recv fragments */ - opal_list_t udapl_mru_reg; /**< list of most recently used registrations */ opal_mutex_t udapl_lock; /* lock for accessing module state */ }; typedef struct mca_btl_udapl_module_t mca_btl_udapl_module_t; @@ -342,50 +338,6 @@ extern mca_btl_base_descriptor_t* mca_btl_udapl_prepare_dst( size_t* size); -/** - * Acquire a send token - queue the fragment if none available - */ - -#define MCA_BTL_UDAPL_ACQUIRE_TOKEN(btl, frag) \ -do { \ - /* queue the descriptor if there are no send tokens */ \ - if(OPAL_THREAD_ADD32(&udapl_btl->udapl_num_send_tokens, -1) < 0) { \ - OPAL_THREAD_LOCK(&udapl_btl->udapl_lock); \ - opal_list_append(&udapl_btl->udapl_pending, (opal_list_item_t*)frag); \ - OPAL_THREAD_UNLOCK(&udapl_btl->udapl_lock); \ - OPAL_THREAD_ADD32(&udapl_btl->udapl_num_send_tokens, 1); \ - return OMPI_SUCCESS; \ - } \ -} while (0) \ - -/** - * Return send token and dequeue and pending fragments - */ - -#define MCA_BTL_UDAPL_RETURN_TOKEN(btl) \ -do { \ - OPAL_THREAD_ADD32( &btl->udapl_num_send_tokens, 1 ); \ - if(opal_list_get_size(&btl->udapl_pending)) { \ - mca_btl_udapl_frag_t* frag; \ - OPAL_THREAD_LOCK(&btl->udapl_lock); \ - frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&btl->udapl_pending); \ - OPAL_THREAD_UNLOCK(&btl->udapl_lock); \ - if(NULL != frag) { \ - switch(frag->type) { \ - case MCA_BTL_UDAPL_SEND: \ - mca_btl_udapl_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag); \ - break; \ - case MCA_BTL_UDAPL_PUT: \ - mca_btl_udapl_put(&btl->super, frag->endpoint, &frag->base); \ - break; \ - case MCA_BTL_UDAPL_GET: \ - mca_btl_udapl_get(&btl->super, frag->endpoint, &frag->base); \ - break; \ - } \ - } \ - } \ -} while (0) - #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/btl/udapl/btl_udapl_component.c b/ompi/mca/btl/udapl/btl_udapl_component.c index f6ca6c300c..9151517b81 100644 --- a/ompi/mca/btl/udapl/btl_udapl_component.c +++ b/ompi/mca/btl/udapl/btl_udapl_component.c @@ -131,8 +131,6 @@ int mca_btl_udapl_component_open(void) { int param, value; - OPAL_OUTPUT((0, "udapl_component_open\n")); - /* initialize state */ mca_btl_udapl_component.udapl_num_btls=0; mca_btl_udapl_component.udapl_btls=NULL; @@ -153,21 +151,22 @@ int mca_btl_udapl_component_open(void) mca_btl_udapl_component.udapl_max_btls = mca_btl_udapl_param_register_int("max_modules", 8); mca_btl_udapl_component.udapl_evd_qlen = - mca_btl_udapl_param_register_int("evd_qlen", 8); - mca_btl_udapl_component.udapl_num_repost = - mca_btl_udapl_param_register_int("num_repost", 4); - mca_btl_udapl_component.udapl_num_mru = - mca_btl_udapl_param_register_int("num_mru", 64); + mca_btl_udapl_param_register_int("evd_qlen", 32); + mca_btl_udapl_component.udapl_num_recvs = + mca_btl_udapl_param_register_int("num_recvs", 8); + mca_btl_udapl_component.udapl_num_sends = + mca_btl_udapl_param_register_int("num_sends", 8); mca_btl_udapl_component.udapl_port_low = mca_btl_udapl_param_register_int("port_low", 45000); mca_btl_udapl_component.udapl_port_high = - mca_btl_udapl_param_register_int("port_high", 49000); + mca_btl_udapl_param_register_int("port_high", 49000); mca_btl_udapl_component.udapl_timeout = mca_btl_udapl_param_register_int("timeout", 10000000); /* register uDAPL module parameters */ mca_btl_udapl_module.super.btl_exclusivity = - mca_btl_udapl_param_register_int ("exclusivity", MCA_BTL_EXCLUSIVITY_DEFAULT - 10); + mca_btl_udapl_param_register_int ("exclusivity", + MCA_BTL_EXCLUSIVITY_DEFAULT - 10); mca_btl_udapl_module.super.btl_eager_limit = mca_btl_udapl_param_register_int ("eager_limit", 32*1024); mca_btl_udapl_module.super.btl_min_send_size = @@ -208,8 +207,6 @@ int mca_btl_udapl_component_open(void) int mca_btl_udapl_component_close(void) { - OPAL_OUTPUT((0, "udapl_component_close\n")); - /* TODO - what needs to be done here? */ return OMPI_SUCCESS; } @@ -269,8 +266,6 @@ mca_btl_udapl_component_init (int *num_btl_modules, DAT_COUNT num_ias; int32_t i; - OPAL_OUTPUT((0, "udapl_component_init\n")); - /* enumerate uDAPL interfaces */ /* Have to do weird pointer stuff to make uDAPL happy - just an array of DAT_PROVIDER_INFO isn't good enough. */ @@ -306,8 +301,6 @@ mca_btl_udapl_component_init (int *num_btl_modules, /* create a BTL module for each interface */ for(mca_btl_udapl_component.udapl_num_btls = i = 0; i < num_ias; i++) { - OPAL_OUTPUT((0, "udapl creating btl for %s\n", datinfo[i].ia_name)); - btl = malloc(sizeof(mca_btl_udapl_module_t)); if(NULL == btl) { free(datinfo); @@ -368,7 +361,8 @@ mca_btl_udapl_component_init (int *num_btl_modules, } -static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, DAT_CR_HANDLE cr_handle) +static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl, + DAT_CR_HANDLE cr_handle) { DAT_EP_HANDLE endpoint; int rc; @@ -419,7 +413,7 @@ int mca_btl_udapl_component_progress() dat_evd_dequeue(btl->udapl_evd_dto, &event)) { DAT_DTO_COMPLETION_EVENT_DATA* dto; mca_btl_udapl_frag_t* frag; - + switch(event.event_number) { case DAT_DTO_COMPLETION_EVENT: dto = &event.event_data.dto_completion_event_data; @@ -435,21 +429,90 @@ int mca_btl_udapl_component_progress() switch(frag->type) { case MCA_BTL_UDAPL_SEND: - OPAL_OUTPUT((0, "btl_udapl UDAPL_SEND %d", - dto->transfered_length)); + { + mca_btl_udapl_endpoint_t* endpoint = frag->endpoint; + /*OPAL_OUTPUT((0, "btl_udapl UDAPL_SEND %d", + dto->transfered_length));*/ + + assert(frag->base.des_src == &frag->segment); + assert(frag->base.des_src_cnt == 1); + assert(frag->base.des_dst == NULL); + assert(frag->base.des_dst_cnt == 0); + assert(frag->type == MCA_BTL_UDAPL_SEND); frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS); - /* TODO - anything else to do here? */ + + if(frag->size == + mca_btl_udapl_component.udapl_eager_frag_size) { + if(!opal_list_is_empty( + &endpoint->endpoint_eager_frags)) { + DAT_DTO_COOKIE cookie; + + frag = (mca_btl_udapl_frag_t*) + opal_list_remove_first( + &endpoint->endpoint_eager_frags); + + assert(frag->triplet.segment_length == + frag->segment.seg_len + + sizeof(mca_btl_base_header_t)); + + cookie.as_ptr = frag; + dat_ep_post_send(endpoint->endpoint_eager, + 1, &frag->triplet, cookie, + DAT_COMPLETION_DEFAULT_FLAG); + } else { + OPAL_THREAD_ADD32( + &endpoint->endpoint_eager_sends, 1); + } + } else { + assert(frag->size == + mca_btl_udapl_component.udapl_max_frag_size); + if(!opal_list_is_empty( + &endpoint->endpoint_max_frags)) { + DAT_DTO_COOKIE cookie; + + frag = (mca_btl_udapl_frag_t*) + opal_list_remove_first( + &endpoint->endpoint_max_frags); + + assert(frag->triplet.segment_length == + frag->segment.seg_len + + sizeof(mca_btl_base_header_t)); + + cookie.as_ptr = frag; + dat_ep_post_send(endpoint->endpoint_max, + 1, &frag->triplet, cookie, + DAT_COMPLETION_DEFAULT_FLAG); + } else { + OPAL_THREAD_ADD32( + &endpoint->endpoint_max_sends, 1); + } + } + break; + } case MCA_BTL_UDAPL_RECV: { mca_btl_base_recv_reg_t* reg = &btl->udapl_reg[frag->hdr->tag]; - OPAL_OUTPUT((0, "btl_udapl UDAPL_RECV %d", - dto->transfered_length)); + assert(frag->base.des_dst == &frag->segment); + assert(frag->base.des_dst_cnt == 1); + assert(frag->base.des_src == NULL); + assert(frag->base.des_src_cnt == 0); + assert(frag->type == MCA_BTL_UDAPL_RECV); + assert(frag->triplet.virtual_address == + (DAT_VADDR)frag->hdr); + assert(frag->triplet.segment_length == frag->size); + assert(frag->btl == btl); + /*OPAL_OUTPUT((0, "btl_udapl UDAPL_RECV %d", + dto->transfered_length));*/ + + /* OPAL_OUTPUT((0, "recv from %s %d %p\n", + inet_ntoa(addr->sin_addr), ntohs(addr->sin_port), + frag->endpoint));*/ frag->segment.seg_addr.pval = frag->hdr + 1; frag->segment.seg_len = dto->transfered_length - sizeof(mca_btl_base_header_t); @@ -463,20 +526,19 @@ int mca_btl_udapl_component_progress() frag->segment.seg_addr.pval = frag->hdr; frag->segment.seg_len = frag->size - sizeof(mca_btl_base_header_t); + frag->base.des_flags = 0; if(frag->size == mca_btl_udapl_component.udapl_eager_frag_size) { dat_ep_post_recv(frag->endpoint->endpoint_eager, 1, &frag->triplet, dto->user_cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else if(frag->size == - mca_btl_udapl_component.udapl_max_frag_size) { + } else { + assert(frag->size == + mca_btl_udapl_component.udapl_max_frag_size); dat_ep_post_recv(frag->endpoint->endpoint_max, 1, &frag->triplet, dto->user_cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else { - OPAL_OUTPUT((0, - "btl_udapl ERROR unknown frag size\n")); } break; diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.c b/ompi/mca/btl/udapl/btl_udapl_endpoint.c index b5e6a32ee3..6a0885b74b 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.c +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.c @@ -53,22 +53,39 @@ int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, int rc = OMPI_SUCCESS; DAT_DTO_COOKIE cookie; - OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + /* Fix up the segment length before we do anything with the frag */ + frag->triplet.segment_length = + frag->segment.seg_len + sizeof(mca_btl_base_header_t); + + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); switch(endpoint->endpoint_state) { case MCA_BTL_UDAPL_CONNECTED: /* just send it already.. */ - OPAL_OUTPUT((0, "sending %d bytes\n", frag->triplet.segment_length)); cookie.as_ptr = frag; if(frag->size == mca_btl_udapl_component.udapl_eager_frag_size) { - rc = dat_ep_post_send(endpoint->endpoint_eager, 1, - &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else if(frag->size == - mca_btl_udapl_component.udapl_max_frag_size) { - rc = dat_ep_post_send(endpoint->endpoint_max, 1, - &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); + + if(OPAL_THREAD_ADD32(&endpoint->endpoint_eager_sends, -1) < 0) { + OPAL_THREAD_ADD32(&endpoint->endpoint_eager_sends, 1); + opal_list_append(&endpoint->endpoint_eager_frags, + (opal_list_item_t*)frag); + } else { + rc = dat_ep_post_send(endpoint->endpoint_eager, 1, + &frag->triplet, cookie, + DAT_COMPLETION_DEFAULT_FLAG); + } } else { - OPAL_OUTPUT((0, "btl_udapl ERROR unknown frag size\n")); + assert(frag->size == + mca_btl_udapl_component.udapl_max_frag_size); + if(OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, -1) < 0) { + OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, 1); + opal_list_append(&endpoint->endpoint_max_frags, + (opal_list_item_t*)frag); + } else { + rc = dat_ep_post_send(endpoint->endpoint_max, 1, + &frag->triplet, cookie, + DAT_COMPLETION_DEFAULT_FLAG); + } } if(DAT_SUCCESS != rc) { @@ -89,15 +106,24 @@ int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint, case MCA_BTL_UDAPL_CONN_EAGER: case MCA_BTL_UDAPL_CONN_MAX: /* Add this send to a queue */ - OPAL_OUTPUT((0, "queueing send %d bytes\n", frag->triplet.segment_length)); - opal_list_append(&endpoint->endpoint_frags, - (opal_list_item_t*)frag); + if(frag->size == + mca_btl_udapl_component.udapl_eager_frag_size) { + opal_list_append(&endpoint->endpoint_eager_frags, + (opal_list_item_t*)frag); + } else { + assert(frag->size == + mca_btl_udapl_component.udapl_max_frag_size); + OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, -1); + opal_list_append(&endpoint->endpoint_max_frags, + (opal_list_item_t*)frag); + } + break; case MCA_BTL_UDAPL_FAILED: rc = OMPI_ERR_UNREACH; break; } - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return rc; } @@ -186,7 +212,6 @@ void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint, /* Does this endpoint match? */ if(!memcmp(&addr, &ep->endpoint_addr, sizeof(mca_btl_udapl_addr_t))) { - OPAL_OUTPUT((0, "btl_udapl found endpoint!\n")); OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); mca_btl_udapl_endpoint_connect(ep); return; @@ -195,7 +220,6 @@ void mca_btl_udapl_endpoint_recv(int status, orte_process_name_t* endpoint, } } OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock); - OBJ_RELEASE(buffer); } @@ -215,7 +239,7 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) mca_btl_udapl_module_t* btl = endpoint->endpoint_btl; int rc; - OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); /* Nasty test to prevent deadlock and unwanted connection attempts */ /* This right here is the whole point of using the ORTE/RML handshake */ @@ -225,7 +249,7 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) &ompi_proc_local()->proc_name)) || (MCA_BTL_UDAPL_CLOSED != endpoint->endpoint_state && MCA_BTL_UDAPL_CONN_EAGER != endpoint->endpoint_state)) { - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return; } @@ -247,7 +271,7 @@ void mca_btl_udapl_endpoint_connect(mca_btl_udapl_endpoint_t* endpoint) } endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_EAGER; - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return; failure: @@ -255,7 +279,7 @@ failure: failure_create: endpoint->endpoint_eager = DAT_HANDLE_NULL; endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return; } @@ -299,9 +323,8 @@ int mca_btl_udapl_endpoint_finish_connect(struct mca_btl_udapl_module_t* btl, if(ep->endpoint_btl == btl && !memcmp(param.remote_ia_address_ptr, &ep->endpoint_addr.addr, sizeof(DAT_SOCK_ADDR))) { - OPAL_OUTPUT((0, "btl_udapl matched endpoint!\n")); - OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_LOCK(&endpoint->endpoint_lock); if(MCA_BTL_UDAPL_CONN_EAGER == ep->endpoint_state) { ep->endpoint_eager = endpoint; rc = mca_btl_udapl_endpoint_finish_eager(ep); @@ -338,10 +361,10 @@ static int mca_btl_udapl_endpoint_finish_eager( int rc; endpoint->endpoint_state = MCA_BTL_UDAPL_CONN_MAX; - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); /* Only one side does dat_ep_connect() */ - if(0 > orte_ns.compare(ORTE_NS_CMP_ALL, + if(0 < orte_ns.compare(ORTE_NS_CMP_ALL, &endpoint->endpoint_proc->proc_guid, &ompi_proc_local()->proc_name)) { @@ -385,31 +408,59 @@ static int mca_btl_udapl_endpoint_finish_max(mca_btl_udapl_endpoint_t* endpoint) /* post queued sends */ - while(NULL != (frag = (mca_btl_udapl_frag_t*) - opal_list_remove_first(&endpoint->endpoint_frags))) { + assert(endpoint->endpoint_eager_sends == + mca_btl_udapl_component.udapl_num_sends); + while(OPAL_THREAD_ADD32(&endpoint->endpoint_eager_sends, -1) >= 0 && + NULL != (frag = (mca_btl_udapl_frag_t*) + opal_list_remove_first(&endpoint->endpoint_eager_frags))) { cookie.as_ptr = frag; - if(frag->size == - mca_btl_udapl_component.udapl_eager_frag_size) { - rc = dat_ep_post_send(endpoint->endpoint_eager, 1, - &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else if(frag->size == mca_btl_udapl_component.udapl_max_frag_size) { - rc = dat_ep_post_send(endpoint->endpoint_max, 1, - &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else { - OPAL_OUTPUT((0, "btl_udapl ERROR unknown frag size\n")); - rc = !DAT_SUCCESS; - } - + assert(frag->triplet.virtual_address == (DAT_VADDR)frag->hdr); + assert(frag->triplet.segment_length == + frag->segment.seg_len + sizeof(mca_btl_base_header_t)); + assert(frag->size == + mca_btl_udapl_component.udapl_eager_frag_size); + rc = dat_ep_post_send(endpoint->endpoint_eager, 1, + &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); if(DAT_SUCCESS != rc) { - MCA_BTL_UDAPL_ERROR(rc, "dat_ep_post_send"); + MCA_BTL_UDAPL_ERROR(rc, "dat_ep_post_send (eager)"); endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; ret = OMPI_ERROR; break; } } - OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock); + if(endpoint->endpoint_eager_sends < 0) { + OPAL_THREAD_ADD32(&endpoint->endpoint_eager_sends, 1); + } + + assert(endpoint->endpoint_max_sends == + mca_btl_udapl_component.udapl_num_sends); + while(OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, -1) >= 0 && + NULL != (frag = (mca_btl_udapl_frag_t*) + opal_list_remove_first(&endpoint->endpoint_max_frags))) { + cookie.as_ptr = frag; + + assert(frag->triplet.virtual_address == (DAT_VADDR)frag->hdr); + assert(frag->triplet.segment_length == + frag->segment.seg_len + sizeof(mca_btl_base_header_t)); + assert(frag->size == + mca_btl_udapl_component.udapl_eager_frag_size); + + rc = dat_ep_post_send(endpoint->endpoint_max, 1, + &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); + if(DAT_SUCCESS != rc) { + MCA_BTL_UDAPL_ERROR(rc, "dat_ep_post_send (max)"); + endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED; + ret = OMPI_ERROR; + break; + } + } + + if(endpoint->endpoint_max_sends < 0) { + OPAL_THREAD_ADD32(&endpoint->endpoint_max_sends, 1); + } + OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); return ret; } @@ -423,17 +474,21 @@ static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint, { mca_btl_udapl_frag_t* frag = NULL; DAT_DTO_COOKIE cookie; + DAT_EP_HANDLE ep; int rc; int i; - OPAL_THREAD_LOCK(&endpoint->endpoint_recv_lock); - for(i = 0; i < mca_btl_udapl_component.udapl_num_repost; i++) { + for(i = 0; i < mca_btl_udapl_component.udapl_num_recvs; i++) { if(size == mca_btl_udapl_component.udapl_eager_frag_size) { MCA_BTL_UDAPL_FRAG_ALLOC_EAGER(endpoint->endpoint_btl, frag, rc); - } else if(size == mca_btl_udapl_component.udapl_max_frag_size) { + ep = endpoint->endpoint_eager; + } else { + assert(size == mca_btl_udapl_component.udapl_max_frag_size); MCA_BTL_UDAPL_FRAG_ALLOC_MAX(endpoint->endpoint_btl, frag, rc); + ep = endpoint->endpoint_max; } + assert(size == frag->size); /* Set up the LMR triplet from the frag segment */ /* Note that this triplet defines a sub-region of a registered LMR */ frag->triplet.virtual_address = (DAT_VADDR)frag->hdr; @@ -443,27 +498,20 @@ static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint, frag->endpoint = endpoint; frag->base.des_dst = &frag->segment; frag->base.des_dst_cnt = 1; + frag->base.des_src = NULL; + frag->base.des_src_cnt = 0; + frag->base.des_flags = 0; frag->type = MCA_BTL_UDAPL_RECV; cookie.as_ptr = frag; - if(size == mca_btl_udapl_component.udapl_eager_frag_size) { - rc = dat_ep_post_recv(frag->endpoint->endpoint_eager, 1, - &frag->triplet,cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else if(size == mca_btl_udapl_component.udapl_max_frag_size) { - rc = dat_ep_post_recv(frag->endpoint->endpoint_max, 1, - &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); - } else { - rc = !DAT_SUCCESS; - } - + rc = dat_ep_post_recv(ep, 1, + &frag->triplet, cookie, DAT_COMPLETION_DEFAULT_FLAG); if(DAT_SUCCESS != rc) { MCA_BTL_UDAPL_ERROR(rc, "dat_ep_post_recv"); - OPAL_THREAD_UNLOCK(&endpoint->endpoint_recv_lock); return OMPI_ERROR; } } - OPAL_THREAD_UNLOCK(&endpoint->endpoint_recv_lock); return OMPI_SUCCESS; } @@ -478,13 +526,17 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint) { endpoint->endpoint_btl = 0; endpoint->endpoint_proc = 0; + + endpoint->endpoint_eager_sends = mca_btl_udapl_component.udapl_num_sends; + endpoint->endpoint_max_sends = mca_btl_udapl_component.udapl_num_sends; + endpoint->endpoint_state = MCA_BTL_UDAPL_CLOSED; endpoint->endpoint_eager = DAT_HANDLE_NULL; endpoint->endpoint_max = DAT_HANDLE_NULL; - OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t); - OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t); - OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t); + OBJ_CONSTRUCT(&endpoint->endpoint_eager_frags, opal_list_t); + OBJ_CONSTRUCT(&endpoint->endpoint_max_frags, opal_list_t); + OBJ_CONSTRUCT(&endpoint->endpoint_lock, opal_mutex_t); } @@ -495,9 +547,9 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint) static void mca_btl_udapl_endpoint_destruct(mca_btl_base_endpoint_t* endpoint) { - OBJ_DESTRUCT(&endpoint->endpoint_frags); - OBJ_DESTRUCT(&endpoint->endpoint_send_lock); - OBJ_DESTRUCT(&endpoint->endpoint_recv_lock); + OBJ_DESTRUCT(&endpoint->endpoint_eager_frags); + OBJ_DESTRUCT(&endpoint->endpoint_max_frags); + OBJ_DESTRUCT(&endpoint->endpoint_lock); } diff --git a/ompi/mca/btl/udapl/btl_udapl_endpoint.h b/ompi/mca/btl/udapl/btl_udapl_endpoint.h index 6931057934..7f7b9b0e1d 100644 --- a/ompi/mca/btl/udapl/btl_udapl_endpoint.h +++ b/ompi/mca/btl/udapl/btl_udapl_endpoint.h @@ -74,16 +74,19 @@ struct mca_btl_base_endpoint_t { mca_btl_udapl_endpoint_state_t endpoint_state; /**< current state of the endpoint connection */ - opal_list_t endpoint_frags; + opal_list_t endpoint_eager_frags; + opal_list_t endpoint_max_frags; /**< pending send frags on this endpoint */ - opal_mutex_t endpoint_send_lock; - /**< lock for concurrent access to endpoint state */ + int32_t endpoint_eager_sends; + int32_t endpoint_max_sends; + /**< number of sends that may be posted */ - opal_mutex_t endpoint_recv_lock; + opal_mutex_t endpoint_lock; /**< lock for concurrent access to endpoint state */ mca_btl_udapl_addr_t endpoint_addr; + /**< remote address on the other side of this endpoint */ DAT_EP_HANDLE endpoint_eager; DAT_EP_HANDLE endpoint_max; diff --git a/ompi/mca/btl/udapl/btl_udapl_frag.h b/ompi/mca/btl/udapl/btl_udapl_frag.h index 9a7ebb9cf9..0382c90932 100644 --- a/ompi/mca/btl/udapl/btl_udapl_frag.h +++ b/ompi/mca/btl/udapl/btl_udapl_frag.h @@ -43,7 +43,7 @@ typedef enum { * uDAPL fragment derived type. */ struct mca_btl_udapl_frag_t { - mca_btl_base_descriptor_t base; + mca_btl_base_descriptor_t base; mca_btl_base_segment_t segment; struct mca_btl_udapl_module_t* btl; @@ -117,22 +117,6 @@ OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_user_t); } -#define MCA_BTL_UDAPL_FRAG_POST(btl,frag) \ -do { \ - if(opal_list_get_size(&btl->udapl_repost) < (size_t)btl->udapl_num_repost) { \ - OPAL_THREAD_LOCK(&btl->udapl_lock); \ - opal_list_append(&btl->udapl_repost, (opal_list_item_t*)frag); \ - OPAL_THREAD_UNLOCK(&btl->udapl_lock); \ - } else { \ - OPAL_THREAD_LOCK(&btl->udapl_lock); \ - do { \ - udapl_provide_receive_buffer(btl->port, frag->hdr, frag->size, frag->priority); \ - } while (NULL != (frag = (mca_btl_udapl_frag_t*)opal_list_remove_first(&btl->udapl_repost))); \ - OPAL_THREAD_UNLOCK(&btl->udapl_lock); \ - } \ -} while(0) - - #if defined(c_plusplus) || defined(__cplusplus) } #endif