1
1
Этот коммит содержится в:
Gopal Santhanaraman 2004-09-07 21:29:18 +00:00
родитель 985b219c6c
Коммит fb6cb846b9
7 изменённых файлов: 240 добавлений и 181 удалений

Просмотреть файл

@ -21,6 +21,8 @@
#include "ptl_gm_peer.h" #include "ptl_gm_peer.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
#define DEBUG 0
mca_ptl_gm_module_t mca_ptl_gm_module = { mca_ptl_gm_module_t mca_ptl_gm_module = {
{ {
&mca_ptl_gm_component.super, &mca_ptl_gm_component.super,
@ -132,17 +134,11 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer; peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer;
/*printf ("Global_id\t local_id\t port_number\t process name \n");*/
/*fflush (stdout);*/
/*printf ("%u %d %d %d\n", ptl_proc->peer_arr[0]->global_id,*/
/*ptl_proc->peer_arr[0]->local_id,*/
/*ptl_proc->peer_arr[0]->port_number,
* ptl_proc->proc_guid);*/
/*fflush (stdout);*/
} }
#if DEBUG
printf ("returning with success from gm_add_procs\n"); printf ("returning with success from gm_add_procs\n");
#endif
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -190,11 +186,9 @@ mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl,
else else
{ {
req = (mca_ptl_gm_send_request_t *)request; req = (mca_ptl_gm_send_request_t *)request;
/*((mca_ptl_gm_send_request_t *)request)->req_frag = frag;*/
req->req_frag = frag; req->req_frag = frag;
frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/ frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/
frag->ptl = (mca_ptl_gm_module_t*)ptl; frag->ptl = (mca_ptl_gm_module_t*)ptl;
/*frag->peer = request->req_peer;*/
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -254,12 +248,12 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
gm_ptl->num_send_tokens--; gm_ptl->num_send_tokens--;
/*Update offset */ /*Update offset */
sendreq->req_offset += size; /* XXX: should be what convertor packs */ sendreq->req_offset += rc; /* XXX: should be what convertor packs */
/*append to the send_fragments_queue. */ /*append to the send_fragments_queue. */
ompi_list_append (&(gm_ptl->gm_send_frags_queue), /*ompi_list_append (&(gm_ptl->gm_send_frags_queue),*/
(ompi_list_item_t *) sendfrag); /*(ompi_list_item_t *) sendfrag);*/
return rc; return OMPI_SUCCESS;
} }
/* /*
@ -273,8 +267,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
size_t offset, size_t size, int flags) size_t offset, size_t size, int flags)
{ {
int rc; int rc;
mca_ptl_gm_send_frag_t *sendfrag, *putfrag; mca_ptl_gm_send_frag_t *putfrag;
mca_ptl_gm_peer_t *gm_ptl_peer;
mca_ptl_gm_module_t * gm_ptl; mca_ptl_gm_module_t * gm_ptl;
void* destination_buffer; void* destination_buffer;
char * buffer_ptr; char * buffer_ptr;
@ -301,10 +294,16 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
putfrag->registered_buf = (void *)buffer_ptr; putfrag->registered_buf = (void *)buffer_ptr;
putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer; putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer;
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =putfrag;
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
rc = mca_ptl_gm_put_frag_init(putfrag , rc = mca_ptl_gm_put_frag_init(putfrag ,
(mca_ptl_gm_peer_t*)ptl_peer,gm_ptl, (mca_ptl_gm_peer_t*)ptl_peer,gm_ptl,
sendreq, offset, &size, flags); sendreq, offset, &size, flags);
/* check that we have a send token available */
rc = rc =
mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag, mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag,
sendreq, offset, &size, flags, sendreq, offset, &size, flags,
@ -314,17 +313,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
gm_ptl->num_send_tokens--; gm_ptl->num_send_tokens--;
/*do a send to notify completion */
/*sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);*/
/*rc = mca_ptl_gm_send_fini_init (sendfrag,gm_ptl, */
/*(mca_ptl_gm_peer_t*)ptl_peer, sendreq);*/
/*gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;*/
/*rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,*/
/*offset,&size,flags);*/
/*gm_ptl->num_send_tokens--;*/
sendreq->req_offset += size; sendreq->req_offset += size;
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -376,8 +364,9 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
request = frag->frag_request; request = frag->frag_request;
gm_ptl = (mca_ptl_gm_module_t *)ptl; gm_ptl = (mca_ptl_gm_module_t *)ptl;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
#if 1
/* might need to send an ack back */ /* might need to send an ack back */
recv_frag = (mca_ptl_gm_recv_frag_t *) frag; recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL); ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
@ -401,12 +390,16 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
bytes_reg = total_bytes - bytes_recv; bytes_reg = total_bytes - bytes_recv;
buffer_ptr += bytes_recv; buffer_ptr += bytes_recv;
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg); status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
recv_frag->registered_buf = buffer_ptr;
printf("Receiver: register addr: %p, bytes: %d\n",buffer_ptr,bytes_reg);
fflush(stdout);
if(GM_SUCCESS != status) if(GM_SUCCESS != status)
{ {
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__); ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
} }
/* send the registered memory information, send recv request * ptr */ /* send the regiscered memory information, send recv request * ptr */
rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl, rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl,
(mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer) (mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer)
, recv_frag, buffer_ptr, bytes_reg); , recv_frag, buffer_ptr, bytes_reg);
@ -419,11 +412,10 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
ack,srequest,0,&size,0 ); ack,srequest,0,&size,0 );
gm_ptl->num_send_tokens--; gm_ptl->num_send_tokens--;
ompi_list_append (&(gm_ptl->gm_send_frags_queue), /*ompi_list_append (&(gm_ptl->gm_send_frags_queue),*/
(ompi_list_item_t *) ack); /*(ompi_list_item_t *) ack);*/
} }
#endif
} }
/* Here we expect that frag_addr is the begin of the buffer header included */ /* Here we expect that frag_addr is the begin of the buffer header included */
@ -431,9 +423,9 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size; bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
iov[0].iov_len = bytes_recv; iov[0].iov_len = bytes_recv;
/*process fragment if complete */
if (header->hdr_frag.hdr_frag_length > 0) { if (header->hdr_frag.hdr_frag_length > 0) {
ompi_proc_t *proc; /* ompi_proc_t *proc;
proc = ompi_comm_peer_lookup(request->req_base.req_comm, proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_peer); request->req_base.req_peer);
@ -448,10 +440,14 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
request->req_base.req_addr, request->req_base.req_addr,
header->hdr_frag.hdr_frag_offset); header->hdr_frag.hdr_frag_offset);
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1); rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
assert( rc == 1 ); assert( rc == 1 );*/
printf ("in matched: bytes received is %d\n", bytes_recv);
fflush(stdout);
memcpy(request->req_base.req_addr,iov[0].iov_base,bytes_recv);
} }
/*update progress*/ /* XXX : check this */ /*update progress*/
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len ); ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
/* Now update the status of the fragment */ /* Now update the status of the fragment */

Просмотреть файл

@ -65,13 +65,12 @@ struct mca_ptl_gm_module_t {
unsigned int num_recv_tokens; unsigned int num_recv_tokens;
unsigned int max_send_tokens; unsigned int max_send_tokens;
unsigned int max_recv_tokens; unsigned int max_recv_tokens;
/*struct mca_ptl_gm_addr_t *proc_id_table;*/
ompi_free_list_t gm_send_frags; ompi_free_list_t gm_send_frags;
ompi_free_list_t gm_recv_frags_free; ompi_free_list_t gm_recv_frags_free;
ompi_list_t gm_send_frags_queue; ompi_list_t gm_send_frags_queue;
ompi_list_t gm_pending_acks; ompi_list_t gm_pending_acks;
ompi_list_t gm_recv_outstanding_queue;
#if MCA_PTL_GM_STATISTICS #if MCA_PTL_GM_STATISTICS
size_t ptl_bytes_sent; size_t ptl_bytes_sent;
size_t ptl_bytes_recv; size_t ptl_bytes_recv;

Просмотреть файл

@ -111,14 +111,14 @@ mca_ptl_gm_component_open (void)
/* register GM component parameters */ /* register GM component parameters */
mca_ptl_gm_module.super.ptl_first_frag_size = mca_ptl_gm_module.super.ptl_first_frag_size =
mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024); mca_ptl_gm_param_register_int ("first_frag_size", ((16 * 1024) - 64));
mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_module.super.ptl_min_frag_size =
mca_ptl_gm_param_register_int ("min_frag_size", 1<<16); mca_ptl_gm_param_register_int ("min_frag_size", 1<<16);
mca_ptl_gm_module.super.ptl_max_frag_size = mca_ptl_gm_module.super.ptl_max_frag_size =
mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024); mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024);
mca_ptl_gm_component.gm_free_list_num = mca_ptl_gm_component.gm_free_list_num =
mca_ptl_gm_param_register_int ("free_list_num", 32); mca_ptl_gm_param_register_int ("free_list_num", 256);
mca_ptl_gm_component.gm_free_list_inc = mca_ptl_gm_component.gm_free_list_inc =
mca_ptl_gm_param_register_int ("free_list_inc", 32); mca_ptl_gm_param_register_int ("free_list_inc", 32);
@ -296,7 +296,7 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
ompi_free_list_init (&(ptl->gm_send_frags), ompi_free_list_init (&(ptl->gm_send_frags),
sizeof (mca_ptl_gm_send_frag_t), sizeof (mca_ptl_gm_send_frag_t),
OBJ_CLASS (mca_ptl_gm_send_frag_t), OBJ_CLASS (mca_ptl_gm_send_frag_t),
32, 32, 1, NULL); /* not using mpool */ ptl->num_send_tokens,ptl->num_send_tokens, 1, NULL); /* not using mpool */
/* allocate the elements */ /* allocate the elements */
sfragment = (mca_ptl_gm_send_frag_t *) sfragment = (mca_ptl_gm_send_frag_t *)
@ -329,11 +329,18 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
} }
OBJ_CONSTRUCT (&(ptl->gm_recv_outstanding_queue), ompi_list_t);
/* construct the list of recv fragments free */ /* construct the list of recv fragments free */
OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t); OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
free_rlist = &(ptl->gm_recv_frags_free); free_rlist = &(ptl->gm_recv_frags_free);
ompi_free_list_init (&(ptl->gm_recv_frags_free),
sizeof (mca_ptl_gm_recv_frag_t),
OBJ_CLASS (mca_ptl_gm_recv_frag_t),
ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL); /* not using mpool */
/*allocate the elements */ /*allocate the elements */
free_rfragment = (mca_ptl_gm_recv_frag_t *) free_rfragment = (mca_ptl_gm_recv_frag_t *)
malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS); malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS);

Просмотреть файл

@ -23,6 +23,8 @@
#include "ptl_gm_sendfrag.h" #include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
#define DEBUG 0
int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer, int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment, mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq, struct mca_pml_base_send_request_t *sendreq,
@ -68,15 +70,11 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
outvec[0].iov_base = (char*)fragment->send_buf; outvec[0].iov_base = (char*)fragment->send_buf;
if( (size_in + header_length) < GM_SEND_BUF_SIZE ) if( (size_in + header_length) <= GM_SEND_BUF_SIZE )
outvec[0].iov_len = size_in; outvec[0].iov_len = size_in;
else else
outvec[0].iov_len = GM_SEND_BUF_SIZE - header_length; outvec[0].iov_len = GM_SEND_BUF_SIZE - header_length;
/*header_length = sizeof(mca_ptl_base_frag_header_t);*/
/* copy the header in the buffer */
/**header = fragment->send_frag.frag_base.frag_header;*/
if(size_in > 0) { if(size_in > 0) {
ompi_convertor_t *convertor; ompi_convertor_t *convertor;
@ -129,9 +127,10 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer; fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base; fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base;
fragment->send_frag.frag_base.frag_size = size_out; /*XXX: should this be size_out */ fragment->send_frag.frag_base.frag_size = size_out;
return OMPI_SUCCESS; return (size_out - header_length);
/*return OMPI_SUCCESS;*/
} }
@ -139,50 +138,61 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
{ {
mca_ptl_gm_module_t *ptl; mca_ptl_gm_module_t *ptl;
mca_ptl_gm_send_frag_t *putfrag; mca_ptl_gm_send_frag_t *putfrag;
/*ompi_list_t *list;*/ int bytes;
int bytes, header_length; mca_pml_base_send_request_t *send_req;
mca_pml_base_send_request_t *gm_send_req; size_t offset = 0;
mca_ptl_base_frag_header_t* header; size_t size = 0;
int offset = 0;
int size = 0;
int flags = 0; int flags = 0;
int rc; int rc;
putfrag = (mca_ptl_gm_send_frag_t *)context; putfrag = (mca_ptl_gm_send_frag_t *)context;
ptl = (mca_ptl_gm_module_t *)putfrag->ptl; ptl = (mca_ptl_gm_module_t *)putfrag->ptl;
gm_send_req = putfrag->req; send_req = putfrag->req;
header = (mca_ptl_base_frag_header_t*)putfrag->registered_buf; bytes = putfrag->send_frag.frag_base.frag_size;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
/* XXX : what should the header length be */
bytes = putfrag->send_frag.frag_base.frag_size - header_length;
#if DEBUG
printf("ENTERING PUT CALLBACK\n"); printf("ENTERING PUT CALLBACK\n");
fflush(stdout); fflush(stdout);
#endif
switch (status) { switch (status) {
case GM_SUCCESS: case GM_SUCCESS:
/* local put completed, mark put as complete */ /* local put completed, mark put as complete */
#if DEBUG
printf("PUTCALLBACK WITH CASE GM_SUCCESS\n"); printf("PUTCALLBACK WITH CASE GM_SUCCESS\n");
fflush(stdout); fflush(stdout);
#endif
ptl->num_send_tokens++; ptl->num_send_tokens++;
putfrag->put_sent = 1; putfrag->put_sent = 1;
/* send the header information through send/receive channel */ /* send the header information through send/receive channel */
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,gm_send_req, rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,send_req,
offset,&size,flags); offset,&size,flags);
#if DEBUG
printf("after issuing the put completion the request offset = %d\n",send_req->req_offset);
fflush(stdout);
#endif
/* deregister the user memory */ /* deregister the user memory */
status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes); status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes);
if(GM_SUCCESS != status) if(GM_SUCCESS != status)
{
#if DEBUG
ompi_output(0," unpinning memory failed\n"); ompi_output(0," unpinning memory failed\n");
#endif
}
else else
ompi_output(0," unpinning memory success\n"); {
#if DEBUG
ompi_output(0," unpinning %d bytes of memory success\n",bytes);
#endif
}
break; break;
case GM_SEND_TIMED_OUT: case GM_SEND_TIMED_OUT:
@ -209,49 +219,57 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
ompi_list_t *list; ompi_list_t *list;
int bytes, header_length; int bytes, header_length;
mca_pml_base_send_request_t *gm_send_req; mca_pml_base_send_request_t *gm_send_req;
mca_ptl_base_frag_header_t* header; mca_ptl_base_header_t* header;
frag = (mca_ptl_gm_send_frag_t *)context; frag = (mca_ptl_gm_send_frag_t *)context;
/*ptl = (mca_ptl_gm_module_t *)frag->ptl;*/
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner; ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
gm_send_req = frag->req; gm_send_req = frag->req;
header = (mca_ptl_base_frag_header_t*)frag->send_buf; header = (mca_ptl_base_header_t*)frag->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
bytes = frag->send_frag.frag_base.frag_size - 64;/*header_length;*/
if (frag->type == 1)
{
bytes = header->hdr_ack.hdr_dst_size;
}
else
bytes = frag->send_frag.frag_base.frag_size - header_length;
if (NULL != gm_send_req) if (NULL != gm_send_req)
{ {
if(1 == (( mca_ptl_gm_send_request_t *)gm_send_req)->need_ack ) if(1 == (( mca_ptl_gm_send_request_t *)gm_send_req)->need_ack )
frag->wait_for_ack = 1; frag->wait_for_ack = 1;
} }
printf("ENTERING SEND CALLBACK\n");
fflush(stdout);
switch (status) { switch (status) {
case GM_SUCCESS: case GM_SUCCESS:
/* send completed, can reuse the user buffer */ /* send completed, can reuse the user buffer */
#if DEBUG
printf("SENDCALLBACK WITH CASE GM_SUCCESS\n"); printf("SENDCALLBACK WITH CASE GM_SUCCESS\n");
fflush(stdout); fflush(stdout);
#endif
ptl->num_send_tokens++; ptl->num_send_tokens++;
frag->send_complete = 1; frag->send_complete = 1;
/*
while (1 == (frag->wait_for_ack))
{
mca_ptl_gm_incoming_recv (&mca_ptl_gm_component);
//This is recursive
}
*/
if (frag->wait_for_ack == 0 && (gm_send_req != NULL)) if (frag->wait_for_ack == 0 && (gm_send_req != NULL))
{ {
#if DEBUG
printf("inside send callback : calling send progress bytes = %d\n",bytes);
fflush(stdout);
#endif
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
gm_send_req, bytes ); gm_send_req, bytes );
} }
list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
ompi_list_remove_first(list); OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag);
/*list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));*/
/*ompi_list_remove_first(list);*/
break; break;
case GM_SEND_TIMED_OUT: case GM_SEND_TIMED_OUT:
@ -269,6 +287,7 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
} }
} }
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header) mca_ptl_base_header_t * header)
{ {
@ -276,48 +295,79 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_pml_base_send_request_t *req; mca_pml_base_send_request_t *req;
mca_pml_base_recv_request_t *request; mca_pml_base_recv_request_t *request;
int header_length, bytes; int header_length, bytes;
char * reg_buf;
int status;
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK)
{ {
frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval; frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
req = (mca_pml_base_send_request_t *) frag->req; req = (mca_pml_base_send_request_t *) frag->req;
req->req_peer_match = header->hdr_ack.hdr_dst_match; req->req_peer_match.pval = header->hdr_ack.hdr_dst_match.pval;
req->req_peer_addr = header->hdr_ack.hdr_dst_addr; req->req_peer_addr.pval = header->hdr_ack.hdr_dst_addr.pval;
req->req_peer_size = header->hdr_ack.hdr_dst_size; req->req_peer_size = header->hdr_ack.hdr_dst_size;
frag->wait_for_ack = 0; frag->wait_for_ack = 0;
/* check if send has completed */ /* check if send has completed */
header_length =
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; frag->send_frag.frag_base.frag_header.hdr_frag.hdr_common.hdr_size;
bytes = frag->send_frag.frag_base.frag_size - header_length; bytes = frag->send_frag.frag_base.frag_size - 64; /*header_length;*/
if(frag->send_complete == 1) if(frag->send_complete == 1)
{ {
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
req, bytes ); req, bytes );
} }
} }
#if 1
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN)
{ {
request = (mca_pml_base_recv_request_t*) request = (mca_pml_base_recv_request_t*)
header->hdr_frag.hdr_dst_ptr.pval; header->hdr_ack.hdr_dst_match.pval;
/* call receive progress and indicate the recv has been completed */ /* call receive progress and indicate the recv has been completed */
printf("Calling recv_progress\n"); #if DEBUG
printf("Calling recv_progress with bytes = %d\n",header->hdr_ack.hdr_dst_size);
fflush(stdout); fflush(stdout);
#endif
ptl->super.ptl_recv_progress ( ptl->super.ptl_recv_progress (
(mca_ptl_base_module_t *) ptl, (mca_ptl_base_module_t *) ptl,
request , request ,
header->hdr_frag.hdr_frag_length, header->hdr_ack.hdr_dst_size,
header->hdr_frag.hdr_frag_length); header->hdr_ack.hdr_dst_size);
/* deregister the memory */
bytes = header->hdr_ack.hdr_dst_size;
reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval;
status = gm_deregister_memory(ptl->my_port, reg_buf,
bytes);
if(GM_SUCCESS != status)
{
#if DEBUG
ompi_output(0," unpinning memory failed\n");
#endif
}
else
{
#if DEBUG
ompi_output(0,"unpinning memory success,addr:%p,bytes:%d\n",reg_buf,bytes);
#endif
}
#if 0
/*return the recv fragment to the free list */
OMPI_FREE_LIST_RETURN(&(((mca_ptl_gm_module_t
*)ptl)->gm_recv_frags_free), (ompi_list_item_t *)recv_frag);
/* free the associated buffer */
if(recv_frag->have_allocated == true)
free(recv_frag->frag_recv.frag_base.frag_add GM_SEND_BUF_SIZE);
#endif
} }
#endif
/* XXX: will handle NACK later */ /* XXX: will handle NACK later */
@ -351,6 +401,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
recv_frag->matched = false; recv_frag->matched = false;
recv_frag->have_allocated_buffer = false; recv_frag->have_allocated_buffer = false;
recv_frag->ptl = ptl;
matched = ptl->super.ptl_match( &(ptl->super), matched = ptl->super.ptl_match( &(ptl->super),
&(recv_frag->frag_recv), &(recv_frag->frag_recv),
@ -358,8 +409,10 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
if( matched ) { if( matched ) {
return NULL; return NULL;
} }
#if DEBUG
ompi_output( 0,"matching receive not yet posted get tag %d comm %d source %d\n", ompi_output( 0,"matching receive not yet posted get tag %d comm %d source %d\n",
header->hdr_match.hdr_tag, header->hdr_match.hdr_contextid, header->hdr_match.hdr_src ); header->hdr_match.hdr_tag, header->hdr_match.hdr_contextid, header->hdr_match.hdr_src );
#endif
return recv_frag; return recv_frag;
} }
@ -391,6 +444,47 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev
return frag; return frag;
} }
void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl)
{
mca_ptl_gm_recv_frag_t * frag = NULL;
int i, size;
bool matched;
size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue);
if (size > 0)
{
frag = (mca_ptl_gm_recv_frag_t *)
ompi_list_remove_first( (ompi_list_t *)&(ptl->gm_recv_outstanding_queue) );
printf(" the frag size to be matched is %d\n",frag->frag_recv.frag_base.frag_size);
fflush(stdout);
matched = ptl->super.ptl_match( &(ptl->super),
&(frag->frag_recv),
&(frag->frag_recv.frag_base.frag_header.hdr_match) );
printf("the value of matched is %d\n", matched);
fflush(stdout);
if(!matched)
{
ompi_list_append((ompi_list_t *)&(ptl->gm_recv_outstanding_queue),
(ompi_list_item_t *) frag);
}
else
{
/* if allocated buffer, free the buffer */
/* return the recv descriptor to the free list */
OMPI_FREE_LIST_RETURN(&(ptl->gm_recv_frags_free), (ompi_list_item_t *)frag);
}
}
}
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp) int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
{ {
@ -420,6 +514,15 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
frag->frag_recv.frag_base.frag_addr = buffer; frag->frag_recv.frag_base.frag_addr = buffer;
/* mark the fragment as having pending buffers */ /* mark the fragment as having pending buffers */
frag->have_allocated_buffer = true; frag->have_allocated_buffer = true;
#if 0
/* append to the receive queue */
ompi_list_append (&(ptl->gm_recv_outstanding_queue),
(ompi_list_item_t *) frag);
printf ("frag appended to recv_oustanding queue \n");
#endif
} }
gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer), gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer),
GM_SIZE, GM_LOW_PRIORITY ); GM_SIZE, GM_LOW_PRIORITY );
@ -431,6 +534,10 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
gm_unknown(ptl->my_port, event); gm_unknown(ptl->my_port, event);
} }
/* process the outstanding frags in the queue */
/*mca_ptl_gm_outstanding_recv(ptl); */
} }
return 0; return 0;
} }

Просмотреть файл

@ -28,6 +28,9 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl,
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp); int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp);
void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl);
int int
mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment, mca_ptl_gm_send_frag_t *fragment,

Просмотреть файл

@ -16,11 +16,7 @@
#include "ptl_gm_sendfrag.h" #include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h" #include "ptl_gm_priv.h"
#define DEBUG 0
/*#define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner
#define frag_peer super.super.frag_peer
#define frag_convertor super.super.frag_convertor */
static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag);
@ -77,7 +73,7 @@ mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl,
frag = (mca_ptl_gm_send_frag_t *)item; frag = (mca_ptl_gm_send_frag_t *)item;
frag->req = (struct mca_pml_base_send_request_t *)sendreq; frag->req = (struct mca_pml_base_send_request_t *)sendreq;
frag->type = 0 ;/* XXX: should be EAGER_SEND; */ frag->type = 0 ;
return frag; return frag;
} }
@ -102,10 +98,11 @@ int mca_ptl_gm_send_ack_init(
char * buffer, char * buffer,
int size) int size)
{ {
int header_length;
mca_ptl_base_header_t * hdr; mca_ptl_base_header_t * hdr;
mca_pml_base_recv_request_t *request; mca_pml_base_recv_request_t *request;
hdr = (mca_ptl_base_header_t *)ack->send_buf; hdr = (mca_ptl_base_header_t *)ack->send_buf;
printf("ack buf is %p\n",ack->send_buf);
fflush(stdout);
request = frag->frag_recv.frag_request; request = frag->frag_recv.frag_request;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
@ -113,15 +110,13 @@ int mca_ptl_gm_send_ack_init(
hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr;
hdr->hdr_ack.hdr_dst_match.lval = 0; hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request; hdr->hdr_ack.hdr_dst_match.pval = request; /*should this be dst_match */
hdr->hdr_ack.hdr_dst_addr.lval = 0; hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of
hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;/*request->req_base.req_addr;*/ dest addrees */
/*posted registered buffer */ hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;
hdr->hdr_ack.hdr_dst_size = size; hdr->hdr_ack.hdr_dst_size = size;
/*size of registered buffer */
ack->send_frag.frag_request = 0; ack->send_frag.frag_request = 0;
ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer; ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer;
ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)ptl; ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)ptl;
ack->send_frag.frag_base.frag_addr = NULL; ack->send_frag.frag_base.frag_addr = NULL;
@ -130,49 +125,13 @@ int mca_ptl_gm_send_ack_init(
ack->ptl = ptl; ack->ptl = ptl;
ack->send_frag.frag_base.frag_header = *hdr; ack->send_frag.frag_base.frag_header = *hdr;
ack->wait_for_ack = 0; ack->wait_for_ack = 0;
header_length = sizeof(mca_ptl_base_ack_header_t);
/* need to add registered buffer information */
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/*
int mca_ptl_gm_send_fini_init(
mca_ptl_gm_send_frag_t* fini,
mca_ptl_gm_module_t *ptl,
mca_ptl_gm_peer_t* ptl_peer,
mca_pml_base_send_request_t* request)
{
#if 1
int header_length;
mca_ptl_base_header_t * hdr;
hdr = (mca_ptl_base_header_t *)fini->send_buf;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_addr.lval = 0;
fini->send_frag.frag_request = 0;
fini->send_frag.frag_base.frag_peer = ptl_peer;
fini->send_frag.frag_base.frag_owner = ptl;
fini->send_frag.frag_base.frag_addr = NULL;
fini->send_frag.frag_base.frag_size = 0;
fini->ptl = ptl;
fini->wait_for_ack = 0;
header_length = sizeof(mca_ptl_base_ack_header_t);
#endif
return OMPI_SUCCESS;
}
*/
int mca_ptl_gm_put_frag_init( int mca_ptl_gm_put_frag_init(
mca_ptl_gm_send_frag_t* putfrag, mca_ptl_gm_send_frag_t* putfrag,
mca_ptl_gm_peer_t * ptl_peer, mca_ptl_gm_peer_t * ptl_peer,
@ -183,35 +142,30 @@ int mca_ptl_gm_put_frag_init(
int flags) int flags)
{ {
mca_ptl_base_header_t *hdr; mca_ptl_base_header_t *hdr;
void * buffer;
int header_length;
#if 1
hdr = (mca_ptl_base_header_t *)putfrag->send_buf; hdr = (mca_ptl_base_header_t *)putfrag->send_buf;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_dst_match.lval = 0; hdr->hdr_ack.hdr_dst_match.lval = 0;
/*hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match;*/ hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match.pval;
hdr->hdr_ack.hdr_dst_addr.lval = 0; hdr->hdr_ack.hdr_dst_addr.lval = 0;
hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_base.req_addr); hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_peer_addr.pval);
hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; hdr->hdr_ack.hdr_dst_size = *size;
putfrag->send_frag.frag_request = request;
putfrag->send_frag.frag_request = request; /* XXX: check this */ putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer;
putfrag->send_frag.frag_base.frag_peer = ptl_peer;
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl; putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl;
putfrag->send_frag.frag_base.frag_addr = NULL; putfrag->send_frag.frag_base.frag_addr = NULL;
putfrag->send_frag.frag_base.frag_size = 0; putfrag->send_frag.frag_base.frag_size = 0;
putfrag->ptl = gm_ptl; putfrag->ptl = gm_ptl;
#endif
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
putfrag->send_frag.frag_base.frag_size = *size;
putfrag->ptl = gm_ptl;
putfrag->wait_for_ack = 0; putfrag->wait_for_ack = 0;
putfrag->put_sent = 0; putfrag->put_sent = 0;
putfrag->type = 1;
putfrag->req = request; /* gm_send_request */
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -237,11 +191,12 @@ int mca_ptl_gm_send_frag_init(
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_dst_ptr.lval = 0; hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */ hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */
hdr->hdr_frag.hdr_dst_ptr.lval = 0; hdr->hdr_frag.hdr_frag_length = *size;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
@ -262,16 +217,6 @@ int mca_ptl_gm_send_frag_init(
header_length = sizeof (mca_ptl_base_frag_header_t); header_length = sizeof (mca_ptl_base_frag_header_t);
} }
/*initialize convertor */
#if 0
/*fragment state*/
sendfrag->frag_base.frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_base.frag_peer = ptl_peer;
sendfrag->frag_base.frag_addr = NULL;
sendfrag->frag_base.frag_size = *size;
#endif
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

Просмотреть файл

@ -57,6 +57,8 @@ struct mca_ptl_gm_recv_frag_t {
bool frag_ack_pending; bool frag_ack_pending;
void *alloc_recv_buffer; void *alloc_recv_buffer;
void *unex_recv_buffer; void *unex_recv_buffer;
void * registered_buf;
mca_ptl_gm_module_t *ptl;
bool matched; bool matched;
bool have_allocated_buffer; bool have_allocated_buffer;
}; };