From 5271948ec09fe5b4a1df9197c99bd348178fe6c7 Mon Sep 17 00:00:00 2001 From: Galen Shipman Date: Wed, 29 Mar 2006 16:19:17 +0000 Subject: [PATCH] --- opal object changes add object size to opal class no longer need the size when allocating a new object as this is stored in the class structure --- dr changes Previous rev. maintained state on the communicator used for acking duplicate fragments, but the communicator may be destroyed prior to successful delivery of an ack to the peer. We must therefore maintain this state globally on a per-peer basis, not a per-peer, per-communicator basis. This requires that we use a global rank on the wire and translate this as appropriate to a local rank within the communicator. This commit was SVN r9454. --- ompi/mca/bml/base/base.h | 4 +- ompi/mca/bml/base/bml_base_btl.c | 10 ++-- ompi/mca/bml/base/bml_base_init.c | 6 ++- ompi/mca/bml/base/bml_base_open.c | 22 ++++++--- ompi/mca/bml/bml.h | 5 +- ompi/mca/bml/r2/bml_r2.c | 3 +- ompi/mca/bml/r2/bml_r2.h | 6 ++- ompi/mca/bml/r2/bml_r2_component.c | 4 +- ompi/mca/mpool/base/mpool_base_init.c | 2 - ompi/mca/pml/dr/Makefile.am | 2 + ompi/mca/pml/dr/pml_dr.c | 39 +++++++++++---- ompi/mca/pml/dr/pml_dr.h | 9 +++- ompi/mca/pml/dr/pml_dr_comm.c | 39 +++++++++------ ompi/mca/pml/dr/pml_dr_comm.h | 8 +-- ompi/mca/pml/dr/pml_dr_component.c | 13 +++-- ompi/mca/pml/dr/pml_dr_endpoint.c | 43 ++++++++++++++++ ompi/mca/pml/dr/pml_dr_endpoint.h | 53 ++++++++++++++++++++ ompi/mca/pml/dr/pml_dr_recvfrag.c | 71 +++++++++++++++++---------- ompi/mca/pml/dr/pml_dr_recvreq.c | 20 ++++---- ompi/mca/pml/dr/pml_dr_recvreq.h | 10 ++-- ompi/mca/pml/dr/pml_dr_sendreq.c | 42 ++++++++-------- ompi/mca/pml/dr/pml_dr_sendreq.h | 34 +++++++------ ompi/mca/pml/ob1/pml_ob1_component.c | 6 ++- opal/class/opal_object.h | 21 ++++---- 24 files changed, 332 insertions(+), 140 deletions(-) create mode 100644 ompi/mca/pml/dr/pml_dr_endpoint.c create mode 100644 ompi/mca/pml/dr/pml_dr_endpoint.h diff --git 
a/ompi/mca/bml/base/base.h b/ompi/mca/bml/base/base.h index 7a1487d25a..cc0c157ea1 100644 --- a/ompi/mca/bml/base/base.h +++ b/ompi/mca/bml/base/base.h @@ -49,7 +49,9 @@ OBJ_CLASS_DECLARATION(mca_bml_base_selected_module_t); */ OMPI_DECLSPEC int mca_bml_base_open(void); -OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, bool enable_mpi_threads); +OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, + bool enable_mpi_threads, + opal_class_t* endpoint_class); OMPI_DECLSPEC int mca_bml_base_close(void); diff --git a/ompi/mca/bml/base/bml_base_btl.c b/ompi/mca/bml/base/bml_base_btl.c index 8fb152960b..571dd2a9c9 100644 --- a/ompi/mca/bml/base/bml_base_btl.c +++ b/ompi/mca/bml/base/bml_base_btl.c @@ -64,7 +64,8 @@ int mca_bml_base_btl_array_reserve(mca_bml_base_btl_array_t* array, size_t size) #if OMPI_ENABLE_DEBUG_RELIABILITY -extern double mca_bml_base_error_rate; +extern double mca_bml_base_error_rate_floor; +extern double mca_bml_base_error_rate_ceiling; extern int mca_bml_base_error_count; struct mca_bml_base_context_t { @@ -97,8 +98,11 @@ int mca_bml_base_send( { static int count; des->des_context = bml_btl; - if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate > 0) { - mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate_ceiling > 0) { + mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_count < mca_bml_base_error_rate_floor) { + mca_bml_base_error_count = mca_bml_base_error_rate_floor; + } if(mca_bml_base_error_count % 2) { /* local completion - network "drops" packet */ opal_output(0, "%s:%d: dropping data\n", __FILE__, __LINE__); diff --git a/ompi/mca/bml/base/bml_base_init.c b/ompi/mca/bml/base/bml_base_init.c index 27617afb42..ca01093bb8 100644 --- a/ompi/mca/bml/base/bml_base_init.c +++ b/ompi/mca/bml/base/bml_base_init.c @@ -39,7 +39,8 @@ 
mca_bml_base_component_t mca_bml_component; int mca_bml_base_init( bool enable_progress_threads, - bool enable_mpi_threads ) { + bool enable_mpi_threads, + opal_class_t* endpoint_class) { opal_list_item_t *item = NULL; mca_bml_base_component_t *component = NULL, *best_component = NULL; mca_bml_base_module_t *module = NULL, *best_module = NULL; @@ -59,7 +60,8 @@ int mca_bml_base_init( bool enable_progress_threads, } module = component->bml_init(&priority, enable_progress_threads, - enable_mpi_threads ); + enable_mpi_threads, + endpoint_class); if(NULL == module) { continue; diff --git a/ompi/mca/bml/base/bml_base_open.c b/ompi/mca/bml/base/bml_base_open.c index ae5e2efb2b..df4ce39d7e 100644 --- a/ompi/mca/bml/base/bml_base_open.c +++ b/ompi/mca/bml/base/bml_base_open.c @@ -29,7 +29,8 @@ opal_list_t mca_bml_base_components_available; #if OMPI_ENABLE_DEBUG_RELIABILITY -double mca_bml_base_error_rate; +double mca_bml_base_error_rate_floor; +double mca_bml_base_error_rate_ceiling; int mca_bml_base_error_count; #endif @@ -45,11 +46,17 @@ int mca_bml_base_open( void ) { #if OMPI_ENABLE_DEBUG_RELIABILITY do { int param, value; - - mca_base_param_register_int("bml", NULL, "error_rate", "error_rate", 0); - param = mca_base_param_find("bml", NULL, "error_rate"); + + mca_base_param_register_int("bml", NULL, "error_rate_floor", "error_rate_floor", 0); + param = mca_base_param_find("bml", NULL, "error_rate_floor"); mca_base_param_lookup_int(param, &value); - mca_bml_base_error_rate = value; + mca_bml_base_error_rate_floor = value; + + mca_base_param_register_int("bml", NULL, "error_rate_ceiling", "error_rate_ceiling", 0); + param = mca_base_param_find("bml", NULL, "error_rate_ceiling"); + mca_base_param_lookup_int(param, &value); + mca_bml_base_error_rate_ceiling = value; + mca_base_param_register_int("bml", NULL, "srand", "srand", 1); param = mca_base_param_find("bml", NULL, "srand"); @@ -63,8 +70,9 @@ int mca_bml_base_open( void ) { } /* initialize count */ - 
if(mca_bml_base_error_rate > 0) { - mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_rate_ceiling > 0 + && mca_bml_base_error_rate_floor <= mca_bml_base_error_rate_ceiling) { + mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0)); } } while (0); #endif diff --git a/ompi/mca/bml/bml.h b/ompi/mca/bml/bml.h index 5605ab6435..660a187191 100644 --- a/ompi/mca/bml/bml.h +++ b/ompi/mca/bml/bml.h @@ -207,7 +207,7 @@ static inline mca_bml_base_btl_t* mca_bml_base_btl_array_find( /** * Structure associated w/ ompi_proc_t that contains the set - * of BTLs used to reach a destinationation + * of BTLs used to reach a destination */ struct mca_bml_base_endpoint_t { mca_pml_proc_t super; @@ -398,7 +398,8 @@ static inline void mca_bml_base_prepare_dst(mca_bml_base_btl_t* bml_btl, typedef struct mca_bml_base_module_t* (*mca_bml_base_component_init_fn_t)( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ); /** diff --git a/ompi/mca/bml/r2/bml_r2.c b/ompi/mca/bml/r2/bml_r2.c index 0e850e2e00..1b0d89600e 100644 --- a/ompi/mca/bml/r2/bml_r2.c +++ b/ompi/mca/bml/r2/bml_r2.c @@ -248,7 +248,8 @@ int mca_bml_r2_add_procs( /* allocate bml specific proc data */ - bml_endpoint = OBJ_NEW(mca_bml_base_endpoint_t); + bml_endpoint = (mca_bml_base_endpoint_t*) + opal_obj_new(mca_bml_r2.endpoint_class); if (NULL == bml_endpoint) { opal_output(0, "mca_bml_r2_add_procs: unable to allocate resources"); free(btl_endpoints); diff --git a/ompi/mca/bml/r2/bml_r2.h b/ompi/mca/bml/r2/bml_r2.h index ef1808b2f2..237256972a 100644 --- a/ompi/mca/bml/r2/bml_r2.h +++ b/ompi/mca/bml/r2/bml_r2.h @@ -52,7 +52,8 @@ struct mca_bml_r2_module_t { size_t num_btl_progress; mca_btl_base_component_progress_fn_t * btl_progress; mca_bml_r2_recv_reg_t r2_reg[256]; - bool btls_added; + bool btls_added; + opal_class_t * endpoint_class; }; typedef 
struct mca_bml_r2_module_t mca_bml_r2_module_t; @@ -65,7 +66,8 @@ extern int mca_bml_r2_component_close(void); extern mca_bml_base_module_t* mca_bml_r2_component_init( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ); extern int mca_bml_r2_progress(void); diff --git a/ompi/mca/bml/r2/bml_r2_component.c b/ompi/mca/bml/r2/bml_r2_component.c index db7ec420b9..3a6c2f84d4 100644 --- a/ompi/mca/bml/r2/bml_r2_component.c +++ b/ompi/mca/bml/r2/bml_r2_component.c @@ -83,7 +83,8 @@ int mca_bml_r2_component_close(void) mca_bml_base_module_t* mca_bml_r2_component_init( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ) { /* initialize BTLs */ @@ -93,5 +94,6 @@ mca_bml_base_module_t* mca_bml_r2_component_init( *priority = 100; mca_bml_r2.btls_added = false; + mca_bml_r2.endpoint_class = endpoint_class; return &mca_bml_r2.super; } diff --git a/ompi/mca/mpool/base/mpool_base_init.c b/ompi/mca/mpool/base/mpool_base_init.c index e4288f437e..d8494ed44c 100644 --- a/ompi/mca/mpool/base/mpool_base_init.c +++ b/ompi/mca/mpool/base/mpool_base_init.c @@ -31,8 +31,6 @@ OBJ_CLASS_INSTANCE(mca_mpool_base_selected_module_t, opal_list_item_t, NULL, NUL static bool mca_mpool_enable_progress_threads = true; static bool mca_mpool_enable_mpi_threads = true; -OBJ_CLASS_INSTANCE(mca_mpool_base_chunk_t, opal_list_item_t, NULL, NULL); - /** * Function for weeding out mpool modules that don't want to run. 
* diff --git a/ompi/mca/pml/dr/Makefile.am b/ompi/mca/pml/dr/Makefile.am index 04131ffed2..5b96c76a19 100644 --- a/ompi/mca/pml/dr/Makefile.am +++ b/ompi/mca/pml/dr/Makefile.am @@ -24,6 +24,8 @@ dr_sources = \ pml_dr_comm.h \ pml_dr_component.c \ pml_dr_component.h \ + pml_dr_endpoint.c \ + pml_dr_endpoint.h \ pml_dr_hdr.h \ pml_dr_iprobe.c \ pml_dr_irecv.c \ diff --git a/ompi/mca/pml/dr/pml_dr.c b/ompi/mca/pml/dr/pml_dr.c index 4f61b46650..2d222e4e5f 100644 --- a/ompi/mca/pml/dr/pml_dr.c +++ b/ompi/mca/pml/dr/pml_dr.c @@ -1,3 +1,4 @@ + /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology @@ -34,6 +35,7 @@ #include "pml_dr_sendreq.h" #include "pml_dr_recvreq.h" #include "ompi/mca/bml/base/base.h" +#include "orte/mca/ns/ns.h" mca_pml_dr_t mca_pml_dr = { { @@ -111,8 +113,9 @@ int mca_pml_dr_del_comm(ompi_communicator_t* comm) int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) { ompi_bitmap_t reachable; - struct mca_bml_base_endpoint_t ** bml_endpoints = NULL; + struct mca_pml_dr_endpoint_t ** endpoints = NULL; int rc; + size_t i; if(nprocs == 0) return OMPI_SUCCESS; @@ -122,16 +125,16 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) if(OMPI_SUCCESS != rc) return rc; - bml_endpoints = (struct mca_bml_base_endpoint_t **) malloc ( nprocs * - sizeof(struct mca_bml_base_endpoint_t*)); - if ( NULL == bml_endpoints ) { + endpoints = (struct mca_pml_dr_endpoint_t **) malloc ( nprocs * + sizeof(struct mca_pml_dr_endpoint_t*)); + if ( NULL == endpoints ) { return OMPI_ERR_OUT_OF_RESOURCE; } - + rc = mca_bml.bml_add_procs( nprocs, procs, - bml_endpoints, + (mca_bml_base_endpoint_t**) endpoints, &reachable ); if(OMPI_SUCCESS != rc) @@ -151,10 +154,28 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) mca_pml_dr.free_list_max, mca_pml_dr.free_list_inc, NULL); - - if ( NULL != bml_endpoints ) { - free ( bml_endpoints) ; + for(i = 0; i < nprocs; i++) { + int idx; + /* this won't work 
for comm spawn and other dynamic + processes, but will work for initial job start */ + idx = ompi_pointer_array_add(&mca_pml_dr.procs, + (void*) endpoints[i]); + if(orte_ns.compare(ORTE_NS_CMP_ALL, + orte_process_info.my_name, + &endpoints[i]->base.super.proc_ompi->proc_name) == 0) { + mca_pml_dr.my_rank = idx; + } + endpoints[i]->local = endpoints[i]->dst = idx; } + + for(i = 0; i < nprocs; i++) { + endpoints[i]->src = mca_pml_dr.my_rank; + } + + /* no longer need this */ + if ( NULL != endpoints ) { + free ( endpoints) ; + } return rc; } diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index 00f97ddf55..98b712ac90 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -33,6 +33,7 @@ #include "ompi/mca/pml/base/pml_base_request.h" #include "ompi/mca/pml/base/pml_base_bsend.h" #include "ompi/mca/pml/base/pml_base_sendreq.h" +#include "ompi/class/ompi_pointer_array.h" #include "ompi/mca/btl/btl.h" #include "ompi/datatype/datatype.h" @@ -71,7 +72,13 @@ struct mca_pml_dr_t { ompi_free_list_t recv_frags; ompi_free_list_t vfrags; ompi_free_list_t buffers; - + + /* proc pointer array */ + ompi_pointer_array_t procs; + + /* my 'global' rank */ + int32_t my_rank; + int timer_wdog_sec; int timer_wdog_usec; int timer_wdog_multiplier; diff --git a/ompi/mca/pml/dr/pml_dr_comm.c b/ompi/mca/pml/dr/pml_dr_comm.c index 64ee03f988..1c4f367f40 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.c +++ b/ompi/mca/pml/dr/pml_dr_comm.c @@ -21,20 +21,17 @@ #include "pml_dr.h" #include "pml_dr_comm.h" - +#include "pml_dr_endpoint.h" static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) { proc->expected_sequence = 1; - proc->vfrag_id = 1; proc->send_sequence = 0; OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t); OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t); OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t); - OBJ_CONSTRUCT(&proc->seq_sends, ompi_seq_tracker_t); - 
OBJ_CONSTRUCT(&proc->seq_recvs, ompi_seq_tracker_t); - OBJ_CONSTRUCT(&proc->seq_recvs_matched, ompi_seq_tracker_t); + } @@ -44,9 +41,6 @@ static void mca_pml_dr_comm_proc_destruct(mca_pml_dr_comm_proc_t* proc) OBJ_DESTRUCT(&proc->matched_receives); OBJ_DESTRUCT(&proc->specific_receives); OBJ_DESTRUCT(&proc->unexpected_frags); - OBJ_DESTRUCT(&proc->seq_sends); - OBJ_DESTRUCT(&proc->seq_recvs); - OBJ_DESTRUCT(&proc->seq_recvs_matched); } @@ -60,6 +54,7 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm) { OBJ_CONSTRUCT(&comm->wild_receives, opal_list_t); OBJ_CONSTRUCT(&comm->matching_lock, opal_mutex_t); + OBJ_CONSTRUCT(&comm->sparse_procs, ompi_pointer_array_t); comm->recv_sequence = 0; comm->procs = NULL; comm->num_procs = 0; @@ -69,12 +64,16 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm) static void mca_pml_dr_comm_destruct(mca_pml_dr_comm_t* comm) { size_t i; - for(i=0; inum_procs; i++) + for(i=0; inum_procs; i++) { OBJ_DESTRUCT((&comm->procs[i])); - if(NULL != comm->procs) + } + if(NULL != comm->procs) { free(comm->procs); + } + OBJ_DESTRUCT(&comm->wild_receives); OBJ_DESTRUCT(&comm->matching_lock); + OBJ_DESTRUCT(&comm->sparse_procs); } @@ -89,18 +88,30 @@ int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_c { size_t i; size_t size = ompi_comm->c_remote_group->grp_proc_count; - + /* send message sequence-number support - sender side */ dr_comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size); if(NULL == dr_comm->procs) { return OMPI_ERR_OUT_OF_RESOURCE; } for(i=0; iprocs+i, mca_pml_dr_comm_proc_t); - dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i]; + mca_pml_dr_comm_proc_t* proc; + mca_pml_dr_endpoint_t* ep; + ompi_proc_t* ompi_proc; + proc = dr_comm->procs+i; + OBJ_CONSTRUCT(proc, mca_pml_dr_comm_proc_t); + proc->comm_rank = i; + ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i]; + proc->ompi_proc = ompi_proc; + ep = (mca_pml_dr_endpoint_t*) 
ompi_proc->proc_pml; + ompi_pointer_array_set_item(&dr_comm->sparse_procs, + ep->dst, /* from our view this is the + peers source 'global rank' */ + proc); + proc->endpoint = ep; + } dr_comm->num_procs = size; - return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/dr/pml_dr_comm.h b/ompi/mca/pml/dr/pml_dr_comm.h index bd31b7b633..65c22bafb9 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.h +++ b/ompi/mca/pml/dr/pml_dr_comm.h @@ -27,6 +27,7 @@ #include "ompi/class/ompi_seq_tracker.h" #include "ompi/communicator/communicator.h" #include "ompi/proc/proc.h" +#include "pml_dr_endpoint.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif @@ -35,7 +36,6 @@ extern "C" { struct mca_pml_dr_comm_proc_t { opal_object_t super; uint16_t expected_sequence; /**< send message sequence number - receiver side */ - uint32_t vfrag_id; /**< virtual fragment identifier */ #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t send_sequence; /**< send side sequence number */ #else @@ -46,9 +46,8 @@ struct mca_pml_dr_comm_proc_t { opal_list_t unexpected_frags; /**< unexpected fragment queues */ opal_list_t matched_receives; /**< list of in-progress matched receives */ ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */ - ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ - ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ - ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */ + mca_pml_dr_endpoint_t* endpoint; /**< back pointer to the endpoint */ + int32_t comm_rank; /**< rank in the communicator */ }; typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t; @@ -65,6 +64,7 @@ struct mca_pml_comm_t { #endif opal_mutex_t matching_lock; /**< matching lock */ opal_list_t wild_receives; /**< queue of unmatched wild (source process not specified) receives */ + ompi_pointer_array_t sparse_procs; /**< sparse array, allows lookup of comm_proc using a global rank */ 
mca_pml_dr_comm_proc_t* procs; size_t num_procs; }; diff --git a/ompi/mca/pml/dr/pml_dr_component.c b/ompi/mca/pml/dr/pml_dr_component.c index 72f0cbab9b..0babda03d5 100644 --- a/ompi/mca/pml/dr/pml_dr_component.c +++ b/ompi/mca/pml/dr/pml_dr_component.c @@ -31,6 +31,7 @@ #include "pml_dr_sendreq.h" #include "pml_dr_recvreq.h" #include "pml_dr_recvfrag.h" +#include "pml_dr_endpoint.h" #include "ompi/mca/bml/base/base.h" @@ -154,7 +155,7 @@ int mca_pml_dr_component_open(void) OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t); - + OBJ_CONSTRUCT(&mca_pml_dr.procs, ompi_pointer_array_t); OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t); mca_pml_dr.enabled = false; @@ -193,11 +194,17 @@ mca_pml_base_module_t* mca_pml_dr_component_init(int* priority, opal_output(0, "mca_pml_dr_component_init: mca_pml_bsend_init failed\n"); return NULL; } - - if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads)) + + if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, + enable_mpi_threads, + OBJ_CLASS(mca_pml_dr_endpoint_t) + )) { return NULL; + } + mca_pml_dr.super.pml_progress = mca_bml.bml_progress; + return &mca_pml_dr.super; } diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.c b/ompi/mca/pml/dr/pml_dr_endpoint.c new file mode 100644 index 0000000000..93c6246469 --- /dev/null +++ b/ompi/mca/pml/dr/pml_dr_endpoint.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "pml_dr.h" +#include "pml_dr_endpoint.h" + +static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep) +{ + OBJ_CONSTRUCT(&ep->seq_sends, ompi_seq_tracker_t); + OBJ_CONSTRUCT(&ep->seq_recvs, ompi_seq_tracker_t); + OBJ_CONSTRUCT(&ep->seq_recvs_matched, ompi_seq_tracker_t); +} + + +static void mca_pml_dr_endpoint_destruct(mca_pml_dr_endpoint_t* ep) +{ + OBJ_DESTRUCT(&ep->seq_sends); + OBJ_DESTRUCT(&ep->seq_recvs); + OBJ_DESTRUCT(&ep->seq_recvs_matched); +} + + +OBJ_CLASS_INSTANCE( + mca_pml_dr_endpoint_t, + mca_bml_base_endpoint_t, + mca_pml_dr_endpoint_construct, + mca_pml_dr_endpoint_destruct); diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.h b/ompi/mca/pml/dr/pml_dr_endpoint.h new file mode 100644 index 0000000000..32e6e30ec5 --- /dev/null +++ b/ompi/mca/pml/dr/pml_dr_endpoint.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef MCA_PML_ENDPOINT_H +#define MCA_PML_ENDPOINT_H + +#include "ompi/mca/bml/bml.h" +#include "ompi/class/ompi_seq_tracker.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif +/** + * This is the pml level endpoint + * simply inherity the bml_base_endpoint and + * add whatever else is needed + */ +struct mca_pml_dr_endpoint_t { + mca_bml_base_endpoint_t base; + int32_t local; /* local view of the rank */ + int32_t src; /* peers view of the src rank */ + int32_t dst; /* peers destination rank */ + ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ + ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ + ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */ + uint32_t vfrag_seq; /**< current virtual fragment identifier sequence */ +}; +typedef struct mca_pml_dr_endpoint_t mca_pml_dr_endpoint_t; +OBJ_CLASS_DECLARATION(mca_pml_dr_endpoint_t); + + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 469d7db103..ebc06af47b 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -40,13 +40,29 @@ #define MCA_PML_DR_HDR_VALIDATE(hdr, type) \ do { \ + ompi_communicator_t* comm = NULL; \ uint16_t csum = opal_csum(hdr, sizeof(type)); \ if(hdr->hdr_common.hdr_csum != csum) { \ OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \ __FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \ return; \ } \ - if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) { \ + comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \ + if(NULL == comm ) { \ + mca_pml_dr_endpoint_t* ep; \ + OPAL_OUTPUT((0, "%s:%d: communicator not found\n", \ + __FILE__, __LINE__)); \ + ep = 
ompi_pointer_array_get_item(&mca_pml_dr.procs, \ + hdr->hdr_common.hdr_src); \ + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {\ + mca_pml_dr_recv_frag_ack(&ep->base, \ + &hdr->hdr_common, \ + hdr->hdr_match.hdr_src_ptr.pval, \ + 1, 0); \ + } \ + return; \ + } \ + if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) { \ OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n", \ __FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst)); \ return; \ @@ -54,7 +70,7 @@ do { } while (0) -#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc) \ +#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc, ep) \ do { \ ompi_communicator_t* comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \ if(NULL == comm_ptr) { \ @@ -63,7 +79,8 @@ do { return; \ } \ comm = (mca_pml_dr_comm_t*)comm_ptr->c_pml_comm; \ - proc = comm->procs + hdr->hdr_common.hdr_src; \ + proc = ompi_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src); \ + ep = proc->endpoint; \ } while (0) @@ -110,6 +127,7 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; mca_pml_dr_comm_t *comm; mca_pml_dr_comm_proc_t *proc; + mca_pml_dr_endpoint_t *ep; if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) { return; } @@ -118,14 +136,14 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_MATCH: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); - + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); + /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, 
+ mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr.pval, 1, 0); @@ -138,11 +156,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_MATCH_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack); } else { @@ -153,11 +171,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_RNDV: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)){ + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)){ /* ack only if the vfrag has been matched */ mca_pml_dr_recv_request_t* recvreq = mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid); @@ -167,9 +185,9 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1); } else { - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs_matched, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) { OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*) proc->ompi_proc->proc_pml, + mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr.pval, ~(uint64_t) 0, 
hdr->hdr_rndv.hdr_msg_length); @@ -186,11 +204,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_RNDV_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack); } else { @@ -202,14 +220,14 @@ void mca_pml_dr_recv_frag_callback( { mca_pml_dr_recv_request_t* recvreq; MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, + mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_frag.hdr_src_ptr.pval, ~(uint64_t) 0, 0); @@ -224,11 +242,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_FRAG_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_frag_ack(btl, 
&hdr->hdr_ack); } else { @@ -545,7 +563,8 @@ bool mca_pml_dr_recv_frag_match( ompi_proc_t* ompi_proc = proc->ompi_proc; int rc; uint32_t csum = OPAL_CSUM_ZERO; - + mca_pml_dr_endpoint_t* ep = (mca_pml_dr_endpoint_t*) proc->endpoint; + /* source sequence number */ frag_msg_seq = hdr->hdr_seq; @@ -668,7 +687,7 @@ rematch: opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag); } - ompi_seq_tracker_insert(&proc->seq_recvs, hdr->hdr_common.hdr_vid); + ompi_seq_tracker_insert(&ep->seq_recvs, hdr->hdr_common.hdr_vid); OPAL_THREAD_UNLOCK(&comm->matching_lock); /* release matching lock before processing fragment */ @@ -720,7 +739,7 @@ void mca_pml_dr_recv_frag_ack( ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type; ack->hdr_common.hdr_flags = 0; - ack->hdr_common.hdr_src = comm->c_my_rank; + ack->hdr_common.hdr_src = hdr->hdr_dst; ack->hdr_common.hdr_dst = hdr->hdr_src; ack->hdr_common.hdr_vid = hdr->hdr_vid; ack->hdr_common.hdr_ctx = hdr->hdr_ctx; @@ -828,7 +847,9 @@ rematch: * look only at "specific" receives, or "wild" receives, * or if we need to traverse both sets at the same time. */ - proc = comm->procs + hdr->hdr_common.hdr_src; + proc = ompi_pointer_array_get_item(&comm->sparse_procs, + hdr->hdr_common.hdr_src); + if (opal_list_get_size(&proc->specific_receives) == 0 ) { /* * There are only wild irecvs, so specialize the algorithm. 
@@ -868,7 +889,7 @@ rematch: * descriptor */ frag->request=match; match->req_proc = proc; - match->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; + match->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml; /* add this fragment descriptor to the list of * descriptors to be processed later diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index e8ef04de75..9ffe507f40 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -41,7 +41,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \ } else { \ mca_pml_dr_recv_request_match_specific(recvreq); \ } \ - mca_pml_dr_recv_frag_ack(recvreq->req_endpoint, \ + mca_pml_dr_recv_frag_ack(&recvreq->req_endpoint->base, \ &hdr->hdr_common, \ hdr->hdr_match.hdr_src_ptr.pval, \ 0, 0); \ @@ -53,7 +53,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \ bytes_received = bytes_delivered = 0; \ } else if (recvreq->req_acked == false) { \ mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \ - hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \ + hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \ } @@ -92,8 +92,8 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i if( request->req_recv.req_base.req_peer == OMPI_ANY_SOURCE ) { opal_list_remove_item( &comm->wild_receives, (opal_list_item_t*)request ); } else { - mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer; - opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request); + mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer; + opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request); } } OPAL_THREAD_UNLOCK(&comm->matching_lock); @@ -167,7 +167,7 @@ void mca_pml_dr_recv_request_ack( int rc; /* allocate descriptor */ - bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->btl_eager); + bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->base.btl_eager); 
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); if(NULL == des) { return; @@ -177,8 +177,8 @@ void mca_pml_dr_recv_request_ack( ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type; ack->hdr_common.hdr_flags = 0; - ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;; - ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank; + ack->hdr_common.hdr_dst = recvreq->req_endpoint->dst; + ack->hdr_common.hdr_src = recvreq->req_endpoint->src; ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid; ack->hdr_common.hdr_vid = hdr->hdr_vid; ack->hdr_vlen = vlen; @@ -255,7 +255,7 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); - MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received); + MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received); break; case MCA_PML_DR_HDR_TYPE_FRAG: @@ -290,7 +290,7 @@ void mca_pml_dr_recv_request_progress( if((vfrag->vf_pending & vfrag->vf_mask) == vfrag->vf_mask) { /* we have received all the pieces of the vfrag, ack everything that passed the checksum */ - ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id); + ompi_seq_tracker_insert(&recvreq->req_endpoint->seq_recvs, vfrag->vf_id); mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask); } @@ -347,7 +347,7 @@ void mca_pml_dr_recv_request_matched_probe( /* check completion status */ OPAL_THREAD_LOCK(&ompi_request_lock); recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag; - recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_common.hdr_src; + recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = recvreq->req_proc->comm_rank; recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed; recvreq->req_recv.req_base.req_pml_complete = true; 
recvreq->req_recv.req_base.req_ompi.req_complete = true; diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.h b/ompi/mca/pml/dr/pml_dr_recvreq.h index 8794f73583..b6a49b4f14 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.h +++ b/ompi/mca/pml/dr/pml_dr_recvreq.h @@ -44,7 +44,7 @@ struct mca_pml_dr_recv_request_t { /* filled in after match */ struct mca_pml_dr_comm_proc_t* req_proc; - struct mca_bml_base_endpoint_t* req_endpoint; + struct mca_pml_dr_endpoint_t* req_endpoint; opal_mutex_t* req_mutex; /* vfrag state */ @@ -220,13 +220,13 @@ do { do { \ (request)->req_mutex = &comm->matching_lock; \ (request)->req_proc = proc; \ - (request)->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; \ + (request)->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml; \ (request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \ - (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = \ - (hdr)->hdr_common.hdr_src; \ + (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = proc->comm_rank; \ (request)->req_vfrag0.vf_id = (hdr)->hdr_common.hdr_vid; \ opal_list_append(&proc->matched_receives, (opal_list_item_t*)request); \ - ompi_seq_tracker_insert(&proc->seq_recvs_matched, (hdr)->hdr_common.hdr_vid); \ + ompi_seq_tracker_insert(&request->req_endpoint->seq_recvs_matched, \ + (hdr)->hdr_common.hdr_vid); \ } while(0) diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index ed71b68360..b991d3e8bb 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -132,7 +132,7 @@ static void mca_pml_dr_match_completion( /* update statistics and complete */ sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); /* on negative ack need to retransmit */ @@ -187,7 +187,7 @@ static void 
mca_pml_dr_rndv_completion( } /* update statistics and complete */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } else { @@ -250,7 +250,7 @@ static void mca_pml_dr_frag_completion( } /* record vfrag id to drop duplicate acks */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); /* return this vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); @@ -370,8 +370,8 @@ int mca_pml_dr_send_request_start_buffered( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; @@ -452,9 +452,9 @@ int mca_pml_dr_send_request_start_copy( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? 
sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO; @@ -521,9 +521,9 @@ int mca_pml_dr_send_request_start_prepare( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO; @@ -595,8 +595,8 @@ int mca_pml_dr_send_request_start_rndv( hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; @@ -640,7 +640,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) * the scheduling logic once for every call. 
*/ - mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint; + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; assert(sendreq->req_vfrag0.vf_recv.pval != NULL); if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) { do { @@ -662,7 +662,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* do we need to allocate a new vfrag (we scheduled all the vfrag already) */ if(vfrag->vf_size == bytes_sent) { - bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_send); MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); if(NULL == vfrag) { OPAL_THREAD_LOCK(&mca_pml_dr.lock); @@ -670,7 +670,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); break; } - MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag); + MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,endpoint,bytes_remaining,vfrag); MCA_PML_DR_VFRAG_WDOG_START(vfrag); vfrag->bml_btl = bml_btl; bytes_sent = 0; @@ -711,9 +711,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_vid = vfrag->vf_id; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_vlen = vfrag->vf_len; hdr->hdr_frag_idx = vfrag->vf_idx; @@ -800,9 +800,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = 
sendreq->req_endpoint->dst; hdr->hdr_common.hdr_vid = vfrag->vf_id; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_vlen = vfrag->vf_len; hdr->hdr_frag_idx = vfrag->vf_idx; @@ -875,7 +875,7 @@ void mca_pml_dr_send_request_match_ack( /* update statistics */ sendreq->req_bytes_delivered = vfrag->vf_size; - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } @@ -941,7 +941,7 @@ void mca_pml_dr_send_request_rndv_ack( } /* stash the vfrag id for duplicate acks.. */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); OPAL_THREAD_UNLOCK(&ompi_request_lock); if(schedule) { @@ -998,7 +998,7 @@ void mca_pml_dr_send_request_frag_ack( assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); /* stash the vfid for duplicate acks.. 
*/ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); /* return vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 19511c50cb..af769e6a25 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -33,6 +33,7 @@ #include "pml_dr_comm.h" #include "pml_dr_hdr.h" #include "pml_dr_vfrag.h" +#include "pml_dr_endpoint.h" #include "opal/event/event.h" #if defined(c_plusplus) || defined(__cplusplus) @@ -42,7 +43,7 @@ extern "C" { struct mca_pml_dr_send_request_t { mca_pml_base_send_request_t req_send; mca_pml_dr_comm_proc_t* req_proc; - mca_bml_base_endpoint_t* req_endpoint; + mca_pml_dr_endpoint_t* req_endpoint; #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t req_state; volatile int32_t req_lock; @@ -143,9 +144,10 @@ do { #define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \ do { \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ - mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ - mca_bml_base_endpoint_t* endpoint = \ - (mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \ + mca_pml_dr_endpoint_t* endpoint = \ + (mca_pml_dr_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \ + mca_pml_dr_comm_proc_t* proc = \ + comm->procs + sendreq->req_send.req_base.req_peer; \ mca_bml_base_btl_t* bml_btl; \ size_t size = sendreq->req_send.req_bytes_packed; \ size_t eager_limit; \ @@ -154,9 +156,9 @@ do { break; \ } \ \ - bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \ - sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ 
sendreq->req_endpoint = endpoint; \ sendreq->req_proc = proc; \ @@ -274,8 +276,8 @@ do { #define MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq, endpoint, size, vfrag) \ do { \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ - mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ - size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \ + size_t max_send_size = endpoint->base.btl_max_send_size - \ + sizeof(mca_pml_dr_frag_hdr_t); \ size_t div = size / max_send_size; \ \ MCA_PML_DR_VFRAG_INIT(vfrag); \ @@ -301,7 +303,7 @@ do { else \ vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \ } \ - vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + vfrag->vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \ vfrag->vf_offset = sendreq->req_send_offset; \ vfrag->vf_max_send_size = max_send_size; \ vfrag->vf_send.pval = sendreq; \ @@ -354,8 +356,9 @@ do { \ #define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \ do { \ - mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \ - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \ + mca_bml_base_btl_t* bml_btl = \ + mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ mca_btl_base_descriptor_t *des_old, *des_new; \ \ if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \ @@ -384,8 +387,9 @@ do { \ #define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \ do { \ - mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \ - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \ + mca_bml_base_btl_t* bml_btl = \ + mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ mca_btl_base_descriptor_t *des_old, *des_new; \ mca_pml_dr_hdr_t *hdr; \ \ @@ -405,9 +409,9 @@ do { \ hdr = 
(mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \ hdr->hdr_common.hdr_flags = 0; \ hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; \ - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; \ + hdr->hdr_common.hdr_dst = endpoint->dst; \ hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \ - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \ + hdr->hdr_common.hdr_src = endpoint->src; \ hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; \ diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 2a27ce961e..44c535e45e 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -222,9 +222,11 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority, } - if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads)) + if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, + enable_mpi_threads, + OBJ_CLASS(mca_bml_base_endpoint_t))) { return NULL; - + } /* As our own progress function does nothing except calling the BML * progress, let's modify the progress function pointer in our structure * to avoid useless functions calls. 
The event library will instead call diff --git a/opal/class/opal_object.h b/opal/class/opal_object.h index 0993c2e190..f40883132e 100644 --- a/opal/class/opal_object.h +++ b/opal/class/opal_object.h @@ -163,6 +163,7 @@ struct opal_class_t { /**< array of parent class constructors */ opal_destruct_t *cls_destruct_array; /**< array of parent class destructors */ + size_t cls_sizeof; /**< size of an object instance */ }; /** @@ -207,7 +208,8 @@ struct opal_object_t { OBJ_CLASS(PARENT), \ (opal_construct_t) CONSTRUCTOR, \ (opal_destruct_t) DESTRUCTOR, \ - 0, 0, NULL, NULL \ + 0, 0, NULL, NULL, \ + sizeof(NAME) \ } @@ -229,20 +231,20 @@ struct opal_object_t { * @param type Type (class) of the object * @return Pointer to the object */ -static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls); +static inline opal_object_t *opal_obj_new(opal_class_t * cls); #if OMPI_ENABLE_DEBUG -static inline opal_object_t *opal_obj_new_debug(size_t obj_size, opal_class_t* type, const char* file, int line) +static inline opal_object_t *opal_obj_new_debug(opal_class_t* type, const char* file, int line) { - opal_object_t* object = opal_obj_new(obj_size, type); + opal_object_t* object = opal_obj_new(type); object->cls_init_file_name = file; object->cls_init_lineno = line; return object; } #define OBJ_NEW(type) \ - ((type *)opal_obj_new_debug(sizeof(type), OBJ_CLASS(type), __FILE__, __LINE__)) + ((type *)opal_obj_new_debug(OBJ_CLASS(type), __FILE__, __LINE__)) #else #define OBJ_NEW(type) \ - ((type *) opal_obj_new(sizeof(type), OBJ_CLASS(type))) + ((type *) opal_obj_new(OBJ_CLASS(type))) #endif /* OMPI_ENABLE_DEBUG */ /** @@ -416,13 +418,12 @@ static inline void opal_obj_run_destructors(opal_object_t * object) * @param cls Pointer to the class descriptor of this object * @return Pointer to the object */ -static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls) +static inline opal_object_t *opal_obj_new(opal_class_t * cls) { opal_object_t *object; + 
assert(cls->cls_sizeof >= sizeof(opal_object_t)); - assert(size >= sizeof(opal_object_t)); - - object = (opal_object_t *) malloc(size); + object = (opal_object_t *) malloc(cls->cls_sizeof); if (0 == cls->cls_initialized) { opal_class_initialize(cls); }