diff --git a/ompi/mca/btl/gm/btl_gm.c b/ompi/mca/btl/gm/btl_gm.c
index 60908a0758..da6362a42e 100644
--- a/ompi/mca/btl/gm/btl_gm.c
+++ b/ompi/mca/btl/gm/btl_gm.c
@@ -434,6 +434,10 @@ static void mca_btl_gm_drop_callback( struct gm_port* port, void* context, gm_st
     OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
 }
 
+/**
+ * Callback on send completion and/or error
+ */
+
 static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_status_t status )
 {
     mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
@@ -464,21 +468,11 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
         mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
         break;
     case GM_SUCCESS:
-        /* release the send token */
-        OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
-
         /* call the completion callback */
         frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
 
-        /* check for pending fragments */
-        if(opal_list_get_size(&btl->gm_pending)) {
-            OPAL_THREAD_LOCK(&btl->gm_lock);
-            frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_pending);
-            OPAL_THREAD_UNLOCK(&btl->gm_lock);
-            if(NULL != frag) {
-                mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
-            }
-        }
+        /* return the send token and dequeue pending fragments */
+        MCA_BTL_GM_RETURN_TOKEN(btl);
         break;
 
     default:
@@ -495,24 +489,6 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
 }
 
 
-static void mca_btl_gm_rdma_callback( struct gm_port* port, void* context, gm_status_t status )
-{
-    mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
-    mca_btl_gm_module_t* btl = frag->btl;
-
-    /* call the completion callback */
-    switch(status) {
-    case GM_SUCCESS:
-        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
-        break;
-    default:
-        opal_output(0, "[%s:%d] gm rdma operation failed with status %d\n", __FILE__, __LINE__, status);
-        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
-        break;
-    }
-}
-
-
 /**
  * Initiate an asynchronous send.
  *
@@ -535,17 +511,12 @@ int mca_btl_gm_send(
     frag->btl = gm_btl;
     frag->endpoint = endpoint;
     frag->hdr->tag = tag;
+    frag->type = MCA_BTL_GM_SEND;
 
     /* queue the descriptor if there are no send tokens */
-    if(OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, -1) < 0) {
-        OPAL_THREAD_LOCK(&gm_btl->gm_lock);
-        opal_list_append(&gm_btl->gm_pending, (opal_list_item_t*)frag);
-        OPAL_THREAD_UNLOCK(&gm_btl->gm_lock);
-        OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, 1);
-        return OMPI_SUCCESS;
-    }
+    MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
 
-    /* post the send request */
+    /* post the send descriptor */
     if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
        frag->size == mca_btl_gm_component.gm_eager_frag_size) {
         gm_send_with_callback(
@@ -583,6 +554,62 @@ int mca_btl_gm_send(
 }
 
 
+
+/**
+ * Callback on put completion and/or error
+ */
+
+static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_status_t status )
+{
+    mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
+    mca_btl_gm_module_t* btl = frag->btl;
+
+    /* call the completion callback */
+    switch(status) {
+    case GM_TRY_AGAIN:
+    case GM_SEND_TIMED_OUT:
+    case GM_TIMED_OUT:
+        /* drop all sends to this destination port */
+        gm_drop_sends(
+            btl->port,
+            (frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? GM_HIGH_PRIORITY : GM_LOW_PRIORITY,
+            frag->endpoint->endpoint_addr.node_id,
+            frag->endpoint->endpoint_addr.port_id,
+            mca_btl_gm_drop_callback,
+            btl
+        );
+
+        /* retry the failed fragment */
+        mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
+        break;
+    case GM_SEND_DROPPED:
+        /* release the send token */
+        OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
+
+        /* retry the dropped fragment */
+        mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
+        break;
+    case GM_SUCCESS:
+        /* call completion callback */
+        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
+
+        /* return the send token and dequeue pending fragments */
+        MCA_BTL_GM_RETURN_TOKEN(btl);
+        break;
+    default:
+        /* error condition we can't deal with */
+        opal_output(0, "[%s:%d] gm_put operation failed with status %d\n", __FILE__, __LINE__, status);
+
+        /* release the send token */
+        OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
+
+        /* call the completion callback */
+        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
+        break;
+    }
+}
+
+
 /**
  * Initiate an asynchronous put.
  *
@@ -602,7 +629,12 @@ int mca_btl_gm_put(
 
     frag->btl = gm_btl;
     frag->endpoint = endpoint;
+    frag->type = MCA_BTL_GM_PUT;
 
+    /* queue the descriptor if there are no send tokens */
+    MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
+
+    /* post the put descriptor */
     gm_put(gm_btl->port,
         des->des_src->seg_addr.pval,
         des->des_dst->seg_addr.lval,
@@ -610,7 +642,7 @@ int mca_btl_gm_put(
         GM_LOW_PRIORITY,
         endpoint->endpoint_addr.node_id,
         endpoint->endpoint_addr.port_id,
-        mca_btl_gm_rdma_callback,
+        mca_btl_gm_put_callback,
         frag);
     return OMPI_SUCCESS;
 #else
@@ -619,6 +651,63 @@ int mca_btl_gm_put(
 }
 
 
+
+/**
+ * Callback on get completion and/or error
+ */
+
+static void mca_btl_gm_get_callback( struct gm_port* port, void* context, gm_status_t status )
+{
+    mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
+    mca_btl_gm_module_t* btl = frag->btl;
+
+    /* call the completion callback */
+    switch(status) {
+    case GM_TRY_AGAIN:
+    case GM_SEND_TIMED_OUT:
+    case GM_TIMED_OUT:
+        /* drop all sends to this destination port */
+        gm_drop_sends(
+            btl->port,
+            (frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? GM_HIGH_PRIORITY : GM_LOW_PRIORITY,
+            frag->endpoint->endpoint_addr.node_id,
+            frag->endpoint->endpoint_addr.port_id,
+            mca_btl_gm_drop_callback,
+            btl
+        );
+
+        /* retry the failed fragment */
+        mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
+        break;
+    case GM_SEND_DROPPED:
+        /* release the send token */
+        OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
+
+        /* retry the dropped fragment */
+        mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
+        break;
+    case GM_SUCCESS:
+        /* call completion callback */
+        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
+
+        /* return the send token and dequeue pending fragments */
+        MCA_BTL_GM_RETURN_TOKEN(btl);
+        break;
+    default:
+        /* error condition we can't deal with */
+        opal_output(0, "[%s:%d] gm_get operation failed with status %d\n", __FILE__, __LINE__, status);
+
+        /* release the send token */
+        OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
+
+        /* call the completion callback */
+        frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
+        break;
+    }
+}
+
+
+
 /**
  * Initiate an asynchronous get.
  *
@@ -639,7 +728,12 @@ int mca_btl_gm_get(
 
     frag->btl = gm_btl;
    frag->endpoint = endpoint;
+    frag->type = MCA_BTL_GM_GET;
 
+    /* queue the descriptor if there are no send tokens */
+    MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
+
+    /* post the get descriptor */
     gm_get(gm_btl->port,
         des->des_dst->seg_addr.lval,
         des->des_src->seg_addr.pval,
@@ -647,7 +741,7 @@ int mca_btl_gm_get(
         GM_LOW_PRIORITY,
         endpoint->endpoint_addr.node_id,
         endpoint->endpoint_addr.port_id,
-        mca_btl_gm_rdma_callback,
+        mca_btl_gm_get_callback,
         frag);
     return OMPI_SUCCESS;
 #else
diff --git a/ompi/mca/btl/gm/btl_gm.h b/ompi/mca/btl/gm/btl_gm.h
index 8b71451273..921812d5ea 100644
--- a/ompi/mca/btl/gm/btl_gm.h
+++ b/ompi/mca/btl/gm/btl_gm.h
@@ -312,6 +312,51 @@ extern mca_btl_base_descriptor_t* mca_btl_gm_prepare_dst(
     size_t* size);
 
 
+/**
+ * Acquire a send token - queue the fragment if none available
+ */
+
+#define MCA_BTL_GM_ACQUIRE_TOKEN(btl, frag) \
+do { \
+    /* queue the descriptor if there are no send tokens */ \
+    if(OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, -1) < 0) { \
+        OPAL_THREAD_LOCK(&btl->gm_lock); \
+        opal_list_append(&btl->gm_pending, (opal_list_item_t*)frag); \
+        OPAL_THREAD_UNLOCK(&btl->gm_lock); \
+        OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1); \
+        return OMPI_SUCCESS; \
+    } \
+} while (0)
+
+/**
+ * Return the send token and dequeue any pending fragments
+ */
+
+#define MCA_BTL_GM_RETURN_TOKEN(btl) \
+do { \
+    OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 ); \
+    if(opal_list_get_size(&btl->gm_pending)) { \
+        mca_btl_gm_frag_t* frag; \
+        OPAL_THREAD_LOCK(&btl->gm_lock); \
+        frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_pending); \
+        OPAL_THREAD_UNLOCK(&btl->gm_lock); \
+        if(NULL != frag) { \
+            switch(frag->type) { \
+            case MCA_BTL_GM_SEND: \
+                mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag); \
+                break; \
+            case MCA_BTL_GM_PUT: \
+                mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base); \
+                break; \
+            case MCA_BTL_GM_GET: \
+                mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base); \
+                break; \
+            } \
+        } \
+    } \
+} while (0)
+
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }
 #endif
diff --git a/ompi/mca/btl/gm/btl_gm_component.c b/ompi/mca/btl/gm/btl_gm_component.c
index f986d7ba5f..964b1f743d 100644
--- a/ompi/mca/btl/gm/btl_gm_component.c
+++ b/ompi/mca/btl/gm/btl_gm_component.c
@@ -502,7 +502,8 @@ int mca_btl_gm_component_progress()
     int count = 0;
     size_t i;
 
-    if(OPAL_THREAD_ADD32(&inprogress, 1) != 1) {
+    /* we could deadlock in this case, as recvs are posted after the callback completes */
+    if(OPAL_THREAD_ADD32(&inprogress, 1) >= mca_btl_gm_component.gm_num_repost) {
         OPAL_THREAD_ADD32(&inprogress, -1);
         return OMPI_SUCCESS;
     }
diff --git a/ompi/mca/btl/gm/btl_gm_frag.h b/ompi/mca/btl/gm/btl_gm_frag.h
index a2bd779894..63e2c90838 100644
--- a/ompi/mca/btl/gm/btl_gm_frag.h
+++ b/ompi/mca/btl/gm/btl_gm_frag.h
@@ -28,6 +28,12 @@ extern "C" {
 
 OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_gm_frag_t);
 
+typedef enum {
+    MCA_BTL_GM_SEND,
+    MCA_BTL_GM_PUT,
+    MCA_BTL_GM_GET
+} mca_btl_gm_frag_type_t;
+
 
 /**
  * GM send fragment derived type.
@@ -41,6 +47,7 @@ struct mca_btl_gm_frag_t {
     mca_btl_base_header_t *hdr;
     size_t size;
     enum gm_priority priority;
+    mca_btl_gm_frag_type_t type;
 };
 typedef struct mca_btl_gm_frag_t mca_btl_gm_frag_t;
 OBJ_CLASS_DECLARATION(mca_btl_gm_frag_t);