First fully working version of GM. Next step ... improvements ...
This commit was SVN r3843.
Этот коммит содержится в:
родитель
c134c911e3
Коммит
17f37130d2
@ -309,9 +309,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
|
||||
sendreq, offset, &size, flags );
|
||||
|
||||
rc = mca_ptl_gm_peer_send_continue( (mca_ptl_gm_peer_t *)ptl_peer, putfrag,
|
||||
sendreq, offset, &size, flags,
|
||||
((char*)(sendreq->req_base.req_addr)) + offset,
|
||||
size );
|
||||
sendreq, offset, &size, flags );
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -356,7 +354,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
||||
header_length = sizeof(mca_ptl_base_match_header_t);
|
||||
}
|
||||
|
||||
if (hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
|
||||
if( hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) {
|
||||
/* need to send an ack back */
|
||||
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
|
||||
if( NULL == ack ) {
|
||||
@ -369,9 +367,10 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
||||
mca_ptl_base_header_t* ack_hdr = (mca_ptl_base_header_t*)ack->send_buf;
|
||||
ack_hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||
ack_hdr->hdr_ack.hdr_common.hdr_flags = 0;
|
||||
ack_hdr->hdr_ack.hdr_src_ptr.lval = hdr->hdr_rndv.hdr_src_ptr.lval;
|
||||
ack_hdr->hdr_ack.hdr_src_ptr = hdr->hdr_rndv.hdr_src_ptr;
|
||||
ack_hdr->hdr_ack.hdr_dst_match.lval = 0L;
|
||||
ack_hdr->hdr_ack.hdr_dst_match.pval = frag;
|
||||
/* just an easy way to remember that there is a request, not a fragment */
|
||||
ack_hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||
ack_hdr->hdr_ack.hdr_dst_addr.lval = 0L;
|
||||
ack_hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
|
||||
gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, ack_hdr,
|
||||
@ -407,7 +406,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
||||
assert( rc >= 0 );
|
||||
}
|
||||
|
||||
/* update progress*/
|
||||
/* update progress*/
|
||||
ptl->ptl_recv_progress( ptl, request, bytes_recv, bytes_recv );
|
||||
|
||||
/* Now update the status of the fragment */
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include "ptl_gm_proc.h"
|
||||
#include "ptl_gm_sendfrag.h"
|
||||
#include "ptl_gm_priv.h"
|
||||
#include "mca/pml/teg/src/pml_teg_proc.h"
|
||||
|
||||
static void send_continue_callback( struct gm_port *port, void * context, gm_status_t status )
|
||||
{
|
||||
@ -42,12 +43,16 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
|
||||
case GM_SUCCESS:
|
||||
/*OMPI_OUTPUT( (0, "[%s:%d] send_continue_callback release header %p from fragment %p (available %d)\n",
|
||||
__FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/
|
||||
frag->already_send += header->hdr_frag.hdr_frag_length;
|
||||
if( header->hdr_frag.hdr_frag_length <= (5 * GM_BUF_SIZE) ) {
|
||||
/* small message */
|
||||
frag->frag_bytes_processed += header->hdr_frag.hdr_frag_length;
|
||||
}
|
||||
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
|
||||
/* release the send token */
|
||||
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
|
||||
|
||||
if( frag->already_send >= frag->send_frag.frag_base.frag_size ) {
|
||||
if( frag->frag_bytes_processed >= frag->send_frag.frag_base.frag_size ) {
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
|
||||
}
|
||||
break;
|
||||
@ -67,12 +72,10 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t *size,
|
||||
int flags,
|
||||
void * target_buffer,
|
||||
int bytes )
|
||||
int flags )
|
||||
{
|
||||
mca_ptl_base_header_t hdr;
|
||||
size_t update_offset = offset, header_length = sizeof(mca_ptl_base_frag_header_t);
|
||||
size_t header_length = sizeof(mca_ptl_base_frag_header_t);
|
||||
ompi_list_item_t* item;
|
||||
int rc = 0;
|
||||
|
||||
@ -80,59 +83,68 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
__FILE__, __LINE__, (void*)ptl_peer, (void*)fragment, (void*)sendreq, offset, *size,
|
||||
flags, target_buffer, bytes) );*/
|
||||
|
||||
fragment->send_frag.frag_base.frag_size = *size;
|
||||
fragment->frag_bytes_processed = 0;
|
||||
fragment->frag_offset = offset;
|
||||
|
||||
hdr.hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr.hdr_frag.hdr_common.hdr_flags = flags;
|
||||
hdr.hdr_frag.hdr_frag_length = *size;
|
||||
hdr.hdr_frag.hdr_frag_offset = update_offset;
|
||||
hdr.hdr_frag.hdr_frag_offset = fragment->frag_offset;
|
||||
hdr.hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr.hdr_frag.hdr_src_ptr.pval = fragment;
|
||||
hdr.hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||
|
||||
fragment->send_frag.frag_base.frag_addr =(void *)target_buffer;
|
||||
fragment->send_frag.frag_base.frag_size = bytes;
|
||||
fragment->already_send = 0;
|
||||
/* must update the offset after actual fragment size is determined
|
||||
* before attempting to send the fragment
|
||||
*/
|
||||
mca_pml_base_send_request_offset( sendreq,
|
||||
fragment->send_frag.frag_base.frag_size );
|
||||
|
||||
fragment->send_frag.frag_base.frag_addr = (char*)sendreq->req_base.req_addr + offset;
|
||||
|
||||
/* The first DMA memory buffer has been allocated at the same time as the fragment */
|
||||
item = (ompi_list_item_t*)fragment->send_buf;
|
||||
|
||||
if( (*size) <= (5 * GM_BUF_SIZE) ) { /* small protocol */
|
||||
size_t max_data;
|
||||
size_t max_data, remaining_bytes = fragment->send_frag.frag_base.frag_size;
|
||||
int freeAfter;
|
||||
unsigned int in_size;
|
||||
struct iovec iov;
|
||||
ompi_convertor_t *convertor = &(fragment->send_frag.frag_base.frag_convertor);
|
||||
|
||||
/* If we have an eager send then we should send the rest of the data. */
|
||||
while( 0 == rc ) {
|
||||
while( 0 < remaining_bytes ) {
|
||||
if( NULL == item ) {
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
|
||||
}
|
||||
iov.iov_base = (char*)item + header_length;
|
||||
iov.iov_len = GM_BUF_SIZE - header_length;
|
||||
if( iov.iov_len >= remaining_bytes )
|
||||
iov.iov_len = remaining_bytes;
|
||||
max_data = iov.iov_len;
|
||||
in_size = 1;
|
||||
|
||||
if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
|
||||
if( ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter) < 0)
|
||||
return OMPI_ERROR;
|
||||
|
||||
hdr.hdr_frag.hdr_frag_offset = update_offset;
|
||||
hdr.hdr_frag.hdr_frag_offset = fragment->frag_offset;
|
||||
hdr.hdr_frag.hdr_frag_length = iov.iov_len;
|
||||
update_offset += iov.iov_len;
|
||||
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
|
||||
fragment->frag_offset += iov.iov_len;
|
||||
remaining_bytes -= iov.iov_len;
|
||||
|
||||
if( remaining_bytes == 0 ) hdr.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
|
||||
|
||||
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
|
||||
/* for the last piece set the header type to FIN */
|
||||
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item,
|
||||
GM_SIZE, iov.iov_len + sizeof(mca_ptl_gm_eager_header_t),
|
||||
GM_LOW_PRIORITY, ptl_peer->local_id,
|
||||
send_continue_callback, (void*)item );
|
||||
item = NULL; /* force to retrieve a new one on the next loop */
|
||||
}
|
||||
*size = update_offset - offset;
|
||||
*size = fragment->frag_offset - offset;
|
||||
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
fragment->send_frag.frag_request,
|
||||
@ -152,18 +164,20 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
ompi_ptr_t* local_address;
|
||||
|
||||
status = gm_register_memory( ptl_peer->peer_ptl->gm_port,
|
||||
target_buffer /*sendreq->req_base.req_addr */,
|
||||
fragment->send_frag.frag_base.frag_addr,
|
||||
(*size) );
|
||||
if( status != GM_SUCCESS ) {
|
||||
printf( "Cannot register memory from %p length %ud bytes\n",
|
||||
(void*)sendreq->req_base.req_addr, (*size) );
|
||||
ompi_output( 0, "Cannot register memory from %p length %ud bytes\n",
|
||||
(void*)sendreq->req_base.req_addr, (*size) );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
|
||||
|
||||
local_address = (ompi_ptr_t*)((char*)item + header_length);
|
||||
local_address->lval = 0;
|
||||
local_address->pval = target_buffer /*(void*)sendreq->req_base.req_addr */;
|
||||
local_address->pval = fragment->send_frag.frag_base.frag_addr;
|
||||
|
||||
fragment->frag_offset += fragment->send_frag.frag_base.frag_size;
|
||||
|
||||
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item,
|
||||
GM_SIZE, header_length + sizeof(ompi_ptr_t), GM_LOW_PRIORITY,
|
||||
@ -199,7 +213,7 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
|
||||
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
|
||||
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
|
||||
fragment->send_frag.frag_request = sendreq;
|
||||
fragment->already_send = 0;
|
||||
fragment->frag_bytes_processed = 0;
|
||||
|
||||
/* At this point the header is already filled in with information as a match header */
|
||||
if( (flags & MCA_PTL_FLAGS_ACK) || (0 == offset) ) {
|
||||
@ -275,9 +289,6 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
|
||||
*/
|
||||
size_out = iov.iov_len + header_length;
|
||||
|
||||
DO_DEBUG( printf( "send pointer %p SIZE %d length %lu\n",
|
||||
(void*)fragment->send_buf, GM_BUF_SIZE, size_out ) );
|
||||
|
||||
/* must update the offset after actual fragment size is determined
|
||||
* before attempting to send the fragment
|
||||
*/
|
||||
@ -288,13 +299,13 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
|
||||
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
|
||||
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
|
||||
send_callback, (void *)fragment );
|
||||
|
||||
fragment->frag_bytes_processed = size_out - header_length;
|
||||
*size = fragment->frag_bytes_processed;
|
||||
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
fragment->send_frag.frag_request,
|
||||
size_out );
|
||||
}
|
||||
*size = size_out - header_length;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -311,7 +322,7 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
|
||||
putfrag = (mca_ptl_gm_send_frag_t *)context;
|
||||
header = (mca_ptl_base_header_t*)putfrag->send_buf;
|
||||
bytes2 = header->hdr_ack.hdr_dst_size;
|
||||
ptl = (mca_ptl_gm_module_t *)putfrag->ptl;
|
||||
ptl = (mca_ptl_gm_module_t*)putfrag->send_frag.frag_base.frag_owner;
|
||||
send_req = putfrag->req;
|
||||
|
||||
switch (status) {
|
||||
@ -382,18 +393,13 @@ void send_callback( struct gm_port *port, void * context, gm_status_t status )
|
||||
*/
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_ACK:
|
||||
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_FIN:
|
||||
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
|
||||
hdr_dst_size);
|
||||
|
||||
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
|
||||
break;
|
||||
default:
|
||||
/* Not going to call progress on this send,
|
||||
* and not free-ing descriptor */
|
||||
frag->send_complete = 1;
|
||||
/* Not going to call progress on this send, and not free-ing descriptor */
|
||||
printf( "Called with a strange headertype ...\n" );
|
||||
break;
|
||||
}
|
||||
break;
|
||||
|
||||
@ -415,12 +421,12 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
|
||||
{
|
||||
mca_ptl_gm_send_frag_t * frag;
|
||||
mca_pml_base_send_request_t *req;
|
||||
mca_pml_base_recv_request_t *request;
|
||||
int status;
|
||||
|
||||
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) {
|
||||
frag = (mca_ptl_gm_send_frag_t *)(header->hdr_ack.hdr_src_ptr.pval);
|
||||
req = (mca_pml_base_send_request_t *) frag->req;
|
||||
if( MCA_PTL_HDR_TYPE_ACK == header->hdr_common.hdr_type ) {
|
||||
frag = (mca_ptl_gm_send_frag_t*)(header->hdr_ack.hdr_src_ptr.pval);
|
||||
/* update the fragment header with the most up-to-date information */
|
||||
frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match = header->hdr_ack.hdr_dst_match;
|
||||
req = (mca_pml_base_send_request_t*)frag->req;
|
||||
assert(req != NULL);
|
||||
req->req_peer_match = header->hdr_ack.hdr_dst_match;
|
||||
req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
|
||||
@ -432,22 +438,12 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
|
||||
frag->send_frag.frag_request,
|
||||
frag->send_frag.frag_base.frag_size );
|
||||
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
|
||||
} else {
|
||||
if( header->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
|
||||
frag->send_frag.frag_base.frag_header.hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT;
|
||||
}
|
||||
}
|
||||
} else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) {
|
||||
request = (mca_pml_base_recv_request_t*)header->hdr_ack.hdr_dst_match.pval;
|
||||
/* call receive progress and indicate the recv has been completed */
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t *) ptl,
|
||||
request ,
|
||||
header->hdr_ack.hdr_dst_size,
|
||||
header->hdr_ack.hdr_dst_size );
|
||||
/* deregister the memory */
|
||||
status = gm_deregister_memory( ptl->gm_port,
|
||||
header->hdr_ack.hdr_dst_addr.pval,
|
||||
header->hdr_ack.hdr_dst_size );
|
||||
|
||||
if(GM_SUCCESS != status) {
|
||||
ompi_output(0," unpinning memory failed\n");
|
||||
}
|
||||
} else if( MCA_PTL_HDR_TYPE_NACK == header->hdr_common.hdr_type ) {
|
||||
} else {
|
||||
OMPI_OUTPUT((0, "Unkonwn header type in ptl_gm_ctrl_frag\n"));
|
||||
}
|
||||
@ -475,13 +471,8 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
|
||||
recv_frag->frag_recv.frag_base.frag_peer = NULL;
|
||||
recv_frag->frag_recv.frag_request = NULL;
|
||||
recv_frag->frag_recv.frag_is_buffered = false;
|
||||
recv_frag->frag_hdr_cnt = 0;
|
||||
recv_frag->frag_msg_cnt = 0;
|
||||
recv_frag->frag_ack_pending = false;
|
||||
recv_frag->frag_progressed = 0;
|
||||
|
||||
recv_frag->frag_recv.frag_base.frag_header.hdr_rndv = hdr->hdr_rndv;
|
||||
|
||||
if( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type ) {
|
||||
recv_frag->frag_recv.frag_base.frag_addr =
|
||||
(char *) hdr + sizeof(mca_ptl_base_match_header_t);
|
||||
@ -495,21 +486,24 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
|
||||
|
||||
recv_frag->matched = false;
|
||||
recv_frag->have_allocated_buffer = false;
|
||||
recv_frag->ptl = ptl;
|
||||
|
||||
recv_frag->frag_ack_pending = false;
|
||||
recv_frag->frag_progressed = 0;
|
||||
recv_frag->frag_offset = 0; /* initial frgment */
|
||||
recv_frag->frag_bytes_processed = 0;
|
||||
|
||||
matched = ptl->super.ptl_match( &(ptl->super),
|
||||
&(recv_frag->frag_recv),
|
||||
&(recv_frag->frag_recv.frag_base.frag_header.hdr_match) );
|
||||
if( matched ) {
|
||||
if( !matched ) {
|
||||
size_t length = recv_frag->frag_recv.frag_base.frag_size;
|
||||
/* get some memory and copy the data inside. We can then release the receive buffer */
|
||||
char* ptr = (char*)malloc( sizeof(char) * length );
|
||||
recv_frag->have_allocated_buffer = true;
|
||||
memcpy( ptr, recv_frag->frag_recv.frag_base.frag_addr, length );
|
||||
recv_frag->frag_recv.frag_base.frag_addr = ptr;
|
||||
return NULL;
|
||||
return recv_frag;
|
||||
}
|
||||
return recv_frag;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* This function get called when the gm_get is finish (i.e. when the read from remote memory
|
||||
@ -524,7 +518,10 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
mca_ptl_gm_send_frag_t *ack;
|
||||
mca_pml_base_recv_request_t *request;
|
||||
mca_ptl_gm_peer_t* peer;
|
||||
mca_ptl_base_header_t* hdr;
|
||||
int rc;
|
||||
size_t length;
|
||||
void* pointer;
|
||||
|
||||
frag = (mca_ptl_gm_recv_frag_t*)context;
|
||||
gm_ptl = (mca_ptl_gm_module_t *)frag->frag_recv.frag_base.frag_owner;
|
||||
@ -533,17 +530,33 @@ static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_st
|
||||
|
||||
switch( status ) {
|
||||
case GM_SUCCESS:
|
||||
/*OMPI_OUTPUT( (0, "[%s:%d] mca_ptl_gm_get_callback release header %p from fragment %p (available %\d)\n",
|
||||
__FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/
|
||||
pointer = frag->frag_recv.frag_base.frag_addr;
|
||||
length = frag->frag_recv.frag_base.frag_size;
|
||||
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
|
||||
rc = mca_ptl_gm_send_ack_init( ack, gm_ptl,
|
||||
(mca_ptl_gm_peer_t *)(frag->frag_recv.frag_base.frag_peer),
|
||||
frag, NULL,
|
||||
frag->frag_recv.frag_base.frag_size );
|
||||
|
||||
hdr = (mca_ptl_base_header_t*)ack->send_buf;
|
||||
hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_HAS_FRAGMENT;
|
||||
frag->frag_bytes_processed += frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length;
|
||||
if( frag->frag_recv.frag_base.frag_size <= frag->frag_bytes_processed ) {
|
||||
gm_ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)gm_ptl,
|
||||
request,
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length,
|
||||
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length );
|
||||
/* This request is done. I will send back the FIN message */
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
|
||||
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag );
|
||||
}
|
||||
gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)(ack->send_frag.frag_base.frag_owner))->gm_port,
|
||||
ack->send_buf, GM_SIZE, sizeof(mca_ptl_base_ack_header_t),
|
||||
GM_LOW_PRIORITY, peer->local_id, send_callback, (void*)ack );
|
||||
status = gm_deregister_memory( ((mca_ptl_gm_module_t*)(ack->send_frag.frag_base.frag_owner))->gm_port,
|
||||
pointer, length );
|
||||
if( GM_SUCCESS != status ) {
|
||||
OMPI_OUTPUT( (0, "unpinning memory (%p, %u) failed \n", pointer, length) );
|
||||
}
|
||||
break;
|
||||
case GM_SEND_TIMED_OUT:
|
||||
printf( "mca_ptl_gm_get_callback timed out\n" );
|
||||
@ -561,7 +574,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event )
|
||||
{
|
||||
mca_pml_base_recv_request_t *request;
|
||||
ompi_convertor_t* convertor;
|
||||
ompi_convertor_t* convertor = NULL;
|
||||
mca_ptl_base_header_t *hdr;
|
||||
struct iovec iov;
|
||||
uint32_t iov_count, max_data;
|
||||
@ -570,19 +583,44 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
|
||||
hdr = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer);
|
||||
|
||||
recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
|
||||
request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request;
|
||||
/* here we can have a synchronisation problem if several threads work in same time
|
||||
* with the same request. The only question is if it's possible ?
|
||||
*/
|
||||
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
|
||||
ompi_convertor_init_for_recv( convertor, 0,
|
||||
request->req_base.req_datatype,
|
||||
request->req_base.req_count,
|
||||
request->req_base.req_addr,
|
||||
hdr->hdr_frag.hdr_frag_offset, NULL );
|
||||
if( hdr->hdr_common.hdr_flags & PTL_FLAG_GM_HAS_FRAGMENT ) {
|
||||
recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
|
||||
request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request;
|
||||
/* here we can have a synchronisation problem if several threads work in same time
|
||||
* with the same request. The only question is if it's possible ?
|
||||
*/
|
||||
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
|
||||
} else {
|
||||
request = (mca_pml_base_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
|
||||
|
||||
if( hdr->hdr_frag.hdr_frag_length <= (GM_BUF_SIZE - sizeof(mca_ptl_base_frag_header_t)) ) {
|
||||
if( hdr->hdr_frag.hdr_frag_length <= (GM_BUF_SIZE - sizeof(mca_ptl_base_frag_header_t)) ) {
|
||||
ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE );
|
||||
convertor = ompi_convertor_get_copy( proc->proc_convertor );
|
||||
recv_frag = NULL;
|
||||
} else { /* large message => we have to create a receive fragment */
|
||||
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
|
||||
recv_frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
|
||||
recv_frag->frag_recv.frag_request = request;
|
||||
recv_frag->frag_recv.frag_base.frag_header.hdr_frag = hdr->hdr_frag;
|
||||
recv_frag->frag_recv.frag_base.frag_peer =
|
||||
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE,
|
||||
(struct mca_ptl_base_module_t*)ptl );
|
||||
recv_frag->frag_offset = hdr->hdr_frag.hdr_frag_offset;
|
||||
recv_frag->matched = true;
|
||||
recv_frag->frag_bytes_processed = 0;
|
||||
recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_frag.hdr_frag_length;
|
||||
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
|
||||
}
|
||||
ompi_convertor_init_for_recv( convertor, 0,
|
||||
request->req_base.req_datatype,
|
||||
request->req_base.req_count,
|
||||
request->req_base.req_addr,
|
||||
hdr->hdr_frag.hdr_frag_offset, NULL );
|
||||
}
|
||||
|
||||
if( NULL == recv_frag ) {
|
||||
iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
|
||||
iov.iov_len = hdr->hdr_frag.hdr_frag_length;
|
||||
iov_count = 1;
|
||||
@ -590,12 +628,16 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
freeAfter = 0; /* unused here */
|
||||
rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
|
||||
assert( 0 == freeAfter );
|
||||
if( (hdr->hdr_frag.hdr_frag_offset + hdr->hdr_frag.hdr_frag_length) >=
|
||||
recv_frag->frag_recv.frag_base.frag_size ) {
|
||||
/* update the request status if we are done */
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
|
||||
OBJ_RELEASE( recv_frag );
|
||||
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
|
||||
if( PTL_FLAG_GM_LAST_FRAGMENT & hdr->hdr_common.hdr_flags ) {
|
||||
/* I'm done with this fragment. Return it to the free list */
|
||||
if( NULL != recv_frag ) {
|
||||
OMPI_FREE_LIST_RETURN( &(ptl->gm_recv_frags_free), (ompi_list_item_t*)recv_frag );
|
||||
}
|
||||
}
|
||||
if( NULL == recv_frag ) {
|
||||
OBJ_RELEASE( convertor );
|
||||
}
|
||||
} else {
|
||||
gm_status_t status;
|
||||
ompi_ptr_t* remote_memory = (ompi_ptr_t*)((char*)hdr + sizeof(mca_ptl_base_frag_header_t));
|
||||
@ -605,21 +647,40 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
(char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
|
||||
hdr->hdr_frag.hdr_frag_length );
|
||||
if( status != GM_SUCCESS ) {
|
||||
printf( "Cannot register memory from %p length %lld bytes\n",
|
||||
(void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length );
|
||||
ompi_output( 0, "Cannot register memory from %p length %lld bytes\n",
|
||||
(void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
|
||||
recv_frag->frag_recv.frag_base.frag_addr = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
|
||||
gm_get( ptl->gm_port, remote_memory->lval,
|
||||
recv_frag->frag_recv.frag_base.frag_addr,
|
||||
recv_frag->frag_recv.frag_base.frag_size,
|
||||
GM_HIGH_PRIORITY, peer->peer_addr->global_id, peer->peer_addr->port_id,
|
||||
GM_LOW_PRIORITY, peer->local_id, peer->port_number,
|
||||
mca_ptl_gm_get_callback, recv_frag );
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
 * Handle a received FIN header.
 *
 * The header is taken from the receive event buffer (event->recv.buffer,
 * converted with gm_ntohp). The send fragment it refers to is read back from
 * hdr_ack.hdr_src_ptr.pval. The function then reports hdr_ack.hdr_dst_size
 * bytes of send progress for that fragment's request through
 * ptl->super.ptl_send_progress, and returns the fragment to the
 * ptl->gm_send_frags free list.
 *
 * NOTE(review): presumably the FIN is sent by the receiver once the whole
 * transfer has completed, so the send fragment can be recycled -- confirm
 * against the sender of MCA_PTL_HDR_TYPE_FIN.
 * NOTE(review): the pointer carried in the header is dereferenced without
 * any validation.
 *
 * @param ptl    GM PTL module that received the event.
 * @param event  GM receive event whose buffer holds the FIN header.
 * @return Always NULL.
 */
static mca_ptl_gm_recv_frag_t*
|
||||
mca_ptl_gm_recv_frag_fin( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event )
|
||||
{
|
||||
mca_ptl_gm_send_frag_t* frag;
|
||||
mca_ptl_base_header_t *hdr;
|
||||
|
||||
hdr = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer);
|
||||
frag = (mca_ptl_gm_send_frag_t*)hdr->hdr_ack.hdr_src_ptr.pval;
|
||||
|
||||
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
|
||||
frag->send_frag.frag_request,
|
||||
hdr->hdr_ack.hdr_dst_size );
|
||||
|
||||
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t*)frag );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
|
||||
{
|
||||
mca_ptl_gm_recv_frag_t * frag = NULL;
|
||||
@ -661,13 +722,15 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl, gm_
|
||||
case MCA_PTL_HDR_TYPE_RNDV:
|
||||
frag = mca_ptl_gm_recv_frag_match( ptl, event );
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_FIN:
|
||||
frag = mca_ptl_gm_recv_frag_fin( ptl, event );
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_FRAG:
|
||||
frag = mca_ptl_gm_recv_frag_frag( ptl, event );
|
||||
break;
|
||||
|
||||
case MCA_PTL_HDR_TYPE_ACK:
|
||||
case MCA_PTL_HDR_TYPE_NACK:
|
||||
case MCA_PTL_HDR_TYPE_FIN:
|
||||
ptl_gm_ctrl_frag(ptl,header);
|
||||
break;
|
||||
default:
|
||||
|
@ -20,7 +20,9 @@
|
||||
struct mca_ptl_gm_send_frag_t;
|
||||
struct mca_ptl_gm_peer_t;
|
||||
|
||||
#define PTL_GM_FIRST_FRAG_SIZE (1<<14)
|
||||
#define PTL_GM_FIRST_FRAG_SIZE (1<<14)
|
||||
#define PTL_FLAG_GM_HAS_FRAGMENT 0x04
|
||||
#define PTL_FLAG_GM_LAST_FRAGMENT 0x08
|
||||
|
||||
/*#define DO_DEBUG(inst) inst*/
|
||||
#define DO_DEBUG(inst)
|
||||
@ -43,11 +45,7 @@ mca_ptl_gm_peer_send_continue( struct mca_ptl_gm_peer_t *ptl_peer,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t *size,
|
||||
int flags,
|
||||
void *target_buffer,
|
||||
int bytes );
|
||||
|
||||
|
||||
int flags );
|
||||
|
||||
void send_callback(struct gm_port *port,void * context, gm_status_t status);
|
||||
|
||||
|
@ -86,7 +86,6 @@ mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
sendfrag->type = -1;
|
||||
sendfrag->wait_for_ack = 0;
|
||||
sendfrag->put_sent = -1;
|
||||
sendfrag->send_complete = -1;
|
||||
|
||||
return sendfrag;
|
||||
}
|
||||
@ -114,7 +113,6 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
|
||||
ack->type = -1;
|
||||
ack->wait_for_ack = 0;
|
||||
ack->put_sent = -1;
|
||||
ack->send_complete = -1;
|
||||
|
||||
request = frag->frag_recv.frag_request;
|
||||
|
||||
@ -135,13 +133,12 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
|
||||
ack->send_frag.frag_base.frag_addr = NULL;
|
||||
ack->send_frag.frag_base.frag_size = 0;
|
||||
ack->status = 1; /* was able to register memory */
|
||||
ack->ptl = ptl;
|
||||
ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)ptl;
|
||||
ack->send_frag.frag_base.frag_header.hdr_ack = *hdr;
|
||||
ack->wait_for_ack = 0;
|
||||
ack->type = ACK;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -170,13 +167,12 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag,
|
||||
}
|
||||
|
||||
putfrag->status = -1;
|
||||
putfrag->send_complete = -1;
|
||||
putfrag->wait_for_ack = 0;
|
||||
putfrag->put_sent = 0;
|
||||
putfrag->type = PUT;
|
||||
putfrag->req = sendreq;
|
||||
putfrag->ptl = gm_ptl;
|
||||
putfrag->peer = ptl_peer;
|
||||
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)gm_ptl;
|
||||
putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -195,8 +191,6 @@ ompi_class_t mca_ptl_gm_recv_frag_t_class = {
|
||||
static void
|
||||
mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag)
|
||||
{
|
||||
frag->frag_hdr_cnt = 0;
|
||||
frag->frag_msg_cnt = 0;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -52,38 +52,35 @@ extern "C" {
|
||||
};
|
||||
typedef struct mca_ptl_gm_rdv_header_t mca_ptl_gm_rdv_header_t;
|
||||
|
||||
/*struct mca_ptl_base_peer_t;*/
|
||||
struct mca_ptl_gm_peer_t;
|
||||
|
||||
/**
|
||||
* GM send fragment derived type.
|
||||
*/
|
||||
struct mca_ptl_gm_send_frag_t {
|
||||
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
|
||||
void * send_buf;
|
||||
void * registered_buf;
|
||||
struct mca_pml_base_send_request_t *req;
|
||||
struct mca_ptl_gm_module_t *ptl;
|
||||
struct mca_ptl_gm_peer_t *peer;
|
||||
|
||||
uint32_t already_send; /**< data sended so far */
|
||||
void* send_buf;
|
||||
ompi_ptr_t* registered_buf;
|
||||
|
||||
size_t frag_bytes_processed; /**< data sended so far */
|
||||
size_t frag_offset;
|
||||
int status;
|
||||
int type;
|
||||
int wait_for_ack;
|
||||
int put_sent;
|
||||
int send_complete;
|
||||
};
|
||||
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
|
||||
|
||||
|
||||
struct mca_ptl_gm_recv_frag_t {
|
||||
mca_ptl_base_recv_frag_t frag_recv;
|
||||
size_t frag_hdr_cnt;
|
||||
size_t frag_msg_cnt;
|
||||
struct mca_pml_base_recv_request_t* req;
|
||||
size_t frag_bytes_processed;
|
||||
size_t frag_offset;
|
||||
volatile int frag_progressed;
|
||||
bool frag_ack_pending;
|
||||
void *alloc_recv_buffer;
|
||||
void *unex_recv_buffer;
|
||||
void * registered_buf;
|
||||
struct mca_ptl_gm_module_t *ptl;
|
||||
bool matched;
|
||||
bool have_allocated_buffer;
|
||||
bool have_registered_buffer;
|
||||
@ -94,12 +91,12 @@ extern "C" {
|
||||
mca_ptl_gm_send_frag_t *
|
||||
mca_ptl_gm_alloc_send_frag ( struct mca_ptl_gm_module_t* ptl,
|
||||
struct mca_pml_base_send_request_t* sendreq );
|
||||
|
||||
|
||||
int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
|
||||
struct mca_ptl_gm_module_t *ptl,
|
||||
struct mca_ptl_gm_peer_t* ptl_peer,
|
||||
struct mca_ptl_gm_recv_frag_t* frag,
|
||||
char * buffer,
|
||||
char* buffer,
|
||||
int size );
|
||||
|
||||
int
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user