Improve the pipeline management.
This commit was SVN r3983.
Этот коммит содержится в:
родитель
d83f31e3bc
Коммит
e8562569d3
@ -117,7 +117,7 @@ mca_ptl_gm_component_open(void)
|
||||
mca_ptl_gm_component.gm_eager_limit =
|
||||
mca_ptl_gm_param_register_int( "eager_limit", 32 * mca_ptl_gm_component.gm_segment_size );
|
||||
mca_ptl_gm_component.gm_rdma_frag_size =
|
||||
mca_ptl_gm_param_register_int ("rdma_frag_size", 1024 * 1024);
|
||||
mca_ptl_gm_param_register_int ("rdma_frag_size", 512 * 1024);
|
||||
|
||||
mca_ptl_gm_component.gm_free_list_num =
|
||||
mca_ptl_gm_param_register_int ("free_list_num", 256);
|
||||
|
@ -86,11 +86,12 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
#define DO_DEBUG( INST )
|
||||
|
||||
static inline
|
||||
int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag )
|
||||
int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int onlyifget )
|
||||
{
|
||||
mca_ptl_gm_peer_t* peer;
|
||||
gm_status_t status;
|
||||
mca_ptl_gm_pipeline_line_t *get_line, *reg_line, *dereg_line;
|
||||
size_t length;
|
||||
DO_DEBUG( int count;
|
||||
char buffer[128]; )
|
||||
|
||||
@ -104,49 +105,52 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag )
|
||||
get_line->local_memory.pval, get_line->length,
|
||||
GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, frag );
|
||||
get_line->flags ^= PTL_GM_PIPELINE_REMOTE;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start get %d (%d)", get_line->length, frag->pipeline.pos_transfert ); );
|
||||
frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start get %d", get_line->length ); )
|
||||
/* register the next segment */
|
||||
reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
|
||||
reg_line->length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_processed;
|
||||
if( 0 != reg_line->length ) {
|
||||
reg_line->hdr_flags = get_line->hdr_flags;
|
||||
if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size )
|
||||
reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
reg_line->offset = get_line->offset + get_line->length;
|
||||
reg_line->local_memory.lval = 0L;
|
||||
reg_line->local_memory.pval = (char*)frag->frag_recv.frag_base.frag_addr +
|
||||
reg_line->offset;
|
||||
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
reg_line->local_memory.pval,
|
||||
reg_line->length, reg_line->offset );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %d", reg_line->length ); )
|
||||
reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
} else if( 1 == onlyifget ) return OMPI_SUCCESS;
|
||||
|
||||
/* register the next segment */
|
||||
reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
|
||||
length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_processed;
|
||||
if( (0 != length) && !(reg_line->flags & PTL_GM_PIPELINE_REGISTER) ) {
|
||||
reg_line->length = length;
|
||||
reg_line->hdr_flags = get_line->hdr_flags;
|
||||
if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size )
|
||||
reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
reg_line->offset = get_line->offset + get_line->length;
|
||||
reg_line->local_memory.lval = 0L;
|
||||
reg_line->local_memory.pval = (char*)frag->frag_recv.frag_base.frag_addr +
|
||||
reg_line->offset;
|
||||
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
reg_line->local_memory.pval,
|
||||
reg_line->length, reg_line->offset );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %d (%d)", reg_line->length, frag->pipeline.pos_register ); );
|
||||
reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
}
|
||||
|
||||
/* deregister the previous one */
|
||||
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
|
||||
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
|
||||
status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
/*status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval,
|
||||
dereg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
|
||||
dereg_line->local_memory.pval,
|
||||
dereg_line->length );
|
||||
}
|
||||
}*/
|
||||
dereg_line->flags ^= (PTL_GM_PIPELINE_DEREGISTER|PTL_GM_PIPELINE_REGISTER);
|
||||
assert( dereg_line->flags == 0 );
|
||||
frag->frag_bytes_validated += dereg_line->length;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %d (%d)", dereg_line->length, frag->pipeline.pos_deregister ); )
|
||||
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %d", dereg_line->length ); )
|
||||
}
|
||||
|
||||
if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_validated ) {
|
||||
@ -154,7 +158,7 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag )
|
||||
frag->frag_recv.frag_request, frag->frag_recv.frag_base.frag_size,
|
||||
frag->frag_recv.frag_base.frag_size );
|
||||
OMPI_FREE_LIST_RETURN( &(peer->peer_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
|
||||
DO_DEBUG( count += sprintf( buffer + count, "finish" ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " finish" ); )
|
||||
}
|
||||
DO_DEBUG( ompi_output( 0, "%s", buffer ); )
|
||||
return OMPI_SUCCESS;
|
||||
@ -204,12 +208,12 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
/* deregister previous segment */
|
||||
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
|
||||
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
|
||||
status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
/*status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
}
|
||||
}*/
|
||||
dereg_line->flags ^= (PTL_GM_PIPELINE_REGISTER | PTL_GM_PIPELINE_DEREGISTER);
|
||||
assert( dereg_line->flags == 0 );
|
||||
frag->frag_bytes_validated += dereg_line->length;
|
||||
@ -271,7 +275,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
*/
|
||||
mca_pml_base_send_request_offset( sendreq,
|
||||
fragment->frag_send.frag_base.frag_size );
|
||||
DO_DEBUG( ompi_output( 0, "Start new send length %ld\n", *size ); )
|
||||
DO_DEBUG( ompi_output( 0, "sender start new send length %ld\n", *size ); )
|
||||
/* The first DMA memory buffer has been allocated at the same time as the fragment */
|
||||
item = (ompi_list_item_t*)fragment->send_buf;
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
@ -350,13 +354,16 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_basic_frag_callback, (void *)hdr );
|
||||
|
||||
pipeline->length = fragment->frag_send.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
else
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
pipeline->offset = fragment->frag_offset;
|
||||
pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr +
|
||||
pipeline->offset;
|
||||
pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr + pipeline->offset;
|
||||
status = gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
|
||||
pipeline->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
@ -365,6 +372,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
}
|
||||
pipeline->flags = PTL_GM_PIPELINE_TRANSFERT;
|
||||
fragment->frag_bytes_processed += pipeline->length;
|
||||
DO_DEBUG( ompi_output( 0, "sender %p start register %d (%d)", fragment, pipeline->length, fragment->pipeline.pos_register ); );
|
||||
fragment->pipeline.pos_register = (fragment->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
return OMPI_SUCCESS;
|
||||
/* Now we are waiting for the ack message. Meanwhile we can register the sender first piece
|
||||
@ -622,8 +630,8 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
peer->get_started = false;
|
||||
/* mark the memory as being ready to be deregistered */
|
||||
frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
|
||||
DO_DEBUG( ompi_output( 0, "from get_callback" ); )
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag );
|
||||
DO_DEBUG( ompi_output( 0, "receiver from get_callback" ); )
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag, 0 );
|
||||
break;
|
||||
case GM_SEND_TIMED_OUT:
|
||||
ompi_output( 0, "mca_ptl_gm_get_callback timed out\n" );
|
||||
@ -706,8 +714,12 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
|
||||
pipeline = &(frag->pipeline.lines[0]);
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
else
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
|
||||
status = gm_register_memory( ptl->gm_port,
|
||||
@ -721,6 +733,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
pipeline->offset = hdr->hdr_frag.hdr_frag_offset;
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += pipeline->length;
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p start register %d (%d)\n", frag, pipeline->length, frag->pipeline.pos_register ); );
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
} else {
|
||||
/* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
|
||||
@ -730,13 +743,13 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
* get a fragment with the remote_memory field not NULL it can start getting the data.
|
||||
*/
|
||||
pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p get remote memory length %lld (%d)\n", frag, hdr->hdr_frag.hdr_frag_length, frag->pipeline.pos_remote ); );
|
||||
frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
|
||||
assert( pipeline->length == (uint32_t)hdr->hdr_frag.hdr_frag_length );
|
||||
assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
|
||||
pipeline->remote_memory = hdr->registered_memory;
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
|
||||
/*if( false == ((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->get_started )*/
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag );
|
||||
/*if( false == ((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->get_started )*/
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag, 1 );
|
||||
}
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user