From 722867c82d86ea0df6e4c321d834d85dba0f5edc Mon Sep 17 00:00:00 2001 From: Gopal Santhanaraman Date: Mon, 30 Aug 2004 20:48:13 +0000 Subject: [PATCH] a preliminary implementation of put This commit was SVN r2401. --- src/mca/ptl/gm/src/ptl_gm.c | 127 +++++++++++++++--- src/mca/ptl/gm/src/ptl_gm_component.c | 3 + src/mca/ptl/gm/src/ptl_gm_priv.c | 179 +++++++++++++++++++++++++- src/mca/ptl/gm/src/ptl_gm_priv.h | 13 ++ src/mca/ptl/gm/src/ptl_gm_req.h | 1 + src/mca/ptl/gm/src/ptl_gm_sendfrag.c | 139 +++++++++++++++++++- src/mca/ptl/gm/src/ptl_gm_sendfrag.h | 37 +++++- 7 files changed, 471 insertions(+), 28 deletions(-) diff --git a/src/mca/ptl/gm/src/ptl_gm.c b/src/mca/ptl/gm/src/ptl_gm.c index bf71ba25f1..2aea559c12 100644 --- a/src/mca/ptl/gm/src/ptl_gm.c +++ b/src/mca/ptl/gm/src/ptl_gm.c @@ -243,6 +243,7 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl, } ((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =sendfrag; + ((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags; rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset, &size, flags); @@ -271,7 +272,62 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t *sendreq, size_t offset, size_t size, int flags) { - return OMPI_SUCCESS; + int rc; + mca_ptl_gm_send_frag_t *sendfrag, *putfrag; + mca_ptl_gm_peer_t *gm_ptl_peer; + mca_ptl_gm_module_t * gm_ptl; + void* destination_buffer; + char * buffer_ptr; + int status, bytes_reg; + + gm_ptl= (mca_ptl_gm_module_t *)ptl; + buffer_ptr = ((char *) (sendreq->req_base.req_addr)) + offset ; + bytes_reg = size; + + destination_buffer =(void *)( (sendreq->req_peer_addr).pval); + + /* register the user buffer */ + if (offset > 0) + { + status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg); + if(GM_SUCCESS != status) + { + ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__); + } + + } + + putfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq); /*alloc_put_frag */ + putfrag->registered_buf = (void *)buffer_ptr; + putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer; + + rc = mca_ptl_gm_put_frag_init(putfrag , + (mca_ptl_gm_peer_t*)ptl_peer,gm_ptl, + sendreq, offset, &size, flags); + + rc = + mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag, + sendreq, offset, &size, flags, + destination_buffer, bytes_reg); + + + + gm_ptl->num_send_tokens--; + + /*do a send to notify completion */ + /*sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);*/ + + /*rc = mca_ptl_gm_send_fini_init (sendfrag,gm_ptl, */ + /*(mca_ptl_gm_peer_t*)ptl_peer, sendreq);*/ + + /*gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;*/ + /*rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,*/ + /*offset,&size,flags);*/ + + /*gm_ptl->num_send_tokens--;*/ + sendreq->req_offset += size; + + return OMPI_SUCCESS; } @@ -299,46 +355,78 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl, * ack back to the peer and process the fragment. */ + void mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, mca_ptl_base_recv_frag_t * frag ) { mca_pml_base_recv_request_t *request; - /*mca_ptl_base_recv_request_t *request;*/ + mca_pml_base_send_request_t *srequest; mca_ptl_base_header_t *header; - int bytes_recv, rc; - mca_ptl_gm_module_t *gm_ptl; + int bytes_recv, rc,rc1, total_bytes, bytes_reg; + mca_ptl_gm_module_t *gm_ptl; struct iovec iov[1]; + mca_ptl_gm_send_frag_t *ack; + mca_ptl_gm_recv_frag_t *recv_frag; + char *buffer_ptr; + gm_status_t status; + size_t size = 0; header = &frag->frag_base.frag_header; request = frag->frag_request; + gm_ptl = (mca_ptl_gm_module_t *)ptl; + if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { -#if 0 +#if 1 /* might need to send an ack back */ - int rc; - mca_ptl_gm_send_frag_t *ack; recv_frag = (mca_ptl_gm_recv_frag_t *) frag; ack = mca_ptl_gm_alloc_send_frag(ptl,NULL); if (NULL == ack) { ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n", - __FILE__,___LINE__); - OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock); + __FILE__,__LINE__); + OMPI_THREAD_LOCK (&mca_ptl_gm_component.gm_lock); recv_frag->frag_ack_pending = true; ompi_list_append (&mca_ptl_gm_module.gm_pending_acks, (ompi_list_item_t *) frag); - OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock); - } else { - mca_ptl_gm_send_frag_init_ack (ack, ptl, - recv_frag->super.super. - frag_peer, recv_frag); + OMPI_THREAD_UNLOCK (&mca_ptl_gm_component.gm_lock); + } + else + { + + buffer_ptr = (char *)( request->req_base.req_addr ); + total_bytes = (request->req_base.req_datatype->size) * + (request->req_base.req_count); + bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size; + bytes_reg = total_bytes - bytes_recv; + buffer_ptr += bytes_recv; + status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg); + if(GM_SUCCESS != status) + { + ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__); + } + + /* send the registered memory information, send recv request * ptr */ + rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl, + (mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer) + , recv_frag, buffer_ptr, bytes_reg); + + /*XXX : put the registered memory in pin-down cache */ + /*XXX: check this*/ - mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 ); + rc1 = mca_ptl_gm_peer_send ((mca_ptl_gm_peer_t *) + (ack->send_frag.frag_base.frag_peer), + ack,srequest,0,&size,0 ); + + gm_ptl->num_send_tokens--; + ompi_list_append (&(gm_ptl->gm_send_frags_queue), + (ompi_list_item_t *) ack); + } #endif } - /* Here we expect that frag_addr is the beging of the buffer header included */ + /* Here we expect that frag_addr is the begin of the buffer header included */ iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size; bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size; iov[0].iov_len = bytes_recv; @@ -361,7 +449,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, header->hdr_frag.hdr_frag_offset); rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1); assert( rc == 1 ); - } +} /*update progress*/ /* XXX : check this */ ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len ); @@ -374,5 +462,8 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, } /*return to free list */ gm_ptl = (mca_ptl_gm_module_t *)ptl; - OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag); + OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag); } + + + diff --git a/src/mca/ptl/gm/src/ptl_gm_component.c b/src/mca/ptl/gm/src/ptl_gm_component.c index e4f364a575..68419db381 100644 --- a/src/mca/ptl/gm/src/ptl_gm_component.c +++ b/src/mca/ptl/gm/src/ptl_gm_component.c @@ -114,6 +114,9 @@ mca_ptl_gm_component_open (void) mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024); mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_param_register_int ("min_frag_size", 1<<16); + mca_ptl_gm_module.super.ptl_max_frag_size = + mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024); + mca_ptl_gm_component.gm_free_list_num = mca_ptl_gm_param_register_int ("free_list_num", 32); mca_ptl_gm_component.gm_free_list_inc = diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c index 04b1022226..ec4da878cc 100755 --- a/src/mca/ptl/gm/src/ptl_gm_priv.c +++ b/src/mca/ptl/gm/src/ptl_gm_priv.c @@ -16,12 +16,39 @@ #include "mca/pml/base/pml_base_sendreq.h" #include "mca/ns/base/base.h" #include "ptl_gm.h" +#include "ptl_gm_req.h" #include "ptl_gm_addr.h" #include "ptl_gm_peer.h" #include "ptl_gm_proc.h" #include "ptl_gm_sendfrag.h" #include "ptl_gm_priv.h" +int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer, + mca_ptl_gm_send_frag_t *fragment, + struct mca_pml_base_send_request_t *sendreq, + size_t offset, + size_t *size, + int flags, + void * target_buffer, + int bytes) +{ + + gm_put( ptl_peer->peer_ptl->my_port, fragment->registered_buf, + (gm_remote_ptr_t) target_buffer,bytes, GM_LOW_PRIORITY, + ptl_peer->local_id, ptl_peer->port_number, + put_callback, (void *)fragment ); + + + fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; + fragment->send_frag.frag_base.frag_peer = + (struct mca_ptl_base_peer_t*)ptl_peer; + fragment->send_frag.frag_base.frag_addr =(void *)target_buffer; + fragment->send_frag.frag_base.frag_size = bytes; + return OMPI_SUCCESS; +} + + + int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, mca_ptl_gm_send_frag_t *fragment, struct mca_pml_base_send_request_t *sendreq, @@ -40,8 +67,11 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, size_in = *size; outvec[0].iov_base = (char*)fragment->send_buf; - if( (size_in + header_length) < GM_SEND_BUF_SIZE ) outvec[0].iov_len = size_in; - else outvec[0].iov_len = GM_SEND_BUF_SIZE; + + if( (size_in + header_length) < GM_SEND_BUF_SIZE ) + outvec[0].iov_len = size_in; + else + outvec[0].iov_len = GM_SEND_BUF_SIZE - header_length; /*header_length = sizeof(mca_ptl_base_frag_header_t);*/ @@ -105,6 +135,73 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, } +void put_callback(struct gm_port *port,void * context, gm_status_t status) +{ + mca_ptl_gm_module_t *ptl; + mca_ptl_gm_send_frag_t *putfrag; + /*ompi_list_t *list;*/ + int bytes, header_length; + mca_pml_base_send_request_t *gm_send_req; + mca_ptl_base_frag_header_t* header; + int offset = 0; + int size = 0; + int flags = 0; + int rc; + + putfrag = (mca_ptl_gm_send_frag_t *)context; + ptl = (mca_ptl_gm_module_t *)putfrag->ptl; + gm_send_req = putfrag->req; + + header = (mca_ptl_base_frag_header_t*)putfrag->registered_buf; + header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; + /* XXX : what should the header length be */ + bytes = putfrag->send_frag.frag_base.frag_size - header_length; + + printf("ENTERING PUT CALLBACK\n"); + fflush(stdout); + + + switch (status) { + case GM_SUCCESS: + /* local put completed, mark put as complete */ + printf("PUTCALLBACK WITH CASE GM_SUCCESS\n"); + fflush(stdout); + + ptl->num_send_tokens++; + putfrag->put_sent = 1; + + /* send the header information through send/receive channel */ + + rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,gm_send_req, + offset,&size,flags); + + /* deregister the user memory */ + status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes); + + if(GM_SUCCESS != status) + ompi_output(0," unpinning memory failed\n"); + else + ompi_output(0," unpinning memory success\n"); + + break; + + case GM_SEND_TIMED_OUT: + /* need to take care of retransmission */ + break; + + case GM_SEND_DROPPED: + /* need to handle this case */ + break; + + default: + ompi_output(0, + "[%s:%d] error in message completion\n",__FILE__,__LINE__); + break; + } + +} + + void send_callback(struct gm_port *port,void * context, gm_status_t status) { mca_ptl_gm_module_t *ptl; @@ -115,18 +212,44 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status) mca_ptl_base_frag_header_t* header; frag = (mca_ptl_gm_send_frag_t *)context; - ptl = (mca_ptl_gm_module_t *)frag->ptl; + /*ptl = (mca_ptl_gm_module_t *)frag->ptl;*/ + ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner; gm_send_req = frag->req; header = (mca_ptl_base_frag_header_t*)frag->send_buf; header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; - bytes = frag->send_frag.frag_base.frag_size - header_length; + bytes = frag->send_frag.frag_base.frag_size - 64;/*header_length;*/ + + if (NULL != gm_send_req) + { + if(1 == (( mca_ptl_gm_send_request_t *)gm_send_req)->need_ack ) + frag->wait_for_ack = 1; + } + printf("ENTERING SEND CALLBACK\n"); + fflush(stdout); switch (status) { case GM_SUCCESS: /* send completed, can reuse the user buffer */ + printf("SENDCALLBACK WITH CASE GM_SUCCESS\n"); + fflush(stdout); ptl->num_send_tokens++; - ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, gm_send_req, bytes ); + frag->send_complete = 1; + /* + while (1 == (frag->wait_for_ack)) + { + + mca_ptl_gm_incoming_recv (&mca_ptl_gm_component); + //This is recursive + + } + */ + if (frag->wait_for_ack == 0 && (gm_send_req != NULL)) + { + ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, + gm_send_req, bytes ); + + } list = (ompi_list_t *)(&(ptl->gm_send_frags_queue)); ompi_list_remove_first(list); break; @@ -151,12 +274,53 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, { mca_ptl_gm_send_frag_t * frag; mca_pml_base_send_request_t *req; - + mca_pml_base_recv_request_t *request; + int header_length, bytes; + + if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) + { frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval; req = (mca_pml_base_send_request_t *) frag->req; req->req_peer_match = header->hdr_ack.hdr_dst_match; + req->req_peer_addr = header->hdr_ack.hdr_dst_addr; + req->req_peer_size = header->hdr_ack.hdr_dst_size; + + frag->wait_for_ack = 0; + + /* check if send has completed */ + + header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; + bytes = frag->send_frag.frag_base.frag_size - header_length; + + if(frag->send_complete == 1) + { + ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, + req, bytes ); + } + + } + + #if 1 + if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) + { + + request = (mca_pml_base_recv_request_t*) + header->hdr_frag.hdr_dst_ptr.pval; + /* call receive progress and indicate the recv has been completed */ + + printf("Calling recv_progress\n"); + fflush(stdout); + ptl->super.ptl_recv_progress ( + (mca_ptl_base_module_t *) ptl, + request, + header->hdr_frag.hdr_frag_length, + header->hdr_frag.hdr_frag_length); + + } + #endif + + /* XXX: will handle NACK later */ - /* return the send fragment to the free list */ } mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl, @@ -216,6 +380,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev case MCA_PTL_HDR_TYPE_ACK: case MCA_PTL_HDR_TYPE_NACK: + case MCA_PTL_HDR_TYPE_FIN: ptl_gm_ctrl_frag(ptl,header); break; default: diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.h b/src/mca/ptl/gm/src/ptl_gm_priv.h index 226190d5de..ad556ee33e 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.h +++ b/src/mca/ptl/gm/src/ptl_gm_priv.h @@ -36,8 +36,21 @@ mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, size_t *size, int flags); +int +mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer, + mca_ptl_gm_send_frag_t *fragment, + struct mca_pml_base_send_request_t *sendreq, + size_t offset, + size_t *size, + int flags, + void *target_buffer, + int bytes); + void send_callback(struct gm_port *port,void * context, gm_status_t status); +void put_callback(struct gm_port *port,void * context, gm_status_t +status); + diff --git a/src/mca/ptl/gm/src/ptl_gm_req.h b/src/mca/ptl/gm/src/ptl_gm_req.h index 476936debc..8c82ff1af8 100644 --- a/src/mca/ptl/gm/src/ptl_gm_req.h +++ b/src/mca/ptl/gm/src/ptl_gm_req.h @@ -22,6 +22,7 @@ struct mca_ptl_gm_send_request_t { mca_pml_base_send_request_t super; /* add stuff here */ mca_ptl_gm_send_frag_t *req_frag; + int need_ack; }; typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t; diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c index 0d79a42641..31cda65f99 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c @@ -17,10 +17,10 @@ #include "ptl_gm_priv.h" -#define frag_header super.super.frag_header +/*#define frag_header super.super.frag_header #define frag_owner super.super.frag_owner #define frag_peer super.super.frag_peer -#define frag_convertor super.super.frag_convertor +#define frag_convertor super.super.frag_convertor */ static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); @@ -83,6 +83,141 @@ mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl, } +int mca_ptl_gm_send_frag_done( + mca_ptl_gm_send_frag_t * frag, + mca_pml_base_send_request_t * req) +{ + + return OMPI_SUCCESS; + +} + + + +int mca_ptl_gm_send_ack_init( + struct mca_ptl_gm_send_frag_t* ack, + mca_ptl_gm_module_t *ptl, + mca_ptl_gm_peer_t* ptl_peer, + struct mca_ptl_gm_recv_frag_t* frag, + char * buffer, + int size) +{ + int header_length; + mca_ptl_base_header_t * hdr; + mca_pml_base_recv_request_t *request; + hdr = (mca_ptl_base_header_t *)ack->send_buf; + request = frag->frag_recv.frag_request; + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + + hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; + hdr->hdr_ack.hdr_dst_match.lval = 0; + hdr->hdr_ack.hdr_dst_match.pval = request; + hdr->hdr_ack.hdr_dst_addr.lval = 0; + hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;/*request->req_base.req_addr;*/ + /*posted registered buffer */ + hdr->hdr_ack.hdr_dst_size = size; + /*size of registered buffer */ + + ack->send_frag.frag_request = 0; + + ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer; + ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)ptl; + ack->send_frag.frag_base.frag_addr = NULL; + ack->send_frag.frag_base.frag_size = 0; + ack->status = 1; /* was able to register memory */ + ack->ptl = ptl; + ack->send_frag.frag_base.frag_header = *hdr; + ack->wait_for_ack = 0; + header_length = sizeof(mca_ptl_base_ack_header_t); + + /* need to add registered buffer information */ + + return OMPI_SUCCESS; + +} + + +/* +int mca_ptl_gm_send_fini_init( + mca_ptl_gm_send_frag_t* fini, + mca_ptl_gm_module_t *ptl, + mca_ptl_gm_peer_t* ptl_peer, + mca_pml_base_send_request_t* request) +{ + +#if 1 + int header_length; + mca_ptl_base_header_t * hdr; + hdr = (mca_ptl_base_header_t *)fini->send_buf; + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + hdr->hdr_ack.hdr_dst_match.lval = 0; + hdr->hdr_ack.hdr_dst_addr.lval = 0; + + fini->send_frag.frag_request = 0; + fini->send_frag.frag_base.frag_peer = ptl_peer; + fini->send_frag.frag_base.frag_owner = ptl; + fini->send_frag.frag_base.frag_addr = NULL; + fini->send_frag.frag_base.frag_size = 0; + fini->ptl = ptl; + + fini->wait_for_ack = 0; + header_length = sizeof(mca_ptl_base_ack_header_t); +#endif + + return OMPI_SUCCESS; + +} +*/ + +int mca_ptl_gm_put_frag_init( + mca_ptl_gm_send_frag_t* putfrag, + mca_ptl_gm_peer_t * ptl_peer, + mca_ptl_gm_module_t * gm_ptl, + mca_pml_base_send_request_t * request, + size_t offset, + size_t* size, + int flags) +{ + mca_ptl_base_header_t *hdr; + void * buffer; + int header_length; + + #if 1 + hdr = (mca_ptl_base_header_t *)putfrag->send_buf; + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + hdr->hdr_ack.hdr_dst_match.lval = 0; + /*hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match;*/ + hdr->hdr_ack.hdr_dst_addr.lval = 0; + hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_base.req_addr); + hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; + + + putfrag->send_frag.frag_request = request; /* XXX: check this */ + putfrag->send_frag.frag_base.frag_peer = ptl_peer; + putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl; + putfrag->send_frag.frag_base.frag_addr = NULL; + putfrag->send_frag.frag_base.frag_size = 0; + putfrag->ptl = gm_ptl; + + #endif + + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + putfrag->send_frag.frag_base.frag_size = *size; + putfrag->ptl = gm_ptl; + putfrag->wait_for_ack = 0; + putfrag->put_sent = 0; + return OMPI_SUCCESS; +} + + + + int mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag, mca_ptl_gm_peer_t * ptl_peer, diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h index a2be71bc9f..434fe6a968 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h @@ -31,11 +31,16 @@ OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t); struct mca_ptl_gm_send_frag_t { mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */ void * send_buf; + void * registered_buf; mca_pml_base_send_request_t *req; mca_ptl_gm_module_t *ptl; - /*mca_ptl_gm_peer_t *peer;*/ + mca_ptl_gm_peer_t *peer; + int status; int type; + int wait_for_ack; + int put_sent; + int send_complete; }; typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t; @@ -65,6 +70,32 @@ mca_ptl_gm_alloc_send_frag ( struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t *sendreq); + +int mca_ptl_gm_send_ack_init( + struct mca_ptl_gm_send_frag_t* ack, + mca_ptl_gm_module_t *ptl, + mca_ptl_gm_peer_t* ptl_peer, + struct mca_ptl_gm_recv_frag_t* frag, + char * buffer, + int size); + +/* int mca_ptl_gm_send_fini_init( + mca_ptl_gm_send_frag_t* fini, + mca_ptl_gm_module_t *ptl, + mca_ptl_gm_peer_t* ptl_peer, + mca_pml_base_send_request_t * sendreq); + */ + +int + mca_ptl_gm_put_frag_init( mca_ptl_gm_send_frag_t* sendfrag, + mca_ptl_gm_peer_t * ptl_peer, + mca_ptl_gm_module_t *ptl, + mca_pml_base_send_request_t * sendreq, + size_t offset, + size_t* size, + int flags); + + int mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag, mca_ptl_gm_peer_t * ptl_peer, @@ -74,6 +105,10 @@ int int flags); +int mca_ptl_gm_send_frag_done( + mca_ptl_gm_send_frag_t * frag, + mca_pml_base_send_request_t * req); + mca_ptl_gm_recv_frag_t * mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl);