1
1

Just a checkpoint. Now compile with the new header.

Improvements:
- Latency went down by 15 us. It's still a little bit high (12 us) but I'm working on that.
- Correctly initialize and clean up, independent of the number of Myrinet cards.
- Associate one PTL per card and one thread per PTL if we are in an MT environment.

This commit was SVN r3609.
Этот коммит содержится в:
George Bosilca 2004-11-18 00:23:19 +00:00
родитель 89978d698b
Коммит 33426c5af1
7 изменённых файлов: 273 добавлений и 254 удалений

Просмотреть файл

@ -136,6 +136,18 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *base_ptl)
uint32_t index;
mca_ptl_gm_module_t* ptl = (mca_ptl_gm_module_t*)base_ptl;
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
break;
}
}
if( index == mca_ptl_gm_component.gm_num_ptl_modules ) {
ompi_output( 0, "%p is not a GM PTL !!!\n", (void*)base_ptl );
return OMPI_ERROR;
}
/* we should do the same things as in the init step in reverse order.
* First we shutdown all threads if there are any.
*/
@ -147,18 +159,47 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *base_ptl)
ompi_thread_join( &(ptl->thread), &thread_return );
}
#endif /* OMPI_HAVE_POSIX_THREADS */
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
OMPI_OUTPUT((0, "GM ptl %p succesfully finalized\n", (void*)ptl));
}
/* Closing each port require several steps. As there is no way to cancel all
* already posted messages we start by unregistering all memory and then close
* the port. After we can release all internal data.
*/
if( ptl->gm_send_dma_memory != NULL ) {
gm_dma_free( ptl->gm_port, ptl->gm_send_dma_memory );
ptl->gm_send_dma_memory = NULL;
}
/* based on the fact that the port 0 is reserved, we can use ZERO
* to mark a port as unused.
*/
if( 0 != ptl->gm_port ) gm_close( ptl->gm_port );
ptl->gm_port = 0;
if( ptl->gm_recv_dma_memory != NULL ) {
gm_dma_free( ptl->gm_port, ptl->gm_recv_dma_memory );
ptl->gm_recv_dma_memory = NULL;
}
/* Now close the port if one is open */
if( ptl->gm_port != NULL ) {
gm_close( ptl->gm_port );
ptl->gm_port = NULL;
}
/* And now release all internal ressources. */
OBJ_DESTRUCT( &(ptl->gm_send_frags) );
if( ptl->gm_send_fragments != NULL ) {
free( ptl->gm_send_fragments );
ptl->gm_send_fragments = NULL;
}
OBJ_DESTRUCT( &(ptl->gm_recv_frags_free) );
if( ptl->gm_recv_fragments != NULL ) {
free( ptl->gm_recv_fragments );
ptl->gm_recv_fragments = NULL;
}
/* These are supposed to be empty by now */
OBJ_DESTRUCT( &(ptl->gm_send_frags_queue) );
OBJ_DESTRUCT( &(ptl->gm_pending_acks) );
OBJ_DESTRUCT( &(ptl->gm_recv_outstanding_queue) );
/* And finally release the PTL itself */
OMPI_OUTPUT((0, "GM ptl %p succesfully finalized\n", (void*)ptl));
free( ptl );
return OMPI_SUCCESS;
@ -224,21 +265,21 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
gm_ptl = (mca_ptl_gm_module_t *)ptl;
sendfrag = mca_ptl_gm_alloc_send_frag( gm_ptl, sendreq );
if (NULL == sendfrag) {
ompi_output(0,"[%s:%d] Unable to allocate a gm send frag\n",
__FILE__, __LINE__);
ompi_output( 0,"[%s:%d] Unable to allocate a gm send frag\n",
__FILE__, __LINE__ );
return OMPI_ERR_OUT_OF_RESOURCE;
}
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag = sendfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
rc = mca_ptl_gm_send_frag_init( sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset, &size, flags );
/* initiate the send */
gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;
rc = mca_ptl_gm_send_frag_init( sendfrag, gm_ptl_peer, sendreq, offset, &size, flags );
rc = mca_ptl_gm_peer_send( gm_ptl_peer, sendfrag, sendreq, offset, &size, flags );
ompi_atomic_sub( &(gm_ptl->num_send_tokens), 1 );
assert( gm_ptl->num_send_tokens >= 0 );
sendreq->req_offset += size;
return OMPI_SUCCESS;
}
@ -388,17 +429,16 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
unsigned int max_data, out_size;
int freeAfter;
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_peer);
proc = ompi_comm_peer_lookup( request->req_base.req_comm,
header->hdr_match.hdr_src );
ompi_convertor_copy(proc->proc_convertor,
&frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv(
&frag->frag_base.frag_convertor,
0,
request->req_base.req_datatype,
request->req_base.req_count,
request->req_base.req_addr,
header->hdr_frag.hdr_frag_offset, NULL );
ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor,
0,
request->req_base.req_datatype,
request->req_base.req_count,
request->req_base.req_addr,
header->hdr_frag.hdr_frag_offset, NULL );
out_size = 1;
max_data = iov.iov_len;
rc = ompi_convertor_unpack( &frag->frag_base.frag_convertor, &(iov),
@ -418,10 +458,6 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
}
/* return to free list */
gm_ptl = (mca_ptl_gm_module_t *)ptl;
OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_recv_frags_free),
(ompi_list_item_t*)((mca_ptl_gm_recv_frag_t*)frag) );
}

Просмотреть файл

@ -40,6 +40,7 @@ struct mca_ptl_gm_component_t {
int gm_max_boards_number; /**< maximum number of boards on the node */
int gm_max_rdma_frag_size; /**< maximum fragment size used to transfer data over RDMA */
char* gm_port_name; /**< the name used to get the port */
struct mca_ptl_gm_proc_t* gm_local;
ompi_list_t gm_procs;
ompi_list_t gm_send_req;
@ -63,6 +64,10 @@ struct mca_ptl_gm_module_t {
unsigned int num_recv_tokens;
unsigned int max_send_tokens;
unsigned int max_recv_tokens;
void* gm_send_dma_memory; /**< pointer to the send DMA registered memory attached to the PTL */
void* gm_recv_dma_memory; /**< pointer to the recv DMA registered memory attached to the PTL */
struct mca_ptl_gm_send_frag_t* gm_send_fragments;
struct mca_ptl_gm_recv_frag_t* gm_recv_fragments;
ompi_free_list_t gm_send_frags;
ompi_free_list_t gm_recv_frags_free;

Просмотреть файл

@ -40,8 +40,6 @@ mca_ptl_gm_component_t mca_ptl_gm_component = {
}
};
static bool mca_ptl_gm_component_initialized = false;
/*
* utility routines for parameter registration
*/
@ -118,6 +116,16 @@ mca_ptl_gm_component_open(void)
*/
int mca_ptl_gm_component_close (void)
{
uint32_t index;
mca_ptl_base_module_t* ptl;
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
ptl = (mca_ptl_base_module_t*)mca_ptl_gm_component.gm_ptl_modules[index];
if( NULL != ptl )
ptl->ptl_finalize( ptl );
}
mca_ptl_gm_component.gm_num_ptl_modules = 0;
if (NULL != mca_ptl_gm_component.gm_ptl_modules)
free (mca_ptl_gm_component.gm_ptl_modules);
@ -144,7 +152,7 @@ mca_ptl_gm_create( mca_ptl_gm_module_t** pptl )
}
/* copy the basic informations in the new PTL */
memcpy (ptl, &mca_ptl_gm_module, sizeof (mca_ptl_gm_module));
memcpy (ptl, &mca_ptl_gm_module, sizeof(mca_ptl_gm_module_t) );
#if OMPI_HAVE_POSIX_THREADS
ptl->thread.t_handle = (pthread_t)-1;
#endif /* OMPI_HAVE_POSIX_THREADS */
@ -260,13 +268,14 @@ static inline int
mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
{
uint32_t i;
void *gm_send_reg_memory , *gm_recv_reg_memory;
mca_ptl_gm_send_frag_t *sfragment;
mca_ptl_gm_recv_frag_t *free_rfragment;
ptl->num_send_tokens = gm_num_send_tokens (ptl->gm_port);
ptl->max_send_tokens = ptl->num_send_tokens;
ptl->num_send_tokens -= PTL_GM_ADMIN_SEND_TOKENS;
ptl->num_recv_tokens = gm_num_receive_tokens (ptl->gm_port);
ptl->max_recv_tokens = ptl->num_recv_tokens;
ptl->num_recv_tokens -= PTL_GM_ADMIN_RECV_TOKENS;
/****************SEND****************************/
@ -281,19 +290,20 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
/* allocate the elements */
sfragment = (mca_ptl_gm_send_frag_t *)calloc( ptl->num_send_tokens, sizeof(mca_ptl_gm_send_frag_t) );
ptl->gm_send_fragments = sfragment;
/* allocate the registered memory */
gm_send_reg_memory = gm_dma_malloc( ptl->gm_port,
(GM_BUF_SIZE * ptl->num_send_tokens) + GM_PAGE_LEN );
if( NULL == gm_send_reg_memory ) {
ptl->gm_send_dma_memory = gm_dma_malloc( ptl->gm_port,
(GM_BUF_SIZE * ptl->num_send_tokens) + GM_PAGE_LEN );
if( NULL == ptl->gm_send_dma_memory ) {
ompi_output( 0, "unable to allocate registered memory\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < ptl->num_send_tokens; i++) {
OMPI_FREE_LIST_RETURN( &(ptl->gm_send_frags), (ompi_list_item_t *)sfragment );
sfragment->send_buf = gm_send_reg_memory;
printf( "%3d : gm register sendreq %p with GM buffer %p\n", i, (void*)sfragment, (void*)sfragment->send_buf );
gm_send_reg_memory = ((char *)gm_send_reg_memory) + GM_BUF_SIZE;
sfragment->send_buf = ptl->gm_send_dma_memory;
DO_DEBUG( printf( "%3d : gm register sendreq %p with GM buffer %p\n", i,
(void*)sfragment, (void*)sfragment->send_buf ) );
ptl->gm_send_dma_memory = ((char *)ptl->gm_send_dma_memory) + GM_BUF_SIZE;
sfragment++;
}
A_PRINT( ("recv_tokens = %d send_tokens = %d, allocted free lis = %d\n",
@ -311,19 +321,20 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
/* construct the list of recv fragments free */
OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
ompi_free_list_init (&(ptl->gm_recv_frags_free),
ompi_free_list_init( &(ptl->gm_recv_frags_free),
sizeof (mca_ptl_gm_recv_frag_t),
OBJ_CLASS (mca_ptl_gm_recv_frag_t),
ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL);
ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL );
/*allocate the elements */
/* allocate the elements */
free_rfragment = (mca_ptl_gm_recv_frag_t *)
calloc( ptl->num_recv_tokens, sizeof(mca_ptl_gm_recv_frag_t) );
ptl->gm_recv_fragments = free_rfragment;
/*allocate the registered memory */
gm_recv_reg_memory =
gm_dma_malloc (ptl->gm_port, (GM_BUF_SIZE * ptl->num_recv_tokens) + GM_PAGE_LEN );
if( NULL == gm_recv_reg_memory ) {
ptl->gm_recv_dma_memory =
gm_dma_malloc( ptl->gm_port, (GM_BUF_SIZE * ptl->num_recv_tokens) + GM_PAGE_LEN );
if( NULL == ptl->gm_recv_dma_memory ) {
ompi_output( 0, "unable to allocate registered memory for receive\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -332,12 +343,14 @@ mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
OMPI_FREE_LIST_RETURN( &(ptl->gm_recv_frags_free), (ompi_list_item_t *)free_rfragment );
free_rfragment++;
gm_provide_receive_buffer( ptl->gm_port, gm_recv_reg_memory,
gm_provide_receive_buffer( ptl->gm_port, ptl->gm_recv_dma_memory,
GM_SIZE, GM_LOW_PRIORITY );
printf( "%3d : gm register GM receive buffer %p\n", i, (void*)gm_recv_reg_memory );
gm_recv_reg_memory = ((char *)gm_recv_reg_memory) + GM_BUF_SIZE;
DO_DEBUG(printf( "%3d : gm register GM receive buffer %p\n", i, (void*)ptl->gm_recv_dma_memory ) );
ptl->gm_recv_dma_memory = ((char *)ptl->gm_recv_dma_memory) + GM_BUF_SIZE;
}
OBJ_CONSTRUCT( &(ptl->gm_pending_acks), ompi_list_t );
return OMPI_SUCCESS;
}
@ -438,6 +451,7 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
memcpy (ptls, mca_ptl_gm_component.gm_ptl_modules,
mca_ptl_gm_component.gm_num_ptl_modules * sizeof(mca_ptl_gm_module_t *));
*num_ptl_modules = mca_ptl_gm_component.gm_num_ptl_modules;
return ptls;
}
@ -455,16 +469,21 @@ mca_ptl_gm_component_control (int param, void *value, size_t size)
/*
* GM module progress.
*/
int
mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp)
{
int rc;
/* XXX: Do all the following inside a dispatcher, either in this routine
* or mca_ptl_gm_incoming_recv()
* i) check the send queue to see if any pending send can proceed
* ii) check for recieve and , call ptl_match to send it to the upper level
* BTW, ptl_matched is invoked inside ptl_match() via PML.
*/
rc = mca_ptl_gm_incoming_recv(&mca_ptl_gm_component);
return OMPI_SUCCESS;
uint32_t i;
gm_recv_event_t *event;
mca_ptl_gm_module_t *ptl;
for( i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++) {
ptl = mca_ptl_gm_component.gm_ptl_modules[i];
event = gm_receive(ptl->gm_port);
/* If there are no receive events just skip the function call */
if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) ) {
mca_ptl_gm_analyze_recv_event( ptl, event );
}
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -47,10 +47,10 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
struct iovec outvec;
size_t size_in,size_out;
int header_length;
mca_ptl_base_frag_header_t* header;
mca_ptl_base_header_t* header;
header = (mca_ptl_base_frag_header_t*)fragment->send_buf;
header_length = header->hdr_common.hdr_size;
header = (mca_ptl_base_header_t*)fragment->send_buf;
header_length = sizeof(mca_ptl_base_match_header_t);
A_PRINT("peer send (could be ack) : headerlen is %d \n", header_length);
size_in = *size;
@ -96,7 +96,7 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
if( (header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG) ||
(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_MATCH) )
header->hdr_frag_length = outvec.iov_len;
header->hdr_match.hdr_msg_length = outvec.iov_len;
/* adjust size and request offset to reflect actual number of bytes
* packed by convertor
@ -107,12 +107,8 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
sendreq, fragment->req, fragment, size_out,
((mca_ptl_base_header_t *)header)->hdr_ack.hdr_src_ptr);
printf( "send pointer %p SIZE %d length %d \n", (void*)fragment->send_buf, GM_BUF_SIZE, size_out );
/* initiate the gm send */
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
ptl_peer->port_number, send_callback, (void *)fragment );
DO_DEBUG( printf( "send pointer %p SIZE %d length %lu\n",
(void*)fragment->send_buf, GM_BUF_SIZE, size_out ) );
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
@ -120,6 +116,11 @@ int mca_ptl_gm_peer_send( mca_ptl_gm_peer_t *ptl_peer,
fragment->send_frag.frag_base.frag_size = size_out - header_length;
fragment->send_frag.frag_request = sendreq;
/* initiate the gm send */
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
ptl_peer->port_number, send_callback, (void *)fragment );
*size = (size_out - header_length);
A_PRINT("inside peer send : bytes sent is %d\n",*size);
return OMPI_SUCCESS;
@ -187,7 +188,6 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
{
mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *frag;
int header_length;
mca_pml_base_send_request_t *gm_send_req;
mca_ptl_base_header_t* header;
@ -197,18 +197,18 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
gm_send_req = frag->send_frag.frag_request;
header = (mca_ptl_base_header_t*)frag->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
switch (status) {
case GM_SUCCESS:
DO_DEBUG( printf( "send_callback for data ptr %p\n", (void*)frag->send_buf ) );
ompi_atomic_add( &(ptl->num_send_tokens), 1 );
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) {
A_PRINT("send callback: Completion of send_ack, sent frag is %p\n", frag);
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
} else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) {
A_PRINT("send callback : Completion of fin, bytes complete =\
%d\n",header->hdr_ack.hdr_dst_size);
A_PRINT("send callback : Completion of fin, bytes complete = %d\n",
header->hdr_ack.hdr_dst_size);
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
header->hdr_ack.hdr_dst_size);
@ -251,7 +251,7 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
mca_ptl_gm_send_frag_t * frag;
mca_pml_base_send_request_t *req;
mca_pml_base_recv_request_t *request;
int header_length, bytes;
int bytes;
char * reg_buf;
int status;
@ -267,7 +267,6 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
req->req_peer_size = header->hdr_ack.hdr_dst_size;
frag->wait_for_ack = 0;
header_length = frag->send_frag.frag_base.frag_header.hdr_frag.hdr_common.hdr_size;
bytes = frag->send_frag.frag_base.frag_size;
if(frag->send_complete == 1) {
@ -284,7 +283,8 @@ static void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_heade
header->hdr_ack.hdr_dst_match.pval;
/* call receive progress and indicate the recv has been completed */
GM_DBG(PTL_GM_DBG_COMM,"Calling recv_progress with bytes = %d\n",header->hdr_ack.hdr_dst_size);
GM_DBG( PTL_GM_DBG_COMM,"Calling recv_progress with bytes = %ld\n",
(long)header->hdr_ack.hdr_dst_size );
ptl->super.ptl_recv_progress (
(mca_ptl_base_module_t *) ptl,
request ,
@ -329,16 +329,10 @@ mca_ptl_gm_recv_frag_match( struct mca_ptl_gm_module_t *ptl,
recv_frag->frag_progressed = 0;
recv_frag->frag_recv.frag_base.frag_header = *header;
#if 0
recv_frag->frag_recv.frag_base.frag_addr = header;
/* + sizeof(mca_ptl_base_header_t);*/ /* XXX: bug */
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
#endif
#if 1
recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof (mca_ptl_base_header_t);
(char *) header + sizeof(mca_ptl_base_match_header_t);
recv_frag->frag_recv.frag_base.frag_size = header->hdr_frag.hdr_frag_length;
#endif
recv_frag->matched = false;
recv_frag->have_allocated_buffer = false;
@ -381,16 +375,9 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
recv_frag->frag_progressed = 0;
recv_frag->frag_recv.frag_base.frag_header = *header;
#if 0
recv_frag->frag_recv.frag_base.frag_addr = header;
/* + sizeof(mca_ptl_base_header_t);*/ /* XXX: bug */
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
#endif
#if 1
recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof (mca_ptl_base_header_t);
(char *) header + sizeof(mca_ptl_base_frag_header_t);
recv_frag->frag_recv.frag_base.frag_size = header->hdr_frag.hdr_frag_length;
#endif
recv_frag->matched = false;
recv_frag->have_allocated_buffer = false;
@ -409,6 +396,38 @@ mca_ptl_gm_recv_frag_frag( struct mca_ptl_gm_module_t *ptl,
return recv_frag;
}
void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
{
mca_ptl_gm_recv_frag_t * frag = NULL;
int size;
bool matched;
size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue);
if (size > 0) {
frag = (mca_ptl_gm_recv_frag_t *)
ompi_list_remove_first( (ompi_list_t *)&(ptl->gm_recv_outstanding_queue) );
GM_DBG(PTL_GM_DBG_COMM," the frag size to be matched is %ld\n",frag->frag_recv.frag_base.frag_size);
matched = ptl->super.ptl_match( &(ptl->super),
&(frag->frag_recv),
&(frag->frag_recv.frag_base.frag_header.hdr_match) );
GM_DBG(PTL_GM_DBG_COMM,"the value of matched is %d\n", matched);
if(!matched) {
ompi_list_append((ompi_list_t *)&(ptl->gm_recv_outstanding_queue),
(ompi_list_item_t *) frag);
} else {
/* if allocated buffer, free the buffer */
/* return the recv descriptor to the free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_recv_frags_free), (ompi_list_item_t *)frag);
}
}
}
static inline
mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl, gm_recv_event_t* event )
{
mca_ptl_gm_recv_frag_t* frag = NULL;
@ -437,37 +456,6 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl, gm_
return frag;
}
void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl )
{
mca_ptl_gm_recv_frag_t * frag = NULL;
int size;
bool matched;
size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue);
if (size > 0) {
frag = (mca_ptl_gm_recv_frag_t *)
ompi_list_remove_first( (ompi_list_t *)&(ptl->gm_recv_outstanding_queue) );
GM_DBG(PTL_GM_DBG_COMM," the frag size to be matched is %d\n",frag->frag_recv.frag_base.frag_size);
matched = ptl->super.ptl_match( &(ptl->super),
&(frag->frag_recv),
&(frag->frag_recv.frag_base.frag_header.hdr_match) );
GM_DBG(PTL_GM_DBG_COMM,"the value of matched is %d\n", matched);
if(!matched) {
ompi_list_append((ompi_list_t *)&(ptl->gm_recv_outstanding_queue),
(ompi_list_item_t *) frag);
} else {
/* if allocated buffer, free the buffer */
/* return the recv descriptor to the free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_recv_frags_free), (ompi_list_item_t *)frag);
}
}
}
int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_event_t* event )
{
void * mesg;
@ -507,25 +495,3 @@ int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_even
return 0;
}
/* Parse all the interfaces and check for a message. This function is used in the case
* where OpenMPI does not use a thread for each PTL. The real analisys of the message
* is done in a common place for all possibilities (threaded or not) in the function
* mca_ptl_gm_analyze_recv_event.
*/
int mca_ptl_gm_incoming_recv( struct mca_ptl_gm_component_t * gm_comp )
{
uint32_t i;
gm_recv_event_t *event;
mca_ptl_gm_module_t *ptl;
for( i = 0; i< gm_comp->gm_num_ptl_modules; i++) {
ptl = gm_comp->gm_ptl_modules[i];
event = gm_receive(ptl->gm_port);
/* If there are no receive events just skip the function call */
if( GM_NO_RECV_EVENT != gm_ntohc(event->recv.type) ) {
mca_ptl_gm_analyze_recv_event( ptl, event );
}
}
return 0;
}

Просмотреть файл

@ -12,6 +12,9 @@ struct mca_ptl_gm_peer_t;
#define PTL_GM_DBG_FLAG (PTL_GM_DBG_NONE)
/*#define DO_DEBUG(inst) inst*/
#define DO_DEBUG(inst)
#define GM_DBG(flag, args...) \
do { \
if (PTL_GM_DBG_FLAG & flag) { \
@ -31,13 +34,8 @@ do { \
#define A_PRINT(fmt, args...)
#endif
struct mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event );
int mca_ptl_gm_analyze_recv_event( struct mca_ptl_gm_module_t* ptl, gm_recv_event_t* event );
int mca_ptl_gm_incoming_recv( struct mca_ptl_gm_component_t * gm_comp );
void mca_ptl_gm_outstanding_recv( struct mca_ptl_gm_module_t *ptl);
int

Просмотреть файл

@ -81,10 +81,10 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
char * buffer,
int size )
{
mca_ptl_base_header_t * hdr;
mca_ptl_base_ack_header_t * hdr;
mca_pml_base_recv_request_t *request;
hdr = (mca_ptl_base_header_t *)ack->send_buf;
hdr = (mca_ptl_base_ack_header_t*)ack->send_buf;
ack->status = -1;
ack->type = -1;
@ -96,19 +96,18 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_src_ptr.pval =
hdr->hdr_src_ptr.pval =
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
A_PRINT("inside ack init: the src frag ptr is %p,hdr_len is %u\n",
hdr->hdr_ack.hdr_src_ptr,hdr->hdr_common.hdr_size);
A_PRINT( "inside ack init: the src frag ptr is %p,hdr_len is %u\n",
hdr->hdr_src_ptr, hdr->hdr_common.hdr_size );
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request; /*should this be dst_match */
hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */
hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;
hdr->hdr_ack.hdr_dst_size = size;
hdr->hdr_dst_match.lval = 0;
hdr->hdr_dst_match.pval = request; /*should this be dst_match */
hdr->hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */
hdr->hdr_dst_addr.pval = (void *)buffer;
hdr->hdr_dst_size = size;
ack->send_frag.frag_request = 0;
ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer;
@ -117,7 +116,7 @@ int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
ack->send_frag.frag_base.frag_size = 0;
ack->status = 1; /* was able to register memory */
ack->ptl = ptl;
ack->send_frag.frag_base.frag_header = *hdr;
ack->send_frag.frag_base.frag_header.hdr_ack = *hdr;
ack->wait_for_ack = 0;
ack->type = ACK;
@ -145,7 +144,6 @@ int mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* putfrag,
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match.pval;

Просмотреть файл

@ -21,116 +21,113 @@
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
/*struct mca_ptl_base_peer_t;*/
/*struct mca_ptl_base_peer_t;*/
/**
* GM send fragment derived type.
*/
struct mca_ptl_gm_send_frag_t {
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
void * send_buf;
void * registered_buf;
struct mca_pml_base_send_request_t *req;
struct mca_ptl_gm_module_t *ptl;
struct mca_ptl_gm_peer_t *peer;
/**
* GM send fragment derived type.
*/
struct mca_ptl_gm_send_frag_t {
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
void * send_buf;
void * registered_buf;
struct mca_pml_base_send_request_t *req;
struct mca_ptl_gm_module_t *ptl;
struct mca_ptl_gm_peer_t *peer;
int status;
int type;
int wait_for_ack;
int put_sent;
int send_complete;
};
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
int status;
int type;
int wait_for_ack;
int put_sent;
int send_complete;
};
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
struct mca_ptl_gm_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv;
size_t frag_hdr_cnt;
size_t frag_msg_cnt;
volatile int frag_progressed;
bool frag_ack_pending;
void *alloc_recv_buffer;
void *unex_recv_buffer;
void * registered_buf;
struct mca_ptl_gm_module_t *ptl;
bool matched;
bool have_allocated_buffer;
};
typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
struct mca_ptl_gm_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv;
size_t frag_hdr_cnt;
size_t frag_msg_cnt;
volatile int frag_progressed;
bool frag_ack_pending;
void *alloc_recv_buffer;
void *unex_recv_buffer;
void * registered_buf;
struct mca_ptl_gm_module_t *ptl;
bool matched;
bool have_allocated_buffer;
};
typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
mca_ptl_gm_send_frag_t *
mca_ptl_gm_alloc_send_frag ( struct mca_ptl_gm_module_t* ptl,
struct mca_pml_base_send_request_t* sendreq );
mca_ptl_gm_send_frag_t *
mca_ptl_gm_alloc_send_frag ( struct mca_ptl_gm_module_t* ptl,
struct mca_pml_base_send_request_t* sendreq );
int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
struct mca_ptl_gm_module_t *ptl,
struct mca_ptl_gm_peer_t* ptl_peer,
struct mca_ptl_gm_recv_frag_t* frag,
char * buffer,
int size );
int mca_ptl_gm_send_ack_init( struct mca_ptl_gm_send_frag_t* ack,
struct mca_ptl_gm_module_t *ptl,
struct mca_ptl_gm_peer_t* ptl_peer,
struct mca_ptl_gm_recv_frag_t* frag,
char * buffer,
int size );
int
mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* sendfrag,
struct mca_ptl_gm_peer_t * ptl_peer,
struct mca_ptl_gm_module_t *ptl,
struct mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags );
int
mca_ptl_gm_put_frag_init( struct mca_ptl_gm_send_frag_t* sendfrag,
struct mca_ptl_gm_peer_t * ptl_peer,
struct mca_ptl_gm_module_t *ptl,
struct mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags );
static inline int
mca_ptl_gm_send_frag_init( struct mca_ptl_gm_send_frag_t* sendfrag,
struct mca_ptl_gm_peer_t * ptl_peer,
struct mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags )
static inline int
mca_ptl_gm_send_frag_init( struct mca_ptl_gm_send_frag_t* sendfrag,
struct mca_ptl_gm_peer_t * ptl_peer,
struct mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags )
{
mca_ptl_base_header_t *hdr = (mca_ptl_base_header_t *)sendfrag->send_buf;
{
mca_ptl_base_header_t *hdr = (mca_ptl_base_header_t *)sendfrag->send_buf;
sendfrag->status = -1;
sendfrag->type = -1;
sendfrag->wait_for_ack = 0;
sendfrag->put_sent = -1;
sendfrag->send_complete = -1;
sendfrag->status = -1;
sendfrag->type = -1;
sendfrag->wait_for_ack = 0;
sendfrag->put_sent = -1;
sendfrag->send_complete = -1;
hdr->hdr_common.hdr_flags = flags;
if (offset == 0) {
/* When the offset is ZERO we send the match header. */
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_common.hdr_flags = flags;
if (offset == 0) {
/* When the offset is ZERO we send the match header. */
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
sendfrag->type = MATCH;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t);
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
sendfrag->type = MATCH;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
sendfrag->type = FRAG;
sendfrag->type = FRAG;
}
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_length = *size;
hdr->hdr_frag.hdr_src_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
return OMPI_SUCCESS;
}
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_length = *size;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
return OMPI_SUCCESS;
}
int mca_ptl_gm_send_frag_done( struct mca_ptl_gm_send_frag_t* frag,
struct mca_pml_base_send_request_t* req);
int mca_ptl_gm_send_frag_done( struct mca_ptl_gm_send_frag_t* frag,
struct mca_pml_base_send_request_t* req);
mca_ptl_gm_recv_frag_t *
mca_ptl_gm_alloc_recv_frag( struct mca_ptl_base_module_t *ptl );
mca_ptl_gm_recv_frag_t *
mca_ptl_gm_alloc_recv_frag( struct mca_ptl_base_module_t *ptl );
#if defined(c_plusplus) || defined(__cplusplus)
}