Separate the send requests and the DMA memory into two lists. They are now independent, since we do not always need a send request in order to send a message to a peer.
This commit was SVN r3641.
This commit is contained in:
parent a777f4ad18
commit 02820f0d3b
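
The split introduced below can be pictured with a small stand-alone sketch (plain C; every name in it is a stand-in, none of it is the Open MPI or GM API): a fragment-descriptor pool and a DMA-buffer pool are seeded and drained independently, so a control message such as an ACK can take a buffer without consuming a descriptor.

/* Minimal stand-alone sketch of the two-list idea: fragment descriptors and
 * DMA-capable buffers live in separate pools.  All names are illustrative. */
#include <stdio.h>
#include <stdlib.h>

#define BUF_SIZE   1024
#define NUM_TOKENS 4

typedef struct buf_item  { struct buf_item  *next; char payload[BUF_SIZE]; } buf_item_t;
typedef struct frag_item { struct frag_item *next; buf_item_t *buf; } frag_item_t;

static buf_item_t  *buf_pool  = NULL;   /* stands in for gm_send_dma_frags */
static frag_item_t *frag_pool = NULL;   /* stands in for gm_send_frags     */

static buf_item_t  *get_buf(void)  { buf_item_t  *b = buf_pool;  if (b) buf_pool  = b->next; return b; }
static frag_item_t *get_frag(void) { frag_item_t *f = frag_pool; if (f) frag_pool = f->next; return f; }
static void put_buf(buf_item_t *b)   { b->next = buf_pool;  buf_pool  = b; }
static void put_frag(frag_item_t *f) { f->next = frag_pool; frag_pool = f; }

int main(void)
{
    /* Seed both pools independently, mirroring mca_ptl_gm_init_sendrecv. */
    for (int i = 0; i < NUM_TOKENS; i++) {
        put_buf(calloc(1, sizeof(buf_item_t)));
        put_frag(calloc(1, sizeof(frag_item_t)));
    }

    /* A full user message needs a descriptor *and* a buffer... */
    frag_item_t *frag = get_frag();
    frag->buf = get_buf();

    /* ...but a control message (ACK) only needs a buffer. */
    buf_item_t *ack_buf = get_buf();

    printf("frag=%p data=%p ack=%p\n", (void*)frag, (void*)frag->buf, (void*)ack_buf);
    return 0;
}
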
@@ -70,6 +70,7 @@ struct mca_ptl_gm_module_t {
    struct mca_ptl_gm_recv_frag_t* gm_recv_fragments;

    ompi_free_list_t gm_send_frags;
    ompi_free_list_t gm_send_dma_frags;
    ompi_free_list_t gm_recv_frags_free;
    ompi_list_t gm_send_frags_queue;
    ompi_list_t gm_pending_acks;
@@ -281,16 +281,28 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)

    /****************SEND****************************/
    /* construct a list of send fragments */
    OBJ_CONSTRUCT (&(ptl->gm_send_frags), ompi_free_list_t);
    OBJ_CONSTRUCT (&(ptl->gm_send_dma_frags), ompi_free_list_t);
    OBJ_CONSTRUCT (&(ptl->gm_send_frags_queue), ompi_list_t);

    /* We need a free list just to handle the send fragments that we provide.
     * This free list does not have the right to allocate any new items,
     * as they should have a DMA buffer attached to them.
     * Just to make sure that we don't waste memory, we don't allow this list
     * to grow anymore.
     */
    ompi_free_list_init( &(ptl->gm_send_frags),
                         sizeof (mca_ptl_gm_send_frag_t),
                         OBJ_CLASS (mca_ptl_gm_send_frag_t),
                         0, /* do not allocate any items I'll provide them */
                         0, /* maximum number of list allocated elements will be zero */
                         0,
                         NULL ); /* not using mpool */

    /* A free list containing all the DMA allocated memory.
     * This free list does not have the right to allocate any new items,
     * as they should be allocated with a special GM function.
     */
    ompi_free_list_init( &(ptl->gm_send_dma_frags),
                         GM_BUF_SIZE,
                         OBJ_CLASS (ompi_list_item_t),
                         0, /* do not allocate any items I'll provide them */
                         0, /* maximum number of list allocated elements will be zero */
                         0,
                         NULL ); /* not using mpool */

@@ -306,10 +318,10 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    for (i = 0; i < ptl->num_send_tokens; i++) {
        sfragment->send_buf = NULL;
        OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t *)sfragment );
        sfragment->send_buf = (char*)ptl->gm_send_dma_memory + i * GM_BUF_SIZE;
        DO_DEBUG( printf( "%3d : gm register sendreq %p with GM buffer %p\n", i,
                          (void*)sfragment, (void*)sfragment->send_buf ) );
        OMPI_FREE_LIST_RETURN( &(ptl->gm_send_dma_frags),
                               (ompi_list_item_t *)((char*)ptl->gm_send_dma_memory + i * GM_BUF_SIZE) );
        sfragment++;
    }
    A_PRINT( ("recv_tokens = %d send_tokens = %d, allocated free list = %d\n",
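
The seeding loop above can be pictured with this stand-alone sketch (illustrative names only; plain malloc stands in for whatever GM DMA allocator fills ptl->gm_send_dma_memory): one registered region is cut into fixed-size chunks and each chunk is pushed onto its own free list, while the fragment descriptors keep no buffer attached.

/* Stand-alone sketch of the seeding loop: carve one region into BUF_SIZE
 * chunks and push every chunk onto a free list.  Names are illustrative. */
#include <stdlib.h>

#define BUF_SIZE   4096   /* stand-in for GM_BUF_SIZE          */
#define NUM_TOKENS 8      /* stand-in for ptl->num_send_tokens */

typedef struct item { struct item *next; } item_t;

static void list_push(item_t **head, void *chunk)
{
    item_t *it = (item_t *)chunk;
    it->next = *head;
    *head = it;
}

int main(void)
{
    item_t *dma_free_list = NULL;

    char *dma_region = malloc((size_t)NUM_TOKENS * BUF_SIZE);
    if (NULL == dma_region) return 1;

    for (int i = 0; i < NUM_TOKENS; i++)
        list_push(&dma_free_list, dma_region + (size_t)i * BUF_SIZE);

    /* dma_free_list now hands out BUF_SIZE chunks, the way the PTL's
     * gm_send_dma_frags free list does. */
    free(dma_region);
    return 0;
}
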
@@ -317,7 +329,7 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)

    /*****************RECEIVE*****************************/
    /*allow remote memory access */
    /* allow remote memory access */
    if( GM_SUCCESS != gm_allow_remote_memory_access (ptl->gm_port) ) {
        ompi_output (0, "unable to allow remote memory access\n");
    }
@@ -438,7 +450,7 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
#else
    *have_hidden_threads = false;
#endif  /* OMPI_HAVE_POSIX_THREADS */

    sleep(20);
    if (OMPI_SUCCESS != mca_ptl_gm_init (&mca_ptl_gm_component)) {
        ompi_output( 0, "[%s:%d] error in initializing gm state and PTL's.\n",
                     __FILE__, __LINE__ );
@@ -44,10 +44,13 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
                          size_t *size,
                          int flags )
{
    struct iovec outvec;
    struct iovec iov;
    size_t size_in, size_out;
    int header_length;
    mca_ptl_base_header_t* header;
    ompi_convertor_t *convertor = NULL;
    int rc, freeAfter;
    unsigned int in_size, max_data;

    header = (mca_ptl_base_header_t*)fragment->send_buf;
    header_length = sizeof(mca_ptl_base_match_header_t);
@@ -55,15 +58,11 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
    size_in = *size;

    if( (size_in + header_length) <= GM_BUF_SIZE )
        outvec.iov_len = size_in;
        iov.iov_len = size_in;
    else
        outvec.iov_len = GM_BUF_SIZE - header_length;

    if(size_in > 0) {
        ompi_convertor_t *convertor;
        int rc, freeAfter;
        unsigned int in_size, max_data;
        iov.iov_len = GM_BUF_SIZE - header_length;

    if( size_in > 0 ) {
        /* first fragment (eager send) and first fragment of long protocol
         * can use the convertor initialized on the request. The remaining
         * fragments must copy/reinit the convertor.
@@ -87,21 +86,21 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
         */

        /* copy the data to the registered buffer */
        outvec.iov_base = ((char*)fragment->send_buf) + header_length;
        max_data = outvec.iov_len;
        iov.iov_base = ((char*)fragment->send_buf) + header_length;
        max_data = iov.iov_len;
        in_size = 1;
        if((rc = ompi_convertor_pack(convertor, &(outvec), &in_size, &max_data, &freeAfter)) < 0)
        if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
            return OMPI_ERROR;
    }

    if( (header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG) ||
        (header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_MATCH) )
        header->hdr_match.hdr_msg_length = outvec.iov_len;
        header->hdr_match.hdr_msg_length = iov.iov_len;

    /* adjust size and request offset to reflect actual number of bytes
     * packed by convertor
     */
    size_out = outvec.iov_len + header_length;
    size_out = iov.iov_len + header_length;

    A_PRINT( "peer_send request is %p\t, frag->req = %p, fragment is %p, size is %d, send_frag is %p\n",
             sendreq, fragment->req, fragment, size_out,
@@ -116,20 +115,51 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
    fragment->send_frag.frag_base.frag_size = size_out - header_length;
    fragment->send_frag.frag_request = sendreq;

    ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
    assert( ptl_peer->peer_ptl->num_send_tokens >= 0 );
    fragment->already_send = 0;
    /* initiate the gm send */
    gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
                                   GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
                                   send_callback, (void *)fragment );
    ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
                                                 fragment->send_frag.frag_request,
                                                 outvec.iov_len );
    /*gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
                             GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
                             ptl_peer->port_number, send_callback, (void *)fragment );*/
    if( size_out <= GM_BUF_SIZE ) {
        /* small message. All data went out */
        ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
                                                     fragment->send_frag.frag_request,
                                                     iov.iov_len );
        /*gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
                                 GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
                                 ptl_peer->port_number, send_callback, (void *)fragment );*/
        *size = iov.iov_len;
    } else if( size_in <= (5 * GM_BUF_SIZE) ) {  /* eager message */
        while( size_out < size_in ) {
            ompi_list_item_t* item;
            mca_ptl_gm_eager_header_t* header;

            OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
            header = (mca_ptl_gm_eager_header_t*)item;
            iov.iov_base = (char*)header + sizeof(mca_ptl_gm_eager_header_t);
            iov.iov_len = GM_BUF_SIZE - sizeof(mca_ptl_gm_eager_header_t);
            max_data = iov.iov_len;
            in_size = 1;

            if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
                return OMPI_ERROR;

            size_out += iov.iov_len;
            header->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
            header->hdr_common.hdr_flags = 0;
            header->hdr_src_ptr.pval = fragment;

            gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, header,
                                           GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
                                           send_callback, (void *)header );
        }
        ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
                                                     fragment->send_frag.frag_request, size_out );
        *size = size_out;
    } else {  /* large message */

    }

    *size = (size_out - header_length);
    A_PRINT("inside peer send : bytes sent is %d\n", *size);
    return OMPI_SUCCESS;
}
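
The eager path above can be pictured with a stand-alone sketch (all names and sizes below are stand-ins; the real code packs through the convertor and hands each chunk to gm_send_to_peer_with_callback): the payload is cut into chunks of GM_BUF_SIZE minus the eager header, and each chunk carries a back-pointer to the owning fragment so the completion callback can find it.

/* Stand-alone sketch of the eager chunking; plain C, illustrative only. */
#include <stdio.h>
#include <string.h>

#define BUF_SIZE 64    /* stand-in for GM_BUF_SIZE */

typedef struct { int hdr_type; void *hdr_src_ptr; } eager_header_t;

static void eager_send(const char *data, size_t len, void *owning_fragment)
{
    size_t sent = 0;
    while (sent < len) {
        /* one GM-buffer-sized frame: header followed by payload;
         * in the PTL this memory comes from the DMA free list */
        struct { eager_header_t hdr; char payload[BUF_SIZE - sizeof(eager_header_t)]; } frame;
        size_t payload = sizeof(frame.payload);
        if (payload > len - sent) payload = len - sent;

        frame.hdr.hdr_type    = 1;                /* MCA_PTL_HDR_TYPE_FRAG stand-in */
        frame.hdr.hdr_src_ptr = owning_fragment;  /* back-pointer for the callback  */
        memcpy(frame.payload, data + sent, payload);

        /* the real code sends the frame here with a completion callback */
        printf("frame: %zu payload bytes\n", payload);
        sent += payload;
    }
}

int main(void)
{
    char msg[200];
    memset(msg, 'x', sizeof(msg));
    eager_send(msg, sizeof(msg), NULL);
    return 0;
}
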
@@ -192,7 +222,7 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
    }
}

void send_callback(struct gm_port *port,void * context, gm_status_t status)
void send_callback( struct gm_port *port, void * context, gm_status_t status )
{
    mca_ptl_gm_module_t *ptl;
    mca_ptl_gm_send_frag_t *frag;
@@ -210,32 +240,66 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
    case GM_SUCCESS:
        DO_DEBUG( printf( "send_callback for data ptr %p\n", (void*)frag->send_buf ) );
        ompi_atomic_add( &(ptl->num_send_tokens), 1 );

        if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) {
            A_PRINT("send callback: Completion of send_ack, sent frag is %p\n", frag);

        switch( header->hdr_common.hdr_type ) {
        case MCA_PTL_HDR_TYPE_MATCH:
            /*ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
                                            header->hdr_match.hdr_msg_length);*/

            /* return the DMA memory */
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
            frag->send_buf = NULL;
            /* Return the sendfrag to the free list */
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
        } else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) {
            break;
        case MCA_PTL_HDR_TYPE_RNDV:
        case MCA_PTL_HDR_TYPE_FRAG:
            {
                /* Used by eager messages. It contains just enough information to be able
                 * to retrieve the fragment it belongs to. I use the mca_ptl_gm_eager_header_t
                 * and I store in hdr_src_ptr the pointer to the fragment that generated
                 * the send operation.
                 */
                mca_ptl_gm_eager_header_t* header = (mca_ptl_gm_eager_header_t*)context;
                frag = (mca_ptl_gm_send_frag_t*)(header->hdr_src_ptr.pval);
                ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
                frag->already_send += (GM_BUF_SIZE - sizeof(mca_ptl_gm_eager_header_t));
                if( frag->already_send > frag->send_frag.frag_base.frag_size ) {
                    frag->send_buf = NULL;
                    /* I will update the status of the request just once, when everything is
                     * already done. Don't waste the time :)
                     */
                    ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
                                                  frag->send_frag.frag_base.frag_size );
                    OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
                }
                OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)header));
            }
            break;
        case MCA_PTL_HDR_TYPE_ACK:
            A_PRINT("send callback: Completion of send_ack, sent frag is %p\n", frag);
            /* return the DMA memory */
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
            frag->send_buf = NULL;
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
            break;
        case MCA_PTL_HDR_TYPE_FIN:
            A_PRINT("send callback : Completion of fin, bytes complete = %d\n",
                    header->hdr_ack.hdr_dst_size);
            ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
                                          header->hdr_ack.hdr_dst_size);

            /* return the DMA memory */
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
            frag->send_buf = NULL;
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
        } else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK)
                   || mca_pml_base_send_request_matched(gm_send_req)) {
            A_PRINT(" send callback : match not required\n");
            /*ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
                                            header->hdr_match.hdr_msg_length);*/

            /* Return the sendfrag to the free list */
            A_PRINT("Return frag : %p", frag);
            OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
        } else {
            break;
        default:
            /* Not going to call progress on this send,
             * and not free-ing the descriptor */
            frag->send_complete = 1;
            A_PRINT("send callback : match required but not yet recv ack sendfrag is %p\n", frag);
        }
    }
    break;

    case GM_SEND_TIMED_OUT:
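
The completion accounting in the FRAG case above, as a stand-alone sketch (illustrative names and sizes): each callback credits one chunk's payload to already_send, and the request is progressed exactly once, when the running total covers the whole fragment.

/* Stand-alone sketch of progressing a request once after N chunk callbacks. */
#include <stdio.h>

#define BUF_SIZE 64
#define HDR_SIZE 16    /* stand-in for sizeof(mca_ptl_gm_eager_header_t) */

typedef struct {
    size_t frag_size;     /* total bytes this fragment must deliver */
    size_t already_send;  /* bytes acknowledged by callbacks so far */
    int    progressed;    /* has ptl_send_progress() been called?   */
} fragment_t;

static void chunk_complete(fragment_t *frag)
{
    frag->already_send += (BUF_SIZE - HDR_SIZE);
    if (!frag->progressed && frag->already_send >= frag->frag_size) {
        frag->progressed = 1;   /* the real code calls ptl_send_progress() here */
        printf("fragment complete: %zu bytes\n", frag->frag_size);
    }
}

int main(void)
{
    fragment_t frag = { .frag_size = 200, .already_send = 0, .progressed = 0 };
    for (int i = 0; i < 5; i++)   /* five chunk-completion callbacks */
        chunk_complete(&frag);
    return 0;
}
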
@@ -59,9 +59,15 @@ mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t *ptl,

    flist = &(ptl->gm_send_frags);

    /* first get a gm_send_frag */
    OMPI_FREE_LIST_WAIT( &(ptl->gm_send_frags), item, rc );

    sendfrag = (mca_ptl_gm_send_frag_t *)item;
    /* And then get some DMA memory to put the data */
    ompi_atomic_sub( &(ptl->num_send_tokens), 1 );
    assert( ptl->num_send_tokens >= 0 );
    OMPI_FREE_LIST_WAIT( &(ptl->gm_send_dma_frags), item, rc );
    sendfrag->send_buf = (void*)item;

    sendfrag->req = (struct mca_pml_base_send_request_t *)sendreq;
    GM_DBG( PTL_GM_DBG_COMM, "request is %p\t, frag->req = %p\n", (void*)sendreq, (void*)sendfrag->req );
    sendfrag->status = -1;
@@ -24,6 +24,13 @@ extern "C" {
    OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
    OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);

    /* header definition for intermediary fragments of eager p2p communications */
    struct mca_ptl_gm_eager_header_t {
        mca_ptl_base_common_header_t hdr_common;  /**< common attributes */
        ompi_ptr_t hdr_src_ptr;                   /**< pointer to the source fragment */
    };
    typedef struct mca_ptl_gm_eager_header_t mca_ptl_gm_eager_header_t;

    /*struct mca_ptl_base_peer_t;*/

    /**
@@ -37,11 +44,12 @@ extern "C" {
        struct mca_ptl_gm_module_t *ptl;
        struct mca_ptl_gm_peer_t *peer;

        int status;
        int type;
        int wait_for_ack;
        int put_sent;
        int send_complete;
        uint32_t already_send;  /**< data sent so far */
        int status;
        int type;
        int wait_for_ack;
        int put_sent;
        int send_complete;
    };
    typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;

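
A quick stand-alone check of what the eager header costs per buffer (the sizes and layout below are made up, not the real GM_BUF_SIZE or mca_ptl_gm_eager_header_t): the usable payload of each chunk is GM_BUF_SIZE minus the header, which bounds how many chunks the eager path (messages up to 5 * GM_BUF_SIZE) has to push.

/* Illustrative arithmetic only; compile and run to see the numbers. */
#include <stdio.h>
#include <stdint.h>

#define GM_BUF_SIZE 4096   /* illustrative value, not the real GM constant */

typedef struct {
    uint8_t  hdr_type;     /* stands in for mca_ptl_base_common_header_t */
    uint8_t  hdr_flags;
    uint16_t pad;
    void    *hdr_src_ptr;  /* back-pointer to the owning fragment */
} eager_header_t;

int main(void)
{
    size_t payload = GM_BUF_SIZE - sizeof(eager_header_t);
    size_t msg     = 5 * GM_BUF_SIZE;   /* upper bound of the eager path */
    size_t chunks  = (msg + payload - 1) / payload;
    printf("payload per chunk = %zu bytes, chunks for %zu bytes = %zu\n",
           payload, msg, chunks);
    return 0;
}
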