1
1

More cleanups. New callback system with more precise functions. Remove older callbacks.

Rename some internals to have a better conformance with the rest of the project.
Don't use a fragment for the ack on the match; just use an already registered buffer.
Delete a useless file (ptl_gm_addr.h). The structure is already present in the ptl_gm_peer.h file.

This commit was SVN r3933.
Этот коммит содержится в:
George Bosilca 2005-01-10 18:33:52 +00:00
родитель 9a83c13779
Коммит bce922bb05
6 изменённых файлов: 113 добавлений и 206 удалений

Просмотреть файл

@ -22,13 +22,12 @@
#include "util/output.h" #include "util/output.h"
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_header.h" #include "mca/ptl/base/ptl_base_header.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_gm.h" #include "ptl_gm.h"
#include "ptl_gm_proc.h" #include "ptl_gm_proc.h"
#include "ptl_gm_req.h" #include "ptl_gm_req.h"
#include "ptl_gm_peer.h" #include "ptl_gm_peer.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
#include "ptl_gm_sendfrag.h"
mca_ptl_gm_module_t mca_ptl_gm_module = { mca_ptl_gm_module_t mca_ptl_gm_module = {
{ {
@ -233,7 +232,6 @@ mca_ptl_gm_request_init( struct mca_ptl_base_module_t *ptl,
req = (mca_ptl_gm_send_request_t *)request; req = (mca_ptl_gm_send_request_t *)request;
req->req_frag = frag; req->req_frag = frag;
frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/ frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/
frag->ptl = (mca_ptl_gm_module_t*)ptl;
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
#endif #endif
@ -273,8 +271,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
int rc; int rc;
mca_ptl_gm_send_frag_t *putfrag; mca_ptl_gm_send_frag_t *putfrag;
putfrag = mca_ptl_gm_alloc_send_frag( (mca_ptl_gm_module_t*)ptl, sendreq ); /*alloc_put_frag */ rc = mca_ptl_gm_put_frag_init( &putfrag,
rc = mca_ptl_gm_put_frag_init( putfrag,
(mca_ptl_gm_peer_t*)ptl_peer, (mca_ptl_gm_module_t*)ptl, (mca_ptl_gm_peer_t*)ptl_peer, (mca_ptl_gm_module_t*)ptl,
sendreq, offset, &size, flags ); sendreq, offset, &size, flags );
@ -296,6 +293,22 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
static void mca_ptl_gm_basic_ack_callback( struct gm_port* port, void* context, gm_status_t status )
{
mca_ptl_gm_module_t* gm_ptl;
mca_ptl_base_frag_t* frag_base;
mca_ptl_base_header_t* header;
header = (mca_ptl_base_header_t*)context;
frag_base = (mca_ptl_base_frag_t*)header->hdr_ack.hdr_dst_addr.pval;
gm_ptl = (mca_ptl_gm_module_t *)frag_base->frag_owner;
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
/* release the send token */
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
}
/* A posted receive has been matched - if required send an /* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment. * ack back to the peer and process the fragment.
*/ */
@ -305,41 +318,43 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
{ {
mca_pml_base_recv_request_t *request; mca_pml_base_recv_request_t *request;
mca_ptl_base_header_t *hdr; mca_ptl_base_header_t *hdr;
int rc; int32_t rc;
mca_ptl_gm_module_t *gm_ptl; mca_ptl_gm_module_t *gm_ptl;
mca_ptl_gm_send_frag_t *ack;
mca_ptl_gm_recv_frag_t *recv_frag; mca_ptl_gm_recv_frag_t *recv_frag;
mca_ptl_gm_peer_t* peer; mca_ptl_gm_peer_t* peer;
struct iovec iov = { NULL, 0}; struct iovec iov = { NULL, 0};
gm_ptl = (mca_ptl_gm_module_t *)ptl; gm_ptl = (mca_ptl_gm_module_t *)ptl;
hdr = &frag->frag_base.frag_header;
request = frag->frag_request; request = frag->frag_request;
recv_frag = (mca_ptl_gm_recv_frag_t *)frag; recv_frag = (mca_ptl_gm_recv_frag_t *)frag;
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer; peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
if( hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) { if( frag->frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) { /* need to send an ack back */
/* need to send an ack back */ ompi_list_item_t *item;
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
if( NULL == ack ) { OMPI_FREE_LIST_TRY_GET( &(gm_ptl->gm_send_dma_frags), item );
if( NULL == item ) {
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n", __FILE__,__LINE__); ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n", __FILE__,__LINE__);
OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock); OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock);
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks, (ompi_list_item_t *)frag); ompi_list_append (&mca_ptl_gm_module.gm_pending_acks, (ompi_list_item_t *)frag);
OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock); OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock);
} else { } else {
mca_ptl_base_header_t* ack_hdr = (mca_ptl_base_header_t*)ack->send_buf; ompi_atomic_sub( &(gm_ptl->num_send_tokens), 1 );
ack_hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; assert( gm_ptl->num_send_tokens >= 0 );
ack_hdr->hdr_ack.hdr_common.hdr_flags = 0; hdr = (mca_ptl_base_header_t*)item;
ack_hdr->hdr_ack.hdr_src_ptr = hdr->hdr_rndv.hdr_src_ptr;
ack_hdr->hdr_ack.hdr_dst_match.lval = 0L; hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
/* just a easy way to remember that there is a request not a fragment */ hdr->hdr_ack.hdr_common.hdr_flags = 0;
ack_hdr->hdr_ack.hdr_dst_match.pval = request; hdr->hdr_ack.hdr_src_ptr = frag->frag_base.frag_header.hdr_rndv.hdr_src_ptr;
ack_hdr->hdr_ack.hdr_dst_addr.lval = 0L; hdr->hdr_ack.hdr_dst_match.lval = 0L;
ack_hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; hdr->hdr_ack.hdr_dst_match.pval = request;
gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, ack_hdr, hdr->hdr_ack.hdr_dst_addr.lval = 0L;
hdr->hdr_ack.hdr_dst_addr.pval = frag;
hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, hdr,
GM_SIZE, sizeof(mca_ptl_base_ack_header_t), GM_LOW_PRIORITY, GM_SIZE, sizeof(mca_ptl_base_ack_header_t), GM_LOW_PRIORITY,
peer->local_id, peer->local_id, mca_ptl_gm_basic_ack_callback, (void *)hdr );
send_callback, (void *)ack );
} }
} }

Просмотреть файл

@ -1,38 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004 The Ohio State University.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_GM_ADDR_H
#define MCA_PTL_GM_ADDR_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
/**
* Structure used to publish GM id information to peers.
*/
struct mca_ptl_gm_addr_t {
unsigned int global_id;
unsigned int local_id;
unsigned int port_id;
};
typedef struct mca_ptl_gm_addr_t mca_ptl_gm_addr_t;
#endif

Просмотреть файл

@ -37,11 +37,11 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
header = (mca_ptl_base_header_t*)context; header = (mca_ptl_base_header_t*)context;
frag = header->hdr_frag.hdr_src_ptr.pval; frag = header->hdr_frag.hdr_src_ptr.pval;
gm_ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner; gm_ptl = (mca_ptl_gm_module_t *)frag->frag_send.frag_base.frag_owner;
switch( status ) { switch( status ) {
case GM_SUCCESS: case GM_SUCCESS:
if( frag->send_frag.frag_base.frag_size <= mca_ptl_gm_component.gm_eager_limit ) { if( frag->frag_send.frag_base.frag_size <= mca_ptl_gm_component.gm_eager_limit ) {
/* small message */ /* small message */
frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length; frag->frag_bytes_validated += header->hdr_frag.hdr_frag_length;
} }
@ -50,7 +50,7 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
/* release the send token */ /* release the send token */
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 ); ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
if( frag->frag_bytes_validated >= frag->send_frag.frag_base.frag_size ) { if( frag->frag_bytes_validated >= frag->frag_send.frag_base.frag_size ) {
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) ); OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
} }
break; break;
@ -65,7 +65,7 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
} }
} }
static void send_continue_short_callback( struct gm_port* port, void* context, gm_status_t status ) static void mca_ptl_gm_basic_frag_callback( struct gm_port* port, void* context, gm_status_t status )
{ {
mca_ptl_gm_module_t* gm_ptl; mca_ptl_gm_module_t* gm_ptl;
mca_ptl_base_frag_t* frag_base; mca_ptl_base_frag_t* frag_base;
@ -92,15 +92,15 @@ int mca_ptl_gm_send_next_long_segment( mca_ptl_gm_send_frag_t* frag,
int32_t hdr_flags = 0; int32_t hdr_flags = 0;
mca_ptl_gm_frag_header_t* hdr; mca_ptl_gm_frag_header_t* hdr;
ptl_peer = (struct mca_ptl_gm_peer_t*)frag->send_frag.frag_base.frag_peer; ptl_peer = (struct mca_ptl_gm_peer_t*)frag->frag_send.frag_base.frag_peer;
length = frag->send_frag.frag_base.frag_size - frag->frag_bytes_processed; length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_processed;
if( length <= mca_ptl_gm_component.gm_rdma_frag_size ) { if( length <= mca_ptl_gm_component.gm_rdma_frag_size ) {
hdr_flags = PTL_FLAG_GM_LAST_FRAGMENT; hdr_flags = PTL_FLAG_GM_LAST_FRAGMENT;
} else { } else {
length = mca_ptl_gm_component.gm_rdma_frag_size; length = mca_ptl_gm_component.gm_rdma_frag_size;
} }
pointer = (char*)frag->send_frag.frag_base.frag_addr + frag->frag_offset + frag->frag_bytes_processed; pointer = (char*)frag->frag_send.frag_base.frag_addr + frag->frag_offset + frag->frag_bytes_processed;
if( flags & GM_PTL_SEND_MESSAGE ) { if( flags & GM_PTL_SEND_MESSAGE ) {
ompi_list_item_t* item; ompi_list_item_t* item;
@ -110,10 +110,10 @@ int mca_ptl_gm_send_next_long_segment( mca_ptl_gm_send_frag_t* frag,
hdr = (mca_ptl_gm_frag_header_t*)item; hdr = (mca_ptl_gm_frag_header_t*)item;
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_frag.hdr_common.hdr_flags = frag->send_frag.frag_base.frag_header.hdr_common.hdr_flags | hdr_flags; hdr->hdr_frag.hdr_common.hdr_flags = frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags | hdr_flags;
hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = frag; hdr->hdr_frag.hdr_src_ptr.pval = frag;
hdr->hdr_frag.hdr_dst_ptr = frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match; hdr->hdr_frag.hdr_dst_ptr = frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match;
hdr->hdr_frag.hdr_frag_offset = frag->frag_bytes_processed; hdr->hdr_frag.hdr_frag_offset = frag->frag_bytes_processed;
hdr->hdr_frag.hdr_frag_length = length; hdr->hdr_frag.hdr_frag_length = length;
hdr->registered_memory.lval = 0L; hdr->registered_memory.lval = 0L;
@ -125,7 +125,7 @@ int mca_ptl_gm_send_next_long_segment( mca_ptl_gm_send_frag_t* frag,
send_continue_callback, (void*)hdr ); send_continue_callback, (void*)hdr );
frag->frag_bytes_processed += length; frag->frag_bytes_processed += length;
pointer += length; pointer += length;
length = frag->send_frag.frag_base.frag_size - frag->frag_bytes_processed; length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_processed;
if( length > mca_ptl_gm_component.gm_rdma_frag_size ) if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size; length = mca_ptl_gm_component.gm_rdma_frag_size;
} }
@ -160,18 +160,18 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
* before attempting to send the fragment * before attempting to send the fragment
*/ */
mca_pml_base_send_request_offset( sendreq, mca_pml_base_send_request_offset( sendreq,
fragment->send_frag.frag_base.frag_size ); fragment->frag_send.frag_base.frag_size );
/* The first DMA memory buffer has been alocated in same time as the fragment */ /* The first DMA memory buffer has been alocated in same time as the fragment */
item = (ompi_list_item_t*)fragment->send_buf; item = (ompi_list_item_t*)fragment->send_buf;
hdr = (mca_ptl_gm_frag_header_t*)item; hdr = (mca_ptl_gm_frag_header_t*)item;
remaining_bytes = fragment->send_frag.frag_base.frag_size - fragment->frag_bytes_processed; remaining_bytes = fragment->frag_send.frag_base.frag_size - fragment->frag_bytes_processed;
if( remaining_bytes <= mca_ptl_gm_component.gm_eager_limit ) { /* small protocol */ if( remaining_bytes <= mca_ptl_gm_component.gm_eager_limit ) { /* small protocol */
int32_t freeAfter; int32_t freeAfter;
uint32_t max_data, in_size; uint32_t max_data, in_size;
struct iovec iov; struct iovec iov;
ompi_convertor_t *convertor = &(fragment->send_frag.frag_base.frag_convertor); ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
/* If we have an eager send then we should send the rest of the data. */ /* If we have an eager send then we should send the rest of the data. */
while( 0 < remaining_bytes ) { while( 0 < remaining_bytes ) {
@ -213,7 +213,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
*size = fragment->frag_bytes_processed; *size = fragment->frag_bytes_processed;
if( !(flags & MCA_PTL_FLAGS_ACK) ) { if( !(flags & MCA_PTL_FLAGS_ACK) ) {
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl, ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
fragment->send_frag.frag_request, fragment->frag_send.frag_request,
(*size) ); (*size) );
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -236,7 +236,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr, gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr,
GM_SIZE, sizeof(mca_ptl_base_frag_header_t) + sizeof(ompi_ptr_t), GM_SIZE, sizeof(mca_ptl_base_frag_header_t) + sizeof(ompi_ptr_t),
GM_LOW_PRIORITY, ptl_peer->local_id, GM_LOW_PRIORITY, ptl_peer->local_id,
send_continue_short_callback, (void *)hdr ); mca_ptl_gm_basic_frag_callback, (void *)hdr );
/* Now we are waiting for the ack message. Meanwhile we can register the sender first piece /* Now we are waiting for the ack message. Meanwhile we can register the sender first piece
* of data. In this way we have a recovery between the expensive registration on both sides. * of data. In this way we have a recovery between the expensive registration on both sides.
@ -338,70 +338,6 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
void send_callback( struct gm_port *port, void * context, gm_status_t status )
{
mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *frag;
mca_ptl_base_header_t* header;
int hdr_type, hdr_flags;
size_t hdr_dst_size;
frag = (mca_ptl_gm_send_frag_t *)context;
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
header = (mca_ptl_base_header_t*)frag->send_buf;
frag->send_buf = NULL;
hdr_type = header->hdr_common.hdr_type;
hdr_flags = header->hdr_common.hdr_flags;
hdr_dst_size = header->hdr_ack.hdr_dst_size;
switch (status) {
case GM_SUCCESS:
/* release the send DMA buffer as soon as possible */
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)header));
/* release the send token */
ompi_atomic_add( &(ptl->num_send_tokens), 1 );
switch( hdr_type ) {
case MCA_PTL_HDR_TYPE_FRAG:
case MCA_PTL_HDR_TYPE_MATCH:
if( !(hdr_flags & MCA_PTL_FLAGS_ACK) ) {
/* Return sendfrag to free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
}
break;
case MCA_PTL_HDR_TYPE_RNDV:
/*OMPI_OUTPUT( (0, "[%s:%d] send_callback release header %p from fragment %p (available %d)\n",
__FILE__, __LINE__, (void*)header, (void*)frag, ptl->num_send_tokens) );*/
/* As we actually use the read semantics for long messages, we dont
* have to do anything special here except to release the DMA memory buffer.
*/
break;
case MCA_PTL_HDR_TYPE_ACK:
case MCA_PTL_HDR_TYPE_FIN:
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
break;
default:
/* Not going to call progress on this send, and not free-ing descriptor */
printf( "Called with a strange headertype ...\n" );
break;
}
break;
case GM_SEND_TIMED_OUT:
/* need to take care of retransmission */
break;
case GM_SEND_DROPPED:
/* need to handle this case */
break;
default:
ompi_output( 0, "[%s:%d] error in message completion\n", __FILE__, __LINE__ );
break;
}
}
static mca_ptl_gm_recv_frag_t* static mca_ptl_gm_recv_frag_t*
mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl, mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header ) mca_ptl_base_header_t * header )
@ -412,8 +348,8 @@ mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) { if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
mca_ptl_gm_send_frag_t* frag = (mca_ptl_gm_send_frag_t*)(header->hdr_ack.hdr_src_ptr.pval); mca_ptl_gm_send_frag_t* frag = (mca_ptl_gm_send_frag_t*)(header->hdr_ack.hdr_src_ptr.pval);
/* update the fragment header with the most up2date informations */ /* update the fragment header with the most up2date informations */
frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match = header->hdr_ack.hdr_dst_match; frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match = header->hdr_ack.hdr_dst_match;
req = frag->send_frag.frag_request; req = frag->frag_send.frag_request;
assert(req != NULL); assert(req != NULL);
req->req_peer_match = header->hdr_ack.hdr_dst_match; req->req_peer_match = header->hdr_ack.hdr_dst_match;
req->req_peer_addr = header->hdr_ack.hdr_dst_addr; req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
@ -421,11 +357,11 @@ mca_ptl_gm_ctrl_frag( struct mca_ptl_gm_module_t *ptl,
if( (req->req_peer_size != 0) && (req->req_peer_addr.pval == NULL) ) { if( (req->req_peer_size != 0) && (req->req_peer_addr.pval == NULL) ) {
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
req, frag->send_frag.frag_base.frag_size ); req, frag->frag_send.frag_base.frag_size );
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t *)frag ); OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t *)frag );
} else { } else {
if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) { if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
frag->send_frag.frag_base.frag_header.hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT; frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT;
} }
} }
} else { /* initial reply to a rendez-vous request */ } else { /* initial reply to a rendez-vous request */
@ -515,7 +451,7 @@ static void recv_short_callback( struct gm_port* port, void* context, gm_status_
} }
static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer, static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer,
struct mca_ptl_gm_recv_frag_t* frag ) struct mca_ptl_base_frag_t* frag )
{ {
ompi_list_item_t *item; ompi_list_item_t *item;
mca_ptl_base_header_t *hdr; mca_ptl_base_header_t *hdr;
@ -527,12 +463,12 @@ static int mca_ptl_gm_send_quick_fin_message( struct mca_ptl_gm_peer_t* ptl_peer
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT; hdr->hdr_common.hdr_flags = PTL_FLAG_GM_HAS_FRAGMENT;
hdr->hdr_ack.hdr_src_ptr.pval = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval; hdr->hdr_ack.hdr_src_ptr.pval = frag->frag_header.hdr_frag.hdr_src_ptr.pval;
hdr->hdr_ack.hdr_dst_match.lval = 0; hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = frag; hdr->hdr_ack.hdr_dst_match.pval = frag;
hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */ hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */
hdr->hdr_ack.hdr_dst_addr.pval = NULL; hdr->hdr_ack.hdr_dst_addr.pval = NULL;
hdr->hdr_ack.hdr_dst_size = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length; hdr->hdr_ack.hdr_dst_size = frag->frag_header.hdr_frag.hdr_frag_length;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, sizeof(mca_ptl_base_ack_header_t), gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, hdr, GM_SIZE, sizeof(mca_ptl_base_ack_header_t),
GM_LOW_PRIORITY, ptl_peer->local_id, GM_LOW_PRIORITY, ptl_peer->local_id,
@ -567,7 +503,7 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
length = mca_ptl_gm_component.gm_rdma_frag_size; length = mca_ptl_gm_component.gm_rdma_frag_size;
/* send an ack message to the sender */ /* send an ack message to the sender */
mca_ptl_gm_send_quick_fin_message( peer, frag ); mca_ptl_gm_send_quick_fin_message( peer, &(frag->frag_recv.frag_base) );
frag->frag_bytes_validated += length; frag->frag_bytes_validated += length;
@ -659,8 +595,8 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */ if( NULL == hdr->registered_memory.pval ) { /* first round of the local rendez-vous protocol */
/* send an ack message to the sender ... quick hack (TODO) */ /* send an ack message to the sender ... quick hack (TODO) */
recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0; recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = 0;
mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer, mca_ptl_gm_send_quick_fin_message( (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer,
recv_frag ); &(recv_frag->frag_recv.frag_base) );
recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length; recv_frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length = hdr->hdr_frag.hdr_frag_length;
if( length >= hdr->hdr_frag.hdr_frag_length ) if( length >= hdr->hdr_frag.hdr_frag_length )
length = hdr->hdr_frag.hdr_frag_length; length = hdr->hdr_frag.hdr_frag_length;
@ -705,19 +641,19 @@ mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
frag = (mca_ptl_gm_send_frag_t*)hdr->hdr_ack.hdr_src_ptr.pval; frag = (mca_ptl_gm_send_frag_t*)hdr->hdr_ack.hdr_src_ptr.pval;
frag->send_frag.frag_base.frag_header.hdr_common.hdr_flags = hdr->hdr_common.hdr_flags; frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags = hdr->hdr_common.hdr_flags;
frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match = hdr->hdr_ack.hdr_dst_match; frag->frag_send.frag_base.frag_header.hdr_ack.hdr_dst_match = hdr->hdr_ack.hdr_dst_match;
memory_to_deregister = (char*)frag->send_frag.frag_base.frag_addr memory_to_deregister = (char*)frag->frag_send.frag_base.frag_addr
+ frag->frag_offset + frag->frag_bytes_validated; + frag->frag_offset + frag->frag_bytes_validated;
length = frag->send_frag.frag_base.frag_size - frag->frag_bytes_validated; length = frag->frag_send.frag_base.frag_size - frag->frag_bytes_validated;
if( 0 == frag->frag_bytes_processed ) { if( 0 == frag->frag_bytes_processed ) {
/* I just receive the ack for the first fragment => setup the pipeline */ /* I just receive the ack for the first fragment => setup the pipeline */
mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY ); mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY );
} }
/* continue the pipeline ... send the next segment */ /* continue the pipeline ... send the next segment */
if( frag->frag_bytes_processed != frag->send_frag.frag_base.frag_size ) { if( frag->frag_bytes_processed != frag->frag_send.frag_base.frag_size ) {
/* If there is still something pending ... */ /* If there is still something pending ... */
mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY ); mca_ptl_gm_send_next_long_segment( frag, GM_PTL_SEND_MESSAGE | GM_PTL_REGISTER_MEMORY );
} }
@ -726,10 +662,10 @@ mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t* ptl,
if( length > mca_ptl_gm_component.gm_rdma_frag_size ) if( length > mca_ptl_gm_component.gm_rdma_frag_size )
length = mca_ptl_gm_component.gm_rdma_frag_size; length = mca_ptl_gm_component.gm_rdma_frag_size;
frag->frag_bytes_validated += length; frag->frag_bytes_validated += length;
if( frag->send_frag.frag_base.frag_size == frag->frag_bytes_validated ) { if( frag->frag_send.frag_base.frag_size == frag->frag_bytes_validated ) {
/* mark the request as done before deregistering the memory */ /* mark the request as done before deregistering the memory */
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
frag->send_frag.frag_request, frag->frag_send.frag_request,
frag->frag_bytes_validated ); frag->frag_bytes_validated );
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag ); OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag );
} }

Просмотреть файл

@ -46,5 +46,3 @@ mca_ptl_gm_peer_send_continue( struct mca_ptl_gm_peer_t *ptl_peer,
size_t offset, size_t offset,
size_t *size, size_t *size,
int flags ); int flags );
void send_callback(struct gm_port *port,void * context, gm_status_t status);

Просмотреть файл

@ -22,23 +22,23 @@
#include "ptl_gm_sendfrag.h" #include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t* frag);
static void mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t* frag);
static void mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag); static void mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t* frag);
static void mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t * frag); static void mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t* frag);
/* /*
* send fragment constructor/destructors. * send fragment constructor/destructors.
*/ */
static void static void
mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag) mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t* frag)
{ {
} }
static void static void
mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag) mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t* frag)
{ {
} }
@ -59,69 +59,65 @@ ompi_class_t mca_ptl_gm_send_frag_t_class = {
* *
* I will implement the first case and add the second one in my TODO list. * I will implement the first case and add the second one in my TODO list.
*/ */
mca_ptl_gm_send_frag_t * mca_ptl_gm_send_frag_t*
mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t *ptl, mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t* ptl,
struct mca_pml_base_send_request_t * sendreq ) struct mca_pml_base_send_request_t* sendreq )
{ {
ompi_list_item_t *item; ompi_list_item_t* item;
mca_ptl_gm_send_frag_t *sendfrag; mca_ptl_gm_send_frag_t* sendfrag;
int32_t rc; int32_t rc;
/* first get a gm_send_frag */ /* first get a gm_send_frag */
OMPI_FREE_LIST_GET( &(ptl->gm_send_frags), item, rc ); OMPI_FREE_LIST_GET( &(ptl->gm_send_frags), item, rc );
sendfrag = (mca_ptl_gm_send_frag_t *)item; sendfrag = (mca_ptl_gm_send_frag_t*)item;
/* And then get some DMA memory to put the data */ /* And then get some DMA memory to put the data */
OMPI_FREE_LIST_WAIT( &(ptl->gm_send_dma_frags), item, rc ); OMPI_FREE_LIST_WAIT( &(ptl->gm_send_dma_frags), item, rc );
ompi_atomic_sub( &(ptl->num_send_tokens), 1 ); ompi_atomic_sub( &(ptl->num_send_tokens), 1 );
assert( ptl->num_send_tokens >= 0 ); assert( ptl->num_send_tokens >= 0 );
sendfrag->send_buf = (void*)item; sendfrag->send_buf = (void*)item;
sendfrag->send_frag.frag_request = sendreq; sendfrag->frag_send.frag_request = sendreq;
sendfrag->send_frag.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl; sendfrag->frag_send.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
sendfrag->frag_send.frag_base.frag_addr = sendreq->req_base.req_addr;
sendfrag->frag_bytes_processed = 0; sendfrag->frag_bytes_processed = 0;
sendfrag->frag_bytes_validated = 0; sendfrag->frag_bytes_validated = 0;
sendfrag->status = -1; sendfrag->status = -1;
sendfrag->type = -1; sendfrag->type = PUT;
return sendfrag; return sendfrag;
} }
int mca_ptl_gm_send_frag_done( mca_ptl_gm_send_frag_t * frag, int mca_ptl_gm_send_frag_done( mca_ptl_gm_send_frag_t* frag,
mca_pml_base_send_request_t * req ) mca_pml_base_send_request_t* req )
{ {
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag, int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t** putfrag,
struct mca_ptl_gm_peer_t* ptl_peer, struct mca_ptl_gm_peer_t* ptl_peer,
struct mca_ptl_gm_module_t* gm_ptl, struct mca_ptl_gm_module_t* gm_ptl,
struct mca_pml_base_send_request_t* sendreq, struct mca_pml_base_send_request_t* sendreq,
size_t offset, size_t offset, size_t* size, int flags )
size_t* size,
int flags )
{ {
ompi_convertor_t *convertor = NULL; ompi_convertor_t* convertor;
mca_ptl_gm_send_frag_t* frag;
putfrag->send_frag.frag_request = sendreq; frag = mca_ptl_gm_alloc_send_frag( gm_ptl, sendreq ); /*alloc_put_frag */
putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)gm_ptl; frag->frag_send.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
putfrag->send_frag.frag_base.frag_addr = sendreq->req_base.req_addr; frag->frag_send.frag_base.frag_size = *size;
putfrag->send_frag.frag_base.frag_size = *size; frag->frag_offset = offset;
putfrag->frag_offset = offset;
putfrag->frag_bytes_processed = 0;
putfrag->frag_bytes_validated = 0;
if( (*size) > 0 ) { if( (*size) > 0 ) {
convertor = &(putfrag->send_frag.frag_base.frag_convertor); convertor = &(frag->frag_send.frag_base.frag_convertor);
ompi_convertor_copy( &(sendreq->req_convertor), convertor );
ompi_convertor_init_for_send( convertor, 0, ompi_convertor_init_for_send( convertor, 0,
sendreq->req_base.req_datatype, sendreq->req_base.req_datatype,
sendreq->req_base.req_count, sendreq->req_base.req_count,
sendreq->req_base.req_addr, sendreq->req_base.req_addr,
offset, NULL ); offset, NULL );
} }
*putfrag = frag;
putfrag->status = -1;
putfrag->type = PUT;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -131,12 +127,12 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag,
*/ */
static void static void
mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag) mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t* frag)
{ {
} }
static void static void
mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t *frag) mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t* frag)
{ {
} }

Просмотреть файл

@ -73,7 +73,7 @@ extern "C" {
* GM send fragment derived type. * GM send fragment derived type.
*/ */
struct mca_ptl_gm_send_frag_t { struct mca_ptl_gm_send_frag_t {
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */ mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
void* send_buf; void* send_buf;
ompi_ptr_t* registered_buf; ompi_ptr_t* registered_buf;
@ -102,17 +102,17 @@ extern "C" {
typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t; typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
mca_ptl_gm_send_frag_t * mca_ptl_gm_send_frag_t *
mca_ptl_gm_alloc_send_frag ( struct mca_ptl_gm_module_t* ptl, mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t* ptl,
struct mca_pml_base_send_request_t* sendreq ); struct mca_pml_base_send_request_t* sendreq );
int int
mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* sendfrag, mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t** sendfrag,
struct mca_ptl_gm_peer_t * ptl_peer, struct mca_ptl_gm_peer_t * ptl_peer,
struct mca_ptl_gm_module_t *ptl, struct mca_ptl_gm_module_t *ptl,
struct mca_pml_base_send_request_t * sendreq, struct mca_pml_base_send_request_t * sendreq,
size_t offset, size_t offset,
size_t* size, size_t* size,
int flags ); int flags );
#define OMPI_FREE_LIST_TRY_GET(fl, item) \ #define OMPI_FREE_LIST_TRY_GET(fl, item) \
{ \ { \