Improve the performance a little bit by changing the rendez-vous protocol. Now we start sending a burst
of data while waiting for the rendez-vous to be established. This commit was SVN r5154.
Этот коммит содержится в:
родитель
453b449596
Коммит
dd50d36423
@ -481,7 +481,6 @@ mca_ptl_gm_init( mca_ptl_gm_component_t * gm )
|
||||
0, /* maximum number of list allocated elements will be zero */
|
||||
0,
|
||||
NULL ); /* not using mpool */
|
||||
|
||||
return (mca_ptl_gm_component.gm_num_ptl_modules > 0 ? OMPI_SUCCESS : OMPI_ERR_OUT_OF_RESOURCE);
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
get_line->local_memory.pval, get_line->length,
|
||||
GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, frag );
|
||||
get_line->flags ^= PTL_GM_PIPELINE_REMOTE;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); );
|
||||
frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
|
||||
} else if( 1 == onlyifget ) return OMPI_SUCCESS;
|
||||
|
||||
@ -129,7 +129,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
reg_line->length, reg_line->offset );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld (%d)", reg_line->length, frag->pipeline.pos_register ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld(%d)",
|
||||
reg_line->length, reg_line->offset, frag->pipeline.pos_register ); )
|
||||
reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
@ -149,7 +150,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
dereg_line->flags ^= (PTL_GM_PIPELINE_DEREGISTER|PTL_GM_PIPELINE_REGISTER);
|
||||
assert( dereg_line->flags == 0 );
|
||||
frag->frag_bytes_validated += dereg_line->length;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld (%d)", dereg_line->length, frag->pipeline.pos_deregister ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld (%d)", dereg_line->length,
|
||||
dereg_line->offset, frag->pipeline.pos_deregister ); )
|
||||
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
|
||||
}
|
||||
|
||||
@ -217,7 +219,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
assert( dereg_line->flags == 0 );
|
||||
frag->frag_bytes_validated += dereg_line->length;
|
||||
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld", dereg_line->length ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld",
|
||||
dereg_line->length, dereg_line->offset ); )
|
||||
}
|
||||
|
||||
/* register next segment */
|
||||
@ -245,7 +248,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
reg_line->flags |= PTL_GM_PIPELINE_TRANSFERT;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld", reg_line->length ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld",
|
||||
reg_line->length, reg_line->offset ); )
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,11 +265,15 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
int flags )
|
||||
{
|
||||
mca_ptl_gm_frag_header_t* hdr;
|
||||
uint64_t remaining_bytes;
|
||||
uint64_t remaining_bytes, burst_length;
|
||||
ompi_list_item_t *item;
|
||||
int rc = 0;
|
||||
gm_status_t status;
|
||||
mca_ptl_gm_pipeline_line_t* pipeline;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
int32_t freeAfter;
|
||||
uint32_t max_data, in_size;
|
||||
struct iovec iov;
|
||||
|
||||
fragment->frag_offset = offset;
|
||||
|
||||
@ -279,15 +287,13 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
item = (ompi_list_item_t*)fragment->send_buf;
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
if( remaining_bytes > mca_ptl_gm_component.gm_eager_limit ) {
|
||||
burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
} else {
|
||||
burst_length = remaining_bytes;
|
||||
}
|
||||
|
||||
if( remaining_bytes <= mca_ptl_gm_component.gm_eager_limit ) { /* small protocol */
|
||||
int32_t freeAfter;
|
||||
uint32_t max_data, in_size;
|
||||
struct iovec iov;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
|
||||
/* If we have an eager send then we should send the rest of the data. */
|
||||
while( 0 < remaining_bytes ) {
|
||||
while( 0 < burst_length ) { /* send everything for the burst_length size */
|
||||
if( NULL == item ) {
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
@ -295,8 +301,8 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
}
|
||||
iov.iov_base = (char*)item + sizeof(mca_ptl_base_frag_header_t);
|
||||
iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t);
|
||||
if( iov.iov_len >= remaining_bytes )
|
||||
iov.iov_len = remaining_bytes;
|
||||
if( iov.iov_len >= burst_length )
|
||||
iov.iov_len = burst_length;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
@ -312,8 +318,9 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
hdr->hdr_frag.hdr_frag_length = iov.iov_len;
|
||||
|
||||
fragment->frag_bytes_processed += iov.iov_len;
|
||||
remaining_bytes -= iov.iov_len;
|
||||
if( remaining_bytes == 0 )
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
burst_length -= iov.iov_len;
|
||||
if( (burst_length == 0) && (remaining_bytes < mca_ptl_gm_component.gm_eager_limit) )
|
||||
hdr->hdr_frag.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
|
||||
|
||||
/* for the last piece set the header type to FIN */
|
||||
@ -323,6 +330,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
send_continue_callback, (void*)hdr );
|
||||
item = NULL; /* force to retrieve a new one on the next loop */
|
||||
}
|
||||
if( remaining_bytes < mca_ptl_gm_component.gm_eager_limit ) {
|
||||
*size = fragment->frag_bytes_processed;
|
||||
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
@ -331,24 +339,46 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
if( NULL == item ) {
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
}
|
||||
/* recompute the remaining length */
|
||||
remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
|
||||
pipeline = &(fragment->pipeline.lines[0]);
|
||||
/* Large set of data => we have to setup a rendez-vous protocol. Here we can
|
||||
* use the match header already filled in by the upper level and just complete it
|
||||
* with the others informations. When we reach this point the rendez-vous protocol
|
||||
* has already been realized so we know that the receiver expect our message.
|
||||
*/
|
||||
#if 0
|
||||
iov.iov_base = (char*)item + sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t);
|
||||
if( iov.iov_len >= remaining_bytes )
|
||||
iov.iov_len = remaining_bytes;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
fragment->frag_bytes_processed += max_data;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
#endif
|
||||
|
||||
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = fragment;
|
||||
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||
hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset;
|
||||
hdr->hdr_frag.hdr_frag_length = *size;
|
||||
hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
hdr->hdr_frag.hdr_frag_length = *size - fragment->frag_bytes_processed;
|
||||
hdr->registered_memory.lval = 0L;
|
||||
hdr->registered_memory.pval = NULL;
|
||||
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
|
||||
sizeof(mca_ptl_base_frag_header_t) + sizeof(ompi_ptr_t),
|
||||
sizeof(mca_ptl_gm_frag_header_t) /*+ max_data*/,
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
|
||||
mca_ptl_gm_basic_frag_callback, (void *)hdr );
|
||||
|
||||
@ -362,12 +392,11 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
#endif
|
||||
if( fragment->frag_send.frag_base.frag_size < mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = fragment->frag_send.frag_base.frag_size;
|
||||
} else {
|
||||
pipeline->length = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
pipeline->offset = fragment->frag_offset;
|
||||
pipeline->offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr + pipeline->offset;
|
||||
@ -485,7 +514,7 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
|
||||
|
||||
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
mca_ptl_base_header_t * header )
|
||||
mca_ptl_base_header_t * header, uint32_t msg_len )
|
||||
{
|
||||
mca_pml_base_send_request_t *req;
|
||||
|
||||
@ -530,7 +559,7 @@ mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
*/
|
||||
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
|
||||
mca_ptl_base_header_t* hdr )
|
||||
mca_ptl_base_header_t* hdr, uint32_t msg_len )
|
||||
{
|
||||
mca_ptl_gm_recv_frag_t* recv_frag;
|
||||
bool matched;
|
||||
@ -657,7 +686,7 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
|
||||
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
mca_ptl_gm_frag_header_t* hdr )
|
||||
mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len )
|
||||
{
|
||||
mca_pml_base_recv_request_t *request;
|
||||
ompi_convertor_t local_convertor, *convertor;
|
||||
@ -723,6 +752,17 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
|
||||
&(frag->frag_recv.frag_base) );
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
|
||||
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov.iov_len = msg_len - sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov_count = 1;
|
||||
max_data = iov.iov_len;
|
||||
freeAfter = 0; /* unused here */
|
||||
rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
|
||||
assert( 0 == freeAfter );
|
||||
frag->frag_bytes_processed += max_data;
|
||||
frag->frag_bytes_validated += max_data;
|
||||
|
||||
pipeline = &(frag->pipeline.lines[0]);
|
||||
#if 0
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
@ -734,25 +774,24 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
#endif
|
||||
if( frag->frag_recv.frag_base.frag_size < mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size;
|
||||
} else {
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size - max_data;
|
||||
if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
pipeline->offset = hdr->hdr_frag.hdr_frag_offset + max_data;
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
|
||||
pipeline->local_memory.pval = (char*)request->req_base.req_addr + pipeline->offset;
|
||||
status = gm_register_memory( ptl->gm_port,
|
||||
pipeline->local_memory.pval, pipeline->length );
|
||||
if( status != GM_SUCCESS ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
(char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
|
||||
(char*)request->req_base.req_addr + pipeline->offset,
|
||||
pipeline->length, hdr->hdr_frag.hdr_frag_offset );
|
||||
return NULL;
|
||||
}
|
||||
pipeline->offset = hdr->hdr_frag.hdr_frag_offset;
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += pipeline->length;
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d)\n", frag, pipeline->length, frag->pipeline.pos_register ); )
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d) offset %ld\n", frag, pipeline->length, frag->pipeline.pos_register, pipeline->offset ); )
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
} else {
|
||||
/* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
|
||||
@ -777,7 +816,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
|
||||
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
|
||||
mca_ptl_base_header_t* hdr )
|
||||
mca_ptl_base_header_t* hdr, uint32_t msg_len )
|
||||
{
|
||||
mca_ptl_gm_send_frag_t* frag;
|
||||
|
||||
@ -836,7 +875,7 @@ void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
|
||||
}
|
||||
|
||||
typedef mca_ptl_gm_recv_frag_t* (frag_management_fct_t)( struct mca_ptl_gm_module_t *ptl,
|
||||
mca_ptl_base_header_t *hdr );
|
||||
mca_ptl_base_header_t *hdr, uint32_t msg_len );
|
||||
frag_management_fct_t* frag_management_fct[MCA_PTL_HDR_TYPE_MAX] = {
|
||||
NULL,
|
||||
mca_ptl_gm_recv_frag_match,
|
||||
@ -853,7 +892,7 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
|
||||
mca_ptl_gm_recv_frag_t * frag;
|
||||
mca_ptl_base_header_t *header = NULL, *release_buf;
|
||||
frag_management_fct_t* function;
|
||||
uint32_t priority = GM_HIGH_PRIORITY;
|
||||
uint32_t priority = GM_HIGH_PRIORITY, msg_len;
|
||||
|
||||
release_buf = (mca_ptl_base_header_t*)gm_ntohp(event->recv.buffer);
|
||||
|
||||
@ -881,18 +920,13 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
|
||||
return 0;
|
||||
|
||||
have_event:
|
||||
if( header->hdr_common.hdr_type >= MCA_PTL_HDR_TYPE_MAX ) {
|
||||
ompi_output( 0, "[%s:%d] unexpected frag type %d\n",
|
||||
__FILE__, __LINE__, header->hdr_common.hdr_type );
|
||||
} else {
|
||||
assert( header->hdr_common.hdr_type < MCA_PTL_HDR_TYPE_MAX );
|
||||
function = frag_management_fct[header->hdr_common.hdr_type];
|
||||
if( NULL == function ) {
|
||||
ompi_output( 0, "[%s:%d] NOT yet implemented function for the header type %d\n",
|
||||
__FILE__, __LINE__, header->hdr_common.hdr_type );
|
||||
} else {
|
||||
frag = function( ptl, header );
|
||||
}
|
||||
}
|
||||
assert( NULL != function );
|
||||
|
||||
msg_len = gm_ntohl( event->recv.length );
|
||||
frag = function( ptl, header, msg_len );
|
||||
|
||||
gm_provide_receive_buffer( ptl->gm_port, release_buf, GM_SIZE, priority );
|
||||
|
||||
return 0;
|
||||
|
@ -48,7 +48,7 @@ extern "C" {
|
||||
OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
|
||||
|
||||
/* specific header for GM rendezvous protocol. It will be filled up by the sender
|
||||
* and should be ab;e to hold a pointer to the last registered memory location.
|
||||
* and should be able to hold a pointer to the last registered memory location.
|
||||
*/
|
||||
struct mca_ptl_gm_frag_header_t {
|
||||
mca_ptl_base_frag_header_t hdr_frag;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user