From 8c1027d9748c1a07cef9d45e87c5d198abf1c809 Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Tue, 20 Dec 2005 21:42:58 +0000 Subject: [PATCH] first cut at ack/retrans protocol This commit was SVN r8570. --- ompi/mca/pml/dr/Makefile.am | 5 +- ompi/mca/pml/dr/pml_dr.c | 2 +- ompi/mca/pml/dr/pml_dr.h | 4 + ompi/mca/pml/dr/pml_dr_comm.c | 13 +- ompi/mca/pml/dr/pml_dr_comm.h | 12 +- ompi/mca/pml/dr/pml_dr_component.c | 10 + ompi/mca/pml/dr/pml_dr_endpoint.h | 35 -- ompi/mca/pml/dr/pml_dr_hdr.h | 37 +- ompi/mca/pml/dr/pml_dr_proc.h | 52 --- ompi/mca/pml/dr/pml_dr_recvfrag.c | 124 +++++-- ompi/mca/pml/dr/pml_dr_recvfrag.h | 56 ++-- ompi/mca/pml/dr/pml_dr_recvreq.c | 152 ++++++++- ompi/mca/pml/dr/pml_dr_recvreq.h | 94 +++++- ompi/mca/pml/dr/pml_dr_sendreq.c | 316 ++++++++++++++---- ompi/mca/pml/dr/pml_dr_sendreq.h | 208 +++++++++--- .../dr/{pml_dr_endpoint.c => pml_dr_vfrag.c} | 35 +- ompi/mca/pml/dr/pml_dr_vfrag.h | 79 +++++ 17 files changed, 923 insertions(+), 311 deletions(-) delete mode 100644 ompi/mca/pml/dr/pml_dr_endpoint.h rename ompi/mca/pml/dr/{pml_dr_endpoint.c => pml_dr_vfrag.c} (60%) create mode 100644 ompi/mca/pml/dr/pml_dr_vfrag.h diff --git a/ompi/mca/pml/dr/Makefile.am b/ompi/mca/pml/dr/Makefile.am index 69447d3c25..afddbb38e7 100644 --- a/ompi/mca/pml/dr/Makefile.am +++ b/ompi/mca/pml/dr/Makefile.am @@ -23,8 +23,6 @@ ob1_sources = \ pml_dr_comm.h \ pml_dr_component.c \ pml_dr_component.h \ - pml_dr_endpoint.c \ - pml_dr_endpoint.h \ pml_dr_hdr.h \ pml_dr_iprobe.c \ pml_dr_irecv.c \ @@ -38,7 +36,8 @@ ob1_sources = \ pml_dr_recvreq.h \ pml_dr_sendreq.c \ pml_dr_sendreq.h \ - pml_dr_start.c + pml_dr_start.c \ + pml_dr_vfrag.c if OMPI_BUILD_pml_dr_DSO component_noinst = diff --git a/ompi/mca/pml/dr/pml_dr.c b/ompi/mca/pml/dr/pml_dr.c index 8fa1a43108..db962779a8 100644 --- a/ompi/mca/pml/dr/pml_dr.c +++ b/ompi/mca/pml/dr/pml_dr.c @@ -74,7 +74,7 @@ int mca_pml_dr_add_comm(ompi_communicator_t* comm) if (NULL == pml_comm) { return OMPI_ERR_OUT_OF_RESOURCE; } - mca_pml_dr_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count); + mca_pml_dr_comm_init(pml_comm, comm); comm->c_pml_comm = pml_comm; comm->c_pml_procs = (mca_pml_proc_t**)malloc( comm->c_remote_group->grp_proc_count * sizeof(mca_pml_proc_t)); diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index 75528085b2..7e609104cb 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -53,6 +53,9 @@ struct mca_pml_dr_t { size_t send_pipeline_depth; bool enabled; + time_t tout_ack; + time_t tout_watch_dog; + /* lock queue access */ opal_mutex_t lock; @@ -64,6 +67,7 @@ struct mca_pml_dr_t { ompi_free_list_t send_requests; ompi_free_list_t recv_requests; ompi_free_list_t recv_frags; + ompi_free_list_t vfrags; ompi_free_list_t buffers; }; typedef struct mca_pml_dr_t mca_pml_dr_t; diff --git a/ompi/mca/pml/dr/pml_dr_comm.c b/ompi/mca/pml/dr/pml_dr_comm.c index 35a31d3b11..0f82153229 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.c +++ b/ompi/mca/pml/dr/pml_dr_comm.c @@ -27,6 +27,7 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) { proc->expected_sequence = 1; + proc->vfrag_id = 1; proc->send_sequence = 0; OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t); OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); @@ -78,19 +79,21 @@ OBJ_CLASS_INSTANCE( mca_pml_dr_comm_destruct); -int mca_pml_dr_comm_init_size(mca_pml_dr_comm_t* comm, size_t size) +int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm) { size_t i; + size_t size = 
ompi_comm->c_remote_group->grp_proc_count; /* send message sequence-number support - sender side */ - comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size); - if(NULL == comm->procs) { + dr_comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size); + if(NULL == dr_comm->procs) { return OMPI_ERR_OUT_OF_RESOURCE; } for(i=0; iprocs+i, mca_pml_dr_comm_proc_t); + OBJ_CONSTRUCT(dr_comm->procs+i, mca_pml_dr_comm_proc_t); + dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i]; } - comm->num_procs = size; + dr_comm->num_procs = size; return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/dr/pml_dr_comm.h b/ompi/mca/pml/dr/pml_dr_comm.h index fc9adae1db..a17277837f 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.h +++ b/ompi/mca/pml/dr/pml_dr_comm.h @@ -25,6 +25,8 @@ #include "opal/threads/condition.h" #include "mca/ptl/ptl.h" #include "opal/class/opal_list.h" +#include "ompi/communicator/communicator.h" +#include "ompi/proc/proc.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif @@ -33,6 +35,7 @@ extern "C" { struct mca_pml_dr_comm_proc_t { opal_object_t super; uint16_t expected_sequence; /**< send message sequence number - receiver side */ + uint32_t vfrag_id; /**< virtual fragment identifier */ #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t send_sequence; /**< send side sequence number */ #else @@ -41,6 +44,7 @@ struct mca_pml_dr_comm_proc_t { opal_list_t frags_cant_match; /**< out-of-order fragment queues */ opal_list_t specific_receives; /**< queues of unmatched specific receives */ opal_list_t unexpected_frags; /**< unexpected fragment queues */ + ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */ }; typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t; @@ -69,12 +73,12 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t); /** * Initialize an instance of mca_pml_dr_comm_t based on the communicator size. * - * @param comm Instance of mca_pml_dr_comm_t - * @param size Size of communicator - * @return OMPI_SUCCESS or error status on failure. + * @param dr_comm Instance of mca_pml_dr_comm_t + * @param pml_comm Communicator + * @return OMPI_SUCCESS or error status on failure. */ -OMPI_DECLSPEC extern int mca_pml_dr_comm_init_size(mca_pml_dr_comm_t* comm, size_t size); +OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm); #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/pml/dr/pml_dr_component.c b/ompi/mca/pml/dr/pml_dr_component.c index 9fbf8007d4..fc0e54f307 100644 --- a/ompi/mca/pml/dr/pml_dr_component.c +++ b/ompi/mca/pml/dr/pml_dr_component.c @@ -125,6 +125,16 @@ int mca_pml_dr_component_open(void) mca_pml_dr.free_list_inc, NULL); + OBJ_CONSTRUCT(&mca_pml_dr.vfrags, ompi_free_list_t); + ompi_free_list_init( + &mca_pml_dr.vfrags, + sizeof(mca_pml_dr_vfrag_t), + OBJ_CLASS(mca_pml_dr_vfrag_t), + mca_pml_dr.free_list_num, + mca_pml_dr.free_list_max, + mca_pml_dr.free_list_inc, + NULL); + OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t); diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.h b/ompi/mca/pml/dr/pml_dr_endpoint.h deleted file mode 100644 index 5c133d94d8..0000000000 --- a/ompi/mca/pml/dr/pml_dr_endpoint.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana - * University Research and Technology - * Corporation. All rights reserved. 
- * Copyright (c) 2004-2005 The University of Tennessee and The University - * of Tennessee Research Foundation. All rights - * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. - * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ -/** - * @file - */ -#ifndef MCA_PML_DR_ENDPOINT_H -#define MCA_PML_DR_ENDPOINT_H - -#include "opal/util/output.h" -#include "mca/btl/btl.h" -#if defined(c_plusplus) || defined(__cplusplus) -extern "C" { -#endif - - -#if defined(c_plusplus) || defined(__cplusplus) -} -#endif -#endif - diff --git a/ompi/mca/pml/dr/pml_dr_hdr.h b/ompi/mca/pml/dr/pml_dr_hdr.h index c30ffb45e1..a3a124d540 100644 --- a/ompi/mca/pml/dr/pml_dr_hdr.h +++ b/ompi/mca/pml/dr/pml_dr_hdr.h @@ -31,15 +31,13 @@ #define MCA_PML_DR_HDR_TYPE_MATCH 1 #define MCA_PML_DR_HDR_TYPE_RNDV 2 -#define MCA_PML_DR_HDR_TYPE_ACK 4 -#define MCA_PML_DR_HDR_TYPE_NACK 5 -#define MCA_PML_DR_HDR_TYPE_FRAG 6 -#define MCA_PML_DR_HDR_TYPE_MAX 10 +#define MCA_PML_DR_HDR_TYPE_ACK 3 +#define MCA_PML_DR_HDR_TYPE_NACK 4 +#define MCA_PML_DR_HDR_TYPE_FRAG 5 -#define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */ -#define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ -#define MCA_PML_DR_HDR_FLAGS_PIN 4 /* is user buffer pinned */ -#define MCA_PML_DR_HDR_FLAGS_CONTIG 8 /* is user buffer contiguous */ +#define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */ +#define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ +#define MCA_PML_DR_HDR_FLAGS_MATCH 4 /* is the ack in response to a match */ /* @@ -88,6 +86,7 @@ static inline uint64_t ntoh64(uint64_t val) struct mca_pml_dr_common_hdr_t { uint8_t hdr_type; /**< type of envelope */ uint8_t hdr_flags; /**< flags indicating how fragment should be processed */ + uint16_t hdr_csum; /**< checksum over header */ }; typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t; @@ -99,11 +98,14 @@ typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t; * attributes required to match the corresponding posted receive. */ struct mca_pml_dr_match_hdr_t { - mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ + mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ + uint32_t hdr_vid; /**< vfrag id */ uint16_t hdr_ctx; /**< communicator index */ + uint16_t hdr_seq; /**< message sequence number */ int32_t hdr_src; /**< source rank */ int32_t hdr_tag; /**< user tag */ - uint16_t hdr_seq; /**< message sequence number */ + uint32_t hdr_csum; /**< checksum over data */ + ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */ }; typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t; @@ -133,7 +135,6 @@ typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t; struct mca_pml_dr_rendezvous_hdr_t { mca_pml_dr_match_hdr_t hdr_match; uint64_t hdr_msg_length; /**< message length */ - ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */ }; typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t; @@ -153,10 +154,14 @@ typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t; * Header for subsequent fragments. 
*/ struct mca_pml_dr_frag_hdr_t { - mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ - uint64_t hdr_frag_offset; /**< offset into message */ - ompi_ptr_t hdr_src_req; /**< pointer to source request */ - ompi_ptr_t hdr_dst_req; /**< pointer to matched receive */ + mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ + uint32_t hdr_vid; /**< virtual frag id */ + uint16_t hdr_vlen; /**< length of entire vfrag */ + uint16_t hdr_frag_idx; /**< bit index of this frag w/in vfrag */ + uint32_t hdr_frag_csum; /**< checksum over data */ + uint64_t hdr_frag_offset; /**< absolute offset of this fragment */ + ompi_ptr_t hdr_src_req; /**< pointer to source req */ + ompi_ptr_t hdr_dst_req; /**< pointer to receive req */ }; typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t; @@ -179,6 +184,8 @@ typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t; struct mca_pml_dr_ack_hdr_t { mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ + uint32_t hdr_vid; /**< virtual fragment id */ + uint64_t hdr_vmask; /**< acknowledged frags */ ompi_ptr_t hdr_src_req; /**< source request */ ompi_ptr_t hdr_dst_req; /**< matched receive request */ }; diff --git a/ompi/mca/pml/dr/pml_dr_proc.h b/ompi/mca/pml/dr/pml_dr_proc.h index c272e3f93d..aa133fb38f 100644 --- a/ompi/mca/pml/dr/pml_dr_proc.h +++ b/ompi/mca/pml/dr/pml_dr_proc.h @@ -25,7 +25,6 @@ #include "communicator/communicator.h" #include "group/group.h" #include "proc/proc.h" -#include "pml_dr_endpoint.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -40,57 +39,6 @@ struct mca_pml_dr_proc_t { typedef struct mca_pml_dr_proc_t mca_pml_dr_proc_t; OMPI_COMP_EXPORT extern opal_class_t mca_pml_dr_proc_t_class; -/** - * Return the mca_pml_proc_t instance cached in the communicators local group. - * - * @param comm Communicator - * @param rank Peer rank - * @return mca_pml_proc_t instance - */ - -/* static inline mca_pml_dr_proc_t* mca_pml_dr_proc_lookup_local(ompi_communicator_t* comm, int rank) */ -/* { */ -/* return (mca_pml_dr_proc_t*) comm->c_local_group->grp_proc_pointers[rank]->proc_pml; */ -/* } */ - -/* /\** */ -/* * Return the mca_pml_proc_t instance cached on the communicators remote group. */ -/* * */ -/* * @param comm Communicator */ -/* * @param rank Peer rank */ -/* * @return mca_pml_proc_t instance */ -/* *\/ */ - -/* static inline mca_pml_dr_proc_t* mca_pml_dr_proc_lookup_remote(ompi_communicator_t* comm, int rank) */ -/* { */ -/* return (mca_pml_dr_proc_t*) comm->c_pml_procs[rank]; */ -/* } */ - -/* /\** */ -/* * Return the mca_btl_peer_t instance corresponding to the process/btl combination. 
*/ -/* * */ -/* * @param comm Communicator */ -/* * @param rank Peer rank */ -/* * @return mca_pml_proc_t instance */ -/* *\/ */ - -/* static inline struct mca_btl_base_endpoint_t* mca_pml_dr_proc_lookup_remote_endpoint( */ -/* ompi_communicator_t* comm, */ -/* int rank, */ -/* struct mca_btl_base_module_t* btl) */ -/* { */ -/* mca_pml_dr_proc_t* proc = (mca_pml_dr_proc_t*) comm->c_pml_procs[rank]; */ -/* size_t i, size = mca_pml_dr_ep_array_get_size(&proc->btl_eager); */ -/* mca_pml_dr_endpoint_t* endpoint = proc->btl_eager.arr_endpoints; */ -/* for(i = 0; i < size; i++) { */ -/* if(endpoint->btl == btl) { */ -/* return endpoint->btl_endpoint; */ -/* } */ -/* endpoint++; */ -/* } */ -/* return NULL; */ -/* } */ - #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index aedb25a637..78c21704d7 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -50,6 +50,19 @@ OBJ_CLASS_INSTANCE( NULL ); +/* + * Release resources. + */ + +static void mca_pml_dr_ctl_completion( + mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t* ep, + struct mca_btl_base_descriptor_t* des, + int status) +{ + mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*)des->des_context; + MCA_BML_BASE_BTL_DES_RETURN(bml_btl, des); +} /** @@ -79,8 +92,14 @@ void mca_pml_dr_recv_frag_callback( { mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) hdr->hdr_ack.hdr_src_req.pval; - sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; - MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack); + break; + } + case MCA_PML_DR_HDR_TYPE_NACK: + { + mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) + hdr->hdr_ack.hdr_src_req.pval; + mca_pml_dr_send_request_nacked(sendreq, &hdr->hdr_ack); break; } case MCA_PML_DR_HDR_TYPE_FRAG: @@ -382,7 +401,7 @@ static bool mca_pml_dr_check_cantmatch_for_match( * - fragments may be corrupt * - this routine may be called simultaneously by more than one thread */ -int mca_pml_dr_recv_frag_match( +bool mca_pml_dr_recv_frag_match( mca_btl_base_module_t *btl, mca_pml_dr_match_hdr_t *hdr, mca_btl_base_segment_t* segments, @@ -396,6 +415,7 @@ int mca_pml_dr_recv_frag_match( mca_pml_dr_comm_proc_t *proc; bool additional_match=false; opal_list_t additional_matches; + ompi_proc_t* ompi_proc; int rc; /* communicator pointer */ @@ -405,6 +425,7 @@ int mca_pml_dr_recv_frag_match( /* source sequence number */ frag_msg_seq = hdr->hdr_seq; proc = comm->procs + hdr->hdr_src; + ompi_proc = proc->ompi_proc; /* get next expected message sequence number - if threaded * run, lock to make sure that if another thread is processing @@ -427,6 +448,7 @@ int mca_pml_dr_recv_frag_match( /* We're now expecting the next sequence number. 
*/ (proc->expected_sequence)++; +rematch: /* * figure out what sort of matching logic to use, if need to @@ -459,15 +481,12 @@ int mca_pml_dr_recv_frag_match( */ if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) { - /* Match a probe, rollback the next expected sequence number */ - (proc->expected_sequence)--; - OPAL_THREAD_UNLOCK(&comm->matching_lock); - /* complete the probe */ mca_pml_dr_recv_request_matched_probe(match,btl,segments,num_segments); - /* attempt to match actual request */ - return mca_pml_dr_recv_frag_match(btl,hdr,segments,num_segments); + /* retry the matchh */ + match = NULL; + goto rematch; } } else { @@ -478,7 +497,7 @@ int mca_pml_dr_recv_frag_match( OPAL_THREAD_UNLOCK(&comm->matching_lock); return rc; } - MCA_PML_DR_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl); + MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag ); } @@ -503,9 +522,8 @@ int mca_pml_dr_recv_frag_match( OPAL_THREAD_UNLOCK(&comm->matching_lock); return rc; } - MCA_PML_DR_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl); + MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag); - } OPAL_THREAD_UNLOCK(&comm->matching_lock); @@ -522,7 +540,7 @@ int mca_pml_dr_recv_frag_match( MCA_PML_DR_RECV_FRAG_RETURN(frag); } } - return OMPI_SUCCESS; + return (match != NULL); } @@ -550,7 +568,6 @@ static bool mca_pml_dr_check_cantmatch_for_match( int match_found; uint16_t next_msg_seq_expected, frag_seq; mca_pml_dr_recv_frag_t *frag; - mca_pml_dr_recv_request_t *match = NULL; bool match_made = false; /* @@ -582,6 +599,7 @@ static bool mca_pml_dr_check_cantmatch_for_match( */ frag_seq=frag->hdr.hdr_match.hdr_seq; if (frag_seq == next_msg_seq_expected) { + mca_pml_dr_recv_request_t *match = NULL; mca_pml_dr_match_hdr_t* hdr = &frag->hdr.hdr_match; /* We're now expecting the next sequence number. 
*/ @@ -596,6 +614,7 @@ static bool mca_pml_dr_check_cantmatch_for_match( opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t *)frag); +rematch: /* * figure out what sort of matching logic to use, if need to * look only at "specific" receives, or "wild" receives, @@ -623,18 +642,33 @@ static bool mca_pml_dr_check_cantmatch_for_match( /* if match found, process data */ if (match) { - /* associate the receive descriptor with the fragment - * descriptor */ - frag->request=match; - - /* add this fragment descriptor to the list of - * descriptors to be processed later + /* + * If this was a probe need to queue fragment on unexpected list */ - if(match_made == false) { - match_made = true; - OBJ_CONSTRUCT(additional_matches, opal_list_t); + if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) { + + /* complete the probe */ + mca_pml_dr_recv_request_matched_probe(match,frag->btl,frag->segments,frag->num_segments); + + /* retry the match */ + match = NULL; + goto rematch; + + } else { + + /* associate the receive descriptor with the fragment + * descriptor */ + frag->request=match; + + /* add this fragment descriptor to the list of + * descriptors to be processed later + */ + if(match_made == false) { + match_made = true; + OBJ_CONSTRUCT(additional_matches, opal_list_t); + } + opal_list_append(additional_matches, (opal_list_item_t *)frag); } - opal_list_append(additional_matches, (opal_list_item_t *)frag); } else { @@ -656,3 +690,45 @@ static bool mca_pml_dr_check_cantmatch_for_match( return match_made; } + +/** + * Generate an acknowledgement to the request + */ + +void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag) +{ + mca_pml_dr_ack_hdr_t* ack; + mca_btl_base_descriptor_t* des; + mca_bml_base_endpoint_t* bml_endpoint; + mca_bml_base_btl_t* bml_btl; + int rc; + + /* lookup btl */ + bml_endpoint = (mca_bml_base_endpoint_t*)frag->proc->proc_pml; + bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); + + /* allocate descriptor */ + MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); + if(NULL == des) { + return; + } + + /* fill out header */ + ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; + ack->hdr_common.hdr_flags = 0; + ack->hdr_vmask = 1; + ack->hdr_vid = frag->hdr.hdr_match.hdr_vid; + ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req; + ack->hdr_dst_req.pval = NULL; + + /* initialize descriptor */ + des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; + des->des_cbfunc = mca_pml_dr_ctl_completion; + + rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); + if(rc != OMPI_SUCCESS) { + mca_bml_base_free(bml_btl, des); + } +} + diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.h b/ompi/mca/pml/dr/pml_dr_recvfrag.h index edf860cb18..cb8db61efe 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.h +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.h @@ -41,7 +41,9 @@ struct mca_pml_dr_recv_frag_t { mca_pml_dr_hdr_t hdr; struct mca_pml_dr_recv_request_t* request; size_t num_segments; + uint32_t csum; mca_btl_base_module_t* btl; + ompi_proc_t* proc; mca_btl_base_segment_t segments[MCA_BTL_DES_MAX_SEGMENTS]; mca_pml_dr_buffer_t* buffers[MCA_BTL_DES_MAX_SEGMENTS]; }; @@ -50,30 +52,33 @@ typedef struct mca_pml_dr_recv_frag_t mca_pml_dr_recv_frag_t; OBJ_CLASS_DECLARATION(mca_pml_dr_recv_frag_t); -#define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \ +#define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \ do { \ opal_list_item_t* item; \ - OMPI_FREE_LIST_WAIT(&mca_pml_dr.recv_frags, item, rc); \ - frag = 
(mca_pml_dr_recv_frag_t*)item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_dr.recv_frags, item, rc); \ + frag = (mca_pml_dr_recv_frag_t*)item; \ } while(0) -#define MCA_PML_DR_RECV_FRAG_INIT(frag, hdr,segs,cnt,btl) \ +#define MCA_PML_DR_RECV_FRAG_INIT(frag,oproc,hdr,segs,cnt,btl) \ do { \ size_t i; \ mca_btl_base_segment_t* macro_segments = frag->segments; \ - mca_pml_dr_buffer_t** buffers = frag->buffers; \ + mca_pml_dr_buffer_t** buffers = frag->buffers; \ \ /* init recv_frag */ \ frag->btl = btl; \ - frag->hdr = *(mca_pml_dr_hdr_t*)hdr; \ + frag->hdr = *(mca_pml_dr_hdr_t*)hdr; \ frag->num_segments = cnt; \ + frag->csum = 0; \ + frag->proc = oproc; \ + \ /* copy over data */ \ for(i=0; iaddr; \ macro_segments[i].seg_len = segs[i].seg_len; \ @@ -81,23 +86,24 @@ do { \ segs[i].seg_addr.pval, \ segs[i].seg_len); \ } \ + mca_pml_dr_recv_frag_ack(frag); \ \ } while(0) -#define MCA_PML_DR_RECV_FRAG_RETURN(frag) \ +#define MCA_PML_DR_RECV_FRAG_RETURN(frag) \ do { \ size_t i; \ \ /* return buffers */ \ for(i=0; inum_segments; i++) { \ - OMPI_FREE_LIST_RETURN(&mca_pml_dr.buffers, \ + OMPI_FREE_LIST_RETURN(&mca_pml_dr.buffers, \ (opal_list_item_t*)frag->buffers[i]); \ } \ frag->num_segments = 0; \ \ /* return recv_frag */ \ - OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_frags, \ + OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_frags, \ (opal_list_item_t*)frag); \ } while(0) @@ -107,11 +113,10 @@ do { \ */ OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback( - mca_btl_base_module_t *btl, - mca_btl_base_tag_t tag, - mca_btl_base_descriptor_t* descriptor, - void* cbdata - ); + mca_btl_base_module_t *btl, + mca_btl_base_tag_t tag, + mca_btl_base_descriptor_t* descriptor, + void* cbdata); /** * Match incoming recv_frags against posted receives. @@ -123,11 +128,18 @@ OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback( * @param additional_matches (OUT) List of additional matches * @return OMPI_SUCCESS or error status on failure. */ -OMPI_DECLSPEC int mca_pml_dr_recv_frag_match( - mca_btl_base_module_t* btl, - mca_pml_dr_match_hdr_t *hdr, - mca_btl_base_segment_t* segments, - size_t num_segments); +OMPI_DECLSPEC bool mca_pml_dr_recv_frag_match( + mca_btl_base_module_t* btl, + mca_pml_dr_match_hdr_t *hdr, + mca_btl_base_segment_t* segments, + size_t num_segments); + +/** + * Generate an acknowledgment for an unexpected/out-of-order fragment + */ + +OMPI_DECLSPEC void mca_pml_dr_recv_frag_ack( + mca_pml_dr_recv_frag_t* frag); #if defined(c_plusplus) || defined(__cplusplus) diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index a90f09c0cd..aa7c82c43d 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -79,8 +79,8 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i ompi_request->req_status._cancelled = true; ompi_request->req_complete = true; /* mark it as completed so all the test/wait functions * on this particular request will finish */ - /* Now we have a problem if we are in a multi-threaded environment. We shou ld - * broadcast the condition on the request in order to allow the other threa ds + /* Now we have a problem if we are in a multi-threaded environment. We should + * broadcast the condition on the request in order to allow the other threads * to complete their test/wait functions. 
*/ ompi_request_completed++; @@ -97,10 +97,16 @@ static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request request->req_recv.req_base.req_ompi.req_fini = mca_pml_dr_recv_request_fini; request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel; + OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t); + OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t); + OBJ_CONSTRUCT(&request->req_mutex, opal_mutex_t); } static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request) { + OBJ_DESTRUCT(&request->req_vfrag0); + OBJ_DESTRUCT(&request->req_vfrags); + OBJ_DESTRUCT(&request->req_mutex); } @@ -127,15 +133,15 @@ static void mca_pml_dr_ctl_completion( /* - * + * Generate an ack to the peer after first fragment is matched. */ -static void mca_pml_dr_recv_request_ack( +static void mca_pml_dr_recv_request_matched( mca_pml_dr_recv_request_t* recvreq, mca_pml_dr_rendezvous_hdr_t* hdr, - size_t bytes_received) + uint8_t type) { - ompi_proc_t* proc = (ompi_proc_t*) recvreq->req_proc; + ompi_proc_t* proc = recvreq->req_proc; mca_bml_base_endpoint_t* bml_endpoint = NULL; mca_btl_base_descriptor_t* des; mca_bml_base_btl_t* bml_btl; @@ -160,9 +166,11 @@ static void mca_pml_dr_recv_request_ack( /* fill out header */ ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; - ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = 0; - ack->hdr_src_req = hdr->hdr_src_req; + ack->hdr_common.hdr_type = type; + ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH; + ack->hdr_vid = hdr->hdr_match.hdr_vid; + ack->hdr_vmask = 0x1; + ack->hdr_src_req = hdr->hdr_match.hdr_src_req; ack->hdr_dst_req.pval = recvreq; /* initialize descriptor */ @@ -185,6 +193,94 @@ retry: opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag); } +/* + * Generate an ack to the peer after first fragment is matched. + */ + +static void mca_pml_dr_recv_request_nack( + mca_pml_dr_recv_request_t* recvreq, + mca_pml_dr_frag_hdr_t* hdr) +{ + ompi_proc_t* proc = recvreq->req_proc; + mca_bml_base_endpoint_t* bml_endpoint = NULL; + mca_btl_base_descriptor_t* des; + mca_bml_base_btl_t* bml_btl; + mca_pml_dr_ack_hdr_t* nack; + int rc; + + bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml; + bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); + + /* allocate descriptor */ + MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); + if(NULL == des) { + return; + } + + /* fill out header */ + nack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; + nack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_NACK; + nack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH; + nack->hdr_vid = hdr->hdr_vid; + nack->hdr_vmask = 1 << hdr->hdr_frag_idx; + nack->hdr_src_req = hdr->hdr_src_req; + nack->hdr_dst_req.pval = recvreq; + + /* initialize descriptor */ + des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; + des->des_cbfunc = mca_pml_dr_ctl_completion; + + rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); + if(rc != OMPI_SUCCESS) { + mca_bml_base_free(bml_btl, des); + } +} + + +/* + * Generate an ack w/ the current vfrag status. 
+ */ + +static void mca_pml_dr_recv_request_vfrag_ack( + mca_pml_dr_recv_request_t* recvreq, + mca_pml_dr_vfrag_t* vfrag) +{ + ompi_proc_t* proc = recvreq->req_proc; + mca_bml_base_endpoint_t* bml_endpoint = NULL; + mca_btl_base_descriptor_t* des; + mca_bml_base_btl_t* bml_btl; + mca_pml_dr_ack_hdr_t* ack; + int rc; + + bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml; + bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); + + /* allocate descriptor */ + MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); + if(NULL == des) { + return; + } + + /* fill out header */ + ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; + ack->hdr_common.hdr_flags = 0; + ack->hdr_vid = vfrag->vf_id; + ack->hdr_vmask = vfrag->vf_ack; + ack->hdr_src_req = recvreq->req_vfrag0.vf_send; + ack->hdr_dst_req.pval = recvreq; + + /* initialize descriptor */ + des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; + des->des_cbfunc = mca_pml_dr_ctl_completion; + + rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); + if(rc != OMPI_SUCCESS) { + mca_bml_base_free(bml_btl, des); + } +} + + /* * Update the recv request status to reflect the number of bytes @@ -202,6 +298,7 @@ void mca_pml_dr_recv_request_progress( size_t data_offset = 0; mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; size_t i; + uint32_t csum = 0; for(i=0; ireq_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; - recvreq->req_send = hdr->hdr_rndv.hdr_src_req; + recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_req; MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); - mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received); MCA_PML_DR_RECV_REQUEST_UNPACK( recvreq, segments, @@ -236,7 +333,10 @@ void mca_pml_dr_recv_request_progress( sizeof(mca_pml_dr_rendezvous_hdr_t), data_offset, bytes_received, - bytes_delivered); + bytes_delivered, + csum); + mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, + (csum == hdr->hdr_match.hdr_csum) ? 
MCA_PML_DR_HDR_TYPE_ACK : MCA_PML_DR_HDR_TYPE_NACK); break; case MCA_PML_DR_HDR_TYPE_FRAG: @@ -250,7 +350,30 @@ void mca_pml_dr_recv_request_progress( sizeof(mca_pml_dr_frag_hdr_t), data_offset, bytes_received, - bytes_delivered); + bytes_delivered, + csum); + + /* if checksum fails - immediately nack this fragment */ + if(csum != hdr->hdr_frag.hdr_frag_csum) { + bytes_received = bytes_delivered = 0; + mca_pml_dr_recv_request_nack(recvreq, &hdr->hdr_frag); + } else { + mca_pml_dr_vfrag_t* vfrag; + uint64_t bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx); + + /* update vfrag status */ + MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag); + if((vfrag->vf_ack & bit) == 0) { + vfrag->vf_ack |= bit; + if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { + /* done w/ this vfrag - ack it */ + mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag); + } + } else { + /* duplicate fragment - send an ack w/ the frags completed */ + mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag); + } + } break; default: @@ -468,3 +591,4 @@ static mca_pml_dr_recv_frag_t* mca_pml_dr_recv_request_match_specific_proc( return frag; } + diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.h b/ompi/mca/pml/dr/pml_dr_recvreq.h index 2e4c17dfd0..ae4f2f3802 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.h +++ b/ompi/mca/pml/dr/pml_dr_recvreq.h @@ -21,11 +21,14 @@ #ifndef OMPI_PML_DR_RECV_REQUEST_H #define OMPI_PML_DR_RECV_REQUEST_H -#include "pml_dr.h" -#include "pml_dr_proc.h" +#include "ompi_config.h" #include "mca/mpool/base/base.h" #include "mca/pml/base/pml_base_recvreq.h" +#include "pml_dr.h" +#include "pml_dr_proc.h" +#include "pml_dr_vfrag.h" + #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif @@ -34,15 +37,19 @@ extern "C" { struct mca_pml_dr_recv_request_t { mca_pml_base_recv_request_t req_recv; struct ompi_proc_t *req_proc; - ompi_ptr_t req_send; #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t req_lock; #else int32_t req_lock; #endif - size_t req_pipeline_depth; - size_t req_bytes_received; - size_t req_bytes_delivered; + size_t req_pipeline_depth; + size_t req_bytes_received; + size_t req_bytes_delivered; + + mca_pml_dr_vfrag_t *req_vfrag; + mca_pml_dr_vfrag_t req_vfrag0; + opal_list_t req_vfrags; + opal_mutex_t req_mutex; }; typedef struct mca_pml_dr_recv_request_t mca_pml_dr_recv_request_t; @@ -56,12 +63,12 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_recv_request_t); * @param rc (OUT) OMPI_SUCCESS or error status on failure. * @return Receive request. */ -#define MCA_PML_DR_RECV_REQUEST_ALLOC(recvreq, rc) \ +#define MCA_PML_DR_RECV_REQUEST_ALLOC(recvreq, rc) \ do { \ opal_list_item_t* item; \ rc = OMPI_SUCCESS; \ - OMPI_FREE_LIST_GET(&mca_pml_dr.recv_requests, item, rc); \ - recvreq = (mca_pml_dr_recv_request_t*)item; \ + OMPI_FREE_LIST_GET(&mca_pml_dr.recv_requests, item, rc); \ + recvreq = (mca_pml_dr_recv_request_t*)item; \ } while(0) @@ -77,7 +84,7 @@ do { \ * @param comm (IN) Communicator. * @param persistent (IN) Is this a ersistent request. 
*/ -#define MCA_PML_DR_RECV_REQUEST_INIT( \ +#define MCA_PML_DR_RECV_REQUEST_INIT( \ request, \ addr, \ count, \ @@ -105,6 +112,16 @@ do { \ */ #define MCA_PML_DR_RECV_REQUEST_RETURN(recvreq) \ do { \ + opal_list_item_t* item; \ + \ + /* return vfrags */ \ + OPAL_THREAD_LOCK(&(recvreq)->req_mutex); \ + while(NULL != (item = opal_list_remove_first(&(recvreq)->req_vfrags))) { \ + OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, item); \ + } \ + OPAL_THREAD_UNLOCK(&(recvreq)->req_mutex); \ + \ + /* decrement reference counts */ \ MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \ OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_requests, (opal_list_item_t*)(recvreq)); \ } while(0) @@ -131,7 +148,7 @@ void mca_pml_dr_recv_request_match_specific(mca_pml_dr_recv_request_t* request); * @param request Receive request. * @return OMPI_SUCESS or error status on failure. */ -#define MCA_PML_DR_RECV_REQUEST_START(request) \ +#define MCA_PML_DR_RECV_REQUEST_START(request) \ do { \ /* init/re-init the request */ \ (request)->req_bytes_received = 0; \ @@ -141,6 +158,7 @@ do { (request)->req_recv.req_base.req_pml_complete = false; \ (request)->req_recv.req_base.req_ompi.req_complete = false; \ (request)->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ + (request)->req_vfrag = &(request)->req_vfrag0; \ \ /* always set the req_status.MPI_TAG to ANY_TAG before starting the \ * request. This field is used if cancelled to find out if the request \ @@ -152,9 +170,9 @@ do { \ /* attempt to match posted recv */ \ if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \ - mca_pml_dr_recv_request_match_wild(request); \ + mca_pml_dr_recv_request_match_wild(request); \ } else { \ - mca_pml_dr_recv_request_match_specific(request); \ + mca_pml_dr_recv_request_match_specific(request); \ } \ } while (0) @@ -163,7 +181,7 @@ do { * */ -#define MCA_PML_DR_RECV_REQUEST_MATCHED( \ +#define MCA_PML_DR_RECV_REQUEST_MATCHED( \ request, \ hdr) \ do { \ @@ -190,14 +208,15 @@ do { * */ -#define MCA_PML_DR_RECV_REQUEST_UNPACK( \ +#define MCA_PML_DR_RECV_REQUEST_UNPACK( \ request, \ segments, \ num_segments, \ seg_offset, \ data_offset, \ bytes_received, \ - bytes_delivered) \ + bytes_delivered, \ + csum) \ do { \ if(request->req_recv.req_bytes_packed > 0) { \ struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS]; \ @@ -259,6 +278,49 @@ void mca_pml_dr_recv_request_matched_probe( void mca_pml_dr_recv_request_schedule( mca_pml_dr_recv_request_t* req); +/* + * + */ + +#define MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq,hdr,vfrag) \ +do { \ + if((recvreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \ + vfrag = (recvreq)->req_vfrag; \ + } else if ((hdr)->hdr_frag_offset == 0) { \ + vfrag = &(recvreq)->req_vfrag0; \ + } else { \ + opal_list_item_t* item; \ + int rc; \ + \ + vfrag = NULL; \ + OPAL_THREAD_LOCK(&(recvreq)->req_mutex); \ + for(item = opal_list_get_first(&(recvreq)->req_vfrags); \ + item != opal_list_get_end(&(recvreq)->req_vfrags); \ + item = opal_list_get_next(item)) { \ + mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \ + if(vf->vf_id == (hdr)->hdr_vid) { \ + vfrag = vf; \ + break; \ + } \ + } \ + if(NULL == vfrag) { \ + MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); \ + if(NULL != vfrag) { \ + (vfrag)->vf_id = (hdr)->hdr_vid; \ + (vfrag)->vf_len = (hdr)->hdr_vlen; \ + (vfrag)->vf_ack = 0; \ + if((hdr)->hdr_vlen == 64) { \ + (vfrag)->vf_mask = ~(uint64_t)0; \ + } else { \ + (vfrag)->vf_mask = (((uint64_t)1 << (hdr)->hdr_vlen)-1); \ + } \ + opal_list_append(&(recvreq)->req_vfrags, (opal_list_item_t*)vfrag); \ + (recvreq)->req_vfrag = 
vfrag; \ + } \ + } \ + OPAL_THREAD_UNLOCK(&(recvreq)->req_mutex); \ + } \ +} while(0) #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index a44c568e77..e0aa3b99d1 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -20,6 +20,7 @@ /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ #include "ompi_config.h" +#include "opal/util/crc.h" #include "ompi/include/constants.h" #include "mca/pml/pml.h" #include "mca/btl/btl.h" @@ -30,10 +31,9 @@ #include "pml_dr_proc.h" #include "pml_dr_sendreq.h" #include "pml_dr_recvreq.h" -#include "pml_dr_endpoint.h" #include "mca/bml/base/base.h" - + static int mca_pml_dr_send_request_fini(struct ompi_request_t** request) { mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)(request); @@ -73,14 +73,26 @@ static int mca_pml_dr_send_request_cancel(struct ompi_request_t* request, int co static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) { + req->req_vfrag0.vf_len = 1; + req->req_vfrag0.vf_idx = 1; + req->req_vfrag0.vf_mask = 1; req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini; req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel; + + OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t); + OBJ_CONSTRUCT(&req->req_pending, opal_list_t); + OBJ_CONSTRUCT(&req->req_retrans, opal_list_t); + OBJ_CONSTRUCT(&req->req_mutex, opal_mutex_t); } static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req) { + OBJ_DESTRUCT(&req->req_vfrag0); + OBJ_DESTRUCT(&req->req_pending); + OBJ_DESTRUCT(&req->req_retrans); + OBJ_DESTRUCT(&req->req_mutex); } @@ -115,7 +127,9 @@ void mca_pml_dr_match_completion_cache( /* signal request completion */ OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + if(sendreq->req_num_acks == sendreq->req_num_vfrags) { + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -144,7 +158,9 @@ void mca_pml_dr_match_completion_free( /* signal request completion */ OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + if(sendreq->req_num_acks == sendreq->req_num_vfrags) { + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -211,7 +227,8 @@ static void mca_pml_dr_frag_completion( /* count bytes of user data actually delivered */ MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_dr_frag_hdr_t)); if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 && - sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { + sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed && + sendreq->req_num_acks == sendreq->req_num_vfrags) { MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); schedule = false; } else { @@ -272,20 +289,10 @@ int mca_pml_dr_send_request_start_buffered( return rc; } - /* build rendezvous header */ - hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; - hdr->hdr_common.hdr_flags = 0; - hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; - hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; - hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; - hdr->hdr_match.hdr_seq = 
sendreq->req_send.req_base.req_sequence; - hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; - hdr->hdr_rndv.hdr_src_req.pval = sendreq; - /* update lengths */ segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data; sendreq->req_send_offset = max_data; + sendreq->req_vfrag0.vf_size = max_data; descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -311,6 +318,21 @@ int mca_pml_dr_send_request_start_buffered( return rc; } + /* build rendezvous header */ + hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; + hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; + hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; + hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; + hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; + hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); + /* re-init convertor for packed data */ ompi_convertor_prepare_for_send( &sendreq->req_send.req_convertor, @@ -376,15 +398,19 @@ int mca_pml_dr_send_request_start_copy( /* build match header */ hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; + hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); /* update lengths */ segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data; sendreq->req_send_offset = max_data; + sendreq->req_vfrag0.vf_size = max_data; /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion_free; @@ -435,11 +461,14 @@ int mca_pml_dr_send_request_start_prepare( /* build match header */ hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; + hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion_free; @@ -448,6 +477,7 @@ int mca_pml_dr_send_request_start_prepare( /* update lengths */ sendreq->req_send_offset = size; + sendreq->req_vfrag0.vf_size = size; /* send */ rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); @@ -504,14 +534,15 @@ int mca_pml_dr_send_request_start_rndv( hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = 
sendreq->req_send.req_base.req_sequence; + hdr->hdr_match.hdr_src_req.pval = sendreq; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; - hdr->hdr_rndv.hdr_src_req.pval = sendreq; /* first fragment of a long message */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbdata = sendreq; des->des_cbfunc = mca_pml_dr_rndv_completion; sendreq->req_send_offset = size; + sendreq->req_vfrag0.vf_size = size; /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); @@ -537,7 +568,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) */ mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint; - if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) { do { /* allocate remaining bytes to BTLs */ @@ -547,58 +577,36 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) mca_pml_dr_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; - int rc; - size_t size; mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); - size_t num_btl_avail = bml_endpoint->btl_send.arr_size; - - if(num_btl_avail == 1 || bytes_remaining < bml_btl->btl_min_send_size) { - size = bytes_remaining; + mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag; + size_t size = bytes_remaining; + size_t offset = sendreq->req_send_offset - vfrag->vf_offset; + int rc; - /* otherwise attempt to give the BTL a percentage of the message - * based on a weighting factor. for simplicity calculate this as - * a percentage of the overall message length (regardless of amount - * previously assigned) - */ - } else { - size = (size_t)(bml_btl->btl_weight * bytes_remaining); - } - - /* makes sure that we don't exceed BTL max send size */ - if (bml_btl->btl_max_send_size != 0 && - size > bml_btl->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t)) { - size = bml_btl->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); - - /* very expensive - however for buffered sends we need to send on a - * boundary that the receiver will be able to unpack completely - * using the native datatype - */ - if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { - ompi_convertor_t convertor; - size_t position = sendreq->req_send_offset + size; - /* - * We need this local convertor in order to correctly compute - * the correct position. Therefore we have to correctly construct and - * destruct it. 
- */ - OBJ_CONSTRUCT( &convertor, ompi_convertor_t ); - ompi_convertor_copy_and_prepare_for_send( - &sendreq->req_send.req_convertor, - sendreq->req_send.req_base.req_datatype, - sendreq->req_send.req_base.req_count, - sendreq->req_send.req_base.req_addr, - &convertor); - ompi_convertor_set_position(&convertor, &position); - OBJ_DESTRUCT( &convertor ); - size = position - sendreq->req_send_offset; + /* do we need to allocate a new vfrag */ + if(vfrag->vf_size == offset) { + MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); + if(NULL == vfrag) { + OPAL_THREAD_LOCK(&mca_pml_dr.lock); + opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); + OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); + break; } + MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag); + offset = 0; + sendreq->req_num_vfrags++; } - - + + /* makes sure that we don't exceed vfrag size */ + if (size > vfrag->vf_max_send_size) { + size = vfrag->vf_max_send_size; + } + if (size > vfrag->vf_size - offset) { + size = vfrag->vf_size - offset; + } + /* pack into a descriptor */ - ompi_convertor_set_position(&sendreq->req_send.req_convertor, - &sendreq->req_send_offset); - + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &sendreq->req_send_offset); mca_bml_base_prepare_src( bml_btl, NULL, @@ -607,7 +615,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) &size, &des ); - if(des == NULL) { OPAL_THREAD_LOCK(&mca_pml_dr.lock); opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); @@ -620,15 +627,25 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* setup header */ hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; + hdr->hdr_vid = vfrag->vf_id; + hdr->hdr_vlen = vfrag->vf_len; + hdr->hdr_frag_idx = vfrag->vf_idx; + hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; hdr->hdr_frag_offset = sendreq->req_send_offset; hdr->hdr_src_req.pval = sendreq; - hdr->hdr_dst_req = sendreq->req_recv; + hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv; + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); /* update state */ + vfrag->vf_idx++; sendreq->req_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); + /* start vfrag watchdog timer */ + MCA_PML_DR_VFRAG_WDOG_START(vfrag); + /* initiate send - note that this may complete before the call returns */ rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); @@ -645,8 +662,173 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) } mca_pml_dr_progress(); } + + /* + * VFrags w/ nacks or that timed out + */ + while(opal_list_get_size(&sendreq->req_retrans) && + sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { + mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans); + + /* + * Retransmit fragments that have not been acked. 
+ */ + while(vfrag->vf_idx < vfrag->vf_len && + sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { + if(((1 << vfrag->vf_idx) & vfrag->vf_ack) == 0) { + mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + mca_pml_dr_frag_hdr_t* hdr; + mca_btl_base_descriptor_t* des; + size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx); + size_t size; + int rc; + + if(vfrag->vf_idx == vfrag->vf_len - 1) { + size = vfrag->vf_size - offset; + } else { + size = vfrag->vf_max_send_size; + } + + /* pack into a descriptor */ + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); + mca_bml_base_prepare_src( + bml_btl, + NULL, + &sendreq->req_send.req_convertor, + sizeof(mca_pml_dr_frag_hdr_t), + &size, + &des + ); + if(des == NULL) { + OPAL_THREAD_LOCK(&mca_pml_dr.lock); + opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); + OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); + break; + } + des->des_cbfunc = mca_pml_dr_frag_completion; + des->des_cbdata = sendreq; + + /* setup header */ + hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_csum = 0; + hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; + hdr->hdr_vid = vfrag->vf_id; + hdr->hdr_vlen = vfrag->vf_len; + hdr->hdr_frag_idx = vfrag->vf_idx; + hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_frag_offset = sendreq->req_send_offset; + hdr->hdr_src_req.pval = sendreq; + hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv; + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); + + /* update state */ + vfrag->vf_idx++; + sendreq->req_send_offset += size; + OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); + + /* start vfrag watchdog timer */ + MCA_PML_DR_VFRAG_WDOG_START(vfrag); + + /* initiate send - note that this may complete before the call returns */ + rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); + + if(rc == OMPI_SUCCESS) { + bytes_remaining -= size; + } else { + vfrag->vf_idx--; + OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); + mca_bml_base_free(bml_btl,des); + OPAL_THREAD_LOCK(&mca_pml_dr.lock); + opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); + OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); + break; + } + } + vfrag->vf_idx++; + } + + /* move from retrans to pending list */ + if(vfrag->vf_idx == vfrag->vf_len) { + OPAL_THREAD_LOCK(&senddreq->req_mutex); + opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + } + } } while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0); } return OMPI_SUCCESS; } + +/** + * Acknowledgment of vfrag + */ +void mca_pml_dr_send_request_acked( + mca_pml_dr_send_request_t* sendreq, + mca_pml_dr_ack_hdr_t* ack) +{ + if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) { + sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req; + MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + } else { + mca_pml_dr_vfrag_t* vfrag; + MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); + if(NULL == vfrag) { + return; + } + + /* add in acknowledged fragments */ + vfrag->vf_ack |= ack->hdr_vmask; + + /* have all fragments w/in this vfrag been acked? 
*/ + if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { + + /* return vfrag */ + if (vfrag != &sendreq->req_vfrag0) { + MCA_PML_DR_VFRAG_RETURN(vfrag); + } + + /* are we done with this request */ + OPAL_THREAD_LOCK(&ompi_request_lock); + sendreq->req_num_acks++; + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed && + sendreq->req_num_acks == sendreq->req_num_vfrags) { + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + + } else { + /* retransmit missing fragments */ + OPAL_THREAD_LOCK(&sendreq->req_mutex); + vfrag->vf_idx = 0; + opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + mca_pml_dr_send_request_schedule(sendreq); + } + } +} + + +void mca_pml_dr_send_request_nacked( + mca_pml_dr_send_request_t* sendreq, + mca_pml_dr_ack_hdr_t* ack) +{ + mca_pml_dr_vfrag_t* vfrag; + MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); + if(NULL == vfrag) { + return; + } + + /* removed nacked bits from acknowledged fragments */ + vfrag->vf_idx = 0; + vfrag->vf_ack &= ~ack->hdr_vmask; + + /* retransmit missing fragments */ + OPAL_THREAD_LOCK(&sendreq->req_mutex); + opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + mca_pml_dr_send_request_schedule(sendreq); +} + diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index ebf877668d..1804dadf72 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -21,14 +21,17 @@ #ifndef OMPI_PML_DR_SEND_REQUEST_H #define OMPI_PML_DR_SEND_REQUEST_H -#include "mca/btl/btl.h" -#include "mca/pml/base/pml_base_sendreq.h" -#include "mca/mpool/base/base.h" +#include "ompi_config.h" +#include "ompi/datatype/convertor.h" +#include "ompi/mca/btl/btl.h" +#include "ompi/mca/pml/base/pml_base_sendreq.h" +#include "ompi/mca/mpool/base/base.h" +#include "ompi/mca/bml/bml.h" + #include "pml_dr_proc.h" #include "pml_dr_comm.h" #include "pml_dr_hdr.h" -#include "datatype/convertor.h" -#include "mca/bml/bml.h" +#include "pml_dr_vfrag.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -39,17 +42,24 @@ struct mca_pml_dr_send_request_t { mca_pml_base_send_request_t req_send; ompi_proc_t* req_proc; mca_bml_base_endpoint_t* req_endpoint; - ompi_ptr_t req_recv; #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t req_state; volatile int32_t req_lock; #else - volatile int32_t req_state; - volatile int32_t req_lock; + int32_t req_state; + int32_t req_lock; #endif size_t req_pipeline_depth; size_t req_bytes_delivered; size_t req_send_offset; + + mca_pml_dr_vfrag_t* req_vfrag; + mca_pml_dr_vfrag_t req_vfrag0; + opal_list_t req_pending; + opal_list_t req_retrans; + opal_mutex_t req_mutex; + size_t req_num_acks; + size_t req_num_vfrags; }; typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; @@ -57,7 +67,7 @@ typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t); -#define MCA_PML_DR_SEND_REQUEST_ALLOC( \ +#define MCA_PML_DR_SEND_REQUEST_ALLOC( \ comm, \ dst, \ sendreq, \ @@ -71,14 +81,14 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t); rc = OMPI_ERR_OUT_OF_RESOURCE; \ } else { \ rc = OMPI_SUCCESS; \ - OMPI_FREE_LIST_WAIT(&mca_pml_dr.send_requests, item, rc); \ - sendreq = (mca_pml_dr_send_request_t*)item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_dr.send_requests, item, rc); \ + sendreq = (mca_pml_dr_send_request_t*)item; \ sendreq->req_proc = proc; \ } \ } 
-#define MCA_PML_DR_SEND_REQUEST_INIT( \ +#define MCA_PML_DR_SEND_REQUEST_INIT( \ sendreq, \ buf, \ count, \ @@ -108,6 +118,7 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t); #define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \ do { \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ + mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)sendreq->req_proc->proc_pml; \ mca_bml_base_btl_t* bml_btl; \ size_t size = sendreq->req_send.req_bytes_packed; \ @@ -126,9 +137,12 @@ do { sendreq->req_send.req_base.req_ompi.req_complete = false; \ sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \ - sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32( \ - &comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1); \ + sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \ sendreq->req_endpoint = endpoint; \ + sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + sendreq->req_vfrag = &sendreq->req_vfrag0; \ + sendreq->req_num_acks = 0; \ + sendreq->req_num_vfrags = 0; \ \ /* select a btl */ \ bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ @@ -137,32 +151,32 @@ do { if(size == 0 && sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) { \ mca_btl_base_descriptor_t* descriptor; \ mca_btl_base_segment_t* segment; \ - mca_pml_dr_hdr_t* hdr; \ + mca_pml_dr_hdr_t* hdr; \ \ /* allocate a descriptor */ \ - MCA_PML_DR_DES_ALLOC(bml_btl, descriptor, sizeof(mca_pml_dr_match_hdr_t)); \ + MCA_PML_DR_DES_ALLOC(bml_btl, descriptor, sizeof(mca_pml_dr_match_hdr_t)); \ if(NULL == descriptor) { \ return OMPI_ERR_OUT_OF_RESOURCE; \ } \ segment = descriptor->des_src; \ \ /* build hdr */ \ - hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \ + hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \ hdr->hdr_common.hdr_flags = 0; \ - hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; \ + hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; \ hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \ hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \ hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ \ /* short message */ \ - descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \ + descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \ descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \ descriptor->des_cbdata = sendreq; \ \ /* request is complete at mpi level */ \ OPAL_THREAD_LOCK(&ompi_request_lock); \ - MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ + MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ OPAL_THREAD_UNLOCK(&ompi_request_lock); \ \ /* send */ \ @@ -205,7 +219,7 @@ do { * Mark a send request as completed at the MPI level. */ -#define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \ +#define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \ do { \ (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \ (sendreq)->req_send.req_base.req_comm->c_my_rank; \ @@ -226,7 +240,7 @@ do { * may have been orphaned by the user or have already completed * at the MPI level. 
*/ -#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \ +#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \ do { \ /* request completed at pml level */ \ (sendreq)->req_send.req_base.req_pml_complete = true; \ @@ -238,10 +252,10 @@ do { (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ } \ - MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \ + MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \ /* is request complete at mpi level */ \ } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \ - MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ + MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ /* buffered send - release any resources */ \ } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ @@ -256,7 +270,7 @@ do { * However, these events may occur in either order. */ -#define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \ +#define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \ do { \ bool schedule = false; \ \ @@ -264,7 +278,7 @@ do { if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ OPAL_THREAD_LOCK(&ompi_request_lock); \ if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \ - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); \ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); \ } else { \ schedule = true; \ } \ @@ -273,7 +287,7 @@ do { \ /* additional data to schedule */ \ if(schedule == true) { \ - mca_pml_dr_send_request_schedule(sendreq); \ + mca_pml_dr_send_request_schedule(sendreq); \ } \ } while (0) @@ -281,20 +295,111 @@ do { * Release resources associated with a request */ -#define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \ -{ \ - /* Let the base handle the reference counts */ \ - MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \ - OMPI_FREE_LIST_RETURN( \ - &mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \ -} - +#define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \ +do { \ + /* Let the base handle the reference counts */ \ + MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \ + OMPI_FREE_LIST_RETURN( \ + &mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \ +} while(0) + +/* + * Lookup/allocate a vfrag for the pending send + */ + +#define MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq, endpoint, size, vfrag) \ +do { \ + mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ + mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ + size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \ + size_t div = size / max_send_size; \ + \ + if(div == 0) { \ + vfrag->vf_len = 1; \ + vfrag->vf_size = size; \ + vfrag->vf_mask = 1; \ + } else if(div > 64) { \ + vfrag->vf_len = 64; \ + vfrag->vf_size = (max_send_size << 6); /* size * 64 */ \ + vfrag->vf_mask = ~(uint64_t)0; \ + } else if (div == 64) { \ + size_t mod = size % max_send_size; \ + vfrag->vf_len = 64; \ + vfrag->vf_size = (mod ? (size - mod) : size); \ + vfrag->vf_mask = ~(uint64_t)0; \ + } else { \ + size_t mod = size % max_send_size; \ + vfrag->vf_len = div + (mod ? 
1 : 0); \ + vfrag->vf_size = size; \ + vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \ + } \ + \ + vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + vfrag->vf_ack = 0; \ + vfrag->vf_offset = sendreq->req_send_offset; \ + vfrag->vf_idx = 0; \ + vfrag->vf_max_send_size = max_send_size; \ + opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); \ + sendreq->req_vfrag = vfrag; \ +} while(0) + + +/* + * + */ + +#define MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq,hdr,vfrag) \ +do { \ + if ((hdr)->hdr_vid == (sendreq)->req_vfrag0.vf_id) { \ + vfrag = &(sendreq)->req_vfrag0; \ + } else if((sendreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \ + vfrag = (sendreq)->req_vfrag; \ + opal_list_remove_item(&(sendreq)->req_pending,(opal_list_item_t*)vfrag); \ + } else { \ + opal_list_item_t* item; \ + vfrag = NULL; \ + OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \ + for(item = opal_list_get_first(&(sendreq)->req_pending); \ + item != opal_list_get_end(&(sendreq)->req_pending); \ + item = opal_list_get_next(item)) { \ + mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \ + if(vf->vf_id == (hdr)->hdr_vid) { \ + opal_list_remove_item(&(sendreq)->req_pending,item); \ + vfrag = vf; \ + break; \ + } \ + } \ + OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \ + } \ +} while(0) + +/* + * + */ + +#define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq,hdr,vfrag) \ +do { \ + opal_list_item_t* item; \ + vfrag = NULL; \ + OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \ + for(item = opal_list_get_first(&(sendreq)->req_retrans); \ + item != opal_list_get_end(&(sendreq)->req_retrans); \ + item = opal_list_get_next(item)) { \ + mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \ + if(vf->vf_id == (hdr)->hdr_vid) { \ + vfrag = vf; \ + break; \ + } \ + } \ + OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \ +} while(0) + /* * Update bytes delivered on request based on supplied descriptor */ -#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen) \ +#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen) \ do { \ size_t i; \ mca_btl_base_segment_t* segments = descriptor->des_src; \ @@ -310,18 +415,18 @@ do { * Attempt to process any pending requests */ -#define MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING() \ +#define MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING() \ do { \ /* advance pending requests */ \ - while(opal_list_get_size(&mca_pml_dr.send_pending)) { \ - mca_pml_dr_send_request_t* sendreq; \ - OPAL_THREAD_LOCK(&mca_pml_dr.lock); \ - sendreq = (mca_pml_dr_send_request_t*) \ - opal_list_remove_first(&mca_pml_dr.send_pending); \ - OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); \ + while(opal_list_get_size(&mca_pml_dr.send_pending)) { \ + mca_pml_dr_send_request_t* sendreq; \ + OPAL_THREAD_LOCK(&mca_pml_dr.lock); \ + sendreq = (mca_pml_dr_send_request_t*) \ + opal_list_remove_first(&mca_pml_dr.send_pending); \ + OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); \ if(NULL == sendreq) \ break; \ - mca_pml_dr_send_request_schedule(sendreq); \ + mca_pml_dr_send_request_schedule(sendreq); \ } \ } while (0) @@ -357,6 +462,21 @@ int mca_pml_dr_send_request_start_rndv( int mca_pml_dr_send_request_schedule( mca_pml_dr_send_request_t* sendreq); +int mca_pml_dr_send_request_reschedule( + mca_pml_dr_send_request_t* sendreq, + mca_pml_dr_vfrag_t* vfrag); + +/** + * Acknowledgment of vfrag + */ +void mca_pml_dr_send_request_acked( + mca_pml_dr_send_request_t* sendreq, + mca_pml_dr_ack_hdr_t*); + +void mca_pml_dr_send_request_nacked( + mca_pml_dr_send_request_t* sendreq, + 
mca_pml_dr_ack_hdr_t*); + /** * Completion callback on match header * Cache descriptor. diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.c b/ompi/mca/pml/dr/pml_dr_vfrag.c similarity index 60% rename from ompi/mca/pml/dr/pml_dr_endpoint.c rename to ompi/mca/pml/dr/pml_dr_vfrag.c index 881e7fc1df..fbb524ea2a 100644 --- a/ompi/mca/pml/dr/pml_dr_endpoint.c +++ b/ompi/mca/pml/dr/pml_dr_vfrag.c @@ -5,22 +5,39 @@ * Copyright (c) 2004-2005 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ - + #include "ompi_config.h" - -#include - -#include "mca/pml/pml.h" -#include "pml_dr_endpoint.h" +#include "pml_dr_vfrag.h" + + +static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) +{ + memset(&vfrag->vf_event, 0, sizeof(vfrag->vf_event)); +} + + +static void mca_pml_dr_vfrag_destruct(mca_pml_dr_vfrag_t* vfrag) +{ + +} + + +OBJ_CLASS_INSTANCE( + mca_pml_dr_vfrag_t, + opal_list_item_t, + mca_pml_dr_vfrag_construct, + mca_pml_dr_vfrag_destruct +); + diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.h b/ompi/mca/pml/dr/pml_dr_vfrag.h new file mode 100644 index 0000000000..fd28c21951 --- /dev/null +++ b/ompi/mca/pml/dr/pml_dr_vfrag.h @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef OMPI_PML_DR_VFRAG_H_ +#define OMPI_PML_DR_VFRAG_H_ + +#include "ompi_config.h" +#include "opal/event/event.h" +#include "pml_dr.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + + +struct mca_pml_dr_vfrag_t { + opal_list_item_t super; + ompi_ptr_t vf_send; + ompi_ptr_t vf_recv; + uint32_t vf_id; + uint16_t vf_idx; + uint16_t vf_len; + size_t vf_offset; + size_t vf_size; + size_t vf_max_send_size; + uint64_t vf_ack; + uint64_t vf_mask; + opal_event_t vf_event; +}; +typedef struct mca_pml_dr_vfrag_t mca_pml_dr_vfrag_t; + +OBJ_CLASS_DECLARATION(mca_pml_dr_vfrag_t); + +#define MCA_PML_DR_VFRAG_ALLOC(vfrag,rc) \ +do { \ + opal_list_item_t* item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_dr.vfrags, item, rc); \ + vfrag = (mca_pml_dr_vfrag_t*)item; \ +} while(0) + + +#define MCA_PML_DR_VFRAG_RETURN(vfrag) \ +do { \ + OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \ +} while(0) + +#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \ +do { \ + \ +} while(0) + +#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \ +do { \ + \ +} while(0) + + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif +
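A minimal sketch, under assumed names, of the vfrag sizing rule that the
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT macro in pml_dr_sendreq.h encodes: a vfrag
covers at most 64 fragments of max_send_size bytes, so its length, byte count,
and ack mask follow from size / max_send_size. The toy_* identifiers are
illustrative only and do not appear in the patch.

#include <stdint.h>
#include <stddef.h>

struct toy_sizing {
    uint16_t vf_len;   /* fragments in this vfrag */
    size_t   vf_size;  /* bytes covered by this vfrag */
    uint64_t vf_mask;  /* expected ack bits */
};

static void toy_vfrag_size(size_t size, size_t max_send_size,
                           struct toy_sizing *out)
{
    size_t div = size / max_send_size;
    if (div == 0) {                     /* fits in a single fragment */
        out->vf_len  = 1;
        out->vf_size = size;
        out->vf_mask = 1;
    } else if (div >= 64) {             /* cap at 64 fragments per vfrag */
        size_t mod = size % max_send_size;
        out->vf_len  = 64;
        out->vf_size = (div > 64) ? (max_send_size << 6)       /* 64 full frags */
                                  : (mod ? size - mod : size); /* div == 64 */
        out->vf_mask = ~(uint64_t)0;
    } else {                            /* div full fragments plus remainder */
        size_t mod = size % max_send_size;
        out->vf_len  = (uint16_t)(div + (mod ? 1 : 0));
        out->vf_size = size;
        out->vf_mask = (((uint64_t)1 << out->vf_len) - (uint64_t)1);
    }
}

For example, a 10 MiB send with a 64 KiB max_send_size gives div == 160, so
the first vfrag covers 64 fragments (4 MiB); the remaining bytes would be
carried by further vfrags set up as mca_pml_dr_send_request_schedule() walks
the rest of the message (and counted in req_num_vfrags).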