merge in changes from release branch - acquire/release send token for put/get
This commit was SVN r7784.
Этот коммит содержится в:
родитель
e52448c10e
Коммит
c944988b9e
@ -434,6 +434,10 @@ static void mca_btl_gm_drop_callback( struct gm_port* port, void* context, gm_st
|
||||
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback on send completion and/or error
|
||||
*/
|
||||
|
||||
static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_status_t status )
|
||||
{
|
||||
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
|
||||
@ -464,21 +468,11 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
|
||||
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
|
||||
break;
|
||||
case GM_SUCCESS:
|
||||
/* release the send token */
|
||||
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
|
||||
|
||||
/* call the completion callback */
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
|
||||
|
||||
/* check for pending fragments */
|
||||
if(opal_list_get_size(&btl->gm_pending)) {
|
||||
OPAL_THREAD_LOCK(&btl->gm_lock);
|
||||
frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_pending);
|
||||
OPAL_THREAD_UNLOCK(&btl->gm_lock);
|
||||
if(NULL != frag) {
|
||||
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
|
||||
}
|
||||
}
|
||||
/* return the send token and deque pending fragments */
|
||||
MCA_BTL_GM_RETURN_TOKEN(btl);
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -495,24 +489,6 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
|
||||
}
|
||||
|
||||
|
||||
static void mca_btl_gm_rdma_callback( struct gm_port* port, void* context, gm_status_t status )
|
||||
{
|
||||
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
|
||||
mca_btl_gm_module_t* btl = frag->btl;
|
||||
|
||||
/* call the completion callback */
|
||||
switch(status) {
|
||||
case GM_SUCCESS:
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
|
||||
break;
|
||||
default:
|
||||
opal_output(0, "[%s:%d] gm rdma operation failed with status %d\n", __FILE__, __LINE__, status);
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initiate an asynchronous send.
|
||||
*
|
||||
@ -535,17 +511,12 @@ int mca_btl_gm_send(
|
||||
frag->btl = gm_btl;
|
||||
frag->endpoint = endpoint;
|
||||
frag->hdr->tag = tag;
|
||||
frag->type = MCA_BTL_GM_SEND;
|
||||
|
||||
/* queue the descriptor if there are no send tokens */
|
||||
if(OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, -1) < 0) {
|
||||
OPAL_THREAD_LOCK(&gm_btl->gm_lock);
|
||||
opal_list_append(&gm_btl->gm_pending, (opal_list_item_t*)frag);
|
||||
OPAL_THREAD_UNLOCK(&gm_btl->gm_lock);
|
||||
OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, 1);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
|
||||
|
||||
/* post the send request */
|
||||
/* post the send descriptor */
|
||||
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
|
||||
frag->size == mca_btl_gm_component.gm_eager_frag_size) {
|
||||
gm_send_with_callback(
|
||||
@ -583,6 +554,62 @@ int mca_btl_gm_send(
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Callback on put completion and/or error
|
||||
*/
|
||||
|
||||
static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_status_t status )
|
||||
{
|
||||
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
|
||||
mca_btl_gm_module_t* btl = frag->btl;
|
||||
|
||||
/* call the completion callback */
|
||||
switch(status) {
|
||||
case GM_TRY_AGAIN:
|
||||
case GM_SEND_TIMED_OUT:
|
||||
case GM_TIMED_OUT:
|
||||
/* drop all sends to this destination port */
|
||||
gm_drop_sends(
|
||||
btl->port,
|
||||
(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? GM_HIGH_PRIORITY : GM_LOW_PRIORITY,
|
||||
frag->endpoint->endpoint_addr.node_id,
|
||||
frag->endpoint->endpoint_addr.port_id,
|
||||
mca_btl_gm_drop_callback,
|
||||
btl
|
||||
);
|
||||
|
||||
/* retry the failed fragment */
|
||||
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
|
||||
break;
|
||||
case GM_SEND_DROPPED:
|
||||
/* release the send token */
|
||||
OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
|
||||
|
||||
/* retry the dropped fragment */
|
||||
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
|
||||
break;
|
||||
case GM_SUCCESS:
|
||||
/* call completion callback */
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
|
||||
|
||||
/* return the send token and deque pending fragments */
|
||||
MCA_BTL_GM_RETURN_TOKEN(btl);
|
||||
break;
|
||||
default:
|
||||
/* error condition can't deal with */
|
||||
opal_output(0, "[%s:%d] gm_put operation failed with status %d\n", __FILE__, __LINE__, status);
|
||||
|
||||
/* release the send token */
|
||||
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
|
||||
|
||||
/* call the completion callback */
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initiate an asynchronous put.
|
||||
*
|
||||
@ -602,7 +629,12 @@ int mca_btl_gm_put(
|
||||
|
||||
frag->btl = gm_btl;
|
||||
frag->endpoint = endpoint;
|
||||
frag->type = MCA_BTL_GM_PUT;
|
||||
|
||||
/* queue the descriptor if there are no send tokens */
|
||||
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
|
||||
|
||||
/* post the put descriptor */
|
||||
gm_put(gm_btl->port,
|
||||
des->des_src->seg_addr.pval,
|
||||
des->des_dst->seg_addr.lval,
|
||||
@ -610,7 +642,7 @@ int mca_btl_gm_put(
|
||||
GM_LOW_PRIORITY,
|
||||
endpoint->endpoint_addr.node_id,
|
||||
endpoint->endpoint_addr.port_id,
|
||||
mca_btl_gm_rdma_callback,
|
||||
mca_btl_gm_put_callback,
|
||||
frag);
|
||||
return OMPI_SUCCESS;
|
||||
#else
|
||||
@ -619,6 +651,63 @@ int mca_btl_gm_put(
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Callback on get completion and/or error
|
||||
*/
|
||||
|
||||
static void mca_btl_gm_get_callback( struct gm_port* port, void* context, gm_status_t status )
|
||||
{
|
||||
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)context;
|
||||
mca_btl_gm_module_t* btl = frag->btl;
|
||||
|
||||
/* call the completion callback */
|
||||
switch(status) {
|
||||
case GM_TRY_AGAIN:
|
||||
case GM_SEND_TIMED_OUT:
|
||||
case GM_TIMED_OUT:
|
||||
/* drop all sends to this destination port */
|
||||
gm_drop_sends(
|
||||
btl->port,
|
||||
(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? GM_HIGH_PRIORITY : GM_LOW_PRIORITY,
|
||||
frag->endpoint->endpoint_addr.node_id,
|
||||
frag->endpoint->endpoint_addr.port_id,
|
||||
mca_btl_gm_drop_callback,
|
||||
btl
|
||||
);
|
||||
|
||||
/* retry the failed fragment */
|
||||
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
|
||||
break;
|
||||
case GM_SEND_DROPPED:
|
||||
/* release the send token */
|
||||
OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
|
||||
|
||||
/* retry the dropped fragment */
|
||||
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
|
||||
break;
|
||||
case GM_SUCCESS:
|
||||
/* call completion callback */
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
|
||||
|
||||
/* return the send token and deque pending fragments */
|
||||
MCA_BTL_GM_RETURN_TOKEN(btl);
|
||||
break;
|
||||
default:
|
||||
/* error condition can't deal with */
|
||||
opal_output(0, "[%s:%d] gm_get operation failed with status %d\n", __FILE__, __LINE__, status);
|
||||
|
||||
/* release the send token */
|
||||
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
|
||||
|
||||
/* call the completion callback */
|
||||
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Initiate an asynchronous get.
|
||||
*
|
||||
@ -639,7 +728,12 @@ int mca_btl_gm_get(
|
||||
|
||||
frag->btl = gm_btl;
|
||||
frag->endpoint = endpoint;
|
||||
frag->type = MCA_BTL_GM_GET;
|
||||
|
||||
/* queue the descriptor if there are no send tokens */
|
||||
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
|
||||
|
||||
/* post get put descriptor */
|
||||
gm_get(gm_btl->port,
|
||||
des->des_dst->seg_addr.lval,
|
||||
des->des_src->seg_addr.pval,
|
||||
@ -647,7 +741,7 @@ int mca_btl_gm_get(
|
||||
GM_LOW_PRIORITY,
|
||||
endpoint->endpoint_addr.node_id,
|
||||
endpoint->endpoint_addr.port_id,
|
||||
mca_btl_gm_rdma_callback,
|
||||
mca_btl_gm_get_callback,
|
||||
frag);
|
||||
return OMPI_SUCCESS;
|
||||
#else
|
||||
|
@ -312,6 +312,51 @@ extern mca_btl_base_descriptor_t* mca_btl_gm_prepare_dst(
|
||||
size_t* size);
|
||||
|
||||
|
||||
/**
|
||||
* Acquire a send token - queue the fragment if none available
|
||||
*/
|
||||
|
||||
#define MCA_BTL_GM_ACQUIRE_TOKEN(btl, frag) \
|
||||
do { \
|
||||
/* queue the descriptor if there are no send tokens */ \
|
||||
if(OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, -1) < 0) { \
|
||||
OPAL_THREAD_LOCK(&gm_btl->gm_lock); \
|
||||
opal_list_append(&gm_btl->gm_pending, (opal_list_item_t*)frag); \
|
||||
OPAL_THREAD_UNLOCK(&gm_btl->gm_lock); \
|
||||
OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, 1); \
|
||||
return OMPI_SUCCESS; \
|
||||
} \
|
||||
} while (0) \
|
||||
|
||||
/**
|
||||
* Return send token and dequeue and pending fragments
|
||||
*/
|
||||
|
||||
#define MCA_BTL_GM_RETURN_TOKEN(btl) \
|
||||
do { \
|
||||
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 ); \
|
||||
if(opal_list_get_size(&btl->gm_pending)) { \
|
||||
mca_btl_gm_frag_t* frag; \
|
||||
OPAL_THREAD_LOCK(&btl->gm_lock); \
|
||||
frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_pending); \
|
||||
OPAL_THREAD_UNLOCK(&btl->gm_lock); \
|
||||
if(NULL != frag) { \
|
||||
switch(frag->type) { \
|
||||
case MCA_BTL_GM_SEND: \
|
||||
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag); \
|
||||
break; \
|
||||
case MCA_BTL_GM_PUT: \
|
||||
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base); \
|
||||
break; \
|
||||
case MCA_BTL_GM_GET: \
|
||||
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base); \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -502,7 +502,8 @@ int mca_btl_gm_component_progress()
|
||||
int count = 0;
|
||||
size_t i;
|
||||
|
||||
if(OPAL_THREAD_ADD32(&inprogress, 1) != 1) {
|
||||
/* could get into deadlock in this case as we post recvs after callback completes */
|
||||
if(OPAL_THREAD_ADD32(&inprogress, 1) >= mca_btl_gm_component.gm_num_repost) {
|
||||
OPAL_THREAD_ADD32(&inprogress, -1);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -28,6 +28,12 @@ extern "C" {
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_gm_frag_t);
|
||||
|
||||
|
||||
typedef enum {
|
||||
MCA_BTL_GM_SEND,
|
||||
MCA_BTL_GM_PUT,
|
||||
MCA_BTL_GM_GET
|
||||
} mca_btl_gm_frag_type_t;
|
||||
|
||||
|
||||
/**
|
||||
* GM send fragment derived type.
|
||||
@ -41,6 +47,7 @@ struct mca_btl_gm_frag_t {
|
||||
mca_btl_base_header_t *hdr;
|
||||
size_t size;
|
||||
enum gm_priority priority;
|
||||
mca_btl_gm_frag_type_t type;
|
||||
};
|
||||
typedef struct mca_btl_gm_frag_t mca_btl_gm_frag_t;
|
||||
OBJ_CLASS_DECLARATION(mca_btl_gm_frag_t);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user