Merge pull request #1816 from hjelmn/request_perfm_regression
opal/sync: fix race condition
Этот коммит содержится в:
Коммит
955269b4f1
@ -15,6 +15,7 @@
|
|||||||
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
|
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
|
||||||
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
|
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
|
||||||
* reserved.
|
* reserved.
|
||||||
|
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
*
|
*
|
||||||
* Additional copyrights may follow
|
* Additional copyrights may follow
|
||||||
@ -116,7 +117,8 @@ int ompi_request_default_wait_any(size_t count,
|
|||||||
if (MPI_STATUS_IGNORE != status) {
|
if (MPI_STATUS_IGNORE != status) {
|
||||||
*status = ompi_status_empty;
|
*status = ompi_status_empty;
|
||||||
}
|
}
|
||||||
WAIT_SYNC_RELEASE(&sync);
|
/* No signal-in-flight can be in this case */
|
||||||
|
WAIT_SYNC_RELEASE_NOWAIT(&sync);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,6 +142,15 @@ int ompi_request_default_wait_any(size_t count,
|
|||||||
*index = i;
|
*index = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if( *index == completed ){
|
||||||
|
/* Only one request has triggered. There was no
|
||||||
|
* in-flight completions.
|
||||||
|
* Drop the signalled flag so we won't block
|
||||||
|
* in WAIT_SYNC_RELEASE
|
||||||
|
*/
|
||||||
|
WAIT_SYNC_SIGNALLED(&sync);
|
||||||
|
}
|
||||||
|
|
||||||
request = requests[*index];
|
request = requests[*index];
|
||||||
assert( REQUEST_COMPLETE(request) );
|
assert( REQUEST_COMPLETE(request) );
|
||||||
@ -361,7 +372,8 @@ int ompi_request_default_wait_some(size_t count,
|
|||||||
ompi_request_t **rptr = NULL;
|
ompi_request_t **rptr = NULL;
|
||||||
ompi_request_t *request = NULL;
|
ompi_request_t *request = NULL;
|
||||||
ompi_wait_sync_t sync;
|
ompi_wait_sync_t sync;
|
||||||
|
size_t sync_sets = 0, sync_unsets = 0;
|
||||||
|
|
||||||
WAIT_SYNC_INIT(&sync, 1);
|
WAIT_SYNC_INIT(&sync, 1);
|
||||||
|
|
||||||
*outcount = 0;
|
*outcount = 0;
|
||||||
@ -386,10 +398,12 @@ int ompi_request_default_wait_some(size_t count,
|
|||||||
num_requests_done++;
|
num_requests_done++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sync_sets = count - num_requests_null_inactive - num_requests_done;
|
||||||
|
|
||||||
if(num_requests_null_inactive == count) {
|
if(num_requests_null_inactive == count) {
|
||||||
*outcount = MPI_UNDEFINED;
|
*outcount = MPI_UNDEFINED;
|
||||||
WAIT_SYNC_RELEASE(&sync);
|
/* nobody will signall us */
|
||||||
|
WAIT_SYNC_RELEASE_NOWAIT(&sync);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -420,6 +434,14 @@ int ompi_request_default_wait_some(size_t count,
|
|||||||
num_requests_done++;
|
num_requests_done++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sync_unsets = count - num_requests_null_inactive - num_requests_done;
|
||||||
|
|
||||||
|
if( sync_sets == sync_unsets ){
|
||||||
|
/* nobody knows about us,
|
||||||
|
* set signa-in-progress flag to false
|
||||||
|
*/
|
||||||
|
WAIT_SYNC_SIGNALLED(&sync);
|
||||||
|
}
|
||||||
|
|
||||||
WAIT_SYNC_RELEASE(&sync);
|
WAIT_SYNC_RELEASE(&sync);
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
* reserved.
|
* reserved.
|
||||||
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
|
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
|
||||||
* reserved.
|
* reserved.
|
||||||
|
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
*
|
*
|
||||||
* Additional copyrights may follow
|
* Additional copyrights may follow
|
||||||
@ -24,6 +25,7 @@ typedef struct ompi_wait_sync_t {
|
|||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
struct ompi_wait_sync_t *next;
|
struct ompi_wait_sync_t *next;
|
||||||
struct ompi_wait_sync_t *prev;
|
struct ompi_wait_sync_t *prev;
|
||||||
|
volatile bool signaling;
|
||||||
} ompi_wait_sync_t;
|
} ompi_wait_sync_t;
|
||||||
|
|
||||||
#define REQUEST_PENDING (void*)0L
|
#define REQUEST_PENDING (void*)0L
|
||||||
@ -31,19 +33,42 @@ typedef struct ompi_wait_sync_t {
|
|||||||
|
|
||||||
#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))
|
#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))
|
||||||
|
|
||||||
|
/* The loop in release handles a race condition between the signaling
|
||||||
|
* thread and the destruction of the condition variable. The signaling
|
||||||
|
* member will be set to false after the final signaling thread has
|
||||||
|
* finished opertating on the sync object. This is done to avoid
|
||||||
|
* extra atomics in the singalling function and keep it as fast
|
||||||
|
* as possible. Note that the race window is small so spinning here
|
||||||
|
* is more optimal than sleeping since this macro is called in
|
||||||
|
* the critical path. */
|
||||||
#define WAIT_SYNC_RELEASE(sync) \
|
#define WAIT_SYNC_RELEASE(sync) \
|
||||||
if (opal_using_threads()) { \
|
if (opal_using_threads()) { \
|
||||||
pthread_cond_destroy(&(sync)->condition); \
|
while ((sync)->signaling) { \
|
||||||
pthread_mutex_destroy(&(sync)->lock); \
|
continue; \
|
||||||
|
} \
|
||||||
|
pthread_cond_destroy(&(sync)->condition); \
|
||||||
|
pthread_mutex_destroy(&(sync)->lock); \
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
|
||||||
|
if (opal_using_threads()) { \
|
||||||
|
pthread_cond_destroy(&(sync)->condition); \
|
||||||
|
pthread_mutex_destroy(&(sync)->lock); \
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#define WAIT_SYNC_SIGNAL(sync) \
|
#define WAIT_SYNC_SIGNAL(sync) \
|
||||||
if (opal_using_threads()) { \
|
if (opal_using_threads()) { \
|
||||||
pthread_mutex_lock(&(sync->lock)); \
|
pthread_mutex_lock(&(sync->lock)); \
|
||||||
pthread_cond_signal(&sync->condition); \
|
pthread_cond_signal(&sync->condition); \
|
||||||
pthread_mutex_unlock(&(sync->lock)); \
|
pthread_mutex_unlock(&(sync->lock)); \
|
||||||
|
sync->signaling = false; \
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define WAIT_SYNC_SIGNALLED(sync){ \
|
||||||
|
(sync)->signaling = false; \
|
||||||
|
}
|
||||||
|
|
||||||
OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
|
OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
|
||||||
static inline int sync_wait_st (ompi_wait_sync_t *sync)
|
static inline int sync_wait_st (ompi_wait_sync_t *sync)
|
||||||
{
|
{
|
||||||
@ -61,6 +86,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
|
|||||||
(sync)->next = NULL; \
|
(sync)->next = NULL; \
|
||||||
(sync)->prev = NULL; \
|
(sync)->prev = NULL; \
|
||||||
(sync)->status = 0; \
|
(sync)->status = 0; \
|
||||||
|
(sync)->signaling = true; \
|
||||||
if (opal_using_threads()) { \
|
if (opal_using_threads()) { \
|
||||||
pthread_cond_init (&(sync)->condition, NULL); \
|
pthread_cond_init (&(sync)->condition, NULL); \
|
||||||
pthread_mutex_init (&(sync)->lock, NULL); \
|
pthread_mutex_init (&(sync)->lock, NULL); \
|
||||||
@ -81,8 +107,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* this is an error path so just use the atomic */
|
/* this is an error path so just use the atomic */
|
||||||
opal_atomic_swap_32 (&sync->count, 0);
|
|
||||||
sync->status = OPAL_ERROR;
|
sync->status = OPAL_ERROR;
|
||||||
|
opal_atomic_wmb ();
|
||||||
|
opal_atomic_swap_32 (&sync->count, 0);
|
||||||
}
|
}
|
||||||
WAIT_SYNC_SIGNAL(sync);
|
WAIT_SYNC_SIGNAL(sync);
|
||||||
}
|
}
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user