Add more communication protocol. Transform the gm_registerPmemory on something more GM version unaware.
This commit was SVN r5238.
Этот коммит содержится в:
родитель
3a1438b838
Коммит
fad48b5fcc
@ -42,27 +42,24 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
|
||||
|
||||
switch( status ) {
|
||||
case GM_SUCCESS:
|
||||
if( frag->frag_send.frag_base.frag_size <= mca_ptl_gm_component.gm_eager_limit ) {
|
||||
/* small message */
|
||||
frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length;
|
||||
}
|
||||
/*frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length;*/
|
||||
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
|
||||
/* release the send token */
|
||||
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
|
||||
/* release the send token */
|
||||
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
|
||||
|
||||
if( frag->frag_bytes_validated >= frag->frag_send.frag_base.frag_size ) {
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
|
||||
}
|
||||
break;
|
||||
if( frag->frag_bytes_validated >= frag->frag_send.frag_base.frag_size ) {
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
|
||||
}
|
||||
break;
|
||||
case GM_SEND_TIMED_OUT:
|
||||
ompi_output( 0, "send_continue timed out\n" );
|
||||
break;
|
||||
ompi_output( 0, "send_continue timed out\n" );
|
||||
break;
|
||||
case GM_SEND_DROPPED:
|
||||
ompi_output( 0, "send_continue dropped\n" );
|
||||
break;
|
||||
ompi_output( 0, "send_continue dropped\n" );
|
||||
break;
|
||||
default:
|
||||
ompi_output( 0, "send_continue other error %d\n", status );
|
||||
ompi_output( 0, "send_continue other error %d\n", status );
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,30 +104,30 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
get_line->flags ^= PTL_GM_PIPELINE_REMOTE;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); );
|
||||
frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
|
||||
} else if( 1 == onlyifget ) return OMPI_SUCCESS;
|
||||
} else if( 1 == onlyifget ) goto check_completion_status;
|
||||
|
||||
/* register the next segment */
|
||||
reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
|
||||
length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_processed;
|
||||
if( (0 != length) && !(reg_line->flags & PTL_GM_PIPELINE_REGISTER) ) {
|
||||
reg_line->hdr_flags = get_line->hdr_flags;
|
||||
reg_line->offset = get_line->offset + get_line->length;
|
||||
reg_line->length = length;
|
||||
if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size )
|
||||
reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
reg_line->offset = get_line->offset + get_line->length;
|
||||
reg_line->local_memory.lval = 0L;
|
||||
reg_line->local_memory.pval = (char*)frag->frag_recv.frag_base.frag_addr +
|
||||
reg_line->offset;
|
||||
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
status = mca_ptl_gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
reg_line->local_memory.pval,
|
||||
reg_line->length, reg_line->offset );
|
||||
reg_line->local_memory.pval, reg_line->length, reg_line->offset );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld(%d)",
|
||||
reg_line->length, reg_line->offset, frag->pipeline.pos_register ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld processed %ld(%d)",
|
||||
reg_line->length, reg_line->offset, frag->frag_bytes_processed,
|
||||
frag->pipeline.pos_register ); );
|
||||
reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
@ -139,9 +136,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
/* deregister the previous one */
|
||||
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
|
||||
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
|
||||
status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval,
|
||||
dereg_line->length );
|
||||
status = mca_ptl_gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
|
||||
dereg_line->local_memory.pval,
|
||||
@ -154,7 +150,7 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
|
||||
dereg_line->offset, frag->pipeline.pos_deregister ); )
|
||||
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
|
||||
}
|
||||
|
||||
check_completion_status:
|
||||
if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_validated ) {
|
||||
peer->peer_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)peer->peer_ptl,
|
||||
frag->frag_recv.frag_request, frag->frag_recv.frag_base.frag_size,
|
||||
@ -173,7 +169,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
gm_status_t status;
|
||||
mca_ptl_gm_pipeline_line_t *send_line, *reg_line, *dereg_line;
|
||||
mca_ptl_gm_frag_header_t* hdr;
|
||||
DO_DEBUG( int count = 0; char buffer[128]; )
|
||||
DO_DEBUG( int count = 0; char buffer[256]; )
|
||||
|
||||
peer = (mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer;
|
||||
DO_DEBUG( count = sprintf( buffer, "sender %p", (void*)frag ); )
|
||||
@ -188,7 +184,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
|
||||
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = send_line->hdr_flags | frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = send_line->hdr_flags |
|
||||
frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = frag;
|
||||
hdr->hdr_frag.hdr_dst_ptr = frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match;
|
||||
@ -196,7 +193,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
hdr->hdr_frag.hdr_frag_length = send_line->length;
|
||||
hdr->registered_memory = send_line->local_memory;
|
||||
|
||||
gm_send_with_callback( peer->peer_ptl->gm_port, hdr,
|
||||
gm_send_with_callback( peer->peer_ptl->gm_port, hdr,
|
||||
GM_SIZE, sizeof(mca_ptl_gm_frag_header_t),
|
||||
GM_HIGH_PRIORITY, peer->local_id, peer->port_number,
|
||||
send_continue_callback, (void*)hdr );
|
||||
@ -209,8 +206,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
/* deregister previous segment */
|
||||
dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
|
||||
if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
|
||||
status = gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
status = mca_ptl_gm_deregister_memory( peer->peer_ptl->gm_port,
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
|
||||
dereg_line->local_memory.pval, dereg_line->length );
|
||||
@ -219,8 +216,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
assert( dereg_line->flags == 0 );
|
||||
frag->frag_bytes_validated += dereg_line->length;
|
||||
frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld",
|
||||
dereg_line->length, dereg_line->offset ); )
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %lld (validated %lld)",
|
||||
dereg_line->length, dereg_line->offset, frag->frag_bytes_validated ); )
|
||||
}
|
||||
|
||||
/* register next segment */
|
||||
@ -238,8 +235,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
reg_line->local_memory.lval = 0L;
|
||||
reg_line->local_memory.pval = (char*)frag->frag_send.frag_base.frag_addr +
|
||||
reg_line->offset;
|
||||
status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
status = mca_ptl_gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
|
||||
reg_line->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "Cannot register sender memory (%p, %ld) bytes offset %ld\n",
|
||||
reg_line->local_memory.pval, reg_line->length, reg_line->offset );
|
||||
@ -248,7 +245,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
reg_line->flags |= PTL_GM_PIPELINE_TRANSFERT;
|
||||
frag->frag_bytes_processed += reg_line->length;
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld",
|
||||
DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %lld",
|
||||
reg_line->length, reg_line->offset ); )
|
||||
}
|
||||
}
|
||||
@ -257,6 +254,107 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline
|
||||
int mca_ptl_gm_send_internal_rndv_header( mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
mca_ptl_gm_frag_header_t* hdr,
|
||||
int flags )
|
||||
{
|
||||
struct iovec iov;
|
||||
uint32_t max_data, in_size;
|
||||
int32_t freeAfter;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov.iov_len = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
if( iov.iov_len > (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t)) )
|
||||
iov.iov_len = (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t));
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
|
||||
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = fragment;
|
||||
hdr->hdr_frag.hdr_dst_ptr = fragment->frag_send.frag_request->req_peer_match;
|
||||
hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
hdr->hdr_frag.hdr_frag_length = fragment->frag_send.frag_base.frag_size -
|
||||
fragment->frag_bytes_processed;
|
||||
hdr->registered_memory.lval = 0L;
|
||||
hdr->registered_memory.pval = NULL;
|
||||
|
||||
DO_DEBUG( ompi_output( 0, "sender before send internal rndv header hdr_offset %lld hdr_length %lld max_data %u",
|
||||
hdr->hdr_frag.hdr_frag_offset, hdr->hdr_frag.hdr_frag_length, max_data ); );
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
|
||||
sizeof(mca_ptl_gm_frag_header_t) + max_data,
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
|
||||
mca_ptl_gm_basic_frag_callback, (void *)hdr );
|
||||
fragment->frag_bytes_processed += max_data;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
DO_DEBUG( ompi_output( 0, "sender after send internal rndv header processed %lld, validated %lld max_data %u",
|
||||
fragment->frag_bytes_processed, fragment->frag_bytes_validated, max_data ); );
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline
|
||||
int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
uint32_t burst_length,
|
||||
mca_ptl_base_frag_header_t* hdr,
|
||||
int32_t flags )
|
||||
{
|
||||
int32_t freeAfter, rc;
|
||||
uint32_t max_data, in_size;
|
||||
struct iovec iov;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
|
||||
while( 0 < burst_length ) { /* send everything for the burst_length size */
|
||||
if( NULL == hdr ) {
|
||||
ompi_list_item_t* item;
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
hdr = (mca_ptl_base_frag_header_t*)item;
|
||||
}
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
|
||||
iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t);
|
||||
if( iov.iov_len >= burst_length )
|
||||
iov.iov_len = burst_length;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_src_ptr.pval = fragment;
|
||||
hdr->hdr_dst_ptr = fragment->frag_send.frag_request->req_peer_match;
|
||||
hdr->hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
hdr->hdr_frag_length = iov.iov_len;
|
||||
|
||||
fragment->frag_bytes_processed += max_data;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
burst_length -= iov.iov_len;
|
||||
if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
|
||||
assert( burst_length == 0 );
|
||||
hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
|
||||
}
|
||||
/* for the last piece set the header type to FIN */
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
|
||||
iov.iov_len + sizeof(mca_ptl_base_frag_header_t),
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
|
||||
send_continue_callback, (void*)hdr );
|
||||
hdr = NULL; /* force to retrieve a new one on the next loop */
|
||||
}
|
||||
DO_DEBUG( ompi_output( 0, "sender after burst offset %lld, processed %lld, validated %lld\n",
|
||||
fragment->frag_offset, fragment->frag_bytes_processed, fragment->frag_bytes_validated); );
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
@ -270,67 +368,37 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
int rc = 0;
|
||||
gm_status_t status;
|
||||
mca_ptl_gm_pipeline_line_t* pipeline;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
int32_t freeAfter;
|
||||
uint32_t max_data, in_size;
|
||||
struct iovec iov;
|
||||
|
||||
fragment->frag_offset = offset;
|
||||
|
||||
/* must update the offset after actual fragment size is determined
|
||||
* before attempting to send the fragment
|
||||
*/
|
||||
mca_pml_base_send_request_offset( sendreq,
|
||||
mca_pml_base_send_request_offset( fragment->frag_send.frag_request,
|
||||
fragment->frag_send.frag_base.frag_size );
|
||||
DO_DEBUG( ompi_output( 0, "sender start new send length %ld\n", *size ); )
|
||||
DO_DEBUG( ompi_output( 0, "sender start new send length %ld offset %ld\n", *size, offset ); )
|
||||
/* The first DMA memory buffer has been alocated in same time as the fragment */
|
||||
item = (ompi_list_item_t*)fragment->send_buf;
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
if( remaining_bytes > mca_ptl_gm_component.gm_eager_limit ) {
|
||||
burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
} else {
|
||||
burst_length = remaining_bytes;
|
||||
}
|
||||
|
||||
while( 0 < burst_length ) { /* send everything for the burst_length size */
|
||||
if( NULL == item ) {
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
}
|
||||
iov.iov_base = (char*)item + sizeof(mca_ptl_base_frag_header_t);
|
||||
iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t);
|
||||
if( iov.iov_len >= burst_length )
|
||||
iov.iov_len = burst_length;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
|
||||
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = fragment;
|
||||
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||
hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
hdr->hdr_frag.hdr_frag_length = iov.iov_len;
|
||||
|
||||
fragment->frag_bytes_processed += iov.iov_len;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
burst_length -= iov.iov_len;
|
||||
if( (burst_length == 0) && (remaining_bytes < mca_ptl_gm_component.gm_eager_limit) )
|
||||
hdr->hdr_frag.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
|
||||
|
||||
/* for the last piece set the header type to FIN */
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
|
||||
iov.iov_len + sizeof(mca_ptl_base_frag_header_t),
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
|
||||
send_continue_callback, (void*)hdr );
|
||||
item = NULL; /* force to retrieve a new one on the next loop */
|
||||
}
|
||||
if( remaining_bytes < mca_ptl_gm_component.gm_eager_limit ) {
|
||||
burst_length = remaining_bytes;
|
||||
} else if( remaining_bytes < mca_ptl_gm_component.gm_rndv_burst_limit ) {
|
||||
burst_length = remaining_bytes % (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));
|
||||
} else {
|
||||
if( mca_ptl_gm_component.gm_rdma_frag_size == UINT_MAX )
|
||||
burst_length = 0;
|
||||
else
|
||||
burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
|
||||
if( burst_length > 0 ) {
|
||||
mca_ptl_gm_send_burst_data( ptl_peer, fragment, burst_length, &(hdr->hdr_frag), flags );
|
||||
item = NULL; /* this buffer was already used by the mca_ptl_gm_send_burst_data function */
|
||||
DO_DEBUG( ompi_output( 0, "sender burst %ld bytes", burst_length ); );
|
||||
}
|
||||
|
||||
if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
|
||||
*size = fragment->frag_bytes_processed;
|
||||
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
@ -344,66 +412,31 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
hdr = (mca_ptl_gm_frag_header_t*)item;
|
||||
}
|
||||
/* recompute the remaining length */
|
||||
remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
|
||||
pipeline = &(fragment->pipeline.lines[0]);
|
||||
/* Large set of data => we have to setup a rendez-vous protocol. Here we can
|
||||
* use the match header already filled in by the upper level and just complete it
|
||||
* with the others informations. When we reach this point the rendez-vous protocol
|
||||
* has already been realized so we know that the receiver expect our message.
|
||||
*/
|
||||
#if 0
|
||||
iov.iov_base = (char*)item + sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t);
|
||||
if( iov.iov_len >= remaining_bytes )
|
||||
iov.iov_len = remaining_bytes;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
if( remaining_bytes > mca_ptl_gm_component.gm_rndv_burst_limit )
|
||||
flags |= PTL_FLAG_GM_REQUIRE_LOCK;
|
||||
mca_ptl_gm_send_internal_rndv_header( ptl_peer, fragment, hdr, flags );
|
||||
if( !(PTL_FLAG_GM_REQUIRE_LOCK & flags) )
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
fragment->frag_bytes_processed += max_data;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
#endif
|
||||
|
||||
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag.hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = fragment;
|
||||
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||
hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
hdr->hdr_frag.hdr_frag_length = *size - fragment->frag_bytes_processed;
|
||||
hdr->registered_memory.lval = 0L;
|
||||
hdr->registered_memory.pval = NULL;
|
||||
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
|
||||
sizeof(mca_ptl_gm_frag_header_t) /*+ max_data*/,
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
|
||||
mca_ptl_gm_basic_frag_callback, (void *)hdr );
|
||||
|
||||
#if 0
|
||||
pipeline->length = fragment->frag_send.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
else
|
||||
if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size )
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
#endif
|
||||
pipeline = &(fragment->pipeline.lines[0]);
|
||||
pipeline->length = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
|
||||
if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
pipeline->offset = fragment->frag_offset + fragment->frag_bytes_processed;
|
||||
pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr + pipeline->offset;
|
||||
status = gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
|
||||
pipeline->length );
|
||||
status = mca_ptl_gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
|
||||
pipeline->length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
ompi_output( 0, "Cannot register sender memory (%p, %ld) bytes offset %ld\n",
|
||||
pipeline->local_memory.pval, pipeline->length, pipeline->offset );
|
||||
}
|
||||
pipeline->flags = PTL_GM_PIPELINE_TRANSFERT;
|
||||
@ -594,9 +627,6 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
|
||||
/* get some memory and copy the data inside. We can then release the receive buffer */
|
||||
if( 0 != length ) {
|
||||
char* ptr = (char*)gm_get_local_buffer();
|
||||
if (NULL == ptr) {
|
||||
ompi_output(0, "[%s:%d] error in allocating memory \n", __FILE__, __LINE__);
|
||||
}
|
||||
recv_frag->have_allocated_buffer = true;
|
||||
memcpy( ptr, recv_frag->frag_recv.frag_base.frag_addr, length );
|
||||
recv_frag->frag_recv.frag_base.frag_addr = ptr;
|
||||
@ -636,7 +666,7 @@ static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer
|
||||
hdr = (mca_ptl_base_header_t*)item;
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
|
||||
hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT;
|
||||
hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT | frag->frag_header.hdr_common.hdr_flags;
|
||||
hdr->hdr_ack.hdr_src_ptr.pval = frag->frag_header.hdr_frag.hdr_src_ptr.pval;
|
||||
hdr->hdr_ack.hdr_dst_match.lval = 0;
|
||||
hdr->hdr_ack.hdr_dst_match.pval = frag;
|
||||
@ -686,26 +716,33 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
|
||||
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len )
|
||||
mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len )
|
||||
{
|
||||
mca_pml_base_recv_request_t *request;
|
||||
ompi_convertor_t local_convertor, *convertor;
|
||||
struct iovec iov;
|
||||
uint32_t iov_count, max_data;
|
||||
uint32_t iov_count, max_data = 0, header_length;
|
||||
int32_t freeAfter, rc;
|
||||
mca_ptl_gm_recv_frag_t* frag;
|
||||
mca_ptl_gm_pipeline_line_t* pipeline;
|
||||
|
||||
header_length = sizeof(mca_ptl_base_frag_header_t);
|
||||
if( hdr->hdr_frag.hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
|
||||
frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
|
||||
request = (mca_pml_base_recv_request_t*)frag->frag_recv.frag_request;
|
||||
/* here we can have a synchronisation problem if several threads work in same time
|
||||
* with the same request. The only question is if it's possible ?
|
||||
*/
|
||||
convertor = &(frag->frag_recv.frag_base.frag_convertor);
|
||||
DO_DEBUG( ompi_output( 0, "receiver get message tagged as HAS_FRAGMENT" ); );
|
||||
if( PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_frag.hdr_common.hdr_flags )
|
||||
header_length = sizeof(mca_ptl_gm_frag_header_t);
|
||||
} else {
|
||||
request = (mca_pml_base_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
|
||||
|
||||
if( hdr->hdr_frag.hdr_frag_length <= (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t)) ) {
|
||||
if( hdr->hdr_frag.hdr_frag_length <= (mca_ptl_gm_component.gm_segment_size -
|
||||
sizeof(mca_ptl_base_frag_header_t)) ) {
|
||||
ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE );
|
||||
convertor = &local_convertor;
|
||||
@ -720,10 +757,19 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
frag->frag_recv.frag_base.frag_addr = frag->frag_recv.frag_request->req_base.req_addr;
|
||||
frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length;
|
||||
frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
|
||||
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE,
|
||||
(struct mca_ptl_base_module_t*)ptl );
|
||||
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE,
|
||||
(struct mca_ptl_base_module_t*)ptl );
|
||||
/* send an ack message to the sender ... quick hack (TODO) */
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
|
||||
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
|
||||
&(frag->frag_recv.frag_base) );
|
||||
header_length = sizeof(mca_ptl_gm_frag_header_t);
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
|
||||
convertor = &(frag->frag_recv.frag_base.frag_convertor);
|
||||
DO_DEBUG( ompi_output( 0, "receiver create fragment with offset %lld and length %lld",
|
||||
frag->frag_offset, frag->frag_recv.frag_base.frag_size ); );
|
||||
}
|
||||
ompi_convertor_init_for_recv( convertor, 0,
|
||||
request->req_base.req_datatype,
|
||||
@ -732,85 +778,57 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
hdr->hdr_frag.hdr_frag_offset, NULL );
|
||||
}
|
||||
|
||||
if( NULL == frag ) {
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
|
||||
iov.iov_len = hdr->hdr_frag.hdr_frag_length;
|
||||
if( header_length != msg_len ) {
|
||||
iov.iov_base = (char*)hdr + header_length;
|
||||
iov.iov_len = msg_len - header_length;
|
||||
iov_count = 1;
|
||||
max_data = hdr->hdr_frag.hdr_frag_length;
|
||||
max_data = iov.iov_len;
|
||||
freeAfter = 0; /* unused here */
|
||||
rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
|
||||
assert( 0 == freeAfter );
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
|
||||
} else {
|
||||
gm_status_t status;
|
||||
mca_ptl_gm_pipeline_line_t* pipeline;
|
||||
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
|
||||
if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */
|
||||
/* send an ack message to the sender ... quick hack (TODO) */
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
|
||||
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
|
||||
&(frag->frag_recv.frag_base) );
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
|
||||
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov.iov_len = msg_len - sizeof(mca_ptl_gm_frag_header_t);
|
||||
iov_count = 1;
|
||||
max_data = iov.iov_len;
|
||||
freeAfter = 0; /* unused here */
|
||||
rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
|
||||
assert( 0 == freeAfter );
|
||||
frag->frag_bytes_processed += max_data;
|
||||
frag->frag_bytes_validated += max_data;
|
||||
|
||||
pipeline = &(frag->pipeline.lines[0]);
|
||||
#if 0
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
|
||||
if( 0 == pipeline->length )
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
else
|
||||
if( frag->frag_recv.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size )
|
||||
pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
|
||||
}
|
||||
#endif
|
||||
pipeline->length = frag->frag_recv.frag_base.frag_size - max_data;
|
||||
if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
|
||||
pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
pipeline->offset = hdr->hdr_frag.hdr_frag_offset + max_data;
|
||||
pipeline->local_memory.lval = 0L;
|
||||
pipeline->local_memory.pval = (char*)request->req_base.req_addr + pipeline->offset;
|
||||
status = gm_register_memory( ptl->gm_port,
|
||||
pipeline->local_memory.pval, pipeline->length );
|
||||
if( status != GM_SUCCESS ) {
|
||||
ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
|
||||
(char*)request->req_base.req_addr + pipeline->offset,
|
||||
pipeline->length, hdr->hdr_frag.hdr_frag_offset );
|
||||
return NULL;
|
||||
}
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REGISTER;
|
||||
frag->frag_bytes_processed += pipeline->length;
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d) offset %ld\n", frag, pipeline->length, frag->pipeline.pos_register, pipeline->offset ); )
|
||||
frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
|
||||
} else {
|
||||
/* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
|
||||
* to transfert is large enough, then the sender will start sending a frag message with the
|
||||
* remote_memory set to NULL (but with the length set to the length of the first fragment).
|
||||
* It will allow the receiver to start to register it's own memory. Later when the receiver
|
||||
* get a fragment with the remote_memory field not NULL it can start getting the data.
|
||||
*/
|
||||
pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p get remote memory length %lld (%d)\n", frag, hdr->hdr_frag.hdr_frag_length, frag->pipeline.pos_remote ); )
|
||||
frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
|
||||
assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
|
||||
pipeline->remote_memory = hdr->registered_memory;
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
|
||||
/*if( false == ((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->get_started )*/
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag, 1 );
|
||||
/* If we are in a short burst mode then update the request */
|
||||
if( NULL == frag ) {
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Update the status of the fragment depending on the amount of data converted so far */
|
||||
frag->frag_bytes_processed += max_data;
|
||||
frag->frag_bytes_validated += max_data;
|
||||
if( !(PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_frag.hdr_common.hdr_flags) ) {
|
||||
if( frag->frag_bytes_validated == frag->frag_recv.frag_base.frag_size ) {
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request,
|
||||
frag->frag_recv.frag_base.frag_size,
|
||||
frag->frag_recv.frag_base.frag_size );
|
||||
OMPI_FREE_LIST_RETURN( &(((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->peer_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
|
||||
}
|
||||
DO_DEBUG( ompi_output( 0, "receiver waiting for burst with fragment ..." ); );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
|
||||
* to transfert is large enough, then the sender will start sending a frag message with the
|
||||
* remote_memory set to NULL (but with the length set to the length of the first fragment).
|
||||
* It will allow the receiver to start to register it's own memory. Later when the receiver
|
||||
* get a fragment with the remote_memory field not NULL it can start getting the data.
|
||||
*/
|
||||
if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */
|
||||
pipeline = &(frag->pipeline.lines[0]);
|
||||
pipeline->hdr_flags = hdr->hdr_frag.hdr_common.hdr_flags;
|
||||
pipeline->offset = frag->frag_offset + frag->frag_bytes_processed;
|
||||
pipeline->length = 0; /* we can lie about this one */
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag, 0 );
|
||||
} else {
|
||||
pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
|
||||
DO_DEBUG( ompi_output( 0, "receiver %p get remote memory length %lld (%d)\n", frag, hdr->hdr_frag.hdr_frag_length, frag->pipeline.pos_remote ); );
|
||||
frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
|
||||
assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
|
||||
pipeline->remote_memory = hdr->registered_memory;
|
||||
pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
|
||||
mca_ptl_gm_receiver_advance_pipeline( frag, 0 );
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -824,20 +842,28 @@ mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
|
||||
|
||||
frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags = hdr->hdr_common.hdr_flags;
|
||||
frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match = hdr->hdr_ack.hdr_dst_match;
|
||||
if( 0 == hdr->hdr_ack.hdr_dst_size ) {
|
||||
DO_DEBUG( ompi_output( 0, "sender %p get FIN message (initial)", frag ); )
|
||||
/* I just receive the ack for the first fragment => setup the pipeline */
|
||||
frag->frag_send.frag_request->req_peer_match = hdr->hdr_ack.hdr_dst_match;
|
||||
if( PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_common.hdr_flags ) {
|
||||
if( 0 == hdr->hdr_ack.hdr_dst_size ) {
|
||||
DO_DEBUG( ompi_output( 0, "sender %p get FIN message (initial)", frag ); );
|
||||
/* I just receive the ack for the first fragment => setup the pipeline */
|
||||
mca_ptl_gm_sender_advance_pipeline( frag );
|
||||
} else {
|
||||
/* mark the memory as ready to be deregistered */
|
||||
frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
|
||||
DO_DEBUG( ompi_output( 0, "sender %p get FIN message (%d)", frag, frag->pipeline.pos_deregister ); );
|
||||
}
|
||||
/* continue the pipeline ... send the next segment */
|
||||
mca_ptl_gm_sender_advance_pipeline( frag );
|
||||
} else {
|
||||
/* mark the memory as ready to be deregistered */
|
||||
frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
|
||||
DO_DEBUG( ompi_output( 0, "sender %p get FIN message (%d)", frag, frag->pipeline.pos_deregister ); )
|
||||
DO_DEBUG( ompi_output( 0, "sender burst data after rendez-vous protocol" ); );
|
||||
/* do a burst but with the remote fragment as we just get it from the message */
|
||||
mca_ptl_gm_send_burst_data( (mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer, frag,
|
||||
frag->frag_send.frag_base.frag_size - frag->frag_bytes_validated,
|
||||
NULL, hdr->hdr_common.hdr_flags );
|
||||
}
|
||||
/* continue the pipeline ... send the next segment */
|
||||
mca_ptl_gm_sender_advance_pipeline( frag );
|
||||
|
||||
if( frag->frag_send.frag_base.frag_size == frag->frag_bytes_validated ) {
|
||||
/* mark the request as done before deregistering the memory */
|
||||
DO_DEBUG( ompi_output( 0, "sender complete send operation" ); );
|
||||
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
|
||||
frag->frag_send.frag_request,
|
||||
frag->frag_bytes_validated );
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user