Use the correct indentation.
Force the progress function to grab as many events as possible
(in order to avoid starvation of the send queue); see the sketch below.
Add more elements to the unexpected queue (the internal buffers used to
temporarily store the data of unexpected messages).
Decrease the number of variables in some functions (cleanup).
Avoid using goto ...

This commit was SVN r5949.
George Bosilca 2005-06-06 18:42:24 +00:00
parent 462fe884c8
commit c7acb3bc5f
4 changed files with 81 additions and 77 deletions
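The main behavioral change is in mca_ptl_gm_component_progress (second file below): instead of handling at most one receive event per call, the loop now keeps polling while mca_ptl_gm_analyze_recv_event reports that it consumed an event. A minimal sketch of that drain pattern, reusing the GM names from the diff; the max_polls bound is an illustrative assumption, not part of the real code:

    /* Sketch: drain the GM receive queue instead of taking one event
     * per progress call.  max_polls is an assumed safety bound. */
    static void drain_gm_events( mca_ptl_gm_module_t* ptl, int max_polls )
    {
        int i = 0;
        while( i < max_polls ) {
            gm_recv_event_t* event = gm_receive( ptl->gm_port );
            if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) ) {
                if( 1 == mca_ptl_gm_analyze_recv_event( ptl, event ) ) {
                    continue;  /* event consumed: poll again before counting */
                }
            }
            i++;  /* queue empty or event not consumable: count the poll */
        }
    }

Draining the event queue promptly also lets GM process send-completion events (the default branch hands unknown events to gm_unknown), which is what keeps the send queue from starving.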

View file

@@ -35,29 +35,29 @@
 mca_ptl_gm_module_t mca_ptl_gm_module = {
 {
-&mca_ptl_gm_component.super,
-1, /* max size of request cache */
-sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */
-0, /* max size of first fragment */
-0, /* min fragment size */
-0, /* max fragment size */
-0, /* exclusivity */
-50, /* latency */
-0, /* bandwidth */
-MCA_PTL_PUT, /* ptl flags */
-/* collection of interfaces */
-mca_ptl_gm_add_procs,
-mca_ptl_gm_del_procs,
-mca_ptl_gm_finalize,
-mca_ptl_gm_peer_send,
-mca_ptl_gm_put,
-mca_ptl_gm_get,
-mca_ptl_gm_matched,
-mca_ptl_gm_request_init,
-mca_ptl_gm_request_fini,
-NULL,
-NULL,
-NULL
+&mca_ptl_gm_component.super,
+1, /* max size of request cache */
+sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */
+0, /* max size of first fragment */
+0, /* min fragment size */
+0, /* max fragment size */
+0, /* exclusivity */
+50, /* latency */
+0, /* bandwidth */
+MCA_PTL_PUT, /* ptl flags */
+/* collection of interfaces */
+mca_ptl_gm_add_procs,
+mca_ptl_gm_del_procs,
+mca_ptl_gm_finalize,
+mca_ptl_gm_peer_send,
+mca_ptl_gm_put,
+mca_ptl_gm_get,
+mca_ptl_gm_matched,
+mca_ptl_gm_request_init,
+mca_ptl_gm_request_fini,
+NULL,
+NULL,
+NULL
 }
 };

View file

@@ -241,14 +241,15 @@ mca_ptl_gm_thread_progress( ompi_thread_t* thread )
 pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
 while(1) {
-event = gm_blocking_receive(ptl->gm_port);
-if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) )
-mca_ptl_gm_analyze_recv_event( ptl, event );
+event = gm_blocking_receive(ptl->gm_port);
+if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) )
+mca_ptl_gm_analyze_recv_event( ptl, event );
 }
 return PTHREAD_CANCELED;
 }
 #endif /* OMPI_HAVE_POSIX_THREADS */
 /* Scan all ports on the boards. As it's difficult to find the total number of boards
 * we use a predefined maximum.
 * Return the number of discovered boards where opening a port was a succesfull operation.
@@ -500,9 +501,9 @@ mca_ptl_gm_init( mca_ptl_gm_component_t * gm )
 ompi_free_list_init( &(mca_ptl_gm_component.gm_unexpected_frags_data),
 mca_ptl_gm_component.gm_segment_size,
 OBJ_CLASS (ompi_list_item_t),
-1, /* keep is small in the begining */
-32, /* maximum number of list allocated elements will be zero */
-1, /* Number of elements to grow by per allocation */
+16, /* keep is small in the begining */
+128, /* maximum number of list elements */
+16, /* Number of elements to grow by per allocation */
 NULL ); /* not using mpool */
 #if OMPI_MCA_PTL_GM_CACHE_ENABLE
 mca_ptl_gm_regcache_init();
@@ -587,9 +588,10 @@ mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp)
 event = gm_receive(ptl->gm_port);
 /* If there are no receive events just skip the function call */
 if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) ) {
-mca_ptl_gm_analyze_recv_event( ptl, event );
-/* we try to empty the GM event queue */
-/*continue;*/
+if( 1 == mca_ptl_gm_analyze_recv_event( ptl, event ) ) {
+/* we try to empty the GM event queue */
+continue;
+}
 }
 i++;
 }
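For scale, the free-list retuning above moves the unexpected-message buffer pool from (1 initial element, 32 maximum, grow by 1) to (16, 128, 16). A generic sketch of the policy those three parameters describe; this is an illustration, not the actual ompi_free_list code, and every name in it is made up:

    /* Generic (initial, max, grow) free-list policy, illustrative only. */
    #include <stdlib.h>

    typedef struct {
        void** items;      /* stack of free elements, pre-sized for max */
        size_t nitems;     /* elements currently available */
        size_t allocated;  /* elements ever created */
        size_t max;        /* hard cap: 128 after this commit */
        size_t grow;       /* batch size: 16 after this commit */
        size_t elem_size;
    } pool_t;

    static void* pool_get( pool_t* p )
    {
        if( 0 == p->nitems ) {              /* list empty: grow by a batch */
            size_t n = p->grow;
            if( p->allocated + n > p->max )
                n = p->max - p->allocated;  /* clamp at the cap */
            for( size_t i = 0; i < n; i++ )
                p->items[p->nitems++] = malloc( p->elem_size );
            p->allocated += n;
            if( 0 == p->nitems )
                return NULL;                /* cap reached, caller must cope */
        }
        return p->items[--p->nitems];
    }

Growing in batches of 16 keeps allocation off the fast path, which matters once the progress loop can consume many unexpected messages per call.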

View file

@@ -350,14 +350,15 @@ int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
 hdr->hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
 hdr->hdr_src_ptr.pval = fragment;
 hdr->hdr_dst_ptr = fragment->frag_send.frag_request->req_peer_match;
 assert( hdr->hdr_dst_ptr.pval != NULL );
 hdr->hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
-hdr->hdr_frag_length = iov.iov_len;
+hdr->hdr_frag_length = max_data;
 fragment->frag_bytes_processed += max_data;
 fragment->frag_bytes_validated += max_data;
-burst_length -= iov.iov_len;
-if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
-assert( burst_length == 0 );
+burst_length -= max_data;
+if( 0 == burst_length ) {
+assert( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed );
 hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
 }
 /* for the last piece set the header type to FIN */
@@ -509,13 +510,11 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
 size_t size,
 int flags )
 {
-struct iovec iov;
-size_t size_in, size_out;
 const int header_length = sizeof(mca_ptl_base_rendezvous_header_t);
 mca_ptl_base_header_t* hdr;
 ompi_convertor_t *convertor = NULL;
 int rc, freeAfter;
-unsigned int in_size, max_data = 0;
+unsigned int max_data = 0;
 mca_ptl_gm_peer_t* ptl_peer = (mca_ptl_gm_peer_t*)ptl_base_peer;
 ompi_list_item_t *item;
 char* sendbuf;
@@ -525,43 +524,43 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
 sendbuf = (char*)item;
 hdr = (mca_ptl_base_header_t*)item;
-size_in = size;
 /* Populate the header with the match informations */
 (void)mca_ptl_gm_init_header_rndv( hdr, sendreq, flags );
 hdr->hdr_rndv.hdr_frag_length = (uint64_t)((long)ptl);
-if( size_in > 0 ) {
-convertor = &sendreq->req_send.req_convertor;
+if( size > 0 ) {
+struct iovec iov;
+unsigned int iov_count = 1;
-if( (size_in + header_length) <= mca_ptl_gm_component.gm_segment_size )
-iov.iov_len = size_in;
+convertor = &sendreq->req_send.req_convertor;
+/* personalize the convertor */
+ompi_convertor_init_for_send( convertor, 0, sendreq->req_send.req_base.req_datatype,
+sendreq->req_send.req_base.req_count,
+sendreq->req_send.req_base.req_addr,
+0, NULL );
+if( (size + header_length) <= mca_ptl_gm_component.gm_segment_size )
+iov.iov_len = size;
 else
 iov.iov_len = mca_ptl_gm_component.gm_segment_size - header_length;
 /* copy the data to the registered buffer */
 iov.iov_base = ((char*)hdr) + header_length;
 max_data = iov.iov_len;
-in_size = 1;
-if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
+if((rc = ompi_convertor_pack(convertor, &(iov), &iov_count, &max_data, &freeAfter)) < 0)
 return OMPI_ERROR;
 assert( max_data != 0 );
 /* must update the offset after actual fragment size is determined
 * before attempting to send the fragment
 */
 mca_ptl_base_send_request_offset( sendreq, max_data );
-} else {
-iov.iov_len = 0; /* no data will be transmitted */
 }
-/* adjust size and request offset to reflect actual number of bytes
-* packed by convertor
-*/
-size_out = iov.iov_len + header_length;
 /* Send the first fragment */
 gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr,
-GM_SIZE, size_out, GM_LOW_PRIORITY,
+GM_SIZE, max_data + header_length, GM_LOW_PRIORITY,
 ptl_peer->peer_addr.local_id, ptl_peer->peer_addr.port_id,
 send_match_callback, (void *)hdr );
@@ -569,18 +568,18 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
 ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
 sendreq, max_data );
 DO_DEBUG( ompi_output( 0, "sender %d complete request %p w/o rndv with %d bytes",
-orte_process_info.my_name->vpid, sendreq, iov.iov_len ); );
+orte_process_info.my_name->vpid, sendreq, max_data ); );
 } else {
 DO_DEBUG( ompi_output( 0, "sender %d sent request %p for rndv with %d bytes",
-orte_process_info.my_name->vpid, sendreq, iov.iov_len ); );
+orte_process_info.my_name->vpid, sendreq, max_data ); );
 }
 return OMPI_SUCCESS;
 }
 static mca_ptl_gm_recv_frag_t*
-mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
-mca_ptl_base_header_t * header, uint32_t msg_len )
+mca_ptl_gm_recv_frag_ctrl( struct mca_ptl_gm_module_t *ptl,
+mca_ptl_base_header_t * header, uint32_t msg_len )
 {
 mca_ptl_base_send_request_t *req;
@@ -888,14 +887,12 @@ void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
 }
 }
-typedef mca_ptl_gm_recv_frag_t* (frag_management_fct_t)( struct mca_ptl_gm_module_t *ptl,
-mca_ptl_base_header_t *hdr, uint32_t msg_len );
 frag_management_fct_t* frag_management_fct[MCA_PTL_HDR_TYPE_MAX] = {
-NULL,
+NULL, /* empty no header type equal to zero */
+NULL, /* mca_ptl_gm_recv_frag_match, */
 mca_ptl_gm_recv_frag_match,
 (frag_management_fct_t*)mca_ptl_gm_recv_frag_frag, /* force the conversion to remove a warning */
-mca_ptl_gm_ctrl_frag,
+mca_ptl_gm_recv_frag_ctrl,
 NULL,
 NULL,
 mca_ptl_gm_recv_frag_fin,
@@ -903,7 +900,6 @@ frag_management_fct[MCA_PTL_HDR_TYPE_MAX] = {
 int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_event_t* event )
 {
-mca_ptl_gm_recv_frag_t * frag;
 mca_ptl_base_header_t *header = NULL, *release_buf;
 frag_management_fct_t* function;
 uint32_t priority = GM_HIGH_PRIORITY, msg_len;
@@ -917,35 +913,34 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_event_t* event )
 case GM_FAST_HIGH_RECV_EVENT:
 case GM_FAST_HIGH_PEER_RECV_EVENT:
 header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
-goto have_event;
+break;
 case GM_RECV_EVENT:
 case GM_PEER_RECV_EVENT:
 priority = GM_LOW_PRIORITY;
 case GM_HIGH_RECV_EVENT:
 case GM_HIGH_PEER_RECV_EVENT:
 header = release_buf;
-goto have_event;
+break;
 case GM_NO_RECV_EVENT:
 break;
 default:
 gm_unknown(ptl->gm_port, event);
 return 1;
 }
-return 0;
-have_event:
 assert( header->hdr_common.hdr_type < MCA_PTL_HDR_TYPE_MAX );
 function = frag_management_fct[header->hdr_common.hdr_type];
 assert( NULL != function );
 msg_len = gm_ntohl( event->recv.length );
-frag = function( ptl, header, msg_len );
+(void)function( ptl, header, msg_len );
 gm_provide_receive_buffer( ptl->gm_port, release_buf, GM_SIZE, priority );
 return 0;
 }
 void mca_ptl_gm_dump_header( char* str, mca_ptl_base_header_t* hdr )
 {
 switch( hdr->hdr_common.hdr_type ) {
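Removing the goto works because the per-type handling already sits behind the frag_management_fct table: every case in the switch only picks the header location and buffer priority, and the shared tail dispatches through the table. A self-contained toy version of the same table-driven dispatch; the header types and handlers here are invented for illustration and are not OMPI API:

    /* Toy table-driven dispatch mirroring frag_management_fct[] above. */
    #include <assert.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef void (handler_fct_t)( uint32_t msg_len );

    static void handle_match( uint32_t len ) { printf( "match: %u\n", (unsigned)len ); }
    static void handle_fin( uint32_t len )   { printf( "fin: %u\n", (unsigned)len ); }

    enum { HDR_MATCH = 1, HDR_FIN = 2, HDR_TYPE_MAX = 3 };

    static handler_fct_t* handlers[HDR_TYPE_MAX] = {
        NULL,          /* no header type zero */
        handle_match,
        handle_fin
    };

    static void dispatch( int hdr_type, uint32_t msg_len )
    {
        assert( hdr_type < HDR_TYPE_MAX );
        handler_fct_t* fct = handlers[hdr_type];
        assert( NULL != fct );  /* every reachable type needs a handler */
        fct( msg_len );         /* single dispatch point, no labels */
    }

The table also gives the asserts a natural home: an out-of-range or unhandled header type now fails loudly instead of silently falling through a label.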

View file

@@ -123,6 +123,13 @@ extern "C" {
 size_t* size,
 int flags );
+/*
+*
+*/
+typedef mca_ptl_gm_recv_frag_t* (frag_management_fct_t)( struct mca_ptl_gm_module_t *ptl,
+mca_ptl_base_header_t *hdr,
+uint32_t msg_len );
 #define OMPI_FREE_LIST_TRY_GET(fl, item) \
 { \
 item = NULL; \
@@ -182,15 +189,15 @@ extern "C" {
 static inline void ompi_ptl_gm_init_pipeline( mca_ptl_gm_pipeline_info_t* pipeline )
 {
-int i;
+int i;
-pipeline->pos_register = 0;
-pipeline->pos_remote = 0;
-pipeline->pos_deregister = 0;
-pipeline->pos_transfert = 0;
-for( i = 0; i < GM_PIPELINE_DEPTH; i++ )
-pipeline->lines[i].flags = 0;
-}
+pipeline->pos_register = 0;
+pipeline->pos_remote = 0;
+pipeline->pos_deregister = 0;
+pipeline->pos_transfert = 0;
+for( i = 0; i < GM_PIPELINE_DEPTH; i++ )
+pipeline->lines[i].flags = 0;
+}
 static inline mca_ptl_gm_recv_frag_t*
 mca_ptl_gm_alloc_recv_frag( struct mca_ptl_base_module_t *ptl )