From 8b40c0de9befc80812ffd9d4e69ed9204e491a98 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Fri, 26 Oct 2012 15:36:09 +0000 Subject: [PATCH] * Lock around tag management, so that it's thread safe * Only register the progress function on first call to a non-blocking collective operation, to try to reduce overall performance impact * Fix tag management in roll-over case This commit was SVN r27498. --- ompi/mca/coll/libnbc/coll_libnbc.h | 4 +++ ompi/mca/coll/libnbc/coll_libnbc_component.c | 26 ++++++++++----- ompi/mca/coll/libnbc/nbc.c | 35 ++++++++++++++------ 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/ompi/mca/coll/libnbc/coll_libnbc.h b/ompi/mca/coll/libnbc/coll_libnbc.h index 6c370ec3e8..553f184095 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc.h +++ b/ompi/mca/coll/libnbc/coll_libnbc.h @@ -72,6 +72,8 @@ OMPI_MODULE_DECLSPEC extern ompi_coll_libnbc_component_t mca_coll_libnbc_compone struct ompi_coll_libnbc_module_t { mca_coll_base_module_t super; + opal_mutex_t mutex; + bool comm_registered; int tag; #ifdef NBC_CACHE_SCHEDULE void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct @@ -129,6 +131,8 @@ typedef ompi_coll_libnbc_request_t NBC_Handle; (ompi_free_list_item_t*) req); \ } while (0) +int ompi_coll_libnbc_progress(void); + int NBC_Init_comm(MPI_Comm comm, ompi_coll_libnbc_module_t *module); int NBC_Progress(NBC_Handle *handle); diff --git a/ompi/mca/coll/libnbc/coll_libnbc_component.c b/ompi/mca/coll/libnbc/coll_libnbc_component.c index 7a81afe937..e9a9f08b72 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc_component.c +++ b/ompi/mca/coll/libnbc/coll_libnbc_component.c @@ -41,7 +41,7 @@ static int libnbc_register(void); static int libnbc_init_query(bool, bool); static mca_coll_base_module_t *libnbc_comm_query(struct ompi_communicator_t *, int *); static int libnbc_module_enable(mca_coll_base_module_t *, struct ompi_communicator_t *); -static int libnbc_progress(void); + /* * Instantiate the public struct with all of our public information @@ -95,6 +95,8 @@ libnbc_open(void) if (OMPI_SUCCESS != ret) return ret; OBJ_CONSTRUCT(&mca_coll_libnbc_component.active_requests, opal_list_t); + /* note: active comms is the number of communicators who have had + a non-blocking collective started */ mca_coll_libnbc_component.active_comms = 0; opal_atomic_init(&mca_coll_libnbc_component.progress_lock, OPAL_ATOMIC_UNLOCKED); @@ -106,7 +108,7 @@ static int libnbc_close(void) { if (0 != mca_coll_libnbc_component.active_comms) { - opal_progress_unregister(libnbc_progress); + opal_progress_unregister(ompi_coll_libnbc_progress); } OBJ_DESTRUCT(&mca_coll_libnbc_component.requests); @@ -219,15 +221,12 @@ libnbc_module_enable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm) { /* All done */ - if (0 == mca_coll_libnbc_component.active_comms++) { - opal_progress_register(libnbc_progress); - } return OMPI_SUCCESS; } -static int -libnbc_progress(void) +int +ompi_coll_libnbc_progress(void) { opal_list_item_t *item; @@ -259,14 +258,23 @@ libnbc_progress(void) static void libnbc_module_construct(ompi_coll_libnbc_module_t *module) { + OBJ_CONSTRUCT(&module->mutex, opal_mutex_t); + module->comm_registered = false; } static void libnbc_module_destruct(ompi_coll_libnbc_module_t *module) { - if (0 == --mca_coll_libnbc_component.active_comms) { - opal_progress_unregister(libnbc_progress); + OBJ_DESTRUCT(&module->mutex); + + /* if we ever were used for a collective op, do the progress cleanup. */ + if (true == module->comm_registered) { + uint64_t tmp = + OPAL_THREAD_ADD32(&mca_coll_libnbc_component.active_comms, -1); + if (0 == tmp) { + opal_progress_unregister(ompi_coll_libnbc_progress); + } } } diff --git a/ompi/mca/coll/libnbc/nbc.c b/ompi/mca/coll/libnbc/nbc.c index 7ce9ea1061..3310053ca8 100644 --- a/ompi/mca/coll/libnbc/nbc.c +++ b/ompi/mca/coll/libnbc/nbc.c @@ -521,7 +521,8 @@ error: int NBC_Init_handle(struct ompi_communicator_t *comm, ompi_coll_libnbc_request_t **request, ompi_coll_libnbc_module_t *comminfo) { - int res; + int res, tmp_tag; + bool need_register = false; ompi_coll_libnbc_request_t *handle; OMPI_COLL_LIBNBC_REQUEST_ALLOC(comm, handle, res); @@ -538,18 +539,32 @@ int NBC_Init_handle(struct ompi_communicator_t *comm, ompi_coll_libnbc_request_t /******************** Do the tag and shadow comm administration ... ***************/ - /* we found it */ - comminfo->tag--; + OPAL_THREAD_LOCK(&comminfo->mutex); + tmp_tag = comminfo->tag--; + if (tmp_tag == (-1 * mca_pml.pml_max_tag)) { + tmp_tag = comminfo->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; + NBC_DEBUG(2,"resetting tags ...\n"); + } + + if (true != comminfo->comm_registered) { + comminfo->comm_registered = true; + need_register = true; + } + OPAL_THREAD_UNLOCK(&comminfo->mutex); + handle->tag=comminfo->tag; + + /* register progress */ + if (need_register) { + uint32_t tmp = + OPAL_THREAD_ADD32(&mca_coll_libnbc_component.active_comms, 1); + if (tmp == 1) { + opal_progress_register(ompi_coll_libnbc_progress); + } + } + handle->comm=comm; /*printf("got comminfo: %lu tag: %i\n", comminfo, comminfo->tag);*/ - - /* reset counter ... */ - if(handle->tag == -32767) { - handle->tag=1; - comminfo->tag=1; - NBC_DEBUG(2,"resetting tags ...\n"); - } /******************** end of tag and shadow comm administration ... ***************/ handle->comminfo = comminfo;