
Decrease the latency. A new protocol for long messages. It's just the beginning, as it can still be improved, but it increases the bandwidth by nearly 10%. We are now very close to the NetPipe (GM) bandwidth.

This commit was SVN r3873.
This commit is contained in:
George Bosilca 2004-12-23 00:32:32 +00:00
parent bfa4f158b2
commit 6e60619434
4 changed files with 81 additions and 57 deletions
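
The diffs below implement the new protocol on both ends: for a long message the sender first announces the fragment with a NULL pointer, pins its own buffer while that announcement is in flight, and only then sends the real pinned-down address. What follows is a minimal stand-alone sketch of the send side; eager_send and pin_memory are illustrative stubs standing in for the real calls (gm_send_to_peer_with_callback, gm_register_memory), not symbols from the tree.

#include <stddef.h>
#include <stdint.h>

typedef union { uint64_t lval; void *pval; } ompi_ptr_t;

/* Illustrative stubs: the real code uses gm_send_to_peer_with_callback
 * and gm_register_memory; this only models the message flow. */
static void eager_send(const ompi_ptr_t *addr) { (void)addr; }
static int  pin_memory(void *buf, size_t len)  { (void)buf; (void)len; return 0; }

static void send_long_fragment(void *buf, size_t len)
{
    ompi_ptr_t addr;
    addr.lval = 0;
    addr.pval = NULL;
    eager_send(&addr);     /* 1st message: NULL pointer, lets the receiver
                            * start registering its buffer right away */
    pin_memory(buf, len);  /* pin our pages while that message is in flight */
    addr.pval = buf;
    eager_send(&addr);     /* 2nd message: the real pinned-down address */
}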

View File

@@ -337,7 +337,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
{
mca_pml_base_recv_request_t *request;
mca_ptl_base_header_t *hdr;
int bytes_recv, rc, header_length;
int rc;
mca_ptl_gm_module_t *gm_ptl;
mca_ptl_gm_send_frag_t *ack;
mca_ptl_gm_recv_frag_t *recv_frag;
@@ -349,13 +349,6 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
recv_frag = (mca_ptl_gm_recv_frag_t *)frag;
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
if( MCA_PTL_HDR_TYPE_RNDV == hdr->hdr_common.hdr_type ) {
header_length = sizeof(mca_ptl_base_rendezvous_header_t);
} else {
assert( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_common.hdr_type );
header_length = sizeof(mca_ptl_base_match_header_t);
}
if( hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) {
/* need to send an ack back */
ack = mca_ptl_gm_alloc_send_frag( gm_ptl, NULL );
@@ -382,16 +375,14 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
}
}
bytes_recv = frag->frag_base.frag_size;
if( frag->frag_base.frag_size > 0 ) {
unsigned int max_data, out_size;
int freeAfter;
struct iovec iov;
/* Here we expect frag_addr to be the beginning of the buffer, header included */
iov.iov_base = ((char*)frag->frag_base.frag_addr);
iov.iov_len = bytes_recv;
iov.iov_base = frag->frag_base.frag_addr;
iov.iov_len = frag->frag_base.frag_size;
ompi_convertor_copy( peer->peer_proc->proc_ompi->proc_convertor,
&frag->frag_base.frag_convertor);
@@ -409,7 +400,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
}
/* update progress */
ptl->ptl_recv_progress( ptl, request, bytes_recv, bytes_recv );
ptl->ptl_recv_progress( ptl, request, frag->frag_base.frag_size, frag->frag_base.frag_size );
/* Now update the status of the fragment */
if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true ) {

View File

@@ -65,6 +65,22 @@ static void send_continue_callback( struct gm_port *port, void * context, gm_sta
}
}
static void send_continue_short_callback( struct gm_port* port, void* context, gm_status_t status )
{
mca_ptl_gm_module_t* gm_ptl;
mca_ptl_gm_send_frag_t* frag;
mca_ptl_base_header_t* header;
header = (mca_ptl_base_header_t*)context;
frag = header->hdr_frag.hdr_src_ptr.pval;
gm_ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags), ((ompi_list_item_t*)header) );
/* release the send token */
ompi_atomic_add( &(gm_ptl->num_send_tokens), 1 );
}
int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq,
@@ -77,10 +93,6 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
ompi_list_item_t* item;
int rc = 0;
/*OMPI_OUTPUT( (0, "[%s:%d]mca_ptl_gm_peer_send_continue peer %p fragment %p send request %p\n\toffset %ld size %ld flags %d target buffer %p bytes %d\n",
__FILE__, __LINE__, (void*)ptl_peer, (void*)fragment, (void*)sendreq, offset, *size,
flags, target_buffer, bytes) );*/
fragment->send_frag.frag_base.frag_size = *size;
fragment->frag_bytes_processed = 0;
fragment->frag_offset = offset;
@@ -132,7 +144,9 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
fragment->frag_offset += iov.iov_len;
remaining_bytes -= iov.iov_len;
if( remaining_bytes == 0 ) hdr.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
if( remaining_bytes == 0 ) {
hdr.hdr_common.hdr_flags |= PTL_FLAG_GM_LAST_FRAGMENT;
}
*(mca_ptl_base_frag_header_t*)item = hdr.hdr_frag;
/* for the last piece set the header type to FIN */
@@ -156,6 +170,22 @@ int mca_ptl_gm_peer_send_continue( mca_ptl_gm_peer_t *ptl_peer,
*/
gm_status_t status;
ompi_ptr_t* local_address;
ompi_list_item_t* item2;
OMPI_FREE_LIST_TRY_GET( &(ptl_peer->peer_ptl->gm_send_dma_frags), item2 );
if( NULL != item2 ) {
ompi_atomic_sub( &(ptl_peer->peer_ptl->num_send_tokens), 1 );
*(mca_ptl_base_frag_header_t*)item2 = hdr.hdr_frag;
local_address = (ompi_ptr_t*)((char*)item2 + header_length);
local_address->lval = 0;
local_address->pval = NULL;
gm_send_to_peer_with_callback( ptl_peer->peer_ptl->gm_port, item2,
GM_SIZE, header_length + sizeof(ompi_ptr_t), GM_LOW_PRIORITY,
ptl_peer->local_id,
send_continue_short_callback, (void *)item2 );
}
status = gm_register_memory( ptl_peer->peer_ptl->gm_port,
fragment->send_frag.frag_base.frag_addr,
@@ -201,7 +231,7 @@ int mca_ptl_gm_peer_send( struct mca_ptl_base_module_t* ptl,
int rc, freeAfter;
unsigned int in_size, max_data = 0;
mca_ptl_gm_send_frag_t *fragment;
mca_ptl_gm_peer_t* ptl_peer;
mca_ptl_gm_peer_t* ptl_peer = (mca_ptl_gm_peer_t*)ptl_base_peer;
fragment = mca_ptl_gm_alloc_send_frag( (mca_ptl_gm_module_t*)ptl, sendreq );
if( NULL == fragment ) {
@@ -313,7 +343,7 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
header = (mca_ptl_base_header_t*)putfrag->send_buf;
bytes2 = header->hdr_ack.hdr_dst_size;
ptl = (mca_ptl_gm_module_t*)putfrag->send_frag.frag_base.frag_owner;
send_req = putfrag->req;
send_req = putfrag->send_frag.frag_request;
switch (status) {
case GM_SUCCESS:
@@ -416,7 +446,7 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
frag = (mca_ptl_gm_send_frag_t*)(header->hdr_ack.hdr_src_ptr.pval);
/* update the fragment header with the most up-to-date information */
frag->send_frag.frag_base.frag_header.hdr_ack.hdr_dst_match = header->hdr_ack.hdr_dst_match;
req = (mca_pml_base_send_request_t*)frag->req;
req = frag->send_frag.frag_request;
assert(req != NULL);
req->req_peer_match = header->hdr_ack.hdr_dst_match;
req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
@@ -459,12 +489,12 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
if( MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type ) {
recv_frag->frag_recv.frag_base.frag_addr =
(char *) hdr + sizeof(mca_ptl_base_match_header_t);
(char*)hdr + sizeof(mca_ptl_base_match_header_t);
recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_rndv.hdr_match.hdr_msg_length;
} else {
assert( MCA_PTL_HDR_TYPE_RNDV == hdr->hdr_rndv.hdr_match.hdr_common.hdr_type );
recv_frag->frag_recv.frag_base.frag_addr =
(char *) hdr + sizeof(mca_ptl_base_rendezvous_header_t);
(char*)hdr + sizeof(mca_ptl_base_rendezvous_header_t);
recv_frag->frag_recv.frag_base.frag_size = hdr->hdr_rndv.hdr_frag_length;
}
@@ -595,6 +625,7 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
ompi_proc_t* proc = ompi_comm_peer_lookup( request->req_base.req_comm,
request->req_base.req_ompi.req_status.MPI_SOURCE );
convertor = &local_convertor;
convertor->stack_size = 0; /* don't let the convertor free the stack */
ompi_convertor_copy( proc->proc_convertor, convertor );
recv_frag = NULL;
} else { /* large message => we have to create a receive fragment */
@@ -623,36 +654,40 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
freeAfter = 0; /* unused here */
rc = ompi_convertor_unpack( convertor, &iov, &iov_count, &max_data, &freeAfter );
assert( 0 == freeAfter );
if( PTL_FLAG_GM_LAST_FRAGMENT & hdr->hdr_common.hdr_flags ) {
/* All the data transferred. Update the receive request */
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
}
ptl->super.ptl_recv_progress( (mca_ptl_base_module_t*)ptl, request, max_data, max_data );
} else {
gm_status_t status;
ompi_ptr_t* remote_memory = (ompi_ptr_t*)((char*)hdr + sizeof(mca_ptl_base_frag_header_t));
mca_ptl_gm_peer_t* peer;
status = gm_register_memory( ptl->gm_port,
(char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
hdr->hdr_frag.hdr_frag_length );
if( status != GM_SUCCESS ) {
ompi_output( 0, "Cannot register memory from %p length %lld bytes\n",
(void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length );
return NULL;
/* We will send two messages. The first one carries a NULL pointer, just to let
* the remote node start registering the memory; the second one carries the
* real pinned-down memory location.
*/
if( NULL == remote_memory->pval ) {
status = gm_register_memory( ptl->gm_port,
(char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset,
hdr->hdr_frag.hdr_frag_length );
if( status != GM_SUCCESS ) {
ompi_output( 0, "Cannot register memory from %p length %lld bytes\n",
(void*)request->req_base.req_addr, hdr->hdr_frag.hdr_frag_length );
return NULL;
}
} else {
if( NULL == recv_frag->frag_recv.frag_base.frag_peer ) {
recv_frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
request->req_base.req_ompi.req_status.MPI_SOURCE,
(struct mca_ptl_base_module_t*)ptl );
}
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
recv_frag->frag_recv.frag_base.frag_addr = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
gm_get( ptl->gm_port, remote_memory->lval,
recv_frag->frag_recv.frag_base.frag_addr,
recv_frag->frag_recv.frag_base.frag_size,
GM_LOW_PRIORITY, peer->local_id, peer->port_number,
mca_ptl_gm_get_callback, recv_frag );
}
if( NULL == recv_frag->frag_recv.frag_base.frag_peer ) {
recv_frag->frag_recv.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)
mca_pml_teg_proc_lookup_remote_peer( request->req_base.req_comm,
request->req_base.req_ompi.req_status.MPI_SOURCE,
(struct mca_ptl_base_module_t*)ptl );
}
peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;
recv_frag->frag_recv.frag_base.frag_addr = (char*)request->req_base.req_addr + hdr->hdr_frag.hdr_frag_offset;
gm_get( ptl->gm_port, remote_memory->lval,
recv_frag->frag_recv.frag_base.frag_addr,
recv_frag->frag_recv.frag_base.frag_size,
GM_LOW_PRIORITY, peer->local_id, peer->port_number,
mca_ptl_gm_get_callback, recv_frag );
}
return NULL;
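
The comment in this hunk describes the receive side of the same exchange: on the first message (NULL pointer) the receiver starts registering its own buffer, so the two registrations overlap; on the second message it pulls the data with a direct get. A minimal sketch under the same stub assumptions (pin_memory and rdma_read stand in for gm_register_memory and gm_get; handle_rendezvous_frag is an illustrative name):

#include <stddef.h>
#include <stdint.h>

typedef union { uint64_t lval; void *pval; } ompi_ptr_t;

/* Illustrative stubs for gm_register_memory and gm_get. */
static int  pin_memory(void *buf, size_t len) { (void)buf; (void)len; return 0; }
static void rdma_read(uint64_t remote, void *local, size_t len)
{ (void)remote; (void)local; (void)len; }

static void handle_rendezvous_frag(ompi_ptr_t remote, void *buf, size_t len)
{
    if (NULL == remote.pval)
        pin_memory(buf, len);             /* 1st message: start pinning now,
                                           * overlapping the sender's work */
    else
        rdma_read(remote.lval, buf, len); /* 2nd message: pull the payload */
}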

View File

@@ -76,7 +76,7 @@ mca_ptl_gm_alloc_send_frag( struct mca_ptl_gm_module_t *ptl,
assert( ptl->num_send_tokens >= 0 );
sendfrag->send_buf = (void*)item;
sendfrag->req = sendreq;
sendfrag->send_frag.frag_request = sendreq;
sendfrag->send_frag.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
sendfrag->frag_bytes_processed = 0;
sendfrag->status = -1;
@@ -161,13 +161,13 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag,
offset, NULL );
}
putfrag->status = -1;
putfrag->wait_for_ack = 0;
putfrag->put_sent = 0;
putfrag->type = PUT;
putfrag->req = sendreq;
putfrag->status = -1;
putfrag->wait_for_ack = 0;
putfrag->put_sent = 0;
putfrag->type = PUT;
putfrag->send_frag.frag_request = sendreq;
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t*)gm_ptl;
putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
return OMPI_SUCCESS;
}

View File

@@ -59,7 +59,6 @@ extern "C" {
*/
struct mca_ptl_gm_send_frag_t {
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
struct mca_pml_base_send_request_t *req;
void* send_buf;
ompi_ptr_t* registered_buf;
@@ -74,7 +73,6 @@ extern "C" {
struct mca_ptl_gm_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv;
struct mca_pml_base_recv_request_t* req;
size_t frag_bytes_processed;
size_t frag_offset;
volatile int frag_progressed;
@@ -112,7 +110,7 @@ extern "C" {
{ \
item = NULL; \
if(ompi_using_threads()) { \
if( ompi_atomic_trylock(&((fl)->fl_lock)) ) { \
if( ompi_mutex_trylock( &((fl)->fl_lock)) ) { \
/* We get the lock. Now let's remove one of the elements */ \
item = ompi_list_remove_first(&((fl)->super)); \
ompi_mutex_unlock(&((fl)->fl_lock)); \
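
The last hunk replaces ompi_atomic_trylock with ompi_mutex_trylock inside OMPI_FREE_LIST_TRY_GET, whose whole point is to never block: when the lock is contended, item stays NULL and the caller takes another path instead of waiting. A minimal sketch of that pattern with POSIX threads (free_list_try_get and fl_item are illustrative names, not the Open MPI implementation):

#include <pthread.h>
#include <stddef.h>

struct fl_item   { struct fl_item *next; };
struct free_list { pthread_mutex_t lock; struct fl_item *head; };

/* Pop one element without ever blocking: returns NULL when the list
 * is empty or when somebody else holds the lock. */
static struct fl_item *free_list_try_get(struct free_list *fl)
{
    struct fl_item *item = NULL;
    if (pthread_mutex_trylock(&fl->lock) == 0) {  /* 0 == lock acquired */
        if (fl->head != NULL) {
            item = fl->head;
            fl->head = item->next;
        }
        pthread_mutex_unlock(&fl->lock);
    }
    return item;
}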