a preliminary implementation of put
This commit was SVN r2401.
Этот коммит содержится в:
родитель
345e7d295f
Коммит
722867c82d
@ -243,6 +243,7 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
|
|||||||
}
|
}
|
||||||
|
|
||||||
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =sendfrag;
|
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =sendfrag;
|
||||||
|
((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags;
|
||||||
rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset,
|
rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset,
|
||||||
&size, flags);
|
&size, flags);
|
||||||
|
|
||||||
@ -271,6 +272,61 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
|
|||||||
struct mca_pml_base_send_request_t *sendreq,
|
struct mca_pml_base_send_request_t *sendreq,
|
||||||
size_t offset, size_t size, int flags)
|
size_t offset, size_t size, int flags)
|
||||||
{
|
{
|
||||||
|
int rc;
|
||||||
|
mca_ptl_gm_send_frag_t *sendfrag, *putfrag;
|
||||||
|
mca_ptl_gm_peer_t *gm_ptl_peer;
|
||||||
|
mca_ptl_gm_module_t * gm_ptl;
|
||||||
|
void* destination_buffer;
|
||||||
|
char * buffer_ptr;
|
||||||
|
int status, bytes_reg;
|
||||||
|
|
||||||
|
gm_ptl= (mca_ptl_gm_module_t *)ptl;
|
||||||
|
buffer_ptr = ((char *) (sendreq->req_base.req_addr)) + offset ;
|
||||||
|
bytes_reg = size;
|
||||||
|
|
||||||
|
destination_buffer =(void *)( (sendreq->req_peer_addr).pval);
|
||||||
|
|
||||||
|
/* register the user buffer */
|
||||||
|
if (offset > 0)
|
||||||
|
{
|
||||||
|
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
|
||||||
|
if(GM_SUCCESS != status)
|
||||||
|
{
|
||||||
|
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
putfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq); /*alloc_put_frag */
|
||||||
|
putfrag->registered_buf = (void *)buffer_ptr;
|
||||||
|
putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer;
|
||||||
|
|
||||||
|
rc = mca_ptl_gm_put_frag_init(putfrag ,
|
||||||
|
(mca_ptl_gm_peer_t*)ptl_peer,gm_ptl,
|
||||||
|
sendreq, offset, &size, flags);
|
||||||
|
|
||||||
|
rc =
|
||||||
|
mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag,
|
||||||
|
sendreq, offset, &size, flags,
|
||||||
|
destination_buffer, bytes_reg);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
gm_ptl->num_send_tokens--;
|
||||||
|
|
||||||
|
/*do a send to notify completion */
|
||||||
|
/*sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);*/
|
||||||
|
|
||||||
|
/*rc = mca_ptl_gm_send_fini_init (sendfrag,gm_ptl, */
|
||||||
|
/*(mca_ptl_gm_peer_t*)ptl_peer, sendreq);*/
|
||||||
|
|
||||||
|
/*gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;*/
|
||||||
|
/*rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,*/
|
||||||
|
/*offset,&size,flags);*/
|
||||||
|
|
||||||
|
/*gm_ptl->num_send_tokens--;*/
|
||||||
|
sendreq->req_offset += size;
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -299,46 +355,78 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl,
|
|||||||
* ack back to the peer and process the fragment.
|
* ack back to the peer and process the fragment.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
||||||
mca_ptl_base_recv_frag_t * frag )
|
mca_ptl_base_recv_frag_t * frag )
|
||||||
{
|
{
|
||||||
mca_pml_base_recv_request_t *request;
|
mca_pml_base_recv_request_t *request;
|
||||||
/*mca_ptl_base_recv_request_t *request;*/
|
mca_pml_base_send_request_t *srequest;
|
||||||
mca_ptl_base_header_t *header;
|
mca_ptl_base_header_t *header;
|
||||||
int bytes_recv, rc;
|
int bytes_recv, rc,rc1, total_bytes, bytes_reg;
|
||||||
mca_ptl_gm_module_t *gm_ptl;
|
mca_ptl_gm_module_t *gm_ptl;
|
||||||
struct iovec iov[1];
|
struct iovec iov[1];
|
||||||
|
mca_ptl_gm_send_frag_t *ack;
|
||||||
|
mca_ptl_gm_recv_frag_t *recv_frag;
|
||||||
|
char *buffer_ptr;
|
||||||
|
gm_status_t status;
|
||||||
|
size_t size = 0;
|
||||||
|
|
||||||
header = &frag->frag_base.frag_header;
|
header = &frag->frag_base.frag_header;
|
||||||
request = frag->frag_request;
|
request = frag->frag_request;
|
||||||
|
gm_ptl = (mca_ptl_gm_module_t *)ptl;
|
||||||
|
|
||||||
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
|
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
|
||||||
#if 0
|
#if 1
|
||||||
/* might need to send an ack back */
|
/* might need to send an ack back */
|
||||||
int rc;
|
|
||||||
mca_ptl_gm_send_frag_t *ack;
|
|
||||||
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
|
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
|
||||||
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
|
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
|
||||||
|
|
||||||
if (NULL == ack) {
|
if (NULL == ack) {
|
||||||
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
|
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
|
||||||
__FILE__,___LINE__);
|
__FILE__,__LINE__);
|
||||||
OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
|
OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock);
|
||||||
recv_frag->frag_ack_pending = true;
|
recv_frag->frag_ack_pending = true;
|
||||||
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
|
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
|
||||||
(ompi_list_item_t *) frag);
|
(ompi_list_item_t *) frag);
|
||||||
OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
|
OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock);
|
||||||
} else {
|
}
|
||||||
mca_ptl_gm_send_frag_init_ack (ack, ptl,
|
else
|
||||||
recv_frag->super.super.
|
{
|
||||||
frag_peer, recv_frag);
|
|
||||||
|
buffer_ptr = (char *)( request->req_base.req_addr );
|
||||||
|
total_bytes = (request->req_base.req_datatype->size) *
|
||||||
|
(request->req_base.req_count);
|
||||||
|
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
|
||||||
|
bytes_reg = total_bytes - bytes_recv;
|
||||||
|
buffer_ptr += bytes_recv;
|
||||||
|
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
|
||||||
|
if(GM_SUCCESS != status)
|
||||||
|
{
|
||||||
|
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* send the registered memory information, send recv request * ptr */
|
||||||
|
rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl,
|
||||||
|
(mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer)
|
||||||
|
, recv_frag, buffer_ptr, bytes_reg);
|
||||||
|
|
||||||
|
/*XXX : put the registered memory in pin-down cache */
|
||||||
|
|
||||||
/*XXX: check this*/
|
/*XXX: check this*/
|
||||||
mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 );
|
rc1 = mca_ptl_gm_peer_send ((mca_ptl_gm_peer_t *)
|
||||||
|
(ack->send_frag.frag_base.frag_peer),
|
||||||
|
ack,srequest,0,&size,0 );
|
||||||
|
|
||||||
|
gm_ptl->num_send_tokens--;
|
||||||
|
ompi_list_append (&(gm_ptl->gm_send_frags_queue),
|
||||||
|
(ompi_list_item_t *) ack);
|
||||||
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Here we expect that frag_addr is the beging of the buffer header included */
|
/* Here we expect that frag_addr is the begin of the buffer header included */
|
||||||
iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
|
iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
|
||||||
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
|
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
|
||||||
iov[0].iov_len = bytes_recv;
|
iov[0].iov_len = bytes_recv;
|
||||||
@ -361,7 +449,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
|||||||
header->hdr_frag.hdr_frag_offset);
|
header->hdr_frag.hdr_frag_offset);
|
||||||
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
|
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
|
||||||
assert( rc == 1 );
|
assert( rc == 1 );
|
||||||
}
|
}
|
||||||
|
|
||||||
/*update progress*/ /* XXX : check this */
|
/*update progress*/ /* XXX : check this */
|
||||||
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
|
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
|
||||||
@ -376,3 +464,6 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
|||||||
gm_ptl = (mca_ptl_gm_module_t *)ptl;
|
gm_ptl = (mca_ptl_gm_module_t *)ptl;
|
||||||
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag);
|
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -114,6 +114,9 @@ mca_ptl_gm_component_open (void)
|
|||||||
mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024);
|
mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024);
|
||||||
mca_ptl_gm_module.super.ptl_min_frag_size =
|
mca_ptl_gm_module.super.ptl_min_frag_size =
|
||||||
mca_ptl_gm_param_register_int ("min_frag_size", 1<<16);
|
mca_ptl_gm_param_register_int ("min_frag_size", 1<<16);
|
||||||
|
mca_ptl_gm_module.super.ptl_max_frag_size =
|
||||||
|
mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024);
|
||||||
|
|
||||||
mca_ptl_gm_component.gm_free_list_num =
|
mca_ptl_gm_component.gm_free_list_num =
|
||||||
mca_ptl_gm_param_register_int ("free_list_num", 32);
|
mca_ptl_gm_param_register_int ("free_list_num", 32);
|
||||||
mca_ptl_gm_component.gm_free_list_inc =
|
mca_ptl_gm_component.gm_free_list_inc =
|
||||||
|
@ -16,12 +16,39 @@
|
|||||||
#include "mca/pml/base/pml_base_sendreq.h"
|
#include "mca/pml/base/pml_base_sendreq.h"
|
||||||
#include "mca/ns/base/base.h"
|
#include "mca/ns/base/base.h"
|
||||||
#include "ptl_gm.h"
|
#include "ptl_gm.h"
|
||||||
|
#include "ptl_gm_req.h"
|
||||||
#include "ptl_gm_addr.h"
|
#include "ptl_gm_addr.h"
|
||||||
#include "ptl_gm_peer.h"
|
#include "ptl_gm_peer.h"
|
||||||
#include "ptl_gm_proc.h"
|
#include "ptl_gm_proc.h"
|
||||||
#include "ptl_gm_sendfrag.h"
|
#include "ptl_gm_sendfrag.h"
|
||||||
#include "ptl_gm_priv.h"
|
#include "ptl_gm_priv.h"
|
||||||
|
|
||||||
|
int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer,
|
||||||
|
mca_ptl_gm_send_frag_t *fragment,
|
||||||
|
struct mca_pml_base_send_request_t *sendreq,
|
||||||
|
size_t offset,
|
||||||
|
size_t *size,
|
||||||
|
int flags,
|
||||||
|
void * target_buffer,
|
||||||
|
int bytes)
|
||||||
|
{
|
||||||
|
|
||||||
|
gm_put( ptl_peer->peer_ptl->my_port, fragment->registered_buf,
|
||||||
|
(gm_remote_ptr_t) target_buffer,bytes, GM_LOW_PRIORITY,
|
||||||
|
ptl_peer->local_id, ptl_peer->port_number,
|
||||||
|
put_callback, (void *)fragment );
|
||||||
|
|
||||||
|
|
||||||
|
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
|
||||||
|
fragment->send_frag.frag_base.frag_peer =
|
||||||
|
(struct mca_ptl_base_peer_t*)ptl_peer;
|
||||||
|
fragment->send_frag.frag_base.frag_addr =(void *)target_buffer;
|
||||||
|
fragment->send_frag.frag_base.frag_size = bytes;
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
||||||
mca_ptl_gm_send_frag_t *fragment,
|
mca_ptl_gm_send_frag_t *fragment,
|
||||||
struct mca_pml_base_send_request_t *sendreq,
|
struct mca_pml_base_send_request_t *sendreq,
|
||||||
@ -40,8 +67,11 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
|||||||
size_in = *size;
|
size_in = *size;
|
||||||
|
|
||||||
outvec[0].iov_base = (char*)fragment->send_buf;
|
outvec[0].iov_base = (char*)fragment->send_buf;
|
||||||
if( (size_in + header_length) < GM_SEND_BUF_SIZE ) outvec[0].iov_len = size_in;
|
|
||||||
else outvec[0].iov_len = GM_SEND_BUF_SIZE;
|
if( (size_in + header_length) < GM_SEND_BUF_SIZE )
|
||||||
|
outvec[0].iov_len = size_in;
|
||||||
|
else
|
||||||
|
outvec[0].iov_len = GM_SEND_BUF_SIZE - header_length;
|
||||||
|
|
||||||
/*header_length = sizeof(mca_ptl_base_frag_header_t);*/
|
/*header_length = sizeof(mca_ptl_base_frag_header_t);*/
|
||||||
|
|
||||||
@ -105,6 +135,73 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void put_callback(struct gm_port *port,void * context, gm_status_t status)
|
||||||
|
{
|
||||||
|
mca_ptl_gm_module_t *ptl;
|
||||||
|
mca_ptl_gm_send_frag_t *putfrag;
|
||||||
|
/*ompi_list_t *list;*/
|
||||||
|
int bytes, header_length;
|
||||||
|
mca_pml_base_send_request_t *gm_send_req;
|
||||||
|
mca_ptl_base_frag_header_t* header;
|
||||||
|
int offset = 0;
|
||||||
|
int size = 0;
|
||||||
|
int flags = 0;
|
||||||
|
int rc;
|
||||||
|
|
||||||
|
putfrag = (mca_ptl_gm_send_frag_t *)context;
|
||||||
|
ptl = (mca_ptl_gm_module_t *)putfrag->ptl;
|
||||||
|
gm_send_req = putfrag->req;
|
||||||
|
|
||||||
|
header = (mca_ptl_base_frag_header_t*)putfrag->registered_buf;
|
||||||
|
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
|
||||||
|
/* XXX : what should the header length be */
|
||||||
|
bytes = putfrag->send_frag.frag_base.frag_size - header_length;
|
||||||
|
|
||||||
|
printf("ENTERING PUT CALLBACK\n");
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
|
|
||||||
|
switch (status) {
|
||||||
|
case GM_SUCCESS:
|
||||||
|
/* local put completed, mark put as complete */
|
||||||
|
printf("PUTCALLBACK WITH CASE GM_SUCCESS\n");
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
|
ptl->num_send_tokens++;
|
||||||
|
putfrag->put_sent = 1;
|
||||||
|
|
||||||
|
/* send the header information through send/receive channel */
|
||||||
|
|
||||||
|
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,gm_send_req,
|
||||||
|
offset,&size,flags);
|
||||||
|
|
||||||
|
/* deregister the user memory */
|
||||||
|
status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes);
|
||||||
|
|
||||||
|
if(GM_SUCCESS != status)
|
||||||
|
ompi_output(0," unpinning memory failed\n");
|
||||||
|
else
|
||||||
|
ompi_output(0," unpinning memory success\n");
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
case GM_SEND_TIMED_OUT:
|
||||||
|
/* need to take care of retransmission */
|
||||||
|
break;
|
||||||
|
|
||||||
|
case GM_SEND_DROPPED:
|
||||||
|
/* need to handle this case */
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
ompi_output(0,
|
||||||
|
"[%s:%d] error in message completion\n",__FILE__,__LINE__);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void send_callback(struct gm_port *port,void * context, gm_status_t status)
|
void send_callback(struct gm_port *port,void * context, gm_status_t status)
|
||||||
{
|
{
|
||||||
mca_ptl_gm_module_t *ptl;
|
mca_ptl_gm_module_t *ptl;
|
||||||
@ -115,18 +212,44 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
|
|||||||
mca_ptl_base_frag_header_t* header;
|
mca_ptl_base_frag_header_t* header;
|
||||||
|
|
||||||
frag = (mca_ptl_gm_send_frag_t *)context;
|
frag = (mca_ptl_gm_send_frag_t *)context;
|
||||||
ptl = (mca_ptl_gm_module_t *)frag->ptl;
|
/*ptl = (mca_ptl_gm_module_t *)frag->ptl;*/
|
||||||
|
ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner;
|
||||||
gm_send_req = frag->req;
|
gm_send_req = frag->req;
|
||||||
|
|
||||||
header = (mca_ptl_base_frag_header_t*)frag->send_buf;
|
header = (mca_ptl_base_frag_header_t*)frag->send_buf;
|
||||||
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
|
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
|
||||||
bytes = frag->send_frag.frag_base.frag_size - header_length;
|
bytes = frag->send_frag.frag_base.frag_size - 64;/*header_length;*/
|
||||||
|
|
||||||
|
if (NULL != gm_send_req)
|
||||||
|
{
|
||||||
|
if(1 == (( mca_ptl_gm_send_request_t *)gm_send_req)->need_ack )
|
||||||
|
frag->wait_for_ack = 1;
|
||||||
|
}
|
||||||
|
printf("ENTERING SEND CALLBACK\n");
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case GM_SUCCESS:
|
case GM_SUCCESS:
|
||||||
/* send completed, can reuse the user buffer */
|
/* send completed, can reuse the user buffer */
|
||||||
|
printf("SENDCALLBACK WITH CASE GM_SUCCESS\n");
|
||||||
|
fflush(stdout);
|
||||||
ptl->num_send_tokens++;
|
ptl->num_send_tokens++;
|
||||||
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, gm_send_req, bytes );
|
frag->send_complete = 1;
|
||||||
|
/*
|
||||||
|
while (1 == (frag->wait_for_ack))
|
||||||
|
{
|
||||||
|
|
||||||
|
mca_ptl_gm_incoming_recv (&mca_ptl_gm_component);
|
||||||
|
//This is recursive
|
||||||
|
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
if (frag->wait_for_ack == 0 && (gm_send_req != NULL))
|
||||||
|
{
|
||||||
|
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
|
||||||
|
gm_send_req, bytes );
|
||||||
|
|
||||||
|
}
|
||||||
list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
|
list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
|
||||||
ompi_list_remove_first(list);
|
ompi_list_remove_first(list);
|
||||||
break;
|
break;
|
||||||
@ -151,12 +274,53 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
|
|||||||
{
|
{
|
||||||
mca_ptl_gm_send_frag_t * frag;
|
mca_ptl_gm_send_frag_t * frag;
|
||||||
mca_pml_base_send_request_t *req;
|
mca_pml_base_send_request_t *req;
|
||||||
|
mca_pml_base_recv_request_t *request;
|
||||||
|
int header_length, bytes;
|
||||||
|
|
||||||
|
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK)
|
||||||
|
{
|
||||||
frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
|
frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
|
||||||
req = (mca_pml_base_send_request_t *) frag->req;
|
req = (mca_pml_base_send_request_t *) frag->req;
|
||||||
req->req_peer_match = header->hdr_ack.hdr_dst_match;
|
req->req_peer_match = header->hdr_ack.hdr_dst_match;
|
||||||
|
req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
|
||||||
|
req->req_peer_size = header->hdr_ack.hdr_dst_size;
|
||||||
|
|
||||||
|
frag->wait_for_ack = 0;
|
||||||
|
|
||||||
|
/* check if send has completed */
|
||||||
|
|
||||||
|
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
|
||||||
|
bytes = frag->send_frag.frag_base.frag_size - header_length;
|
||||||
|
|
||||||
|
if(frag->send_complete == 1)
|
||||||
|
{
|
||||||
|
ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
|
||||||
|
req, bytes );
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN)
|
||||||
|
{
|
||||||
|
|
||||||
|
request = (mca_pml_base_recv_request_t*)
|
||||||
|
header->hdr_frag.hdr_dst_ptr.pval;
|
||||||
|
/* call receive progress and indicate the recv has been completed */
|
||||||
|
|
||||||
|
printf("Calling recv_progress\n");
|
||||||
|
fflush(stdout);
|
||||||
|
ptl->super.ptl_recv_progress (
|
||||||
|
(mca_ptl_base_module_t *) ptl,
|
||||||
|
request,
|
||||||
|
header->hdr_frag.hdr_frag_length,
|
||||||
|
header->hdr_frag.hdr_frag_length);
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* XXX: will handle NACK later */
|
||||||
|
|
||||||
/* return the send fragment to the free list */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
||||||
@ -216,6 +380,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev
|
|||||||
|
|
||||||
case MCA_PTL_HDR_TYPE_ACK:
|
case MCA_PTL_HDR_TYPE_ACK:
|
||||||
case MCA_PTL_HDR_TYPE_NACK:
|
case MCA_PTL_HDR_TYPE_NACK:
|
||||||
|
case MCA_PTL_HDR_TYPE_FIN:
|
||||||
ptl_gm_ctrl_frag(ptl,header);
|
ptl_gm_ctrl_frag(ptl,header);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -36,8 +36,21 @@ mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
|||||||
size_t *size,
|
size_t *size,
|
||||||
int flags);
|
int flags);
|
||||||
|
|
||||||
|
int
|
||||||
|
mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer,
|
||||||
|
mca_ptl_gm_send_frag_t *fragment,
|
||||||
|
struct mca_pml_base_send_request_t *sendreq,
|
||||||
|
size_t offset,
|
||||||
|
size_t *size,
|
||||||
|
int flags,
|
||||||
|
void *target_buffer,
|
||||||
|
int bytes);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void send_callback(struct gm_port *port,void * context, gm_status_t
|
void send_callback(struct gm_port *port,void * context, gm_status_t
|
||||||
status);
|
status);
|
||||||
|
|
||||||
|
void put_callback(struct gm_port *port,void * context, gm_status_t
|
||||||
|
status);
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ struct mca_ptl_gm_send_request_t {
|
|||||||
mca_pml_base_send_request_t super;
|
mca_pml_base_send_request_t super;
|
||||||
/* add stuff here */
|
/* add stuff here */
|
||||||
mca_ptl_gm_send_frag_t *req_frag;
|
mca_ptl_gm_send_frag_t *req_frag;
|
||||||
|
int need_ack;
|
||||||
};
|
};
|
||||||
typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t;
|
typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t;
|
||||||
|
|
||||||
|
@ -17,10 +17,10 @@
|
|||||||
#include "ptl_gm_priv.h"
|
#include "ptl_gm_priv.h"
|
||||||
|
|
||||||
|
|
||||||
#define frag_header super.super.frag_header
|
/*#define frag_header super.super.frag_header
|
||||||
#define frag_owner super.super.frag_owner
|
#define frag_owner super.super.frag_owner
|
||||||
#define frag_peer super.super.frag_peer
|
#define frag_peer super.super.frag_peer
|
||||||
#define frag_convertor super.super.frag_convertor
|
#define frag_convertor super.super.frag_convertor */
|
||||||
|
|
||||||
|
|
||||||
static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag);
|
static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag);
|
||||||
@ -83,6 +83,141 @@ mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int mca_ptl_gm_send_frag_done(
|
||||||
|
mca_ptl_gm_send_frag_t * frag,
|
||||||
|
mca_pml_base_send_request_t * req)
|
||||||
|
{
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int mca_ptl_gm_send_ack_init(
|
||||||
|
struct mca_ptl_gm_send_frag_t* ack,
|
||||||
|
mca_ptl_gm_module_t *ptl,
|
||||||
|
mca_ptl_gm_peer_t* ptl_peer,
|
||||||
|
struct mca_ptl_gm_recv_frag_t* frag,
|
||||||
|
char * buffer,
|
||||||
|
int size)
|
||||||
|
{
|
||||||
|
int header_length;
|
||||||
|
mca_ptl_base_header_t * hdr;
|
||||||
|
mca_pml_base_recv_request_t *request;
|
||||||
|
hdr = (mca_ptl_base_header_t *)ack->send_buf;
|
||||||
|
request = frag->frag_recv.frag_request;
|
||||||
|
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||||
|
hdr->hdr_common.hdr_flags = 0;
|
||||||
|
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
|
||||||
|
hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr;
|
||||||
|
hdr->hdr_ack.hdr_dst_match.lval = 0;
|
||||||
|
hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||||
|
hdr->hdr_ack.hdr_dst_addr.lval = 0;
|
||||||
|
hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;/*request->req_base.req_addr;*/
|
||||||
|
/*posted registered buffer */
|
||||||
|
hdr->hdr_ack.hdr_dst_size = size;
|
||||||
|
/*size of registered buffer */
|
||||||
|
|
||||||
|
ack->send_frag.frag_request = 0;
|
||||||
|
|
||||||
|
ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer;
|
||||||
|
ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)ptl;
|
||||||
|
ack->send_frag.frag_base.frag_addr = NULL;
|
||||||
|
ack->send_frag.frag_base.frag_size = 0;
|
||||||
|
ack->status = 1; /* was able to register memory */
|
||||||
|
ack->ptl = ptl;
|
||||||
|
ack->send_frag.frag_base.frag_header = *hdr;
|
||||||
|
ack->wait_for_ack = 0;
|
||||||
|
header_length = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
|
||||||
|
/* need to add registered buffer information */
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
int mca_ptl_gm_send_fini_init(
|
||||||
|
mca_ptl_gm_send_frag_t* fini,
|
||||||
|
mca_ptl_gm_module_t *ptl,
|
||||||
|
mca_ptl_gm_peer_t* ptl_peer,
|
||||||
|
mca_pml_base_send_request_t* request)
|
||||||
|
{
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
int header_length;
|
||||||
|
mca_ptl_base_header_t * hdr;
|
||||||
|
hdr = (mca_ptl_base_header_t *)fini->send_buf;
|
||||||
|
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
|
||||||
|
hdr->hdr_common.hdr_flags = 0;
|
||||||
|
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
hdr->hdr_ack.hdr_dst_match.lval = 0;
|
||||||
|
hdr->hdr_ack.hdr_dst_addr.lval = 0;
|
||||||
|
|
||||||
|
fini->send_frag.frag_request = 0;
|
||||||
|
fini->send_frag.frag_base.frag_peer = ptl_peer;
|
||||||
|
fini->send_frag.frag_base.frag_owner = ptl;
|
||||||
|
fini->send_frag.frag_base.frag_addr = NULL;
|
||||||
|
fini->send_frag.frag_base.frag_size = 0;
|
||||||
|
fini->ptl = ptl;
|
||||||
|
|
||||||
|
fini->wait_for_ack = 0;
|
||||||
|
header_length = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
int mca_ptl_gm_put_frag_init(
|
||||||
|
mca_ptl_gm_send_frag_t* putfrag,
|
||||||
|
mca_ptl_gm_peer_t * ptl_peer,
|
||||||
|
mca_ptl_gm_module_t * gm_ptl,
|
||||||
|
mca_pml_base_send_request_t * request,
|
||||||
|
size_t offset,
|
||||||
|
size_t* size,
|
||||||
|
int flags)
|
||||||
|
{
|
||||||
|
mca_ptl_base_header_t *hdr;
|
||||||
|
void * buffer;
|
||||||
|
int header_length;
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
hdr = (mca_ptl_base_header_t *)putfrag->send_buf;
|
||||||
|
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
|
||||||
|
hdr->hdr_common.hdr_flags = 0;
|
||||||
|
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
hdr->hdr_ack.hdr_dst_match.lval = 0;
|
||||||
|
/*hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match;*/
|
||||||
|
hdr->hdr_ack.hdr_dst_addr.lval = 0;
|
||||||
|
hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_base.req_addr);
|
||||||
|
hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
|
||||||
|
|
||||||
|
|
||||||
|
putfrag->send_frag.frag_request = request; /* XXX: check this */
|
||||||
|
putfrag->send_frag.frag_base.frag_peer = ptl_peer;
|
||||||
|
putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl;
|
||||||
|
putfrag->send_frag.frag_base.frag_addr = NULL;
|
||||||
|
putfrag->send_frag.frag_base.frag_size = 0;
|
||||||
|
putfrag->ptl = gm_ptl;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
|
||||||
|
putfrag->send_frag.frag_base.frag_size = *size;
|
||||||
|
putfrag->ptl = gm_ptl;
|
||||||
|
putfrag->wait_for_ack = 0;
|
||||||
|
putfrag->put_sent = 0;
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int mca_ptl_gm_send_frag_init(
|
int mca_ptl_gm_send_frag_init(
|
||||||
mca_ptl_gm_send_frag_t* sendfrag,
|
mca_ptl_gm_send_frag_t* sendfrag,
|
||||||
mca_ptl_gm_peer_t * ptl_peer,
|
mca_ptl_gm_peer_t * ptl_peer,
|
||||||
|
@ -31,11 +31,16 @@ OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
|
|||||||
struct mca_ptl_gm_send_frag_t {
|
struct mca_ptl_gm_send_frag_t {
|
||||||
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
|
mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
|
||||||
void * send_buf;
|
void * send_buf;
|
||||||
|
void * registered_buf;
|
||||||
mca_pml_base_send_request_t *req;
|
mca_pml_base_send_request_t *req;
|
||||||
mca_ptl_gm_module_t *ptl;
|
mca_ptl_gm_module_t *ptl;
|
||||||
/*mca_ptl_gm_peer_t *peer;*/
|
mca_ptl_gm_peer_t *peer;
|
||||||
|
|
||||||
int status;
|
int status;
|
||||||
int type;
|
int type;
|
||||||
|
int wait_for_ack;
|
||||||
|
int put_sent;
|
||||||
|
int send_complete;
|
||||||
};
|
};
|
||||||
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
|
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
|
||||||
|
|
||||||
@ -65,6 +70,32 @@ mca_ptl_gm_alloc_send_frag ( struct mca_ptl_base_module_t *ptl,
|
|||||||
struct mca_pml_base_send_request_t *sendreq);
|
struct mca_pml_base_send_request_t *sendreq);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int mca_ptl_gm_send_ack_init(
|
||||||
|
struct mca_ptl_gm_send_frag_t* ack,
|
||||||
|
mca_ptl_gm_module_t *ptl,
|
||||||
|
mca_ptl_gm_peer_t* ptl_peer,
|
||||||
|
struct mca_ptl_gm_recv_frag_t* frag,
|
||||||
|
char * buffer,
|
||||||
|
int size);
|
||||||
|
|
||||||
|
/* int mca_ptl_gm_send_fini_init(
|
||||||
|
mca_ptl_gm_send_frag_t* fini,
|
||||||
|
mca_ptl_gm_module_t *ptl,
|
||||||
|
mca_ptl_gm_peer_t* ptl_peer,
|
||||||
|
mca_pml_base_send_request_t * sendreq);
|
||||||
|
*/
|
||||||
|
|
||||||
|
int
|
||||||
|
mca_ptl_gm_put_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
|
||||||
|
mca_ptl_gm_peer_t * ptl_peer,
|
||||||
|
mca_ptl_gm_module_t *ptl,
|
||||||
|
mca_pml_base_send_request_t * sendreq,
|
||||||
|
size_t offset,
|
||||||
|
size_t* size,
|
||||||
|
int flags);
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
|
mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
|
||||||
mca_ptl_gm_peer_t * ptl_peer,
|
mca_ptl_gm_peer_t * ptl_peer,
|
||||||
@ -74,6 +105,10 @@ int
|
|||||||
int flags);
|
int flags);
|
||||||
|
|
||||||
|
|
||||||
|
int mca_ptl_gm_send_frag_done(
|
||||||
|
mca_ptl_gm_send_frag_t * frag,
|
||||||
|
mca_pml_base_send_request_t * req);
|
||||||
|
|
||||||
|
|
||||||
mca_ptl_gm_recv_frag_t *
|
mca_ptl_gm_recv_frag_t *
|
||||||
mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl);
|
mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl);
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user