1
1

* Make sure to free the callbacks array during finalize

* add MCA parameter (OMPI_MCA_mpi_yield_when_idle) to cause sched_yield()
  to be called when the progress engine is called and nothing happens.
  Default is to call sched_yield().
* add MCA parameter (OMPI_MCA_mpi_event_tick_rate) to adjust the rate
  at which the event library is called from ompi_progress.  When set
  to 0, the event library will never be ticked.  When set to 1, the
  event library will be progressed every time.  2 every other, etc.

The MCA parameters are only in effect from end of MPI_Init to start of
MPI_Finalize.

This commit was SVN r5099.
Этот коммит содержится в:
Brian Barrett 2005-03-30 01:40:26 +00:00
родитель e00782ed7d
Коммит 043e7682d2
7 изменённых файлов: 208 добавлений и 39 удалений

Просмотреть файл

@ -67,6 +67,9 @@ int ompi_mpi_finalize(void)
ompi_progress_events(OMPI_EVLOOP_NONBLOCK);
#endif
/* Change progress function priority back to RTE level stuff */
ompi_progress_mpi_finalize();
/* begin recording compound command */
/* if (OMPI_SUCCESS != (ret = orte_gpr.begin_compound_cmd())) {
return ret;

Просмотреть файл

@ -425,6 +425,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
return ret;
}
/* put the event library in "high performance MPI mode" */
if (OMPI_SUCCESS != ompi_progress_mpi_init()) {
error = "ompi_progress_mpi_init() failed";
goto error;
}
/* All done. Wasn't that simple? */
ompi_mpi_initialized = true;

Просмотреть файл

@ -68,6 +68,15 @@ int ompi_mpi_register_params(void)
*/
mca_base_param_register_string("mpi", NULL, "signal", NULL, NULL);
/*
* ompi_progress: decide whether to yield and the event library
* tick rate
*/
mca_base_param_register_int("mpi", NULL, "yield_when_idle", NULL, 1);
mca_base_param_register_int("mpi", NULL, "event_tick_rate", NULL, 1);
/* Whether or not to show MPI handle leaks */
show_leaks_param =

Просмотреть файл

@ -26,34 +26,117 @@
#include "include/constants.h"
static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
#if OMPI_HAVE_THREAD_SUPPORT
static ompi_lock_t progress_lock;
#endif /* OMPI_HAVE_THREAD_SUPPORT */
static ompi_progress_callback_t *callbacks = NULL;
static size_t callbacks_len = 0;
static size_t callbacks_size = 0;
static int call_yield = 1;
static long event_progress_counter = 0;
static long event_progress_counter_reset = 0;
/**
 * Set up the progress engine for use outside of MPI (RTE-only mode).
 *
 * Initializes the reentrancy lock (when thread support is compiled in)
 * and selects the conservative defaults: always yield the processor
 * when idle, and tick the event library on every progress pass.
 */
int
ompi_progress_init(void)
{
#if OMPI_HAVE_THREAD_SUPPORT
    /* guard against reentrant calls into the progress engine */
    ompi_atomic_init(&progress_lock, OMPI_ATOMIC_UNLOCKED);
#endif /* OMPI_HAVE_THREAD_SUPPORT */

    /* in the RTE-only case, always give up the processor when idle */
    call_yield = 1;

    /* a reset value of 0 ticks the event library on every call */
    event_progress_counter_reset = 0;
    event_progress_counter = 0;

    return OMPI_SUCCESS;
}
void ompi_progress_events(int flag)
/**
 * Switch the progress engine into "high performance MPI" mode.
 *
 * Reads the mpi_yield_when_idle and mpi_event_tick_rate MCA parameters
 * and adjusts the idle-yield behavior and the event library tick rate
 * accordingly.
 */
int
ompi_progress_mpi_init(void)
{
    int param;
    int tick_rate;

    /* Whether to sched_yield() when a progress pass finds no work.
     * NOTE(review): the default should really be derived from the
     * RTE's oversubscription information -- confirm. */
    param = mca_base_param_find("mpi", NULL, "yield_when_idle");
    mca_base_param_lookup_int(param, &call_yield);

    /* how often ompi_progress() should tick the event library */
    param = mca_base_param_find("mpi", NULL, "event_tick_rate");
    mca_base_param_lookup_int(param, &tick_rate);
    if (tick_rate <= 0) {
        /* 0 means "never tick the event library" */
        event_progress_counter_reset = INT_MAX;
    } else {
        /* store rate - 1 so the hot path can post-decrement and
           compare against zero, which is faster */
        event_progress_counter_reset = tick_rate - 1;
    }
    event_progress_counter = event_progress_counter_reset;

    return OMPI_SUCCESS;
}
/**
 * Undo the MPI-mode tuning applied by ompi_progress_mpi_init().
 *
 * Restores the conservative RTE defaults: yield the processor whenever
 * idle, and tick the event library on every progress pass.
 */
int
ompi_progress_mpi_finalize(void)
{
    /* from here on, always give up the processor when idle */
    call_yield = 1;

    /* tick the event library on every call into ompi_progress() */
    event_progress_counter_reset = 0;
    event_progress_counter = 0;

    return OMPI_SUCCESS;
}
/**
 * Tear down the progress engine.
 *
 * Releases the callback table while holding the progress lock; the
 * lock itself requires no explicit destruction.  After this returns,
 * no calls into the progress interface are allowed.
 */
int
ompi_progress_finalize(void)
{
#if OMPI_HAVE_THREAD_SUPPORT
    ompi_atomic_lock(&progress_lock);
#endif

    /* drop the callback table; free(NULL) is harmless if it was
       never allocated */
    free(callbacks);
    callbacks = NULL;
    callbacks_size = 0;
    callbacks_len = 0;

#if OMPI_HAVE_THREAD_SUPPORT
    ompi_atomic_unlock(&progress_lock);
#endif

    return OMPI_SUCCESS;
}
/**
 * Set how the event library is driven from ompi_progress().
 *
 * @param flag  OMPI_EVLOOP_* mode passed to ompi_event_loop();
 *              0 disables event library progress entirely.
 */
void
ompi_progress_events(int flag)
{
    ompi_progress_event_flag = flag;
}
void ompi_progress(void)
void
ompi_progress(void)
{
size_t i;
/* progress any outstanding communications */
int ret, events = 0;
#if OMPI_HAVE_THREAD_SUPPORT
/* NOTE: BWB - XXX - FIXME: Currently, there are some progress functions
(the event library, for one) that are not reentrant. It is not known
@ -70,7 +153,10 @@ void ompi_progress(void)
#endif
#if OMPI_ENABLE_PROGRESS_THREADS == 0
if (ompi_progress_event_flag != 0) {
/* trip the event library if we've reached our tick rate and we are
enabled */
if (event_progress_counter-- <= 0 && ompi_progress_event_flag != 0) {
event_progress_counter = event_progress_counter_reset;
ret = ompi_event_loop(ompi_progress_event_flag);
if (ret > 0) {
events += ret;
@ -78,6 +164,7 @@ void ompi_progress(void)
}
#endif
/* progress all registered callbacks */
for (i = 0 ; i < callbacks_len ; ++i) {
if (NULL != callbacks[i]) {
ret = (callbacks[i])();
@ -87,57 +174,46 @@ void ompi_progress(void)
}
}
#if 0 /* replaced by callback code */
ret = mca_pml.pml_progress();
if (ret > 0) {
events += ret;
}
/* Progress IO requests, if there are any */
ret = mca_io_base_request_progress();
if (ret > 0) {
events += ret;
}
#endif
#if OMPI_HAVE_THREAD_SUPPORT
/* release the lock before yielding, for obvious reasons */
ompi_atomic_unlock(&progress_lock);
#endif /* OMPI_HAVE_THREAD_SUPPORT */
#if 1
/* TSW - disable this until can validate that it doesn't impact SMP
* performance
*/
/*
* if there is nothing to do - yield the processor - otherwise
* we could consume the processor for the entire time slice. If
* the processor is oversubscribed - this will result in a best-case
* latency equivalent to the time-slice.
*/
if(events == 0) {
if(call_yield && events == 0) {
/*
* TSW - BWB - XXX - FIXME: this has a non-zero impact on
* performance. Evaluate reasonable defaults.
*
* If there is nothing to do - yield the processor - otherwise
* we could consume the processor for the entire time slice. If
* the processor is oversubscribed - this will result in a best-case
* latency equivalent to the time-slice.
*/
#ifndef WIN32
/* TODO: Find the windows equivalent for this */
sched_yield();
#endif
}
#endif
}
/*
* NOTE: This function is not in any way thread-safe. Do not allow
* multiple calls to ompi_progress_register and/or ompi_progress
* concurrently. This will be fixed in the near future.
*/
int
ompi_progress_register(ompi_progress_callback_t cb)
{
int ret = OMPI_SUCCESS;
#if OMPI_HAVE_THREAD_SUPPORT
ompi_atomic_lock(&progress_lock);
#endif
/* see if we need to allocate more space */
if (callbacks_len + 1 > callbacks_size) {
ompi_progress_callback_t *tmp;
tmp = realloc(callbacks, callbacks_size + 4);
if (tmp == NULL) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
if (tmp == NULL) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
callbacks = tmp;
callbacks_size += 4;
@ -145,20 +221,38 @@ ompi_progress_register(ompi_progress_callback_t cb)
callbacks[callbacks_len++] = cb;
return OMPI_SUCCESS;
cleanup:
#if OMPI_HAVE_THREAD_SUPPORT
ompi_atomic_unlock(&progress_lock);
#endif
return ret;
}
/**
 * Remove a previously registered progress callback.
 *
 * Scans the callback table for @cb and NULLs out its slot; NULL slots
 * are skipped by ompi_progress().  The table lock is taken for the
 * duration of the scan so the unlock always runs on exit.
 *
 * Fixes in this block: stray duplicated pre-change lines from the
 * stripped diff (the early `return OMPI_SUCCESS;` inside the loop and
 * the `return OMPI_ERR_NOT_FOUND;` before the cleanup label, both of
 * which bypassed the unlock) are removed, and the double semicolon in
 * the `ret` initializer is corrected.
 *
 * @return OMPI_SUCCESS if the callback was found and removed,
 *         OMPI_ERR_NOT_FOUND otherwise.
 */
int
ompi_progress_unregister(ompi_progress_callback_t cb)
{
    size_t i;
    int ret = OMPI_ERR_NOT_FOUND;

#if OMPI_HAVE_THREAD_SUPPORT
    ompi_atomic_lock(&progress_lock);
#endif

    for (i = 0 ; i < callbacks_len ; ++i) {
        if (cb == callbacks[i]) {
            callbacks[i] = NULL;
            ret = OMPI_SUCCESS;
            goto cleanup;
        }
    }

 cleanup:
#if OMPI_HAVE_THREAD_SUPPORT
    /* release the lock on every exit path */
    ompi_atomic_unlock(&progress_lock);
#endif
    return ret;
}

Просмотреть файл

@ -20,18 +20,65 @@
extern "C" {
#endif
/**
* Initialize the progress engine
*
* Initialize the progress engine, including constructing the
* proper locks and allocating space for the progress registration
* functions. At this point, any function in the progress engine
* interface may be called.
*/
OMPI_DECLSPEC extern int ompi_progress_init(void);
/**
* Configure the progress engine for executing MPI applications
*
 * Initialize and configure the progress engine to optimize for MPI
* applications, or any application where low latency is critical.
* Will configure itself for lowest latency (look at the yield MCA
* parameter, adjust calling frequency into the event library, etc.)
*/
OMPI_DECLSPEC extern int ompi_progress_mpi_init(void);
/**
* Turn off all optimizations enabled by ompi_progress_mpi_init().
*/
OMPI_DECLSPEC extern int ompi_progress_mpi_finalize(void);
/**
* Shut down the progress engine
*
* Shut down the progress engine. This includes deregistering all
* registered callbacks and freeing all resources. After finalize
* returns, no calls into the progress interface are allowed.
*/
OMPI_DECLSPEC extern int ompi_progress_finalize(void);
/**
* Control how the event library is called
*/
OMPI_DECLSPEC extern void ompi_progress_events(int);
/**
* Progress all pending events
*/
OMPI_DECLSPEC extern void ompi_progress(void);
typedef int (*ompi_progress_callback_t)(void);
/**
* Register an event to be progressed
*/
OMPI_DECLSPEC int ompi_progress_register(ompi_progress_callback_t cb);
/**
* Unregister previously registered event
*/
OMPI_DECLSPEC int ompi_progress_unregister(ompi_progress_callback_t cb);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -53,6 +53,8 @@ int orte_finalize(void)
orte_gpr_base_close();
mca_oob_base_close();
ompi_progress_finalize();
ompi_event_fini();
#ifndef WIN32

Просмотреть файл

@ -123,6 +123,14 @@ int orte_init_stage1(void)
return ret;
}
/*
 * Initialize the general progress engine
*/
if (OMPI_SUCCESS != (ret = ompi_progress_init())) {
ORTE_ERROR_LOG(ret);
return ret;
}
/*
* Internal startup
*/