1
1

Just a checkpoint. After a week of GM optimizations there is just one conclusion: too many optimizations

break the code. On the other hand tomorrow I will have 6 hours in the plane ...

This commit was SVN r3793.
Этот коммит содержится в:
George Bosilca 2004-12-13 05:38:34 +00:00
родитель 4857cd492a
Коммит 0149a8ca06
5 изменённых файлов: 434 добавлений и 367 удалений

Просмотреть файл

@ -286,8 +286,7 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer; gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;
rc = mca_ptl_gm_init_header_match( sendfrag, sendreq, flags ); rc = mca_ptl_gm_init_header_match( sendfrag, sendreq, flags );
rc = mca_ptl_gm_peer_send( gm_ptl_peer, sendfrag, sendreq, offset, &size, flags ); rc = mca_ptl_gm_peer_send( gm_ptl_peer, sendfrag, sendreq, offset, &size, flags );
sendreq->req_offset += size;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -303,47 +302,16 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
{ {
int rc; int rc;
mca_ptl_gm_send_frag_t *putfrag; mca_ptl_gm_send_frag_t *putfrag;
mca_ptl_gm_module_t * gm_ptl;
void* destination_buffer;
char * buffer_ptr;
int status, bytes_reg;
gm_ptl= (mca_ptl_gm_module_t *)ptl; putfrag = mca_ptl_gm_alloc_send_frag( (mca_ptl_gm_module_t*)ptl, sendreq ); /*alloc_put_frag */
buffer_ptr = ((char *) (sendreq->req_base.req_addr)) + offset ; rc = mca_ptl_gm_put_frag_init( putfrag,
bytes_reg = size; (mca_ptl_gm_peer_t*)ptl_peer, (mca_ptl_gm_module_t*)ptl,
destination_buffer =(void *)( (sendreq->req_peer_addr).pval);
/* register the user buffer */
if (offset > 0) {
status = gm_register_memory(gm_ptl->gm_port, buffer_ptr, bytes_reg);
if(GM_SUCCESS != status) {
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
}
}
putfrag = mca_ptl_gm_alloc_send_frag( gm_ptl, sendreq ); /*alloc_put_frag */
putfrag->registered_buf = (void *)buffer_ptr;
putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer;
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =putfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = 0;
rc = mca_ptl_gm_put_frag_init( putfrag ,
(mca_ptl_gm_peer_t*)ptl_peer,gm_ptl,
sendreq, offset, &size, flags ); sendreq, offset, &size, flags );
rc = mca_ptl_gm_peer_put( (mca_ptl_gm_peer_t *)ptl_peer, putfrag, rc = mca_ptl_gm_peer_send_continue( (mca_ptl_gm_peer_t *)ptl_peer, putfrag,
sendreq, offset, &size, flags, sendreq, offset, &size, flags,
destination_buffer, bytes_reg ); ((char*)(sendreq->req_base.req_addr)) + offset,
gm_ptl->num_send_tokens--; size );
sendreq->req_offset += size;
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,sendreq,
offset,&size,flags);
assert(rc == 0);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -360,30 +328,6 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* This function get called when the gm_get is finish (i.e. when the read from remote memory
* is completed. We have to send back the ack.
*/
static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_status_t status )
{
int rc;
mca_ptl_gm_recv_frag_t* recv_frag = (mca_ptl_gm_recv_frag_t*)context;
mca_ptl_gm_send_frag_t *ack;
mca_ptl_gm_module_t *gm_ptl;
mca_pml_base_recv_request_t *request;
size_t size = 0;
gm_ptl = (mca_ptl_gm_module_t*)recv_frag->frag_recv.frag_base.frag_owner;
/* send the registered memory information, send recv request * ptr */
request = recv_frag->frag_recv.frag_request;
rc = mca_ptl_gm_send_ack_init( ack, gm_ptl,
(mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer),
recv_frag, request->req_base.req_addr, recv_frag->frag_recv.frag_base.frag_size );
mca_ptl_gm_peer_send( (mca_ptl_gm_peer_t *) (ack->send_frag.frag_base.frag_peer),
ack, NULL, 0, &size, 0 );
gm_ptl->num_send_tokens--;
}
/* A posted receive has been matched - if required send an /* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment. * ack back to the peer and process the fragment.
*/ */
@ -392,65 +336,62 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
mca_ptl_base_recv_frag_t * frag ) mca_ptl_base_recv_frag_t * frag )
{ {
mca_pml_base_recv_request_t *request; mca_pml_base_recv_request_t *request;
mca_ptl_base_header_t *header; mca_ptl_base_header_t *hdr;
int bytes_recv, rc, total_bytes, bytes_reg; int bytes_recv, rc, header_length;
mca_ptl_gm_module_t *gm_ptl; mca_ptl_gm_module_t *gm_ptl;
struct iovec iov;
mca_ptl_gm_send_frag_t *ack; mca_ptl_gm_send_frag_t *ack;
mca_ptl_gm_recv_frag_t *recv_frag; mca_ptl_gm_recv_frag_t *recv_frag;
char *buffer_ptr;
gm_status_t status;
mca_ptl_gm_peer_t* peer; mca_ptl_gm_peer_t* peer;
header = &frag->frag_base.frag_header; hdr = &frag->frag_base.frag_header;
request = frag->frag_request; request = frag->frag_request;
gm_ptl = (mca_ptl_gm_module_t *)ptl; gm_ptl = (mca_ptl_gm_module_t *)ptl;
recv_frag = (mca_ptl_gm_recv_frag_t *)frag; recv_frag = (mca_ptl_gm_recv_frag_t *)frag;
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer; peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) { if( MCA_PTL_HDR_TYPE_RNDV == hdr->hdr_common.hdr_type ) {
/* need to send an ack back */ header_length = sizeof(mca_ptl_base_rendezvous_header_t);
} else {
assert( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_common.hdr_type );
header_length = sizeof(mca_ptl_base_match_header_t);
}
if (hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
/* need to send an ack back */
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL ); ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
if( NULL == ack ) { if( NULL == ack ) {
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n", __FILE__,__LINE__); ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n", __FILE__,__LINE__);
OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock); OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock);
recv_frag->frag_ack_pending = true; recv_frag->frag_ack_pending = true;
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks, (ompi_list_item_t *) frag); ompi_list_append (&mca_ptl_gm_module.gm_pending_acks, (ompi_list_item_t *)frag);
OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock); OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock);
} else { } else {
buffer_ptr = (char*)(request->req_base.req_addr); mca_ptl_base_header_t* ack_hdr = (mca_ptl_base_header_t*)ack->send_buf;
total_bytes = request->req_bytes_packed; ack_hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
bytes_recv = frag->frag_base.frag_size; ack_hdr->hdr_ack.hdr_common.hdr_flags = 0;
bytes_reg = total_bytes - bytes_recv; ack_hdr->hdr_ack.hdr_src_ptr.lval = hdr->hdr_rndv.hdr_src_ptr.lval;
buffer_ptr += bytes_recv; ack_hdr->hdr_ack.hdr_dst_match.lval = 0L;
/* Register the local operation before */ ack_hdr->hdr_ack.hdr_dst_match.pval = frag;
status = gm_register_memory(gm_ptl->gm_port, buffer_ptr, bytes_reg); ack_hdr->hdr_ack.hdr_dst_addr.lval = 0L;
recv_frag->registered_buf = buffer_ptr; ack_hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
if(GM_SUCCESS != status) { gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, ack_hdr,
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__); GM_SIZE, sizeof(mca_ptl_base_ack_header_t), GM_LOW_PRIORITY,
} peer->local_id,
send_callback, (void *)ack );
/* And now read from the remote memory in the local memory. The amount used for this
* operation should be the minimum one between the server and the client.
*/
gm_get( gm_ptl->gm_port, recv_frag->remote_registered_memory.lval,
buffer_ptr, bytes_reg, GM_HIGH_PRIORITY, peer->peer_addr->global_id,
peer->peer_addr->port_id, mca_ptl_gm_get_callback, frag );
} }
} }
/* Here we expect that frag_addr is the begin of the buffer header included */
iov.iov_base = ((char*)frag->frag_base.frag_addr);
bytes_recv = frag->frag_base.frag_size; bytes_recv = frag->frag_base.frag_size;
iov.iov_len = bytes_recv;
if( frag->frag_base.frag_size > 0 ) {
if( header->hdr_match.hdr_msg_length > 0 ) {
unsigned int max_data, out_size; unsigned int max_data, out_size;
int freeAfter; int freeAfter;
struct iovec iov;
/*peer = ompi_comm_peer_lookup( request->req_base.req_comm, /* Here we expect that frag_addr is the begin of the buffer header included */
header->hdr_match.hdr_src );*/ iov.iov_base = ((char*)frag->frag_base.frag_addr);
iov.iov_len = bytes_recv;
ompi_convertor_copy( peer->peer_proc->proc_ompi->proc_convertor, ompi_convertor_copy( peer->peer_proc->proc_ompi->proc_convertor,
&frag->frag_base.frag_convertor); &frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor, ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor,

Просмотреть файл

@ -27,28 +27,157 @@
#include "ptl_gm_sendfrag.h" #include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer, static void send_continue_callback( struct gm_port *port, void * context, gm_status_t status )
mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t *size,
int flags,
void * target_buffer,
int bytes)
{ {
gm_put( ptl_peer->peer_ptl->gm_port, fragment->registered_buf, mca_ptl_gm_module_t* gm_ptl;
(gm_remote_ptr_t)target_buffer, bytes, GM_LOW_PRIORITY, mca_ptl_gm_send_frag_t* frag;
ptl_peer->local_id, ptl_peer->port_number, mca_ptl_base_header_t* header;
put_callback, (void *)fragment );
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; header = (mca_ptl_base_header_t*)context;
fragment->send_frag.frag_base.frag_peer =
(struct mca_ptl_base_peer_t*)ptl_peer; frag = header->hdr_frag.hdr_src_ptr.pval;
gm_ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
switch( status ) {
case GM_SUCCESS:
/*OMPI_OUTPUT( (0, "[%s:%d] send_continue_callback release header %p from fragment %p (available %d)\n",
__FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/
frag->already_send += header->hdr_frag.hdr_frag_length;
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
/* release the send token */
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
if( frag->already_send >= frag->send_frag.frag_base.frag_size ) {
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_frags), ((ompi_list_item_t*)frag) );
}
break;
case GM_SEND_TIMED_OUT:
printf( "send_continue timed out\n" );
break;
case GM_SEND_DROPPED:
printf( "send_continue dropped\n" );
break;
default:
printf( "send_continue other error %d\n", status );
}
}
int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t *size,
int flags,
void * target_buffer,
int bytes )
{
mca_ptl_base_header_t hdr;
size_t update_offset = offset, header_length = sizeof(mca_ptl_base_frag_header_t);
ompi_list_item_t* item;
int rc = 0;
/*OMPI_OUTPUT( (0, "[%s:%d]mca_ptl_gm_peer_send_continue peer %p fragment %p send request %p\n\toffset %ld size %ld flags %d target buffer %p bytes %d\n",
__FILE__, __LINE__, (void*)ptl_peer, (void*)fragment, (void*)sendreq, offset, *size,
flags, target_buffer, bytes) );*/
hdr.hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr.hdr_frag.hdr_common.hdr_flags = flags;
hdr.hdr_frag.hdr_frag_length = *size;
hdr.hdr_frag.hdr_frag_offset = update_offset;
hdr.hdr_frag.hdr_src_ptr.lval = 0L; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr.hdr_frag.hdr_src_ptr.pval = fragment;
hdr.hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
fragment->send_frag.frag_base.frag_addr =(void *)target_buffer; fragment->send_frag.frag_base.frag_addr =(void *)target_buffer;
fragment->send_frag.frag_base.frag_size = bytes; fragment->send_frag.frag_base.frag_size = bytes;
fragment->already_send = 0;
/* must update the offset after actual fragment size is determined
* before attempting to send the fragment
*/
mca_pml_base_send_request_offset( sendreq,
fragment->send_frag.frag_base.frag_size );
/* The first DMA memory buffer has been alocated in same time as the fragment */
item = (ompi_list_item_t*)fragment->send_buf;
if( (*size) <= (5 * GM_BUF_SIZE) ) { /* small protocol */
size_t max_data;
int freeAfter;
unsigned int in_size;
struct iovec iov;
ompi_convertor_t *convertor = &(fragment->send_frag.frag_base.frag_convertor);
/* If we have an eager send then we should send the rest of the data. */
while( 0 == rc ) {
if( NULL == item ) {
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
}
iov.iov_base = (char*)item + header_length;
iov.iov_len = GM_BUF_SIZE - header_length;
max_data = iov.iov_len;
in_size = 1;
if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
return OMPI_ERROR;
hdr.hdr_frag.hdr_frag_offset = update_offset;
hdr.hdr_frag.hdr_frag_length = iov.iov_len;
update_offset += iov.iov_len;
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item,
GM_SIZE, iov.iov_len + sizeof(mca_ptl_gm_eager_header_t),
GM_LOW_PRIORITY, ptl_peer->local_id,
send_continue_callback, (void*)item );
item = NULL; /* force to retrieve a new one on the next loop */
}
*size = update_offset - offset;
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
fragment->send_frag.frag_request,
(*size) );
}
} else {
/* Large set of data => we have to setup a rendez-vous protocol. Here we can
* use the match header already filled in by the upper level and just complete it
* with the others informations.
*/
/* For large messages I prefer to use the read semantics. It does not work well
* if the receiver does not post the message shortly after it receive the
* rdv (as the memory on the sender will be still locked). But the protocol
* is easier to implement.
*/
gm_status_t status;
ompi_ptr_t* local_address;
status = gm_register_memory( ptl_peer->peer_ptl->gm_port,
target_buffer /*sendreq->req_base.req_addr */,
(*size) );
if( status != GM_SUCCESS ) {
printf( "Cannot register memory from %p length %ud bytes\n",
(void*)sendreq->req_base.req_addr, (*size) );
return OMPI_ERROR;
}
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
local_address = (ompi_ptr_t*)((char*)item + header_length);
local_address->lval = 0;
local_address->pval = target_buffer /*(void*)sendreq->req_base.req_addr */;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item,
GM_SIZE, header_length + sizeof(ompi_ptr_t), GM_LOW_PRIORITY,
ptl_peer->local_id,
send_continue_callback, (void *)item );
}
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* At this point the header is already filled up with informations for matching.
* Now depending on the quantity of data that have to be transfered and on the flags
* we will add more informations on the header.
*/
int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer, int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment, mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq, struct mca_pml_base_send_request_t *sendreq,
@ -57,21 +186,52 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
int flags ) int flags )
{ {
struct iovec iov; struct iovec iov;
size_t size_in,size_out; size_t size_in, size_out;
int header_length; int header_length;
mca_ptl_base_header_t* header; mca_ptl_base_header_t* hdr;
ompi_convertor_t *convertor = NULL; ompi_convertor_t *convertor = NULL;
int rc, freeAfter; int rc, freeAfter;
unsigned int in_size, max_data; unsigned int in_size, max_data = 0;
header = (mca_ptl_base_header_t*)fragment->send_buf; hdr = (mca_ptl_base_header_t*)fragment->send_buf;
header_length = sizeof(mca_ptl_base_match_header_t);
size_in = *size; size_in = *size;
if( (size_in + header_length) <= GM_BUF_SIZE ) fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
iov.iov_len = size_in; fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
else fragment->send_frag.frag_request = sendreq;
iov.iov_len = GM_BUF_SIZE - header_length; fragment->already_send = 0;
/* At this point the header is already filled up with informations as a match header */
if( (flags & MCA_PTL_FLAGS_ACK) || (0 == offset) ) {
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
if( flags & MCA_PTL_FLAGS_ACK ) {
header_length = sizeof(mca_ptl_base_rendezvous_header_t);
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_RNDV;
hdr->hdr_rndv.hdr_frag_length = size_in;
hdr->hdr_rndv.hdr_src_ptr.lval = 0;
hdr->hdr_rndv.hdr_src_ptr.pval = fragment;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
header_length = sizeof(mca_ptl_base_match_header_t);
}
} else {
header_length = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_frag.hdr_common.hdr_flags = flags;
hdr->hdr_frag.hdr_frag_length = size_in;
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = fragment;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
}
iov.iov_len = 0; /* nothing yet */
if( size_in > 0 ) { if( size_in > 0 ) {
/* first fragment (eager send) and first fragment of long protocol /* first fragment (eager send) and first fragment of long protocol
@ -91,112 +251,50 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
offset, NULL ); offset, NULL );
} }
/* if data is contigous convertor will return an offset if( (size_in + header_length) <= GM_BUF_SIZE )
* into users buffer - otherwise will return an allocated buffer iov.iov_len = size_in;
* that holds the packed data else
*/ iov.iov_len = GM_BUF_SIZE - header_length;
/* copy the data to the registered buffer */ /* copy the data to the registered buffer */
iov.iov_base = ((char*)fragment->send_buf) + header_length; iov.iov_base = ((char*)fragment->send_buf) + header_length;
max_data = iov.iov_len; max_data = iov.iov_len;
in_size = 1; in_size = 1;
if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0) if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
return OMPI_ERROR; return OMPI_ERROR;
}
header->hdr_match.hdr_msg_length = iov.iov_len;
fragment->send_frag.frag_base.frag_addr = ((char*)fragment->send_buf) + header_length;
fragment->send_frag.frag_base.frag_size = max_data;
} else {
fragment->send_frag.frag_base.frag_addr = NULL;
fragment->send_frag.frag_base.frag_size = 0;
}
/* adjust size and request offset to reflect actual number of bytes /* adjust size and request offset to reflect actual number of bytes
* packed by convertor * packed by convertor
*/ */
size_out = iov.iov_len + header_length; size_out = iov.iov_len + header_length;
DO_DEBUG( printf( "send pointer %p SIZE %d length %lu\n", DO_DEBUG( printf( "send pointer %p SIZE %d length %lu\n",
(void*)fragment->send_buf, GM_BUF_SIZE, size_out ) ); (void*)fragment->send_buf, GM_BUF_SIZE, size_out ) );
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; /* must update the offset after actual fragment size is determined
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer; * before attempting to send the fragment
fragment->send_frag.frag_base.frag_addr = ((char*)fragment->send_buf) + header_length; */
fragment->send_frag.frag_base.frag_size = size_out - header_length; mca_pml_base_send_request_offset( sendreq,
fragment->send_frag.frag_request = sendreq; fragment->send_frag.frag_base.frag_size );
fragment->already_send = 0; /* Send the first fragment */
/* initiate the gm send */
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf, gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id, GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
send_callback, (void *)fragment ); send_callback, (void *)fragment );
if( size_out <= GM_BUF_SIZE ) {
/* small message. All data went out */
/* ompi_request_complete(sendreq); */
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
fragment->send_frag.frag_request,
iov.iov_len );
/*gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf, if( !(flags & MCA_PTL_FLAGS_ACK) ) {
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id, ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
ptl_peer->port_number, send_callback, (void *)fragment );*/ fragment->send_frag.frag_request,
*size = iov.iov_len; size_out );
} else if( size_in <= (5 * GM_BUF_SIZE) ) { /* eager message */
ompi_list_item_t* item;
mca_ptl_gm_eager_header_t* header;
while( size_out < size_in ) {
OMPI_FREE_LIST_WAIT( &(ptl_peer->peer_ptl->gm_send_dma_frags), item, rc );
header = (mca_ptl_gm_eager_header_t*)item;
header->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
header->hdr_common.hdr_flags = 0;
header->hdr_src_ptr.pval = fragment;
iov.iov_base = (char*)header + sizeof(mca_ptl_gm_eager_header_t);
iov.iov_len = GM_BUF_SIZE - sizeof(mca_ptl_gm_eager_header_t);
max_data = iov.iov_len;
in_size = 1;
if((rc = ompi_convertor_pack(convertor, &(iov), &in_size, &max_data, &freeAfter)) < 0)
return OMPI_ERROR;
size_out += iov.iov_len;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, header,
GM_SIZE, size_out, GM_LOW_PRIORITY,
ptl_peer->local_id,
send_callback, (void *)header );
}
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
fragment->send_frag.frag_request,
size_out );
*size = size_out;
} else { /* large message */
/* For large messages I prefer to use the read semantics. It does not work well
* if the receiver does not post the message shortly after it receive the
* rdv (as the memory on the sender will be still locked). But the protocol
* is easier to implement.
*/
gm_status_t status;
mca_ptl_gm_rdv_header_t* header = (mca_ptl_gm_rdv_header_t*)fragment->send_buf;
status = gm_register_memory( ptl_peer->peer_ptl->gm_port,
sendreq->req_base.req_addr,
size_in );
if( status != GM_SUCCESS ) {
printf( "Cannot register memory from %p length %ld bytes\n",
(void*)sendreq->req_base.req_addr, size_in );
return OMPI_ERROR;
}
header_length = sizeof(mca_ptl_gm_rdv_header_t);
/* Overwrite just in case ... */
header->hdr_match.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_RNDV;
header->hdr_match.hdr_common.hdr_flags = flags;
header->registered_memory.lval = 0;
header->registered_memory.pval = (void*)sendreq->req_base.req_addr;
header->fragment.lval = 0;
header->fragment.pval = (void*)fragment;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, header,
GM_SIZE, header_length, GM_LOW_PRIORITY,
ptl_peer->local_id,
send_callback, (void *)header );
} }
*size = size_out - header_length;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -222,15 +320,6 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
ompi_atomic_add( &(ptl->num_send_tokens), 1 ); ompi_atomic_add( &(ptl->num_send_tokens), 1 );
putfrag->put_sent = 1; putfrag->put_sent = 1;
/* send the header information through send/receive channel */
#if 0
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,send_req,
offset,&size,flags);
assert(rc == 0);
GM_DBG(PTL_GM_DBG_COMM,"FINISHED SENDING FIN\n");
GM_DBG(PTL_GM_DBG_COMM,"after issuing the put completion the request offset = %d\n",send_req->req_offset);
#endif
/* deregister the user memory */ /* deregister the user memory */
status = gm_deregister_memory(ptl->gm_port, (char *)(putfrag->registered_buf), bytes2); status = gm_deregister_memory(ptl->gm_port, (char *)(putfrag->registered_buf), bytes2);
@ -257,74 +346,49 @@ void send_callback( struct gm_port *port, void * context, gm_status_t status )
{ {
mca_ptl_gm_module_t *ptl; mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *frag; mca_ptl_gm_send_frag_t *frag;
mca_pml_base_send_request_t *gm_send_req;
mca_ptl_base_header_t* header; mca_ptl_base_header_t* header;
int hdr_type, hdr_flags;
size_t hdr_dst_size;
frag = (mca_ptl_gm_send_frag_t *)context; frag = (mca_ptl_gm_send_frag_t *)context;
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner; ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
gm_send_req = frag->send_frag.frag_request;
header = (mca_ptl_base_header_t*)frag->send_buf; header = (mca_ptl_base_header_t*)frag->send_buf;
frag->send_buf = NULL;
hdr_type = header->hdr_common.hdr_type;
hdr_flags = header->hdr_common.hdr_flags;
hdr_dst_size = header->hdr_ack.hdr_dst_size;
switch (status) { switch (status) {
case GM_SUCCESS: case GM_SUCCESS:
DO_DEBUG( printf( "send_callback for data ptr %p\n", (void*)frag->send_buf ) ); /* release the send DMA buffer as soon as possible */
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)header));
/* release the send token */
ompi_atomic_add( &(ptl->num_send_tokens), 1 ); ompi_atomic_add( &(ptl->num_send_tokens), 1 );
switch( header->hdr_common.hdr_type ) { switch( hdr_type ) {
case MCA_PTL_HDR_TYPE_FRAG:
case MCA_PTL_HDR_TYPE_MATCH: case MCA_PTL_HDR_TYPE_MATCH:
/*ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request, if( !(hdr_flags & MCA_PTL_FLAGS_ACK) ) {
header->hdr_match.hdr_msg_length);*/ /* Return sendfrag to free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
/* return the DMA memory */ }
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
frag->send_buf = NULL;
/* Return sendfrag to free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
break; break;
case MCA_PTL_HDR_TYPE_RNDV: case MCA_PTL_HDR_TYPE_RNDV:
/*OMPI_OUTPUT( (0, "[%s:%d] send_callback release header %p from fragment %p (available %d)\n",
__FILE__, __LINE__, (void*)header, (void*)frag, ptl->num_send_tokens) );*/
/* As we actually use the read semantics for long messages, we dont /* As we actually use the read semantics for long messages, we dont
* have to do anything special here. * have to do anything special here except to release the DMA memory buffer.
*/ */
break; break;
case MCA_PTL_HDR_TYPE_FRAG:
{
/* Use by eager messages. It contains just enough informations to be able
* to retrieve the fragment to whom it belong. I use the mca_ptl_gm_eager_header_t
* and I store in hdr_src_ptr the pointer to the fragment that have been
* generated the send operation.
*/
mca_ptl_gm_eager_header_t* header = (mca_ptl_gm_eager_header_t*)context;
frag = (mca_ptl_gm_send_frag_t*)(header->hdr_src_ptr.pval);
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
frag->already_send += (GM_BUF_SIZE - sizeof(mca_ptl_gm_eager_header_t));
if( frag->already_send > frag->send_frag.frag_base.frag_size ) {
frag->send_buf = NULL;
/* I will update the status of the request just once when everything is
* already done. Dont waste the time :)
*/
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
frag->send_frag.frag_base.frag_size );
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
}
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)header));
}
break;
case MCA_PTL_HDR_TYPE_ACK: case MCA_PTL_HDR_TYPE_ACK:
/* return the DMA memory */ OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
frag->send_buf = NULL;
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
break; break;
case MCA_PTL_HDR_TYPE_FIN: case MCA_PTL_HDR_TYPE_FIN:
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
header->hdr_ack.hdr_dst_size); hdr_dst_size);
/* return the DMA memory */ OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t*)frag));
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_dma_frags), ((ompi_list_item_t *)frag->send_buf));
frag->send_buf = NULL;
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
break; break;
default: default:
/* Not going to call progress on this send, /* Not going to call progress on this send,
@ -352,25 +416,21 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
mca_ptl_gm_send_frag_t * frag; mca_ptl_gm_send_frag_t * frag;
mca_pml_base_send_request_t *req; mca_pml_base_send_request_t *req;
mca_pml_base_recv_request_t *request; mca_pml_base_recv_request_t *request;
int bytes;
char * reg_buf;
int status; int status;
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) { if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) {
frag = (mca_ptl_gm_send_frag_t *)(header->hdr_ack.hdr_src_ptr.pval); frag = (mca_ptl_gm_send_frag_t *)(header->hdr_ack.hdr_src_ptr.pval);
req = (mca_pml_base_send_request_t *) frag->req; req = (mca_pml_base_send_request_t *) frag->req;
assert(req != NULL); assert(req != NULL);
req->req_peer_match.pval = header->hdr_ack.hdr_dst_match.pval; req->req_peer_match = header->hdr_ack.hdr_dst_match;
req->req_peer_addr.pval = header->hdr_ack.hdr_dst_addr.pval; req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
req->req_peer_size = header->hdr_ack.hdr_dst_size; req->req_peer_size = header->hdr_ack.hdr_dst_size;
frag->wait_for_ack = 0; frag->wait_for_ack = 0;
bytes = frag->send_frag.frag_base.frag_size; if( (req->req_peer_size != 0) && (req->req_peer_addr.pval == NULL) ) {
if(frag->send_complete == 1) {
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
frag->send_frag.frag_request,bytes); frag->send_frag.frag_request,
frag->send_frag.frag_base.frag_size );
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag); OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
} }
} else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) { } else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) {
@ -381,9 +441,9 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
header->hdr_ack.hdr_dst_size, header->hdr_ack.hdr_dst_size,
header->hdr_ack.hdr_dst_size ); header->hdr_ack.hdr_dst_size );
/* deregister the memory */ /* deregister the memory */
bytes = header->hdr_ack.hdr_dst_size; status = gm_deregister_memory( ptl->gm_port,
reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval; header->hdr_ack.hdr_dst_addr.pval,
status = gm_deregister_memory(ptl->gm_port, reg_buf, bytes); header->hdr_ack.hdr_dst_size );
if(GM_SUCCESS != status) { if(GM_SUCCESS != status) {
ompi_output(0," unpinning memory failed\n"); ompi_output(0," unpinning memory failed\n");
@ -393,15 +453,20 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
} }
} }
/* We get a RNDV header in two situations:
* - when the remote node need a ack
* - when we set a rendez-vous protocol with the remote node.
* In both cases we have to send an ack back.
*/
static mca_ptl_gm_recv_frag_t* static mca_ptl_gm_recv_frag_t*
mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl, mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event ) gm_recv_event_t* event )
{ {
mca_ptl_gm_recv_frag_t * recv_frag; mca_ptl_gm_recv_frag_t* recv_frag;
bool matched; bool matched;
mca_ptl_gm_rdv_header_t *header; mca_ptl_base_header_t* hdr;
header = (mca_ptl_gm_rdv_header_t*)gm_ntohp(event->recv.buffer); hdr = (mca_ptl_base_header_t*)gm_ntohp(event->recv.buffer);
/* allocate a receive fragment */ /* allocate a receive fragment */
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl ); recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
@ -415,18 +480,18 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
recv_frag->frag_ack_pending = false; recv_frag->frag_ack_pending = false;
recv_frag->frag_progressed = 0; recv_frag->frag_progressed = 0;
recv_frag->frag_recv.frag_base.frag_header.hdr_match = header->hdr_match; recv_frag->frag_recv.frag_base.frag_header.hdr_rndv = hdr->hdr_rndv;
if( MCA_PTL_HDR_TYPE_MATCH == header->hdr_match.hdr_common.hdr_type ) { if( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type ) {
recv_frag->frag_recv.frag_base.frag_addr = recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof(mca_ptl_base_match_header_t); (char *) hdr + sizeof(mca_ptl_base_match_header_t);
recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_rndv.hdr_match.hdr_msg_length;
} else { } else {
assert( MCA_PTL_HDR_TYPE_RNDV == header->hdr_match.hdr_common.hdr_type ); assert( MCA_PTL_HDR_TYPE_RNDV == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type );
recv_frag->frag_recv.frag_base.frag_addr = recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof(mca_ptl_gm_rdv_header_t); (char *) hdr + sizeof(mca_ptl_base_rendezvous_header_t);
recv_frag->remote_registered_memory.lval = header->registered_memory.lval; recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_rndv.hdr_frag_length;
} }
recv_frag->frag_recv.frag_base.frag_size = header->hdr_match.hdr_msg_length;
recv_frag->matched = false; recv_frag->matched = false;
recv_frag->have_allocated_buffer = false; recv_frag->have_allocated_buffer = false;
@ -436,49 +501,118 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
&(recv_frag->frag_recv), &(recv_frag->frag_recv),
&(recv_frag->frag_recv.frag_base.frag_header.hdr_match) ); &(recv_frag->frag_recv.frag_base.frag_header.hdr_match) );
if( matched ) { if( matched ) {
/* get some memory and copy the data inside. We can then release the receive buffer */
return NULL; return NULL;
} }
return recv_frag; return recv_frag;
} }
/* This function get called when the gm_get is finish (i.e. when the read from remote memory
* is completed. We have to send back the ack. If the original data was too large for just one
* fragment it will be split in severals. We have to send back for each of these fragments one
* ack.
*/
static void mca_ptl_gm_get_callback( struct gm_port *port, void * context, gm_status_t status )
{
mca_ptl_gm_module_t* gm_ptl;
mca_ptl_gm_recv_frag_t* frag;
mca_ptl_gm_send_frag_t *ack;
mca_pml_base_recv_request_t *request;
mca_ptl_gm_peer_t* peer;
int rc;
frag = (mca_ptl_gm_recv_frag_t*)context;
gm_ptl = (mca_ptl_gm_module_t *)frag->frag_recv.frag_base.frag_owner;
request = frag->frag_recv.frag_request;
peer = (mca_ptl_gm_peer_t*)frag->frag_recv.frag_base.frag_peer;
switch( status ) {
case GM_SUCCESS:
/*OMPI_OUTPUT( (0, "[%s:%d] mca_ptl_gm_get_callback release header %p from fragment %p (available %\d)\n",
__FILE__, __LINE__, (void*)header, (void*)frag, gm_ptl->num_send_tokens) );*/
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
rc = mca_ptl_gm_send_ack_init( ack, gm_ptl,
(mca_ptl_gm_peer_t *)(frag->frag_recv.frag_base.frag_peer),
frag, NULL,
frag->frag_recv.frag_base.frag_size );
gm_send_to_peer_with_callback( ((mca_ptl_gm_module_t*)(ack->send_frag.frag_base.frag_owner))->gm_port,
ack->send_buf, GM_SIZE, sizeof(mca_ptl_base_ack_header_t),
GM_LOW_PRIORITY, peer->local_id, send_callback, (void*)ack );
break;
case GM_SEND_TIMED_OUT:
printf( "mca_ptl_gm_get_callback timed out\n" );
break;
case GM_SEND_DROPPED:
printf( "mca_ptl_gm_get_callback dropped\n" );
break;
default:
printf( "mca_ptl_gm_get_callback other error %d\n", status );
}
}
static mca_ptl_gm_recv_frag_t* static mca_ptl_gm_recv_frag_t*
mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl, mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event ) gm_recv_event_t* event )
{ {
mca_ptl_gm_recv_frag_t * recv_frag; mca_pml_base_recv_request_t *request;
bool matched; ompi_convertor_t* convertor;
mca_ptl_base_header_t *header; mca_ptl_base_header_t *hdr;
struct iovec iov;
uint32_t iov_count, max_data;
int32_t freeAfter, rc;
mca_ptl_gm_recv_frag_t* recv_frag;
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer); hdr = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer);
/* allocate a receive fragment */ recv_frag = (mca_ptl_gm_recv_frag_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl ); request = (mca_pml_base_recv_request_t*)recv_frag->frag_recv.frag_request;
/* here we can have a synchronisation problem if several threads work in same time
* with the same request. The only question is if it's possible ?
*/
convertor = &(recv_frag->frag_recv.frag_base.frag_convertor);
ompi_convertor_init_for_recv( convertor, 0,
request->req_base.req_datatype,
request->req_base.req_count,
request->req_base.req_addr,
hdr->hdr_frag.hdr_frag_offset, NULL );
recv_frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl; if( hdr->hdr_frag.hdr_frag_length <= (GM_BUF_SIZE - sizeof(mca_ptl_base_frag_header_t)) ) {
recv_frag->frag_recv.frag_base.frag_peer = NULL; iov.iov_base = (char*)hdr + sizeof(mca_ptl_base_frag_header_t);
recv_frag->frag_recv.frag_request = NULL; iov.iov_len = hdr->hdr_frag.hdr_frag_length;
recv_frag->frag_recv.frag_is_buffered = false; iov_count = 1;
recv_frag->frag_hdr_cnt = 0; max_data = hdr->hdr_frag.hdr_frag_length;
recv_frag->frag_msg_cnt = 0; freeAfter = 0; /* unused here */
recv_frag->frag_ack_pending = false; rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
recv_frag->frag_progressed = 0; assert( 0 == freeAfter );
if( (hdr->hdr_frag.hdr_frag_offset + hdr->hdr_frag.hdr_frag_length) >=
recv_frag->frag_recv.frag_base.frag_size ) {
/* update the request status if we are done */
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
OBJ_RELEASE( recv_frag );
}
} else {
gm_status_t status;
ompi_ptr_t* remote_memory = (ompi_ptr_t*)((char*)hdr + sizeof(mca_ptl_base_frag_header_t));
mca_ptl_gm_peer_t* peer;
recv_frag->frag_recv.frag_base.frag_header = *header; status = gm_register_memory( ptl->gm_port,
recv_frag->frag_recv.frag_base.frag_addr = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
(char *) header + sizeof(mca_ptl_base_frag_header_t); hdr->hdr_frag.hdr_frag_length );
recv_frag->frag_recv.frag_base.frag_size = header->hdr_frag.hdr_frag_length; if( status != GM_SUCCESS ) {
printf( "Cannot register memory from %p length %lld bytes\n",
(void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length );
return NULL;
}
recv_frag->matched = false; gm_get( ptl->gm_port, remote_memory->lval,
recv_frag->have_allocated_buffer = false; recv_frag->frag_recv.frag_base.frag_addr,
recv_frag->ptl = ptl; recv_frag->frag_recv.frag_base.frag_size,
GM_HIGH_PRIORITY, peer->peer_addr->global_id, peer->peer_addr->port_id,
matched = ptl->super.ptl_match( &(ptl->super), mca_ptl_gm_get_callback, recv_frag );
&(recv_frag->frag_recv),
&(recv_frag->frag_recv.frag_base.frag_header.hdr_match) );
if( matched ) {
return NULL;
} }
return recv_frag;
return NULL;
} }
void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl ) void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
@ -532,8 +666,8 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl, gm_
ptl_gm_ctrl_frag(ptl,header); ptl_gm_ctrl_frag(ptl,header);
break; break;
default: default:
ompi_output(0,"[%s:%d] unexpected frag type %d\n", ompi_output( 0, "[%s:%d] unexpected frag type %d\n",
__FILE__,__LINE__,header->hdr_common.hdr_type); __FILE__, __LINE__, header->hdr_common.hdr_type );
break; break;
} }
return frag; return frag;
@ -546,8 +680,8 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
switch (gm_ntohc(event->recv.type)) { switch (gm_ntohc(event->recv.type)) {
case GM_RECV_EVENT: case GM_RECV_EVENT:
case GM_HIGH_RECV_EVENT:
case GM_PEER_RECV_EVENT: case GM_PEER_RECV_EVENT:
case GM_HIGH_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT: case GM_HIGH_PEER_RECV_EVENT:
mesg = gm_ntohp(event->recv.buffer); mesg = gm_ntohp(event->recv.buffer);
frag = ptl_gm_handle_recv( ptl, event ); frag = ptl_gm_handle_recv( ptl, event );

Просмотреть файл

@ -38,14 +38,14 @@ mca_ptl_gm_peer_send( struct mca_ptl_gm_peer_t *ptl_peer,
int flags ); int flags );
int int
mca_ptl_gm_peer_put( struct mca_ptl_gm_peer_t *ptl_peer, mca_ptl_gm_peer_send_continue( struct mca_ptl_gm_peer_t *ptl_peer,
struct mca_ptl_gm_send_frag_t *fragment, struct mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq, struct mca_pml_base_send_request_t *sendreq,
size_t offset, size_t offset,
size_t *size, size_t *size,
int flags, int flags,
void *target_buffer, void *target_buffer,
int bytes ); int bytes );

Просмотреть файл

@ -124,7 +124,7 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
hdr->hdr_src_ptr.pval = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval; hdr->hdr_src_ptr.pval = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
hdr->hdr_dst_match.lval = 0; hdr->hdr_dst_match.lval = 0;
hdr->hdr_dst_match.pval = request; /*should this be dst_match */ hdr->hdr_dst_match.pval = frag;
hdr->hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */ hdr->hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */
hdr->hdr_dst_addr.pval = (void *)buffer; hdr->hdr_dst_addr.pval = (void *)buffer;
hdr->hdr_dst_size = size; hdr->hdr_dst_size = size;
@ -146,45 +146,38 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag, int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag,
struct mca_ptl_gm_peer_t * ptl_peer, struct mca_ptl_gm_peer_t* ptl_peer,
struct mca_ptl_gm_module_t * gm_ptl, struct mca_ptl_gm_module_t* gm_ptl,
struct mca_pml_base_send_request_t * request, struct mca_pml_base_send_request_t* sendreq,
size_t offset, size_t offset,
size_t* size, size_t* size,
int flags ) int flags )
{ {
mca_ptl_base_header_t *hdr; ompi_convertor_t *convertor = NULL;
hdr = (mca_ptl_base_header_t *)putfrag->send_buf;
putfrag->status = -1; putfrag->send_frag.frag_request = sendreq;
putfrag->type = -1; putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
putfrag->wait_for_ack = 0; putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)gm_ptl;
putfrag->put_sent = -1;
putfrag->send_complete = -1;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match.pval;
hdr->hdr_ack.hdr_dst_addr.lval = 0;
hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_peer_addr.pval);
hdr->hdr_ack.hdr_dst_size = *size;
hdr->hdr_ack.hdr_src_ptr.lval = 0;
hdr->hdr_ack.hdr_src_ptr.pval = (void*)putfrag;
putfrag->send_frag.frag_request = request;
putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer;
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl;
putfrag->send_frag.frag_base.frag_addr = NULL; putfrag->send_frag.frag_base.frag_addr = NULL;
putfrag->send_frag.frag_base.frag_size = 0; putfrag->send_frag.frag_base.frag_size = 0;
putfrag->ptl = gm_ptl; if( (*size) > 0 ) {
convertor = &(putfrag->send_frag.frag_base.frag_convertor);
ompi_convertor_init_for_send( convertor, 0,
sendreq->req_base.req_datatype,
sendreq->req_base.req_count,
sendreq->req_base.req_addr,
offset, NULL );
}
putfrag->status = -1;
putfrag->send_complete = -1;
putfrag->wait_for_ack = 0;
putfrag->put_sent = 0;
putfrag->type = PUT;
putfrag->req = sendreq;
putfrag->ptl = gm_ptl;
putfrag->peer = ptl_peer;
putfrag->wait_for_ack = 0;
putfrag->put_sent = 0;
putfrag->type = PUT;
putfrag->req = request;
assert(putfrag->req != NULL);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

Просмотреть файл

@ -47,9 +47,8 @@ extern "C" {
* memory pointer. * memory pointer.
*/ */
struct mca_ptl_gm_rdv_header_t { struct mca_ptl_gm_rdv_header_t {
mca_ptl_base_match_header_t hdr_match; mca_ptl_base_rendezvous_header_t hdr_rndv;
ompi_ptr_t registered_memory; ompi_ptr_t registered_memory;
ompi_ptr_t fragment;
}; };
typedef struct mca_ptl_gm_rdv_header_t mca_ptl_gm_rdv_header_t; typedef struct mca_ptl_gm_rdv_header_t mca_ptl_gm_rdv_header_t;