
- corrected locking in gm btl - gm api is not thread safe

- initial support for gm progress thread
- corrected threading issue in pml
- added polling progress for a configurable number of cycles when waiting in the threaded case (sketched below)

This commit was SVN r9188.
This commit is contained in:
Tim Woodall 2006-03-02 00:39:07 +00:00
parent 84d3055db5
commit 8bf6ed7a36
12 changed files with 463 additions and 66 deletions
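Editor's note on the last bullet: the pattern introduced here is to poll the completion flag for a bounded number of cycles before giving up and sleeping on a condition variable. Below is a minimal standalone sketch of that idea, using plain pthreads and invented names rather than the real OMPI request/progress types; in the actual change, opal_progress_spin() plays the polling role and ompi_request_cond/ompi_request_lock the sleeping role.

    #include <pthread.h>
    #include <sched.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical names, for illustration only. */
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static volatile bool complete = false;
    static const int spin_count = 10000;     /* configurable poll iterations */

    static void *producer(void *arg)
    {
        (void)arg;
        pthread_mutex_lock(&lock);
        complete = true;                      /* signal completion under the lock */
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    static void wait_for_completion(void)
    {
        /* Phase 1: poll for a bounded number of cycles; cheap when a progress
         * thread (or another rank) completes the request quickly. */
        for (int c = 0; c < spin_count; c++) {
            if (complete)
                return;
            sched_yield();                    /* stand-in for opal_progress() */
        }
        /* Phase 2: give up and sleep until signalled. */
        pthread_mutex_lock(&lock);
        while (!complete)
            pthread_cond_wait(&cond, &lock);
        pthread_mutex_unlock(&lock);
    }

    int main(void)
    {
        pthread_t t;
        pthread_create(&t, NULL, producer, NULL);
        wait_for_completion();
        pthread_join(t, NULL);
        printf("request complete\n");
        return 0;
    }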

View file

@ -33,6 +33,28 @@
#include "ompi/mca/mpool/mpool.h" #include "ompi/mca/mpool/mpool.h"
#include "ompi/proc/proc.h" #include "ompi/proc/proc.h"
/**
* Non-locking versions of public interfaces.
*/
static int mca_btl_gm_send_nl(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* des,
mca_btl_base_tag_t tag);
static int mca_btl_gm_get_nl(
mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* des);
static int mca_btl_gm_put_nl(
mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* des);
mca_btl_gm_module_t mca_btl_gm_module = {
{
&mca_btl_gm_component.super,
@ -53,9 +75,15 @@ mca_btl_gm_module_t mca_btl_gm_module = {
mca_btl_gm_free,
mca_btl_gm_prepare_src,
mca_btl_gm_prepare_dst,
#if OMPI_ENABLE_MPI_THREADS || OMPI_ENABLE_PROGRESS_THREADS
mca_btl_gm_send,
mca_btl_gm_put,
NULL /* get */
mca_btl_gm_get
#else
mca_btl_gm_send_nl,
mca_btl_gm_put_nl,
mca_btl_gm_get_nl
#endif
}
};
@ -438,7 +466,8 @@ static void mca_btl_gm_drop_callback( struct gm_port* port, void* context, gm_st
}
/**
* Callback on send completion and/or error
* Callback on send completion and/or error.
* Called with mca_btl_gm_component.gm_lock held.
*/
static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_status_t status )
@ -461,18 +490,20 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
);
/* retry the failed fragment */
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
mca_btl_gm_send_nl(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
break;
case GM_SEND_DROPPED:
/* release the send token */
OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
/* retry the dropped fragment */
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
mca_btl_gm_send_nl(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag);
break;
case GM_SUCCESS:
/* call the completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
/* return the send token and deque pending fragments */
MCA_BTL_GM_RETURN_TOKEN(btl);
@ -486,12 +517,78 @@ static void mca_btl_gm_send_callback( struct gm_port* port, void* context, gm_st
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
/* call the completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
break;
}
}
/**
* Initiate an asynchronous send. Do NOT acquire gm lock, must already be held,
* or in an unthreaded environment.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transfered
* @param tag (IN) The tag value used to notify the peer.
*/
static int mca_btl_gm_send_nl(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* des,
mca_btl_base_tag_t tag)
{
mca_btl_gm_module_t* gm_btl = (mca_btl_gm_module_t*) btl;
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)des;
frag->btl = gm_btl;
frag->endpoint = endpoint;
frag->hdr->tag = tag;
frag->type = MCA_BTL_GM_SEND;
/* queue the descriptor if there are no send tokens */
MCA_BTL_GM_ACQUIRE_TOKEN_NL(gm_btl, frag);
/* post the send descriptor */
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
frag->size == mca_btl_gm_component.gm_eager_frag_size) {
gm_send_with_callback(
gm_btl->port,
frag->hdr,
mca_btl_gm_component.gm_eager_frag_size,
frag->segment.seg_len + sizeof(mca_btl_base_header_t),
GM_HIGH_PRIORITY,
endpoint->endpoint_addr.node_id,
endpoint->endpoint_addr.port_id,
mca_btl_gm_send_callback,
frag);
} else {
gm_send_with_callback(
gm_btl->port,
frag->hdr,
mca_btl_gm_component.gm_max_frag_size,
frag->segment.seg_len + sizeof(mca_btl_base_header_t),
GM_LOW_PRIORITY,
endpoint->endpoint_addr.node_id,
endpoint->endpoint_addr.port_id,
mca_btl_gm_send_callback,
frag);
}
if(opal_list_get_size(&gm_btl->gm_repost)) {
mca_btl_gm_frag_t* frag;
while(NULL != (frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&gm_btl->gm_repost))) {
gm_provide_receive_buffer(gm_btl->port, frag->hdr, frag->size, frag->priority);
}
}
return OMPI_SUCCESS;
}
/**
* Initiate an asynchronous send.
*
@ -517,6 +614,7 @@ int mca_btl_gm_send(
frag->type = MCA_BTL_GM_SEND;
/* queue the descriptor if there are no send tokens */
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
/* post the send descriptor */
@ -547,19 +645,19 @@ int mca_btl_gm_send(
if(opal_list_get_size(&gm_btl->gm_repost)) {
mca_btl_gm_frag_t* frag;
OPAL_THREAD_LOCK(&gm_btl->gm_lock);
while(NULL != (frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&gm_btl->gm_repost))) {
gm_provide_receive_buffer(gm_btl->port, frag->hdr, frag->size, frag->priority);
}
OPAL_THREAD_UNLOCK(&gm_btl->gm_lock);
}
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return OMPI_SUCCESS;
}
/**
* Callback on put completion and/or error
* Callback on put completion and/or error.
* Called with mca_btl_gm_component.gm_lock held.
*/
static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_status_t status )
@ -573,6 +671,7 @@ static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_sta
case GM_SEND_TIMED_OUT:
case GM_TIMED_OUT:
/* drop all sends to this destination port */
gm_drop_sends(
btl->port,
(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY) ? GM_HIGH_PRIORITY : GM_LOW_PRIORITY,
@ -583,18 +682,20 @@ static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_sta
);
/* retry the failed fragment */
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
mca_btl_gm_put_nl(&btl->super, frag->endpoint, &frag->base);
break;
case GM_SEND_DROPPED:
/* release the send token */
OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
/* retry the dropped fragment */
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base);
mca_btl_gm_put_nl(&btl->super, frag->endpoint, &frag->base);
break;
case GM_SUCCESS:
/* call completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
/* return the send token and deque pending fragments */
MCA_BTL_GM_RETURN_TOKEN(btl);
@ -607,12 +708,55 @@ static void mca_btl_gm_put_callback( struct gm_port* port, void* context, gm_sta
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
/* call the completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
break;
}
}
/**
* Initiate an asynchronous put. Do not acquire lock.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
static int mca_btl_gm_put_nl(
mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* des)
{
#if OMPI_MCA_BTL_GM_HAVE_RDMA_PUT
mca_btl_gm_module_t* gm_btl = (mca_btl_gm_module_t*) btl;
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*) des;
frag->btl = gm_btl;
frag->endpoint = endpoint;
frag->type = MCA_BTL_GM_PUT;
/* queue the descriptor if there are no send tokens */
MCA_BTL_GM_ACQUIRE_TOKEN_NL(gm_btl, frag);
/* post the put descriptor */
gm_put(gm_btl->port,
des->des_src->seg_addr.pval,
des->des_dst->seg_addr.lval,
des->des_src->seg_len,
GM_LOW_PRIORITY,
endpoint->endpoint_addr.node_id,
endpoint->endpoint_addr.port_id,
mca_btl_gm_put_callback,
frag);
return OMPI_SUCCESS;
#else
return OMPI_ERR_NOT_IMPLEMENTED;
#endif
}
/**
* Initiate an asynchronous put.
*
@ -635,6 +779,7 @@ int mca_btl_gm_put(
frag->type = MCA_BTL_GM_PUT;
/* queue the descriptor if there are no send tokens */
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
/* post the put descriptor */
@ -647,6 +792,7 @@ int mca_btl_gm_put(
endpoint->endpoint_addr.port_id,
mca_btl_gm_put_callback,
frag);
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return OMPI_SUCCESS;
#else
return OMPI_ERR_NOT_IMPLEMENTED;
@ -656,7 +802,8 @@ int mca_btl_gm_put(
/**
* Callback on get completion and/or error
* Callback on get completion and/or error.
* Called with mca_btl_gm_component.gm_lock held.
*/
static void mca_btl_gm_get_callback( struct gm_port* port, void* context, gm_status_t status )
@ -680,18 +827,20 @@ static void mca_btl_gm_get_callback( struct gm_port* port, void* context, gm_sta
);
/* retry the failed fragment */
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
mca_btl_gm_get_nl(&btl->super, frag->endpoint, &frag->base);
break;
case GM_SEND_DROPPED:
/* release the send token */
OPAL_THREAD_ADD32(&btl->gm_num_send_tokens, 1);
/* retry the dropped fragment */
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base);
mca_btl_gm_get_nl(&btl->super, frag->endpoint, &frag->base);
break;
case GM_SUCCESS:
/* call completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_SUCCESS);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
/* return the send token and deque pending fragments */
MCA_BTL_GM_RETURN_TOKEN(btl);
@ -704,12 +853,55 @@ static void mca_btl_gm_get_callback( struct gm_port* port, void* context, gm_sta
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 );
/* call the completion callback */
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
frag->base.des_cbfunc(&btl->super, frag->endpoint, &frag->base, OMPI_ERROR);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
break;
}
}
/**
* Initiate an asynchronous get. No locking.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*
*/
static int mca_btl_gm_get_nl(
mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* des)
{
#if OMPI_MCA_BTL_GM_HAVE_RDMA_GET
mca_btl_gm_module_t* gm_btl = (mca_btl_gm_module_t*) btl;
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*) des;
frag->btl = gm_btl;
frag->endpoint = endpoint;
frag->type = MCA_BTL_GM_GET;
/* queue the descriptor if there are no send tokens */
MCA_BTL_GM_ACQUIRE_TOKEN_NL(gm_btl, frag);
/* post get put descriptor */
gm_get(gm_btl->port,
des->des_dst->seg_addr.lval,
des->des_src->seg_addr.pval,
des->des_src->seg_len,
GM_LOW_PRIORITY,
endpoint->endpoint_addr.node_id,
endpoint->endpoint_addr.port_id,
mca_btl_gm_get_callback,
frag);
return OMPI_SUCCESS;
#else
return OMPI_ERR_NOT_IMPLEMENTED;
#endif
}
/**
* Initiate an asynchronous get.
@ -734,6 +926,7 @@ int mca_btl_gm_get(
frag->type = MCA_BTL_GM_GET;
/* queue the descriptor if there are no send tokens */
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_ACQUIRE_TOKEN(gm_btl, frag);
/* post get put descriptor */
@ -746,6 +939,7 @@ int mca_btl_gm_get(
endpoint->endpoint_addr.port_id,
mca_btl_gm_get_callback,
frag);
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return OMPI_SUCCESS;
#else
return OMPI_ERR_NOT_IMPLEMENTED;
@ -782,7 +976,6 @@ int mca_btl_gm_finalize(struct mca_btl_base_module_t* btl)
}
#endif
OBJ_DESTRUCT(&gm_btl->gm_lock);
OBJ_DESTRUCT(&gm_btl->gm_frag_eager);
OBJ_DESTRUCT(&gm_btl->gm_frag_max);
OBJ_DESTRUCT(&gm_btl->gm_frag_user);
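Editor's note: the send/put/get changes in this file follow a standard pattern for wrapping a thread-unsafe API: the public entry points take mca_btl_gm_component.gm_lock for the duration of the call, while the new _nl variants assume the lock is already held (completion callbacks and the token-return macro run under it and must use the _nl paths to avoid self-deadlock). A rough sketch of that shape, with invented names rather than the actual GM/BTL signatures:

    #include <pthread.h>

    /* Illustrative stand-in for mca_btl_gm_component.gm_lock and gm_send(). */
    static pthread_mutex_t api_lock = PTHREAD_MUTEX_INITIALIZER;

    static void raw_api_send(int dest, const void *buf, int len)
    {
        /* not thread safe: must be called with api_lock held */
        (void)dest; (void)buf; (void)len;
    }

    /* No-lock variant: caller already holds api_lock (callbacks, retries). */
    static int send_nl(int dest, const void *buf, int len)
    {
        raw_api_send(dest, buf, len);
        return 0;
    }

    /* Public variant: acquires the lock around the whole operation. */
    static int send_locked(int dest, const void *buf, int len)
    {
        pthread_mutex_lock(&api_lock);
        int rc = send_nl(dest, buf, len);
        pthread_mutex_unlock(&api_lock);
        return rc;
    }

    /* Completion callback, invoked with api_lock already held by the progress
     * loop: it must retry via send_nl(), never send_locked(). */
    static void completion_cb(int dest, const void *buf, int len, int status)
    {
        if (status != 0)
            send_nl(dest, buf, len);          /* retry without re-locking */
    }

    int main(void)
    {
        const char msg[] = "hello";
        send_locked(1, msg, (int)sizeof msg); /* normal caller path */
        pthread_mutex_lock(&api_lock);
        completion_cb(1, msg, (int)sizeof msg, -1);  /* retry path, lock held */
        pthread_mutex_unlock(&api_lock);
        return 0;
    }

Note also that the callbacks drop the component lock around the upcall into the PML (des_cbfunc) and re-acquire it afterwards, so user-level code never runs while the GM lock is held.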

View file

@ -107,7 +107,10 @@ struct mca_btl_gm_module_t {
opal_list_t gm_pending; /**< list of pending send descriptors */
opal_list_t gm_repost; /**< list of pending fragments */
opal_list_t gm_mru_reg; /**< list of most recently used registrations */
opal_mutex_t gm_lock;
#if OMPI_ENABLE_PROGRESS_THREADS
opal_thread_t gm_thread;
#endif
};
typedef struct mca_btl_gm_module_t mca_btl_gm_module_t;
extern mca_btl_gm_module_t mca_btl_gm_module;
@ -318,20 +321,31 @@ extern mca_btl_base_descriptor_t* mca_btl_gm_prepare_dst(
* Acquire a send token - queue the fragment if none available
*/
#define MCA_BTL_GM_ACQUIRE_TOKEN_NL(btl, frag) \
do { \
/* queue the descriptor if there are no send tokens */ \
if(OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, -1) < 0) { \
opal_list_append(&gm_btl->gm_pending, (opal_list_item_t*)frag); \
OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, 1); \
return OMPI_SUCCESS; \
} \
} while (0) \
#define MCA_BTL_GM_ACQUIRE_TOKEN(btl, frag) \
do { \
/* queue the descriptor if there are no send tokens */ \
if(OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, -1) < 0) { \
OPAL_THREAD_LOCK(&gm_btl->gm_lock); \
opal_list_append(&gm_btl->gm_pending, (opal_list_item_t*)frag); \
OPAL_THREAD_UNLOCK(&gm_btl->gm_lock); \
OPAL_THREAD_ADD32(&gm_btl->gm_num_send_tokens, 1); \
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock); \
return OMPI_SUCCESS; \
} \
} while (0) \
/**
* Return send token and dequeue and pending fragments
* mca_btl_gm_component.gm_lock is already held.
*/
#define MCA_BTL_GM_RETURN_TOKEN(btl) \
@ -339,19 +353,17 @@ do {
OPAL_THREAD_ADD32( &btl->gm_num_send_tokens, 1 ); \
if(opal_list_get_size(&btl->gm_pending)) { \
mca_btl_gm_frag_t* frag; \
OPAL_THREAD_LOCK(&btl->gm_lock); \
frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_pending); \
OPAL_THREAD_UNLOCK(&btl->gm_lock); \
if(NULL != frag) { \
switch(frag->type) { \
case MCA_BTL_GM_SEND: \
mca_btl_gm_send(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag); \
mca_btl_gm_send_nl(&btl->super, frag->endpoint, &frag->base, frag->hdr->tag); \
break; \
case MCA_BTL_GM_PUT: \
mca_btl_gm_put(&btl->super, frag->endpoint, &frag->base); \
mca_btl_gm_put_nl(&btl->super, frag->endpoint, &frag->base); \
break; \
case MCA_BTL_GM_GET: \
mca_btl_gm_get(&btl->super, frag->endpoint, &frag->base); \
mca_btl_gm_get_nl(&btl->super, frag->endpoint, &frag->base); \
break; \
} \
} \
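Editor's note: the ACQUIRE_TOKEN/RETURN_TOKEN macros above implement a small credit-based flow-control scheme: each send consumes a token from an atomic counter, descriptors are parked on gm_pending when no tokens remain, and each completion returns a token and re-issues one parked descriptor. A simplified, single-threaded sketch of that bookkeeping (hypothetical array queue instead of opal_list_t; the real code drains FIFO-style via opal_list_remove_first):

    #include <stdio.h>

    #define MAX_PENDING 8

    /* Hypothetical stand-ins for the BTL's token counter and pending list. */
    static int num_send_tokens = 2;
    static int pending[MAX_PENDING];
    static int pending_len = 0;

    static void hw_post_send(int frag)     /* stands in for gm_send_with_callback() */
    {
        printf("posted frag %d\n", frag);
    }

    static int acquire_token_or_queue(int frag)
    {
        if (--num_send_tokens < 0) {       /* no credit: restore count and queue */
            num_send_tokens++;
            if (pending_len < MAX_PENDING)
                pending[pending_len++] = frag;
            return 0;                      /* queued, not sent */
        }
        hw_post_send(frag);
        return 1;                          /* sent */
    }

    static void return_token(void)
    {
        num_send_tokens++;                 /* completion gives the credit back */
        if (pending_len > 0) {             /* re-issue one parked descriptor
                                            * (LIFO here for brevity only) */
            int frag = pending[--pending_len];
            acquire_token_or_queue(frag);
        }
    }

    int main(void)
    {
        for (int f = 0; f < 4; f++)
            acquire_token_or_queue(f);     /* frags 2 and 3 get queued */
        return_token();                    /* a completion resends one of them */
        return_token();
        return 0;
    }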

View file

@ -25,6 +25,7 @@
#include "opal/util/output.h" #include "opal/util/output.h"
#include "ompi/mca/pml/pml.h" #include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h" #include "ompi/mca/btl/btl.h"
#include "ompi/request/request.h"
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
@ -40,6 +41,12 @@
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h" #include "ompi/mca/pml/base/pml_base_module_exchange.h"
#if OMPI_ENABLE_PROGRESS_THREADS
static void* mca_btl_gm_progress_thread( opal_object_t* arg );
#endif
mca_btl_gm_component_t mca_btl_gm_component = {
{
/* First, the mca_base_component_t struct containing meta information
@ -214,7 +221,7 @@ mca_btl_gm_module_init (mca_btl_gm_module_t * btl)
OBJ_CONSTRUCT(&btl->gm_pending, opal_list_t);
OBJ_CONSTRUCT(&btl->gm_repost, opal_list_t);
OBJ_CONSTRUCT(&btl->gm_mru_reg, opal_list_t);
OBJ_CONSTRUCT(&btl->gm_lock, opal_mutex_t);
OBJ_CONSTRUCT(&btl->gm_thread, opal_thread_t);
/* query nic tokens */
btl->gm_num_send_tokens = gm_num_send_tokens (btl->port);
@ -304,6 +311,16 @@ mca_btl_gm_module_init (mca_btl_gm_module_t * btl)
opal_output (0, "[%s:%d] unable to allow remote memory access", __FILE__, __LINE__); opal_output (0, "[%s:%d] unable to allow remote memory access", __FILE__, __LINE__);
return OMPI_ERROR; return OMPI_ERROR;
} }
#if OMPI_ENABLE_PROGRESS_THREADS
/* start progress thread */
btl->gm_thread.t_run = mca_btl_gm_progress_thread;
btl->gm_thread.t_arg = btl;
if(OPAL_SUCCESS != (rc = opal_thread_start(&btl->gm_thread))) {
opal_output (0, "[%s:%d] unable to create progress thread, retval=%d", __FILE__, __LINE__, rc);
return rc;
}
#endif
return OMPI_SUCCESS;
}
@ -446,11 +463,14 @@ mca_btl_gm_component_init (int *num_btl_modules,
mca_btl_base_module_t **btls;
*num_btl_modules = 0;
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
/* try to initialize GM */
if( GM_SUCCESS != gm_init() ) {
opal_output( 0, "[%s:%d] error in initializing the gm library\n", __FILE__, __LINE__ );
mca_btl_gm_component.gm_num_btls = 0;
mca_btl_gm_modex_send();
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
@ -458,6 +478,7 @@ mca_btl_gm_component_init (int *num_btl_modules,
mca_btl_gm_component.gm_btls = malloc( mca_btl_gm_component.gm_max_btls * sizeof (mca_btl_gm_module_t *));
if (NULL == mca_btl_gm_component.gm_btls) {
opal_output( 0, "[%s:%d] out of resources.", __FILE__, __LINE__ );
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
@ -466,17 +487,20 @@ mca_btl_gm_component_init (int *num_btl_modules,
mca_btl_base_error_no_nics("Myrinet/GM", "NIC");
mca_btl_gm_component.gm_num_btls = 0;
mca_btl_gm_modex_send();
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
if (mca_btl_gm_component.gm_num_btls == 0) {
mca_btl_base_error_no_nics("Myrinet/GM", "NIC");
mca_btl_gm_component.gm_num_btls = 0;
mca_btl_gm_modex_send();
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
/* publish GM parameters with the MCA framework */
if (OMPI_SUCCESS != mca_btl_gm_modex_send()) {
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
@ -484,12 +508,14 @@ mca_btl_gm_component_init (int *num_btl_modules,
btls = (mca_btl_base_module_t**) malloc (
mca_btl_gm_component.gm_num_btls * sizeof(mca_btl_base_module_t *));
if (NULL == btls) {
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return NULL;
}
memcpy(btls, mca_btl_gm_component.gm_btls,
mca_btl_gm_component.gm_num_btls * sizeof(mca_btl_gm_module_t *));
*num_btl_modules = mca_btl_gm_component.gm_num_btls;
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return btls;
}
@ -511,12 +537,10 @@ int mca_btl_gm_component_progress()
return OMPI_SUCCESS;
}
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
for( i = 0; i < mca_btl_gm_component.gm_num_btls; ) {
mca_btl_gm_module_t* btl = mca_btl_gm_component.gm_btls[i];
gm_recv_event_t* event = gm_receive(btl->port);
unsigned char* buffer = (unsigned char*)gm_ntohp(event->recv.buffer);
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)(buffer - sizeof(mca_btl_gm_frag_t));
mca_btl_base_header_t* hdr;
/* If there are no receive events just skip the function call */
switch(gm_ntohc(event->recv.type)) {
@ -525,12 +549,18 @@ int mca_btl_gm_component_progress()
case GM_FAST_HIGH_RECV_EVENT:
case GM_FAST_HIGH_PEER_RECV_EVENT:
{
unsigned char* buffer = (unsigned char*)gm_ntohp(event->recv.buffer);
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)(buffer - sizeof(mca_btl_gm_frag_t));
mca_btl_base_header_t* hdr = (mca_btl_base_header_t *)gm_ntohp(event->recv.message);
mca_btl_base_recv_reg_t* reg;
hdr = (mca_btl_base_header_t *)gm_ntohp(event->recv.message);
frag->segment.seg_addr.pval = (hdr+1);
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
count++;
break;
@ -540,12 +570,18 @@ int mca_btl_gm_component_progress()
case GM_HIGH_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
{
unsigned char* buffer = (unsigned char*)gm_ntohp(event->recv.buffer);
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)(buffer - sizeof(mca_btl_gm_frag_t));
mca_btl_base_header_t* hdr = (mca_btl_base_header_t*)buffer;
mca_btl_base_recv_reg_t* reg;
hdr = (mca_btl_base_header_t*)buffer;
frag->segment.seg_addr.pval = (hdr+1);
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
count++;
break;
@ -558,7 +594,90 @@ int mca_btl_gm_component_progress()
break;
}
}
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
OPAL_THREAD_ADD32(&inprogress, -1);
return count;
}
#if OMPI_ENABLE_PROGRESS_THREADS
static void* mca_btl_gm_progress_thread( opal_object_t* arg )
{
opal_thread_t* thread = (opal_thread_t*)arg;
mca_btl_gm_module_t* btl = thread->t_arg;
/* This thread enter in a cancel enabled state */
pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
while(1) {
gm_recv_event_t* event;
/* dont process events while the app is in the library */
while(opal_progress_threads()) {
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
while(opal_progress_threads())
sched_yield();
usleep(100); /* give app a chance to re-enter library */
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
}
/* otherwise processes any pending events */
event = gm_blocking_receive_no_spin(btl->port);
switch(gm_ntohc(event->recv.type)) {
case GM_FAST_RECV_EVENT:
case GM_FAST_PEER_RECV_EVENT:
case GM_FAST_HIGH_RECV_EVENT:
case GM_FAST_HIGH_PEER_RECV_EVENT:
{
unsigned char* buffer = (unsigned char*)gm_ntohp(event->recv.buffer);
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)(buffer - sizeof(mca_btl_gm_frag_t));
mca_btl_base_header_t* hdr = (mca_btl_base_header_t *)gm_ntohp(event->recv.message);
mca_btl_base_recv_reg_t* reg;
frag->segment.seg_addr.pval = (hdr+1);
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
break;
}
case GM_RECV_EVENT:
case GM_PEER_RECV_EVENT:
case GM_HIGH_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
{
unsigned char* buffer = (unsigned char*)gm_ntohp(event->recv.buffer);
mca_btl_gm_frag_t* frag = (mca_btl_gm_frag_t*)(buffer - sizeof(mca_btl_gm_frag_t));
mca_btl_base_header_t* hdr = (mca_btl_base_header_t*)buffer;
mca_btl_base_recv_reg_t* reg;
frag->segment.seg_addr.pval = (hdr+1);
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
break;
}
case _GM_SLEEP_EVENT:
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
gm_unknown(btl->port, event);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
break;
default:
gm_unknown(btl->port, event);
break;
}
}
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
return PTHREAD_CANCELED;
}
#endif
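Editor's note: the progress thread added above coordinates with application threads through opal_progress_thread_count. opal_progress_spin() increments the counter while an application thread is polling inside the library, and the progress thread backs off (yields) whenever it is non-zero, so the two do not contend for the GM port. A toy illustration of that hand-off using C11 atomics and pthreads, with invented names:

    #include <pthread.h>
    #include <sched.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <unistd.h>

    /* Hypothetical stand-in for opal_progress_thread_count. */
    static atomic_int app_threads_polling = 0;
    static atomic_bool shutting_down = false;

    static void *progress_thread(void *arg)
    {
        (void)arg;
        while (!atomic_load(&shutting_down)) {
            /* Back off while an application thread is polling in the library. */
            while (atomic_load(&app_threads_polling) > 0)
                sched_yield();
            /* ... otherwise block on the NIC and dispatch events here ... */
            usleep(100);
        }
        return NULL;
    }

    static void app_spin_wait(volatile bool *complete, int spin_count)
    {
        atomic_fetch_add(&app_threads_polling, 1);  /* announce we are polling */
        for (int c = 0; c < spin_count && !*complete; c++)
            sched_yield();                          /* stand-in for opal_progress() */
        atomic_fetch_sub(&app_threads_polling, 1);  /* let the progress thread run */
    }

    int main(void)
    {
        pthread_t t;
        volatile bool done = false;
        pthread_create(&t, NULL, progress_thread, NULL);
        app_spin_wait(&done, 1000);
        atomic_store(&shutting_down, true);
        pthread_join(t, NULL);
        return 0;
    }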

View file

@ -116,18 +116,16 @@ OBJ_CLASS_DECLARATION(mca_btl_gm_frag_user_t);
}
/* called with mca_btl_gm_component.gm_lock held */
#define MCA_BTL_GM_FRAG_POST(btl,frag) \
do { \
if(opal_list_get_size(&btl->gm_repost) < (size_t)btl->gm_num_repost) { \
OPAL_THREAD_LOCK(&btl->gm_lock); \
opal_list_append(&btl->gm_repost, (opal_list_item_t*)frag); \
OPAL_THREAD_UNLOCK(&btl->gm_lock); \
} else { \
OPAL_THREAD_LOCK(&btl->gm_lock); \
do { \
gm_provide_receive_buffer(btl->port, frag->hdr, frag->size, frag->priority); \
} while (NULL != (frag = (mca_btl_gm_frag_t*)opal_list_remove_first(&btl->gm_repost))); \
OPAL_THREAD_UNLOCK(&btl->gm_lock); \
} \
} while(0)

View file

@ -101,8 +101,8 @@ static mca_btl_gm_proc_t* mca_btl_gm_proc_lookup_ompi(ompi_proc_t* ompi_proc)
mca_btl_gm_proc_t* mca_btl_gm_proc_create(ompi_proc_t* ompi_proc)
{
mca_btl_gm_proc_t* gm_proc = NULL;
size_t size;
size_t i, size;
int rc, i;
int rc;
/* Check if we have already created a GM proc
* structure for this ompi process */

View file

@ -88,6 +88,11 @@ int mca_pml_ob1_recv(void *addr,
MCA_PML_OB1_RECV_REQUEST_START(recvreq);
if (recvreq->req_recv.req_base.req_ompi.req_complete == false) {
#if OMPI_ENABLE_PROGRESS_THREADS
if(opal_progress_spin(&recvreq->req_recv.req_base.req_ompi.req_complete)) {
goto finished;
}
#endif
/* give up and sleep until completion */
if (opal_using_threads()) {
opal_mutex_lock(&ompi_request_lock);
@ -104,6 +109,10 @@ int mca_pml_ob1_recv(void *addr,
}
}
#if OMPI_ENABLE_PROGRESS_THREADS
finished:
#endif
if (NULL != status) { /* return status */
*status = recvreq->req_recv.req_base.req_ompi.req_status;
}

View file

@ -108,6 +108,15 @@ int mca_pml_ob1_send(void *buf,
}
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
#if OMPI_ENABLE_PROGRESS_THREADS
if(opal_progress_spin(&sendreq->req_send.req_base.req_ompi.req_complete)) {
opal_mutex_lock(&ompi_request_lock);
MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
opal_mutex_unlock(&ompi_request_lock);
return OMPI_SUCCESS;
}
#endif
/* give up and sleep until completion */
if (opal_using_threads()) {
opal_mutex_lock(&ompi_request_lock);
@ -115,17 +124,16 @@ int mca_pml_ob1_send(void *buf,
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
opal_mutex_unlock(&ompi_request_lock);
} else {
ompi_request_waiting++;
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}
/* return request to pool */
MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
}
}
return OMPI_SUCCESS;
}

View file

@ -39,6 +39,7 @@
static int mca_pml_ob1_send_request_fini(struct ompi_request_t** request)
{
mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)(request);
OPAL_THREAD_LOCK(&ompi_request_lock);
if(sendreq->req_send.req_base.req_persistent) {
if(sendreq->req_send.req_base.req_free_called) {
MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
@ -60,12 +61,15 @@ static int mca_pml_ob1_send_request_fini(struct ompi_request_t** request)
MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
*request = MPI_REQUEST_NULL;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
return OMPI_SUCCESS;
}
static int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
{
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_OB1_SEND_REQUEST_FREE( *(mca_pml_ob1_send_request_t**)request );
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}

View file

@ -20,7 +20,43 @@
#include "ompi/constants.h" #include "ompi/constants.h"
#include "ompi/request/request.h" #include "ompi/request/request.h"
int ompi_request_poll_iterations = 20000;
int ompi_request_wait(
ompi_request_t ** req_ptr,
ompi_status_public_t * status)
{
ompi_request_t *req = *req_ptr;
if(req->req_complete == false) {
#if OMPI_ENABLE_PROGRESS_THREADS
/* poll for completion */
if(opal_progress_spin(&req->req_complete))
goto finished;
#endif
/* give up and sleep until completion */
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
while (req->req_complete == false) {
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
#if OMPI_ENABLE_PROGRESS_THREADS
finished:
#endif
/* return status */
if (MPI_STATUS_IGNORE != status) {
*status = req->req_status;
}
/* return request to pool */
return req->req_fini(req_ptr);
}
int ompi_request_wait_any(
size_t count,
@ -39,8 +75,8 @@ int ompi_request_wait_any(
#if OMPI_ENABLE_PROGRESS_THREADS
/* poll for completion */
opal_atomic_mb();
OPAL_THREAD_ADD32(&opal_progress_thread_count,1);
for (c = 0; completed < 0 && c < ompi_request_poll_iterations; c++) {
for (c = 0; completed < 0 && c < opal_progress_spin_count; c++) {
rptr = requests;
num_requests_null_inactive = 0;
for (i = 0; i < count; i++, rptr++) {
@ -58,9 +94,13 @@ int ompi_request_wait_any(
goto finished;
}
}
if( num_requests_null_inactive == count )
if( num_requests_null_inactive == count ) {
OPAL_THREAD_ADD32(&opal_progress_thread_count,-1);
goto finished;
}
opal_progress();
}
OPAL_THREAD_ADD32(&opal_progress_thread_count,-1);
#endif
/* give up and sleep until completion */

View file

@ -140,6 +140,7 @@ typedef struct ompi_request_t ompi_request_t;
OMPI_DECLSPEC extern ompi_pointer_array_t ompi_request_f_to_c_table;
OMPI_DECLSPEC extern size_t ompi_request_waiting;
OMPI_DECLSPEC extern size_t ompi_request_completed;
OMPI_DECLSPEC extern int32_t ompi_request_poll;
OMPI_DECLSPEC extern opal_mutex_t ompi_request_lock;
OMPI_DECLSPEC extern opal_condition_t ompi_request_cond;
OMPI_DECLSPEC extern ompi_request_t ompi_request_null;
@ -286,31 +287,9 @@ OMPI_DECLSPEC int ompi_request_test_all(
*
*/
static inline int ompi_request_wait(
int ompi_request_wait(
ompi_request_t ** req_ptr,
ompi_status_public_t * status)
ompi_status_public_t * status);
{
ompi_request_t *req = *req_ptr;
if(req->req_complete == false) {
/* give up and sleep until completion */
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
while (req->req_complete == false) {
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/* return status */
if (MPI_STATUS_IGNORE != status) {
*status = req->req_status;
}
/* return request to pool */
return req->req_fini(req_ptr);
}
/**
* Wait (blocking-mode) for one of N requests to complete.

View file

@ -40,6 +40,11 @@ static const opal_timer_t opal_progress_default_tick_rate = 10000; /* 10ms */
static const int opal_progress_default_tick_rate = 10000; /* 10k calls to opal_progress */
#endif
volatile int32_t opal_progress_thread_count = 0;
int opal_progress_spin_count = 10000;
/*
* Local variables
*/

View file

@ -27,6 +27,7 @@
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
#include "opal/threads/mutex.h"
/**
* Initialize the progress engine
@ -89,7 +90,6 @@ OMPI_DECLSPEC extern void opal_progress(void);
typedef int (*opal_progress_callback_t)(void);
/**
* Register an event to be progressed
*/
@ -112,6 +112,36 @@ OMPI_DECLSPEC int opal_progress_event_increment(void);
*/
OMPI_DECLSPEC int opal_progress_event_decrement(void);
/**
* Progress until flag is true or poll iterations completed
*/
extern volatile int32_t opal_progress_thread_count;
extern int opal_progress_spin_count;
static inline bool opal_progress_threads(void)
{
return (opal_progress_thread_count > 0);
}
static inline bool opal_progress_spin(volatile bool* complete)
{
int32_t c;
OPAL_THREAD_ADD32(&opal_progress_thread_count,1);
for (c = 0; c < opal_progress_spin_count; c++) {
if (true == *complete) {
OPAL_THREAD_ADD32(&opal_progress_thread_count,-1);
return true;
}
opal_progress();
}
OPAL_THREAD_ADD32(&opal_progress_thread_count,-1);
return false;
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif