
* Default to only progressing the event library every 100 calls to
  ompi_progress(), unless someone is actually using it (MPI-2 dynamic
  process support, TCP PTL).  This applies only between the end of
  MPI_Init and the start of MPI_Finalize; at all other times the event
  library is progressed on every call into ompi_progress().

This commit was SVN r5479.
This commit is contained in:
Brian Barrett 2005-04-21 14:58:25 +00:00
parent ea0e1de399
commit 591b8eebda
4 changed files with 104 additions and 26 deletions
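
As context for the diffs that follow, here is a minimal sketch of the counting scheme described above: the event library is ticked only every tick_rate calls into the progress engine unless some component has registered itself as an active user, in which case it is ticked on every call. The names below are made up for illustration; the real code (see the ompi_progress.c diff) uses atomic operations and MCA parameters instead of plain statics.

#include <stdint.h>

static const long tick_rate = 100;    /* assumed default from the commit message */
static long counter = 0;              /* counts down to the next event-library tick */
static int32_t num_users = 0;         /* components that currently need frequent ticks */

static void sketch_event_increment(void)
{
    num_users++;                      /* the real code uses ompi_atomic_add_32() */
    counter = 0;                      /* force a tick on the very next progress call */
}

static void sketch_event_decrement(void)
{
    if (--num_users == 0) {
        counter = tick_rate;          /* no users left: fall back to the slow rate */
    }
}

static void sketch_progress(void)
{
    if (counter-- <= 0) {
        /* the event library would be ticked here (ompi_event_loop() in the real code) */
        counter = (num_users > 0) ? 1 : tick_rate;
    }
}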

View file

@@ -84,6 +84,11 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
     if (NULL == nbuf) {
         return OMPI_ERROR;
     }
+
+    /* tell the progress engine to tick the event library more
+       often, to make sure that the OOB messages get sent */
+    ompi_progress_event_increment();
+
     if (ORTE_SUCCESS != (rc = orte_dps.pack(nbuf, &size, 1, ORTE_INT))) {
         goto exit;
     }
@@ -210,6 +215,10 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
  exit:
+    /* done with OOB and such - slow our tick rate again */
+    ompi_progress();
+    ompi_progress_event_decrement();
+
     if ( NULL != rprocs ) {
         free ( rprocs );
@@ -318,6 +327,8 @@ ompi_comm_start_processes(int count, char **array_of_commands,
        - "soft": see page 92 of MPI-2.
     */

+    /* make sure the progress engine properly trips the event library */
+    ompi_progress_event_increment();

     /* Convert the list of commands to an array of orte_app_context_t
        pointers */
@@ -332,6 +343,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         /* copy over the name of the executable */
@@ -340,6 +352,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         /* record the number of procs to be generated */
@@ -366,6 +379,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             for (j=0; j < i; j++) {
                 OBJ_RELEASE(apps[j]);
             }
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         apps[i]->argv[0] = strdup(array_of_commands[i]);
@@ -388,6 +402,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         asprintf(&(apps[i]->env[0]), "OMPI_PARENT_PORT=%s", port_name);
@@ -419,10 +434,12 @@ ompi_comm_start_processes(int count, char **array_of_commands,
     if (ORTE_SUCCESS != (rc = orte_rmgr.spawn(apps, count, &new_jobid,
                                               NULL))) {
         ORTE_ERROR_LOG(rc);
-        return MPI_ERR_SPAWN;
+        ompi_progress_event_decrement();
+        return MPI_ERR_SPAWN;
     }

     /* clean up */
+    ompi_progress_event_decrement();
     for ( i=0; i<count; i++) {
         OBJ_RELEASE(apps[i]);
     }

View file

@@ -129,6 +129,10 @@ int mca_ptl_tcp_add_procs(
         OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
         peers[i] = ptl_peer;
         ompi_list_append(&ptl_tcp->ptl_peers, (ompi_list_item_t*)ptl_peer);
+        /* we increase the count of MPI users of the event library
+           once per peer, so that we are used until we aren't
+           connected to a peer */
+        ompi_progress_event_increment();
     }
     return OMPI_SUCCESS;
 }
@@ -146,6 +150,7 @@ int mca_ptl_tcp_del_procs(struct mca_ptl_base_module_t* ptl, size_t nprocs, stru
     for(i=0; i<nprocs; i++) {
         ompi_list_remove_item(&ptl_tcp->ptl_peers, (ompi_list_item_t*)peers[i]);
         OBJ_RELEASE(peers[i]);
+        ompi_progress_event_decrement();
     }
     return OMPI_SUCCESS;
 }
@@ -163,6 +168,7 @@ int mca_ptl_tcp_finalize(struct mca_ptl_base_module_t* ptl)
            item = ompi_list_remove_first(&ptl_tcp->ptl_peers)) {
         mca_ptl_tcp_peer_t *peer = (mca_ptl_tcp_peer_t*)item;
         OBJ_RELEASE(peer);
+        ompi_progress_event_decrement();
     }
     free(ptl);
     return OMPI_SUCCESS;

View file

@@ -28,22 +28,35 @@
 #include "mca/gpr/gpr.h"
 #include "include/orte_schema.h"

+/*
+ * default parameters
+ */
 static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
+static const int ompi_progress_default_tick_rate = 100;
+
+/*
+ * Local variables
+ */
 #if OMPI_HAVE_THREAD_SUPPORT
 static ompi_lock_t progress_lock;
 #endif /* OMPI_HAVE_THREAD_SUPPORT */

+/* callbacks to progress */
 static ompi_progress_callback_t *callbacks = NULL;
 static size_t callbacks_len = 0;
 static size_t callbacks_size = 0;

+/* do we want to call sched_yield() if nothing happened */
 static int call_yield = 1;

+/* current count down until we tick the event library */
 static long event_progress_counter = 0;
+/* reset value for counter when it hits 0 */
 static long event_progress_counter_reset = 0;

+/* users of the event library from MPI cause the tick rate to
+   be every time */
+static int32_t event_num_mpi_users = 0;

-#define BWB_DEBUG_PRINTF 0

 static void
 node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
@@ -52,10 +65,6 @@ node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
     uint32_t used_proc_slots = 0;
     int param, i;

-#if BWB_DEBUG_PRINTF
-    printf("callback triggered\n");
-#endif
-
     /* parse the response */
     for(i = 0 ; i < notify_data->cnt ; i++) {
         orte_gpr_value_t* value = notify_data->values[i];
@@ -65,17 +74,11 @@ node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
             orte_gpr_keyval_t* keyval = value->keyvals[k];
             if(strcmp(keyval->key, ORTE_NODE_SLOTS_KEY) == 0) {
                 proc_slots = keyval->value.ui32;
-#if BWB_DEBUG_PRINTF
-                printf("setting proc_slots to %d\n", proc_slots);
-#endif
                 continue;
             }
             if(strncmp(keyval->key, ORTE_NODE_SLOTS_ALLOC_KEY,
                        strlen(ORTE_NODE_SLOTS_ALLOC_KEY)) == 0) {
                 used_proc_slots += keyval->value.ui32;
-#if BWB_DEBUG_PRINTF
-                printf("setting used_proc_slots to %d\n", used_proc_slots);
-#endif
                 continue;
             }
         }
@@ -221,6 +224,8 @@ ompi_progress_mpi_init(void)
         if (OMPI_SUCCESS != rc) return rc;
     }

+    event_num_mpi_users = 0;
+
     return OMPI_SUCCESS;
 }
@@ -243,19 +248,15 @@ ompi_progress_mpi_enable(void)
         call_yield = value;
     }

-#if BWB_DEBUG_PRINTF
-    printf("call_yield: %d\n", call_yield);
-#endif
-
     /* set the event tick rate */
     param = mca_base_param_find("mpi", NULL, "event_tick_rate");
     mca_base_param_lookup_int(param, &value);
     if (value < 0) {
-        /* default - tick all the time */
-        event_progress_counter_reset = 0;
+        /* user didn't specify - default tick rate */
+        event_progress_counter_reset = ompi_progress_default_tick_rate;
     } else if (value == 0) {
-        /* user specified - don't count often */
+        /* user specified as never tick - don't count often */
         event_progress_counter_reset = INT_MAX;
     } else {
         /* subtract one so that we can do post-fix subtraction
@@ -263,7 +264,11 @@ ompi_progress_mpi_enable(void)
         event_progress_counter_reset = value - 1;
     }

-    event_progress_counter = event_progress_counter_reset;
+    /* it's possible that an init function bumped up our tick rate.
+     * If so, set the event_progress counter to 0.  Otherwise, set it to
+     * the reset value */
+    event_progress_counter = (event_num_mpi_users > 0) ?
+        0 : event_progress_counter_reset;

     return OMPI_SUCCESS;
 }
@@ -351,7 +356,8 @@ ompi_progress(void)
     /* trip the event library if we've reached our tick rate and we are
        enabled */
     if (event_progress_counter-- <= 0 && ompi_progress_event_flag != 0) {
-        event_progress_counter = event_progress_counter_reset;
+        event_progress_counter =
+            (event_num_mpi_users > 0) ? 1 : event_progress_counter_reset;
         events += ompi_event_loop(ompi_progress_event_flag);
     }
 #endif
@@ -376,11 +382,7 @@ ompi_progress(void)
 #endif /* OMPI_HAVE_THREAD_SUPPORT */

     if (call_yield && events <= 0) {
-        /*
-         * TSW - BWB - XXX - FIXME: this has a non-zero impact on
-         * performance.  Evaluate reasonable defaults.
-         *
-         * If there is nothing to do - yield the processor - otherwise
+        /* If there is nothing to do - yield the processor - otherwise
          * we could consume the processor for the entire time slice.  If
          * the processor is oversubscribed - this will result in a best-case
          * latency equivalent to the time-slice.
@@ -466,3 +468,28 @@ ompi_progress_unregister(ompi_progress_callback_t cb)

     return ret;
 }
+
+
+int
+ompi_progress_event_increment()
+{
+    int32_t val;
+    val = ompi_atomic_add_32(&event_num_mpi_users, 1);
+    /* always reset the tick rate - can't hurt */
+    event_progress_counter = 0;
+
+    return OMPI_SUCCESS;
+}
+
+
+int
+ompi_progress_event_decrement()
+{
+    int32_t val;
+    val = ompi_atomic_sub_32(&event_num_mpi_users, 1);
+    if (val >= 0) {
+        event_progress_counter = event_progress_counter_reset;
+    }
+
+    return OMPI_SUCCESS;
+}

View file

@@ -14,6 +14,12 @@
  * $HEADER$
  */

+/**
+ * @file
+ *
+ * Progress engine for Open MPI
+ */
+
 #ifndef _OMPI_PROGRESS_H_
 #define _OMPI_PROGRESS_H_

 #if defined(c_plusplus) || defined(__cplusplus)
@@ -35,16 +41,28 @@ OMPI_DECLSPEC extern int ompi_progress_init(void);
  *
  * Register to receive any needed information from the GPR and
  * intialize any data structures required for MPI applications.
+ *
+ * \note ompi_progress_init() must be called before calling
+ * this function.  Failure to do so is an error.
  */
 OMPI_DECLSPEC extern int ompi_progress_mpi_init(void);

 /**
  * Turn on optimizations for MPI progress
+ *
+ * Turn on optimizations for MPI applications.  This includes lowering
+ * the rate at which the event library is ticked if it is not under
+ * active use and possibly disabling the sched_yield call when the
+ * progress engine is idle
  */
 OMPI_DECLSPEC extern int ompi_progress_mpi_enable(void);

 /**
  * Turn off all optimizations enabled by ompi_progress_mpi_enable().
+ *
+ * Completely reverses all optimizations enabled by
+ * ompi_progress_mpi_enable().  The event library resumes constant
+ * ticking and the progress engine yields the CPU when idle.
  */
 OMPI_DECLSPEC extern int ompi_progress_mpi_disable(void);
@@ -82,6 +100,16 @@ OMPI_DECLSPEC int ompi_progress_register(ompi_progress_callback_t cb);
 OMPI_DECLSPEC int ompi_progress_unregister(ompi_progress_callback_t cb);

+/**
+ * Increase count of MPI users of the event library
+ */
+OMPI_DECLSPEC int ompi_progress_event_increment(void);
+
+/**
+ * Decrease count of MPI users of the event library
+ */
+OMPI_DECLSPEC int ompi_progress_event_decrement(void);
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }
 #endif
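
To round out the header, here is a hedged sketch of the intended usage pattern for the two new calls declared above, mirroring what the comm_dyn and TCP PTL changes do: bracket any phase that depends on prompt event-library progress (such as waiting for an out-of-band exchange) with an increment/decrement pair. The include path and the reply_arrived callback are illustrative assumptions, not code from the tree.

#include "runtime/ompi_progress.h"     /* assumed location of the header shown above */

/* Hypothetical helper: spin in the progress engine until an
   out-of-band reply has arrived, keeping the event library ticking
   on every progress call only for the duration of the wait. */
static void wait_for_oob_reply(int (*reply_arrived)(void))
{
    ompi_progress_event_increment();   /* tick the event library on every call */

    while (0 == reply_arrived()) {
        ompi_progress();               /* drive progress while we wait */
    }

    ompi_progress_event_decrement();   /* drop back to the slow tick rate */
}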