1
1

resolved some flow control issues

This commit was SVN r2922.
Этот коммит содержится в:
Gopal Santhanaraman 2004-10-04 18:21:44 +00:00
родитель 0dc4f4bb8d
Коммит a81d025b09
7 изменённых файлов: 382 добавлений и 191 удалений

Просмотреть файл

@ -84,9 +84,6 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
}
/* TODO: make this extensible to multiple nics */
/* XXX: */
/* FIXME: */
for (j=0; j < num_peer_ptls; j++) {
ptl_peer = OBJ_NEW (mca_ptl_gm_peer_t);
if (NULL == ptl_peer) {
@ -120,6 +117,8 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
return OMPI_SUCCESS;
}
/*
*
*/
@ -136,6 +135,8 @@ mca_ptl_gm_del_procs (struct mca_ptl_base_module_t *ptl,
return OMPI_SUCCESS;
}
/*
*
*/
@ -146,16 +147,16 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *ptl)
return OMPI_SUCCESS;
}
int
mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t *request)
{
#if 0
mca_ptl_gm_send_frag_t *frag;
struct mca_ptl_gm_send_request_t *req;
GM_DBG(PTL_GM_DBG_COMM,
"INSIDE REQUEST INIT: the request is %p\n",request);
frag = mca_ptl_gm_alloc_send_frag(ptl, request);
if (NULL == frag)
@ -170,10 +171,14 @@ mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl,
frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/
frag->ptl = (mca_ptl_gm_module_t*)ptl;
}
return OMPI_SUCCESS;
#endif
return OMPI_ERROR;
}
/*
*
*/
@ -181,12 +186,17 @@ void
mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t *request)
{
#if 0
mca_ptl_gm_send_frag_t *frag;
frag = ((mca_ptl_gm_send_request_t *)request)->req_frag;
OMPI_FREE_LIST_RETURN(&(((mca_ptl_gm_module_t *)ptl)->gm_send_frags),
(ompi_list_item_t *)frag);
frag->status = 0;/*XXX: MCA_PTL_GM_FRAG_LOCAL; */
frag->status = 0;
#endif
A_PRINT("entering request fini\n");
OBJ_DESTRUCT(request+1);
}
@ -205,25 +215,36 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
GM_DBG(PTL_GM_DBG_COMM,"INSIDE PTL GM SEND\n");
gm_ptl = (mca_ptl_gm_module_t *)ptl;
if (offset == 0) {
GM_DBG(PTL_GM_DBG_COMM,"OFFSET = 0\n");
sendfrag = ((mca_ptl_gm_send_request_t *)sendreq)->req_frag;
sendfrag->req = sendreq;
assert(sendreq != NULL);
} else {
if (offset == 0)
{
GM_DBG(PTL_GM_DBG_COMM,"INSIDE PTL GM SEND, OFFSET = 0,request is %p
frag is %p\n",sendreq,((mca_ptl_gm_send_request_t *)sendreq)->req_frag);
#if 0
sendfrag = ((mca_ptl_gm_send_request_t *)sendreq)->req_frag;
sendfrag->req = sendreq;
#endif
sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);
if (NULL == sendfrag) {
ompi_output(0,"[%s:%d] Unable to allocate a gm send frag\n",
assert(sendreq != NULL);
}
else
{
sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);
if (NULL == sendfrag)
{
ompi_output(0,"[%s:%d] Unable to allocate a gm send frag\n",
__FILE__, __LINE__);
return 0; /*XXX: return error */
return 0; /*XXX: return error */
}
}
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =sendfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
rc = mca_ptl_gm_send_frag_init (sendfrag,
(mca_ptl_gm_peer_t*)ptl_peer,
sendreq, offset, &size, flags);
A_PRINT(" INSIDE PTL GM SEND, OFFSET = 0,request is %p frag is %p\n",sendreq,((mca_ptl_gm_send_request_t *)sendreq)->req_frag);
rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset, &size, flags);
/*initiate the send */
gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;
@ -231,11 +252,11 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
offset,&size,flags);
gm_ptl->num_send_tokens--;
/*Update offset */
sendreq->req_offset += size; /* XXX: should be what convertor packs */
sendreq->req_offset += size;
return OMPI_SUCCESS;
}
/*
* Initiate a put
*/
@ -256,7 +277,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
gm_ptl= (mca_ptl_gm_module_t *)ptl;
buffer_ptr = ((char *) (sendreq->req_base.req_addr)) + offset ;
bytes_reg = size;
destination_buffer =(void *)( (sendreq->req_peer_addr).pval);
/* register the user buffer */
@ -268,26 +288,37 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
}
putfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq); /*alloc_put_frag */
A_PRINT(" INSIDE PTL PUT,request is %p frag is %p\n",sendreq,putfrag);
putfrag->registered_buf = (void *)buffer_ptr;
putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer;
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =putfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =putfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = 0;
rc = mca_ptl_gm_put_frag_init(putfrag ,
(mca_ptl_gm_peer_t*)ptl_peer,gm_ptl,
sendreq, offset, &size, flags);
/* check that we have a send token available */
rc = mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag,
sendreq, offset, &size, flags,
destination_buffer, bytes_reg);
gm_ptl->num_send_tokens--;
sendreq->req_offset += size; /* should be what is returned by put */
sendreq->req_offset += size;
#if 1
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,sendreq,
offset,&size,flags);
assert(rc == 0);
A_PRINT("after issuing the put completion(fin) for the request");
#endif
return OMPI_SUCCESS;
}
/*
* initiate a get.
*/
@ -301,6 +332,8 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl,
return OMPI_SUCCESS;
}
/* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment.
*/
@ -322,16 +355,16 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
header = &frag->frag_base.frag_header;
request = frag->frag_request;
A_PRINT("inside match, the matched request is %p\n", request);
gm_ptl = (mca_ptl_gm_module_t *)ptl;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
/* might need to send an ack back */
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED)
{
/* need to send an ack back */
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
/*associate null request for ack*/
A_PRINT("the recv_frag inside matched is %p\n",recv_frag);
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
if (NULL == ack) {
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
__FILE__,__LINE__);
@ -341,51 +374,42 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
(ompi_list_item_t *) frag);
OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock);
}
else
else
{
buffer_ptr = (char *)( request->req_base.req_addr );
total_bytes = request->req_bytes_packed;
/*total_bytes = (request->req_base.req_datatype->size) **/
/*(request->req_base.req_count);*/
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
bytes_recv = frag->frag_base.frag_size;
bytes_reg = total_bytes - bytes_recv;
buffer_ptr += bytes_recv;
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
recv_frag->registered_buf = buffer_ptr;
GM_DBG(PTL_GM_DBG_COMM,"Receiver: register addr: %p, bytes: %d\n",buffer_ptr,bytes_reg);
A_PRINT("Receiver: register addr: %p, bytes: %d\n",buffer_ptr,bytes_reg);
if(GM_SUCCESS != status) {
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
}
/* send the registered memory information, send recv request * ptr */
rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl,
(mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer)
, recv_frag, buffer_ptr, bytes_reg);
rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl, (mca_ptl_gm_peer_t *)
(recv_frag->frag_recv.frag_base.frag_peer), recv_frag, buffer_ptr, bytes_reg);
/*XXX : put the registered memory in pin-down cache */
mca_ptl_gm_peer_send (
(mca_ptl_gm_peer_t *) (ack->send_frag.frag_base.frag_peer),
ack,srequest,0,&size,0 );
GM_DBG(PTL_GM_DBG_COMM,"RECEIVER FINISHED SENDING ACK\n");
/*TO DO : put the registered memory in pin-down cache */
mca_ptl_gm_peer_send ( (mca_ptl_gm_peer_t *) (ack->send_frag.frag_base.frag_peer),
ack,srequest,0,&size,0 );
gm_ptl->num_send_tokens--;
}
}
/* Here we expect that frag_addr is the begin of the buffer header included */
iov.iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
iov.iov_base = ((char*)frag->frag_base.frag_addr);
bytes_recv = frag->frag_base.frag_size;
iov.iov_len = bytes_recv;
if (header->hdr_frag.hdr_frag_length > 0) {
if (header->hdr_frag.hdr_frag_length > 0)
{
ompi_proc_t *proc;
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_peer);
ompi_convertor_copy(proc->proc_convertor,
&frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv(
@ -397,21 +421,21 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
header->hdr_frag.hdr_frag_offset);
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov), 1);
assert( rc >= 0 );
GM_DBG(PTL_GM_DBG_COMM,"in matched: bytes received is %d\n", bytes_recv);
/*memcpy(request->req_base.req_addr,iov.iov_base,bytes_recv);*/
A_PRINT("in matched: bytes received is %d\n", bytes_recv);
}
/*update progress*/
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov.iov_len );
/* update progress*/
ptl->ptl_recv_progress( ptl, request, bytes_recv,bytes_recv);
/* Now update the status of the fragment */
((mca_ptl_gm_recv_frag_t*)frag)->matched = true;
/*if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true
* ){*/
/*free( frag->frag_base.frag_addr );*/
if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true )
{
free( recv_frag->frag_recv.frag_base.frag_addr);
((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer = false;
/*}*/
/*return to free list */
}
/* return to free list */
gm_ptl = (mca_ptl_gm_module_t *)ptl;
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free),
(ompi_list_item_t*)((mca_ptl_gm_recv_frag_t*)frag));

Просмотреть файл

@ -26,7 +26,7 @@
#define PTL_GM_ADMIN_RECV_TOKENS 0
#define GM_SEND_BUF_SIZE 16384
#define GM_RECV_BUF_SIZE 16384
#define NUM_RECV_FRAGS 100
#define NUM_RECV_FRAGS 256
#define MCA_PTL_GM_FRAG_CACHED
/**
@ -48,10 +48,10 @@ struct mca_ptl_gm_component_t {
};
typedef struct mca_ptl_gm_component_t mca_ptl_gm_component_t;
extern mca_ptl_gm_component_t mca_ptl_gm_component;
/**
* GM PTL Interface
*/
@ -78,7 +78,6 @@ struct mca_ptl_gm_module_t {
};
typedef struct mca_ptl_gm_module_t mca_ptl_gm_module_t;
extern mca_ptl_gm_module_t mca_ptl_gm_module;
@ -142,6 +141,7 @@ extern int mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
size_t offset, size_t size, int flags);
/**
* GM get
*/
@ -186,6 +186,7 @@ extern int mca_ptl_gm_del_procs (struct mca_ptl_base_module_t *ptl,
struct ompi_proc_t **procs,
struct mca_ptl_base_peer_t **peers);
/**
* PML->PTL Allocate a send request from the PTL modules free list.
*

Просмотреть файл

@ -44,7 +44,7 @@ mca_ptl_gm_component_t mca_ptl_gm_component = {
,
/* Next the MCA v1.0.0 component meta data */
{
/* Whether the component is checkpointable or not */
/* Whether the component is checkpointable or not */
false
},
mca_ptl_gm_component_init,
@ -71,6 +71,8 @@ mca_ptl_gm_param_register_string (const char *param_name,
return param_value;
}
static inline int
mca_ptl_gm_param_register_int (const char *param_name, int default_value)
{
@ -82,6 +84,8 @@ mca_ptl_gm_param_register_int (const char *param_name, int default_value)
return param_value;
}
/*
*
*/
@ -92,6 +96,9 @@ ompi_mca_ptl_gm_finalize (mca_ptl_gm_module_t * gm)
return OMPI_SUCCESS;
}
/*
* Called by MCA framework to open the module, registers
* module parameters.
@ -126,6 +133,7 @@ mca_ptl_gm_component_open (void)
return OMPI_SUCCESS;
}
/*
* component close
*/
@ -174,6 +182,8 @@ mca_ptl_gm_create (int i)
return OMPI_SUCCESS;
}
/*
* Register GM component addressing information. The MCA framework
* will make this available to all peers.
@ -188,7 +198,7 @@ mca_ptl_gm_module_store_data_toexchange (void)
mca_ptl_gm_addr_t *addrs;
size = mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_addr_t);
addrs = (mca_ptl_gm_addr_t *)malloc (size);/*XXX: check this out */
addrs = (mca_ptl_gm_addr_t *)malloc (size);
if (NULL == addrs) {
return OMPI_ERR_OUT_OF_RESOURCE;
@ -231,18 +241,19 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
/* open the first available gm port for this board */
board_no = i;
for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) {
GM_DBG(PTL_GM_DBG_COMM,"about to call open port\n");
if (port_no == 3) continue;
/* port 0,1,3 reserved */
status = gm_open (&(ptl->my_port), board_no,
for (port_no = 2; port_no < MAX_GM_PORTS; port_no++)
{
GM_DBG(PTL_GM_DBG_COMM,"about to call open port\n");
if (port_no == 3) continue;
/* port 0,1,3 reserved */
status = gm_open (&(ptl->my_port), board_no,
port_no, "OMPI-GM", GM_API_VERSION_2_0);
if (GM_SUCCESS == status) {
ptl->my_port_id = port_no;
if (GM_SUCCESS == status) {
ptl->my_port_id = port_no;
mca_ptl_gm_component.gm_num_ptl_modules++;
break;
}
break;
}
}
#if 1
@ -316,11 +327,13 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
sfragment->send_buf = gm_send_reg_memory;
item = (ompi_list_item_t *) sfragment;
OMPI_FREE_LIST_RETURN( fslist, item );
gm_send_reg_memory = ((char *)gm_send_reg_memory) + GM_SEND_BUF_SIZE;
sfragment++;
}
A_PRINT("recv_tokens = %d send_tokens = %d, allocted free lis =
%d\n",ptl->num_recv_tokens,ptl->num_send_tokens,fslist->fl_num_allocated);
/*****************RECEIVE*****************************/
/*allow remote memory access */
@ -355,13 +368,16 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
/*allocate the registered memory */
gm_recv_reg_memory =
gm_dma_malloc (ptl->my_port,
(GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
if( NULL == gm_recv_reg_memory ) {
gm_dma_malloc (ptl->my_port, (GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
if( NULL == gm_recv_reg_memory )
{
ompi_output( 0, "unable to allocate registered memory for receive\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < ptl->num_recv_tokens ; i++) {
for (i = 0; i < ptl->num_recv_tokens ; i++)
{
gm_provide_receive_buffer( ptl->my_port, gm_recv_reg_memory,
GM_SIZE, GM_LOW_PRIORITY );
gm_recv_reg_memory = ((char *)gm_recv_reg_memory) + GM_RECV_BUF_SIZE;
@ -440,11 +456,11 @@ mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp)
{
int rc;
/* XXX: Do all the following inside a dispatcher, either in this routine
* or mca_ptl_gm_incoming_recv(), YUW
* i) check the send queue to see if any pending send can proceed
* ii) check for recieve and , call ptl_match to send it to the upper level
* BTW, ptl_matced is invoked inside ptl_match() via PML.
* or mca_ptl_gm_incoming_recv()
* i) check the send queue to see if any pending send can proceed
* ii) check for recieve and , call ptl_match to send it to the upper level
* BTW, ptl_matched is invoked inside ptl_match() via PML.
*/
rc = mca_ptl_gm_incoming_recv(&mca_ptl_gm_component);
return OMPI_SUCCESS;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -63,12 +63,12 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
header = (mca_ptl_base_frag_header_t*)fragment->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
A_PRINT("peer send (could be ack) : headerlen is %d \n", header_length);
size_in = *size;
outvec.iov_base = (char*)fragment->send_buf;
if( (size_in + header_length) <= GM_SEND_BUF_SIZE )
if( (size_in + header_length) <= GM_SEND_BUF_SIZE )
outvec.iov_len = size_in;
else
outvec.iov_len = GM_SEND_BUF_SIZE - header_length;
@ -102,8 +102,6 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
* that holds the packed data
*/
/*XXX: need to add the header */
/*copy the data to the registered buffer*/
outvec.iov_base = ((char*)fragment->send_buf) + header_length;
@ -117,7 +115,11 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
/* adjust size and request offset to reflect actual number of bytes
* packed by convertor */
size_out = outvec.iov_len;
A_PRINT(" inside peer_send request is %p\t, frag->req = %p, fragment is
%p,size is %d, send_frag is %p\n",sendreq, fragment->req,fragment,size_out,
((mca_ptl_base_header_t *)header)->hdr_ack.hdr_src_ptr);
/* initiate the gm send */
gm_send_with_callback( ptl_peer->peer_ptl->my_port, fragment->send_buf,
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
@ -125,10 +127,18 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
fragment->send_frag.frag_base.frag_addr = outvec.iov_base;
fragment->send_frag.frag_base.frag_size = size_out;
fragment->send_frag.frag_base.frag_addr = outvec.iov_base + header_length;
fragment->send_frag.frag_base.frag_size = size_out - header_length;
#if 1
fragment->send_frag.frag_request = sendreq;
if ((header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG) ||
(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_MATCH))
header->hdr_frag_length = size_out - header_length;
#endif
*size = (size_out - header_length);
A_PRINT("inside peer send : bytes sent is %d\n",*size);
return OMPI_SUCCESS;
}
@ -137,20 +147,17 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
{
mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *putfrag;
int bytes;
mca_pml_base_send_request_t *send_req;
size_t offset = 0;
size_t size = 0;
int flags = 0;
int rc;
mca_ptl_base_header_t* header;
int bytes2;
putfrag = (mca_ptl_gm_send_frag_t *)context;
header = (mca_ptl_base_header_t*)putfrag->send_buf;
bytes2 = header->hdr_ack.hdr_dst_size;
ptl = (mca_ptl_gm_module_t *)putfrag->ptl;
send_req = putfrag->req;
bytes = putfrag->send_frag.frag_base.frag_size;
GM_DBG(PTL_GM_DBG_COMM,"ENTERING PUT CALLBACK\n");
A_PRINT("ENTERING PUT CALLBACK\n");
switch (status) {
case GM_SUCCESS:
@ -161,21 +168,22 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
putfrag->put_sent = 1;
/* send the header information through send/receive channel */
#if 0
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,send_req,
offset,&size,flags);
assert(rc == 0);
GM_DBG(PTL_GM_DBG_COMM,"FINISHED SENDING FIN\n");
GM_DBG(PTL_GM_DBG_COMM,"after issuing the put completion the request offset = %d\n",send_req->req_offset);
#endif
/* deregister the user memory */
status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes);
status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes2);
if(GM_SUCCESS != status) {
ompi_output(0," unpinning memory failed\n");
ompi_output(0," unpinning memory failed\n");
}
else {
GM_DBG(PTL_GM_DBG_COMM, " unpinning %d bytes of memory success\n",bytes);
GM_DBG(PTL_GM_DBG_COMM, " unpinning %d bytes of memory success\n",bytes2);
}
break;
@ -196,6 +204,93 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
}
void send_callback(struct gm_port *port,void * context, gm_status_t status)
{
mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *frag;
int header_length;
mca_pml_base_send_request_t *gm_send_req;
mca_ptl_base_header_t* header;
frag = (mca_ptl_gm_send_frag_t *)context;
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
gm_send_req = frag->send_frag.frag_request;
header = (mca_ptl_base_header_t*)frag->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
switch (status) {
case GM_SUCCESS:
ptl->num_send_tokens++;
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK)
{
A_PRINT("send callback: Completion of send_ack, sent frag is %p\n", frag);
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
}
else if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN)
{
A_PRINT("send callback : Completion of fin, bytes complete =
%d\n",header->hdr_ack.hdr_dst_size);
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
header->hdr_ack.hdr_dst_size);
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
}
/* else if (NULL == gm_send_req) {
A_PRINT("send callback for ack : \n");
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
} */
else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED)
|| mca_pml_base_send_request_matched(gm_send_req))
{
A_PRINT(" send callback : match not required\n");
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, frag->send_frag.frag_request,
header->hdr_frag.hdr_frag_length);
/* Return sendfrag to free list */
A_PRINT("Return frag : %p", frag);
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
}
else
{
/* Not going to call progress on this send,
* and not free-ing descriptor */
frag->send_complete = 1;
A_PRINT("send callback : match required but not yet recv ack
sendfrag is %p\n",frag);
}
break;
case GM_SEND_TIMED_OUT:
/* need to take care of retransmission */
break;
case GM_SEND_DROPPED:
/* need to handle this case */
break;
default:
ompi_output(0,
"[%s:%d] error in message completion\n",__FILE__,__LINE__);
break;
}
A_PRINT("RETURNING FROM SEND_CALLBACK\n");
}
#if 0
void send_callback(struct gm_port *port,void * context, gm_status_t status)
{
mca_ptl_gm_module_t *ptl;
@ -205,15 +300,23 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
mca_pml_base_send_request_t *gm_send_req;
mca_ptl_base_header_t* header;
frag = (mca_ptl_gm_send_frag_t *)context;
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
gm_send_req = frag->req;
GM_DBG(PTL_GM_DBG_COMM,"INSIDE SENDCALLBACK request is %p\t, frag->req =
%p, frag is %p \n",gm_send_req, frag->req,frag);
header = (mca_ptl_base_header_t*)frag->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
if (frag->type == PUT)
{
/*printf("fin completion: frag is %p\n",frag);
fflush(stdout); */
bytes = header->hdr_ack.hdr_dst_size;
}
else
@ -236,14 +339,33 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
ptl->num_send_tokens++;
frag->send_complete = 1;
/*OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t * *)frag);*/
/*A_PRINT("returned frag pointer %p, free_list_num = %d\n",*/
/*frag,(&(ptl->gm_send_frags))->fl_num_allocated);*/
if ((frag->wait_for_ack == 0) && (gm_send_req != NULL)) {
GM_DBG(PTL_GM_DBG_COMM,"inside send callback : calling send progress bytes = %d\n",bytes);
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
gm_send_req, bytes );
}
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
break;
if ((frag->wait_for_ack == 0) && (gm_send_req != NULL))
{
if (frag->type == PUT)
{
/*printf("returned frag pointer %p, free_list_num = %d frag type =
%d\n", frag,(&(ptl->gm_send_frags))->fl_num_allocated,frag->type);
fflush(stdout);*/
}
GM_DBG(PTL_GM_DBG_COMM,"inside send callback : calling send progress bytes = %d\n",bytes);
A_PRINT("inside send callback : calling send progress bytes = %d\n",bytes);
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, gm_send_req, bytes );
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
}
if (gm_send_req == NULL)
{
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
A_PRINT("returned ack frag pointer %p, free_list_num = %d\n", frag,(&(ptl->gm_send_frags))->fl_num_allocated);
}
break;
case GM_SEND_TIMED_OUT:
/* need to take care of retransmission */
@ -261,6 +383,9 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
GM_DBG(PTL_GM_DBG_COMM,"RETURNING FROM SEND_CALLBACK\n");
}
#endif
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
@ -275,25 +400,31 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK)
{
frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
A_PRINT("ack header is %d, frag_ptr is
%p\n",header->hdr_common.hdr_size,header->hdr_ack.hdr_src_ptr.pval);
frag = (mca_ptl_gm_send_frag_t *)(header->hdr_ack.hdr_src_ptr.pval);
A_PRINT("inside ACK, corresp frag pointer %p\n",frag);
req = (mca_pml_base_send_request_t *) frag->req;
assert(req != NULL);
req->req_peer_match.pval = header->hdr_ack.hdr_dst_match.pval;
req->req_peer_addr.pval = header->hdr_ack.hdr_dst_addr.pval;
req->req_peer_size = header->hdr_ack.hdr_dst_size;
frag->wait_for_ack = 0;
/* check if send has completed */
header_length =
frag->send_frag.frag_base.frag_header.hdr_frag.hdr_common.hdr_size;
bytes = frag->send_frag.frag_base.frag_size - 64; /*header_length;*/
header_length = frag->send_frag.frag_base.frag_header.hdr_frag.hdr_common.hdr_size;
bytes = frag->send_frag.frag_base.frag_size;
if(frag->send_complete == 1)
{
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
req, bytes );
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
frag->send_frag.frag_request,bytes);
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
A_PRINT("inside ACK,returning frag pointer %p, request is %p, bytes
is %d\n",frag, frag->send_frag.frag_request, header->hdr_frag.hdr_frag_length );
}
}
@ -302,45 +433,36 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
{
GM_DBG(PTL_GM_DBG_COMM,"CASE: HDR_TYPE_FIN\n");
request = (mca_pml_base_recv_request_t*)
header->hdr_ack.hdr_dst_match.pval;
/* call receive progress and indicate the recv has been completed */
GM_DBG(PTL_GM_DBG_COMM,"Calling recv_progress with bytes = %d\n",header->hdr_ack.hdr_dst_size);
ptl->super.ptl_recv_progress (
GM_DBG(PTL_GM_DBG_COMM,"Calling recv_progress with bytes = %d\n",header->hdr_ack.hdr_dst_size);
ptl->super.ptl_recv_progress (
(mca_ptl_base_module_t *) ptl,
request ,
header->hdr_ack.hdr_dst_size,
header->hdr_ack.hdr_dst_size);
/* deregister the memory */
bytes = header->hdr_ack.hdr_dst_size;
reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval;
status = gm_deregister_memory(ptl->my_port, reg_buf,
bytes);
bytes = header->hdr_ack.hdr_dst_size;
reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval;
status = gm_deregister_memory(ptl->my_port, reg_buf, bytes);
if(GM_SUCCESS != status) {
ompi_output(0," unpinning memory failed\n");
} else {
GM_DBG(PTL_GM_DBG_COMM,
"unpinning memory success,addr:%p,bytes:%d\n",reg_buf,bytes);
}
if(GM_SUCCESS != status)
{
ompi_output(0," unpinning memory failed\n");
}
else
{
GM_DBG(PTL_GM_DBG_COMM, "unpinning memory success,addr:%p,bytes:%d\n",reg_buf,bytes);
}
#if 0
/*return the recv fragment to the free list */
OMPI_FREE_LIST_RETURN(
&(((mca_ptl_gm_module_t *)ptl)->gm_recv_frags_free),
(ompi_list_item_t *)recv_frag);
/* free the associated buffer */
if(recv_frag->have_allocated == true)
free(recv_frag->frag_recv.frag_base.frag_add * GM_SEND_BUF_SIZE);
#endif
}
/* XXX: will handle NACK later */
}
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event )
{
@ -352,7 +474,8 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
/* allocate a receive fragment */
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
A_PRINT("the allocate drecv fragment is %p\n", recv_frag);
recv_frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
recv_frag->frag_recv.frag_base.frag_peer = NULL;
recv_frag->frag_recv.frag_request = NULL;
@ -363,9 +486,16 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
recv_frag->frag_progressed = 0;
recv_frag->frag_recv.frag_base.frag_header = *header;
#if 0
recv_frag->frag_recv.frag_base.frag_addr = header;
/* + sizeof(mca_ptl_base_header_t);*/ /* XXX: bug */
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
#endif
#if 1
recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof (mca_ptl_base_header_t);
recv_frag->frag_recv.frag_base.frag_size = header->hdr_frag.hdr_frag_length;
#endif
recv_frag->matched = false;
recv_frag->have_allocated_buffer = false;
@ -377,6 +507,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
if( matched ) {
return NULL;
}
GM_DBG(PTL_GM_DBG_COMM,
"matching receive not yet posted get tag %d comm %d source %d\n",
header->hdr_match.hdr_tag, header->hdr_match.hdr_contextid, header->hdr_match.hdr_src );
@ -385,6 +516,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_event_t* event )
{
mca_ptl_gm_recv_frag_t* frag = NULL;
@ -411,11 +543,14 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev
return frag;
}
void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl)
{
mca_ptl_gm_recv_frag_t * frag = NULL;
int i, size;
int size;
bool matched;
size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue);
@ -483,14 +618,6 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
/* mark the fragment as having pending buffers */
frag->have_allocated_buffer = true;
#if 0
/* append to the receive queue */
ompi_list_append (&(ptl->gm_recv_outstanding_queue),
(ompi_list_item_t *) frag);
printf ("frag appended to recv_oustanding queue \n");
#endif
}
gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer),
GM_SIZE, GM_LOW_PRIORITY );

Просмотреть файл

@ -21,13 +21,22 @@
#define GM_DBG(flag, args...) \
do { \
if (PTL_GM_DBG_FLAG & flag) { \
char hostname[32]; gethostname(hostname, 32); \
fprintf(stderr, "[%s:%s:%d] ", \
hostname, __FUNCTION__, __LINE__); \
fprintf(stderr, args); \
char hostname[32]; gethostname(hostname, 32); \
fprintf(stderr, "[%s:%s:%d] ", \
hostname, __FUNCTION__, __LINE__); \
fprintf(stderr, args); \
} \
} while (0)
#if 0
#define A_PRINT(fmt, args...) { \
ompi_output(0, "[%s:%d:%s] " fmt, __FILE__, __LINE__, __func__, \
##args); \
}
#else
#define A_PRINT(fmt, args...)
#endif
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header);

Просмотреть файл

@ -60,21 +60,27 @@ mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl,
mca_ptl_tstamp_t tstamp = 0;
GM_DBG(PTL_GM_DBG_COMM,"INSIDE ALLOC SEND FRAG\n");
flist =&( ((mca_ptl_gm_module_t *)ptl)->gm_send_frags );
A_PRINT("num_list_allocated: %d\n",flist->fl_num_allocated);
item = ompi_list_remove_first(&((flist)->super));
GM_DBG(PTL_GM_DBG_COMM,"AFTER ALLOC SEND FRAG\n");
A_PRINT("send_frag: %p\n", item);
A_PRINT("after removing a sendfrag num_list_allocated: %d\n",flist->fl_num_allocated);
while(NULL == item)
{
A_PRINT("888888888888888888888888 calling progress to allocate send frag\n");
ptl->ptl_component->ptlm_progress(tstamp);
item = ompi_list_remove_first (&((flist)->super));
}
frag = (mca_ptl_gm_send_frag_t *)item;
frag->req = (struct mca_pml_base_send_request_t *)sendreq;
GM_DBG(PTL_GM_DBG_COMM," request is %p\t, frag->req = %p\n",sendreq, frag->req);
GM_DBG(PTL_GM_DBG_COMM, "request is %p\t, frag->req = %p\n",sendreq, frag->req);
frag->type = 0 ;
return frag;
}
@ -102,12 +108,12 @@ int mca_ptl_gm_send_ack_init(
mca_ptl_base_header_t * hdr;
mca_pml_base_recv_request_t *request;
hdr = (mca_ptl_base_header_t *)ack->send_buf;
memset(hdr, 0, sizeof(mca_ptl_base_header_t));
ack->status = -1;
ack->type = -1;
ack->wait_for_ack = 0;
ack->put_sent = -1;
ack->send_complete = -1;
memset(hdr, 0, sizeof(mca_ptl_base_ack_header_t));
ack->status = -1;
ack->type = -1;
ack->wait_for_ack = 0;
ack->put_sent = -1;
ack->send_complete = -1;
GM_DBG(PTL_GM_DBG_COMM,"ack buf is %p\n",ack->send_buf);
@ -119,12 +125,15 @@ int mca_ptl_gm_send_ack_init(
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_src_ptr.pval =
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
/*assert(hdr->hdr_ack.hdr_src_ptr->req != NULL);*/
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
A_PRINT("inside ack init: the src frag ptr is %p,hdr_len is %u
\n",hdr->hdr_ack.hdr_src_ptr,hdr->hdr_common.hdr_size);
/*assert(((mca_ptl_gm_send_frag_t *)(hdr->hdr_ack.hdr_src_ptr.pval))->req
* != NULL);*/
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request; /*should this be dst_match */
hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of
dest address */
hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of dest address */
hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;
hdr->hdr_ack.hdr_dst_size = size;
@ -139,7 +148,7 @@ dest address */
ack->wait_for_ack = 0;
ack->type = ACK;
return OMPI_SUCCESS;
return OMPI_SUCCESS;
}
@ -154,14 +163,14 @@ int mca_ptl_gm_put_frag_init(
int flags)
{
mca_ptl_base_header_t *hdr;
hdr = (mca_ptl_base_header_t *)putfrag->send_buf;
memset(hdr, 0, sizeof(mca_ptl_base_header_t));
putfrag->status = -1;
putfrag->type = -1;
putfrag->wait_for_ack = 0;
putfrag->put_sent = -1;
putfrag->send_complete = -1;
mca_ptl_base_header_t *hdr;
hdr = (mca_ptl_base_header_t *)putfrag->send_buf;
memset(hdr, 0, sizeof(mca_ptl_base_header_t));
putfrag->status = -1;
putfrag->type = -1;
putfrag->wait_for_ack = 0;
putfrag->put_sent = -1;
putfrag->send_complete = -1;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = 0;
@ -183,7 +192,7 @@ int mca_ptl_gm_put_frag_init(
putfrag->wait_for_ack = 0;
putfrag->put_sent = 0;
putfrag->type = PUT;
putfrag->req = request; /* gm_send_request */
putfrag->req = request;
assert(putfrag->req != NULL);
return OMPI_SUCCESS;
}
@ -207,13 +216,13 @@ int mca_ptl_gm_send_frag_init(
hdr = (mca_ptl_base_header_t *)sendfrag->send_buf;
memset(hdr, 0, sizeof(mca_ptl_base_header_t));
sendfrag->status = -1;
sendfrag->type = -1;
sendfrag->wait_for_ack = 0;
sendfrag->put_sent = -1;
sendfrag->send_complete = -1;
sendfrag->status = -1;
sendfrag->type = -1;
sendfrag->wait_for_ack = 0;
sendfrag->put_sent = -1;
sendfrag->send_complete = -1;
assert(sendfrag->req != NULL);
if (offset == 0)
{
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
@ -234,6 +243,11 @@ int mca_ptl_gm_send_frag_init(
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length= sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
#if 1
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
#endif
}
else
{
@ -256,6 +270,8 @@ int mca_ptl_gm_send_frag_init(
return OMPI_SUCCESS;
}
ompi_class_t mca_ptl_gm_recv_frag_t_class = {
"mca_ptl_gm_recv_frag_t",
OBJ_CLASS (mca_ptl_base_recv_frag_t),

Просмотреть файл

@ -51,8 +51,6 @@ struct mca_ptl_gm_send_frag_t {
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
/*#define MCA_PTL_GM_SEND_FRAG_ALLOC(item, rc) \*/
/*OMPI_FREE_LIST_GET(&mca_ptl_gm_module.gm_send_frags, item, rc);*/
struct mca_ptl_gm_recv_frag_t {