Use the new datatype engine and the convertor attached to the requests (send and receive).
This commit was SVN r5989.
Этот коммит содержится в:
родитель
8cc028fee9
Коммит
6d4624e5d4
@ -24,9 +24,6 @@ include $(top_ompi_srcdir)/config/Makefile.options
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
# (for static builds).
|
||||
|
||||
AM_CPPFLAGS = -I$(top_ompi_builddir)/src/include -I$(top_ompi_builddir)/include \
|
||||
-I$(top_ompi_srcdir)/src -I$(top_ompi_srcdir)/src/include
|
||||
|
||||
#if OMPI_ENABLE_GM_CACHE
|
||||
# additional_source = ptl_gm_regcache.c
|
||||
#else
|
||||
|
@ -166,10 +166,10 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *base_ptl)
|
||||
mca_ptl_gm_module_t* ptl = (mca_ptl_gm_module_t*)base_ptl;
|
||||
|
||||
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
|
||||
if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
|
||||
mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
|
||||
if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
|
||||
mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( index == mca_ptl_gm_component.gm_num_ptl_modules ) {
|
||||
@ -359,16 +359,16 @@ mca_ptl_gm_matched( mca_ptl_base_module_t* ptl,
|
||||
assert( gm_ptl->num_send_tokens >= 0 );
|
||||
hdr = (mca_ptl_base_header_t*)item;
|
||||
|
||||
hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||
hdr->hdr_ack.hdr_common.hdr_flags = frag->frag_base.frag_header.hdr_common.hdr_flags;
|
||||
hdr->hdr_ack.hdr_src_ptr = frag->frag_base.frag_header.hdr_rndv.hdr_src_ptr;
|
||||
hdr->hdr_ack.hdr_dst_match.lval = 0L;
|
||||
hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||
hdr->hdr_ack.hdr_dst_addr.lval = 0L;
|
||||
hdr->hdr_ack.hdr_dst_addr.pval = ptl; /* local use */
|
||||
hdr->hdr_ack.hdr_dst_size = request->req_recv.req_bytes_packed;
|
||||
hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||
hdr->hdr_ack.hdr_common.hdr_flags = frag->frag_base.frag_header.hdr_common.hdr_flags;
|
||||
hdr->hdr_ack.hdr_src_ptr = frag->frag_base.frag_header.hdr_rndv.hdr_src_ptr;
|
||||
hdr->hdr_ack.hdr_dst_match.lval = 0L;
|
||||
hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||
hdr->hdr_ack.hdr_dst_addr.lval = 0L;
|
||||
hdr->hdr_ack.hdr_dst_addr.pval = ptl; /* local use */
|
||||
hdr->hdr_ack.hdr_dst_size = request->req_recv.req_bytes_packed;
|
||||
|
||||
gm_send_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, hdr,
|
||||
gm_send_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port, hdr,
|
||||
GM_SIZE, sizeof(mca_ptl_base_ack_header_t),
|
||||
GM_LOW_PRIORITY,
|
||||
peer->peer_addr.local_id,
|
||||
@ -379,24 +379,20 @@ mca_ptl_gm_matched( mca_ptl_base_module_t* ptl,
|
||||
}
|
||||
|
||||
if( frag->frag_base.frag_size > 0 ) {
|
||||
unsigned int max_data, out_size;
|
||||
int freeAfter;
|
||||
ompi_convertor_t* convertor;
|
||||
uint32_t out_size;
|
||||
int32_t freeAfter;
|
||||
size_t max_data;
|
||||
|
||||
iov.iov_len = recv_frag->attached_data_length;
|
||||
/* Here we expect frag_addr to point to the beginning of the buffer, header included */
|
||||
iov.iov_base = frag->frag_base.frag_addr;
|
||||
|
||||
ompi_convertor_copy( peer->peer_proc->proc_ompi->proc_convertor,
|
||||
&frag->frag_base.frag_convertor );
|
||||
ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor, 0,
|
||||
request->req_recv.req_base.req_datatype,
|
||||
request->req_recv.req_base.req_count,
|
||||
request->req_recv.req_base.req_addr,
|
||||
0, NULL );
|
||||
|
||||
convertor = &(request->req_recv.req_convertor);
|
||||
|
||||
out_size = 1;
|
||||
max_data = iov.iov_len;
|
||||
rc = ompi_convertor_unpack( &frag->frag_base.frag_convertor, &(iov),
|
||||
&out_size, &max_data, &freeAfter );
|
||||
rc = ompi_convertor_unpack( convertor, &(iov), &out_size, &max_data, &freeAfter );
|
||||
assert( rc >= 0 );
|
||||
recv_frag->frag_bytes_processed += max_data;
|
||||
}
|
||||
|
@ -278,7 +278,8 @@ int mca_ptl_gm_send_internal_rndv_header( mca_ptl_gm_peer_t *ptl_peer,
|
||||
int flags )
|
||||
{
|
||||
struct iovec iov;
|
||||
uint32_t max_data, in_size;
|
||||
uint32_t in_size;
|
||||
size_t max_data;
|
||||
int32_t freeAfter;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
|
||||
@ -324,7 +325,8 @@ int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
|
||||
int32_t flags )
|
||||
{
|
||||
int32_t freeAfter, rc;
|
||||
uint32_t max_data, in_size;
|
||||
uint32_t in_size;
|
||||
size_t max_data;
|
||||
struct iovec iov;
|
||||
ompi_convertor_t *convertor = &(fragment->frag_send.frag_base.frag_convertor);
|
||||
|
||||
@ -357,8 +359,8 @@ int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
|
||||
fragment->frag_bytes_processed += max_data;
|
||||
fragment->frag_bytes_validated += max_data;
|
||||
burst_length -= max_data;
|
||||
if( 0 == burst_length ) {
|
||||
assert( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed );
|
||||
if( fragment->frag_send.frag_base.frag_size == fragment->frag_bytes_processed ) {
|
||||
assert( 0 == burst_length );
|
||||
hdr->hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
|
||||
}
|
||||
/* for the last piece set the header type to FIN */
|
||||
@ -374,11 +376,11 @@ int mca_ptl_gm_send_burst_data( mca_ptl_gm_peer_t *ptl_peer,
|
||||
}
|
||||
|
||||
int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
struct mca_ptl_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t *size,
|
||||
int flags )
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
struct mca_ptl_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t *size,
|
||||
int flags )
|
||||
{
|
||||
mca_ptl_gm_frag_header_t* hdr;
|
||||
uint64_t remaining_bytes, burst_length;
|
||||
@ -405,14 +407,14 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
burst_length = remaining_bytes;
|
||||
} else {
|
||||
#if OMPI_MCA_PTL_GM_HAVE_RDMA_GET
|
||||
if( remaining_bytes < mca_ptl_gm_component.gm_rndv_burst_limit ) {
|
||||
burst_length = remaining_bytes % (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));
|
||||
} else {
|
||||
if( mca_ptl_gm_component.gm_rdma_frag_size == UINT_MAX )
|
||||
burst_length = 0;
|
||||
else
|
||||
burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
if( remaining_bytes < mca_ptl_gm_component.gm_rndv_burst_limit ) {
|
||||
burst_length = remaining_bytes % (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));
|
||||
} else {
|
||||
if( mca_ptl_gm_component.gm_rdma_frag_size == UINT_MAX )
|
||||
burst_length = 0;
|
||||
else
|
||||
burst_length = remaining_bytes % mca_ptl_gm_component.gm_rdma_frag_size;
|
||||
}
|
||||
#else
|
||||
/*burst_length = remaining_bytes % (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));*/
|
||||
burst_length = (mca_ptl_gm_component.gm_segment_size - sizeof(mca_ptl_base_frag_header_t));
|
||||
@ -431,7 +433,7 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
fragment->frag_send.frag_request,
|
||||
(*size) );
|
||||
OMPI_FREE_LIST_RETURN( &(ptl_peer->peer_ptl->gm_send_frags), ((ompi_list_item_t*)fragment) );
|
||||
OMPI_FREE_LIST_RETURN( &(ptl_peer->peer_ptl->gm_send_frags), ((ompi_list_item_t*)fragment) );
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -512,15 +514,16 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
|
||||
{
|
||||
const int header_length = sizeof(mca_ptl_base_rendezvous_header_t);
|
||||
mca_ptl_base_header_t* hdr;
|
||||
mca_ptl_gm_module_t* ptl_gm = (mca_ptl_gm_module_t*)ptl;
|
||||
ompi_convertor_t *convertor = NULL;
|
||||
int rc, freeAfter;
|
||||
unsigned int max_data = 0;
|
||||
size_t max_data = 0;
|
||||
mca_ptl_gm_peer_t* ptl_peer = (mca_ptl_gm_peer_t*)ptl_base_peer;
|
||||
ompi_list_item_t *item;
|
||||
char* sendbuf;
|
||||
|
||||
OMPI_FREE_LIST_WAIT( &(((mca_ptl_gm_module_t*)ptl)->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(((mca_ptl_gm_module_t*)ptl)->num_send_tokens), 1 );
|
||||
OMPI_FREE_LIST_WAIT( &(ptl_gm->gm_send_dma_frags), item, rc );
|
||||
ompi_atomic_sub( &(ptl_gm->num_send_tokens), 1 );
|
||||
sendbuf = (char*)item;
|
||||
|
||||
hdr = (mca_ptl_base_header_t*)item;
|
||||
@ -531,14 +534,13 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
|
||||
|
||||
if( size > 0 ) {
|
||||
struct iovec iov;
|
||||
unsigned int iov_count = 1;
|
||||
uint32_t iov_count = 1;
|
||||
|
||||
convertor = &sendreq->req_send.req_convertor;
|
||||
/* personalize the convertor */
|
||||
ompi_convertor_init_for_send( convertor, 0, sendreq->req_send.req_base.req_datatype,
|
||||
sendreq->req_send.req_base.req_count,
|
||||
sendreq->req_send.req_base.req_addr,
|
||||
0, NULL );
|
||||
/* We send the first fragment here, and the convertor does not need any
|
||||
* particular options. Thus, we can use the one already prepared on the
|
||||
* request.
|
||||
*/
|
||||
|
||||
if( (size + header_length) <= mca_ptl_gm_component.gm_segment_size )
|
||||
iov.iov_len = size;
|
||||
@ -557,16 +559,14 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
|
||||
*/
|
||||
mca_ptl_base_send_request_offset( sendreq, max_data );
|
||||
}
|
||||
|
||||
/* Send the first fragment */
|
||||
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, hdr,
|
||||
gm_send_with_callback( ptl_gm->gm_port, hdr,
|
||||
GM_SIZE, max_data + header_length, GM_LOW_PRIORITY,
|
||||
ptl_peer->peer_addr.local_id, ptl_peer->peer_addr.port_id,
|
||||
send_match_callback, (void *)hdr );
|
||||
|
||||
if( !(flags & MCA_PTL_FLAGS_ACK) ) {
|
||||
ptl_peer->peer_ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl_peer->peer_ptl,
|
||||
sendreq, max_data );
|
||||
ptl->ptl_send_progress( ptl, sendreq, max_data );
|
||||
DO_DEBUG( ompi_output( 0, "sender %d complete request %p w/o rndv with %d bytes",
|
||||
orte_process_info.my_name->vpid, sendreq, max_data ); );
|
||||
} else {
|
||||
@ -631,7 +631,7 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
|
||||
if( true == matched ) return NULL; /* done and fragment already removed */
|
||||
|
||||
/* get some memory and copy the data inside. We can then release the receive buffer */
|
||||
if( 0 != (recv_frag->attached_data_length) ) {
|
||||
if( 0 != recv_frag->attached_data_length ) {
|
||||
char* ptr = (char*)gm_get_local_buffer();
|
||||
recv_frag->have_allocated_buffer = true;
|
||||
memcpy( ptr, recv_frag->frag_recv.frag_base.frag_addr, recv_frag->attached_data_length );
|
||||
@ -696,7 +696,8 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
mca_ptl_base_recv_request_t *request;
|
||||
ompi_convertor_t local_convertor, *convertor;
|
||||
struct iovec iov;
|
||||
uint32_t iov_count, max_data = 0, header_length;
|
||||
uint32_t iov_count, header_length;
|
||||
size_t max_data = 0;
|
||||
int32_t freeAfter, rc;
|
||||
mca_ptl_gm_recv_frag_t* frag;
|
||||
|
||||
@ -717,11 +718,10 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
|
||||
if( hdr->hdr_frag.hdr_frag_length <= (mca_ptl_gm_component.gm_segment_size -
|
||||
sizeof(mca_ptl_base_frag_header_t)) ) {
|
||||
ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_recv.req_base.req_comm,
|
||||
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE );
|
||||
convertor = &local_convertor;
|
||||
convertor->stack_size = 0; /* don't let the convertor free the stack */
|
||||
ompi_convertor_copy( proc->proc_convertor, convertor );
|
||||
request->req_recv.req_base.req_proc =
|
||||
ompi_comm_peer_lookup( request->req_recv.req_base.req_comm,
|
||||
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE );
|
||||
frag = NULL;
|
||||
} else { /* large message => we have to create a receive fragment */
|
||||
frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
|
||||
@ -745,11 +745,12 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t* ptl,
|
||||
DO_DEBUG( ompi_output( 0, "receiver %d create fragment with offset %lld and length %lld",
|
||||
orte_process_info.my_name->vpid, frag->frag_offset, frag->frag_recv.frag_base.frag_size ); );
|
||||
}
|
||||
ompi_convertor_init_for_recv( convertor, 0,
|
||||
request->req_recv.req_base.req_datatype,
|
||||
request->req_recv.req_base.req_count,
|
||||
request->req_recv.req_base.req_addr,
|
||||
hdr->hdr_frag.hdr_frag_offset, NULL );
|
||||
/* GM does not use any of the convertor specializations, so we can just clone the
|
||||
* standard convertor attached to the request and set the position.
|
||||
*/
|
||||
ompi_convertor_clone_with_position( &(request->req_recv.req_convertor),
|
||||
convertor, 1,
|
||||
&(hdr->hdr_frag.hdr_frag_offset) );
|
||||
}
|
||||
|
||||
if( header_length != msg_len ) {
|
||||
|
@ -90,12 +90,12 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t** putfrag,
|
||||
|
||||
if( (*size) > 0 ) {
|
||||
convertor = &(frag->frag_send.frag_base.frag_convertor);
|
||||
ompi_convertor_copy( &(sendreq->req_send.req_convertor), convertor );
|
||||
ompi_convertor_init_for_send( convertor, 0,
|
||||
sendreq->req_send.req_datatype,
|
||||
sendreq->req_send.req_count,
|
||||
sendreq->req_send.req_addr,
|
||||
offset, NULL );
|
||||
/* GM uses the default parameters for the convertor without any special memory
|
||||
* allocation function. We have to call the prepare_for_send in order to
|
||||
* initialize the missing parameters of the convertor.
|
||||
*/
|
||||
ompi_convertor_clone_with_position( &(sendreq->req_send.req_convertor), convertor, 1,
|
||||
&offset );
|
||||
}
|
||||
*putfrag = frag;
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user