From dd50d364230b16842798f16557b1fa1eb1ea65d7 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Mon, 4 Apr 2005 16:02:08 +0000 Subject: [PATCH] Improuve the perfs a little bit by changing the rendez-vous protocol. Now we start sending burst of data while waiting for the rendez-vous to establish. This commit was SVN r5154. --- src/mca/ptl/gm/src/ptl_gm_component.c | 1 - src/mca/ptl/gm/src/ptl_gm_priv.c | 194 +++++++++++++++----------- src/mca/ptl/gm/src/ptl_gm_sendfrag.h | 2 +- 3 files changed, 115 insertions(+), 82 deletions(-) diff --git a/src/mca/ptl/gm/src/ptl_gm_component.c b/src/mca/ptl/gm/src/ptl_gm_component.c index a16c21ebf4..9122652626 100644 --- a/src/mca/ptl/gm/src/ptl_gm_component.c +++ b/src/mca/ptl/gm/src/ptl_gm_component.c @@ -481,7 +481,6 @@ mca_ptl_gm_init( mca_ptl_gm_component_t * gm ) 0, /* maximum number of list allocated elements will be zero */ 0, NULL ); /* not using mpool */ - return (mca_ptl_gm_component.gm_num_ptl_modules > 0 ? OMPI_SUCCESS : OMPI_ERR_OUT_OF_RESOURCE); } diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c index 4767efc63a..f32cbbc35e 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.c +++ b/src/mca/ptl/gm/src/ptl_gm_priv.c @@ -105,7 +105,7 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only get_line->local_memory.pval, get_line->length, GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, frag ); get_line->flags ^= PTL_GM_PIPELINE_REMOTE; - DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); ) + DO_DEBUG( count += sprintf( buffer + count, " start get %lld (%d)", get_line->length, frag->pipeline.pos_transfert ); ); frag->pipeline.pos_transfert = (frag->pipeline.pos_transfert + 1) % GM_PIPELINE_DEPTH; } else if( 1 == onlyifget ) return OMPI_SUCCESS; @@ -129,7 +129,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only reg_line->length, reg_line->offset ); return OMPI_ERROR; } - DO_DEBUG( count += sprintf( buffer + count, " start register %lld (%d)", reg_line->length, frag->pipeline.pos_register ); ) + DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld(%d)", + reg_line->length, reg_line->offset, frag->pipeline.pos_register ); ) reg_line->flags |= PTL_GM_PIPELINE_REGISTER; frag->frag_bytes_processed += reg_line->length; frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH; @@ -149,7 +150,8 @@ int mca_ptl_gm_receiver_advance_pipeline( mca_ptl_gm_recv_frag_t* frag, int only dereg_line->flags ^= (PTL_GM_PIPELINE_DEREGISTER|PTL_GM_PIPELINE_REGISTER); assert( dereg_line->flags == 0 ); frag->frag_bytes_validated += dereg_line->length; - DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld (%d)", dereg_line->length, frag->pipeline.pos_deregister ); ) + DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld (%d)", dereg_line->length, + dereg_line->offset, frag->pipeline.pos_deregister ); ) frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH; } @@ -217,7 +219,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag ) assert( dereg_line->flags == 0 ); frag->frag_bytes_validated += dereg_line->length; frag->pipeline.pos_deregister = (frag->pipeline.pos_deregister + 1) % GM_PIPELINE_DEPTH; - DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld", dereg_line->length ); ) + DO_DEBUG( count += sprintf( buffer + count, " start deregister %lld offset %ld", + dereg_line->length, dereg_line->offset ); ) } /* register next segment */ @@ -245,7 +248,8 @@ int mca_ptl_gm_sender_advance_pipeline( mca_ptl_gm_send_frag_t* frag ) reg_line->flags |= PTL_GM_PIPELINE_TRANSFERT; frag->frag_bytes_processed += reg_line->length; frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH; - DO_DEBUG( count += sprintf( buffer + count, " start register %lld", reg_line->length ); ) + DO_DEBUG( count += sprintf( buffer + count, " start register %lld offset %ld", + reg_line->length, reg_line->offset ); ) } } @@ -261,11 +265,15 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, int flags ) { mca_ptl_gm_frag_header_t* hdr; - uint64_t remaining_bytes; + uint64_t remaining_bytes, burst_length; ompi_list_item_t *item; int rc = 0; gm_status_t status; mca_ptl_gm_pipeline_line_t* pipeline; + ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor); + int32_t freeAfter; + uint32_t max_data, in_size; + struct iovec iov; fragment->frag_offset = offset; @@ -279,50 +287,50 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, item = (ompi_list_item_t*)fragment->send_buf; hdr = (mca_ptl_gm_frag_header_t*)item; remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed; + if( remaining_bytes > mca_ptl_gm_component.gm_eager_limit ) { + burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size; + } else { + burst_length = remaining_bytes; + } - if( remaining_bytes <= mca_ptl_gm_component.gm_eager_limit ) { /* small protocol */ - int32_t freeAfter; - uint32_t max_data, in_size; - struct iovec iov; - ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor); - - /* If we have an eager send then we should send the rest of the data. */ - while( 0 < remaining_bytes ) { - if( NULL == item ) { - OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc ); - ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 ); - hdr = (mca_ptl_gm_frag_header_t*)item; - } - iov.iov_base = (char*)item + sizeof(mca_ptl_base_frag_header_t); - iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t); - if( iov.iov_len >= remaining_bytes ) - iov.iov_len = remaining_bytes; - max_data = iov.iov_len; - in_size = 1; - - if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0) - return OMPI_ERROR; - - hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; - hdr->hdr_frag.hdr_common.hdr_flags = flags; - hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_frag.hdr_src_ptr.pval = fragment; - hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; - hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed; - hdr->hdr_frag.hdr_frag_length = iov.iov_len; - - fragment->frag_bytes_processed += iov.iov_len; - remaining_bytes -= iov.iov_len; - if( remaining_bytes == 0 ) - hdr->hdr_frag.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT; - - /* for the last piece set the header type to FIN */ - gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, - iov.iov_len + sizeof(mca_ptl_base_frag_header_t), - GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number, - send_continue_callback, (void*)hdr ); - item = NULL; /* force to retrieve a new one on the next loop */ + while( 0 < burst_length ) { /* send everything for the burst_length size */ + if( NULL == item ) { + OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc ); + ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 ); + hdr = (mca_ptl_gm_frag_header_t*)item; } + iov.iov_base = (char*)item + sizeof(mca_ptl_base_frag_header_t); + iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t); + if( iov.iov_len >= burst_length ) + iov.iov_len = burst_length; + max_data = iov.iov_len; + in_size = 1; + + if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0) + return OMPI_ERROR; + + hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; + hdr->hdr_frag.hdr_common.hdr_flags = flags; + hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_frag.hdr_src_ptr.pval = fragment; + hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; + hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed; + hdr->hdr_frag.hdr_frag_length = iov.iov_len; + + fragment->frag_bytes_processed += iov.iov_len; + fragment->frag_bytes_validated += max_data; + burst_length -= iov.iov_len; + if( (burst_length == 0) && (remaining_bytes < mca_ptl_gm_component.gm_eager_limit) ) + hdr->hdr_frag.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT; + + /* for the last piece set the header type to FIN */ + gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, + iov.iov_len + sizeof(mca_ptl_base_frag_header_t), + GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number, + send_continue_callback, (void*)hdr ); + item = NULL; /* force to retrieve a new one on the next loop */ + } + if( remaining_bytes < mca_ptl_gm_component.gm_eager_limit ) { *size = fragment->frag_bytes_processed; if( !(flags & MCA_PTL_FLAGS_ACK) ) { ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl, @@ -331,24 +339,46 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, } return OMPI_SUCCESS; } + if( NULL == item ) { + OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc ); + ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 ); + hdr = (mca_ptl_gm_frag_header_t*)item; + } + /* recompute the remaining length */ + remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed; + pipeline = &(fragment->pipeline.lines[0]); /* Large set of data => we have to setup a rendez-vous protocol. Here we can * use the match header already filled in by the upper level and just complete it * with the others informations. When we reach this point the rendez-vous protocol * has already been realized so we know that the receiver expect our message. */ +#if 0 + iov.iov_base = (char*)item + sizeof(mca_ptl_gm_frag_header_t); + iov.iov_len = mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_gm_frag_header_t); + if( iov.iov_len >= remaining_bytes ) + iov.iov_len = remaining_bytes; + max_data = iov.iov_len; + in_size = 1; + + if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0) + return OMPI_ERROR; + fragment->frag_bytes_processed += max_data; + fragment->frag_bytes_validated += max_data; +#endif + hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_frag.hdr_common.hdr_flags = flags; hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_frag.hdr_src_ptr.pval = fragment; hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; - hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset; - hdr->hdr_frag.hdr_frag_length = *size; + hdr->hdr_frag.hdr_frag_offset = fragment->frag_offset + fragment->frag_bytes_processed; + hdr->hdr_frag.hdr_frag_length = *size - fragment->frag_bytes_processed; hdr->registered_memory.lval = 0L; hdr->registered_memory.pval = NULL; gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, - sizeof(mca_ptl_base_frag_header_t) + sizeof(ompi_ptr_t), + sizeof(mca_ptl_gm_frag_header_t) /*+ max_data*/, GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->port_number, mca_ptl_gm_basic_frag_callback, (void *)hdr ); @@ -362,12 +392,11 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1); } #endif - if( fragment->frag_send.frag_base.frag_size < mca_ptl_gm_component.gm_rdma_frag_size ) { - pipeline->length = fragment->frag_send.frag_base.frag_size; - } else { + pipeline->length = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed; + if( fragment->frag_send.frag_base.frag_size > mca_ptl_gm_component.gm_rdma_frag_size ) { pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size; } - pipeline->offset = fragment->frag_offset; + pipeline->offset = fragment->frag_offset + fragment->frag_bytes_processed; pipeline->hdr_flags = fragment->frag_send.frag_base.frag_header.hdr_common.hdr_flags; pipeline->local_memory.lval = 0L; pipeline->local_memory.pval = (char*)fragment->frag_send.frag_base.frag_addr + pipeline->offset; @@ -485,7 +514,7 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl, static mca_ptl_gm_recv_frag_t* mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl, - mca_ptl_base_header_t * header ) + mca_ptl_base_header_t * header, uint32_t msg_len ) { mca_pml_base_send_request_t *req; @@ -530,7 +559,7 @@ mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl, */ static mca_ptl_gm_recv_frag_t* mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl, - mca_ptl_base_header_t* hdr ) + mca_ptl_base_header_t* hdr, uint32_t msg_len ) { mca_ptl_gm_recv_frag_t* recv_frag; bool matched; @@ -657,7 +686,7 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st static mca_ptl_gm_recv_frag_t* mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl, - mca_ptl_gm_frag_header_t* hdr ) + mca_ptl_gm_frag_header_t* hdr, uint32_t msg_len ) { mca_pml_base_recv_request_t *request; ompi_convertor_t local_convertor, *convertor; @@ -723,6 +752,17 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl, mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer, &(frag->frag_recv.frag_base) ); frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length; + + iov.iov_base = (char*)hdr + sizeof(mca_ptl_gm_frag_header_t); + iov.iov_len = msg_len - sizeof(mca_ptl_gm_frag_header_t); + iov_count = 1; + max_data = iov.iov_len; + freeAfter = 0; /* unused here */ + rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter ); + assert( 0 == freeAfter ); + frag->frag_bytes_processed += max_data; + frag->frag_bytes_validated += max_data; + pipeline = &(frag->pipeline.lines[0]); #if 0 pipeline->length = frag->frag_recv.frag_base.frag_size % mca_ptl_gm_component.gm_rdma_frag_size; @@ -734,25 +774,24 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl, pipeline->length = (mca_ptl_gm_component.gm_rdma_frag_size >> 1); } #endif - if( frag->frag_recv.frag_base.frag_size < mca_ptl_gm_component.gm_rdma_frag_size ) { - pipeline->length = frag->frag_recv.frag_base.frag_size; - } else { + pipeline->length = frag->frag_recv.frag_base.frag_size - max_data; + if( pipeline->length > mca_ptl_gm_component.gm_rdma_frag_size ) { pipeline->length = mca_ptl_gm_component.gm_rdma_frag_size; } + pipeline->offset = hdr->hdr_frag.hdr_frag_offset + max_data; pipeline->local_memory.lval = 0L; - pipeline->local_memory.pval = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset; + pipeline->local_memory.pval = (char*)request->req_base.req_addr + pipeline->offset; status = gm_register_memory( ptl->gm_port, pipeline->local_memory.pval, pipeline->length ); if( status != GM_SUCCESS ) { ompi_output( 0, "Cannot register receiver memory (%p, %ld) bytes offset %ld\n", - (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset, + (char*)request->req_base.req_addr + pipeline->offset, pipeline->length, hdr->hdr_frag.hdr_frag_offset ); return NULL; } - pipeline->offset = hdr->hdr_frag.hdr_frag_offset; pipeline->flags |= PTL_GM_PIPELINE_REGISTER; frag->frag_bytes_processed += pipeline->length; - DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d)\n", frag, pipeline->length, frag->pipeline.pos_register ); ) + DO_DEBUG( ompi_output( 0, "receiver %p start register %lld (%d) offset %ld\n", frag, pipeline->length, frag->pipeline.pos_register, pipeline->offset ); ) frag->pipeline.pos_register = (frag->pipeline.pos_register + 1) % GM_PIPELINE_DEPTH; } else { /* There is a kind of rendez-vous protocol used internally by the GM driver. If the amount of data @@ -777,7 +816,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl, static mca_ptl_gm_recv_frag_t* mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl, - mca_ptl_base_header_t* hdr ) + mca_ptl_base_header_t* hdr, uint32_t msg_len ) { mca_ptl_gm_send_frag_t* frag; @@ -836,7 +875,7 @@ void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl ) } typedef mca_ptl_gm_recv_frag_t* (frag_management_fct_t)( struct mca_ptl_gm_module_t *ptl, - mca_ptl_base_header_t *hdr ); + mca_ptl_base_header_t *hdr, uint32_t msg_len ); frag_management_fct_t* frag_management_fct[MCA_PTL_HDR_TYPE_MAX] = { NULL, mca_ptl_gm_recv_frag_match, @@ -853,7 +892,7 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even mca_ptl_gm_recv_frag_t * frag; mca_ptl_base_header_t *header = NULL, *release_buf; frag_management_fct_t* function; - uint32_t priority = GM_HIGH_PRIORITY; + uint32_t priority = GM_HIGH_PRIORITY, msg_len; release_buf = (mca_ptl_base_header_t*)gm_ntohp(event->recv.buffer); @@ -881,18 +920,13 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even return 0; have_event: - if( header->hdr_common.hdr_type >= MCA_PTL_HDR_TYPE_MAX ) { - ompi_output( 0, "[%s:%d] unexpected frag type %d\n", - __FILE__, __LINE__, header->hdr_common.hdr_type ); - } else { - function = frag_management_fct[header->hdr_common.hdr_type]; - if( NULL == function ) { - ompi_output( 0, "[%s:%d] NOT yet implemented function for the header type %d\n", - __FILE__, __LINE__, header->hdr_common.hdr_type ); - } else { - frag = function( ptl, header ); - } - } + assert( header->hdr_common.hdr_type < MCA_PTL_HDR_TYPE_MAX ); + function = frag_management_fct[header->hdr_common.hdr_type]; + assert( NULL != function ); + + msg_len = gm_ntohl( event->recv.length ); + frag = function( ptl, header, msg_len ); + gm_provide_receive_buffer( ptl->gm_port, release_buf, GM_SIZE, priority ); return 0; diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h index b6ee1a4fa3..861bf7517e 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h @@ -48,7 +48,7 @@ extern "C" { OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t); /* specific header for GM rendezvous protocol. It will be filled up by the sender - * and should be ab;e to hold a pointer to the last registered memory location. + * and should be able to hold a pointer to the last registered memory location. */ struct mca_ptl_gm_frag_header_t { mca_ptl_base_frag_header_t hdr_frag;