diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c
index f32cbbc35e..4cf4a39073 100644
--- a/src/mca/ptl/gm/src/ptl_gm_priv.c
+++ b/src/mca/ptl/gm/src/ptl_gm_priv.c
@@ -42,27 +42,24 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
     switch( status ) {
     case GM_SUCCESS:
-        if( frag->frag_send.frag_base.frag_size <= mca_ptl_gm_component.gm_eager_limit ) {
-            /* small message */
-            frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length;
-        }
+        /*frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length;*/

-        OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
-        /* release the send token */
-        ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
+        OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
+        /* release the send token */
+        ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );

-        if( frag->frag_bytes_validated >= frag->frag_send.frag_base.frag_size ) {
-            OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
-        }
-        break;
+        if( frag->frag_bytes_validated >= frag->frag_send.frag_base.frag_size ) {
+            OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
+        }
+        break;
     case GM_SEND_TIMED_OUT:
-        ompi_output( 0, "send_continue timed out\n" );
-        break;
+        ompi_output( 0, "send_continue timed out\n" );
+        break;
     case GM_SEND_DROPPED:
-        ompi_output( 0, "send_continue dropped\n" );
-        break;
+        ompi_output( 0, "send_continue dropped\n" );
+        break;
     default:
-        ompi_output( 0, "send_continue other error %d\n", status );
+        ompi_output( 0, "send_continue other error %d\n", status );
     }
 }
@@ -107,30 +104,30 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
         get_line->flags ^= PTL_GM_PIPELINE_REMOTE;
         DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); );
         frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH;
-    } else if( 1 == onlyifget ) return OMPI_SUCCESS;
+    } else if( 1 == onlyifget ) goto check_completion_status;

     /* register the next segment */
     reg_line = &(frag->pipeline.lines[frag->pipeline.pos_register]);
     length = frag->frag_recv.frag_base.frag_size - frag->frag_bytes_processed;
     if( (0 != length) && !(reg_line->flags & PTL_GM_PIPELINE_REGISTER) ) {
         reg_line->hdr_flags = get_line->hdr_flags;
+        reg_line->offset = get_line->offset + get_line->length;
         reg_line->length = length;
         if( reg_line->length > mca_ptl_gm_component.gm_rdma_frag_size )
             reg_line->length = mca_ptl_gm_component.gm_rdma_frag_size;
-        reg_line->offset = get_line->offset + get_line->length;
         reg_line->local_memory.lval = 0L;
         reg_line->local_memory.pval = (char*)frag->frag_recv.frag_base.frag_addr + reg_line->offset;
-        status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
-                                     reg_line->length );
+        status = mca_ptl_gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
+                                             reg_line->length );
         if( GM_SUCCESS != status ) {
             ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
-                         reg_line->local_memory.pval,
-                         reg_line->length, reg_line->offset );
+                         reg_line->local_memory.pval, reg_line->length, reg_line->offset );
             return OMPI_ERROR;
         }
-        DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld(%d)",
-                                    reg_line->length, reg_line->offset, frag->pipeline.pos_register ); )
+        DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld processed %ld(%d)",
+                                    reg_line->length, reg_line->offset, frag->frag_bytes_processed,
+                                    frag->pipeline.pos_register ); );
         reg_line->flags |= PTL_GM_PIPELINE_REGISTER;
         frag->frag_bytes_processed += reg_line->length;
         frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
@@ -139,9 +136,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
     /* deregister the previous one */
     dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
     if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
-        status = gm_deregister_memory( peer->peer_ptl->gm_port,
-                                       dereg_line->local_memory.pval,
-                                       dereg_line->length );
+        status = mca_ptl_gm_deregister_memory( peer->peer_ptl->gm_port,
+                                               dereg_line->local_memory.pval, dereg_line->length );
         if( GM_SUCCESS != status ) {
             ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
                          dereg_line->local_memory.pval,
@@ -154,7 +150,7 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only
                                     dereg_line->offset, frag->pipeline.pos_deregister ); )
         frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
     }
-
+ check_completion_status:
     if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_validated ) {
         peer->peer_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)peer->peer_ptl,
                                                  frag->frag_recv.frag_request, frag->frag_recv.frag_base.frag_size,
@@ -173,7 +169,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
     gm_status_t status;
     mca_ptl_gm_pipeline_line_t *send_line, *reg_line, *dereg_line;
     mca_ptl_gm_frag_header_t* hdr;
-    DO_DEBUG( int count = 0; char buffer[128]; )
+    DO_DEBUG( int count = 0; char buffer[256]; )

     peer = (mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer;
     DO_DEBUG( count = sprintf( buffer, "sender %p", (void*)frag ); )
@@ -188,7 +184,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
         hdr = (mca_ptl_gm_frag_header_t*)item;

         hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
-        hdr->hdr_frag.hdr_common.hdr_flags = send_line->hdr_flags | frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
+        hdr->hdr_frag.hdr_common.hdr_flags = send_line->hdr_flags |
+            frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
         hdr->hdr_frag.hdr_src_ptr.lval = 0L;  /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
         hdr->hdr_frag.hdr_src_ptr.pval = frag;
         hdr->hdr_frag.hdr_dst_ptr = frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match;
@@ -196,7 +193,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
         hdr->hdr_frag.hdr_frag_length = send_line->length;
         hdr->registered_memory = send_line->local_memory;

-        gm_send_with_callback( peer->peer_ptl->gm_port, hdr,
+        gm_send_with_callback( peer->peer_ptl->gm_port, hdr, GM_SIZE,
                                sizeof(mca_ptl_gm_frag_header_t), GM_HIGH_PRIORITY,
                                peer->local_id, peer->port_number,
                                send_continue_callback, (void*)hdr );
@@ -209,8 +206,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
     /* deregister previous segment */
     dereg_line = &(frag->pipeline.lines[frag->pipeline.pos_deregister]);
     if( dereg_line->flags & PTL_GM_PIPELINE_DEREGISTER ) { /* something usefull */
-        status = gm_deregister_memory( peer->peer_ptl->gm_port,
-                                       dereg_line->local_memory.pval, dereg_line->length );
+        status = mca_ptl_gm_deregister_memory( peer->peer_ptl->gm_port,
+                                               dereg_line->local_memory.pval, dereg_line->length );
         if( GM_SUCCESS != status ) {
             ompi_output( 0, "unpinning receiver memory from get (%p, %u) failed \n",
                          dereg_line->local_memory.pval, dereg_line->length );
@@ -219,8 +216,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
         assert( dereg_line->flags == 0 );
         frag->frag_bytes_validated += dereg_line->length;
         frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH;
-        DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld",
-                                    dereg_line->length, dereg_line->offset ); )
+        DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %lld (validated %lld)",
+                                    dereg_line->length, dereg_line->offset, frag->frag_bytes_validated ); )
     }

     /* register next segment */
@@ -238,8 +235,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
         reg_line->local_memory.lval = 0L;
         reg_line->local_memory.pval = (char*)frag->frag_send.frag_base.frag_addr + reg_line->offset;
-        status = gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
-                                     reg_line->length );
+        status = mca_ptl_gm_register_memory( peer->peer_ptl->gm_port, reg_line->local_memory.pval,
+                                             reg_line->length );
         if( GM_SUCCESS != status ) {
             ompi_output( 0, "Cannot register sender memory (%p, %ld) bytes offset %ld\n",
                          reg_line->local_memory.pval, reg_line->length, reg_line->offset );
@@ -248,7 +245,7 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
         reg_line->flags |= PTL_GM_PIPELINE_TRANSFERT;
         frag->frag_bytes_processed += reg_line->length;
         frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
-        DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld",
+        DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %lld",
                                     reg_line->length, reg_line->offset ); )
     }
 }
@@ -257,6 +254,107 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag )
     return OMPI_SUCCESS;
 }

+static inline
+int mca_ptl_gm_send_internal_rndv_header( mca_ptl_gm_peer_t *ptl_peer,
+                                          mca_ptl_gm_send_frag_t *fragment,
+                                          mca_ptl_gm_frag_header_t* hdr,
+                                          int flags )
+{
+    struct iovec iov;
+    uint32_t max_data, in_size;
+    int32_t freeAfter;
+    ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
+
+    iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t);
+    iov.iov_len = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
+    if( iov.iov_len > (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t)) )
+        iov.iov_len = (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t));
+    max_data = iov.iov_len;
+    in_size = 1;
+
+    if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
+        return OMPI_ERROR;
+
+    hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
+    hdr->hdr_frag.hdr_common.hdr_flags = flags;
+    hdr->hdr_frag.hdr_src_ptr.lval = 0L;  /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
+    hdr->hdr_frag.hdr_src_ptr.pval = fragment;
+    hdr->hdr_frag.hdr_dst_ptr = fragment->frag_send.frag_request->req_peer_match;
+    hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
+    hdr->hdr_frag.hdr_frag_length = fragment->frag_send.frag_base.frag_size -
+        fragment->frag_bytes_processed;
+    hdr->registered_memory.lval = 0L;
+    hdr->registered_memory.pval = NULL;
+
+    DO_DEBUG( ompi_output( 0, "sender before send internal rndv header hdr_offset %lld hdr_length %lld max_data %u",
+                           hdr->hdr_frag.hdr_frag_offset, hdr->hdr_frag.hdr_frag_length, max_data ); );
+    gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
+                           sizeof(mca_ptl_gm_frag_header_t) + max_data,
+                           GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
+                           mca_ptl_gm_basic_frag_callback, (void *)hdr );
+    fragment->frag_bytes_processed += max_data;
+    fragment->frag_bytes_validated += max_data;
+    DO_DEBUG( ompi_output( 0, "sender after send internal rndv header processed %lld, validated %lld max_data %u",
+                           fragment->frag_bytes_processed, fragment->frag_bytes_validated, max_data ); );
+    return OMPI_SUCCESS;
+}
+
+static inline
+int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
+                                mca_ptl_gm_send_frag_t *fragment,
+                                uint32_t burst_length,
+                                mca_ptl_base_frag_header_t* hdr,
+                                int32_t flags )
+{
+    int32_t freeAfter, rc;
+    uint32_t max_data, in_size;
+    struct iovec iov;
+    ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
+
+    while( 0 < burst_length ) {  /* send everything for the burst_length size */
+        if( NULL == hdr ) {
+            ompi_list_item_t* item;
+            OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
+            ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
+            hdr = (mca_ptl_base_frag_header_t*)item;
+        }
+        iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
+        iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t);
+        if( iov.iov_len >= burst_length )
+            iov.iov_len = burst_length;
+        max_data = iov.iov_len;
+        in_size = 1;
+
+        if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
+            return OMPI_ERROR;
+
+        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
+        hdr->hdr_common.hdr_flags = flags;
+        hdr->hdr_src_ptr.lval = 0L;  /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
+        hdr->hdr_src_ptr.pval = fragment;
+        hdr->hdr_dst_ptr = fragment->frag_send.frag_request->req_peer_match;
+        hdr->hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
+        hdr->hdr_frag_length = iov.iov_len;
+
+        fragment->frag_bytes_processed += max_data;
+        fragment->frag_bytes_validated += max_data;
+        burst_length -= iov.iov_len;
+        if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
+            assert( burst_length == 0 );
+            hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
+        }
+        /* for the last piece set the header type to FIN */
+        gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
+                               iov.iov_len + sizeof(mca_ptl_base_frag_header_t),
+                               GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
+                               send_continue_callback, (void*)hdr );
+        hdr = NULL;  /* force to retrieve a new one on the next loop */
+    }
+    DO_DEBUG( ompi_output( 0, "sender after burst offset %lld, processed %lld, validated %lld\n",
+                           fragment->frag_offset, fragment->frag_bytes_processed, fragment->frag_bytes_validated); );
+    return OMPI_SUCCESS;
+}
+
 int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
                                    mca_ptl_gm_send_frag_t *fragment,
                                    struct mca_pml_base_send_request_t *sendreq,
@@ -270,67 +368,37 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
     int rc = 0;
     gm_status_t status;
     mca_ptl_gm_pipeline_line_t* pipeline;
-    ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
-    int32_t freeAfter;
-    uint32_t max_data, in_size;
-    struct iovec iov;

     fragment->frag_offset = offset;

     /* must update the offset after actual fragment size is determined
      * before attempting to send the fragment */
-    mca_pml_base_send_request_offset( sendreq,
+    mca_pml_base_send_request_offset( fragment->frag_send.frag_request,
                                       fragment->frag_send.frag_base.frag_size );
-    DO_DEBUG( ompi_output( 0, "sender start new send length %ld\n", *size ); )
+    DO_DEBUG( ompi_output( 0, "sender start new send length %ld offset %ld\n", *size, offset ); )

     /* The first DMA memory buffer has been alocated in same time as the fragment */
     item = (ompi_list_item_t*)fragment->send_buf;
     hdr = (mca_ptl_gm_frag_header_t*)item;
     remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
-    if( remaining_bytes > mca_ptl_gm_component.gm_eager_limit ) {
-        burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
-    } else {
+    if( remaining_bytes < mca_ptl_gm_component.gm_eager_limit ) {
         burst_length = remaining_bytes;
+    } else if( remaining_bytes < mca_ptl_gm_component.gm_rndv_burst_limit ) {
+        burst_length = remaining_bytes % (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));
+    } else {
+        if( mca_ptl_gm_component.gm_rdma_frag_size == UINT_MAX )
+            burst_length = 0;
+        else
+            burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
     }
-    while( 0 < burst_length ) {  /* send everything for the burst_length size */
-        if( NULL == item ) {
-            OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
-            ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
-            hdr = (mca_ptl_gm_frag_header_t*)item;
-        }
-        iov.iov_base = (char*)item + sizeof(mca_ptl_base_frag_header_t);
-        iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t);
-        if( iov.iov_len >= burst_length )
-            iov.iov_len = burst_length;
-        max_data = iov.iov_len;
-        in_size = 1;
-
-        if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
-            return OMPI_ERROR;
-
-        hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
-        hdr->hdr_frag.hdr_common.hdr_flags = flags;
-        hdr->hdr_frag.hdr_src_ptr.lval = 0L;  /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
-        hdr->hdr_frag.hdr_src_ptr.pval = fragment;
-        hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
-        hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
-        hdr->hdr_frag.hdr_frag_length = iov.iov_len;
-
-        fragment->frag_bytes_processed += iov.iov_len;
-        fragment->frag_bytes_validated += max_data;
-        burst_length -= iov.iov_len;
-        if( (burst_length == 0) && (remaining_bytes < mca_ptl_gm_component.gm_eager_limit) )
-            hdr->hdr_frag.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
-
-        /* for the last piece set the header type to FIN */
-        gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
-                               iov.iov_len + sizeof(mca_ptl_base_frag_header_t),
-                               GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
-                               send_continue_callback, (void*)hdr );
-        item = NULL;  /* force to retrieve a new one on the next loop */
+    if( burst_length > 0 ) {
+        mca_ptl_gm_send_burst_data( ptl_peer, fragment, burst_length, &(hdr->hdr_frag), flags );
+        item = NULL;  /* this buffer was already used by the mca_ptl_gm_send_burst_data function */
+        DO_DEBUG( ompi_output( 0, "sender burst %ld bytes", burst_length ); );
     }
-    if( remaining_bytes < mca_ptl_gm_component.gm_eager_limit ) {
+
+    if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
         *size = fragment->frag_bytes_processed;
         if( !(flags & MCA_PTL_FLAGS_ACK) ) {
             ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
@@ -344,66 +412,31 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
         ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
         hdr = (mca_ptl_gm_frag_header_t*)item;
     }
-    /* recompute the remaining length */
-    remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
-    pipeline = &(fragment->pipeline.lines[0]);

     /* Large set of data => we have to setup a rendez-vous protocol. Here we can
      * use the match header already filled in by the upper level and just complete it
      * with the others informations. When we reach this point the rendez-vous protocol
      * has already been realized so we know that the receiver expect our message.
      */
-#if 0
-    iov.iov_base = (char*)item + sizeof(mca_ptl_gm_frag_header_t);
-    iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t);
-    if( iov.iov_len >= remaining_bytes )
-        iov.iov_len = remaining_bytes;
-    max_data = iov.iov_len;
-    in_size = 1;
-
-    if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
-        return OMPI_ERROR;
-    fragment->frag_bytes_processed += max_data;
-    fragment->frag_bytes_validated += max_data;
-#endif
+    if( remaining_bytes > mca_ptl_gm_component.gm_rndv_burst_limit )
+        flags |= PTL_FLAG_GM_REQUIRE_LOCK;
+    mca_ptl_gm_send_internal_rndv_header( ptl_peer, fragment, hdr, flags );
+    if( !(PTL_FLAG_GM_REQUIRE_LOCK & flags) )
+        return OMPI_SUCCESS;

-    hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
-    hdr->hdr_frag.hdr_common.hdr_flags = flags;
-    hdr->hdr_frag.hdr_src_ptr.lval = 0L;  /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
-    hdr->hdr_frag.hdr_src_ptr.pval = fragment;
-    hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
-    hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed;
-    hdr->hdr_frag.hdr_frag_length = *size - fragment->frag_bytes_processed;
-    hdr->registered_memory.lval = 0L;
-    hdr->registered_memory.pval = NULL;
-
-    gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE,
-                           sizeof(mca_ptl_gm_frag_header_t) /*+ max_data*/,
-                           GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number,
-                           mca_ptl_gm_basic_frag_callback, (void *)hdr );
-
-#if 0
-    pipeline->length = fragment->frag_send.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
-    if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
-        if( 0 == pipeline->length )
-            pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
-        else
-            if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size )
-                pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
-    }
-#endif
+    pipeline = &(fragment->pipeline.lines[0]);
     pipeline->length = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
-    if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size ) {
+    if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
         pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
     }
     pipeline->offset = fragment->frag_offset + fragment->frag_bytes_processed;
     pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags;
     pipeline->local_memory.lval = 0L;
     pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr + pipeline->offset;
-    status = gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
-                                 pipeline->length );
+    status = mca_ptl_gm_register_memory( ptl_peer->peer_ptl->gm_port, pipeline->local_memory.pval,
+                                         pipeline->length );
     if( GM_SUCCESS != status ) {
-        ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
+        ompi_output( 0, "Cannot register sender memory (%p, %ld) bytes offset %ld\n",
                      pipeline->local_memory.pval, pipeline->length, pipeline->offset );
     }
     pipeline->flags = PTL_GM_PIPELINE_TRANSFERT;
@@ -594,9 +627,6 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
     /* get some memory and copy the data inside. We can then release the receive buffer */
     if( 0 != length ) {
         char* ptr = (char*)gm_get_local_buffer();
-        if (NULL == ptr) {
-            ompi_output(0, "[%s:%d] error in allocating memory \n", __FILE__, __LINE__);
-        }
         recv_frag->have_allocated_buffer = true;
         memcpy( ptr, recv_frag->frag_recv.frag_base.frag_addr, length );
         recv_frag->frag_recv.frag_base.frag_addr = ptr;
@@ -636,7 +666,7 @@ static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer
     hdr = (mca_ptl_base_header_t*)item;

     hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
-    hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT;
+    hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT | frag->frag_header.hdr_common.hdr_flags;
     hdr->hdr_ack.hdr_src_ptr.pval = frag->frag_header.hdr_frag.hdr_src_ptr.pval;
     hdr->hdr_ack.hdr_dst_match.lval = 0;
     hdr->hdr_ack.hdr_dst_match.pval = frag;
@@ -686,26 +716,33 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st

 static mca_ptl_gm_recv_frag_t*
 mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
-                           mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len )
+                           mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len )
 {
     mca_pml_base_recv_request_t *request;
     ompi_convertor_t local_convertor, *convertor;
     struct iovec iov;
-    uint32_t iov_count, max_data;
+    uint32_t iov_count, max_data = 0, header_length;
     int32_t freeAfter, rc;
     mca_ptl_gm_recv_frag_t* frag;
+    mca_ptl_gm_pipeline_line_t* pipeline;

+    header_length = sizeof(mca_ptl_base_frag_header_t);
     if( hdr->hdr_frag.hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
         frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
+        frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
         request = (mca_pml_base_recv_request_t*)frag->frag_recv.frag_request;
         /* here we can have a synchronisation problem if several threads work in same time
          * with the same request. The only question is if it's possible ? */
         convertor = &(frag->frag_recv.frag_base.frag_convertor);
+        DO_DEBUG( ompi_output( 0, "receiver get message tagged as HAS_FRAGMENT" ); );
+        if( PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_frag.hdr_common.hdr_flags )
+            header_length = sizeof(mca_ptl_gm_frag_header_t);
     } else {
         request = (mca_pml_base_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
-        if( hdr->hdr_frag.hdr_frag_length <= (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t)) ) {
+        if( hdr->hdr_frag.hdr_frag_length <= (mca_ptl_gm_component.gm_segment_size -
+                                              sizeof(mca_ptl_base_frag_header_t)) ) {
             ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_base.req_comm,
                                                        request->req_base.req_ompi.req_status.MPI_SOURCE );
             convertor = &local_convertor;
@@ -720,10 +757,19 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
             frag->frag_recv.frag_base.frag_addr = frag->frag_recv.frag_request->req_base.req_addr;
             frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length;
             frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
-                mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
-                                                     request->req_base.req_ompi.req_status.MPI_SOURCE,
-                                                     (struct mca_ptl_base_module_t*)ptl );
+                mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
+                                                     request->req_base.req_ompi.req_status.MPI_SOURCE,
+                                                     (struct mca_ptl_base_module_t*)ptl );
+            /* send an ack message to the sender ... quick hack (TODO) */
+            frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
+            frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
+            mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
+                                               &(frag->frag_recv.frag_base) );
+            header_length = sizeof(mca_ptl_gm_frag_header_t);
+            frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
             convertor = &(frag->frag_recv.frag_base.frag_convertor);
+            DO_DEBUG( ompi_output( 0, "receiver create fragment with offset %lld and length %lld",
+                                   frag->frag_offset, frag->frag_recv.frag_base.frag_size ); );
         }
         ompi_convertor_init_for_recv( convertor, 0, request->req_base.req_datatype,
@@ -732,83 +778,55 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
                                       hdr->hdr_frag.hdr_frag_offset, NULL );
     }

-    if( NULL == frag ) {
-        iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
-        iov.iov_len = hdr->hdr_frag.hdr_frag_length;
+    if( header_length != msg_len ) {
+        iov.iov_base = (char*)hdr + header_length;
+        iov.iov_len = msg_len - header_length;
         iov_count = 1;
-        max_data = hdr->hdr_frag.hdr_frag_length;
+        max_data = iov.iov_len;
         freeAfter = 0;  /* unused here */
         rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
         assert( 0 == freeAfter );
-        ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
-    } else {
-        gm_status_t status;
-        mca_ptl_gm_pipeline_line_t* pipeline;
-
-        frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
-        if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */
-            /* send an ack message to the sender ... quick hack (TODO) */
-            frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
-            mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer,
-                                               &(frag->frag_recv.frag_base) );
-            frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
-
-            iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t);
-            iov.iov_len = msg_len - sizeof(mca_ptl_gm_frag_header_t);
-            iov_count = 1;
-            max_data = iov.iov_len;
-            freeAfter = 0;  /* unused here */
-            rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
-            assert( 0 == freeAfter );
-            frag->frag_bytes_processed += max_data;
-            frag->frag_bytes_validated += max_data;
-
-            pipeline = &(frag->pipeline.lines[0]);
-#if 0
-            pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size;
-            if( pipeline->length < (mca_ptl_gm_component.gm_rdma_frag_size >> 1) ) {
-                if( 0 == pipeline->length )
-                    pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
-                else
-                    if( frag->frag_recv.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size )
-                        pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1);
-            }
-#endif
-            pipeline->length = frag->frag_recv.frag_base.frag_size - max_data;
-            if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) {
-                pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size;
-            }
-            pipeline->offset = hdr->hdr_frag.hdr_frag_offset + max_data;
-            pipeline->local_memory.lval = 0L;
-            pipeline->local_memory.pval = (char*)request->req_base.req_addr + pipeline->offset;
-            status = gm_register_memory( ptl->gm_port,
-                                         pipeline->local_memory.pval, pipeline->length );
-            if( status != GM_SUCCESS ) {
-                ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n",
-                             (char*)request->req_base.req_addr + pipeline->offset,
-                             pipeline->length, hdr->hdr_frag.hdr_frag_offset );
-                return NULL;
-            }
-            pipeline->flags |= PTL_GM_PIPELINE_REGISTER;
-            frag->frag_bytes_processed += pipeline->length;
-            DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d) offset %ld\n", frag, pipeline->length, frag->pipeline.pos_register, pipeline->offset ); )
-            frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH;
-        } else {
-            /* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
-             * to transfert is large enough, then the sender will start sending a frag message with the
-             * remote_memory set to NULL (but with the length set to the length of the first fragment).
-             * It will allow the receiver to start to register it's own memory. Later when the receiver
-             * get a fragment with the remote_memory field not NULL it can start getting the data.
-             */
-            pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
-            DO_DEBUG( ompi_output( 0, "receiver %p get remote memory length %lld (%d)\n", frag, hdr->hdr_frag.hdr_frag_length, frag->pipeline.pos_remote ); )
-            frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
-            assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
-            pipeline->remote_memory = hdr->registered_memory;
-            pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
-            /*if( false == ((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->get_started )*/
-            mca_ptl_gm_receiver_advance_pipeline( frag, 1 );
+        /* If we are in a short burst mode then update the request */
+        if( NULL == frag ) {
+            ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
+            return NULL;
         }
+    }
+
+    /* Update the status of the fragment depending on the amount of data converted so far */
+    frag->frag_bytes_processed += max_data;
+    frag->frag_bytes_validated += max_data;
+    if( !(PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_frag.hdr_common.hdr_flags) ) {
+        if( frag->frag_bytes_validated == frag->frag_recv.frag_base.frag_size ) {
+            ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request,
+                                          frag->frag_recv.frag_base.frag_size,
+                                          frag->frag_recv.frag_base.frag_size );
+            OMPI_FREE_LIST_RETURN( &(((mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer)->peer_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
+        }
+        DO_DEBUG( ompi_output( 0, "receiver waiting for burst with fragment ..." ); );
+        return NULL;
+    }
+
+    /* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data
+     * to transfert is large enough, then the sender will start sending a frag message with the
+     * remote_memory set to NULL (but with the length set to the length of the first fragment).
+     * It will allow the receiver to start to register it's own memory. Later when the receiver
+     * get a fragment with the remote_memory field not NULL it can start getting the data.
+     */
+    if( NULL == hdr->registered_memory.pval ) {  /* first round of the local rendez-vous protocol */
+        pipeline = &(frag->pipeline.lines[0]);
+        pipeline->hdr_flags = hdr->hdr_frag.hdr_common.hdr_flags;
+        pipeline->offset = frag->frag_offset + frag->frag_bytes_processed;
+        pipeline->length = 0;  /* we can lie about this one */
+        mca_ptl_gm_receiver_advance_pipeline( frag, 0 );
+    } else {
+        pipeline = &(frag->pipeline.lines[frag->pipeline.pos_remote]);
+        DO_DEBUG( ompi_output( 0, "receiver %p get remote memory length %lld (%d)\n", frag, hdr->hdr_frag.hdr_frag_length, frag->pipeline.pos_remote ); );
+        frag->pipeline.pos_remote = (frag->pipeline.pos_remote + 1) % GM_PIPELINE_DEPTH;
+        assert( (pipeline->flags & PTL_GM_PIPELINE_REMOTE) == 0 );
+        pipeline->remote_memory = hdr->registered_memory;
+        pipeline->flags |= PTL_GM_PIPELINE_REMOTE;
+        mca_ptl_gm_receiver_advance_pipeline( frag, 0 );
     }

     return NULL;
@@ -824,20 +842,28 @@ mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
     frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags = hdr->hdr_common.hdr_flags;
     frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match = hdr->hdr_ack.hdr_dst_match;

-    if( 0 == hdr->hdr_ack.hdr_dst_size ) {
-        DO_DEBUG( ompi_output( 0, "sender %p get FIN message (initial)", frag ); )
-        /* I just receive the ack for the first fragment => setup the pipeline */
+    frag->frag_send.frag_request->req_peer_match = hdr->hdr_ack.hdr_dst_match;
+    if( PTL_FLAG_GM_REQUIRE_LOCK & hdr->hdr_common.hdr_flags ) {
+        if( 0 == hdr->hdr_ack.hdr_dst_size ) {
+            DO_DEBUG( ompi_output( 0, "sender %p get FIN message (initial)", frag ); );
+            /* I just receive the ack for the first fragment => setup the pipeline */
+            mca_ptl_gm_sender_advance_pipeline( frag );
+        } else {
+            /* mark the memory as ready to be deregistered */
+            frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
+            DO_DEBUG( ompi_output( 0, "sender %p get FIN message (%d)", frag, frag->pipeline.pos_deregister ); );
+        }
+        /* continue the pipeline ... send the next segment */
         mca_ptl_gm_sender_advance_pipeline( frag );
     } else {
-        /* mark the memory as ready to be deregistered */
-        frag->pipeline.lines[frag->pipeline.pos_deregister].flags |= PTL_GM_PIPELINE_DEREGISTER;
-        DO_DEBUG( ompi_output( 0, "sender %p get FIN message (%d)", frag, frag->pipeline.pos_deregister ); )
+        DO_DEBUG( ompi_output( 0, "sender burst data after rendez-vous protocol" ); );
+        /* do a burst but with the remote fragment as we just get it from the message */
+        mca_ptl_gm_send_burst_data( (mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer, frag,
+                                    frag->frag_send.frag_base.frag_size - frag->frag_bytes_validated,
+                                    NULL, hdr->hdr_common.hdr_flags );
     }
-    /* continue the pipeline ... send the next segment */
-    mca_ptl_gm_sender_advance_pipeline( frag );
-
     if( frag->frag_send.frag_base.frag_size == frag->frag_bytes_validated ) {
-        /* mark the request as done before deregistering the memory */
+        DO_DEBUG( ompi_output( 0, "sender complete send operation" ); );
         ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
                                       frag->frag_send.frag_request,
                                       frag->frag_bytes_validated );