diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index d559c3ad6b..5ab2b39648 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -15,6 +15,7 @@ * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2016 Los Alamos National Security, LLC. All rights * reserved. + * Copyright (c) 2016 Mellanox Technologies. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -116,7 +117,8 @@ int ompi_request_default_wait_any(size_t count, if (MPI_STATUS_IGNORE != status) { *status = ompi_status_empty; } - WAIT_SYNC_RELEASE(&sync); + /* No signal-in-flight can be in this case */ + WAIT_SYNC_RELEASE_NOWAIT(&sync); return rc; } @@ -140,6 +142,15 @@ int ompi_request_default_wait_any(size_t count, *index = i; } } + + if( *index == completed ){ + /* Only one request has triggered. There was no + * in-flight completions. + * Drop the signalled flag so we won't block + * in WAIT_SYNC_RELEASE + */ + WAIT_SYNC_SIGNALLED(&sync); + } request = requests[*index]; assert( REQUEST_COMPLETE(request) ); @@ -361,7 +372,8 @@ int ompi_request_default_wait_some(size_t count, ompi_request_t **rptr = NULL; ompi_request_t *request = NULL; ompi_wait_sync_t sync; - + size_t sync_sets = 0, sync_unsets = 0; + WAIT_SYNC_INIT(&sync, 1); *outcount = 0; @@ -386,10 +398,12 @@ int ompi_request_default_wait_some(size_t count, num_requests_done++; } } + sync_sets = count - num_requests_null_inactive - num_requests_done; if(num_requests_null_inactive == count) { *outcount = MPI_UNDEFINED; - WAIT_SYNC_RELEASE(&sync); + /* nobody will signall us */ + WAIT_SYNC_RELEASE_NOWAIT(&sync); return rc; } @@ -420,6 +434,14 @@ int ompi_request_default_wait_some(size_t count, num_requests_done++; } } + sync_unsets = count - num_requests_null_inactive - num_requests_done; + + if( sync_sets == sync_unsets ){ + /* nobody knows about us, + * set signa-in-progress flag to false + */ + WAIT_SYNC_SIGNALLED(&sync); + } WAIT_SYNC_RELEASE(&sync); diff --git a/opal/threads/wait_sync.h b/opal/threads/wait_sync.h index b29f01c474..afc4bceb84 100644 --- a/opal/threads/wait_sync.h +++ b/opal/threads/wait_sync.h @@ -5,6 +5,7 @@ * reserved. * Copyright (c) 2016 Los Alamos National Security, LLC. All rights * reserved. + * Copyright (c) 2016 Mellanox Technologies. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -24,6 +25,7 @@ typedef struct ompi_wait_sync_t { pthread_mutex_t lock; struct ompi_wait_sync_t *next; struct ompi_wait_sync_t *prev; + volatile bool signaling; } ompi_wait_sync_t; #define REQUEST_PENDING (void*)0L @@ -31,19 +33,42 @@ typedef struct ompi_wait_sync_t { #define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync)) +/* The loop in release handles a race condition between the signaling + * thread and the destruction of the condition variable. The signaling + * member will be set to false after the final signaling thread has + * finished opertating on the sync object. This is done to avoid + * extra atomics in the singalling function and keep it as fast + * as possible. Note that the race window is small so spinning here + * is more optimal than sleeping since this macro is called in + * the critical path. */ #define WAIT_SYNC_RELEASE(sync) \ if (opal_using_threads()) { \ - pthread_cond_destroy(&(sync)->condition); \ - pthread_mutex_destroy(&(sync)->lock); \ + while ((sync)->signaling) { \ + continue; \ + } \ + pthread_cond_destroy(&(sync)->condition); \ + pthread_mutex_destroy(&(sync)->lock); \ } +#define WAIT_SYNC_RELEASE_NOWAIT(sync) \ + if (opal_using_threads()) { \ + pthread_cond_destroy(&(sync)->condition); \ + pthread_mutex_destroy(&(sync)->lock); \ + } + + #define WAIT_SYNC_SIGNAL(sync) \ if (opal_using_threads()) { \ pthread_mutex_lock(&(sync->lock)); \ pthread_cond_signal(&sync->condition); \ pthread_mutex_unlock(&(sync->lock)); \ + sync->signaling = false; \ } +#define WAIT_SYNC_SIGNALLED(sync){ \ + (sync)->signaling = false; \ +} + OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync); static inline int sync_wait_st (ompi_wait_sync_t *sync) { @@ -61,6 +86,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync) (sync)->next = NULL; \ (sync)->prev = NULL; \ (sync)->status = 0; \ + (sync)->signaling = true; \ if (opal_using_threads()) { \ pthread_cond_init (&(sync)->condition, NULL); \ pthread_mutex_init (&(sync)->lock, NULL); \ @@ -81,8 +107,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta } } else { /* this is an error path so just use the atomic */ - opal_atomic_swap_32 (&sync->count, 0); sync->status = OPAL_ERROR; + opal_atomic_wmb (); + opal_atomic_swap_32 (&sync->count, 0); } WAIT_SYNC_SIGNAL(sync); }