diff --git a/src/mca/ptl/portals/configure.stub b/src/mca/ptl/portals/configure.stub index 460bc2f153..801ea9f203 100644 --- a/src/mca/ptl/portals/configure.stub +++ b/src/mca/ptl/portals/configure.stub @@ -92,6 +92,7 @@ AC_DEFUN([MCA_CONFIGURE_STUB],[ PTL_PORTALS_UTCP=0 PTL_PORTALS_REDSTORM=0 PTL_PORTALS_COMPAT="" + PTL_PORTALS_HAVE_EVENT_UNLINK=0 AC_ARG_WITH([ptl-portals-config], AC_HELP_STRING([--with-ptl-portals-config], @@ -105,11 +106,13 @@ AC_DEFUN([MCA_CONFIGURE_STUB],[ "utcp") PTL_PORTALS_UTCP=1 PORTALS_LIBS="-lutcpapi -lutcplib -lp3api -lp3lib -lp3rt" - AC_MSG_RESULT([utcp]) + PTL_PORTALS_HAVE_EVENT_UNLINK=1 + AC_MSG_RESULT([utcp]) ;; "redstorm") PTL_PORTALS_REDSTORM=1 PORTALS_LIBS="-lp3api -lp3lib -lp3rt" + PTL_PORTALS_HAVE_EVENT_UNLINK=0 AC_MSG_RESULT([red storm]) ;; *) @@ -136,6 +139,10 @@ AC_DEFUN([MCA_CONFIGURE_STUB],[ [AC_MSG_RESULT([no]) AC_MSG_ERROR([Can not link with Portals libraries])]) + AC_DEFINE_UNQUOTED([PTL_PORTALS_HAVE_EVENT_UNLINK], + [$PTL_PORTALS_HAVE_EVENT_UNLINK], + [Does Portals send a PTL_EVENT_UNLINK event]) + AC_DEFINE_UNQUOTED([PTL_PORTALS_UTCP], [$PTL_PORTALS_UTCP], [Use the UTCP reference implementation or Portals]) AM_CONDITIONAL([PTL_PORTALS_UTCP], [test "$PTL_PORTALS_UTCP" = "1"]) diff --git a/src/mca/ptl/portals/src/ptl_portals_component.c b/src/mca/ptl/portals/src/ptl_portals_component.c index 7e4c73dd94..eb407bd472 100644 --- a/src/mca/ptl/portals/src/ptl_portals_component.c +++ b/src/mca/ptl/portals/src/ptl_portals_component.c @@ -334,8 +334,11 @@ mca_ptl_portals_component_progress(mca_ptl_tstamp_t tstamp) } /* only one place we can have an event */ - assert(which == 0); + assert(which == 0); +#if PTL_PORTALS_HAVE_EVENT_UNLINK + if (PTL_EVENT_UNLINK == ev.type) continue; +#endif if (ev.md.user_ptr == NULL) { /* no request associated with it - it's a receive */ mca_ptl_portals_process_recv_event(module, &ev); diff --git a/src/mca/ptl/portals/src/ptl_portals_recv.c b/src/mca/ptl/portals/src/ptl_portals_recv.c index 06ab25791d..6ef804b114 100644 --- a/src/mca/ptl/portals/src/ptl_portals_recv.c +++ b/src/mca/ptl/portals/src/ptl_portals_recv.c @@ -169,33 +169,68 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, break; case MCA_PTL_HDR_TYPE_FRAG: - /* get a fragment header */ - OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret); - recvfrag = (mca_ptl_portals_recv_frag_t*) item; - if (OMPI_SUCCESS != ret) { - ompi_output(mca_ptl_portals_component.portals_output, - "unable to allocate resources"); - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + { + unsigned int bytes_delivered; + mca_ptl_base_recv_request_t* request; + + /* get a fragment header */ + OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret); + recvfrag = (mca_ptl_portals_recv_frag_t*) item; + if (OMPI_SUCCESS != ret) { + ompi_output(mca_ptl_portals_component.portals_output, + "unable to allocate resources"); + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + /* save the sender */ + recvfrag->frag_source = ev->initiator; + + recvfrag->frag_data = ((mca_ptl_base_frag_header_t*) hdr) + 1; + recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_frag_header_t); + memcpy(&(recvfrag->frag_recv.frag_base.frag_header), + hdr, sizeof(mca_ptl_base_frag_header_t)); + recvfrag->frag_recv.frag_base.frag_owner = + (struct mca_ptl_base_module_t*) ptl; + recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ + recvfrag->frag_recv.frag_base.frag_size = 0; + recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; + recvfrag->frag_recv.frag_is_buffered = true; + recvfrag->frag_recv.frag_request = hdr->hdr_frag.hdr_dst_ptr.pval; + bytes_delivered = recvfrag->frag_size; + request = recvfrag->frag_recv.frag_request; + + if(recvfrag->frag_size > 0) { + struct iovec iov; + unsigned int iov_count = 1; + int free_after = 0; + ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_recv.req_base.req_comm, + request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE); + ompi_convertor_t* convertor = &(recvfrag->frag_recv.frag_base.frag_convertor); + + /* initialize receive convertor */ + ompi_convertor_copy(proc->proc_convertor, convertor); + ompi_convertor_init_for_recv( + convertor, /* convertor */ + 0, /* flags */ + request->req_recv.req_base.req_datatype, /* datatype */ + request->req_recv.req_base.req_count, /* count elements */ + request->req_recv.req_base.req_addr, /* users buffer */ + hdr->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */ + NULL ); /* not allocating memory */ + /*ompi_convertor_get_packed_size(convertor, &request->req_bytes_packed); */ + + iov.iov_base = recvfrag->frag_data; + iov.iov_len = recvfrag->frag_size; + ompi_convertor_unpack(convertor, &iov, &iov_count, &bytes_delivered, &free_after ); + } + + /* update request status */ + ptl->super.ptl_recv_progress(&ptl->super, + request, + recvfrag->frag_size, + bytes_delivered); + } - - /* save the sender */ - recvfrag->frag_source = ev->initiator; - - recvfrag->frag_data = ((mca_ptl_base_frag_header_t*) hdr) + 1; - recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_frag_header_t); - memcpy(&(recvfrag->frag_recv.frag_base.frag_header), - hdr, sizeof(mca_ptl_base_frag_header_t)); - recvfrag->frag_recv.frag_base.frag_owner = - (struct mca_ptl_base_module_t*) ptl; - recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */ - recvfrag->frag_recv.frag_base.frag_size = 0; - recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data; - recvfrag->frag_recv.frag_is_buffered = true; - recvfrag->frag_recv.frag_request = NULL; - - ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, - &hdr->hdr_match); - break; case MCA_PTL_HDR_TYPE_ACK: @@ -207,6 +242,10 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl, sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match; + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "received ack for request %p", + hdr->hdr_ack.hdr_dst_match); + sendfrag->frag_send.frag_base.frag_owner-> ptl_send_progress(sendfrag->frag_send.frag_base.frag_owner, sendfrag->frag_send.frag_request, diff --git a/src/mca/ptl/portals/src/ptl_portals_send.c b/src/mca/ptl/portals/src/ptl_portals_send.c index 17f7bbdc62..25ea9e2033 100644 --- a/src/mca/ptl/portals/src/ptl_portals_send.c +++ b/src/mca/ptl/portals/src/ptl_portals_send.c @@ -155,10 +155,12 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base, hdr->hdr_common.hdr_flags = flags; hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_length = sendfrag->frag_send.frag_base.frag_size; - hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "sending frag for request %p", + hdr->hdr_frag.hdr_dst_ptr); + sendfrag->frag_send.frag_base.frag_size = size; } diff --git a/src/mca/ptl/portals/src/ptl_portals_send.h b/src/mca/ptl/portals/src/ptl_portals_send.h index b252f0c773..19d062ede7 100644 --- a/src/mca/ptl/portals/src/ptl_portals_send.h +++ b/src/mca/ptl/portals/src/ptl_portals_send.h @@ -129,6 +129,10 @@ mca_ptl_portals_send_ack(struct mca_ptl_portals_module_t *ptl, sendfrag->frag_send.frag_base.frag_size = 0; sendfrag->frag_vector[0].iov_len = sizeof(mca_ptl_base_ack_header_t); + + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "sending ack for request %p", request); + return mca_ptl_portals_send_frag(ptl, sendfrag); }