From fb6cb846b945ae3864241b3f9826e389e91c8d0a Mon Sep 17 00:00:00 2001 From: Gopal Santhanaraman Date: Tue, 7 Sep 2004 21:29:18 +0000 Subject: [PATCH] This commit was SVN r2527. --- src/mca/ptl/gm/src/ptl_gm.c | 72 ++++----- src/mca/ptl/gm/src/ptl_gm.h | 3 +- src/mca/ptl/gm/src/ptl_gm_component.c | 15 +- src/mca/ptl/gm/src/ptl_gm_priv.c | 225 +++++++++++++++++++------- src/mca/ptl/gm/src/ptl_gm_priv.h | 3 + src/mca/ptl/gm/src/ptl_gm_sendfrag.c | 101 +++--------- src/mca/ptl/gm/src/ptl_gm_sendfrag.h | 2 + 7 files changed, 240 insertions(+), 181 deletions(-) diff --git a/src/mca/ptl/gm/src/ptl_gm.c b/src/mca/ptl/gm/src/ptl_gm.c index 2aea559c12..b122d83b32 100644 --- a/src/mca/ptl/gm/src/ptl_gm.c +++ b/src/mca/ptl/gm/src/ptl_gm.c @@ -21,6 +21,8 @@ #include "ptl_gm_peer.h" #include "ptl_gm_priv.h" +#define DEBUG 0 + mca_ptl_gm_module_t mca_ptl_gm_module = { { &mca_ptl_gm_component.super, @@ -132,17 +134,11 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl, peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer; - /*printf ("Global_id\t local_id\t port_number\t process name \n");*/ - /*fflush (stdout);*/ - /*printf ("%u %d %d %d\n", ptl_proc->peer_arr[0]->global_id,*/ - /*ptl_proc->peer_arr[0]->local_id,*/ - /*ptl_proc->peer_arr[0]->port_number, - * ptl_proc->proc_guid);*/ - /*fflush (stdout);*/ - } + #if DEBUG printf ("returning with success from gm_add_procs\n"); + #endif return OMPI_SUCCESS; } @@ -190,11 +186,9 @@ mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl, else { req = (mca_ptl_gm_send_request_t *)request; - /*((mca_ptl_gm_send_request_t *)request)->req_frag = frag;*/ req->req_frag = frag; frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/ frag->ptl = (mca_ptl_gm_module_t*)ptl; - /*frag->peer = request->req_peer;*/ } return OMPI_SUCCESS; @@ -254,12 +248,12 @@ mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl, gm_ptl->num_send_tokens--; /*Update offset */ - sendreq->req_offset += size; /* XXX: should be what convertor packs */ + sendreq->req_offset += rc; /* XXX: should be what convertor packs */ /*append to the send_fragments_queue. */ - ompi_list_append (&(gm_ptl->gm_send_frags_queue), - (ompi_list_item_t *) sendfrag); - return rc; + /*ompi_list_append (&(gm_ptl->gm_send_frags_queue),*/ + /*(ompi_list_item_t *) sendfrag);*/ + return OMPI_SUCCESS; } /* @@ -273,8 +267,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl, size_t offset, size_t size, int flags) { int rc; - mca_ptl_gm_send_frag_t *sendfrag, *putfrag; - mca_ptl_gm_peer_t *gm_ptl_peer; + mca_ptl_gm_send_frag_t *putfrag; mca_ptl_gm_module_t * gm_ptl; void* destination_buffer; char * buffer_ptr; @@ -301,10 +294,16 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl, putfrag->registered_buf = (void *)buffer_ptr; putfrag->peer = (mca_ptl_gm_peer_t *)ptl_peer; + ((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =putfrag; + ((struct mca_ptl_gm_send_request_t *)sendreq)->need_ack = flags; + + + rc = mca_ptl_gm_put_frag_init(putfrag , (mca_ptl_gm_peer_t*)ptl_peer,gm_ptl, sendreq, offset, &size, flags); + /* check that we have a send token available */ rc = mca_ptl_gm_peer_put((mca_ptl_gm_peer_t *)ptl_peer, putfrag, sendreq, offset, &size, flags, @@ -314,17 +313,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl, gm_ptl->num_send_tokens--; - /*do a send to notify completion */ - /*sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);*/ - - /*rc = mca_ptl_gm_send_fini_init (sendfrag,gm_ptl, */ - /*(mca_ptl_gm_peer_t*)ptl_peer, sendreq);*/ - - /*gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;*/ - /*rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,*/ - /*offset,&size,flags);*/ - - /*gm_ptl->num_send_tokens--;*/ sendreq->req_offset += size; return OMPI_SUCCESS; @@ -376,8 +364,9 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, request = frag->frag_request; gm_ptl = (mca_ptl_gm_module_t *)ptl; + + if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { -#if 1 /* might need to send an ack back */ recv_frag = (mca_ptl_gm_recv_frag_t *) frag; ack = mca_ptl_gm_alloc_send_frag(ptl,NULL); @@ -401,12 +390,16 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, bytes_reg = total_bytes - bytes_recv; buffer_ptr += bytes_recv; status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg); + recv_frag->registered_buf = buffer_ptr; + printf("Receiver: register addr: %p, bytes: %d\n",buffer_ptr,bytes_reg); + fflush(stdout); + if(GM_SUCCESS != status) { ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__); } - /* send the registered memory information, send recv request * ptr */ + /* send the regiscered memory information, send recv request * ptr */ rc1 = mca_ptl_gm_send_ack_init (ack, gm_ptl, (mca_ptl_gm_peer_t *)(recv_frag->frag_recv.frag_base.frag_peer) , recv_frag, buffer_ptr, bytes_reg); @@ -419,11 +412,10 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, ack,srequest,0,&size,0 ); gm_ptl->num_send_tokens--; - ompi_list_append (&(gm_ptl->gm_send_frags_queue), - (ompi_list_item_t *) ack); + /*ompi_list_append (&(gm_ptl->gm_send_frags_queue),*/ + /*(ompi_list_item_t *) ack);*/ } -#endif } /* Here we expect that frag_addr is the begin of the buffer header included */ @@ -431,9 +423,9 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size; iov[0].iov_len = bytes_recv; - /*process fragment if complete */ + if (header->hdr_frag.hdr_frag_length > 0) { - ompi_proc_t *proc; + /* ompi_proc_t *proc; proc = ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer); @@ -448,15 +440,19 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl, request->req_base.req_addr, header->hdr_frag.hdr_frag_offset); rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1); - assert( rc == 1 ); -} + assert( rc == 1 );*/ + printf ("in matched: bytes received is %d\n", bytes_recv); + fflush(stdout); + memcpy(request->req_base.req_addr,iov[0].iov_base,bytes_recv); - /*update progress*/ /* XXX : check this */ + } + + /*update progress*/ ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len ); /* Now update the status of the fragment */ ((mca_ptl_gm_recv_frag_t*)frag)->matched = true; - if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true ) { + if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true ){ free( frag->frag_base.frag_addr ); ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer = false; } diff --git a/src/mca/ptl/gm/src/ptl_gm.h b/src/mca/ptl/gm/src/ptl_gm.h index 54a7f98f7b..2e14d632a2 100644 --- a/src/mca/ptl/gm/src/ptl_gm.h +++ b/src/mca/ptl/gm/src/ptl_gm.h @@ -65,13 +65,12 @@ struct mca_ptl_gm_module_t { unsigned int num_recv_tokens; unsigned int max_send_tokens; unsigned int max_recv_tokens; - /*struct mca_ptl_gm_addr_t *proc_id_table;*/ ompi_free_list_t gm_send_frags; ompi_free_list_t gm_recv_frags_free; ompi_list_t gm_send_frags_queue; ompi_list_t gm_pending_acks; - + ompi_list_t gm_recv_outstanding_queue; #if MCA_PTL_GM_STATISTICS size_t ptl_bytes_sent; size_t ptl_bytes_recv; diff --git a/src/mca/ptl/gm/src/ptl_gm_component.c b/src/mca/ptl/gm/src/ptl_gm_component.c index 68419db381..fd75ce6049 100644 --- a/src/mca/ptl/gm/src/ptl_gm_component.c +++ b/src/mca/ptl/gm/src/ptl_gm_component.c @@ -111,14 +111,14 @@ mca_ptl_gm_component_open (void) /* register GM component parameters */ mca_ptl_gm_module.super.ptl_first_frag_size = - mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024); + mca_ptl_gm_param_register_int ("first_frag_size", ((16 * 1024) - 64)); mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_param_register_int ("min_frag_size", 1<<16); mca_ptl_gm_module.super.ptl_max_frag_size = mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024); mca_ptl_gm_component.gm_free_list_num = - mca_ptl_gm_param_register_int ("free_list_num", 32); + mca_ptl_gm_param_register_int ("free_list_num", 256); mca_ptl_gm_component.gm_free_list_inc = mca_ptl_gm_param_register_int ("free_list_inc", 32); @@ -296,7 +296,7 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm) ompi_free_list_init (&(ptl->gm_send_frags), sizeof (mca_ptl_gm_send_frag_t), OBJ_CLASS (mca_ptl_gm_send_frag_t), - 32, 32, 1, NULL); /* not using mpool */ + ptl->num_send_tokens,ptl->num_send_tokens, 1, NULL); /* not using mpool */ /* allocate the elements */ sfragment = (mca_ptl_gm_send_frag_t *) @@ -329,11 +329,18 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm) } + OBJ_CONSTRUCT (&(ptl->gm_recv_outstanding_queue), ompi_list_t); /* construct the list of recv fragments free */ OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t); free_rlist = &(ptl->gm_recv_frags_free); - + + ompi_free_list_init (&(ptl->gm_recv_frags_free), + sizeof (mca_ptl_gm_recv_frag_t), + OBJ_CLASS (mca_ptl_gm_recv_frag_t), + ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL); /* not using mpool */ + + /*allocate the elements */ free_rfragment = (mca_ptl_gm_recv_frag_t *) malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS); diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c index ec4da878cc..25665e8368 100755 --- a/src/mca/ptl/gm/src/ptl_gm_priv.c +++ b/src/mca/ptl/gm/src/ptl_gm_priv.c @@ -23,6 +23,8 @@ #include "ptl_gm_sendfrag.h" #include "ptl_gm_priv.h" +#define DEBUG 0 + int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer, mca_ptl_gm_send_frag_t *fragment, struct mca_pml_base_send_request_t *sendreq, @@ -68,15 +70,11 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, outvec[0].iov_base = (char*)fragment->send_buf; - if( (size_in + header_length) < GM_SEND_BUF_SIZE ) + if( (size_in + header_length) <= GM_SEND_BUF_SIZE ) outvec[0].iov_len = size_in; else outvec[0].iov_len = GM_SEND_BUF_SIZE - header_length; - /*header_length = sizeof(mca_ptl_base_frag_header_t);*/ - - /* copy the header in the buffer */ - /**header = fragment->send_frag.frag_base.frag_header;*/ if(size_in > 0) { ompi_convertor_t *convertor; @@ -129,9 +127,10 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super; fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer; fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base; - fragment->send_frag.frag_base.frag_size = size_out; /*XXX: should this be size_out */ + fragment->send_frag.frag_base.frag_size = size_out; - return OMPI_SUCCESS; + return (size_out - header_length); + /*return OMPI_SUCCESS;*/ } @@ -139,50 +138,61 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status) { mca_ptl_gm_module_t *ptl; mca_ptl_gm_send_frag_t *putfrag; - /*ompi_list_t *list;*/ - int bytes, header_length; - mca_pml_base_send_request_t *gm_send_req; - mca_ptl_base_frag_header_t* header; - int offset = 0; - int size = 0; + int bytes; + mca_pml_base_send_request_t *send_req; + size_t offset = 0; + size_t size = 0; int flags = 0; int rc; putfrag = (mca_ptl_gm_send_frag_t *)context; ptl = (mca_ptl_gm_module_t *)putfrag->ptl; - gm_send_req = putfrag->req; + send_req = putfrag->req; - header = (mca_ptl_base_frag_header_t*)putfrag->registered_buf; - header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; - /* XXX : what should the header length be */ - bytes = putfrag->send_frag.frag_base.frag_size - header_length; + bytes = putfrag->send_frag.frag_base.frag_size; + #if DEBUG printf("ENTERING PUT CALLBACK\n"); fflush(stdout); - + #endif switch (status) { case GM_SUCCESS: /* local put completed, mark put as complete */ + + #if DEBUG printf("PUTCALLBACK WITH CASE GM_SUCCESS\n"); fflush(stdout); + #endif ptl->num_send_tokens++; putfrag->put_sent = 1; /* send the header information through send/receive channel */ - rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,gm_send_req, + rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,send_req, offset,&size,flags); - + + #if DEBUG + printf("after issuing the put completion the request offset = %d\n",send_req->req_offset); + fflush(stdout); + #endif + /* deregister the user memory */ status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes); if(GM_SUCCESS != status) + { + #if DEBUG ompi_output(0," unpinning memory failed\n"); + #endif + } else - ompi_output(0," unpinning memory success\n"); - + { + #if DEBUG + ompi_output(0," unpinning %d bytes of memory success\n",bytes); + #endif + } break; case GM_SEND_TIMED_OUT: @@ -209,49 +219,57 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status) ompi_list_t *list; int bytes, header_length; mca_pml_base_send_request_t *gm_send_req; - mca_ptl_base_frag_header_t* header; + mca_ptl_base_header_t* header; frag = (mca_ptl_gm_send_frag_t *)context; - /*ptl = (mca_ptl_gm_module_t *)frag->ptl;*/ ptl = (mca_ptl_gm_module_t *)frag->send_frag.frag_base.frag_owner; gm_send_req = frag->req; - header = (mca_ptl_base_frag_header_t*)frag->send_buf; + header = (mca_ptl_base_header_t*)frag->send_buf; header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; - bytes = frag->send_frag.frag_base.frag_size - 64;/*header_length;*/ + + if (frag->type == 1) + { + bytes = header->hdr_ack.hdr_dst_size; + } + else + bytes = frag->send_frag.frag_base.frag_size - header_length; + if (NULL != gm_send_req) { if(1 == (( mca_ptl_gm_send_request_t *)gm_send_req)->need_ack ) frag->wait_for_ack = 1; - } - printf("ENTERING SEND CALLBACK\n"); - fflush(stdout); + } + switch (status) { case GM_SUCCESS: + /* send completed, can reuse the user buffer */ - printf("SENDCALLBACK WITH CASE GM_SUCCESS\n"); - fflush(stdout); + #if DEBUG + printf("SENDCALLBACK WITH CASE GM_SUCCESS\n"); + fflush(stdout); + #endif + ptl->num_send_tokens++; frag->send_complete = 1; - /* - while (1 == (frag->wait_for_ack)) - { - - mca_ptl_gm_incoming_recv (&mca_ptl_gm_component); - //This is recursive - - } - */ + if (frag->wait_for_ack == 0 && (gm_send_req != NULL)) { - ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, + #if DEBUG + printf("inside send callback : calling send progress bytes = %d\n",bytes); + fflush(stdout); + #endif + ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, gm_send_req, bytes ); } - list = (ompi_list_t *)(&(ptl->gm_send_frags_queue)); - ompi_list_remove_first(list); + + OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), (ompi_list_item_t *)frag); + /*list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));*/ + /*ompi_list_remove_first(list);*/ + break; case GM_SEND_TIMED_OUT: @@ -269,6 +287,7 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status) } } + void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_ptl_base_header_t * header) { @@ -276,48 +295,79 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, mca_pml_base_send_request_t *req; mca_pml_base_recv_request_t *request; int header_length, bytes; - + char * reg_buf; + int status; + if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) { frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval; req = (mca_pml_base_send_request_t *) frag->req; - req->req_peer_match = header->hdr_ack.hdr_dst_match; - req->req_peer_addr = header->hdr_ack.hdr_dst_addr; + req->req_peer_match.pval = header->hdr_ack.hdr_dst_match.pval; + req->req_peer_addr.pval = header->hdr_ack.hdr_dst_addr.pval; req->req_peer_size = header->hdr_ack.hdr_dst_size; frag->wait_for_ack = 0; - /* check if send has completed */ - - header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size; - bytes = frag->send_frag.frag_base.frag_size - header_length; + /* check if send has completed */ + header_length = + frag->send_frag.frag_base.frag_header.hdr_frag.hdr_common.hdr_size; + bytes = frag->send_frag.frag_base.frag_size - 64; /*header_length;*/ if(frag->send_complete == 1) { + ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, req, bytes ); } } - #if 1 if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FIN) { request = (mca_pml_base_recv_request_t*) - header->hdr_frag.hdr_dst_ptr.pval; + header->hdr_ack.hdr_dst_match.pval; /* call receive progress and indicate the recv has been completed */ - - printf("Calling recv_progress\n"); + + #if DEBUG + printf("Calling recv_progress with bytes = %d\n",header->hdr_ack.hdr_dst_size); fflush(stdout); + #endif ptl->super.ptl_recv_progress ( (mca_ptl_base_module_t *) ptl, - request, - header->hdr_frag.hdr_frag_length, - header->hdr_frag.hdr_frag_length); + request , + header->hdr_ack.hdr_dst_size, + header->hdr_ack.hdr_dst_size); + /* deregister the memory */ + bytes = header->hdr_ack.hdr_dst_size; + reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval; + status = gm_deregister_memory(ptl->my_port, reg_buf, + bytes); + + if(GM_SUCCESS != status) + { + #if DEBUG + ompi_output(0," unpinning memory failed\n"); + #endif + } + else + { + #if DEBUG + ompi_output(0,"unpinning memory success,addr:%p,bytes:%d\n",reg_buf,bytes); + #endif + } + + #if 0 + /*return the recv fragment to the free list */ + OMPI_FREE_LIST_RETURN(&(((mca_ptl_gm_module_t + *)ptl)->gm_recv_frags_free), (ompi_list_item_t *)recv_frag); + + /* free the associated buffer */ + if(recv_frag->have_allocated == true) + free(recv_frag->frag_recv.frag_base.frag_add GM_SEND_BUF_SIZE); + #endif } - #endif /* XXX: will handle NACK later */ @@ -351,6 +401,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl, recv_frag->matched = false; recv_frag->have_allocated_buffer = false; + recv_frag->ptl = ptl; matched = ptl->super.ptl_match( &(ptl->super), &(recv_frag->frag_recv), @@ -358,8 +409,10 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl, if( matched ) { return NULL; } + #if DEBUG ompi_output( 0,"matching receive not yet posted get tag %d comm %d source %d\n", header->hdr_match.hdr_tag, header->hdr_match.hdr_contextid, header->hdr_match.hdr_src ); + #endif return recv_frag; } @@ -391,6 +444,47 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev return frag; } +void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl) +{ + + mca_ptl_gm_recv_frag_t * frag = NULL; + int i, size; + bool matched; + + size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue); + + + if (size > 0) + { + frag = (mca_ptl_gm_recv_frag_t *) + ompi_list_remove_first( (ompi_list_t *)&(ptl->gm_recv_outstanding_queue) ); + + + printf(" the frag size to be matched is %d\n",frag->frag_recv.frag_base.frag_size); + fflush(stdout); + matched = ptl->super.ptl_match( &(ptl->super), + &(frag->frag_recv), + &(frag->frag_recv.frag_base.frag_header.hdr_match) ); + + printf("the value of matched is %d\n", matched); + fflush(stdout); + + if(!matched) + { + ompi_list_append((ompi_list_t *)&(ptl->gm_recv_outstanding_queue), + (ompi_list_item_t *) frag); + } + else + { + /* if allocated buffer, free the buffer */ + + /* return the recv descriptor to the free list */ + OMPI_FREE_LIST_RETURN(&(ptl->gm_recv_frags_free), (ompi_list_item_t *)frag); + } + + } + +} int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp) { @@ -420,6 +514,15 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp) frag->frag_recv.frag_base.frag_addr = buffer; /* mark the fragment as having pending buffers */ frag->have_allocated_buffer = true; + + #if 0 + /* append to the receive queue */ + ompi_list_append (&(ptl->gm_recv_outstanding_queue), + (ompi_list_item_t *) frag); + + printf ("frag appended to recv_oustanding queue \n"); + #endif + } gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer), GM_SIZE, GM_LOW_PRIORITY ); @@ -431,6 +534,10 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp) gm_unknown(ptl->my_port, event); } + /* process the outstanding frags in the queue */ + /*mca_ptl_gm_outstanding_recv(ptl); */ + + } return 0; } diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.h b/src/mca/ptl/gm/src/ptl_gm_priv.h index ad556ee33e..66773ff9db 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.h +++ b/src/mca/ptl/gm/src/ptl_gm_priv.h @@ -28,6 +28,9 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp); + +void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl); + int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, mca_ptl_gm_send_frag_t *fragment, diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c index 31cda65f99..d613a5dc58 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c @@ -16,11 +16,7 @@ #include "ptl_gm_sendfrag.h" #include "ptl_gm_priv.h" - -/*#define frag_header super.super.frag_header -#define frag_owner super.super.frag_owner -#define frag_peer super.super.frag_peer -#define frag_convertor super.super.frag_convertor */ +#define DEBUG 0 static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); @@ -77,7 +73,7 @@ mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl, frag = (mca_ptl_gm_send_frag_t *)item; frag->req = (struct mca_pml_base_send_request_t *)sendreq; - frag->type = 0 ;/* XXX: should be EAGER_SEND; */ + frag->type = 0 ; return frag; } @@ -102,10 +98,11 @@ int mca_ptl_gm_send_ack_init( char * buffer, int size) { - int header_length; mca_ptl_base_header_t * hdr; mca_pml_base_recv_request_t *request; hdr = (mca_ptl_base_header_t *)ack->send_buf; + printf("ack buf is %p\n",ack->send_buf); + fflush(stdout); request = frag->frag_recv.frag_request; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; hdr->hdr_common.hdr_flags = 0; @@ -113,15 +110,13 @@ int mca_ptl_gm_send_ack_init( hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; hdr->hdr_ack.hdr_dst_match.lval = 0; - hdr->hdr_ack.hdr_dst_match.pval = request; - hdr->hdr_ack.hdr_dst_addr.lval = 0; - hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer;/*request->req_base.req_addr;*/ - /*posted registered buffer */ + hdr->hdr_ack.hdr_dst_match.pval = request; /*should this be dst_match */ + hdr->hdr_ack.hdr_dst_addr.lval = 0; /*we are filling both p and val of +dest addrees */ + hdr->hdr_ack.hdr_dst_addr.pval = (void *)buffer; hdr->hdr_ack.hdr_dst_size = size; - /*size of registered buffer */ ack->send_frag.frag_request = 0; - ack->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer; ack->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)ptl; ack->send_frag.frag_base.frag_addr = NULL; @@ -130,49 +125,13 @@ int mca_ptl_gm_send_ack_init( ack->ptl = ptl; ack->send_frag.frag_base.frag_header = *hdr; ack->wait_for_ack = 0; - header_length = sizeof(mca_ptl_base_ack_header_t); - /* need to add registered buffer information */ return OMPI_SUCCESS; } -/* -int mca_ptl_gm_send_fini_init( - mca_ptl_gm_send_frag_t* fini, - mca_ptl_gm_module_t *ptl, - mca_ptl_gm_peer_t* ptl_peer, - mca_pml_base_send_request_t* request) -{ - -#if 1 - int header_length; - mca_ptl_base_header_t * hdr; - hdr = (mca_ptl_base_header_t *)fini->send_buf; - hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; - hdr->hdr_common.hdr_flags = 0; - hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); - hdr->hdr_ack.hdr_dst_match.lval = 0; - hdr->hdr_ack.hdr_dst_addr.lval = 0; - - fini->send_frag.frag_request = 0; - fini->send_frag.frag_base.frag_peer = ptl_peer; - fini->send_frag.frag_base.frag_owner = ptl; - fini->send_frag.frag_base.frag_addr = NULL; - fini->send_frag.frag_base.frag_size = 0; - fini->ptl = ptl; - - fini->wait_for_ack = 0; - header_length = sizeof(mca_ptl_base_ack_header_t); -#endif - - return OMPI_SUCCESS; - -} -*/ - int mca_ptl_gm_put_frag_init( mca_ptl_gm_send_frag_t* putfrag, mca_ptl_gm_peer_t * ptl_peer, @@ -183,36 +142,31 @@ int mca_ptl_gm_put_frag_init( int flags) { mca_ptl_base_header_t *hdr; - void * buffer; - int header_length; + hdr = (mca_ptl_base_header_t *)putfrag->send_buf; - #if 1 - hdr = (mca_ptl_base_header_t *)putfrag->send_buf; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + hdr->hdr_ack.hdr_dst_match.lval = 0; - /*hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match;*/ + hdr->hdr_ack.hdr_dst_match.pval = request->req_peer_match.pval; hdr->hdr_ack.hdr_dst_addr.lval = 0; - hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_base.req_addr); - hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; + hdr->hdr_ack.hdr_dst_addr.pval = (void *)(request->req_peer_addr.pval); + hdr->hdr_ack.hdr_dst_size = *size; - - putfrag->send_frag.frag_request = request; /* XXX: check this */ - putfrag->send_frag.frag_base.frag_peer = ptl_peer; + putfrag->send_frag.frag_request = request; + putfrag->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t *)ptl_peer; putfrag->send_frag.frag_base.frag_owner = (mca_ptl_base_module_t *)gm_ptl; putfrag->send_frag.frag_base.frag_addr = NULL; putfrag->send_frag.frag_base.frag_size = 0; putfrag->ptl = gm_ptl; - #endif - - hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); - putfrag->send_frag.frag_base.frag_size = *size; - putfrag->ptl = gm_ptl; - putfrag->wait_for_ack = 0; - putfrag->put_sent = 0; - return OMPI_SUCCESS; + putfrag->wait_for_ack = 0; + putfrag->put_sent = 0; + putfrag->type = 1; + putfrag->req = request; /* gm_send_request */ + + return OMPI_SUCCESS; } @@ -237,11 +191,12 @@ int mca_ptl_gm_send_frag_init( hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_dst_ptr.lval = 0; hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */ - hdr->hdr_frag.hdr_dst_ptr.lval = 0; + hdr->hdr_frag.hdr_frag_length = *size; hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; @@ -262,16 +217,6 @@ int mca_ptl_gm_send_frag_init( header_length = sizeof (mca_ptl_base_frag_header_t); } - /*initialize convertor */ - -#if 0 - /*fragment state*/ - sendfrag->frag_base.frag_owner = &ptl_peer->peer_ptl->super; - sendfrag->frag_base.frag_peer = ptl_peer; - sendfrag->frag_base.frag_addr = NULL; - sendfrag->frag_base.frag_size = *size; -#endif - return OMPI_SUCCESS; } diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h index 434fe6a968..ecb5ecdeca 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h @@ -57,6 +57,8 @@ struct mca_ptl_gm_recv_frag_t { bool frag_ack_pending; void *alloc_recv_buffer; void *unex_recv_buffer; + void * registered_buf; + mca_ptl_gm_module_t *ptl; bool matched; bool have_allocated_buffer; };