From 58413fa1e44e652d5a309161ccd39877d228af65 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Fri, 6 Jul 2012 13:59:03 +0000 Subject: [PATCH] * Properly set up communication infrastructure for libnbc. * Prevent infinite recursion in progress loop. Should fix the improper barrier Eugene was seeing. This commit was SVN r26758. --- ompi/mca/coll/libnbc/coll_libnbc.h | 5 +++-- ompi/mca/coll/libnbc/coll_libnbc_component.c | 13 ++++++++++++- ompi/mca/coll/libnbc/nbc.c | 19 ++----------------- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/ompi/mca/coll/libnbc/coll_libnbc.h b/ompi/mca/coll/libnbc/coll_libnbc.h index b5e624aa1a..6c370ec3e8 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc.h +++ b/ompi/mca/coll/libnbc/coll_libnbc.h @@ -22,6 +22,7 @@ #include "ompi/mca/coll/coll.h" #include "ompi/request/request.h" +#include "opal/sys/atomic.h" BEGIN_C_DECLS @@ -62,6 +63,7 @@ struct ompi_coll_libnbc_component_t { ompi_free_list_t requests; opal_list_t active_requests; uint32_t active_comms; + opal_atomic_lock_t progress_lock; }; typedef struct ompi_coll_libnbc_component_t ompi_coll_libnbc_component_t; @@ -95,8 +97,7 @@ struct ompi_coll_libnbc_request_t { long row_offset; int tag; volatile int req_count; - /*ompi_request_t **req_array;*/ - MPI_Request *req_array; + ompi_request_t **req_array; NBC_Comminfo *comminfo; volatile NBC_Schedule *schedule; void *tmpbuf; /* temporary buffer e.g. 
used for Reduce */ diff --git a/ompi/mca/coll/libnbc/coll_libnbc_component.c b/ompi/mca/coll/libnbc/coll_libnbc_component.c index a804ba2a56..7a81afe937 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc_component.c +++ b/ompi/mca/coll/libnbc/coll_libnbc_component.c @@ -97,6 +97,8 @@ libnbc_open(void) OBJ_CONSTRUCT(&mca_coll_libnbc_component.active_requests, opal_list_t); mca_coll_libnbc_component.active_comms = 0; + opal_atomic_init(&mca_coll_libnbc_component.progress_lock, OPAL_ATOMIC_UNLOCKED); + return OMPI_SUCCESS; } @@ -229,18 +231,27 @@ libnbc_progress(void) { opal_list_item_t *item; + if (opal_atomic_trylock(&mca_coll_libnbc_component.progress_lock)) return 0; + for (item = opal_list_get_first(&mca_coll_libnbc_component.active_requests) ; item != opal_list_get_end(&mca_coll_libnbc_component.active_requests) ; item = opal_list_get_next(item)) { ompi_coll_libnbc_request_t* request = (ompi_coll_libnbc_request_t*) item; if (NBC_OK == NBC_Progress(request)) { - /* done, remove */ + /* done, remove and complete */ item = opal_list_remove_item(&mca_coll_libnbc_component.active_requests, &request->super.super.super); + + request->super.req_status.MPI_ERROR = OMPI_SUCCESS; + OPAL_THREAD_LOCK(&ompi_request_lock); + ompi_request_complete(&request->super, true); + OPAL_THREAD_UNLOCK(&ompi_request_lock); } item = opal_list_get_next(item); } + opal_atomic_unlock(&mca_coll_libnbc_component.progress_lock); + return 0; } diff --git a/ompi/mca/coll/libnbc/nbc.c b/ompi/mca/coll/libnbc/nbc.c index 4c0da6ac31..8ed99f5509 100644 --- a/ompi/mca/coll/libnbc/nbc.c +++ b/ompi/mca/coll/libnbc/nbc.c @@ -11,6 +11,7 @@ #include "nbc_internal.h" #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/op/op.h" +#include "ompi/mca/pml/pml.h" /* only used in this file */ static inline int NBC_Start_round(NBC_Handle *handle); @@ -274,11 +275,6 @@ static inline int NBC_Free(NBC_Handle* handle) { handle->tmpbuf = NULL; } - handle->super.req_status.MPI_ERROR = OMPI_SUCCESS; - 
OPAL_THREAD_LOCK(&ompi_request_lock); - ompi_request_complete(&handle->super, true); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - return NBC_OK; } @@ -298,11 +294,8 @@ int NBC_Progress(NBC_Handle *handle) { #ifdef NBC_TIMING Test_time -= MPI_Wtime(); #endif -#ifdef HAVE_OMPI res = ompi_request_test_all(handle->req_count, handle->req_array, &flag, MPI_STATUSES_IGNORE); - /* res = MPI_Testall(handle->req_count, handle->req_array, &flag, MPI_STATUSES_IGNORE); */ if(res != OMPI_SUCCESS) { printf("MPI Error in MPI_Testall() (%i)\n", res); ret=res; goto error; } -#endif #ifdef NBC_TIMING Test_time += MPI_Wtime(); #endif @@ -393,14 +386,10 @@ static inline int NBC_Start_round(NBC_Handle *handle) { #ifdef NBC_TIMING Isend_time -= MPI_Wtime(); #endif -#ifdef HAVE_OMPI handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request)); NBC_CHECK_NULL(handle->req_array); - res = MCA_PML_CALL(isend_init(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, MCA_PML_BASE_SEND_STANDARD, handle->comm, handle->req_array+handle->req_count-1)); - /*printf("MPI_Isend(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, sendargs->count, (unsigned long)sendargs->datatype, sendargs->dest, handle->tag, (unsigned long)handle->comm, res);*/ - /* res = MPI_Isend(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, handle->comm, handle->req_array+handle->req_count-1); */ + res = MCA_PML_CALL(isend(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, MCA_PML_BASE_SEND_STANDARD, handle->comm, handle->req_array+handle->req_count-1)); if(OMPI_SUCCESS != res) { printf("Error in MPI_Isend(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, sendargs->count, (unsigned long)sendargs->datatype, sendargs->dest, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; } -#endif #ifdef NBC_TIMING Isend_time += MPI_Wtime(); #endif @@ -421,14 +410,10 @@ static inline int 
NBC_Start_round(NBC_Handle *handle) { #ifdef NBC_TIMING Irecv_time -= MPI_Wtime(); #endif -#ifdef HAVE_OMPI handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request)); NBC_CHECK_NULL(handle->req_array); res = MCA_PML_CALL(irecv(buf1, recvargs->count, recvargs->datatype, recvargs->source, handle->tag, handle->comm, handle->req_array+handle->req_count-1)); - /*printf("MPI_Irecv(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, recvargs->count, (unsigned long)recvargs->datatype, recvargs->source, handle->tag, (unsigned long)handle->comm, res); */ - /*res = MPI_Irecv(buf1, recvargs->count, recvargs->datatype, recvargs->source, handle->tag, handle->comm, handle->req_array+handle->req_count-1); */ if(OMPI_SUCCESS != res) { printf("Error in MPI_Irecv(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, recvargs->count, (unsigned long)recvargs->datatype, recvargs->source, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; } -#endif #ifdef NBC_TIMING Irecv_time += MPI_Wtime(); #endif