From 591b8eebda9acd6697fe10ece15ba0c8cc3a162f Mon Sep 17 00:00:00 2001
From: Brian Barrett
Date: Thu, 21 Apr 2005 14:58:25 +0000
Subject: [PATCH] * Default to progressing the event library only every 100
 calls to ompi_progress(), unless someone is actually using it (MPI-2
 dynamics, TCP PTL). This applies only between the end of MPI_Init and the
 start of MPI_Finalize; at all other times, the event library is progressed
 on every call into ompi_progress().

This commit was SVN r5479.
---
 src/communicator/comm_dyn.c   | 19 ++++++++-
 src/mca/ptl/tcp/src/ptl_tcp.c |  6 +++
 src/runtime/ompi_progress.c   | 77 +++++++++++++++++++++++------------
 src/runtime/ompi_progress.h   | 28 +++++++++++++
 4 files changed, 104 insertions(+), 26 deletions(-)

diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c
index 04ac9d0958..b71d818f91 100644
--- a/src/communicator/comm_dyn.c
+++ b/src/communicator/comm_dyn.c
@@ -84,6 +84,11 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
     if (NULL == nbuf) {
         return OMPI_ERROR;
     }
+
+    /* tell the progress engine to tick the event library more
+       often, to make sure that the OOB messages get sent */
+    ompi_progress_event_increment();
+
     if (ORTE_SUCCESS != (rc = orte_dps.pack(nbuf, &size, 1, ORTE_INT))) {
         goto exit;
     }
@@ -210,6 +215,10 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
 
  exit:
+    /* done with OOB and such - slow our tick rate again */
+    ompi_progress();
+    ompi_progress_event_decrement();
+
     if ( NULL != rprocs ) {
         free ( rprocs );
     }
@@ -318,6 +327,8 @@ ompi_comm_start_processes(int count, char **array_of_commands,
        - "soft": see page 92 of MPI-2.
     */
 
+    /* make sure the progress engine properly trips the event library */
+    ompi_progress_event_increment();
 
     /* Convert the list of commands to an array of orte_app_context_t
        pointers */
@@ -332,6 +343,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         /* copy over the name of the executable */
@@ -340,6 +352,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         /* record the number of procs to be generated */
@@ -366,6 +379,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             for (j=0; j < i; j++) {
                 OBJ_RELEASE(apps[j]);
             }
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         apps[i]->argv[0] = strdup(array_of_commands[i]);
@@ -388,6 +402,7 @@ ompi_comm_start_processes(int count, char **array_of_commands,
             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
             /* rollback what was already done */
             for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
+            ompi_progress_event_decrement();
             return ORTE_ERR_OUT_OF_RESOURCE;
         }
         asprintf(&(apps[i]->env[0]), "OMPI_PARENT_PORT=%s", port_name);
@@ -419,10 +434,12 @@ ompi_comm_start_processes(int count, char **array_of_commands,
 
     if (ORTE_SUCCESS != (rc = orte_rmgr.spawn(apps, count, &new_jobid, NULL))) {
         ORTE_ERROR_LOG(rc);
-        return MPI_ERR_SPAWN;
+        ompi_progress_event_decrement();
+        return MPI_ERR_SPAWN;
     }
 
     /* clean up */
+    ompi_progress_event_decrement();
     for ( i=0; i<count; i++ ) {
         OBJ_RELEASE(apps[i]);
     }
diff --git a/src/mca/ptl/tcp/src/ptl_tcp.c b/src/mca/ptl/tcp/src/ptl_tcp.c
--- a/src/mca/ptl/tcp/src/ptl_tcp.c
+++ b/src/mca/ptl/tcp/src/ptl_tcp.c
@@ ... @@ int mca_ptl_tcp_add_procs(...)
         OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
         peers[i] = ptl_peer;
         ompi_list_append(&ptl_tcp->ptl_peers, (ompi_list_item_t*)ptl_peer);
+        /* we increase the count of MPI users of the event library
+           once per peer, so that the event library stays in use
+           until we are no longer connected to any peer */
+        ompi_progress_event_increment();
     }
     return OMPI_SUCCESS;
 }
@@ -146,6 +150,7 @@ int mca_ptl_tcp_del_procs(struct mca_ptl_base_module_t* ptl, size_t nprocs, stru
     for(i=0; i<nprocs; i++) {
         ompi_list_remove_item(&ptl_tcp->ptl_peers, (ompi_list_item_t*)peers[i]);
         OBJ_RELEASE(peers[i]);
+        ompi_progress_event_decrement();
     }
     return OMPI_SUCCESS;
 }
@@ -163,6 +168,7 @@ int mca_ptl_tcp_finalize(struct mca_ptl_base_module_t* ptl)
          item = ompi_list_remove_first(&ptl_tcp->ptl_peers)) {
         mca_ptl_tcp_peer_t *peer = (mca_ptl_tcp_peer_t*)item;
         OBJ_RELEASE(peer);
+        ompi_progress_event_decrement();
     }
     free(ptl);
     return OMPI_SUCCESS;
diff --git a/src/runtime/ompi_progress.c b/src/runtime/ompi_progress.c
index d627a38e7b..592ca2ee77 100644
--- a/src/runtime/ompi_progress.c
+++ b/src/runtime/ompi_progress.c
@@ -28,22 +28,35 @@
 #include "mca/gpr/gpr.h"
 #include "include/orte_schema.h"
 
+/*
+ * default parameters
+ */
 static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
+static const int ompi_progress_default_tick_rate = 100;
 
+/*
+ * Local variables
+ */
 #if OMPI_HAVE_THREAD_SUPPORT
 static ompi_lock_t progress_lock;
 #endif /* OMPI_HAVE_THREAD_SUPPORT */
 
+/* callbacks to progress */
 static ompi_progress_callback_t *callbacks = NULL;
 static size_t callbacks_len = 0;
 static size_t callbacks_size = 0;
 
+/* do we want to call sched_yield() if nothing happened */
 static int call_yield = 1;
 
+/* current countdown until we tick the event library */
 static long event_progress_counter = 0;
+/* reset value for the counter when it hits 0 */
 static long event_progress_counter_reset = 0;
 
+/* MPI-level users of the event library; while this is non-zero, the
+   event library is ticked on every call into ompi_progress() */
+static int32_t event_num_mpi_users = 0;
 
-#define BWB_DEBUG_PRINTF 0
 
 static void
 node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
@@ -52,10 +65,6 @@ node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
     uint32_t used_proc_slots = 0;
     int param, i;
 
-#if BWB_DEBUG_PRINTF
-    printf("callback triggered\n");
-#endif
-
     /* parse the response */
     for(i = 0 ; i < notify_data->cnt ; i++) {
         orte_gpr_value_t* value = notify_data->values[i];
@@ -65,17 +74,11 @@ node_schedule_callback(orte_gpr_notify_data_t *notify_data, void *user_tag)
             orte_gpr_keyval_t* keyval = value->keyvals[k];
             if(strcmp(keyval->key, ORTE_NODE_SLOTS_KEY) == 0) {
                 proc_slots = keyval->value.ui32;
-#if BWB_DEBUG_PRINTF
-                printf("setting proc_slots to %d\n", proc_slots);
-#endif
                 continue;
             }
             if(strncmp(keyval->key, ORTE_NODE_SLOTS_ALLOC_KEY,
                        strlen(ORTE_NODE_SLOTS_ALLOC_KEY)) == 0) {
                 used_proc_slots += keyval->value.ui32;
-#if BWB_DEBUG_PRINTF
-                printf("setting used_proc_slots to %d\n", used_proc_slots);
-#endif
                 continue;
             }
         }
@@ -221,6 +224,8 @@ ompi_progress_mpi_init(void)
         if (OMPI_SUCCESS != rc) return rc;
     }
 
+    event_num_mpi_users = 0;
+
     return OMPI_SUCCESS;
 }
@@ -243,19 +248,15 @@ ompi_progress_mpi_enable(void)
         call_yield = value;
     }
 
-#if BWB_DEBUG_PRINTF
-    printf("call_yield: %d\n", call_yield);
-#endif
-
     /* set the event tick rate */
     param = mca_base_param_find("mpi", NULL, "event_tick_rate");
     mca_base_param_lookup_int(param, &value);
     if (value < 0) {
-        /* default - tick all the time */
-        event_progress_counter_reset = 0;
+        /* user didn't specify - use the default tick rate */
+        event_progress_counter_reset = ompi_progress_default_tick_rate;
     } else if (value == 0) {
-        /* user specified - don't count often */
+        /* user asked us never to tick - make the countdown effectively infinite */
         event_progress_counter_reset = INT_MAX;
     } else {
         /* subtract one so that we can do post-fix subtraction
@@ -263,7 +264,11 @@ ompi_progress_mpi_enable(void)
         event_progress_counter_reset = value - 1;
     }
 
-    event_progress_counter = event_progress_counter_reset;
+    /* it's possible that an init function bumped up our tick rate.
+     * If so, set event_progress_counter to 0 so we tick immediately.
+     * Otherwise, start from the reset value. */
+    event_progress_counter = (event_num_mpi_users > 0) ?
+        0 : event_progress_counter_reset;
 
     return OMPI_SUCCESS;
 }
@@ -351,7 +356,8 @@ ompi_progress(void)
     /* trip the event library if we've reached our tick rate and we are
        enabled */
     if (event_progress_counter-- <= 0 && ompi_progress_event_flag != 0) {
-        event_progress_counter = event_progress_counter_reset;
+        event_progress_counter =
+            (event_num_mpi_users > 0) ? 1 : event_progress_counter_reset;
         events += ompi_event_loop(ompi_progress_event_flag);
     }
 #endif
@@ -376,11 +382,7 @@ ompi_progress(void)
 #endif /* OMPI_HAVE_THREAD_SUPPORT */
 
     if (call_yield && events <= 0) {
-        /*
-         * TSW - BWB - XXX - FIXME: this has a non-zero impact on
-         * performance.  Evaluate reasonable defaults.
-         *
-         * If there is nothing to do - yield the processor - otherwise
+        /* If there is nothing to do - yield the processor - otherwise
          * we could consume the processor for the entire time slice. If
         * the processor is oversubscribed - this will result in a best-case
          * latency equivalent to the time-slice.
@@ -466,3 +468,28 @@ ompi_progress_unregister(ompi_progress_callback_t cb)
 
     return ret;
 }
+
+
+int
+ompi_progress_event_increment(void)
+{
+    /* one more MPI user of the event library */
+    ompi_atomic_add_32(&event_num_mpi_users, 1);
+    /* always reset the tick rate - can't hurt */
+    event_progress_counter = 0;
+
+    return OMPI_SUCCESS;
+}
+
+
+int
+ompi_progress_event_decrement(void)
+{
+    int32_t val;
+    val = ompi_atomic_sub_32(&event_num_mpi_users, 1);
+    if (val >= 0) {
+        event_progress_counter = event_progress_counter_reset;
+    }
+
+    return OMPI_SUCCESS;
+}
diff --git a/src/runtime/ompi_progress.h b/src/runtime/ompi_progress.h
index de50757fd6..03c162da8c 100644
--- a/src/runtime/ompi_progress.h
+++ b/src/runtime/ompi_progress.h
@@ -14,6 +14,12 @@
  * $HEADER$
  */
 
+/**
+ * @file
+ *
+ * Progress engine for Open MPI
+ */
+
 #ifndef _OMPI_PROGRESS_H_
 #define _OMPI_PROGRESS_H_
 #if defined(c_plusplus) || defined(__cplusplus)
@@ -35,16 +41,28 @@ OMPI_DECLSPEC extern int ompi_progress_init(void);
  *
  * Register to receive any needed information from the GPR and
  * initialize any data structures required for MPI applications.
+ *
+ * \note ompi_progress_init() must be called before calling
+ * this function.  Failure to do so is an error.
  */
 OMPI_DECLSPEC extern int ompi_progress_mpi_init(void);
 
 /**
  * Turn on optimizations for MPI progress
+ *
+ * Turn on optimizations for MPI applications.  This includes lowering
+ * the rate at which the event library is ticked if it is not under
+ * active use and possibly disabling the sched_yield() call when the
+ * progress engine is idle.
  */
 OMPI_DECLSPEC extern int ompi_progress_mpi_enable(void);
 
 /**
  * Turn off all optimizations enabled by ompi_progress_mpi_enable().
+ *
+ * Completely reverses all optimizations enabled by
+ * ompi_progress_mpi_enable().  The event library resumes constant
+ * ticking and the progress engine yields the CPU when idle.
*/ OMPI_DECLSPEC extern int ompi_progress_mpi_disable(void); @@ -82,6 +100,16 @@ OMPI_DECLSPEC int ompi_progress_register(ompi_progress_callback_t cb); OMPI_DECLSPEC int ompi_progress_unregister(ompi_progress_callback_t cb); +/** + * Increase count of MPI users of the event library + */ +OMPI_DECLSPEC int ompi_progress_event_increment(void); + +/** + * Decrease count of MPI users of the event library + */ +OMPI_DECLSPEC int ompi_progress_event_decrement(void); + #if defined(c_plusplus) || defined(__cplusplus) } #endif
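
The usage pattern the new counter API expects: any code that depends on
timely event-library progress (OOB messaging during connect/accept/spawn,
TCP peer connections) brackets that dependency with an increment/decrement
pair. The counts nest, which is why the TCP PTL can safely take one
reference per connected peer. A minimal sketch under those assumptions -
example_oob_section() and do_oob_work() are hypothetical stand-ins, not
functions from this patch:

    #include "include/constants.h"
    #include "runtime/ompi_progress.h"

    /* hypothetical stand-in for work that needs the event library
       ticked aggressively (e.g. OOB traffic during connect/accept) */
    static int do_oob_work(void) { return OMPI_SUCCESS; }

    static int example_oob_section(void)
    {
        int rc;

        /* while at least one reference is held, ompi_progress()
           ticks the event library on every call */
        ompi_progress_event_increment();

        rc = do_oob_work();

        /* drain anything still pending, then drop our reference so
           the engine falls back to its counted tick rate (every
           100th call to ompi_progress() by default) */
        ompi_progress();
        ompi_progress_event_decrement();

        return rc;
    }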