1
1

Try to improve flow control in the sm BTL:

- poll FIFO occasionally even if just sending messages
- retry pending sends more often
  - just before trying a new send
  - as part of mca_btl_sm_component_progress
Maintain two new mca_btl_sm_component variables, num_outstanding_frags
and num_pending_sends, to keep overhead low.


Drain only one message fragment from the FIFO per btl_sm_component_progress
call (rather than drain until empty, which in retrospect everyone considers
to have been a mistake).

This commit was SVN r21551.
Этот коммит содержится в:
Eugene Loh 2009-06-27 00:12:56 +00:00
родитель d0a5468deb
Коммит bd995d26b4
5 изменённых файлов: 86 добавлений и 39 удалений

Просмотреть файл

@ -330,6 +330,9 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
if ( OMPI_SUCCESS != i )
return i;
mca_btl_sm_component.num_outstanding_frags = 0;
mca_btl_sm_component.num_pending_sends = 0;
i = opal_free_list_init(&mca_btl_sm_component.pending_send_fl,
sizeof(btl_sm_pending_send_item_t),
OBJ_CLASS(opal_free_list_item_t),
@ -728,6 +731,10 @@ int mca_btl_sm_sendi( struct mca_btl_base_module_t* btl,
mca_btl_sm_frag_t* frag;
int rc;
if ( mca_btl_sm_component.num_outstanding_frags * 2 > mca_btl_sm_component.fifo_size ) {
mca_btl_sm_component_progress();
}
/* this check should be unnecessary... turn into an assertion? */
if( length < mca_btl_sm_component.eager_limit ) {
@ -776,8 +783,9 @@ int mca_btl_sm_sendi( struct mca_btl_base_module_t* btl,
* the return code indicates failure, the write has still "completed" from
* our point of view: it has been posted to a "pending send" queue.
*/
OPAL_THREAD_ADD32(&mca_btl_sm_component.num_outstanding_frags, +1);
MCA_BTL_SM_FIFO_WRITE(endpoint, endpoint->my_smp_rank,
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, rc);
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, true, rc);
return OMPI_SUCCESS;
}
@ -801,6 +809,10 @@ int mca_btl_sm_send( struct mca_btl_base_module_t* btl,
mca_btl_sm_frag_t* frag = (mca_btl_sm_frag_t*)descriptor;
int rc;
if ( mca_btl_sm_component.num_outstanding_frags * 2 > mca_btl_sm_component.fifo_size ) {
mca_btl_sm_component_progress();
}
/* available header space */
frag->hdr->len = frag->segment.seg_len;
/* type of message, pt-2-pt, one-sided, etc */
@ -814,8 +826,9 @@ int mca_btl_sm_send( struct mca_btl_base_module_t* btl,
* post the descriptor in the queue - post with the relative
* address
*/
OPAL_THREAD_ADD32(&mca_btl_sm_component.num_outstanding_frags, +1);
MCA_BTL_SM_FIFO_WRITE(endpoint, endpoint->my_smp_rank,
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, rc);
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, true, rc);
if( OPAL_LIKELY(0 == rc) ) {
return 1; /* the data is completely gone */
}

Просмотреть файл

@ -157,6 +157,8 @@ struct mca_btl_sm_component_t {
struct mca_btl_base_endpoint_t **sm_peers;
opal_free_list_t pending_send_fl;
int num_outstanding_frags; /**< number of fragments sent but not yet returned to free list */
int num_pending_sends; /**< total number on all of my pending-send queues */
int mem_node;
int num_mem_nodes;

Просмотреть файл

@ -331,42 +331,26 @@ void mca_btl_sm_component_event_thread(opal_object_t* thread)
}
#endif
/*
 * Queue a fragment whose FIFO write failed (peer FIFO full) onto the
 * endpoint's pending-send list so it can be retried later.
 *
 * ep     - endpoint whose FIFO could not accept the fragment
 * data   - opaque fragment pointer (relative address) to post later
 * resend - true when this fragment was already taken off the pending
 *          list once; it is then prepended instead of appended so that
 *          message ordering is disturbed as little as possible
 */
void
btl_sm_add_pending(struct mca_btl_base_endpoint_t *ep, void *data, bool resend)
{
int rc;
btl_sm_pending_send_item_t *si;
opal_free_list_item_t *i;
/* get a free-list item to carry the pending fragment */
OPAL_FREE_LIST_GET(&mca_btl_sm_component.pending_send_fl, i, rc);
/* don't handle error for now */
assert(i != NULL && rc == OMPI_SUCCESS);
si = (btl_sm_pending_send_item_t*)i;
si->data = data;
/* if data was on pending send list then prepend it to the list to
 * minimize reordering */
if(resend)
opal_list_prepend(&ep->pending_sends, (opal_list_item_t*)si);
else
opal_list_append(&ep->pending_sends, (opal_list_item_t*)si);
}
static int process_pending_send(struct mca_btl_base_endpoint_t *ep)
void btl_sm_process_pending_sends(struct mca_btl_base_endpoint_t *ep)
{
btl_sm_pending_send_item_t *si;
int rc;
si = (btl_sm_pending_send_item_t*)opal_list_remove_first(&ep->pending_sends);
if(NULL == si) return OMPI_ERROR;
while ( 0 < opal_list_get_size(&ep->pending_sends) ) {
si = (btl_sm_pending_send_item_t*)opal_list_remove_first(&ep->pending_sends);
if(NULL == si) return; /* ??? WHAT DOES THIS CONDITION MEAN? */
OPAL_FREE_LIST_RETURN(&mca_btl_sm_component.pending_send_fl, (opal_list_item_t*)si);
OPAL_FREE_LIST_RETURN(&mca_btl_sm_component.pending_send_fl, (opal_list_item_t*)si);
MCA_BTL_SM_FIFO_WRITE(ep, ep->my_smp_rank, ep->peer_smp_rank, si->data,
true, rc);
OPAL_THREAD_ADD32(&mca_btl_sm_component.num_pending_sends, -1);
return rc;
MCA_BTL_SM_FIFO_WRITE(ep, ep->my_smp_rank, ep->peer_smp_rank, si->data,
true, false, rc);
if ( OMPI_SUCCESS != rc )
return;
}
}
int mca_btl_sm_component_progress(void)
@ -379,6 +363,22 @@ int mca_btl_sm_component_progress(void)
int my_smp_rank = mca_btl_sm_component.my_smp_rank;
int peer_smp_rank, j, rc = 0;
/* first, deal with any pending sends */
/* This check should be fast since we only need to check one variable. */
if ( 0 < mca_btl_sm_component.num_pending_sends ) {
/* perform a loop to find the endpoints that have pending sends */
/* This can take a while longer if there are many endpoints to check. */
for ( peer_smp_rank = 0; peer_smp_rank < mca_btl_sm_component.num_smp_procs; peer_smp_rank++) {
struct mca_btl_base_endpoint_t* endpoint;
if ( peer_smp_rank == my_smp_rank )
continue;
endpoint = mca_btl_sm_component.sm_peers[peer_smp_rank];
if ( 0 < opal_list_get_size(&endpoint->pending_sends) )
btl_sm_process_pending_sends(endpoint);
}
}
/* poll each fifo */
for(j = 0; j < FIFO_MAP_NUM(mca_btl_sm_component.num_smp_procs); j++) {
fifo = &(mca_btl_sm_component.fifo[my_smp_rank][j]);
@ -428,7 +428,11 @@ int mca_btl_sm_component_progress(void)
/* return the fragment */
MCA_BTL_SM_FIFO_WRITE(
mca_btl_sm_component.sm_peers[peer_smp_rank],
my_smp_rank, peer_smp_rank, hdr->frag, false, true, rc);
break;
}
case MCA_BTL_SM_FRAG_ACK:
@ -451,9 +455,9 @@ int mca_btl_sm_component_progress(void)
if( btl_ownership ) {
MCA_BTL_SM_FRAG_RETURN(frag);
}
if(opal_list_get_size(&endpoint->pending_sends)) {
if( OMPI_ERR_RESOURCE_BUSY == process_pending_send(endpoint) )
break;
OPAL_THREAD_ADD32(&mca_btl_sm_component.num_outstanding_frags, -1);
if ( 0 < opal_list_get_size(&endpoint->pending_sends) ) {
btl_sm_process_pending_sends(endpoint);
}
goto recheck_peer;
}
@ -476,7 +480,7 @@ int mca_btl_sm_component_progress(void)
MCA_BTL_SM_FRAG_STATUS_MASK);
MCA_BTL_SM_FIFO_WRITE(
mca_btl_sm_component.sm_peers[peer_smp_rank],
my_smp_rank, peer_smp_rank, hdr, false, rc);
my_smp_rank, peer_smp_rank, hdr, false, true, rc);
break;
}
}

Просмотреть файл

@ -43,6 +43,5 @@ struct mca_btl_base_endpoint_t {
opal_list_t pending_sends; /**< pending data to send */
};
void
btl_sm_add_pending(struct mca_btl_base_endpoint_t *ep, void *data, bool resend);
void btl_sm_process_pending_sends(struct mca_btl_base_endpoint_t *ep);
#endif

Просмотреть файл

@ -4,6 +4,29 @@
#include "btl_sm.h"
#include "btl_sm_endpoint.h"
/*
 * Queue a fragment whose FIFO write failed (peer FIFO full) onto the
 * endpoint's pending-send list so it can be retried later.
 *
 * ep     - endpoint whose FIFO could not accept the fragment
 * data   - opaque fragment pointer (relative address) to post later
 * resend - true when this fragment was already taken off the pending
 *          list once; it is then prepended instead of appended so that
 *          message ordering is disturbed as little as possible
 *
 * Also bumps the component-wide num_pending_sends counter so that
 * progress can cheaply test whether any endpoint has queued sends.
 */
static void
add_pending(struct mca_btl_base_endpoint_t *ep, void *data, bool resend)
{
int rc;
btl_sm_pending_send_item_t *si;
opal_free_list_item_t *i;
/* get a free-list item to carry the pending fragment */
OPAL_FREE_LIST_GET(&mca_btl_sm_component.pending_send_fl, i, rc);
/* don't handle error for now */
assert(i != NULL && rc == OMPI_SUCCESS);
si = (btl_sm_pending_send_item_t*)i;
si->data = data;
/* account globally for this queued send (atomic: may race with progress) */
OPAL_THREAD_ADD32(&mca_btl_sm_component.num_pending_sends, +1);
/* if data was on pending send list then prepend it to the list to
 * minimize reordering */
if(resend)
opal_list_prepend(&ep->pending_sends, (opal_list_item_t*)si);
else
opal_list_append(&ep->pending_sends, (opal_list_item_t*)si);
}
/*
* FIFO_MAP(x) defines which FIFO on the receiver should be used
@ -37,14 +60,20 @@
#define MCA_BTL_SM_FIFO_WRITE(endpoint_peer, my_smp_rank, \
peer_smp_rank, hdr, resend, rc) \
peer_smp_rank, hdr, resend, retry_pending_sends, rc) \
do { \
sm_fifo_t* fifo = &(mca_btl_sm_component.fifo[peer_smp_rank][FIFO_MAP(my_smp_rank)]); \
\
if ( retry_pending_sends ) { \
if ( 0 < opal_list_get_size(&endpoint_peer->pending_sends) ) { \
btl_sm_process_pending_sends(endpoint_peer); \
} \
} \
\
opal_atomic_lock(&(fifo->head_lock)); \
/* post fragment */ \
if(sm_fifo_write(hdr, fifo) != OMPI_SUCCESS) { \
btl_sm_add_pending(endpoint_peer, hdr, resend); \
add_pending(endpoint_peer, hdr, resend); \
rc = OMPI_ERR_RESOURCE_BUSY; \
} else { \
MCA_BTL_SM_SIGNAL_PEER(endpoint_peer); \