diff --git a/src/mca/ptl/gm/src/ptl_gm.c b/src/mca/ptl/gm/src/ptl_gm.c index 5de1d3b798..7d588dafec 100644 --- a/src/mca/ptl/gm/src/ptl_gm.c +++ b/src/mca/ptl/gm/src/ptl_gm.c @@ -309,9 +309,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl, sendreq, offset, &size, flags ); rc = mca_ptl_gm_peer_send_continue( (mca_ptl_gm_peer_t *)ptl_peer, putfrag, - sendreq, offset, &size, flags, - ((char*)(sendreq->req_base.req_addr)) + offset, - size ); + sendreq, offset, &size, flags ); return OMPI_SUCCESS; } @@ -356,7 +354,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, header_length = sizeof(mca_ptl_base_match_header_t); } - if (hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) { + if( hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) { /* need to send an ack back */ ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL ); if( NULL == ack ) { @@ -369,9 +367,10 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, mca_ptl_base_header_t* ack_hdr = (mca_ptl_base_header_t*)ack->send_buf; ack_hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; ack_hdr->hdr_ack.hdr_common.hdr_flags = 0; - ack_hdr->hdr_ack.hdr_src_ptr.lval = hdr->hdr_rndv.hdr_src_ptr.lval; + ack_hdr->hdr_ack.hdr_src_ptr = hdr->hdr_rndv.hdr_src_ptr; ack_hdr->hdr_ack.hdr_dst_match.lval = 0L; - ack_hdr->hdr_ack.hdr_dst_match.pval = frag; + /* just a easy way to remember that there is a request not a fragment */ + ack_hdr->hdr_ack.hdr_dst_match.pval = request; ack_hdr->hdr_ack.hdr_dst_addr.lval = 0L; ack_hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, ack_hdr, @@ -407,7 +406,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, assert( rc >= 0 ); } - /* update progress*/ + /* update progress*/ ptl->ptl_recv_progress( ptl, request, bytes_recv, bytes_recv ); /* Now update the status of the fragment */ diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c index d40a974473..b39be3a6fb 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.c +++ b/src/mca/ptl/gm/src/ptl_gm_priv.c @@ -26,6 +26,7 @@ #include "ptl_gm_proc.h" #include "ptl_gm_sendfrag.h" #include "ptl_gm_priv.h" +#include "mca/pml/teg/src/pml_teg_proc.h" static void send_continue_callback( struct gm_port *port, void * context, gm_status_t status ) { @@ -42,12 +43,16 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta case GM_SUCCESS: /*OMPI_OUTPUT( (0, "[%s:%d] send_continue_callback release header %p from fragment %p (available %d)\n", __FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/ - frag->already_send += header->hdr_frag.hdr_frag_length; + if( header->hdr_frag.hdr_frag_length <= (5 * GM_BUF_SIZE) ) { + /* small message */ + frag->frag_bytes_processed += header->hdr_frag.hdr_frag_length; + } + OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) ); /* release the send token */ ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 ); - if( frag->already_send >= frag->send_frag.frag_base.frag_size ) { + if( frag->frag_bytes_processed >= frag->send_frag.frag_base.frag_size ) { OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) ); } break; @@ -67,12 +72,10 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, struct mca_pml_base_send_request_t *sendreq, size_t offset, size_t *size, - int flags, - void * target_buffer, - int bytes ) + int flags ) { mca_ptl_base_header_t hdr; - size_t update_offset = offset, header_length = sizeof(mca_ptl_base_frag_header_t); + size_t header_length = sizeof(mca_ptl_base_frag_header_t); ompi_list_item_t* item; int rc = 0; @@ -80,59 +83,68 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, __FILE__, __LINE__, (void*)ptl_peer, (void*)fragment, (void*)sendreq, offset, *size, flags, target_buffer, bytes) );*/ + fragment->send_frag.frag_base.frag_size = *size; + fragment->frag_bytes_processed = 0; + fragment->frag_offset = offset; + hdr.hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr.hdr_frag.hdr_common.hdr_flags = flags; hdr.hdr_frag.hdr_frag_length = *size; - hdr.hdr_frag.hdr_frag_offset = update_offset; + hdr.hdr_frag.hdr_frag_offset = fragment->frag_offset; hdr.hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr.hdr_frag.hdr_src_ptr.pval = fragment; hdr.hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; - fragment->send_frag.frag_base.frag_addr =(void *)target_buffer; - fragment->send_frag.frag_base.frag_size = bytes; - fragment->already_send = 0; /* must update the offset after actual fragment size is determined * before attempting to send the fragment */ mca_pml_base_send_request_offset( sendreq, fragment->send_frag.frag_base.frag_size ); + fragment->send_frag.frag_base.frag_addr = (char*)sendreq->req_base.req_addr + offset; + /* The first DMA memory buffer has been alocated in same time as the fragment */ item = (ompi_list_item_t*)fragment->send_buf; if( (*size) <= (5 * GM_BUF_SIZE) ) { /* small protocol */ - size_t max_data; + size_t max_data, remaining_bytes = fragment->send_frag.frag_base.frag_size; int freeAfter; unsigned int in_size; struct iovec iov; ompi_convertor_t *convertor = &(fragment->send_frag.frag_base.frag_convertor); /* If we have an eager send then we should send the rest of the data. */ - while( 0 == rc ) { + while( 0 < remaining_bytes ) { if( NULL == item ) { OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc ); ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 ); } iov.iov_base = (char*)item + header_length; iov.iov_len = GM_BUF_SIZE - header_length; + if( iov.iov_len >= remaining_bytes ) + iov.iov_len = remaining_bytes; max_data = iov.iov_len; in_size = 1; - if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0) + if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0) return OMPI_ERROR; - hdr.hdr_frag.hdr_frag_offset = update_offset; + hdr.hdr_frag.hdr_frag_offset = fragment->frag_offset; hdr.hdr_frag.hdr_frag_length = iov.iov_len; - update_offset += iov.iov_len; - *(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag; + fragment->frag_offset += iov.iov_len; + remaining_bytes -= iov.iov_len; + if( remaining_bytes == 0 ) hdr.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT; + + *(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag; + /* for the last piece set the header type to FIN */ gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item, GM_SIZE, iov.iov_len + sizeof(mca_ptl_gm_eager_header_t), GM_LOW_PRIORITY, ptl_peer->local_id, send_continue_callback, (void*)item ); item = NULL; /* force to retrieve a new one on the next loop */ } - *size = update_offset - offset; + *size = fragment->frag_offset - offset; if( !(flags & MCA_PTL_FLAGS_ACK) ) { ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl, fragment->send_frag.frag_request, @@ -152,18 +164,20 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer, ompi_ptr_t* local_address; status = gm_register_memory( ptl_peer->peer_ptl->gm_port, - target_buffer /*sendreq->req_base.req_addr */, + fragment->send_frag.frag_base.frag_addr, (*size) ); if( status != GM_SUCCESS ) { - printf( "Cannot register memory from %p length %ud bytes\n", - (void*)sendreq->req_base.req_addr, (*size) ); + ompi_output( 0, "Cannot register memory from %p length %ud bytes\n", + (void*)sendreq->req_base.req_addr, (*size) ); return OMPI_ERROR; } *(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag; local_address = (ompi_ptr_t*)((char*)item + header_length); local_address->lval = 0; - local_address->pval = target_buffer /*(void*)sendreq->req_base.req_addr */; + local_address->pval = fragment->send_frag.frag_base.frag_addr; + + fragment->frag_offset += fragment->send_frag.frag_base.frag_size; gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item, GM_SIZE, header_length + sizeof(ompi_ptr_t), GM_LOW_PRIORITY, @@ -199,7 +213,7 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer, fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer; fragment->send_frag.frag_request = sendreq; - fragment->already_send = 0; + fragment->frag_bytes_processed = 0; /* At this point the header is already filled up with informations as a match header */ if( (flags & MCA_PTL_FLAGS_ACK) || (0 == offset) ) { @@ -275,9 +289,6 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer, */ size_out = iov.iov_len + header_length; - DO_DEBUG( printf( "send pointer %p SIZE %d length %lu\n", - (void*)fragment->send_buf, GM_BUF_SIZE, size_out ) ); - /* must update the offset after actual fragment size is determined * before attempting to send the fragment */ @@ -288,13 +299,13 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer, gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf, GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id, send_callback, (void *)fragment ); - + fragment->frag_bytes_processed = size_out - header_length; + *size = fragment->frag_bytes_processed; if( !(flags & MCA_PTL_FLAGS_ACK) ) { ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl, fragment->send_frag.frag_request, size_out ); } - *size = size_out - header_length; return OMPI_SUCCESS; } @@ -311,7 +322,7 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status) putfrag = (mca_ptl_gm_send_frag_t *)context; header = (mca_ptl_base_header_t*)putfrag->send_buf; bytes2 = header->hdr_ack.hdr_dst_size; - ptl = (mca_ptl_gm_module_t *)putfrag->ptl; + ptl = (mca_ptl_gm_module_t*)putfrag->send_frag.frag_base.frag_owner; send_req = putfrag->req; switch (status) { @@ -382,18 +393,13 @@ void send_callback( struct gm_port *port, void * context, gm_status_t status ) */ break; case MCA_PTL_HDR_TYPE_ACK: - OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag)); - break; case MCA_PTL_HDR_TYPE_FIN: - ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request, - hdr_dst_size); - OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag)); break; default: - /* Not going to call progress on this send, - * and not free-ing descriptor */ - frag->send_complete = 1; + /* Not going to call progress on this send, and not free-ing descriptor */ + printf( "Called with a strange headertype ...\n" ); + break; } break; @@ -415,12 +421,12 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade { mca_ptl_gm_send_frag_t * frag; mca_pml_base_send_request_t *req; - mca_pml_base_recv_request_t *request; - int status; - if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) { - frag = (mca_ptl_gm_send_frag_t *)(header->hdr_ack.hdr_src_ptr.pval); - req = (mca_pml_base_send_request_t *) frag->req; + if( MCA_PTL_HDR_TYPE_ACK == header->hdr_common.hdr_type ) { + frag = (mca_ptl_gm_send_frag_t*)(header->hdr_ack.hdr_src_ptr.pval); + /* update the fragment header with the most up2date informations */ + frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match = header->hdr_ack.hdr_dst_match; + req = (mca_pml_base_send_request_t*)frag->req; assert(req != NULL); req->req_peer_match = header->hdr_ack.hdr_dst_match; req->req_peer_addr = header->hdr_ack.hdr_dst_addr; @@ -432,22 +438,12 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade frag->send_frag.frag_request, frag->send_frag.frag_base.frag_size ); OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag); + } else { + if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) { + frag->send_frag.frag_base.frag_header.hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT; + } } - } else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) { - request = (mca_pml_base_recv_request_t*)header->hdr_ack.hdr_dst_match.pval; - /* call receive progress and indicate the recv has been completed */ - ptl->super.ptl_recv_progress( (mca_ptl_base_module_t *) ptl, - request , - header->hdr_ack.hdr_dst_size, - header->hdr_ack.hdr_dst_size ); - /* deregister the memory */ - status = gm_deregister_memory( ptl->gm_port, - header->hdr_ack.hdr_dst_addr.pval, - header->hdr_ack.hdr_dst_size ); - - if(GM_SUCCESS != status) { - ompi_output(0," unpinning memory failed\n"); - } + } else if( MCA_PTL_HDR_TYPE_NACK == header->hdr_common.hdr_type ) { } else { OMPI_OUTPUT((0, "Unkonwn header type in ptl_gm_ctrl_frag\n")); } @@ -475,13 +471,8 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl, recv_frag->frag_recv.frag_base.frag_peer = NULL; recv_frag->frag_recv.frag_request = NULL; recv_frag->frag_recv.frag_is_buffered = false; - recv_frag->frag_hdr_cnt = 0; - recv_frag->frag_msg_cnt = 0; - recv_frag->frag_ack_pending = false; - recv_frag->frag_progressed = 0; recv_frag->frag_recv.frag_base.frag_header.hdr_rndv = hdr->hdr_rndv; - if( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type ) { recv_frag->frag_recv.frag_base.frag_addr = (char *) hdr + sizeof(mca_ptl_base_match_header_t); @@ -495,21 +486,24 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl, recv_frag->matched = false; recv_frag->have_allocated_buffer = false; - recv_frag->ptl = ptl; - + recv_frag->frag_ack_pending = false; + recv_frag->frag_progressed = 0; + recv_frag->frag_offset = 0; /* initial frgment */ + recv_frag->frag_bytes_processed = 0; + matched = ptl->super.ptl_match( &(ptl->super), &(recv_frag->frag_recv), &(recv_frag->frag_recv.frag_base.frag_header.hdr_match) ); - if( matched ) { + if( !matched ) { size_t length = recv_frag->frag_recv.frag_base.frag_size; /* get some memory and copy the data inside. We can then release the receive buffer */ char* ptr = (char*)malloc( sizeof(char) * length ); recv_frag->have_allocated_buffer = true; memcpy( ptr, recv_frag->frag_recv.frag_base.frag_addr, length ); recv_frag->frag_recv.frag_base.frag_addr = ptr; - return NULL; + return recv_frag; } - return recv_frag; + return NULL; } /* This function get called when the gm_get is finish (i.e. when the read from remote memory @@ -524,7 +518,10 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st mca_ptl_gm_send_frag_t *ack; mca_pml_base_recv_request_t *request; mca_ptl_gm_peer_t* peer; + mca_ptl_base_header_t* hdr; int rc; + size_t length; + void* pointer; frag = (mca_ptl_gm_recv_frag_t*)context; gm_ptl = (mca_ptl_gm_module_t *)frag->frag_recv.frag_base.frag_owner; @@ -533,17 +530,33 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st switch( status ) { case GM_SUCCESS: - /*OMPI_OUTPUT( (0, "[%s:%d] mca_ptl_gm_get_callback release header %p from fragment %p (available %\d)\n", - __FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/ + pointer = frag->frag_recv.frag_base.frag_addr; + length = frag->frag_recv.frag_base.frag_size; ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL ); rc = mca_ptl_gm_send_ack_init( ack, gm_ptl, (mca_ptl_gm_peer_t *)(frag->frag_recv.frag_base.frag_peer), frag, NULL, frag->frag_recv.frag_base.frag_size ); - + hdr = (mca_ptl_base_header_t*)ack->send_buf; + hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT; + frag->frag_bytes_processed += frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length; + if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_processed ) { + gm_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)gm_ptl, + request, + frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length, + frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length ); + /* This request is done. I will send back the FIN message */ + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; + OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag ); + } gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)(ack->send_frag.frag_base.frag_owner))->gm_port, ack->send_buf, GM_SIZE, sizeof(mca_ptl_base_ack_header_t), GM_LOW_PRIORITY, peer->local_id, send_callback, (void*)ack ); + status = gm_deregister_memory( ((mca_ptl_gm_module_t*)(ack->send_frag.frag_base.frag_owner))->gm_port, + pointer, length ); + if( GM_SUCCESS != status ) { + OMPI_OUTPUT( (0, "unpinning memory (%p, %u) failed \n", pointer, length) ); + } break; case GM_SEND_TIMED_OUT: printf( "mca_ptl_gm_get_callback timed out\n" ); @@ -561,7 +574,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl, gm_recv_event_t* event ) { mca_pml_base_recv_request_t *request; - ompi_convertor_t* convertor; + ompi_convertor_t* convertor = NULL; mca_ptl_base_header_t *hdr; struct iovec iov; uint32_t iov_count, max_data; @@ -570,19 +583,44 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl, hdr = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer); - recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval; - request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request; - /* here we can have a synchronisation problem if several threads work in same time - * with the same request. The only question is if it's possible ? - */ - convertor = &(recv_frag->frag_recv.frag_base.frag_convertor); - ompi_convertor_init_for_recv( convertor, 0, - request->req_base.req_datatype, - request->req_base.req_count, - request->req_base.req_addr, - hdr->hdr_frag.hdr_frag_offset, NULL ); + if( hdr->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) { + recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval; + request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request; + /* here we can have a synchronisation problem if several threads work in same time + * with the same request. The only question is if it's possible ? + */ + convertor = &(recv_frag->frag_recv.frag_base.frag_convertor); + } else { + request = (mca_pml_base_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval; - if( hdr->hdr_frag.hdr_frag_length <= (GM_BUF_SIZE - sizeof(mca_ptl_base_frag_header_t)) ) { + if( hdr->hdr_frag.hdr_frag_length <= (GM_BUF_SIZE - sizeof(mca_ptl_base_frag_header_t)) ) { + ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_base.req_comm, + request->req_base.req_ompi.req_status.MPI_SOURCE ); + convertor = ompi_convertor_get_copy( proc->proc_convertor ); + recv_frag = NULL; + } else { /* large message => we have to create a receive fragment */ + recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl ); + recv_frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl; + recv_frag->frag_recv.frag_request = request; + recv_frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag; + recv_frag->frag_recv.frag_base.frag_peer = + mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm, + request->req_base.req_ompi.req_status.MPI_SOURCE, + (struct mca_ptl_base_module_t*)ptl ); + recv_frag->frag_offset = hdr->hdr_frag.hdr_frag_offset; + recv_frag->matched = true; + recv_frag->frag_bytes_processed = 0; + recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length; + convertor = &(recv_frag->frag_recv.frag_base.frag_convertor); + } + ompi_convertor_init_for_recv( convertor, 0, + request->req_base.req_datatype, + request->req_base.req_count, + request->req_base.req_addr, + hdr->hdr_frag.hdr_frag_offset, NULL ); + } + + if( NULL == recv_frag ) { iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t); iov.iov_len = hdr->hdr_frag.hdr_frag_length; iov_count = 1; @@ -590,12 +628,16 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl, freeAfter = 0; /* unused here */ rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter ); assert( 0 == freeAfter ); - if( (hdr->hdr_frag.hdr_frag_offset + hdr->hdr_frag.hdr_frag_length) >= - recv_frag->frag_recv.frag_base.frag_size ) { - /* update the request status if we are done */ - ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data ); - OBJ_RELEASE( recv_frag ); + ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data ); + if( PTL_FLAG_GM_LAST_FRAGMENT & hdr->hdr_common.hdr_flags ) { + /* I'm done with this fragment. Return it to the free list */ + if( NULL != recv_frag ) { + OMPI_FREE_LIST_RETURN( &(ptl->gm_recv_frags_free), (ompi_list_item_t*)recv_frag ); + } } + if( NULL == recv_frag ) { + OBJ_RELEASE( convertor ); + } } else { gm_status_t status; ompi_ptr_t* remote_memory = (ompi_ptr_t*)((char*)hdr + sizeof(mca_ptl_base_frag_header_t)); @@ -605,21 +647,40 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl, (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset, hdr->hdr_frag.hdr_frag_length ); if( status != GM_SUCCESS ) { - printf( "Cannot register memory from %p length %lld bytes\n", - (void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length ); + ompi_output( 0, "Cannot register memory from %p length %lld bytes\n", + (void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length ); return NULL; } - + peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer; + recv_frag->frag_recv.frag_base.frag_addr = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset; gm_get( ptl->gm_port, remote_memory->lval, recv_frag->frag_recv.frag_base.frag_addr, recv_frag->frag_recv.frag_base.frag_size, - GM_HIGH_PRIORITY, peer->peer_addr->global_id, peer->peer_addr->port_id, + GM_LOW_PRIORITY, peer->local_id, peer->port_number, mca_ptl_gm_get_callback, recv_frag ); } return NULL; } +static mca_ptl_gm_recv_frag_t* +mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t *ptl, + gm_recv_event_t* event ) +{ + mca_ptl_gm_send_frag_t* frag; + mca_ptl_base_header_t *hdr; + + hdr = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer); + frag = (mca_ptl_gm_send_frag_t*)hdr->hdr_ack.hdr_src_ptr.pval; + + ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, + frag->send_frag.frag_request, + hdr->hdr_ack.hdr_dst_size ); + + OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag ); + return NULL; +} + void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl ) { mca_ptl_gm_recv_frag_t * frag = NULL; @@ -661,13 +722,15 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl, gm_ case MCA_PTL_HDR_TYPE_RNDV: frag = mca_ptl_gm_recv_frag_match( ptl, event ); break; + case MCA_PTL_HDR_TYPE_FIN: + frag = mca_ptl_gm_recv_frag_fin( ptl, event ); + break; case MCA_PTL_HDR_TYPE_FRAG: frag = mca_ptl_gm_recv_frag_frag( ptl, event ); break; case MCA_PTL_HDR_TYPE_ACK: case MCA_PTL_HDR_TYPE_NACK: - case MCA_PTL_HDR_TYPE_FIN: ptl_gm_ctrl_frag(ptl,header); break; default: diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.h b/src/mca/ptl/gm/src/ptl_gm_priv.h index 53c74b685f..9e3c21c650 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.h +++ b/src/mca/ptl/gm/src/ptl_gm_priv.h @@ -20,7 +20,9 @@ struct mca_ptl_gm_send_frag_t; struct mca_ptl_gm_peer_t; -#define PTL_GM_FIRST_FRAG_SIZE (1<<14) +#define PTL_GM_FIRST_FRAG_SIZE (1<<14) +#define PTL_FLAG_GM_HAS_FRAGMENT 0x04 +#define PTL_FLAG_GM_LAST_FRAGMENT 0x08 /*#define DO_DEBUG(inst) inst*/ #define DO_DEBUG(inst) @@ -43,11 +45,7 @@ mca_ptl_gm_peer_send_continue( struct mca_ptl_gm_peer_t *ptl_peer, struct mca_pml_base_send_request_t *sendreq, size_t offset, size_t *size, - int flags, - void *target_buffer, - int bytes ); - - + int flags ); void send_callback(struct gm_port *port,void * context, gm_status_t status); diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c index 9fc602567d..ffb31ece27 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c @@ -86,7 +86,6 @@ mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t *ptl, sendfrag->type = -1; sendfrag->wait_for_ack = 0; sendfrag->put_sent = -1; - sendfrag->send_complete = -1; return sendfrag; } @@ -114,7 +113,6 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack, ack->type = -1; ack->wait_for_ack = 0; ack->put_sent = -1; - ack->send_complete = -1; request = frag->frag_recv.frag_request; @@ -135,13 +133,12 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack, ack->send_frag.frag_base.frag_addr = NULL; ack->send_frag.frag_base.frag_size = 0; ack->status = 1; /* was able to register memory */ - ack->ptl = ptl; + ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)ptl; ack->send_frag.frag_base.frag_header.hdr_ack = *hdr; ack->wait_for_ack = 0; ack->type = ACK; return OMPI_SUCCESS; - } @@ -170,13 +167,12 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag, } putfrag->status = -1; - putfrag->send_complete = -1; putfrag->wait_for_ack = 0; putfrag->put_sent = 0; putfrag->type = PUT; putfrag->req = sendreq; - putfrag->ptl = gm_ptl; - putfrag->peer = ptl_peer; + putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)gm_ptl; + putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer; return OMPI_SUCCESS; } @@ -195,8 +191,6 @@ ompi_class_t mca_ptl_gm_recv_frag_t_class = { static void mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag) { - frag->frag_hdr_cnt = 0; - frag->frag_msg_cnt = 0; } static void diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h index 7f1adb0a50..902eda7218 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h @@ -52,38 +52,35 @@ extern "C" { }; typedef struct mca_ptl_gm_rdv_header_t mca_ptl_gm_rdv_header_t; - /*struct mca_ptl_base_peer_t;*/ + struct mca_ptl_gm_peer_t; /** * GM send fragment derived type. */ struct mca_ptl_gm_send_frag_t { mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */ - void * send_buf; - void * registered_buf; struct mca_pml_base_send_request_t *req; - struct mca_ptl_gm_module_t *ptl; - struct mca_ptl_gm_peer_t *peer; - - uint32_t already_send; /**< data sended so far */ + void* send_buf; + ompi_ptr_t* registered_buf; + + size_t frag_bytes_processed; /**< data sended so far */ + size_t frag_offset; int status; int type; int wait_for_ack; int put_sent; - int send_complete; }; typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t; - + struct mca_ptl_gm_recv_frag_t { mca_ptl_base_recv_frag_t frag_recv; - size_t frag_hdr_cnt; - size_t frag_msg_cnt; + struct mca_pml_base_recv_request_t* req; + size_t frag_bytes_processed; + size_t frag_offset; volatile int frag_progressed; bool frag_ack_pending; void *alloc_recv_buffer; - void *unex_recv_buffer; void * registered_buf; - struct mca_ptl_gm_module_t *ptl; bool matched; bool have_allocated_buffer; bool have_registered_buffer; @@ -94,12 +91,12 @@ extern "C" { mca_ptl_gm_send_frag_t * mca_ptl_gm_alloc_send_frag ( struct mca_ptl_gm_module_t* ptl, struct mca_pml_base_send_request_t* sendreq ); - + int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack, struct mca_ptl_gm_module_t *ptl, struct mca_ptl_gm_peer_t* ptl_peer, struct mca_ptl_gm_recv_frag_t* frag, - char * buffer, + char* buffer, int size ); int