From f64e52a28d4b6848281e6bc38f5e4d6399313adc Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Fri, 13 May 2005 04:04:08 +0000 Subject: [PATCH] * more refactoring to reduce duplicate code This commit was SVN r5708. --- src/mca/ptl/portals/src/ptl_portals_recv.c | 157 +++------------------ src/mca/ptl/portals/src/ptl_portals_recv.h | 116 ++++++++++++++- src/mca/ptl/portals/src/ptl_portals_send.c | 3 +- 3 files changed, 135 insertions(+), 141 deletions(-) diff --git a/src/mca/ptl/portals/src/ptl_portals_recv.c b/src/mca/ptl/portals/src/ptl_portals_recv.c index cabf890832..6dbc4087d2 100644 --- a/src/mca/ptl/portals/src/ptl_portals_recv.c +++ b/src/mca/ptl/portals/src/ptl_portals_recv.c @@ -80,8 +80,8 @@ ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl, void *data_ptr) return OMPI_ERROR; } - ompi_output_verbose(100, mca_ptl_portals_component.portals_output, - "new receive buffer posted"); + OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output, + "new receive buffer posted")); return OMPI_SUCCESS; } @@ -94,160 +94,51 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, int ret; if (ev->type == PTL_EVENT_PUT_START) { - ompi_output_verbose(100, mca_ptl_portals_component.portals_output, - "PUT_START event received (%ld)", ev->link); + OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output, + "PUT_START event received (%ld)", ev->link)); } else if (ev->type == PTL_EVENT_PUT_END) { - ompi_list_item_t *item; - mca_ptl_portals_recv_frag_t *recvfrag; mca_ptl_base_header_t *hdr; - ompi_output_verbose(100, mca_ptl_portals_component.portals_output, - "message %ld received, start: %p, mlength: %lld, offset: %lld", - ev->link, ev->md.start, ev->mlength, ev->offset); + OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output, + "message %ld received, start: %p, mlength: %lld," + " offset: %lld", + ev->link, ev->md.start, ev->mlength, ev->offset)); /* buffer is going to be header followed by data */ hdr = (mca_ptl_base_header_t*) (((char*) ev->md.start) + ev->offset); switch (hdr->hdr_common.hdr_type) { case MCA_PTL_HDR_TYPE_MATCH: - /* get a fragment header */ - OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret); - recvfrag = (mca_ptl_portals_recv_frag_t*) item; - if (OMPI_SUCCESS != ret) { - ompi_output(mca_ptl_portals_component.portals_output, - "unable to allocate resources"); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - } - - /* save the sender */ - recvfrag->frag_source = ev->initiator; - - recvfrag->frag_data = ((mca_ptl_base_match_header_t*) hdr) + 1; - recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_match_header_t); - memcpy(&(recvfrag->frag_recv.frag_base.frag_header), - hdr, sizeof(mca_ptl_base_match_header_t)); - recvfrag->frag_recv.frag_base.frag_owner = - (struct mca_ptl_base_module_t*) ptl; - recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ - recvfrag->frag_recv.frag_base.frag_size = 0; - recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; - recvfrag->frag_recv.frag_is_buffered = true; - recvfrag->frag_recv.frag_request = NULL; - - ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, - &hdr->hdr_match); + ret = mca_ptl_portals_process_first_frag(ptl, hdr, ev, + sizeof(mca_ptl_base_match_header_t)); + if (OMPI_SUCCESS != ret) return ret; break; case MCA_PTL_HDR_TYPE_RNDV: - /* get a fragment header */ - OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret); - recvfrag = (mca_ptl_portals_recv_frag_t*) item; - if (OMPI_SUCCESS != ret) { - ompi_output(mca_ptl_portals_component.portals_output, - "unable to allocate resources"); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - } - - /* save the sender */ - recvfrag->frag_source = ev->initiator; - - recvfrag->frag_data = ((mca_ptl_base_rendezvous_header_t*) hdr) + 1; - recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_rendezvous_header_t); - memcpy(&(recvfrag->frag_recv.frag_base.frag_header), - hdr, sizeof(mca_ptl_base_rendezvous_header_t)); - recvfrag->frag_recv.frag_base.frag_owner = - (struct mca_ptl_base_module_t*) ptl; - recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ - recvfrag->frag_recv.frag_base.frag_size = 0; - recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; - recvfrag->frag_recv.frag_is_buffered = true; - recvfrag->frag_recv.frag_request = NULL; - - ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, - &hdr->hdr_match); - + ret = mca_ptl_portals_process_first_frag(ptl, hdr, ev, + sizeof(mca_ptl_base_rendezvous_header_t)); + if (OMPI_SUCCESS != ret) return ret; break; case MCA_PTL_HDR_TYPE_FRAG: - { - unsigned int bytes_delivered; - mca_ptl_base_recv_request_t* request; - - /* get a fragment header */ - OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret); - recvfrag = (mca_ptl_portals_recv_frag_t*) item; - if (OMPI_SUCCESS != ret) { - ompi_output(mca_ptl_portals_component.portals_output, - "unable to allocate resources"); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - } - - /* save the sender */ - recvfrag->frag_source = ev->initiator; - - recvfrag->frag_data = ((mca_ptl_base_frag_header_t*) hdr) + 1; - recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_frag_header_t); - memcpy(&(recvfrag->frag_recv.frag_base.frag_header), - hdr, sizeof(mca_ptl_base_frag_header_t)); - recvfrag->frag_recv.frag_base.frag_owner = - (struct mca_ptl_base_module_t*) ptl; - recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ - recvfrag->frag_recv.frag_base.frag_size = 0; - recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; - recvfrag->frag_recv.frag_is_buffered = true; - recvfrag->frag_recv.frag_request = hdr->hdr_frag.hdr_dst_ptr.pval; - bytes_delivered = recvfrag->frag_size; - request = recvfrag->frag_recv.frag_request; - - if(recvfrag->frag_size > 0) { - struct iovec iov; - unsigned int iov_count = 1; - int free_after = 0; - ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_recv.req_base.req_comm, - request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE); - ompi_convertor_t* convertor = &(recvfrag->frag_recv.frag_base.frag_convertor); - - /* initialize receive convertor */ - ompi_convertor_copy(proc->proc_convertor, convertor); - ompi_convertor_init_for_recv( - convertor, /* convertor */ - 0, /* flags */ - request->req_recv.req_base.req_datatype, /* datatype */ - request->req_recv.req_base.req_count, /* count elements */ - request->req_recv.req_base.req_addr, /* users buffer */ - hdr->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */ - NULL ); /* not allocating memory */ - /*ompi_convertor_get_packed_size(convertor, &request->req_bytes_packed); */ - - iov.iov_base = recvfrag->frag_data; - iov.iov_len = recvfrag->frag_size; - ompi_convertor_unpack(convertor, &iov, &iov_count, &bytes_delivered, &free_after ); - } - - /* update request status */ - ptl->super.ptl_recv_progress(&ptl->super, - request, - recvfrag->frag_size, - bytes_delivered); - - } + ret = mca_ptl_portals_process_frag_frag(ptl, hdr, ev); + if (OMPI_SUCCESS != ret) return ret; break; case MCA_PTL_HDR_TYPE_ACK: { mca_ptl_portals_send_frag_t *sendfrag; mca_ptl_base_send_request_t *sendreq; - sendfrag = hdr->hdr_ack.hdr_src_ptr.pval; - sendreq = sendfrag->frag_send.frag_request; - - sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match; - - OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output, + OMPI_OUTPUT_VERBOSE((100, + mca_ptl_portals_component.portals_output, "received ack for request %p", hdr->hdr_ack.hdr_dst_match)); + + sendfrag = hdr->hdr_ack.hdr_src_ptr.pval; + sendreq = sendfrag->frag_send.frag_request; + sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match; mca_ptl_portals_complete_send_event(sendfrag); } - break; default: @@ -257,11 +148,7 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, break; } - /* see if we need to repost an md */ if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) { - ompi_output_verbose(100, mca_ptl_portals_component.portals_output, - "must repost event: %lld, %lld, %lld", - ev->offset, ev->mlength, ev->md.max_size); /* use the same memory as the old md - it's not using it anymore */ ret = ptl_portals_post_recv_md(ptl, ev->md.start); if (OMPI_SUCCESS != ret) { diff --git a/src/mca/ptl/portals/src/ptl_portals_recv.h b/src/mca/ptl/portals/src/ptl_portals_recv.h index 125ababbe5..63e2dee401 100644 --- a/src/mca/ptl/portals/src/ptl_portals_recv.h +++ b/src/mca/ptl/portals/src/ptl_portals_recv.h @@ -36,10 +36,118 @@ typedef struct mca_ptl_portals_recv_frag_t mca_ptl_portals_recv_frag_t; OBJ_CLASS_DECLARATION(mca_ptl_portals_recv_frag_t); -extern int ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl, - void *data_ptr); -extern int mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, - ptl_event_t *ev); +int ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl, + void *data_ptr); +int mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, + ptl_event_t *ev); +static inline mca_ptl_portals_recv_frag_t * +mca_ptl_portals_recv_get_frag(struct mca_ptl_portals_module_t *ptl, + mca_ptl_base_header_t *hdr, + ptl_event_t *ev, + size_t header_size) +{ + mca_ptl_portals_recv_frag_t * recvfrag; + ompi_list_item_t *item; + int ret; + + /* get a fragment header */ + OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, + item, ret); + recvfrag = (mca_ptl_portals_recv_frag_t*) item; + if (OMPI_SUCCESS != ret) { + ompi_output(mca_ptl_portals_component.portals_output, + "unable to allocate resources"); + return NULL; + } + + /* save the sender */ + recvfrag->frag_source = ev->initiator; + + recvfrag->frag_data = ((char*) hdr) + header_size; + recvfrag->frag_size = ev->mlength - header_size; + memcpy(&(recvfrag->frag_recv.frag_base.frag_header), + hdr, header_size); + recvfrag->frag_recv.frag_base.frag_owner = &(ptl->super); + recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ + recvfrag->frag_recv.frag_base.frag_size = 0; + recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; + recvfrag->frag_recv.frag_is_buffered = true; + + return recvfrag; +} + + +static inline int +mca_ptl_portals_process_first_frag(struct mca_ptl_portals_module_t *ptl, + mca_ptl_base_header_t *hdr, + ptl_event_t *ev, + size_t header_size) +{ + mca_ptl_portals_recv_frag_t *recvfrag; + + recvfrag = mca_ptl_portals_recv_get_frag(ptl, hdr, ev, header_size); + if (NULL == recvfrag) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + recvfrag->frag_recv.frag_request = NULL; + ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, + &hdr->hdr_match); + + return OMPI_SUCCESS; +} + +static inline int +mca_ptl_portals_process_frag_frag(struct mca_ptl_portals_module_t *ptl, + mca_ptl_base_header_t *hdr, + ptl_event_t *ev) +{ + unsigned int bytes_delivered; + mca_ptl_base_recv_request_t* request; + mca_ptl_portals_recv_frag_t *recvfrag; + + /* get a frag and fill it in */ + recvfrag = mca_ptl_portals_recv_get_frag(ptl, hdr, ev, + sizeof(mca_ptl_base_frag_header_t)); + if (NULL == recvfrag) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + + recvfrag->frag_recv.frag_request = hdr->hdr_frag.hdr_dst_ptr.pval; + bytes_delivered = recvfrag->frag_size; + request = recvfrag->frag_recv.frag_request; + + if (recvfrag->frag_size > 0) { + struct iovec iov; + unsigned int iov_count = 1; + int free_after = 0; + ompi_proc_t *proc = ompi_comm_peer_lookup( + request->req_recv.req_base.req_comm, + request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE); + ompi_convertor_t* convertor = + &(recvfrag->frag_recv.frag_base.frag_convertor); + + /* initialize receive convertor */ + ompi_convertor_copy(proc->proc_convertor, convertor); + ompi_convertor_init_for_recv( + convertor, /* convertor */ + 0, /* flags */ + request->req_recv.req_base.req_datatype, /* datatype */ + request->req_recv.req_base.req_count, /* count elements */ + request->req_recv.req_base.req_addr, /* users buffer */ + hdr->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */ + NULL ); /* not allocating memory */ + + iov.iov_base = recvfrag->frag_data; + iov.iov_len = recvfrag->frag_size; + ompi_convertor_unpack(convertor, &iov, &iov_count, + &bytes_delivered, &free_after ); + } + + /* update request status */ + ptl->super.ptl_recv_progress(&ptl->super, + request, + recvfrag->frag_size, + bytes_delivered); + + return OMPI_SUCCESS; +} #endif diff --git a/src/mca/ptl/portals/src/ptl_portals_send.c b/src/mca/ptl/portals/src/ptl_portals_send.c index 5f4227811e..7efe2650a3 100644 --- a/src/mca/ptl/portals/src/ptl_portals_send.c +++ b/src/mca/ptl/portals/src/ptl_portals_send.c @@ -96,7 +96,7 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base, sendreq->req_send.req_addr, offset, mca_ptl_portals_alloc ); - + /* if data is contigous convertor will return an offset * into users buffer - otherwise will return an allocated buffer * that holds the packed data @@ -167,7 +167,6 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base, sendfrag->frag_send.frag_base.frag_owner = ptl_base; sendfrag->frag_send.frag_request = sendreq; sendfrag->frag_send.frag_base.frag_peer = ptl_peer; - /* must update the offset after actual fragment size is determined * before attempting to send the fragment