diff --git a/config/ompi_config_threads.m4 b/config/ompi_config_threads.m4 index 88eb516497..fcdc0daf0b 100644 --- a/config/ompi_config_threads.m4 +++ b/config/ompi_config_threads.m4 @@ -168,5 +168,65 @@ configure with the '--without-threads' option. EOF AC_MSG_ERROR(["*** Can not continue."]) fi -]) + +# +# Now configure the whole MPI and progress thread gorp +# +AC_MSG_CHECKING([if want MPI thread support]) +AC_ARG_ENABLE([mpi-threads], + AC_HELP_STRING([--enable-mpi-threads], + [Enable threads for MPI applications (default: enabled)]), + [enable_mpi_threads="$enableval"]) + +if test "$enable_mpi_threads" = "" ; then + # no argument given either way. Default to whether we have threads or not + if test "$THREAD_TYPE" != "none" ; then + OMPI_ENABLE_MPI_THREADS=1 + enable_mpi_threads="yes" + else + OMPI_ENABLE_MPI_THREADS=0 + enable_mpi_threads="no" + fi +elif test "$enable_mpi_threads" = "no" ; then + OMPI_ENABLE_MPI_THREADS=0 +else + # they want MPI threads. Make sure we have threads + if "$THREAD_TYPE" != "none" ; then + OMPI_ENABLE_MPI_THREADS=1 + enable_mpi_threads="yes" + else + AC_MSG_ERROR([User requested MPI threads, but no threading model supported]) + fi +fi +AC_DEFINE_UNQUOTED([OMPI_ENABLE_MPI_THREADS], [$OMPI_ENABLE_MPI_THREADS], + [Whether we should enable support for multiple user threads]) +AC_MSG_RESULT([$enable_mpi_threads]) + + +AC_MSG_CHECKING([if want asynchronous progress thread support]) +AC_ARG_ENABLE([progress-threads], + AC_HELP_STRING([--enable-progress-threads], + [Enable threads asynchronous communication progress (default: disabled)]), + [enable_progress_threads="$enableval"]) + +if test "$enable_progress_threads" = "" ; then + # no argument given either way. Default to no. + OMPI_ENABLE_PROGRESS_THREADS=0 + enable_progress_threads="no" +elif test "$enable_progress_threads" = "no" ; then + OMPI_ENABLE_PROGRESS_THREADS=0 + enable_progress_threads="no" +else + # they want threaded progress + if "$THREAD_TYPE" != "none" ; then + OMPI_ENABLE_PROGRESS_THREADS=1 + enable_progress_threads="yes" + else + AC_MSG_ERROR([User requested progress threads, but no threading model supported]) + fi +fi +AC_DEFINE_UNQUOTED([OMPI_ENABLE_PROGRESS_THREADS], [$OMPI_ENABLE_PROGRESS_THREADS], + [Whether we should use progress threads rather than polling]) +AC_MSG_RESULT([$enable_progress_threads]) +])dnl diff --git a/include/ompi_config_bottom.h b/include/ompi_config_bottom.h index 2b587997c6..19c559f57e 100644 --- a/include/ompi_config_bottom.h +++ b/include/ompi_config_bottom.h @@ -104,7 +104,7 @@ typedef long long bool; /* * Do we have thread support? */ -#define OMPI_HAVE_THREADS (OMPI_HAVE_SOLARIS_THREADS || OMPI_HAVE_POSIX_THREADS) +#define OMPI_HAVE_THREAD_SUPPORT (OMPI_ENABLE_MPI_THREADS || OMPI_ENABLE_PROGRESS_THREADS) /* * Do we have ? diff --git a/src/event/event.c b/src/event/event.c index 05d5f88855..f3adbd7e7f 100644 --- a/src/event/event.c +++ b/src/event/event.c @@ -149,7 +149,7 @@ static struct timeval ompi_event_tv; OMPI_DECLSPEC ompi_mutex_t ompi_event_lock; static int ompi_event_inited = 0; static bool ompi_event_enabled = false; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS static ompi_thread_t ompi_event_thread; static ompi_event_t ompi_event_pipe_event; static int ompi_event_pipe[2]; @@ -158,7 +158,7 @@ static int ompi_event_pipe_signalled; bool ompi_event_progress_thread(void) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS return ompi_thread_self_compare(&ompi_event_thread); #else return false; @@ -220,7 +220,7 @@ static void* ompi_event_run(ompi_object_t* arg) } -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS static void ompi_event_pipe_handler(int sd, short flags, void* user) { unsigned char byte; @@ -258,7 +258,7 @@ ompi_event_init(void) if (ompi_evbase == NULL) errx(1, "%s: no event mechanism available", __func__); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS if(pipe(ompi_event_pipe) != 0) { ompi_output(0, "ompi_event_init: pipe() failed with errno=%d\n", errno); return OMPI_ERROR; @@ -285,7 +285,7 @@ ompi_event_init(void) int ompi_event_fini(void) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS if(--ompi_event_inited == 0) ompi_event_disable(); #endif @@ -294,7 +294,7 @@ int ompi_event_fini(void) int ompi_event_disable(void) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS OMPI_THREAD_LOCK(&ompi_event_lock); if(ompi_event_inited > 0 && ompi_event_enabled) { ompi_event_enabled = false; @@ -317,7 +317,7 @@ int ompi_event_disable(void) int ompi_event_enable(void) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS int rc; /* spin up a thread to dispatch events */ OMPI_THREAD_LOCK(&ompi_event_lock); @@ -416,11 +416,11 @@ ompi_event_loop(int flags) } else timerclear(&tv); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS ompi_event_pipe_signalled = 0; #endif res = ompi_evsel->dispatch(ompi_evbase, &tv); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS ompi_event_pipe_signalled = 1; #endif if (res == -1) { @@ -520,7 +520,7 @@ ompi_event_add_i(struct ompi_event *ev, struct timeval *tv) rc = (ompi_evsel->add(ompi_evbase, ev)); } -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS if(ompi_event_pipe_signalled == 0) { unsigned char byte = 0; if(write(ompi_event_pipe[1], &byte, 1) != 1) @@ -557,7 +557,7 @@ int ompi_event_del_i(struct ompi_event *ev) rc = (ompi_evsel->del(ompi_evbase, ev)); } -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS if(ompi_event_pipe_signalled == 0) { unsigned char byte = 0; if(write(ompi_event_pipe[1], &byte, 1) != 1) diff --git a/src/mca/io/base/io_base_request.c b/src/mca/io/base/io_base_request.c index 86db9e57dc..29116e1df8 100644 --- a/src/mca/io/base/io_base_request.c +++ b/src/mca/io/base/io_base_request.c @@ -228,7 +228,7 @@ void mca_io_base_request_return(ompi_file_t *file) OMPI_THREAD_UNLOCK(&file->f_io_requests_lock); } -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS static volatile bool thread_running = false; static volatile bool thread_done = false; static ompi_thread_t progress_thread; @@ -255,14 +255,14 @@ request_progress_thread(ompi_object_t *arg) return NULL; } -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ void mca_io_base_request_progress_init() { mca_io_base_request_num_pending = 0; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS thread_running = false; thread_done = false; @@ -272,14 +272,14 @@ mca_io_base_request_progress_init() progress_thread.t_run = request_progress_thread; progress_thread.t_arg = NULL; -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ } void mca_io_base_request_progress_add() { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS /* if we don't have a progress thread, make us have a progress thread */ if (! thread_running) { @@ -290,13 +290,13 @@ mca_io_base_request_progress_add() } OMPI_THREAD_UNLOCK(&progress_mutex); } -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ OMPI_THREAD_ADD32(&mca_io_base_request_num_pending, 1); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS ompi_condition_signal(&progress_cond); -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ } @@ -310,7 +310,7 @@ mca_io_base_request_progress_del() void mca_io_base_request_progress_fini() { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS void *ret; /* make the helper thread die */ @@ -324,5 +324,5 @@ mca_io_base_request_progress_fini() OBJ_DESTRUCT(&progress_thread); OBJ_DESTRUCT(&progress_cond); OBJ_DESTRUCT(&progress_mutex); -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ } diff --git a/src/mca/oob/tcp/oob_tcp.c b/src/mca/oob/tcp/oob_tcp.c index 4f431feed9..2b8f95c6fd 100644 --- a/src/mca/oob/tcp/oob_tcp.c +++ b/src/mca/oob/tcp/oob_tcp.c @@ -424,7 +424,7 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_thre { *priority = 1; *allow_multi_user_threads = true; - *have_hidden_threads = OMPI_HAVE_THREADS; + *have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS; /* are there any interfaces? */ diff --git a/src/mca/oob/tcp/oob_tcp_msg.c b/src/mca/oob/tcp/oob_tcp_msg.c index 319b670024..bcb527d678 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.c +++ b/src/mca/oob/tcp/oob_tcp_msg.c @@ -55,7 +55,7 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg) int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS OMPI_THREAD_LOCK(&msg->msg_lock); while(msg->msg_complete == false) { if(ompi_event_progress_thread()) { @@ -97,7 +97,7 @@ int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* rc, struct timespec* uint32_t usecs = abstime->tv_nsec * 1000; gettimeofday(&tv,NULL); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS OMPI_THREAD_LOCK(&msg->msg_lock); while(msg->msg_complete == false && ((uint32_t)tv.tv_sec <= secs || diff --git a/src/mca/pcm/bproc/src/pcm_bproc_spawn.c b/src/mca/pcm/bproc/src/pcm_bproc_spawn.c index 24a36ae59c..776dbc638b 100644 --- a/src/mca/pcm/bproc/src/pcm_bproc_spawn.c +++ b/src/mca/pcm/bproc/src/pcm_bproc_spawn.c @@ -74,7 +74,7 @@ internal_bproc_vexecmove_io(int nnodes, int *nodes, int *pids, char ***env, int *envc, int offset); -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void spawn_procs_callback(int fd, short flags, void *data); #endif @@ -84,7 +84,7 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super, mca_ns_base_jobid_t jobid, ompi_list_t *schedlist) { -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS spawn_procs_data_t data; struct timeval tv; struct ompi_event ev; @@ -125,7 +125,7 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super, } -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void spawn_procs_callback(int fd, short flags, void *data) { diff --git a/src/mca/pcm/rsh/pcm_rsh_spawn.c b/src/mca/pcm/rsh/pcm_rsh_spawn.c index 450deae5fb..6a08077521 100644 --- a/src/mca/pcm/rsh/pcm_rsh_spawn.c +++ b/src/mca/pcm/rsh/pcm_rsh_spawn.c @@ -79,7 +79,7 @@ static int internal_spawn_proc(mca_pcm_rsh_module_t *me, static void internal_wait_cb(pid_t pid, int status, void *data); static int internal_start_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super, mca_ns_base_jobid_t jobid, ompi_list_t *schedlist); -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void spawn_procs_callback(int fd, short flags, void *data); #endif @@ -87,7 +87,7 @@ int mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super, mca_ns_base_jobid_t jobid, ompi_list_t *schedlist) { -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS spawn_procs_data_t data; struct timeval tv; struct ompi_event ev; @@ -127,7 +127,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super, } -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void spawn_procs_callback(int fd, short flags, void *data) { diff --git a/src/mca/ptl/elan/src/ptl_elan_component.c b/src/mca/ptl/elan/src/ptl_elan_component.c index 12a95d3d9e..189ae7148a 100644 --- a/src/mca/ptl/elan/src/ptl_elan_component.c +++ b/src/mca/ptl/elan/src/ptl_elan_component.c @@ -203,10 +203,10 @@ mca_ptl_elan_component_init (int *num_ptls, *num_ptls = 0; *allow_multi_user_threads = true; - *have_hidden_threads = OMPI_HAVE_THREADS; + *have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS; /* XXX: Set the global variable to be true for threading */ - if (OMPI_HAVE_THREADS) + if (OMPI_ENABLE_PROGRESS_THREADS) ompi_set_using_threads(true); ompi_free_list_init (&(elan_mp->elan_recv_frags_free), diff --git a/src/mca/ptl/ib/src/ptl_ib_component.c b/src/mca/ptl/ib/src/ptl_ib_component.c index 326601b009..65b14fd974 100644 --- a/src/mca/ptl/ib/src/ptl_ib_component.c +++ b/src/mca/ptl/ib/src/ptl_ib_component.c @@ -175,11 +175,11 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, *num_ptl_modules = 0; mca_ptl_ib_component.ib_num_hcas=0; *allow_multi_user_threads = true; - *have_hidden_threads = OMPI_HAVE_THREADS; + *have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS; /* need to set ompi_using_threads() as ompi_event_init() * will spawn a thread if supported */ - if(OMPI_HAVE_THREADS) { + if(OMPI_ENABLE_PROGRESS_THREADS) { ompi_set_using_threads(true); } diff --git a/src/mca/ptl/mx/ptl_mx.h b/src/mca/ptl/mx/ptl_mx.h index 2a93e2753f..13998c3fe3 100644 --- a/src/mca/ptl/mx/ptl_mx.h +++ b/src/mca/ptl/mx/ptl_mx.h @@ -139,7 +139,7 @@ struct mca_ptl_mx_module_t { mx_endpoint_t mx_endpoint; /**< endpoint */ mx_endpoint_addr_t mx_endpoint_addr; /**< endpoint address */ volatile int32_t mx_recvs_posted; /**< count of posted match fragments */ -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS ompi_thread_t mx_thread; /**< thread for progressing outstanding requests */ #endif }; diff --git a/src/mca/ptl/mx/ptl_mx_module.c b/src/mca/ptl/mx/ptl_mx_module.c index 7e940f496d..916474625b 100644 --- a/src/mca/ptl/mx/ptl_mx_module.c +++ b/src/mca/ptl/mx/ptl_mx_module.c @@ -116,7 +116,7 @@ int mca_ptl_mx_module_init(void) * Thread to progress outstanding requests. */ -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS static void* mca_ptl_mx_thread(ompi_object_t *arg) { @@ -346,7 +346,7 @@ void mca_ptl_mx_enable() for(i=0; imx_enabled = true; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS /* create a thread to progress requests */ OBJ_CONSTRUCT(&ptl->mx_thread, ompi_thread_t); ptl->mx_thread.t_run = mca_ptl_mx_thread; @@ -365,7 +365,7 @@ void mca_ptl_mx_disable(void) for(i=0; imx_enabled = false; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS mx_wakeup(ptl->mx_endpoint); ompi_thread_join(&ptl->mx_thread, NULL); #endif @@ -380,7 +380,7 @@ int mca_ptl_mx_finalize(struct mca_ptl_base_module_t* ptl) { mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl; mx_ptl->mx_enabled = false; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS mx_wakeup(mx_ptl->mx_endpoint); ompi_thread_join(&mx_ptl->mx_thread, NULL); #endif diff --git a/src/mca/ptl/sm/src/ptl_sm.c b/src/mca/ptl/sm/src/ptl_sm.c index 573f376d5b..a315863820 100644 --- a/src/mca/ptl/sm/src/ptl_sm.c +++ b/src/mca/ptl/sm/src/ptl_sm.c @@ -184,7 +184,7 @@ int mca_ptl_sm_add_procs_same_base_addr( if( 0 == strncmp(ompi_system_info.nodename, (char *)(sm_proc_info[proc]),len) ) { struct mca_ptl_base_peer_t *peer = peers[proc]; -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 char path[PATH_MAX]; /* int flags; */ #endif @@ -198,7 +198,7 @@ int mca_ptl_sm_add_procs_same_base_addr( peer->peer_smp_rank=n_local_procs+ mca_ptl_sm_component.num_smp_procs; -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 sprintf(path, "%s/sm_fifo.%d", ompi_process_info.job_session_dir, procs[proc]->proc_name.vpid); peer->fifo_fd = open(path, O_WRONLY); diff --git a/src/mca/ptl/sm/src/ptl_sm.h b/src/mca/ptl/sm/src/ptl_sm.h index 91d2f72784..e219759471 100644 --- a/src/mca/ptl/sm/src/ptl_sm.h +++ b/src/mca/ptl/sm/src/ptl_sm.h @@ -53,7 +53,7 @@ extern mca_ptl_sm_module_resource_t mca_ptl_sm_module_resource; #define SM_CONNECTED_SAME_BASE_ADDR 2 #define SM_CONNECTED_DIFFERENT_BASE_ADDR 3 -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 #define DATA (char)0 #define DONE (char)1 #endif @@ -126,7 +126,7 @@ struct mca_ptl_sm_component_t { acked */ struct mca_ptl_base_peer_t **sm_peers; -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 char sm_fifo_path[PATH_MAX]; /**< path to fifo used to signal this process */ int sm_fifo_fd; /**< file descriptor corresponding to opened fifo */ ompi_thread_t sm_fifo_thread; @@ -421,11 +421,11 @@ typedef struct mca_ptl_sm_exchange{ char host_name[MCA_PTL_SM_MAX_HOSTNAME_LEN]; }mca_ptl_sm_exchange_t; -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 void mca_ptl_sm_component_event_thread(ompi_object_t*); #endif -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 #define MCA_PTL_SM_SIGNAL_PEER(peer) \ { \ unsigned char cmd = DATA; \ diff --git a/src/mca/ptl/sm/src/ptl_sm_component.c b/src/mca/ptl/sm/src/ptl_sm_component.c index 8606704db6..b105b95274 100644 --- a/src/mca/ptl/sm/src/ptl_sm_component.c +++ b/src/mca/ptl/sm/src/ptl_sm_component.c @@ -209,7 +209,7 @@ int mca_ptl_sm_component_close(void) unlink(mca_ptl_sm_component.mmap_file->map_path); } -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 /* close/cleanup fifo create for event notification */ if(mca_ptl_sm_component.sm_fifo_fd >= 0) { /* write a done message down the pipe */ @@ -246,7 +246,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init( *num_ptls = 0; *allow_multi_user_threads = true; - *have_hidden_threads = OMPI_HAVE_THREADS; + *have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS; /* lookup shared memory pool */ mca_ptl_sm_component.sm_mpool = @@ -258,7 +258,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init( if(mca_ptl_sm_component_exchange() != OMPI_SUCCESS) return NULL; -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 /* create a named pipe to receive events */ sprintf(mca_ptl_sm_component.sm_fifo_path, "%s/sm_fifo.%d", ompi_process_info.job_session_dir, @@ -335,7 +335,7 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size) * SM component progress. */ -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 void mca_ptl_sm_component_event_thread(ompi_object_t* thread) { while(1) { diff --git a/src/mca/ptl/sm/src/ptl_sm_peer.h b/src/mca/ptl/sm/src/ptl_sm_peer.h index efac48c08b..2a2a20487e 100644 --- a/src/mca/ptl/sm/src/ptl_sm_peer.h +++ b/src/mca/ptl/sm/src/ptl_sm_peer.h @@ -17,7 +17,7 @@ #ifndef MCA_PTL_SM_PEER_H #define MCA_PTL_SM_PEER_H -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 #include "event/event.h" #endif @@ -32,7 +32,7 @@ struct mca_ptl_base_peer_t { * SMP specfic data structures. */ int peer_smp_rank; /**< My peer's SMP process rank. Used for accessing * SMP specfic data structures. */ -#if OMPI_HAVE_THREADS == 1 +#if OMPI_ENABLE_PROGRESS_THREADS == 1 int fifo_fd; /**< pipe/fifo used to signal peer that data is queued */ #endif }; diff --git a/src/mca/ptl/tcp/src/ptl_tcp_component.c b/src/mca/ptl/tcp/src/ptl_tcp_component.c index 84d1bd5a8a..e4be1accd4 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_component.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_component.c @@ -486,7 +486,7 @@ mca_ptl_base_module_t** mca_ptl_tcp_component_init(int *num_ptl_modules, mca_ptl_base_module_t **ptls; *num_ptl_modules = 0; *allow_multi_user_threads = true; - *have_hidden_threads = OMPI_HAVE_THREADS; + *have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS; ompi_free_list_init(&mca_ptl_tcp_component.tcp_send_frags, sizeof(mca_ptl_tcp_send_frag_t), diff --git a/src/mpi/c/is_thread_main.c b/src/mpi/c/is_thread_main.c index 71b003a856..4d6cd39f92 100644 --- a/src/mpi/c/is_thread_main.c +++ b/src/mpi/c/is_thread_main.c @@ -43,7 +43,7 @@ int MPI_Is_thread_main(int *flag) /* Compare this thread ID to the main thread ID */ -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_MPI_THREADS *flag = (int) ompi_thread_self_compare(ompi_mpi_main_thread); #else *flag = 1; diff --git a/src/mpi/runtime/ompi_mpi_abort.c b/src/mpi/runtime/ompi_mpi_abort.c index 9c01a898e8..490fe9432f 100644 --- a/src/mpi/runtime/ompi_mpi_abort.c +++ b/src/mpi/runtime/ompi_mpi_abort.c @@ -89,7 +89,7 @@ ompi_mpi_abort(struct ompi_communicator_t* comm, is actually dead. But just in case there are some race conditions, keep progressing the event loop until we get killed */ - if (!OMPI_HAVE_THREADS || ompi_event_progress_thread()) { + if (!OMPI_ENABLE_PROGRESS_THREADS || ompi_event_progress_thread()) { ompi_event_loop(0); } else { sleep(1000); diff --git a/src/mpi/runtime/ompi_mpi_finalize.c b/src/mpi/runtime/ompi_mpi_finalize.c index b9c6786396..bf77f7aede 100644 --- a/src/mpi/runtime/ompi_mpi_finalize.c +++ b/src/mpi/runtime/ompi_mpi_finalize.c @@ -57,7 +57,7 @@ int ompi_mpi_finalize(void) mca_ns_base_jobid_t my_jobid; ompi_mpi_finalized = true; -#if OMPI_HAVE_THREADS == 0 +#if OMPI_ENABLE_PROGRESS_THREADS == 0 ompi_progress_events(OMPI_EVLOOP_NONBLOCK); #endif diff --git a/src/mpi/runtime/ompi_mpi_init.c b/src/mpi/runtime/ompi_mpi_init.c index be47b5e861..d5b170e75f 100644 --- a/src/mpi/runtime/ompi_mpi_init.c +++ b/src/mpi/runtime/ompi_mpi_init.c @@ -337,7 +337,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) ompi_mpi_thread_provided = *provided; ompi_mpi_thread_multiple = (ompi_mpi_thread_provided == MPI_THREAD_MULTIPLE); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_MPI_THREADS ompi_mpi_main_thread = ompi_thread_get_self(); #else ompi_mpi_main_thread = NULL; @@ -357,7 +357,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS /* BWB - XXX - FIXME - is this actually correct? */ /* setup I/O forwarding */ if(ompi_process_info.seed == false) { if (OMPI_SUCCESS != (ret = ompi_mpi_init_io())) { @@ -370,7 +370,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) /* Wait for everyone to initialize */ mca_oob_barrier(); -#if OMPI_HAVE_THREADS == 0 +#if OMPI_ENABLE_PROGRESS_THREADS == 0 ompi_progress_events(OMPI_EVLOOP_NONBLOCK); #endif diff --git a/src/request/req_test.c b/src/request/req_test.c index b616a9b466..0c77dd822d 100644 --- a/src/request/req_test.c +++ b/src/request/req_test.c @@ -52,7 +52,7 @@ int ompi_request_test_any( /* Only fall through here if we found nothing */ if(num_requests_null_inactive != count) { *completed = false; -#if OMPI_HAVE_THREADS == 0 +#if OMPI_ENABLE_PROGRESS_THREADS == 0 ompi_progress(); #endif } else { @@ -90,7 +90,7 @@ int ompi_request_test_all( if (num_completed != count) { *completed = false; -#if OMPI_HAVE_THREADS == 0 +#if OMPI_ENABLE_PROGRESS_THREADS == 0 ompi_progress(); #endif return OMPI_SUCCESS; diff --git a/src/request/req_wait.c b/src/request/req_wait.c index 968b9b5e1f..ec76485aa0 100644 --- a/src/request/req_wait.c +++ b/src/request/req_wait.c @@ -23,7 +23,7 @@ int ompi_request_wait_any( int *index, ompi_status_public_t * status) { -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS int c; #endif size_t i=0, num_requests_null_inactive=0; @@ -32,7 +32,7 @@ int ompi_request_wait_any( ompi_request_t **rptr=NULL; ompi_request_t *request=NULL; -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS /* poll for completion */ ompi_atomic_mb(); for (c = 0; completed < 0 && c < ompi_request_poll_iterations; c++) { @@ -83,9 +83,10 @@ int ompi_request_wait_any( ompi_request_waiting--; OMPI_THREAD_UNLOCK(&ompi_request_lock); -#if OMPI_HAVE_THREADS +#if OMPI_ENABLE_PROGRESS_THREADS finished: -#endif /* OMPI_HAVE_THREADS */ +#endif /* OMPI_ENABLE_PROGRESS_THREADS */ + if(num_requests_null_inactive == count) { *index = MPI_UNDEFINED; if (MPI_STATUS_IGNORE != status) { diff --git a/src/request/request.h b/src/request/request.h index 912cb2ef31..7fcc8c5d99 100644 --- a/src/request/request.h +++ b/src/request/request.h @@ -223,7 +223,7 @@ static inline int ompi_request_test( return request->req_fini(rptr); } else { *completed = false; -#if OMPI_HAVE_THREADS == 0 +#if OMPI_ENABLE_PROGRESS_THREADS == 0 ompi_progress(); #endif return OMPI_SUCCESS; diff --git a/src/runtime/ompi_init.c b/src/runtime/ompi_init.c index b007543607..eb9aa1f622 100644 --- a/src/runtime/ompi_init.c +++ b/src/runtime/ompi_init.c @@ -61,7 +61,7 @@ int ompi_init(int argc, char *argv[]) * otherwise. */ - ompi_set_using_threads(OMPI_HAVE_THREADS); + ompi_set_using_threads(OMPI_HAVE_THREAD_SUPPORT); /* For malloc debugging */ diff --git a/src/runtime/ompi_progress.c b/src/runtime/ompi_progress.c index 6fe7d29126..7527bdd504 100644 --- a/src/runtime/ompi_progress.c +++ b/src/runtime/ompi_progress.c @@ -23,8 +23,16 @@ static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE; +static ompi_lock_t progress_lock; +int +ompi_progress_init(void) +{ + ompi_atomic_init(&progress_lock, OMPI_ATOMIC_UNLOCKED); + return OMPI_SUCCESS; +} + void ompi_progress_events(int flag) { ompi_progress_event_flag = flag; @@ -35,7 +43,22 @@ void ompi_progress(void) { /* progress any outstanding communications */ int ret, events = 0; -#if OMPI_HAVE_THREADS == 0 +#if OMPI_HAVE_THREAD_SUPPORT + /* NOTE: BWB - XXX - FIXME: Currently, there are some progress functions + (the event library, for one) that are not reentrant. It is not known + if the PTL progress functions are all reentrant or not. The I/O + code should all be reentrant. Because of the uncertainty, we are + preventing more than one thread of execution from progressing the + via ompi_progress (which is usually only called when there are + no PROGRESS_THREADS running). This should be made more fine-grained + at some point in the future. */ + if (! ompi_atomic_trylock(&progress_lock)) { + /* someone is already progressing - return */ + return; + } +#endif + +#if OMPI_ENABLE_PROGRESS_THREADS == 0 if (ompi_progress_event_flag != 0) { ret = ompi_event_loop(ompi_progress_event_flag); if (ret > 0) { @@ -54,6 +77,9 @@ void ompi_progress(void) events += ret; } + /* release the lock before yielding, for obvious reasons */ + ompi_atomic_unlock(&progress_lock); + #if 1 /* TSW - disable this until can validate that it doesn't impact SMP * performance diff --git a/src/runtime/ompi_progress.h b/src/runtime/ompi_progress.h index 61a6577a38..a32aef87ab 100644 --- a/src/runtime/ompi_progress.h +++ b/src/runtime/ompi_progress.h @@ -18,6 +18,8 @@ extern "C" { #endif +OMPI_DECLSPEC extern int ompi_progress_init(void); + OMPI_DECLSPEC extern void ompi_progress_events(int); OMPI_DECLSPEC extern void ompi_progress(void); diff --git a/src/runtime/ompi_rte_init.c b/src/runtime/ompi_rte_init.c index 8a3cd41e64..bb473f940d 100644 --- a/src/runtime/ompi_rte_init.c +++ b/src/runtime/ompi_rte_init.c @@ -157,6 +157,13 @@ int ompi_rte_init(ompi_cmd_line_t *cmd_line, bool *allow_multi_user_threads, boo return ret; } + /* + * And the progress code + */ + if (OMPI_SUCCESS != (ret = ompi_progress_init())) { + return ret; + } + /* * Internal startup */ diff --git a/src/runtime/ompi_rte_wait.c b/src/runtime/ompi_rte_wait.c index eb9101d689..e3f1f51216 100644 --- a/src/runtime/ompi_rte_wait.c +++ b/src/runtime/ompi_rte_wait.c @@ -145,7 +145,7 @@ static void register_sig_event(void); static int unregister_callback(pid_t pid); void ompi_rte_wait_signal_callback(int fd, short event, void *arg); static pid_t internal_waitpid(pid_t pid, int *status, int options); -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void internal_waitpid_callback(int fd, short event, void *arg); #endif @@ -252,7 +252,7 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options) ompi_condition_timedwait(data->cond, cond_mutex, &spintime); -#if OMPI_HAVE_THREADS +#if OMPI_HAVE_THREAD_SUPPORT if (ompi_event_progress_thread()) { ompi_event_loop(OMPI_EVLOOP_NONBLOCK); } @@ -272,7 +272,7 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options) pthreads gets really unhappy when we pull the rug out from under it. Yes, it's spinning. No, we won't spin for long */ -#if OMPI_HAVE_THREADS +#if OMPI_HAVE_THREAD_SUPPORT if (ompi_event_progress_thread()) { ompi_event_loop(OMPI_EVLOOP_NONBLOCK); } @@ -548,7 +548,7 @@ register_sig_event(void) /* it seems that signal events are only added to the queue at the next progress call. So push the event library */ -#if OMPI_HAVE_THREADS +#if OMPI_HAVE_THREAD_SUPPORT if (ompi_event_progress_thread()) { ompi_event_loop(OMPI_EVLOOP_NONBLOCK); } @@ -564,7 +564,7 @@ register_sig_event(void) static pid_t internal_waitpid(pid_t pid, int *status, int options) { -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS waitpid_callback_data_t data; struct timeval tv; struct ompi_event ev; @@ -606,7 +606,7 @@ internal_waitpid(pid_t pid, int *status, int options) } -#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS static void internal_waitpid_callback(int fd, short event, void *arg) { diff --git a/src/threads/condition.h b/src/threads/condition.h index d4184d5e5c..734ed4f0c6 100644 --- a/src/threads/condition.h +++ b/src/threads/condition.h @@ -16,7 +16,7 @@ #include "ompi_config.h" -#if OMPI_HAVE_POSIX_THREADS +#if OMPI_HAVE_POSIX_THREADS && OMPI_ENABLE_PROGRESS_THREADS #include "condition_pthread.h" #else #include "condition_spinlock.h" diff --git a/src/threads/condition_pthread.c b/src/threads/condition_pthread.c index 5e55f6d2d0..e80b97a666 100644 --- a/src/threads/condition_pthread.c +++ b/src/threads/condition_pthread.c @@ -17,7 +17,7 @@ #include "threads/mutex.h" #include "threads/condition.h" -#if OMPI_HAVE_POSIX_THREADS +#if OMPI_HAVE_POSIX_THREADS && OMPI_ENABLE_PROGRESS_THREADS static void ompi_condition_construct(ompi_condition_t *c) { diff --git a/src/threads/condition_spinlock.c b/src/threads/condition_spinlock.c index 0d5054bfb9..5fb83dab65 100644 --- a/src/threads/condition_spinlock.c +++ b/src/threads/condition_spinlock.c @@ -17,7 +17,7 @@ #include "threads/mutex.h" #include "threads/condition.h" -#if (OMPI_HAVE_THREADS == 0) +#if (OMPI_HAVE_POSIX_THREADS == 0) || (OMPI_ENABLE_PROGRESS_THREADS == 0) static void ompi_condition_construct(ompi_condition_t *c) { diff --git a/src/threads/mutex.c b/src/threads/mutex.c index b41d23525f..6752245362 100644 --- a/src/threads/mutex.c +++ b/src/threads/mutex.c @@ -19,7 +19,7 @@ /* * Default to a safe value */ -bool ompi_uses_threads = (bool) OMPI_HAVE_THREADS; +bool ompi_uses_threads = (bool) OMPI_HAVE_THREAD_SUPPORT; #ifdef WIN32 diff --git a/src/threads/mutex.h b/src/threads/mutex.h index 832462a39b..96bf46e6b5 100644 --- a/src/threads/mutex.h +++ b/src/threads/mutex.h @@ -148,7 +148,7 @@ static inline bool ompi_using_threads(void) */ static inline bool ompi_set_using_threads(bool have) { -#if OMPI_HAVE_THREADS +#if OMPI_HAVE_THREAD_SUPPORT ompi_uses_threads = have; #else ompi_uses_threads = false;