diff --git a/ompi/mca/bml/base/base.h b/ompi/mca/bml/base/base.h index 7a1487d25a..cc0c157ea1 100644 --- a/ompi/mca/bml/base/base.h +++ b/ompi/mca/bml/base/base.h @@ -49,7 +49,9 @@ OBJ_CLASS_DECLARATION(mca_bml_base_selected_module_t); */ OMPI_DECLSPEC int mca_bml_base_open(void); -OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, bool enable_mpi_threads); +OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, + bool enable_mpi_threads, + opal_class_t* endpoint_class); OMPI_DECLSPEC int mca_bml_base_close(void); diff --git a/ompi/mca/bml/base/bml_base_btl.c b/ompi/mca/bml/base/bml_base_btl.c index 8fb152960b..571dd2a9c9 100644 --- a/ompi/mca/bml/base/bml_base_btl.c +++ b/ompi/mca/bml/base/bml_base_btl.c @@ -64,7 +64,8 @@ int mca_bml_base_btl_array_reserve(mca_bml_base_btl_array_t* array, size_t size) #if OMPI_ENABLE_DEBUG_RELIABILITY -extern double mca_bml_base_error_rate; +extern double mca_bml_base_error_rate_floor; +extern double mca_bml_base_error_rate_ceiling; extern int mca_bml_base_error_count; struct mca_bml_base_context_t { @@ -97,8 +98,11 @@ int mca_bml_base_send( { static int count; des->des_context = bml_btl; - if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate > 0) { - mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate_ceiling > 0) { + mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_count < mca_bml_base_error_rate_floor) { + mca_bml_base_error_count = mca_bml_base_error_rate_floor; + } if(mca_bml_base_error_count % 2) { /* local completion - network "drops" packet */ opal_output(0, "%s:%d: dropping data\n", __FILE__, __LINE__); diff --git a/ompi/mca/bml/base/bml_base_init.c b/ompi/mca/bml/base/bml_base_init.c index 27617afb42..ca01093bb8 100644 --- a/ompi/mca/bml/base/bml_base_init.c +++ b/ompi/mca/bml/base/bml_base_init.c @@ -39,7 +39,8 @@ mca_bml_base_component_t mca_bml_component; int mca_bml_base_init( bool enable_progress_threads, - bool enable_mpi_threads ) { + bool enable_mpi_threads, + opal_class_t* endpoint_class) { opal_list_item_t *item = NULL; mca_bml_base_component_t *component = NULL, *best_component = NULL; mca_bml_base_module_t *module = NULL, *best_module = NULL; @@ -59,7 +60,8 @@ int mca_bml_base_init( bool enable_progress_threads, } module = component->bml_init(&priority, enable_progress_threads, - enable_mpi_threads ); + enable_mpi_threads, + endpoint_class); if(NULL == module) { continue; diff --git a/ompi/mca/bml/base/bml_base_open.c b/ompi/mca/bml/base/bml_base_open.c index ae5e2efb2b..df4ce39d7e 100644 --- a/ompi/mca/bml/base/bml_base_open.c +++ b/ompi/mca/bml/base/bml_base_open.c @@ -29,7 +29,8 @@ opal_list_t mca_bml_base_components_available; #if OMPI_ENABLE_DEBUG_RELIABILITY -double mca_bml_base_error_rate; +double mca_bml_base_error_rate_floor; +double mca_bml_base_error_rate_ceiling; int mca_bml_base_error_count; #endif @@ -45,11 +46,17 @@ int mca_bml_base_open( void ) { #if OMPI_ENABLE_DEBUG_RELIABILITY do { int param, value; - - mca_base_param_register_int("bml", NULL, "error_rate", "error_rate", 0); - param = mca_base_param_find("bml", NULL, "error_rate"); + + mca_base_param_register_int("bml", NULL, "error_rate_floor", "error_rate_floor", 0); + param = mca_base_param_find("bml", NULL, "error_rate_floor"); mca_base_param_lookup_int(param, &value); - mca_bml_base_error_rate = value; + mca_bml_base_error_rate_floor = value; + + mca_base_param_register_int("bml", NULL, "error_rate_ceiling", "error_rate_ceiling", 0); + param = mca_base_param_find("bml", NULL, "error_rate_ceiling"); + mca_base_param_lookup_int(param, &value); + mca_bml_base_error_rate_ceiling = value; + mca_base_param_register_int("bml", NULL, "srand", "srand", 1); param = mca_base_param_find("bml", NULL, "srand"); @@ -63,8 +70,9 @@ int mca_bml_base_open( void ) { } /* initialize count */ - if(mca_bml_base_error_rate > 0) { - mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0)); + if(mca_bml_base_error_rate_ceiling > 0 + && mca_bml_base_error_rate_floor <= mca_bml_base_error_rate_ceiling) { + mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0)); } } while (0); #endif diff --git a/ompi/mca/bml/bml.h b/ompi/mca/bml/bml.h index 5605ab6435..660a187191 100644 --- a/ompi/mca/bml/bml.h +++ b/ompi/mca/bml/bml.h @@ -207,7 +207,7 @@ static inline mca_bml_base_btl_t* mca_bml_base_btl_array_find( /** * Structure associated w/ ompi_proc_t that contains the set - * of BTLs used to reach a destinationation + * of BTLs used to reach a destination */ struct mca_bml_base_endpoint_t { mca_pml_proc_t super; @@ -398,7 +398,8 @@ static inline void mca_bml_base_prepare_dst(mca_bml_base_btl_t* bml_btl, typedef struct mca_bml_base_module_t* (*mca_bml_base_component_init_fn_t)( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ); /** diff --git a/ompi/mca/bml/r2/bml_r2.c b/ompi/mca/bml/r2/bml_r2.c index 0e850e2e00..1b0d89600e 100644 --- a/ompi/mca/bml/r2/bml_r2.c +++ b/ompi/mca/bml/r2/bml_r2.c @@ -248,7 +248,8 @@ int mca_bml_r2_add_procs( /* allocate bml specific proc data */ - bml_endpoint = OBJ_NEW(mca_bml_base_endpoint_t); + bml_endpoint = (mca_bml_base_endpoint_t*) + opal_obj_new(mca_bml_r2.endpoint_class); if (NULL == bml_endpoint) { opal_output(0, "mca_bml_r2_add_procs: unable to allocate resources"); free(btl_endpoints); diff --git a/ompi/mca/bml/r2/bml_r2.h b/ompi/mca/bml/r2/bml_r2.h index ef1808b2f2..237256972a 100644 --- a/ompi/mca/bml/r2/bml_r2.h +++ b/ompi/mca/bml/r2/bml_r2.h @@ -52,7 +52,8 @@ struct mca_bml_r2_module_t { size_t num_btl_progress; mca_btl_base_component_progress_fn_t * btl_progress; mca_bml_r2_recv_reg_t r2_reg[256]; - bool btls_added; + bool btls_added; + opal_class_t * endpoint_class; }; typedef struct mca_bml_r2_module_t mca_bml_r2_module_t; @@ -65,7 +66,8 @@ extern int mca_bml_r2_component_close(void); extern mca_bml_base_module_t* mca_bml_r2_component_init( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ); extern int mca_bml_r2_progress(void); diff --git a/ompi/mca/bml/r2/bml_r2_component.c b/ompi/mca/bml/r2/bml_r2_component.c index db7ec420b9..3a6c2f84d4 100644 --- a/ompi/mca/bml/r2/bml_r2_component.c +++ b/ompi/mca/bml/r2/bml_r2_component.c @@ -83,7 +83,8 @@ int mca_bml_r2_component_close(void) mca_bml_base_module_t* mca_bml_r2_component_init( int* priority, bool enable_progress_threads, - bool enable_mpi_threads + bool enable_mpi_threads, + opal_class_t* endpoint_class ) { /* initialize BTLs */ @@ -93,5 +94,6 @@ mca_bml_base_module_t* mca_bml_r2_component_init( *priority = 100; mca_bml_r2.btls_added = false; + mca_bml_r2.endpoint_class = endpoint_class; return &mca_bml_r2.super; } diff --git a/ompi/mca/mpool/base/mpool_base_init.c b/ompi/mca/mpool/base/mpool_base_init.c index e4288f437e..d8494ed44c 100644 --- a/ompi/mca/mpool/base/mpool_base_init.c +++ b/ompi/mca/mpool/base/mpool_base_init.c @@ -31,8 +31,6 @@ OBJ_CLASS_INSTANCE(mca_mpool_base_selected_module_t, opal_list_item_t, NULL, NUL static bool mca_mpool_enable_progress_threads = true; static bool mca_mpool_enable_mpi_threads = true; -OBJ_CLASS_INSTANCE(mca_mpool_base_chunk_t, opal_list_item_t, NULL, NULL); - /** * Function for weeding out mpool modules that don't want to run. * diff --git a/ompi/mca/pml/dr/Makefile.am b/ompi/mca/pml/dr/Makefile.am index 04131ffed2..5b96c76a19 100644 --- a/ompi/mca/pml/dr/Makefile.am +++ b/ompi/mca/pml/dr/Makefile.am @@ -24,6 +24,8 @@ dr_sources = \ pml_dr_comm.h \ pml_dr_component.c \ pml_dr_component.h \ + pml_dr_endpoint.c \ + pml_dr_endpoint.h \ pml_dr_hdr.h \ pml_dr_iprobe.c \ pml_dr_irecv.c \ diff --git a/ompi/mca/pml/dr/pml_dr.c b/ompi/mca/pml/dr/pml_dr.c index 4f61b46650..2d222e4e5f 100644 --- a/ompi/mca/pml/dr/pml_dr.c +++ b/ompi/mca/pml/dr/pml_dr.c @@ -1,3 +1,4 @@ + /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology @@ -34,6 +35,7 @@ #include "pml_dr_sendreq.h" #include "pml_dr_recvreq.h" #include "ompi/mca/bml/base/base.h" +#include "orte/mca/ns/ns.h" mca_pml_dr_t mca_pml_dr = { { @@ -111,8 +113,9 @@ int mca_pml_dr_del_comm(ompi_communicator_t* comm) int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) { ompi_bitmap_t reachable; - struct mca_bml_base_endpoint_t ** bml_endpoints = NULL; + struct mca_pml_dr_endpoint_t ** endpoints = NULL; int rc; + size_t i; if(nprocs == 0) return OMPI_SUCCESS; @@ -122,16 +125,16 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) if(OMPI_SUCCESS != rc) return rc; - bml_endpoints = (struct mca_bml_base_endpoint_t **) malloc ( nprocs * - sizeof(struct mca_bml_base_endpoint_t*)); - if ( NULL == bml_endpoints ) { + endpoints = (struct mca_pml_dr_endpoint_t **) malloc ( nprocs * + sizeof(struct mca_pml_dr_endpoint_t*)); + if ( NULL == endpoints ) { return OMPI_ERR_OUT_OF_RESOURCE; } - + rc = mca_bml.bml_add_procs( nprocs, procs, - bml_endpoints, + (mca_bml_base_endpoint_t**) endpoints, &reachable ); if(OMPI_SUCCESS != rc) @@ -151,10 +154,28 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs) mca_pml_dr.free_list_max, mca_pml_dr.free_list_inc, NULL); - - if ( NULL != bml_endpoints ) { - free ( bml_endpoints) ; + for(i = 0; i < nprocs; i++) { + int idx; + /* this won't work for comm spawn and other dynamic + processes, but will work for initial job start */ + idx = ompi_pointer_array_add(&mca_pml_dr.procs, + (void*) endpoints[i]); + if(orte_ns.compare(ORTE_NS_CMP_ALL, + orte_process_info.my_name, + &endpoints[i]->base.super.proc_ompi->proc_name) == 0) { + mca_pml_dr.my_rank = idx; + } + endpoints[i]->local = endpoints[i]->dst = idx; } + + for(i = 0; i < nprocs; i++) { + endpoints[i]->src = mca_pml_dr.my_rank; + } + + /* no longer need this */ + if ( NULL != endpoints ) { + free ( endpoints) ; + } return rc; } diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index 00f97ddf55..98b712ac90 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -33,6 +33,7 @@ #include "ompi/mca/pml/base/pml_base_request.h" #include "ompi/mca/pml/base/pml_base_bsend.h" #include "ompi/mca/pml/base/pml_base_sendreq.h" +#include "ompi/class/ompi_pointer_array.h" #include "ompi/mca/btl/btl.h" #include "ompi/datatype/datatype.h" @@ -71,7 +72,13 @@ struct mca_pml_dr_t { ompi_free_list_t recv_frags; ompi_free_list_t vfrags; ompi_free_list_t buffers; - + + /* proc pointer array */ + ompi_pointer_array_t procs; + + /* my 'global' rank */ + int32_t my_rank; + int timer_wdog_sec; int timer_wdog_usec; int timer_wdog_multiplier; diff --git a/ompi/mca/pml/dr/pml_dr_comm.c b/ompi/mca/pml/dr/pml_dr_comm.c index 64ee03f988..1c4f367f40 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.c +++ b/ompi/mca/pml/dr/pml_dr_comm.c @@ -21,20 +21,17 @@ #include "pml_dr.h" #include "pml_dr_comm.h" - +#include "pml_dr_endpoint.h" static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) { proc->expected_sequence = 1; - proc->vfrag_id = 1; proc->send_sequence = 0; OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t); OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t); OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t); - OBJ_CONSTRUCT(&proc->seq_sends, ompi_seq_tracker_t); - OBJ_CONSTRUCT(&proc->seq_recvs, ompi_seq_tracker_t); - OBJ_CONSTRUCT(&proc->seq_recvs_matched, ompi_seq_tracker_t); + } @@ -44,9 +41,6 @@ static void mca_pml_dr_comm_proc_destruct(mca_pml_dr_comm_proc_t* proc) OBJ_DESTRUCT(&proc->matched_receives); OBJ_DESTRUCT(&proc->specific_receives); OBJ_DESTRUCT(&proc->unexpected_frags); - OBJ_DESTRUCT(&proc->seq_sends); - OBJ_DESTRUCT(&proc->seq_recvs); - OBJ_DESTRUCT(&proc->seq_recvs_matched); } @@ -60,6 +54,7 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm) { OBJ_CONSTRUCT(&comm->wild_receives, opal_list_t); OBJ_CONSTRUCT(&comm->matching_lock, opal_mutex_t); + OBJ_CONSTRUCT(&comm->sparse_procs, ompi_pointer_array_t); comm->recv_sequence = 0; comm->procs = NULL; comm->num_procs = 0; @@ -69,12 +64,16 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm) static void mca_pml_dr_comm_destruct(mca_pml_dr_comm_t* comm) { size_t i; - for(i=0; inum_procs; i++) + for(i=0; inum_procs; i++) { OBJ_DESTRUCT((&comm->procs[i])); - if(NULL != comm->procs) + } + if(NULL != comm->procs) { free(comm->procs); + } + OBJ_DESTRUCT(&comm->wild_receives); OBJ_DESTRUCT(&comm->matching_lock); + OBJ_DESTRUCT(&comm->sparse_procs); } @@ -89,18 +88,30 @@ int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_c { size_t i; size_t size = ompi_comm->c_remote_group->grp_proc_count; - + /* send message sequence-number support - sender side */ dr_comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size); if(NULL == dr_comm->procs) { return OMPI_ERR_OUT_OF_RESOURCE; } for(i=0; iprocs+i, mca_pml_dr_comm_proc_t); - dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i]; + mca_pml_dr_comm_proc_t* proc; + mca_pml_dr_endpoint_t* ep; + ompi_proc_t* ompi_proc; + proc = dr_comm->procs+i; + OBJ_CONSTRUCT(proc, mca_pml_dr_comm_proc_t); + proc->comm_rank = i; + ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i]; + proc->ompi_proc = ompi_proc; + ep = (mca_pml_dr_endpoint_t*) ompi_proc->proc_pml; + ompi_pointer_array_set_item(&dr_comm->sparse_procs, + ep->dst, /* from our view this is the + peers source 'global rank' */ + proc); + proc->endpoint = ep; + } dr_comm->num_procs = size; - return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/dr/pml_dr_comm.h b/ompi/mca/pml/dr/pml_dr_comm.h index bd31b7b633..65c22bafb9 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.h +++ b/ompi/mca/pml/dr/pml_dr_comm.h @@ -27,6 +27,7 @@ #include "ompi/class/ompi_seq_tracker.h" #include "ompi/communicator/communicator.h" #include "ompi/proc/proc.h" +#include "pml_dr_endpoint.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif @@ -35,7 +36,6 @@ extern "C" { struct mca_pml_dr_comm_proc_t { opal_object_t super; uint16_t expected_sequence; /**< send message sequence number - receiver side */ - uint32_t vfrag_id; /**< virtual fragment identifier */ #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t send_sequence; /**< send side sequence number */ #else @@ -46,9 +46,8 @@ struct mca_pml_dr_comm_proc_t { opal_list_t unexpected_frags; /**< unexpected fragment queues */ opal_list_t matched_receives; /**< list of in-progress matched receives */ ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */ - ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ - ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ - ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */ + mca_pml_dr_endpoint_t* endpoint; /**< back pointer to the endpoint */ + int32_t comm_rank; /**< rank in the communicator */ }; typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t; @@ -65,6 +64,7 @@ struct mca_pml_comm_t { #endif opal_mutex_t matching_lock; /**< matching lock */ opal_list_t wild_receives; /**< queue of unmatched wild (source process not specified) receives */ + ompi_pointer_array_t sparse_procs; /**< sparse array, allows lookup of comm_proc using a global rank */ mca_pml_dr_comm_proc_t* procs; size_t num_procs; }; diff --git a/ompi/mca/pml/dr/pml_dr_component.c b/ompi/mca/pml/dr/pml_dr_component.c index 72f0cbab9b..0babda03d5 100644 --- a/ompi/mca/pml/dr/pml_dr_component.c +++ b/ompi/mca/pml/dr/pml_dr_component.c @@ -31,6 +31,7 @@ #include "pml_dr_sendreq.h" #include "pml_dr_recvreq.h" #include "pml_dr_recvfrag.h" +#include "pml_dr_endpoint.h" #include "ompi/mca/bml/base/base.h" @@ -154,7 +155,7 @@ int mca_pml_dr_component_open(void) OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t); - + OBJ_CONSTRUCT(&mca_pml_dr.procs, ompi_pointer_array_t); OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t); mca_pml_dr.enabled = false; @@ -193,11 +194,17 @@ mca_pml_base_module_t* mca_pml_dr_component_init(int* priority, opal_output(0, "mca_pml_dr_component_init: mca_pml_bsend_init failed\n"); return NULL; } - - if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads)) + + if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, + enable_mpi_threads, + OBJ_CLASS(mca_pml_dr_endpoint_t) + )) { return NULL; + } + mca_pml_dr.super.pml_progress = mca_bml.bml_progress; + return &mca_pml_dr.super; } diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.c b/ompi/mca/pml/dr/pml_dr_endpoint.c new file mode 100644 index 0000000000..93c6246469 --- /dev/null +++ b/ompi/mca/pml/dr/pml_dr_endpoint.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "pml_dr.h" +#include "pml_dr_endpoint.h" + +static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep) +{ + OBJ_CONSTRUCT(&ep->seq_sends, ompi_seq_tracker_t); + OBJ_CONSTRUCT(&ep->seq_recvs, ompi_seq_tracker_t); + OBJ_CONSTRUCT(&ep->seq_recvs_matched, ompi_seq_tracker_t); +} + + +static void mca_pml_dr_endpoint_destruct(mca_pml_dr_endpoint_t* ep) +{ + OBJ_DESTRUCT(&ep->seq_sends); + OBJ_DESTRUCT(&ep->seq_recvs); + OBJ_DESTRUCT(&ep->seq_recvs_matched); +} + + +OBJ_CLASS_INSTANCE( + mca_pml_dr_endpoint_t, + mca_bml_base_endpoint_t, + mca_pml_dr_endpoint_construct, + mca_pml_dr_endpoint_destruct); diff --git a/ompi/mca/pml/dr/pml_dr_endpoint.h b/ompi/mca/pml/dr/pml_dr_endpoint.h new file mode 100644 index 0000000000..32e6e30ec5 --- /dev/null +++ b/ompi/mca/pml/dr/pml_dr_endpoint.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef MCA_PML_ENDPOINT_H +#define MCA_PML_ENDPOINT_H + +#include "ompi/mca/bml/bml.h" +#include "ompi/class/ompi_seq_tracker.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif +/** + * This is the pml level endpoint + * simply inherity the bml_base_endpoint and + * add whatever else is needed + */ +struct mca_pml_dr_endpoint_t { + mca_bml_base_endpoint_t base; + int32_t local; /* local view of the rank */ + int32_t src; /* peers view of the src rank */ + int32_t dst; /* peers destination rank */ + ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ + ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ + ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */ + uint32_t vfrag_seq; /**< current virtual fragment identifier sequence */ +}; +typedef struct mca_pml_dr_endpoint_t mca_pml_dr_endpoint_t; +OBJ_CLASS_DECLARATION(mca_pml_dr_endpoint_t); + + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 469d7db103..ebc06af47b 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -40,13 +40,29 @@ #define MCA_PML_DR_HDR_VALIDATE(hdr, type) \ do { \ + ompi_communicator_t* comm = NULL; \ uint16_t csum = opal_csum(hdr, sizeof(type)); \ if(hdr->hdr_common.hdr_csum != csum) { \ OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \ __FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \ return; \ } \ - if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) { \ + comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \ + if(NULL == comm ) { \ + mca_pml_dr_endpoint_t* ep; \ + OPAL_OUTPUT((0, "%s:%d: communicator not found\n", \ + __FILE__, __LINE__)); \ + ep = ompi_pointer_array_get_item(&mca_pml_dr.procs, \ + hdr->hdr_common.hdr_src); \ + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {\ + mca_pml_dr_recv_frag_ack(&ep->base, \ + &hdr->hdr_common, \ + hdr->hdr_match.hdr_src_ptr.pval, \ + 1, 0); \ + } \ + return; \ + } \ + if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) { \ OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n", \ __FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst)); \ return; \ @@ -54,7 +70,7 @@ do { } while (0) -#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc) \ +#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc, ep) \ do { \ ompi_communicator_t* comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \ if(NULL == comm_ptr) { \ @@ -63,7 +79,8 @@ do { return; \ } \ comm = (mca_pml_dr_comm_t*)comm_ptr->c_pml_comm; \ - proc = comm->procs + hdr->hdr_common.hdr_src; \ + proc = ompi_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src); \ + ep = proc->endpoint; \ } while (0) @@ -110,6 +127,7 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; mca_pml_dr_comm_t *comm; mca_pml_dr_comm_proc_t *proc; + mca_pml_dr_endpoint_t *ep; if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) { return; } @@ -118,14 +136,14 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_MATCH: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); - + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); + /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, + mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr.pval, 1, 0); @@ -138,11 +156,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_MATCH_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack); } else { @@ -153,11 +171,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_RNDV: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)){ + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)){ /* ack only if the vfrag has been matched */ mca_pml_dr_recv_request_t* recvreq = mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid); @@ -167,9 +185,9 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1); } else { - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs_matched, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) { OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*) proc->ompi_proc->proc_pml, + mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_match.hdr_src_ptr.pval, ~(uint64_t) 0, hdr->hdr_rndv.hdr_msg_length); @@ -186,11 +204,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_RNDV_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack); } else { @@ -202,14 +220,14 @@ void mca_pml_dr_recv_frag_callback( { mca_pml_dr_recv_request_t* recvreq; MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__)); - mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, + mca_pml_dr_recv_frag_ack(&ep->base, &hdr->hdr_common, hdr->hdr_frag.hdr_src_ptr.pval, ~(uint64_t) 0, 0); @@ -224,11 +242,11 @@ void mca_pml_dr_recv_frag_callback( case MCA_PML_DR_HDR_TYPE_FRAG_ACK: { MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t); - MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc); + MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep); /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack); } else { @@ -545,7 +563,8 @@ bool mca_pml_dr_recv_frag_match( ompi_proc_t* ompi_proc = proc->ompi_proc; int rc; uint32_t csum = OPAL_CSUM_ZERO; - + mca_pml_dr_endpoint_t* ep = (mca_pml_dr_endpoint_t*) proc->endpoint; + /* source sequence number */ frag_msg_seq = hdr->hdr_seq; @@ -668,7 +687,7 @@ rematch: opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag); } - ompi_seq_tracker_insert(&proc->seq_recvs, hdr->hdr_common.hdr_vid); + ompi_seq_tracker_insert(&ep->seq_recvs, hdr->hdr_common.hdr_vid); OPAL_THREAD_UNLOCK(&comm->matching_lock); /* release matching lock before processing fragment */ @@ -720,7 +739,7 @@ void mca_pml_dr_recv_frag_ack( ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type; ack->hdr_common.hdr_flags = 0; - ack->hdr_common.hdr_src = comm->c_my_rank; + ack->hdr_common.hdr_src = hdr->hdr_dst; ack->hdr_common.hdr_dst = hdr->hdr_src; ack->hdr_common.hdr_vid = hdr->hdr_vid; ack->hdr_common.hdr_ctx = hdr->hdr_ctx; @@ -828,7 +847,9 @@ rematch: * look only at "specific" receives, or "wild" receives, * or if we need to traverse both sets at the same time. */ - proc = comm->procs + hdr->hdr_common.hdr_src; + proc = ompi_pointer_array_get_item(&comm->sparse_procs, + hdr->hdr_common.hdr_src); + if (opal_list_get_size(&proc->specific_receives) == 0 ) { /* * There are only wild irecvs, so specialize the algorithm. @@ -868,7 +889,7 @@ rematch: * descriptor */ frag->request=match; match->req_proc = proc; - match->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; + match->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml; /* add this fragment descriptor to the list of * descriptors to be processed later diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index e8ef04de75..9ffe507f40 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -41,7 +41,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \ } else { \ mca_pml_dr_recv_request_match_specific(recvreq); \ } \ - mca_pml_dr_recv_frag_ack(recvreq->req_endpoint, \ + mca_pml_dr_recv_frag_ack(&recvreq->req_endpoint->base, \ &hdr->hdr_common, \ hdr->hdr_match.hdr_src_ptr.pval, \ 0, 0); \ @@ -53,7 +53,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \ bytes_received = bytes_delivered = 0; \ } else if (recvreq->req_acked == false) { \ mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \ - hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \ + hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \ } @@ -92,8 +92,8 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i if( request->req_recv.req_base.req_peer == OMPI_ANY_SOURCE ) { opal_list_remove_item( &comm->wild_receives, (opal_list_item_t*)request ); } else { - mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer; - opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request); + mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer; + opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request); } } OPAL_THREAD_UNLOCK(&comm->matching_lock); @@ -167,7 +167,7 @@ void mca_pml_dr_recv_request_ack( int rc; /* allocate descriptor */ - bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->btl_eager); + bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->base.btl_eager); MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); if(NULL == des) { return; @@ -177,8 +177,8 @@ void mca_pml_dr_recv_request_ack( ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type; ack->hdr_common.hdr_flags = 0; - ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;; - ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank; + ack->hdr_common.hdr_dst = recvreq->req_endpoint->dst; + ack->hdr_common.hdr_src = recvreq->req_endpoint->src; ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid; ack->hdr_common.hdr_vid = hdr->hdr_vid; ack->hdr_vlen = vlen; @@ -255,7 +255,7 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); - MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received); + MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received); break; case MCA_PML_DR_HDR_TYPE_FRAG: @@ -290,7 +290,7 @@ void mca_pml_dr_recv_request_progress( if((vfrag->vf_pending & vfrag->vf_mask) == vfrag->vf_mask) { /* we have received all the pieces of the vfrag, ack everything that passed the checksum */ - ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id); + ompi_seq_tracker_insert(&recvreq->req_endpoint->seq_recvs, vfrag->vf_id); mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask); } @@ -347,7 +347,7 @@ void mca_pml_dr_recv_request_matched_probe( /* check completion status */ OPAL_THREAD_LOCK(&ompi_request_lock); recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag; - recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_common.hdr_src; + recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = recvreq->req_proc->comm_rank; recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed; recvreq->req_recv.req_base.req_pml_complete = true; recvreq->req_recv.req_base.req_ompi.req_complete = true; diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.h b/ompi/mca/pml/dr/pml_dr_recvreq.h index 8794f73583..b6a49b4f14 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.h +++ b/ompi/mca/pml/dr/pml_dr_recvreq.h @@ -44,7 +44,7 @@ struct mca_pml_dr_recv_request_t { /* filled in after match */ struct mca_pml_dr_comm_proc_t* req_proc; - struct mca_bml_base_endpoint_t* req_endpoint; + struct mca_pml_dr_endpoint_t* req_endpoint; opal_mutex_t* req_mutex; /* vfrag state */ @@ -220,13 +220,13 @@ do { do { \ (request)->req_mutex = &comm->matching_lock; \ (request)->req_proc = proc; \ - (request)->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; \ + (request)->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml; \ (request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \ - (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = \ - (hdr)->hdr_common.hdr_src; \ + (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = proc->comm_rank; \ (request)->req_vfrag0.vf_id = (hdr)->hdr_common.hdr_vid; \ opal_list_append(&proc->matched_receives, (opal_list_item_t*)request); \ - ompi_seq_tracker_insert(&proc->seq_recvs_matched, (hdr)->hdr_common.hdr_vid); \ + ompi_seq_tracker_insert(&request->req_endpoint->seq_recvs_matched, \ + (hdr)->hdr_common.hdr_vid); \ } while(0) diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index ed71b68360..b991d3e8bb 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -132,7 +132,7 @@ static void mca_pml_dr_match_completion( /* update statistics and complete */ sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); /* on negative ack need to retransmit */ @@ -187,7 +187,7 @@ static void mca_pml_dr_rndv_completion( } /* update statistics and complete */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } else { @@ -250,7 +250,7 @@ static void mca_pml_dr_frag_completion( } /* record vfrag id to drop duplicate acks */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); /* return this vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); @@ -370,8 +370,8 @@ int mca_pml_dr_send_request_start_buffered( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; @@ -452,9 +452,9 @@ int mca_pml_dr_send_request_start_copy( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO; @@ -521,9 +521,9 @@ int mca_pml_dr_send_request_start_prepare( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO; @@ -595,8 +595,8 @@ int mca_pml_dr_send_request_start_rndv( hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; @@ -640,7 +640,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) * the scheduling logic once for every call. */ - mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint; + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; assert(sendreq->req_vfrag0.vf_recv.pval != NULL); if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) { do { @@ -662,7 +662,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* do we need to allocate a new vfrag (we scheduled all the vfrag already) */ if(vfrag->vf_size == bytes_sent) { - bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_send); MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); if(NULL == vfrag) { OPAL_THREAD_LOCK(&mca_pml_dr.lock); @@ -670,7 +670,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); break; } - MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag); + MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,endpoint,bytes_remaining,vfrag); MCA_PML_DR_VFRAG_WDOG_START(vfrag); vfrag->bml_btl = bml_btl; bytes_sent = 0; @@ -711,9 +711,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_vid = vfrag->vf_id; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_vlen = vfrag->vf_len; hdr->hdr_frag_idx = vfrag->vf_idx; @@ -800,9 +800,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_csum = 0; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; + hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst; hdr->hdr_common.hdr_vid = vfrag->vf_id; - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; + hdr->hdr_common.hdr_src = sendreq->req_endpoint->src; hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_vlen = vfrag->vf_len; hdr->hdr_frag_idx = vfrag->vf_idx; @@ -875,7 +875,7 @@ void mca_pml_dr_send_request_match_ack( /* update statistics */ sendreq->req_bytes_delivered = vfrag->vf_size; - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } @@ -941,7 +941,7 @@ void mca_pml_dr_send_request_rndv_ack( } /* stash the vfrag id for duplicate acks.. */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); OPAL_THREAD_UNLOCK(&ompi_request_lock); if(schedule) { @@ -998,7 +998,7 @@ void mca_pml_dr_send_request_frag_ack( assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); /* stash the vfid for duplicate acks.. */ - ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id); /* return vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 19511c50cb..af769e6a25 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -33,6 +33,7 @@ #include "pml_dr_comm.h" #include "pml_dr_hdr.h" #include "pml_dr_vfrag.h" +#include "pml_dr_endpoint.h" #include "opal/event/event.h" #if defined(c_plusplus) || defined(__cplusplus) @@ -42,7 +43,7 @@ extern "C" { struct mca_pml_dr_send_request_t { mca_pml_base_send_request_t req_send; mca_pml_dr_comm_proc_t* req_proc; - mca_bml_base_endpoint_t* req_endpoint; + mca_pml_dr_endpoint_t* req_endpoint; #if OMPI_HAVE_THREAD_SUPPORT volatile int32_t req_state; volatile int32_t req_lock; @@ -143,9 +144,10 @@ do { #define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \ do { \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ - mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ - mca_bml_base_endpoint_t* endpoint = \ - (mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \ + mca_pml_dr_endpoint_t* endpoint = \ + (mca_pml_dr_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \ + mca_pml_dr_comm_proc_t* proc = \ + comm->procs + sendreq->req_send.req_base.req_peer; \ mca_bml_base_btl_t* bml_btl; \ size_t size = sendreq->req_send.req_bytes_packed; \ size_t eager_limit; \ @@ -154,9 +156,9 @@ do { break; \ } \ \ - bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \ - sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ sendreq->req_endpoint = endpoint; \ sendreq->req_proc = proc; \ @@ -274,8 +276,8 @@ do { #define MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq, endpoint, size, vfrag) \ do { \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ - mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \ - size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \ + size_t max_send_size = endpoint->base.btl_max_send_size - \ + sizeof(mca_pml_dr_frag_hdr_t); \ size_t div = size / max_send_size; \ \ MCA_PML_DR_VFRAG_INIT(vfrag); \ @@ -301,7 +303,7 @@ do { else \ vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \ } \ - vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + vfrag->vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \ vfrag->vf_offset = sendreq->req_send_offset; \ vfrag->vf_max_send_size = max_send_size; \ vfrag->vf_send.pval = sendreq; \ @@ -354,8 +356,9 @@ do { \ #define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \ do { \ - mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \ - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \ + mca_bml_base_btl_t* bml_btl = \ + mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ mca_btl_base_descriptor_t *des_old, *des_new; \ \ if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \ @@ -384,8 +387,9 @@ do { \ #define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \ do { \ - mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \ - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ + mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \ + mca_bml_base_btl_t* bml_btl = \ + mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \ mca_btl_base_descriptor_t *des_old, *des_new; \ mca_pml_dr_hdr_t *hdr; \ \ @@ -405,9 +409,9 @@ do { \ hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \ hdr->hdr_common.hdr_flags = 0; \ hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; \ - hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; \ + hdr->hdr_common.hdr_dst = endpoint->dst; \ hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \ - hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \ + hdr->hdr_common.hdr_src = endpoint->src; \ hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; \ diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 2a27ce961e..44c535e45e 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -222,9 +222,11 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority, } - if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads)) + if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, + enable_mpi_threads, + OBJ_CLASS(mca_bml_base_endpoint_t))) { return NULL; - + } /* As our own progress function does nothing except calling the BML * progress, let's modify the progress function pointer in our structure * to avoid useless functions calls. The event library will instead call diff --git a/opal/class/opal_object.h b/opal/class/opal_object.h index 0993c2e190..f40883132e 100644 --- a/opal/class/opal_object.h +++ b/opal/class/opal_object.h @@ -163,6 +163,7 @@ struct opal_class_t { /**< array of parent class constructors */ opal_destruct_t *cls_destruct_array; /**< array of parent class destructors */ + size_t cls_sizeof; /**< size of an object instance */ }; /** @@ -207,7 +208,8 @@ struct opal_object_t { OBJ_CLASS(PARENT), \ (opal_construct_t) CONSTRUCTOR, \ (opal_destruct_t) DESTRUCTOR, \ - 0, 0, NULL, NULL \ + 0, 0, NULL, NULL, \ + sizeof(NAME) \ } @@ -229,20 +231,20 @@ struct opal_object_t { * @param type Type (class) of the object * @return Pointer to the object */ -static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls); +static inline opal_object_t *opal_obj_new(opal_class_t * cls); #if OMPI_ENABLE_DEBUG -static inline opal_object_t *opal_obj_new_debug(size_t obj_size, opal_class_t* type, const char* file, int line) +static inline opal_object_t *opal_obj_new_debug(opal_class_t* type, const char* file, int line) { - opal_object_t* object = opal_obj_new(obj_size, type); + opal_object_t* object = opal_obj_new(type); object->cls_init_file_name = file; object->cls_init_lineno = line; return object; } #define OBJ_NEW(type) \ - ((type *)opal_obj_new_debug(sizeof(type), OBJ_CLASS(type), __FILE__, __LINE__)) + ((type *)opal_obj_new_debug(OBJ_CLASS(type), __FILE__, __LINE__)) #else #define OBJ_NEW(type) \ - ((type *) opal_obj_new(sizeof(type), OBJ_CLASS(type))) + ((type *) opal_obj_new(OBJ_CLASS(type))) #endif /* OMPI_ENABLE_DEBUG */ /** @@ -416,13 +418,12 @@ static inline void opal_obj_run_destructors(opal_object_t * object) * @param cls Pointer to the class descriptor of this object * @return Pointer to the object */ -static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls) +static inline opal_object_t *opal_obj_new(opal_class_t * cls) { opal_object_t *object; + assert(cls->cls_sizeof >= sizeof(opal_object_t)); - assert(size >= sizeof(opal_object_t)); - - object = (opal_object_t *) malloc(size); + object = (opal_object_t *) malloc(cls->cls_sizeof); if (0 == cls->cls_initialized) { opal_class_initialize(cls); }