Big change: a new pipelined protocol (I still have to investigate the correct fragment size).
Some minor cleanups (remove useless variables and functions). Add some comments. Add a dump-header function (it should be moved from the GM driver to some PTL-generic file). This commit was SVN r3982.
This commit is contained in:
parent 2284867095
commit d83f31e3bc
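For orientation, the new pipelined protocol in the diff below splits a long message into segments of at most mca_ptl_gm_component.gm_rdma_frag_size bytes and rotates them through GM_PIPELINE_DEPTH (3) pipeline lines, so that while one segment is on the wire (gm_get/gm_send), the next one is being registered (pinned) and the previous one is being deregistered. The standalone C sketch below only illustrates that rotation; FRAG_SIZE, msg_size and the printing loop are invented stand-ins, not the driver code.

/* Standalone sketch of the register / transfer / deregister rotation behind the
 * new pipelined protocol (illustrative only, not the driver code: FRAG_SIZE
 * stands in for mca_ptl_gm_component.gm_rdma_frag_size and msg_size is an
 * arbitrary example; the real state lives in mca_ptl_gm_pipeline_info_t and is
 * advanced from the GM send/get completion callbacks). */
#include <stdio.h>
#include <stdint.h>

#define DEPTH     3                /* mirrors GM_PIPELINE_DEPTH */
#define FRAG_SIZE (512 * 1024ULL)  /* placeholder fragment size */

int main(void)
{
    uint64_t msg_size = 3 * FRAG_SIZE + 100 * 1024, length[16], offset, i, n;

    /* As in the diff, the first pipeline line gets the remainder so that every
     * following segment is exactly FRAG_SIZE bytes long. */
    length[0] = msg_size % FRAG_SIZE;
    if( 0 == length[0] ) length[0] = FRAG_SIZE;
    n = 1;
    for( offset = length[0]; offset < msg_size; offset += FRAG_SIZE )
        length[n++] = FRAG_SIZE;

    /* One iteration plays the role of one completion callback: the line index
     * rotates modulo DEPTH, so while segment i is on the wire, segment i+1 is
     * being registered and segment i-1 is being deregistered. */
    for( i = 0; i < n + 2; i++ ) {
        if( i < n )
            printf( "line %llu: register   segment %llu (%llu bytes)\n",
                    (unsigned long long)(i % DEPTH), (unsigned long long)i,
                    (unsigned long long)length[i] );
        if( (i >= 1) && ((i - 1) < n) )
            printf( "line %llu: transfer   segment %llu\n",
                    (unsigned long long)((i - 1) % DEPTH), (unsigned long long)(i - 1) );
        if( (i >= 2) && ((i - 2) < n) )
            printf( "line %llu: deregister segment %llu\n",
                    (unsigned long long)((i - 2) % DEPTH), (unsigned long long)(i - 2) );
    }
    return 0;
}

Keeping only GM_PIPELINE_DEPTH segments pinned at a time bounds the amount of registered memory while hiding the registration cost behind the previous transfer; that trade-off is why the fragment-size question raised in the commit message matters.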
@@ -219,6 +219,9 @@ extern "C" {
extern char* gm_get_local_buffer( void );
extern void gm_release_local_buffer( char* ptr );

union mca_ptl_base_header_t;
void mca_ptl_gm_dump_header( char* str, union mca_ptl_base_header_t* hdr );

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
@@ -374,7 +374,14 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
return OMPI_ERR_OUT_OF_RESOURCE;
}

for( i = 0; i < ptl->num_recv_tokens; i++ ) {
for( i = 0; i < 2; i++ ) {
OMPI_FREE_LIST_RETURN( &(ptl->gm_recv_frags_free), (ompi_list_item_t *)free_rfragment );
free_rfragment++;

gm_provide_receive_buffer( ptl->gm_port, (char*)ptl->gm_recv_dma_memory + i * mca_ptl_gm_component.gm_segment_size,
GM_SIZE, GM_HIGH_PRIORITY );
}
for( i = 2; i < ptl->num_recv_tokens; i++ ) {
OMPI_FREE_LIST_RETURN( &(ptl->gm_recv_frags_free), (ompi_list_item_t *)free_rfragment );
free_rfragment++;
@@ -54,6 +54,7 @@ struct mca_ptl_gm_peer_t {
int max_credits;
int resending;
int num_resend;
bool get_started;
};
typedef struct mca_ptl_gm_peer_t mca_ptl_gm_peer_t;
@@ -55,13 +55,13 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
}
break;
case GM_SEND_TIMED_OUT:
printf( "send_continue timed out\n" );
ompi_output( 0, "send_continue timed out\n" );
break;
case GM_SEND_DROPPED:
printf( "send_continue dropped\n" );
ompi_output( 0, "send_continue dropped\n" );
break;
default:
printf( "send_continue other error %d\n", status );
ompi_output( 0, "send_continue other error %d\n", status );
}
}
@@ -81,64 +81,172 @@ static void mca_ptl_gm_basic_frag_callback( struct gm_port* port, void* context,
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
}

static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_status_t status );

#define DO_DEBUG( INST )

static inline
int mca_ptl_gm_send_next_long_segment( mca_ptl_gm_send_frag_t* frag,
int flags )
int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag )
{
struct mca_ptl_gm_peer_t* ptl_peer;
mca_ptl_gm_peer_t* peer;
gm_status_t status;
char* pointer;
uint64_t length;
int32_t hdr_flags = 0;
mca_ptl_gm_frag_header_t* hdr;
mca_ptl_gm_pipeline_line_t *get_line, *reg_line, *dereg_line;
DO_DEBUG( int count;
char buffer[128]; )

ptl_peer = (struct mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer;

length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_processed;
if( length <= mca_ptl_gm_component.gm_rdma_frag_size ) {
hdr_flags = PTL_FLAG_GM_LAST_FRAGMENT;
} else {
length = mca_ptl_gm_component.gm_rdma_frag_size;
peer = (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer;
DO_DEBUG( count = sprintf( buffer, "receiver %p", (void*)frag ); )
/* start the current get */
get_line = &(frag->pipeline.lines[frag->pipeline.pos_transfert]);
if( (PTL_GM_PIPELINE_TRANSFERT & get_line->flags) == PTL_GM_PIPELINE_TRANSFERT ) {
peer->get_started = true;
gm_get( peer->peer_ptl->gm_port, get_line->remote_memory.lval,
get_line->local_memory.pval, get_line->length,
GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, frag );
get_line->flags ^= PTL_GM_PIPELINE_REMOTE;
frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
DO_DEBUG( count += sprintf( buffer + count, " start get %d", get_line->length ); )
/* register the next segment */
reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
reg_line->length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_processed;
if( 0 != reg_line->length ) {
reg_line->hdr_flags = get_line->hdr_flags;
if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size )
reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
reg_line->offset = get_line->offset + get_line->length;
reg_line->local_memory.lval = 0L;
reg_line->local_memory.pval = (char*)frag->frag_recv.frag_base.frag_addr +
reg_line->offset;
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
reg_line->length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
reg_line->local_memory.pval,
reg_line->length, reg_line->offset );
return OMPI_ERROR;
}
DO_DEBUG( count += sprintf( buffer + count, " start register %d", reg_line->length ); )
reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
frag->frag_bytes_processed += reg_line->length;
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
}
}
/* deregister the previous one */
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
status = gm_deregister_memory( peer->peer_ptl->gm_port,
dereg_line->local_memory.pval,
dereg_line->length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
dereg_line->local_memory.pval,
dereg_line->length );
}
dereg_line->flags ^= (PTL_GM_PIPELINE_DEREGISTER|PTL_GM_PIPELINE_REGISTER);
assert( dereg_line->flags == 0 );
frag->frag_bytes_validated += dereg_line->length;
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
DO_DEBUG( count += sprintf( buffer + count, " start deregister %d", dereg_line->length ); )
}
pointer = (char*)frag->frag_send.frag_base.frag_addr + frag->frag_offset + frag->frag_bytes_processed;

if( flags & GM_PTL_SEND_MESSAGE ) {
if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_validated ) {
peer->peer_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)peer->peer_ptl,
frag->frag_recv.frag_request, frag->frag_recv.frag_base.frag_size,
frag->frag_recv.frag_base.frag_size );
OMPI_FREE_LIST_RETURN( &(peer->peer_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
DO_DEBUG( count += sprintf( buffer + count, "finish" ); )
}
DO_DEBUG( ompi_output( 0, "%s", buffer ); )
return OMPI_SUCCESS;
}

static inline
int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
{
mca_ptl_gm_peer_t* peer;
gm_status_t status;
mca_ptl_gm_pipeline_line_t *send_line, *reg_line, *dereg_line;
mca_ptl_gm_frag_header_t* hdr;
DO_DEBUG( int count;
char buffer[128]; )

peer = (mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer;
DO_DEBUG( count = sprintf( buffer, "sender %p", (void*)frag ); )
/* send current segment */
send_line = &(frag->pipeline.lines[frag->pipeline.pos_transfert]);
if( (send_line->flags & PTL_GM_PIPELINE_TRANSFERT) == PTL_GM_PIPELINE_TRANSFERT ) {
ompi_list_item_t* item;
int32_t rc;

OMPI_FREE_LIST_GET( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
OMPI_FREE_LIST_WAIT( &(peer->peer_ptl->gm_send_dma_frags), item, rc );
ompi_atomic_sub( &(peer->peer_ptl->num_send_tokens), 1 );
hdr = (mca_ptl_gm_frag_header_t*)item;


hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_frag.hdr_common.hdr_flags = frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags | hdr_flags;
hdr->hdr_frag.hdr_common.hdr_flags = send_line->hdr_flags | frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = frag;
hdr->hdr_frag.hdr_dst_ptr = frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match;
hdr->hdr_frag.hdr_frag_offset = frag->frag_bytes_processed;
hdr->hdr_frag.hdr_frag_length = length;
hdr->registered_memory.lval = 0L;
hdr->registered_memory.pval = pointer;

gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr,
hdr->hdr_frag.hdr_frag_offset = send_line->offset;
hdr->hdr_frag.hdr_frag_length = send_line->length;
hdr->registered_memory = send_line->local_memory;

gm_send_to_peer_with_callback( peer->peer_ptl->gm_port, hdr,
GM_SIZE, sizeof(mca_ptl_gm_frag_header_t),
GM_LOW_PRIORITY, ptl_peer->local_id,
GM_HIGH_PRIORITY, peer->local_id,
send_continue_callback, (void*)hdr );
frag->frag_bytes_processed += length;
pointer += length;
length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_processed;
if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size;

send_line->flags ^= PTL_GM_PIPELINE_REMOTE;
frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
DO_DEBUG( count += sprintf( buffer + count, " send new fragment %d", send_line->length ); )
}

/* deregister previous segment */
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
status = gm_deregister_memory( peer->peer_ptl->gm_port,
dereg_line->local_memory.pval, dereg_line->length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
dereg_line->local_memory.pval, dereg_line->length );
}
dereg_line->flags ^= (PTL_GM_PIPELINE_REGISTER | PTL_GM_PIPELINE_DEREGISTER);
assert( dereg_line->flags == 0 );
frag->frag_bytes_validated += dereg_line->length;
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
DO_DEBUG( count += sprintf( buffer + count, " start deregister %d", dereg_line->length ); )
}

if( (flags & GM_PTL_REGISTER_MEMORY) && (0 != length) ) {
status = gm_register_memory( ptl_peer->peer_ptl->gm_port, pointer, length );
if( status != GM_SUCCESS ) {
ompi_output( 0, "Cannot register sender memory (%p, %lld) bytes offset %d\n",
pointer, length, frag->frag_bytes_processed );
return OMPI_ERROR;
/* register next segment */
reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
if( !(reg_line->flags & PTL_GM_PIPELINE_REGISTER) ) {
reg_line->length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_processed;
if( 0 != reg_line->length ) {
reg_line->hdr_flags = frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
} else {
reg_line->hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
}
reg_line->offset = send_line->offset + send_line->length;
reg_line->local_memory.lval = 0L;
reg_line->local_memory.pval = (char*)frag->frag_send.frag_base.frag_addr +
reg_line->offset;
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
reg_line->length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
reg_line->local_memory.pval, reg_line->length, reg_line->offset );
return OMPI_ERROR;
}
reg_line->flags |= PTL_GM_PIPELINE_TRANSFERT;
frag->frag_bytes_processed += reg_line->length;
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
DO_DEBUG( count += sprintf( buffer + count, " start register %d", reg_line->length ); )
}
}

DO_DEBUG( ompi_output( 0, "%s", buffer ); )
return OMPI_SUCCESS;
}
@@ -153,6 +261,8 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
uint64_t remaining_bytes;
ompi_list_item_t *item;
int rc = 0;
gm_status_t status;
mca_ptl_gm_pipeline_line_t* pipeline;

fragment->frag_offset = offset;

@@ -161,7 +271,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
*/
mca_pml_base_send_request_offset( sendreq,
fragment->frag_send.frag_base.frag_size );

DO_DEBUG( ompi_output( 0, "Start new send length %ld\n", *size ); )
/* The first DMA memory buffer has been alocated in same time as the fragment */
item = (ompi_list_item_t*)fragment->send_buf;
hdr = (mca_ptl_gm_frag_header_t*)item;
@@ -218,6 +328,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
}
return OMPI_SUCCESS;
}
pipeline = &(fragment->pipeline.lines[0]);
/* Large set of data => we have to setup a rendez-vous protocol. Here we can
* use the match header already filled in by the upper level and just complete it
* with the others informations. When we reach this point the rendez-vous protocol
@@ -237,11 +348,29 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
GM_SIZE, sizeof(mca_ptl_base_frag_header_t) + sizeof(ompi_ptr_t),
GM_LOW_PRIORITY, ptl_peer->local_id,
mca_ptl_gm_basic_frag_callback, (void *)hdr );


pipeline->length = fragment->frag_send.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
if( 0 == pipeline->length )
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
pipeline->offset = fragment->frag_offset;
pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
pipeline->local_memory.lval = 0L;
pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr +
pipeline->offset;
status = gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
pipeline->length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
pipeline->local_memory.pval, pipeline->length, pipeline->offset );
}
pipeline->flags = PTL_GM_PIPELINE_TRANSFERT;
fragment->frag_bytes_processed += pipeline->length;
fragment->pipeline.pos_register = (fragment->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
return OMPI_SUCCESS;
/* Now we are waiting for the ack message. Meanwhile we can register the sender first piece
* of data. In this way we have a recovery between the expensive registration on both sides.
*/
return mca_ptl_gm_send_next_long_segment( fragment, GM_PTL_REGISTER_MEMORY );
/*return mca_ptl_gm_send_next_long_segment( fragment, GM_PTL_REGISTER_MEMORY );*/
}

static void send_match_callback( struct gm_port* port, void* context, gm_status_t status )
@@ -471,7 +600,7 @@ static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer
hdr->hdr_ack.hdr_dst_size = frag->frag_header.hdr_frag.hdr_frag_length;

gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, sizeof(mca_ptl_base_ack_header_t),
GM_LOW_PRIORITY, ptl_peer->local_id,
GM_HIGH_PRIORITY, ptl_peer->local_id,
recv_short_callback, (void*)hdr );
return OMPI_SUCCESS;
}
@@ -483,49 +612,27 @@ static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer
*/
static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_status_t status )
{
mca_ptl_gm_module_t* gm_ptl;
mca_ptl_gm_recv_frag_t* frag;
mca_pml_base_recv_request_t *request;
mca_ptl_gm_peer_t* peer;
uint64_t length;
void* pointer;
mca_ptl_gm_recv_frag_t* frag = (mca_ptl_gm_recv_frag_t*)context;
mca_ptl_gm_peer_t* peer = (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer;

frag = (mca_ptl_gm_recv_frag_t*)context;
gm_ptl = (mca_ptl_gm_module_t *)frag->frag_recv.frag_base.frag_owner;
request = frag->frag_recv.frag_request;
peer = (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer;

switch( status ) {
case GM_SUCCESS:
pointer = (char*)request->req_base.req_addr + frag->frag_offset + frag->frag_bytes_validated;
length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_validated;
if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size;

/* send an ack message to the sender */
mca_ptl_gm_send_quick_fin_message( peer, &(frag->frag_recv.frag_base) );

frag->frag_bytes_validated += length;

if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_validated ) {
gm_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)gm_ptl, request,
frag->frag_recv.frag_base.frag_size,
frag->frag_recv.frag_base.frag_size );
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
}
status = gm_deregister_memory( peer->peer_ptl->gm_port, pointer, length );
if( GM_SUCCESS != status ) {
OMPI_OUTPUT( (0, "unpinning receiver memory from get (%p, %u) failed \n", pointer, length) );
}
peer->get_started = false;
/* mark the memory as being ready to be deregistered */
frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
DO_DEBUG( ompi_output( 0, "from get_callback" ); )
mca_ptl_gm_receiver_advance_pipeline( frag );
break;
case GM_SEND_TIMED_OUT:
printf( "mca_ptl_gm_get_callback timed out\n" );
ompi_output( 0, "mca_ptl_gm_get_callback timed out\n" );
break;
case GM_SEND_DROPPED:
printf( "mca_ptl_gm_get_callback dropped\n" );
ompi_output( 0, "mca_ptl_gm_get_callback dropped\n" );
break;
default:
printf( "mca_ptl_gm_get_callback other error %d\n", status );
ompi_output( 0, "mca_ptl_gm_get_callback other error %d\n", status );
}
}
@@ -538,15 +645,15 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
struct iovec iov;
uint32_t iov_count, max_data;
int32_t freeAfter, rc;
mca_ptl_gm_recv_frag_t* recv_frag;
mca_ptl_gm_recv_frag_t* frag;

if( hdr->hdr_frag.hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request;
frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
request = (mca_pml_base_recv_request_t*)frag->frag_recv.frag_request;
/* here we can have a synchronisation problem if several threads work in same time
* with the same request. The only question is if it's possible ?
*/
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
convertor = &(frag->frag_recv.frag_base.frag_convertor);
} else {
request = (mca_pml_base_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
@@ -556,18 +663,19 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
convertor = &local_convertor;
convertor->stack_size = 0; /* dont let the convertor free the stack */
ompi_convertor_copy( proc->proc_convertor, convertor );
recv_frag = NULL;
frag = NULL;
} else { /* large message => we have to create a receive fragment */
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
recv_frag->frag_recv.frag_request = request;
recv_frag->frag_offset = hdr->hdr_frag.hdr_frag_offset;
recv_frag->matched = true;
recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length;
recv_frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
frag->frag_recv.frag_request = request;
frag->frag_offset = hdr->hdr_frag.hdr_frag_offset;
frag->matched = true;
frag->frag_recv.frag_base.frag_addr = frag->frag_recv.frag_request->req_base.req_addr;
frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length;
frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
request->req_base.req_ompi.req_status.MPI_SOURCE,
(struct mca_ptl_base_module_t*)ptl );
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
convertor = &(frag->frag_recv.frag_base.frag_convertor);
}
ompi_convertor_init_for_recv( convertor, 0,
request->req_base.req_datatype,
@@ -576,7 +684,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
hdr->hdr_frag.hdr_frag_offset, NULL );
}

if( NULL == recv_frag ) {
if( NULL == frag ) {
iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
iov.iov_len = hdr->hdr_frag.hdr_frag_length;
iov_count = 1;
@@ -587,19 +695,33 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
} else {
gm_status_t status;
mca_ptl_gm_peer_t* peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
char* pointer = (char*)request->req_base.req_addr + recv_frag->frag_offset;
uint64_t length = mca_ptl_gm_component.gm_rdma_frag_size;
mca_ptl_gm_pipeline_line_t* pipeline;

recv_frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */
/* send an ack message to the sender ... quick hack (TODO) */
recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer,
&(recv_frag->frag_recv.frag_base) );
recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
if( length >= hdr->hdr_frag.hdr_frag_length )
length = hdr->hdr_frag.hdr_frag_length;
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
&(frag->frag_recv.frag_base) );
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
pipeline = &(frag->pipeline.lines[0]);
pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
if( 0 == pipeline->length )
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
pipeline->local_memory.lval = 0L;
pipeline->local_memory.pval = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
status = gm_register_memory( ptl->gm_port,
pipeline->local_memory.pval, pipeline->length );
if( status != GM_SUCCESS ) {
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
(char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
pipeline->length, hdr->hdr_frag.hdr_frag_offset );
return NULL;
}
pipeline->offset = hdr->hdr_frag.hdr_frag_offset;
pipeline->flags |= PTL_GM_PIPELINE_REGISTER;
frag->frag_bytes_processed += pipeline->length;
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
} else {
/* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
* to transfert is large enough, then the sender will start sending a frag message with the
@@ -607,23 +729,14 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
* It will allow the receiver to start to register it's own memory. Later when the receiver
* get a fragment with the remote_memory field not NULL it can start getting the data.
*/
pointer += hdr->hdr_frag.hdr_frag_offset;
gm_get( ptl->gm_port, hdr->registered_memory.lval, pointer, hdr->hdr_frag.hdr_frag_length,
GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, recv_frag );
pointer += hdr->hdr_frag.hdr_frag_length;
length = recv_frag->frag_recv.frag_base.frag_size - recv_frag->frag_bytes_processed;
if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size;
}

if( 0 != length ) {
status = gm_register_memory( ptl->gm_port, pointer, length );
if( status != GM_SUCCESS ) {
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
pointer, length, hdr->hdr_frag.hdr_frag_offset );
return NULL;
}
recv_frag->frag_bytes_processed += length;
pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
assert( pipeline->length == (uint32_t)hdr->hdr_frag.hdr_frag_length );
assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
pipeline->remote_memory = hdr->registered_memory;
pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
/*if( false == ((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->get_started )*/
mca_ptl_gm_receiver_advance_pipeline( frag );
}
}
@@ -635,45 +748,28 @@ mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
mca_ptl_base_header_t* hdr )
{
mca_ptl_gm_send_frag_t* frag;
gm_status_t status;
char* memory_to_deregister;
uint64_t length;

frag = (mca_ptl_gm_send_frag_t*)hdr->hdr_ack.hdr_src_ptr.pval;

frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags = hdr->hdr_common.hdr_flags;
frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match = hdr->hdr_ack.hdr_dst_match;

memory_to_deregister = (char*)frag->frag_send.frag_base.frag_addr
+ frag->frag_offset + frag->frag_bytes_validated;
length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_validated;

if( 0 == frag->frag_bytes_processed ) {
if( 0 == hdr->hdr_ack.hdr_dst_size ) {
/* I just receive the ack for the first fragment => setup the pipeline */
mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY );
mca_ptl_gm_sender_advance_pipeline( frag );
} else {
/* mark the memory as ready to be deregistered */
frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
}
/* continue the pipeline ... send the next segment */
if( frag->frag_bytes_processed != frag->frag_send.frag_base.frag_size ) {
/* If there is still something pending ... */
mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY );
}
mca_ptl_gm_sender_advance_pipeline( frag );

if( 0 != hdr->hdr_ack.hdr_dst_size ) {
if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size;
frag->frag_bytes_validated += length;
if( frag->frag_send.frag_base.frag_size == frag->frag_bytes_validated ) {
/* mark the request as done before deregistering the memory */
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
frag->frag_send.frag_request,
frag->frag_bytes_validated );
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag );
}
status = gm_deregister_memory( ptl->gm_port, memory_to_deregister, length );
if( GM_SUCCESS != status ) {
ompi_output( 0, "Unable to deregister sender GM memory at %p length %d bytes\n",
memory_to_deregister, length );
}
if( frag->frag_send.frag_base.frag_size == frag->frag_bytes_validated ) {
/* mark the request as done before deregistering the memory */
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
frag->frag_send.frag_request,
frag->frag_bytes_validated );
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag );
}

return NULL;
@@ -725,18 +821,21 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
mca_ptl_gm_recv_frag_t * frag;
mca_ptl_base_header_t *header = NULL, *release_buf;
frag_management_fct_t* function;
uint32_t priority = GM_HIGH_PRIORITY;

release_buf = (mca_ptl_base_header_t*)gm_ntohp(event->recv.buffer);

switch (gm_ntohc(event->recv.type)) {
case GM_FAST_RECV_EVENT:
case GM_FAST_PEER_RECV_EVENT:
priority = GM_LOW_PRIORITY;
case GM_FAST_HIGH_RECV_EVENT:
case GM_FAST_HIGH_PEER_RECV_EVENT:
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
goto have_event;
case GM_RECV_EVENT:
case GM_PEER_RECV_EVENT:
priority = GM_LOW_PRIORITY;
case GM_HIGH_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
header = release_buf;
@@ -762,7 +861,60 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
frag = function( ptl, header );
}
}
gm_provide_receive_buffer( ptl->gm_port, release_buf, GM_SIZE, GM_LOW_PRIORITY );
gm_provide_receive_buffer( ptl->gm_port, release_buf, GM_SIZE, priority );

return 0;
}

void mca_ptl_gm_dump_header( char* str, mca_ptl_base_header_t* hdr )
{
switch( hdr->hdr_common.hdr_type ) {
case MCA_PTL_HDR_TYPE_MATCH:
goto print_match_hdr;
case MCA_PTL_HDR_TYPE_RNDV:
goto print_rndv_hdr;
case MCA_PTL_HDR_TYPE_FRAG:
goto print_frag_hdr;
case MCA_PTL_HDR_TYPE_ACK:
goto print_ack_hdr;
case MCA_PTL_HDR_TYPE_NACK:
goto print_ack_hdr;
case MCA_PTL_HDR_TYPE_GET:
goto print_match_hdr;
case MCA_PTL_HDR_TYPE_FIN:
goto print_ack_hdr;
case MCA_PTL_HDR_TYPE_FIN_ACK:
goto print_match_hdr;
default:
ompi_output( 0, "unknown header of type %d\n", hdr->hdr_common.hdr_type );
}
return;

print_ack_hdr:
ompi_output( 0, "%s hdr_common hdr_type %d hdr_flags %x\nack header hdr_src_ptr (lval %lld, pval %p)\n hdr_dst_match (lval %lld pval %p)\n hdr_dst_addr (lval %lld pval %p)\n hdr_dst_size %lld\n",
str, hdr->hdr_common.hdr_type, hdr->hdr_common.hdr_flags,
hdr->hdr_ack.hdr_src_ptr.lval, hdr->hdr_ack.hdr_src_ptr.pval,
hdr->hdr_ack.hdr_dst_match.lval, hdr->hdr_ack.hdr_dst_match.pval,
hdr->hdr_ack.hdr_dst_addr.lval, hdr->hdr_ack.hdr_dst_addr.pval, hdr->hdr_ack.hdr_dst_size );
return;
print_frag_hdr:
ompi_output( 0, "%s hdr_common hdr_type %d hdr_flags %x\nfrag header hdr_frag_length %lld hdr_frag_offset %lld\n hdr_src_ptr (lval %lld, pval %p)\n hdr_dst_ptr (lval %lld, pval %p)\n",
str, hdr->hdr_common.hdr_type, hdr->hdr_common.hdr_flags,
hdr->hdr_frag.hdr_frag_length, hdr->hdr_frag.hdr_frag_offset, hdr->hdr_frag.hdr_src_ptr.lval,
hdr->hdr_frag.hdr_src_ptr.pval, hdr->hdr_frag.hdr_dst_ptr.lval, hdr->hdr_frag.hdr_dst_ptr.pval );
return;
print_match_hdr:
ompi_output( 0, "%s hdr_common hdr_type %d hdr_flags %x\nmatch header hdr_contextid %d hdr_src %d hdr_dst %d hdr_tag %d\n hdr_msg_length %lld hdr_msg_seq %d\n",
str, hdr->hdr_common.hdr_type, hdr->hdr_common.hdr_flags,
hdr->hdr_match.hdr_contextid, hdr->hdr_match.hdr_src, hdr->hdr_match.hdr_dst,
hdr->hdr_match.hdr_tag, hdr->hdr_match.hdr_msg_length, hdr->hdr_match.hdr_msg_seq );
return;
print_rndv_hdr:
ompi_output( 0, "%s hdr_common hdr_type %d hdr_flags %x\nrndv header hdr_contextid %d hdr_src %d hdr_dst %d hdr_tag %d\n hdr_msg_length %lld hdr_msg_seq %d\n hdr_frag_length %lld hdr_src_ptr (lval %lld, pval %p)\n",
str, hdr->hdr_common.hdr_type, hdr->hdr_common.hdr_flags,
hdr->hdr_rndv.hdr_match.hdr_contextid, hdr->hdr_rndv.hdr_match.hdr_src,
hdr->hdr_rndv.hdr_match.hdr_dst, hdr->hdr_rndv.hdr_match.hdr_tag,
hdr->hdr_rndv.hdr_match.hdr_msg_length, hdr->hdr_rndv.hdr_match.hdr_msg_seq,
hdr->hdr_rndv.hdr_frag_length, hdr->hdr_rndv.hdr_src_ptr.lval, hdr->hdr_rndv.hdr_src_ptr.pval);
return;
}
@@ -64,33 +64,28 @@ mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t* ptl,
struct mca_pml_base_send_request_t* sendreq )
{
ompi_list_item_t* item;
mca_ptl_gm_send_frag_t* sendfrag;
mca_ptl_gm_send_frag_t* frag;
int32_t rc;

/* first get a gm_send_frag */
OMPI_FREE_LIST_GET( &(ptl->gm_send_frags), item, rc );
sendfrag = (mca_ptl_gm_send_frag_t*)item;
frag = (mca_ptl_gm_send_frag_t*)item;
/* And then get some DMA memory to put the data */
OMPI_FREE_LIST_WAIT( &(ptl->gm_send_dma_frags), item, rc );
ompi_atomic_sub( &(ptl->num_send_tokens), 1 );
assert( ptl->num_send_tokens >= 0 );
sendfrag->send_buf = (void*)item;
frag->send_buf = (void*)item;

sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
sendfrag->frag_send.frag_base.frag_addr = sendreq->req_base.req_addr;
sendfrag->frag_bytes_processed = 0;
sendfrag->frag_bytes_validated = 0;
sendfrag->status = -1;
sendfrag->type = PUT;
frag->frag_send.frag_request = sendreq;
frag->frag_send.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
frag->frag_send.frag_base.frag_addr = sendreq->req_base.req_addr;
frag->frag_bytes_processed = 0;
frag->frag_bytes_validated = 0;
frag->status = -1;
frag->type = PUT;
ompi_ptl_gm_init_pipeline( &(frag->pipeline) );

return sendfrag;
}

int mca_ptl_gm_send_frag_done( mca_ptl_gm_send_frag_t* frag,
mca_pml_base_send_request_t* req )
{
return OMPI_SUCCESS;
return frag;
}

int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t** putfrag,
@@ -30,10 +30,14 @@
#define ACK 2
#define PUT 3

#define PTL_GM_PIPELINE_EMPTY 0x01
/* depth of the GM internal pipeline */
#define GM_PIPELINE_DEPTH 3

#define PTL_GM_PIPELINE_EMPTY 0x00
#define PTL_GM_PIPELINE_DEREGISTER 0x01
#define PTL_GM_PIPELINE_REGISTER 0x02
#define PTL_GM_PIPELINE_SEND 0x04
#define PTL_GM_PIPELINE_DEREGISTER 0x08
#define PTL_GM_PIPELINE_REMOTE 0x04
#define PTL_GM_PIPELINE_TRANSFERT (PTL_GM_PIPELINE_REGISTER | PTL_GM_PIPELINE_REMOTE)

#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
@@ -41,13 +45,6 @@ extern "C" {
OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);

/* header definition for intermediary fragments on eager p2p communications */
struct mca_ptl_gm_eager_header_t {
mca_ptl_base_common_header_t hdr_common; /**< common attributes */
ompi_ptr_t hdr_src_ptr; /**< pointer to source fragment */
};
typedef struct mca_ptl_gm_eager_header_t mca_ptl_gm_eager_header_t;

/* specific header for GM rendezvous protocol. It will be filled up by the sender
* and should be ab;e to hold a pointer to the last registered memory location.
*/
@@ -57,14 +54,23 @@ extern "C" {
};
typedef struct mca_ptl_gm_frag_header_t mca_ptl_gm_frag_header_t;

struct mca_ptl_gm_pipeline_info_t {
uint32_t flags;
uint32_t hdr_flags;
struct mca_ptl_gm_pipeline_line_t {
uint16_t flags;
uint16_t hdr_flags;
uint32_t length;
uint64_t offset;
uint64_t length;
ompi_ptr_t local_memory;
ompi_ptr_t remote_memory;
};
typedef struct mca_ptl_gm_pipeline_line_t mca_ptl_gm_pipeline_line_t;

struct mca_ptl_gm_pipeline_info_t {
mca_ptl_gm_pipeline_line_t lines[GM_PIPELINE_DEPTH];
uint32_t pos_register;
uint32_t pos_remote;
uint32_t pos_deregister;
uint32_t pos_transfert;
};
typedef struct mca_ptl_gm_pipeline_info_t mca_ptl_gm_pipeline_info_t;

struct mca_ptl_gm_peer_t;
@@ -80,8 +86,8 @@ extern "C" {
uint64_t frag_bytes_processed; /**< data sended so far */
uint64_t frag_bytes_validated; /**< amount of data for which we receive an ack */
uint64_t frag_offset; /**< initial offset of the fragment as specified by the upper level */
mca_ptl_gm_pipeline_info_t pipeline[3]; /**< storing the information about the status of the pipeline
* for long messages. */
mca_ptl_gm_pipeline_info_t pipeline; /**< storing the information about the status
* of the pipeline for long messages. */
int status;
uint32_t type;
};
@@ -92,8 +98,8 @@ extern "C" {
uint64_t frag_bytes_processed;
uint64_t frag_bytes_validated; /**< amount of data for which we receive an ack */
uint64_t frag_offset;
mca_ptl_gm_pipeline_info_t pipeline[3]; /**< storing the information about the status of the pipeline
* for long messages. */
mca_ptl_gm_pipeline_info_t pipeline; /**< storing the information about the status of
* the pipeline for long messages. */
uint32_t type;
bool matched;
bool have_allocated_buffer;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_ptl_gm_send_frag_done( struct mca_ptl_gm_send_frag_t* frag,
|
||||
struct mca_pml_base_send_request_t* req);
|
||||
static inline void ompi_ptl_gm_init_pipeline( mca_ptl_gm_pipeline_info_t* pipeline )
|
||||
{
|
||||
int i;
|
||||
|
||||
pipeline->pos_register = 0;
|
||||
pipeline->pos_remote = 0;
|
||||
pipeline->pos_deregister = 0;
|
||||
pipeline->pos_transfert = 0;
|
||||
for( i = 0; i < GM_PIPELINE_DEPTH; i++ )
|
||||
pipeline->lines[i].flags = 0;
|
||||
}
|
||||
|
||||
static inline mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_alloc_recv_frag( struct mca_ptl_base_module_t *ptl )
|
||||
@ -183,9 +198,10 @@ extern "C" {
|
||||
|
||||
frag = (mca_ptl_gm_recv_frag_t*)item;
|
||||
frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
|
||||
frag->frag_bytes_processed = 0;
|
||||
frag->frag_bytes_validated = 0;
|
||||
frag->frag_offset = 0;
|
||||
frag->frag_bytes_processed = 0;
|
||||
frag->frag_bytes_validated = 0;
|
||||
frag->frag_offset = 0;
|
||||
ompi_ptl_gm_init_pipeline( &(frag->pipeline) );
|
||||
return frag;
|
||||
}
|
||||
|
||||
|