From 847c08fda5abd80bf1af959732d8f78f651bde7a Mon Sep 17 00:00:00 2001
From: Tim Woodall
Date: Thu, 28 Oct 2004 15:40:46 +0000
Subject: [PATCH] - for non-threaded builds - set progress to be blocking for
 non-mpi apps - reorg MX

This commit was SVN r3383.
---
 src/mca/ptl/mx/ptl_mx.c             | 261 +++++++++++++++++++++++++++-
 src/mca/ptl/mx/ptl_mx.h             |  24 +++
 src/mca/ptl/mx/ptl_mx_component.c   |   9 +-
 src/mca/ptl/mx/ptl_mx_module.c      |  26 ++-
 src/mca/ptl/mx/ptl_mx_module.h      | 159 ++++++++++-------
 src/mca/ptl/mx/ptl_mx_proc.c        |  20 +++
 src/mca/ptl/mx/ptl_mx_recvfrag.c    |   3 +
 src/mca/ptl/mx/ptl_mx_recvfrag.h    |  63 -------
 src/mca/ptl/mx/ptl_mx_sendfrag.c    | 122 +------------
 src/mca/ptl/mx/ptl_mx_sendfrag.h    |  53 ------
 src/mpi/runtime/ompi_mpi_finalize.c |   4 +
 src/mpi/runtime/ompi_mpi_init.c     |   4 +
 src/runtime/ompi_progress.c         |  10 +-
 src/runtime/ompi_progress.h         |   4 +
 14 files changed, 447 insertions(+), 315 deletions(-)

diff --git a/src/mca/ptl/mx/ptl_mx.c b/src/mca/ptl/mx/ptl_mx.c
index f05643f336..8250052a9a 100644
--- a/src/mca/ptl/mx/ptl_mx.c
+++ b/src/mca/ptl/mx/ptl_mx.c
@@ -32,8 +32,8 @@ mca_ptl_mx_module_t mca_ptl_mx_module = {
     mca_ptl_mx_add_procs,
     mca_ptl_mx_del_procs,
     mca_ptl_mx_finalize,
-    mca_ptl_mx_send, /* put */
-    mca_ptl_mx_send, /* put */
+    mca_ptl_mx_send,
+    mca_ptl_mx_send_continue,
     NULL, /* get */
     mca_ptl_mx_matched, /* matched */
     mca_ptl_mx_request_init,
@@ -45,6 +45,16 @@ mca_ptl_mx_module_t mca_ptl_mx_module = {
 };
 
+
+/**
+ * Allocate memory for use by the convertor.
+ */
+
+static void *mca_ptl_mx_alloc(size_t *size)
+{
+    return malloc(*size);
+}
+
 /**
  * PML->PTL Initialize a send request for use by the PTL.
  *
@@ -125,8 +135,10 @@ int mca_ptl_mx_send(
     size_t size,
     int flags)
 {
-    mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl;
+    mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl;
     mca_ptl_mx_send_frag_t* sendfrag;
+    mca_ptl_base_header_t* hdr;
+    mx_return_t mx_return;
     int rc;
 
     if (offset == 0 && sendreq->req_cached) {
@@ -137,18 +149,234 @@ int mca_ptl_mx_send(
         if(NULL == (sendfrag = (mca_ptl_mx_send_frag_t*)item))
             return rc;
     }
-    rc = mca_ptl_mx_send_frag_init(sendfrag, ptl_peer, sendreq, offset, &size, flags);
-    if(rc != OMPI_SUCCESS)
-        return rc;
+
+    /* setup message header */
+    hdr = &sendfrag->frag_send.frag_base.frag_header;
+    hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
+    hdr->hdr_common.hdr_flags = flags;
+    hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
+    hdr->hdr_frag.hdr_frag_offset = offset;
+    hdr->hdr_frag.hdr_frag_seq = 0;
+    hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
+    hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
+    hdr->hdr_frag.hdr_dst_ptr.lval = 0;
+    hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
+    hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
+    hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
+    hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
+    hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
+    hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
+
+    /* initialize convertor */
+    sendfrag->frag_free = 0;
+    if(size > 0) {
+        ompi_convertor_t *convertor;
+        struct iovec iov;
+        unsigned int iov_count;
+        unsigned int max_data;
+        int rc;
+
+        convertor = &sendfrag->frag_send.frag_base.frag_convertor;
+        ompi_convertor_copy(&sendreq->req_convertor, convertor);
+        ompi_convertor_init_for_send(
+            convertor,
+            0,
+            sendreq->req_datatype,
+            sendreq->req_count,
+            sendreq->req_addr,
+            offset,
+            mca_ptl_mx_alloc );
+
+        /* if data is contiguous convertor
will return an offset + * into users buffer - otherwise will return an allocated buffer + * that holds the packed data + */ + iov.iov_base = NULL; + iov.iov_len = size; + iov_count = 1; + max_data = size; + if((rc = ompi_convertor_pack( + convertor, + &iov, + &iov_count, + &max_data, + &(sendfrag->frag_free))) < 0) { + return OMPI_ERROR; + } + /* adjust the freeAfter as the position zero is reserved for the header */ + sendfrag->frag_free <<= 1; + sendfrag->frag_segments[1].segment_ptr = iov.iov_base; + sendfrag->frag_segments[1].segment_length = iov.iov_len; + sendfrag->frag_segment_count = 2; + sendfrag->frag_send.frag_base.frag_addr = iov.iov_base; + } else { + sendfrag->frag_send.frag_base.frag_addr = NULL; + sendfrag->frag_send.frag_base.frag_size = 0; + sendfrag->frag_segment_count = 1; + } + hdr->hdr_frag.hdr_frag_length = size; + + /* convert header to network byte order if required */ + if(ptl_peer->peer_byte_swap) { + hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO; + MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match); + } + + /* fragment state */ + sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super; + sendfrag->frag_send.frag_request = sendreq; + sendfrag->frag_send.frag_base.frag_size = size; + sendfrag->frag_send.frag_base.frag_peer = ptl_peer; /* must update the offset after actual fragment size is determined * before attempting to send the fragment */ sendreq->req_offset += size; - return mca_ptl_mx_send_frag_start(sendfrag, ptl_mx); + + /* start the fragment */ + mx_return = mx_isend( + mx_ptl->mx_endpoint, + sendfrag->frag_segments, + sendfrag->frag_segment_count, + ptl_peer->peer_addr, + 1, + sendfrag, + &sendfrag->frag_request); + if(mx_return != MX_SUCCESS) { + ompi_output(0, "mca_ptl_mx_send: mx_isend() failed with return value=%d\n", mx_return); + return OMPI_ERROR; + } + return OMPI_SUCCESS; } +/** + * PML->PTL Initiate a send to the peer. + * + * @param ptl (IN) PTL instance + * @param ptl_base_peer (IN) PTL peer addressing + * @param request (IN) Send request + * @param offset Current offset into packed/contiguous buffer. + * @param size (IN) Number of bytes PML is requesting PTL to deliver, + * @param flags (IN) Flags that should be passed to the peer via the message header. + * @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments + * + * Continue sending fragments of a large message to the peer. 
+ */ + +int mca_ptl_mx_send_continue( + struct mca_ptl_base_module_t* ptl, + struct mca_ptl_base_peer_t* ptl_peer, + struct mca_pml_base_send_request_t* sendreq, + size_t offset, + size_t size, + int flags) +{ + mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl; + mca_ptl_mx_send_frag_t* sendfrag; + mca_ptl_base_header_t* hdr; + mx_return_t mx_return; + int rc; + + /* allocate fragment */ + MCA_PTL_MX_SEND_FRAG_ALLOC(sendfrag, rc); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* setup message header */ + hdr = &sendfrag->frag_send.frag_base.frag_header; + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr.lval = 0; + + /* initialize convertor */ + sendfrag->frag_free = 0; + if(size > 0) { + ompi_convertor_t *convertor; + struct iovec iov; + unsigned int iov_count; + unsigned int max_data; + int rc; + + convertor = &sendfrag->frag_send.frag_base.frag_convertor; + ompi_convertor_copy(&sendreq->req_convertor, convertor); + ompi_convertor_init_for_send( + convertor, + 0, + sendreq->req_datatype, + sendreq->req_count, + sendreq->req_addr, + offset, + mca_ptl_mx_alloc ); + + /* if data is contigous convertor will return an offset + * into users buffer - otherwise will return an allocated buffer + * that holds the packed data + */ + iov.iov_base = NULL; + iov.iov_len = size; + iov_count = 1; + max_data = size; + if((rc = ompi_convertor_pack( + convertor, + &iov, + &iov_count, + &max_data, + &(sendfrag->frag_free))) < 0) { + return OMPI_ERROR; + } + /* adjust the freeAfter as the position zero is reserved for the header */ + sendfrag->frag_free <<= 1; + sendfrag->frag_segments[1].segment_ptr = iov.iov_base; + sendfrag->frag_segments[1].segment_length = iov.iov_len; + sendfrag->frag_segment_count = 2; + sendfrag->frag_send.frag_base.frag_addr = iov.iov_base; + } else { + sendfrag->frag_send.frag_base.frag_addr = NULL; + sendfrag->frag_send.frag_base.frag_size = 0; + sendfrag->frag_segment_count = 1; + } + hdr->hdr_frag.hdr_frag_length = size; + + /* fragment state */ + sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super; + sendfrag->frag_send.frag_request = sendreq; + sendfrag->frag_send.frag_base.frag_size = size; + sendfrag->frag_send.frag_base.frag_peer = ptl_peer; + + /* convert header to network byte order if required */ + if(ptl_peer->peer_byte_swap) { + hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO; + MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag); + } + + /* must update the offset after actual fragment size is determined + * before attempting to send the fragment + */ + sendreq->req_offset += size; + + /* start the fragment */ + mx_return = mx_isend( + mx_ptl->mx_endpoint, + sendfrag->frag_segments, + sendfrag->frag_segment_count, + sendfrag->frag_send.frag_base.frag_peer->peer_addr, + 1, + sendfrag, + &sendfrag->frag_request); + if(mx_return != MX_SUCCESS) { + ompi_output(0, "mca_ptl_mx_send: mx_isend() failed with return value=%d\n", mx_return); + return OMPI_ERROR; + } + return OMPI_SUCCESS; +} + /** * PML->PTL Notification from the PML to the PTL that a receive * has been posted and matched against the indicated fragment. 
@@ -192,11 +420,28 @@ void mca_ptl_mx_matched( ompi_list_append(&mca_ptl_mx_component.mx_pending_acks, (ompi_list_item_t*)frag); OMPI_THREAD_UNLOCK(&mca_ptl_mx_component.mx_lock); } else { + mx_return_t mx_return; mca_ptl_mx_send_frag_init_ack(ack, mx_ptl, mx_frag); if(ack->frag_send.frag_base.frag_peer->peer_byte_swap) { MCA_PTL_BASE_ACK_HDR_HTON(ack->frag_send.frag_base.frag_header.hdr_ack); } - mca_ptl_mx_send_frag_start(ack, mx_ptl); + + /* start the fragment */ + mx_return = mx_isend( + mx_ptl->mx_endpoint, + ack->frag_segments, + ack->frag_segment_count, + ack->frag_send.frag_base.frag_peer->peer_addr, + 1, + ack, + &ack->frag_request); + if(mx_return != MX_SUCCESS) { + ompi_output(0, "mca_ptl_mx_matched: mx_isend() failed with return value=%d\n", mx_return); + OMPI_THREAD_LOCK(&mca_ptl_mx_component.mx_lock); + ack_pending = true; + ompi_list_append(&mca_ptl_mx_component.mx_pending_acks, (ompi_list_item_t*)frag); + OMPI_THREAD_UNLOCK(&mca_ptl_mx_component.mx_lock); + } } } diff --git a/src/mca/ptl/mx/ptl_mx.h b/src/mca/ptl/mx/ptl_mx.h index aab187ddfd..0b18db1141 100644 --- a/src/mca/ptl/mx/ptl_mx.h +++ b/src/mca/ptl/mx/ptl_mx.h @@ -25,6 +25,7 @@ struct mca_ptl_mx_component_t { int mx_free_list_max; /**< maximum size of free lists */ int mx_free_list_inc; /**< number of elements to growing free lists by */ int mx_prepost; /**< number of preposted recvs */ + int mx_debug; /**< debug level */ uint32_t mx_filter; /**< filter assigned to application */ uint32_t mx_num_ptls; /**< number of MX NICs available to app */ struct mca_ptl_mx_module_t** mx_ptls; /**< array of available PTL moduless */ @@ -316,5 +317,28 @@ extern int mca_ptl_mx_send( ); +/** + * PML->PTL Continue sending fragments of a large message. + * + * @param ptl (IN) PTL instance + * @param ptl_base_peer (IN) PTL peer addressing + * @param request (IN) Send request + * @param offset Current offset into packed/contiguous buffer. + * @param size (IN) Number of bytes PML is requesting PTL to deliver, + * @param flags (IN) Flags that should be passed to the peer via the message header. 
+ * @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments + * + */ + +extern int mca_ptl_mx_send_continue( + struct mca_ptl_base_module_t* ptl, + struct mca_ptl_base_peer_t* ptl_peer, + struct mca_pml_base_send_request_t*, + size_t offset, + size_t size, + int flags +); + + #endif diff --git a/src/mca/ptl/mx/ptl_mx_component.c b/src/mca/ptl/mx/ptl_mx_component.c index 705654639f..3cca87fd69 100644 --- a/src/mca/ptl/mx/ptl_mx_component.c +++ b/src/mca/ptl/mx/ptl_mx_component.c @@ -91,9 +91,11 @@ int mca_ptl_mx_component_open(void) /* register MX module parameters */ mca_ptl_mx_component.mx_filter = - (uint32_t)mca_ptl_mx_param_register_int("filter", 0xdeadbeef); + (uint32_t)mca_ptl_mx_param_register_int("filter", 0x12345); mca_ptl_mx_component.mx_prepost = - mca_ptl_mx_param_register_int("prepost", 16); + mca_ptl_mx_param_register_int("prepost", 1); + mca_ptl_mx_component.mx_debug = + mca_ptl_mx_param_register_int("debug", 0); mca_ptl_mx_component.mx_free_list_num = mca_ptl_mx_param_register_int("free_list_num", 256); mca_ptl_mx_component.mx_free_list_max = @@ -117,6 +119,7 @@ int mca_ptl_mx_component_open(void) int mca_ptl_mx_component_close(void) { + mx_finalize(); if (mca_ptl_mx_component.mx_send_frags.fl_num_allocated != mca_ptl_mx_component.mx_send_frags.super.ompi_list_length) { ompi_output(0, "mx send frags: %d allocated %d returned\n", @@ -232,7 +235,7 @@ int mca_ptl_mx_component_progress(mca_ptl_tstamp_t tstamp) return OMPI_ERROR; } if(mx_result > 0) { - mca_ptl_mx_progress(ptl, mx_request); + MCA_PTL_MX_PROGRESS(ptl, mx_request); } } return OMPI_SUCCESS; diff --git a/src/mca/ptl/mx/ptl_mx_module.c b/src/mca/ptl/mx/ptl_mx_module.c index d32215b0e0..636d919756 100644 --- a/src/mca/ptl/mx/ptl_mx_module.c +++ b/src/mca/ptl/mx/ptl_mx_module.c @@ -112,14 +112,16 @@ static void* mca_ptl_mx_thread(ompi_object_t *arg) &mx_result); if(mx_return == MX_TIMEOUT) continue; - if(mx_return != MX_SUCCESS) { + else if(ptl->mx_thread_run == false) + break; + else if(mx_return != MX_SUCCESS) { ompi_output(0, "mca_ptl_mx_thread: mx_probe() failed with status %d\n", mx_return); break; } - + /* process the pending request */ - mca_ptl_mx_progress(ptl, mx_request); + MCA_PTL_MX_PROGRESS(ptl, mx_request); } return NULL; } @@ -146,7 +148,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) /* open local endpoint */ status = mx_open_endpoint( addr, - MX_ANY_ENDPOINT, + 1, mca_ptl_mx_component.mx_filter, NULL, 0, @@ -165,7 +167,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) mca_ptl_mx_finalize(&ptl->super); return NULL; } - + /* breakup the endpoint address */ if((status = mx_decompose_endpoint_addr( ptl->mx_endpoint_addr, @@ -177,9 +179,19 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) return NULL; } + if(mca_ptl_mx_component.mx_debug) { + ompi_output(0, "mca_ptl_mx_create: opened %08X:%08X:%08X:%08X\n", + (uint32_t)(ptl->mx_nic_addr >> 32), + (uint32_t)ptl->mx_nic_addr, + ptl->mx_endpoint_id, + ptl->mx_filter); + } + /* pre-post receive buffers */ for(i=0; isuper); return NULL; } @@ -208,10 +220,12 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr) int mca_ptl_mx_finalize(struct mca_ptl_base_module_t* ptl) { mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl; + mx_wakeup(ptl_mx->mx_endpoint); #if OMPI_HAVE_THREADS ptl_mx->mx_thread_run = false; ompi_thread_join(&ptl_mx->mx_thread, NULL); #endif + mx_close_endpoint(ptl_mx->mx_endpoint); free(ptl_mx); return OMPI_SUCCESS; } diff --git 
a/src/mca/ptl/mx/ptl_mx_module.h b/src/mca/ptl/mx/ptl_mx_module.h index 09374d5c3a..daf1dc2de0 100644 --- a/src/mca/ptl/mx/ptl_mx_module.h +++ b/src/mca/ptl/mx/ptl_mx_module.h @@ -18,74 +18,109 @@ * Prepost recv buffers */ -static inline int mca_ptl_mx_post(mca_ptl_mx_module_t* ptl) -{ - mca_ptl_mx_recv_frag_t* frag; - mx_return_t status; - int rc; - /* post an additional recv */ - MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc); - if(rc != OMPI_SUCCESS) { - ompi_output(0, "mca_ptl_mx_thread: unable to allocate recv frag\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - mca_ptl_mx_recv_frag_init(frag, ptl); - status = mx_irecv( - ptl->mx_endpoint, - frag->frag_segments, - frag->frag_segment_count, - 1, - MX_MATCH_MASK_NONE, - frag, - &frag->frag_request); - if(status != MX_SUCCESS) { - ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%d\n", status); - return OMPI_ERROR; - } - return OMPI_SUCCESS; -} +#define MCA_PTL_MX_POST(ptl, rc) \ +do { \ + mca_ptl_mx_recv_frag_t* frag; \ + mx_return_t mx_return; \ + /* post an additional recv */ \ + MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc); \ + if(rc != OMPI_SUCCESS) { \ + ompi_output(0, "mca_ptl_mx_post: unable to allocate recv fragn"); \ + rc = OMPI_ERR_OUT_OF_RESOURCE; \ + break; \ + } \ + frag->frag_recv.frag_base.frag_owner = &ptl->super; \ + frag->frag_recv.frag_base.frag_peer = NULL; \ + frag->frag_segment_count = 2; \ + frag->frag_segments[1].segment_ptr = frag->frag_data; \ + frag->frag_segments[1].segment_length = sizeof(frag->frag_data); \ + \ + mx_return = mx_irecv( \ + ptl->mx_endpoint, \ + frag->frag_segments, \ + frag->frag_segment_count, \ + 1, \ + MX_MATCH_MASK_NONE, \ + frag, \ + &frag->frag_request); \ + if(mx_return != MX_SUCCESS) { \ + ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%dn", \ + mx_return); \ + rc = OMPI_ERROR; \ + } \ + rc = OMPI_SUCCESS; \ +} while(0) /** * Routine to process complete request(s). 
*/ -static inline void mca_ptl_mx_progress(mca_ptl_mx_module_t* ptl, mx_request_t mx_request) -{ - mx_return_t mx_return; - mx_status_t mx_status; - uint32_t mx_result; - mca_ptl_base_frag_t* frag; - - mx_return = mx_test( - ptl->mx_endpoint, - &mx_request, - &mx_status, - &mx_result); - if(mx_return != MX_SUCCESS) { - ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%d\n", mx_return); - return; - } - - frag = (mca_ptl_base_frag_t*)mx_status.context; - switch(frag->frag_type) { - case MCA_PTL_FRAGMENT_SEND: - { - mca_ptl_mx_send_frag_handler((mca_ptl_mx_send_frag_t*)frag, ptl); - break; - } - case MCA_PTL_FRAGMENT_RECV: - { - mca_ptl_mx_recv_frag_handler((mca_ptl_mx_recv_frag_t*)frag, ptl); - mca_ptl_mx_post(ptl); - break; - } - default: - { - ompi_output(0, "mca_ptl_mx_progress: invalid request type: %d\n", frag->frag_type); - break; - } - } +static inline void MCA_PTL_MX_PROGRESS(mca_ptl_mx_module_t* ptl, mx_request_t mx_request) +{ + mx_return_t mx_return; + mx_status_t mx_status; + uint32_t mx_result; + mca_ptl_base_frag_t* frag; + + mx_return = mx_test( + ptl->mx_endpoint, + &mx_request, + &mx_status, + &mx_result); + if(mx_return != MX_SUCCESS) { + ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%dn", + mx_return); + return; + } + + frag = (mca_ptl_base_frag_t*)mx_status.context; + switch(frag->frag_type) { + case MCA_PTL_FRAGMENT_SEND: + { + mca_ptl_mx_send_frag_t* sendfrag = (mca_ptl_mx_send_frag_t*)frag; + mca_pml_base_send_request_t* sendreq = + sendfrag->frag_send.frag_request; + bool req_cached = sendreq->req_cached; + ptl->super.ptl_send_progress( + &ptl->super, + sendreq, + sendfrag->frag_send.frag_base.frag_size); + if(req_cached == false) + MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag); + break; + } + case MCA_PTL_FRAGMENT_RECV: + { + mca_ptl_mx_recv_frag_t* recvfrag = (mca_ptl_mx_recv_frag_t*)frag; + mca_ptl_base_header_t* hdr = + &recvfrag->frag_recv.frag_base.frag_header; + int rc; + switch(hdr->hdr_common.hdr_type) { + case MCA_PTL_HDR_TYPE_MATCH: + { + if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { + MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); + } + ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv, + &hdr->hdr_match); + break; + } + case MCA_PTL_HDR_TYPE_FRAG: + break; + case MCA_PTL_HDR_TYPE_ACK: + break; + } + MCA_PTL_MX_POST(ptl, rc); + break; + } + default: + { + ompi_output(0, "mca_ptl_mx_progress: invalid request type: %dn", + frag->frag_type); + break; + } + } } diff --git a/src/mca/ptl/mx/ptl_mx_proc.c b/src/mca/ptl/mx/ptl_mx_proc.c index 82363a5450..8a1ebd6376 100644 --- a/src/mca/ptl/mx/ptl_mx_proc.c +++ b/src/mca/ptl/mx/ptl_mx_proc.c @@ -141,10 +141,30 @@ mca_ptl_mx_proc_t* mca_ptl_mx_proc_lookup(const ompi_process_name_t *name) int mca_ptl_mx_proc_insert(mca_ptl_mx_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer) { /* insert into peer array */ + mx_endpoint_addr_t addr; + uint64_t mx_nic_addr; + uint32_t mx_endpoint_id; + uint32_t mx_filter; + ptl_peer->peer_proc = ptl_proc; ptl_peer->peer_addr = ptl_proc->proc_addrs[ptl_proc->proc_peer_count]; ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer; ptl_proc->proc_peer_count++; + + /* breakup the endpoint address and reconstruct - otherwise it doesn't + * appear to be initialized correctly for this proc + */ + mx_decompose_endpoint_addr( + ptl_peer->peer_addr, + &mx_nic_addr, + &mx_endpoint_id, + &mx_filter); + memset(&ptl_peer->peer_addr, 0, sizeof(ptl_peer->peer_addr)); + mx_compose_endpoint_addr( + mx_nic_addr, + mx_endpoint_id, + mx_filter, + 
&ptl_peer->peer_addr); return OMPI_SUCCESS; } diff --git a/src/mca/ptl/mx/ptl_mx_recvfrag.c b/src/mca/ptl/mx/ptl_mx_recvfrag.c index f658a2e01e..bda5bd2261 100644 --- a/src/mca/ptl/mx/ptl_mx_recvfrag.c +++ b/src/mca/ptl/mx/ptl_mx_recvfrag.c @@ -22,6 +22,9 @@ OBJ_CLASS_INSTANCE( static void mca_ptl_mx_recv_frag_construct(mca_ptl_mx_recv_frag_t* frag) { + /* one time initialization */ + frag->frag_segments[0].segment_ptr = &frag->frag_recv.frag_base.frag_header; + frag->frag_segments[0].segment_length = sizeof(frag->frag_recv.frag_base.frag_header); } diff --git a/src/mca/ptl/mx/ptl_mx_recvfrag.h b/src/mca/ptl/mx/ptl_mx_recvfrag.h index ac7414c9f8..6f64dea06d 100644 --- a/src/mca/ptl/mx/ptl_mx_recvfrag.h +++ b/src/mca/ptl/mx/ptl_mx_recvfrag.h @@ -36,68 +36,5 @@ OBJ_CLASS_DECLARATION(mca_ptl_mx_recv_frag_t); #define MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag) \ OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)recvfrag); - -/** - * - */ - -static inline void mca_ptl_mx_recv_frag_init( - mca_ptl_mx_recv_frag_t* frag, - mca_ptl_mx_module_t* ptl) -{ - frag->frag_recv.frag_base.frag_owner = &ptl->super; - frag->frag_recv.frag_base.frag_peer = NULL; - frag->frag_segment_count = 2; - frag->frag_segments[0].segment_ptr = &frag->frag_recv.frag_base.frag_header; - frag->frag_segments[0].segment_length = sizeof(frag->frag_recv.frag_base.frag_header); - frag->frag_segments[1].segment_ptr = frag->frag_data; - frag->frag_segments[1].segment_length = sizeof(frag->frag_data); -} - - -/** - * - */ - -static inline void mca_ptl_mx_recv_frag_handler( - mca_ptl_mx_recv_frag_t* frag, - mca_ptl_mx_module_t* ptl) -{ - mca_ptl_base_header_t* hdr = &frag->frag_recv.frag_base.frag_header; - switch(hdr->hdr_common.hdr_type) { - case MCA_PTL_HDR_TYPE_MATCH: - { - if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) { - MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match); - } - ptl->super.ptl_match(&ptl->super, &frag->frag_recv, &hdr->hdr_match); - break; - } - case MCA_PTL_HDR_TYPE_FRAG: - break; - case MCA_PTL_HDR_TYPE_ACK: - break; - } -} - -/** - * - */ - -static inline void mca_ptl_mx_recv_frag_progress( - mca_ptl_mx_recv_frag_t* frag, - mca_ptl_mx_module_t* ptl) -{ - /* copy data into user buffer */ - - /* update request status */ - ptl->super.ptl_recv_progress( - &ptl->super, - frag->frag_recv.frag_request, - frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length, - frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length); -} - - #endif diff --git a/src/mca/ptl/mx/ptl_mx_sendfrag.c b/src/mca/ptl/mx/ptl_mx_sendfrag.c index 8b152d1337..0a508a0362 100644 --- a/src/mca/ptl/mx/ptl_mx_sendfrag.c +++ b/src/mca/ptl/mx/ptl_mx_sendfrag.c @@ -23,6 +23,9 @@ OBJ_CLASS_INSTANCE( static void mca_ptl_mx_send_frag_construct(mca_ptl_mx_send_frag_t* frag) { + /* one time initialization */ + frag->frag_segments[0].segment_ptr = &frag->frag_send.frag_base.frag_header; + frag->frag_segments[0].segment_length = sizeof(mca_ptl_base_header_t); } @@ -36,125 +39,6 @@ static void *mca_ptl_mx_alloc(size_t *size) return malloc(*size); } -/* - * Initialize the fragment based on the current offset into the users - * data buffer, and the indicated size. 
- */ - -int mca_ptl_mx_send_frag_init( - mca_ptl_mx_send_frag_t* sendfrag, - mca_ptl_mx_peer_t* ptl_peer, - mca_pml_base_send_request_t* sendreq, - size_t offset, - size_t* size, - int flags) -{ - /* message header */ - size_t size_in = *size; - size_t size_out; - unsigned int iov_count, max_data; - - mca_ptl_base_header_t* hdr = &sendfrag->frag_send.frag_base.frag_header; - sendfrag->frag_segments[0].segment_ptr = hdr; - sendfrag->frag_segments[0].segment_length = sizeof(mca_ptl_base_header_t); - - if(offset == 0) { - hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_flags = flags; - hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); - hdr->hdr_frag.hdr_frag_offset = offset; - hdr->hdr_frag.hdr_frag_seq = 0; - hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; - hdr->hdr_frag.hdr_dst_ptr.lval = 0; - hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; - hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; - hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; - hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; - hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; - hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; - } else { - hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_flags = flags; - hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t); - hdr->hdr_frag.hdr_frag_offset = offset; - hdr->hdr_frag.hdr_frag_seq = 0; - hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; - hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; - } - sendfrag->frag_free = 0; - - /* initialize convertor */ - if(size_in > 0) { - ompi_convertor_t *convertor; - struct iovec iov; - int rc; - - convertor = &sendfrag->frag_send.frag_base.frag_convertor; - ompi_convertor_copy(&sendreq->req_convertor, convertor); - ompi_convertor_init_for_send( - convertor, - 0, - sendreq->req_datatype, - sendreq->req_count, - sendreq->req_addr, - offset, - mca_ptl_mx_alloc ); - - /* if data is contigous convertor will return an offset - * into users buffer - otherwise will return an allocated buffer - * that holds the packed data - */ - iov.iov_base = NULL; - iov.iov_len = size_in; - iov_count = 1; - max_data = size_in; - if((rc = ompi_convertor_pack( - convertor, - &iov, - &iov_count, - &max_data, - &(sendfrag->frag_free))) < 0) { - return OMPI_ERROR; - } - /* adjust the freeAfter as the position zero is reserved for the header */ - sendfrag->frag_free <<= 1; - sendfrag->frag_segments[1].segment_ptr = iov.iov_base; - sendfrag->frag_segments[1].segment_length = size_out; - sendfrag->frag_segment_count = 2; - sendfrag->frag_send.frag_base.frag_addr = iov.iov_base; - - /* adjust size and request offset to reflect actual number of bytes packed by convertor */ - size_out = iov.iov_len; - } else { - size_out = size_in; - sendfrag->frag_send.frag_base.frag_addr = NULL; - sendfrag->frag_send.frag_base.frag_size = 0; - sendfrag->frag_segment_count = 1; - } - hdr->hdr_frag.hdr_frag_length = size_out; - - /* convert to network byte order if required */ - if(ptl_peer->peer_byte_swap) { - hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO; - if(offset == 0) { - MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match); - } else { - MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag); - } - } - - /* fragment state */ - sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super; - 
sendfrag->frag_send.frag_request = sendreq; - sendfrag->frag_send.frag_base.frag_size = size_out; - sendfrag->frag_send.frag_base.frag_peer = ptl_peer; - - *size = size_out; - return OMPI_SUCCESS; -} - void mca_ptl_mx_send_frag_init_ack( mca_ptl_mx_send_frag_t* ack, diff --git a/src/mca/ptl/mx/ptl_mx_sendfrag.h b/src/mca/ptl/mx/ptl_mx_sendfrag.h index 15f6a30ba0..7a102de3f5 100644 --- a/src/mca/ptl/mx/ptl_mx_sendfrag.h +++ b/src/mca/ptl/mx/ptl_mx_sendfrag.h @@ -50,59 +50,6 @@ typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t; OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t); -/* - * Initialize the fragment based on the current offset into the users - * data buffer, and the indicated size. - */ - -int mca_ptl_mx_send_frag_init( - mca_ptl_mx_send_frag_t* sendfrag, - struct mca_ptl_base_peer_t* ptl_peer, - mca_pml_base_send_request_t* sendreq, - size_t offset, - size_t* size, - int flags); - -/* - * Start the MX send for the fragment. - */ - -static inline int mca_ptl_mx_send_frag_start( - mca_ptl_mx_send_frag_t* sendfrag, - mca_ptl_mx_module_t* ptl) -{ - mx_return_t mx_return = mx_isend( - ptl->mx_endpoint, - sendfrag->frag_segments, - sendfrag->frag_segment_count, - sendfrag->frag_send.frag_base.frag_peer->peer_addr, - 1, - sendfrag, - &sendfrag->frag_request); - if(mx_return != MX_SUCCESS) { - ompi_output(0, "mca_ptl_mx_send_frag_start: mx_isend() failed with return value=%d\n", mx_return); - return OMPI_ERROR; - } - return OMPI_SUCCESS; -} - - -/** - * Callback on MX send completion. - */ - -static inline void mca_ptl_mx_send_frag_handler( - mca_ptl_mx_send_frag_t* sendfrag, - mca_ptl_mx_module_t* ptl) -{ - ptl->super.ptl_send_progress( - &ptl->super, - sendfrag->frag_send.frag_request, - sendfrag->frag_send.frag_base.frag_size); - if(sendfrag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0) - MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag); -} - void mca_ptl_mx_send_frag_init_ack( mca_ptl_mx_send_frag_t* ack, diff --git a/src/mpi/runtime/ompi_mpi_finalize.c b/src/mpi/runtime/ompi_mpi_finalize.c index 85f66e97f7..8d242ffc92 100644 --- a/src/mpi/runtime/ompi_mpi_finalize.c +++ b/src/mpi/runtime/ompi_mpi_finalize.c @@ -20,6 +20,7 @@ #include "info/info.h" #include "util/proc_info.h" #include "runtime/runtime.h" +#include "runtime/ompi_progress.h" #include "runtime/ompi_rte_wait.h" #include "mca/base/base.h" @@ -43,6 +44,9 @@ int ompi_mpi_finalize(void) int ret; ompi_mpi_finalized = true; +#if OMPI_HAVE_THREADS == 0 + ompi_progress_events(OMPI_EVLOOP_ONCE); +#endif /* unregister process */ if (OMPI_SUCCESS != (ret = ompi_registry.rte_unregister( diff --git a/src/mpi/runtime/ompi_mpi_init.c b/src/mpi/runtime/ompi_mpi_init.c index 534449ea43..3e683e14c5 100644 --- a/src/mpi/runtime/ompi_mpi_init.c +++ b/src/mpi/runtime/ompi_mpi_init.c @@ -308,6 +308,10 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } +#if OMPI_HAVE_THREADS == 0 + ompi_progress_events(OMPI_EVLOOP_NONBLOCK); +#endif + error: if (ret != OMPI_SUCCESS) { ompi_show_help("help-mpi-runtime", diff --git a/src/runtime/ompi_progress.c b/src/runtime/ompi_progress.c index cb8e30fa54..9460f35e92 100644 --- a/src/runtime/ompi_progress.c +++ b/src/runtime/ompi_progress.c @@ -4,9 +4,17 @@ #include "runtime/ompi_progress.h" +static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE; + +void ompi_progress_events(int flag) +{ + ompi_progress_event_flag = flag; +} + void ompi_progress(void) { - ompi_event_loop(OMPI_EVLOOP_NONBLOCK); + if(ompi_progress_event_flag != 0) + 
ompi_event_loop(ompi_progress_event_flag);
     mca_pml.pml_progress();
 }
 
diff --git a/src/runtime/ompi_progress.h b/src/runtime/ompi_progress.h
index a00f550213..7d65cdfaf4 100644
--- a/src/runtime/ompi_progress.h
+++ b/src/runtime/ompi_progress.h
@@ -3,7 +3,11 @@
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
 #endif
+
+OMPI_DECLSPEC extern void ompi_progress_events(int);
+
 OMPI_DECLSPEC extern void ompi_progress(void);
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }
 #endif
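
The runtime piece of this patch is small enough to view in isolation. Below is a stand-alone sketch (not part of the patch) of how the new progress-event flag is meant to behave: the flag defaults to a blocking OMPI_EVLOOP_ONCE pass so that non-MPI / RTE-only processes do not spin, ompi_mpi_init() on a non-threaded build switches it to OMPI_EVLOOP_NONBLOCK polling, and ompi_mpi_finalize() switches it back. The fake_event_loop() and fake_pml_progress() helpers, and the flag values, are hypothetical stand-ins for ompi_event_loop() and mca_pml.pml_progress().

/* illustrative mock of the progress-event flag introduced above */
#include <stdio.h>

#define OMPI_EVLOOP_ONCE     0x01   /* block until at least one event fires */
#define OMPI_EVLOOP_NONBLOCK 0x02   /* poll the event loop and return */

static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;

static void fake_event_loop(int flags) { printf("event loop, flags=%d\n", flags); }
static void fake_pml_progress(void)    { printf("pml progress\n"); }

/* mirrors ompi_progress_events() from the patch */
static void ompi_progress_events(int flag)
{
    ompi_progress_event_flag = flag;
}

/* mirrors ompi_progress(): only enter the event loop when a mode is set,
 * then always poll the PML */
static void ompi_progress(void)
{
    if (ompi_progress_event_flag != 0)
        fake_event_loop(ompi_progress_event_flag);
    fake_pml_progress();
}

int main(void)
{
    /* MPI_Init on a non-threaded build: switch to non-blocking polling */
    ompi_progress_events(OMPI_EVLOOP_NONBLOCK);
    ompi_progress();

    /* MPI_Finalize: fall back to a blocking pass so the process
     * does not spin while waiting on the run-time environment */
    ompi_progress_events(OMPI_EVLOOP_ONCE);
    ompi_progress();
    return 0;
}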
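On the MX side, the reorganized send path builds every fragment as a small gather list: segment 0 always carries the fragment header, segment 1 is added only when the convertor packed (or pointed at) user data, and the whole list is handed to mx_isend() with a match value of 1. The sketch below (not part of the patch) shows just that shape; mock_segment_t and mock_isend() are simplified, hypothetical stand-ins for the Myrinet MX segment type and mx_isend() call.

/* illustrative mock of the header + data two-segment send */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct { void *segment_ptr; uint32_t segment_length; } mock_segment_t;

/* stand-in for mx_isend(): just report what would go on the wire */
static int mock_isend(const mock_segment_t *segs, uint32_t count, uint64_t match)
{
    uint32_t i, total = 0;
    for (i = 0; i < count; i++)
        total += segs[i].segment_length;
    printf("isend: %u segment(s), %u bytes, match=0x%llx\n",
           (unsigned)count, (unsigned)total, (unsigned long long)match);
    return 0;
}

struct frag_header { uint32_t type, flags, length; };

int main(void)
{
    struct frag_header hdr = { 1 /* MATCH */, 0, 0 };
    char payload[64];
    mock_segment_t segs[2];
    uint32_t nsegs = 1;

    memset(payload, 0xaa, sizeof(payload));

    /* segment 0 is reserved for the header (set up once at fragment
     * construction time in the patch) */
    segs[0].segment_ptr = &hdr;
    segs[0].segment_length = (uint32_t)sizeof(hdr);

    /* segment 1 is only appended when there is user data to deliver */
    hdr.length = (uint32_t)sizeof(payload);
    segs[1].segment_ptr = payload;
    segs[1].segment_length = (uint32_t)sizeof(payload);
    nsegs = 2;

    /* match value 1 corresponds to the constant passed to
     * mx_isend()/mx_irecv() in the patch */
    return mock_isend(segs, nsegs, 1);
}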
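The reorg also folds the old inline mca_ptl_mx_post() helper into the statement macro MCA_PTL_MX_POST(), which reports failure through an rc out-parameter and bails out of the error path with break. A minimal sketch of that do { ... } while(0) pattern is below (not part of the patch; EXAMPLE_POST and its body are invented for the example): wrapping the body this way keeps the macro a single statement that is safe under if/else, and break exits only the macro body.

/* illustrative mock of the statement-macro pattern used by MCA_PTL_MX_POST */
#include <stdio.h>

#define EXAMPLE_POST(value, rc)                            \
do {                                                       \
    if ((value) < 0) {                                     \
        rc = -1;       /* error path */                    \
        break;         /* leaves only this macro body */   \
    }                                                      \
    printf("posted %d\n", (value));                        \
    rc = 0;                                                \
} while (0)

int main(void)
{
    int rc;

    /* expands to a single statement, so it nests cleanly in if/else */
    if (1)
        EXAMPLE_POST(5, rc);
    else
        EXAMPLE_POST(-1, rc);

    return rc;
}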