* Split thread support build conditionals into MPI threads and progress
threads (defaults to use MPI threads, disable progress threads). This allows us to have MPI threaded support, but without progress threads and all that fun stuff. This commit was SVN r4443.
Этот коммит содержится в:
родитель
b7c2c47494
Коммит
0d82642b40
@ -168,5 +168,65 @@ configure with the '--without-threads' option.
|
||||
EOF
|
||||
AC_MSG_ERROR(["*** Can not continue."])
|
||||
fi
|
||||
])
|
||||
|
||||
#
|
||||
# Now configure the whole MPI and progress thread gorp
|
||||
#
|
||||
AC_MSG_CHECKING([if want MPI thread support])
|
||||
AC_ARG_ENABLE([mpi-threads],
|
||||
AC_HELP_STRING([--enable-mpi-threads],
|
||||
[Enable threads for MPI applications (default: enabled)]),
|
||||
[enable_mpi_threads="$enableval"])
|
||||
|
||||
if test "$enable_mpi_threads" = "" ; then
|
||||
# no argument given either way. Default to whether we have threads or not
|
||||
if test "$THREAD_TYPE" != "none" ; then
|
||||
OMPI_ENABLE_MPI_THREADS=1
|
||||
enable_mpi_threads="yes"
|
||||
else
|
||||
OMPI_ENABLE_MPI_THREADS=0
|
||||
enable_mpi_threads="no"
|
||||
fi
|
||||
elif test "$enable_mpi_threads" = "no" ; then
|
||||
OMPI_ENABLE_MPI_THREADS=0
|
||||
else
|
||||
# they want MPI threads. Make sure we have threads
|
||||
if "$THREAD_TYPE" != "none" ; then
|
||||
OMPI_ENABLE_MPI_THREADS=1
|
||||
enable_mpi_threads="yes"
|
||||
else
|
||||
AC_MSG_ERROR([User requested MPI threads, but no threading model supported])
|
||||
fi
|
||||
fi
|
||||
AC_DEFINE_UNQUOTED([OMPI_ENABLE_MPI_THREADS], [$OMPI_ENABLE_MPI_THREADS],
|
||||
[Whether we should enable support for multiple user threads])
|
||||
AC_MSG_RESULT([$enable_mpi_threads])
|
||||
|
||||
|
||||
AC_MSG_CHECKING([if want asynchronous progress thread support])
|
||||
AC_ARG_ENABLE([progress-threads],
|
||||
AC_HELP_STRING([--enable-progress-threads],
|
||||
[Enable threads asynchronous communication progress (default: disabled)]),
|
||||
[enable_progress_threads="$enableval"])
|
||||
|
||||
if test "$enable_progress_threads" = "" ; then
|
||||
# no argument given either way. Default to no.
|
||||
OMPI_ENABLE_PROGRESS_THREADS=0
|
||||
enable_progress_threads="no"
|
||||
elif test "$enable_progress_threads" = "no" ; then
|
||||
OMPI_ENABLE_PROGRESS_THREADS=0
|
||||
enable_progress_threads="no"
|
||||
else
|
||||
# they want threaded progress
|
||||
if "$THREAD_TYPE" != "none" ; then
|
||||
OMPI_ENABLE_PROGRESS_THREADS=1
|
||||
enable_progress_threads="yes"
|
||||
else
|
||||
AC_MSG_ERROR([User requested progress threads, but no threading model supported])
|
||||
fi
|
||||
fi
|
||||
AC_DEFINE_UNQUOTED([OMPI_ENABLE_PROGRESS_THREADS], [$OMPI_ENABLE_PROGRESS_THREADS],
|
||||
[Whether we should use progress threads rather than polling])
|
||||
AC_MSG_RESULT([$enable_progress_threads])
|
||||
])dnl
|
||||
|
||||
|
@ -104,7 +104,7 @@ typedef long long bool;
|
||||
/*
|
||||
* Do we have thread support?
|
||||
*/
|
||||
#define OMPI_HAVE_THREADS (OMPI_HAVE_SOLARIS_THREADS || OMPI_HAVE_POSIX_THREADS)
|
||||
#define OMPI_HAVE_THREAD_SUPPORT (OMPI_ENABLE_MPI_THREADS || OMPI_ENABLE_PROGRESS_THREADS)
|
||||
|
||||
/*
|
||||
* Do we have <stdint.h>?
|
||||
|
@ -149,7 +149,7 @@ static struct timeval ompi_event_tv;
|
||||
OMPI_DECLSPEC ompi_mutex_t ompi_event_lock;
|
||||
static int ompi_event_inited = 0;
|
||||
static bool ompi_event_enabled = false;
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
static ompi_thread_t ompi_event_thread;
|
||||
static ompi_event_t ompi_event_pipe_event;
|
||||
static int ompi_event_pipe[2];
|
||||
@ -158,7 +158,7 @@ static int ompi_event_pipe_signalled;
|
||||
|
||||
bool ompi_event_progress_thread(void)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
return ompi_thread_self_compare(&ompi_event_thread);
|
||||
#else
|
||||
return false;
|
||||
@ -220,7 +220,7 @@ static void* ompi_event_run(ompi_object_t* arg)
|
||||
}
|
||||
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
static void ompi_event_pipe_handler(int sd, short flags, void* user)
|
||||
{
|
||||
unsigned char byte;
|
||||
@ -258,7 +258,7 @@ ompi_event_init(void)
|
||||
if (ompi_evbase == NULL)
|
||||
errx(1, "%s: no event mechanism available", __func__);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
if(pipe(ompi_event_pipe) != 0) {
|
||||
ompi_output(0, "ompi_event_init: pipe() failed with errno=%d\n", errno);
|
||||
return OMPI_ERROR;
|
||||
@ -285,7 +285,7 @@ ompi_event_init(void)
|
||||
|
||||
int ompi_event_fini(void)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
if(--ompi_event_inited == 0)
|
||||
ompi_event_disable();
|
||||
#endif
|
||||
@ -294,7 +294,7 @@ int ompi_event_fini(void)
|
||||
|
||||
int ompi_event_disable(void)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
OMPI_THREAD_LOCK(&ompi_event_lock);
|
||||
if(ompi_event_inited > 0 && ompi_event_enabled) {
|
||||
ompi_event_enabled = false;
|
||||
@ -317,7 +317,7 @@ int ompi_event_disable(void)
|
||||
|
||||
int ompi_event_enable(void)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
int rc;
|
||||
/* spin up a thread to dispatch events */
|
||||
OMPI_THREAD_LOCK(&ompi_event_lock);
|
||||
@ -416,11 +416,11 @@ ompi_event_loop(int flags)
|
||||
} else
|
||||
timerclear(&tv);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
ompi_event_pipe_signalled = 0;
|
||||
#endif
|
||||
res = ompi_evsel->dispatch(ompi_evbase, &tv);
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
ompi_event_pipe_signalled = 1;
|
||||
#endif
|
||||
if (res == -1) {
|
||||
@ -520,7 +520,7 @@ ompi_event_add_i(struct ompi_event *ev, struct timeval *tv)
|
||||
rc = (ompi_evsel->add(ompi_evbase, ev));
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
if(ompi_event_pipe_signalled == 0) {
|
||||
unsigned char byte = 0;
|
||||
if(write(ompi_event_pipe[1], &byte, 1) != 1)
|
||||
@ -557,7 +557,7 @@ int ompi_event_del_i(struct ompi_event *ev)
|
||||
rc = (ompi_evsel->del(ompi_evbase, ev));
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
if(ompi_event_pipe_signalled == 0) {
|
||||
unsigned char byte = 0;
|
||||
if(write(ompi_event_pipe[1], &byte, 1) != 1)
|
||||
|
@ -228,7 +228,7 @@ void mca_io_base_request_return(ompi_file_t *file)
|
||||
OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
static volatile bool thread_running = false;
|
||||
static volatile bool thread_done = false;
|
||||
static ompi_thread_t progress_thread;
|
||||
@ -255,14 +255,14 @@ request_progress_thread(ompi_object_t *arg)
|
||||
|
||||
return NULL;
|
||||
}
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_init()
|
||||
{
|
||||
mca_io_base_request_num_pending = 0;
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
thread_running = false;
|
||||
thread_done = false;
|
||||
|
||||
@ -272,14 +272,14 @@ mca_io_base_request_progress_init()
|
||||
|
||||
progress_thread.t_run = request_progress_thread;
|
||||
progress_thread.t_arg = NULL;
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_add()
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
/* if we don't have a progress thread, make us have a progress
|
||||
thread */
|
||||
if (! thread_running) {
|
||||
@ -290,13 +290,13 @@ mca_io_base_request_progress_add()
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&progress_mutex);
|
||||
}
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
|
||||
OMPI_THREAD_ADD32(&mca_io_base_request_num_pending, 1);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
ompi_condition_signal(&progress_cond);
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
}
|
||||
|
||||
|
||||
@ -310,7 +310,7 @@ mca_io_base_request_progress_del()
|
||||
void
|
||||
mca_io_base_request_progress_fini()
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
void *ret;
|
||||
|
||||
/* make the helper thread die */
|
||||
@ -324,5 +324,5 @@ mca_io_base_request_progress_fini()
|
||||
OBJ_DESTRUCT(&progress_thread);
|
||||
OBJ_DESTRUCT(&progress_cond);
|
||||
OBJ_DESTRUCT(&progress_mutex);
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
}
|
||||
|
@ -424,7 +424,7 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_thre
|
||||
{
|
||||
*priority = 1;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = OMPI_HAVE_THREADS;
|
||||
*have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS;
|
||||
|
||||
|
||||
/* are there any interfaces? */
|
||||
|
@ -55,7 +55,7 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
|
||||
|
||||
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
OMPI_THREAD_LOCK(&msg->msg_lock);
|
||||
while(msg->msg_complete == false) {
|
||||
if(ompi_event_progress_thread()) {
|
||||
@ -97,7 +97,7 @@ int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* rc, struct timespec*
|
||||
uint32_t usecs = abstime->tv_nsec * 1000;
|
||||
gettimeofday(&tv,NULL);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
OMPI_THREAD_LOCK(&msg->msg_lock);
|
||||
while(msg->msg_complete == false &&
|
||||
((uint32_t)tv.tv_sec <= secs ||
|
||||
|
@ -74,7 +74,7 @@ internal_bproc_vexecmove_io(int nnodes, int *nodes, int *pids,
|
||||
char ***env, int *envc, int offset);
|
||||
|
||||
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void spawn_procs_callback(int fd, short flags, void *data);
|
||||
#endif
|
||||
|
||||
@ -84,7 +84,7 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
|
||||
mca_ns_base_jobid_t jobid,
|
||||
ompi_list_t *schedlist)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
spawn_procs_data_t data;
|
||||
struct timeval tv;
|
||||
struct ompi_event ev;
|
||||
@ -125,7 +125,7 @@ mca_pcm_bproc_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
|
||||
}
|
||||
|
||||
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void
|
||||
spawn_procs_callback(int fd, short flags, void *data)
|
||||
{
|
||||
|
@ -79,7 +79,7 @@ static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
|
||||
static void internal_wait_cb(pid_t pid, int status, void *data);
|
||||
static int internal_start_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
|
||||
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist);
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void spawn_procs_callback(int fd, short flags, void *data);
|
||||
#endif
|
||||
|
||||
@ -87,7 +87,7 @@ int
|
||||
mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
|
||||
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
spawn_procs_data_t data;
|
||||
struct timeval tv;
|
||||
struct ompi_event ev;
|
||||
@ -127,7 +127,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
|
||||
}
|
||||
|
||||
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void
|
||||
spawn_procs_callback(int fd, short flags, void *data)
|
||||
{
|
||||
|
@ -203,10 +203,10 @@ mca_ptl_elan_component_init (int *num_ptls,
|
||||
|
||||
*num_ptls = 0;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = OMPI_HAVE_THREADS;
|
||||
*have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS;
|
||||
|
||||
/* XXX: Set the global variable to be true for threading */
|
||||
if (OMPI_HAVE_THREADS)
|
||||
if (OMPI_ENABLE_PROGRESS_THREADS)
|
||||
ompi_set_using_threads(true);
|
||||
|
||||
ompi_free_list_init (&(elan_mp->elan_recv_frags_free),
|
||||
|
@ -175,11 +175,11 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,
|
||||
*num_ptl_modules = 0;
|
||||
mca_ptl_ib_component.ib_num_hcas=0;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = OMPI_HAVE_THREADS;
|
||||
*have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS;
|
||||
|
||||
/* need to set ompi_using_threads() as ompi_event_init()
|
||||
* will spawn a thread if supported */
|
||||
if(OMPI_HAVE_THREADS) {
|
||||
if(OMPI_ENABLE_PROGRESS_THREADS) {
|
||||
ompi_set_using_threads(true);
|
||||
}
|
||||
|
||||
|
@ -139,7 +139,7 @@ struct mca_ptl_mx_module_t {
|
||||
mx_endpoint_t mx_endpoint; /**< endpoint */
|
||||
mx_endpoint_addr_t mx_endpoint_addr; /**< endpoint address */
|
||||
volatile int32_t mx_recvs_posted; /**< count of posted match fragments */
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
ompi_thread_t mx_thread; /**< thread for progressing outstanding requests */
|
||||
#endif
|
||||
};
|
||||
|
@ -116,7 +116,7 @@ int mca_ptl_mx_module_init(void)
|
||||
* Thread to progress outstanding requests.
|
||||
*/
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
|
||||
static void* mca_ptl_mx_thread(ompi_object_t *arg)
|
||||
{
|
||||
@ -346,7 +346,7 @@ void mca_ptl_mx_enable()
|
||||
for(i=0; i<mca_ptl_mx_component.mx_num_ptls; i++) {
|
||||
mca_ptl_mx_module_t* ptl = mca_ptl_mx_component.mx_ptls[i];
|
||||
ptl->mx_enabled = true;
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
/* create a thread to progress requests */
|
||||
OBJ_CONSTRUCT(&ptl->mx_thread, ompi_thread_t);
|
||||
ptl->mx_thread.t_run = mca_ptl_mx_thread;
|
||||
@ -365,7 +365,7 @@ void mca_ptl_mx_disable(void)
|
||||
for(i=0; i<mca_ptl_mx_component.mx_num_ptls; i++) {
|
||||
mca_ptl_mx_module_t* ptl = mca_ptl_mx_component.mx_ptls[i];
|
||||
ptl->mx_enabled = false;
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
mx_wakeup(ptl->mx_endpoint);
|
||||
ompi_thread_join(&ptl->mx_thread, NULL);
|
||||
#endif
|
||||
@ -380,7 +380,7 @@ int mca_ptl_mx_finalize(struct mca_ptl_base_module_t* ptl)
|
||||
{
|
||||
mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl;
|
||||
mx_ptl->mx_enabled = false;
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
mx_wakeup(mx_ptl->mx_endpoint);
|
||||
ompi_thread_join(&mx_ptl->mx_thread, NULL);
|
||||
#endif
|
||||
|
@ -184,7 +184,7 @@ int mca_ptl_sm_add_procs_same_base_addr(
|
||||
if( 0 == strncmp(ompi_system_info.nodename,
|
||||
(char *)(sm_proc_info[proc]),len) ) {
|
||||
struct mca_ptl_base_peer_t *peer = peers[proc];
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
char path[PATH_MAX];
|
||||
/* int flags; */
|
||||
#endif
|
||||
@ -198,7 +198,7 @@ int mca_ptl_sm_add_procs_same_base_addr(
|
||||
peer->peer_smp_rank=n_local_procs+
|
||||
mca_ptl_sm_component.num_smp_procs;
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
sprintf(path, "%s/sm_fifo.%d", ompi_process_info.job_session_dir,
|
||||
procs[proc]->proc_name.vpid);
|
||||
peer->fifo_fd = open(path, O_WRONLY);
|
||||
|
@ -53,7 +53,7 @@ extern mca_ptl_sm_module_resource_t mca_ptl_sm_module_resource;
|
||||
#define SM_CONNECTED_SAME_BASE_ADDR 2
|
||||
#define SM_CONNECTED_DIFFERENT_BASE_ADDR 3
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
#define DATA (char)0
|
||||
#define DONE (char)1
|
||||
#endif
|
||||
@ -126,7 +126,7 @@ struct mca_ptl_sm_component_t {
|
||||
acked */
|
||||
|
||||
struct mca_ptl_base_peer_t **sm_peers;
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
char sm_fifo_path[PATH_MAX]; /**< path to fifo used to signal this process */
|
||||
int sm_fifo_fd; /**< file descriptor corresponding to opened fifo */
|
||||
ompi_thread_t sm_fifo_thread;
|
||||
@ -421,11 +421,11 @@ typedef struct mca_ptl_sm_exchange{
|
||||
char host_name[MCA_PTL_SM_MAX_HOSTNAME_LEN];
|
||||
}mca_ptl_sm_exchange_t;
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
void mca_ptl_sm_component_event_thread(ompi_object_t*);
|
||||
#endif
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
#define MCA_PTL_SM_SIGNAL_PEER(peer) \
|
||||
{ \
|
||||
unsigned char cmd = DATA; \
|
||||
|
@ -209,7 +209,7 @@ int mca_ptl_sm_component_close(void)
|
||||
unlink(mca_ptl_sm_component.mmap_file->map_path);
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
/* close/cleanup fifo create for event notification */
|
||||
if(mca_ptl_sm_component.sm_fifo_fd >= 0) {
|
||||
/* write a done message down the pipe */
|
||||
@ -246,7 +246,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
|
||||
|
||||
*num_ptls = 0;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = OMPI_HAVE_THREADS;
|
||||
*have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS;
|
||||
|
||||
/* lookup shared memory pool */
|
||||
mca_ptl_sm_component.sm_mpool =
|
||||
@ -258,7 +258,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
|
||||
if(mca_ptl_sm_component_exchange() != OMPI_SUCCESS)
|
||||
return NULL;
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
/* create a named pipe to receive events */
|
||||
sprintf(mca_ptl_sm_component.sm_fifo_path,
|
||||
"%s/sm_fifo.%d", ompi_process_info.job_session_dir,
|
||||
@ -335,7 +335,7 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size)
|
||||
* SM component progress.
|
||||
*/
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
void mca_ptl_sm_component_event_thread(ompi_object_t* thread)
|
||||
{
|
||||
while(1) {
|
||||
|
@ -17,7 +17,7 @@
|
||||
#ifndef MCA_PTL_SM_PEER_H
|
||||
#define MCA_PTL_SM_PEER_H
|
||||
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
#include "event/event.h"
|
||||
#endif
|
||||
|
||||
@ -32,7 +32,7 @@ struct mca_ptl_base_peer_t {
|
||||
* SMP specfic data structures. */
|
||||
int peer_smp_rank; /**< My peer's SMP process rank. Used for accessing
|
||||
* SMP specfic data structures. */
|
||||
#if OMPI_HAVE_THREADS == 1
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 1
|
||||
int fifo_fd; /**< pipe/fifo used to signal peer that data is queued */
|
||||
#endif
|
||||
};
|
||||
|
@ -486,7 +486,7 @@ mca_ptl_base_module_t** mca_ptl_tcp_component_init(int *num_ptl_modules,
|
||||
mca_ptl_base_module_t **ptls;
|
||||
*num_ptl_modules = 0;
|
||||
*allow_multi_user_threads = true;
|
||||
*have_hidden_threads = OMPI_HAVE_THREADS;
|
||||
*have_hidden_threads = OMPI_ENABLE_PROGRESS_THREADS;
|
||||
|
||||
ompi_free_list_init(&mca_ptl_tcp_component.tcp_send_frags,
|
||||
sizeof(mca_ptl_tcp_send_frag_t),
|
||||
|
@ -43,7 +43,7 @@ int MPI_Is_thread_main(int *flag)
|
||||
|
||||
/* Compare this thread ID to the main thread ID */
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_MPI_THREADS
|
||||
*flag = (int) ompi_thread_self_compare(ompi_mpi_main_thread);
|
||||
#else
|
||||
*flag = 1;
|
||||
|
@ -89,7 +89,7 @@ ompi_mpi_abort(struct ompi_communicator_t* comm,
|
||||
is actually dead. But just in case there are some
|
||||
race conditions, keep progressing the event loop until
|
||||
we get killed */
|
||||
if (!OMPI_HAVE_THREADS || ompi_event_progress_thread()) {
|
||||
if (!OMPI_ENABLE_PROGRESS_THREADS || ompi_event_progress_thread()) {
|
||||
ompi_event_loop(0);
|
||||
} else {
|
||||
sleep(1000);
|
||||
|
@ -57,7 +57,7 @@ int ompi_mpi_finalize(void)
|
||||
mca_ns_base_jobid_t my_jobid;
|
||||
|
||||
ompi_mpi_finalized = true;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
ompi_progress_events(OMPI_EVLOOP_NONBLOCK);
|
||||
#endif
|
||||
|
||||
|
@ -337,7 +337,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
ompi_mpi_thread_provided = *provided;
|
||||
ompi_mpi_thread_multiple = (ompi_mpi_thread_provided ==
|
||||
MPI_THREAD_MULTIPLE);
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_MPI_THREADS
|
||||
ompi_mpi_main_thread = ompi_thread_get_self();
|
||||
#else
|
||||
ompi_mpi_main_thread = NULL;
|
||||
@ -357,7 +357,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
goto error;
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS /* BWB - XXX - FIXME - is this actually correct? */
|
||||
/* setup I/O forwarding */
|
||||
if(ompi_process_info.seed == false) {
|
||||
if (OMPI_SUCCESS != (ret = ompi_mpi_init_io())) {
|
||||
@ -370,7 +370,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
/* Wait for everyone to initialize */
|
||||
mca_oob_barrier();
|
||||
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
ompi_progress_events(OMPI_EVLOOP_NONBLOCK);
|
||||
#endif
|
||||
|
||||
|
@ -52,7 +52,7 @@ int ompi_request_test_any(
|
||||
/* Only fall through here if we found nothing */
|
||||
if(num_requests_null_inactive != count) {
|
||||
*completed = false;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
ompi_progress();
|
||||
#endif
|
||||
} else {
|
||||
@ -90,7 +90,7 @@ int ompi_request_test_all(
|
||||
|
||||
if (num_completed != count) {
|
||||
*completed = false;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
ompi_progress();
|
||||
#endif
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -23,7 +23,7 @@ int ompi_request_wait_any(
|
||||
int *index,
|
||||
ompi_status_public_t * status)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
int c;
|
||||
#endif
|
||||
size_t i=0, num_requests_null_inactive=0;
|
||||
@ -32,7 +32,7 @@ int ompi_request_wait_any(
|
||||
ompi_request_t **rptr=NULL;
|
||||
ompi_request_t *request=NULL;
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
/* poll for completion */
|
||||
ompi_atomic_mb();
|
||||
for (c = 0; completed < 0 && c < ompi_request_poll_iterations; c++) {
|
||||
@ -83,9 +83,10 @@ int ompi_request_wait_any(
|
||||
ompi_request_waiting--;
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS
|
||||
finished:
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
#endif /* OMPI_ENABLE_PROGRESS_THREADS */
|
||||
|
||||
if(num_requests_null_inactive == count) {
|
||||
*index = MPI_UNDEFINED;
|
||||
if (MPI_STATUS_IGNORE != status) {
|
||||
|
@ -223,7 +223,7 @@ static inline int ompi_request_test(
|
||||
return request->req_fini(rptr);
|
||||
} else {
|
||||
*completed = false;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
ompi_progress();
|
||||
#endif
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -61,7 +61,7 @@ int ompi_init(int argc, char *argv[])
|
||||
* otherwise.
|
||||
*/
|
||||
|
||||
ompi_set_using_threads(OMPI_HAVE_THREADS);
|
||||
ompi_set_using_threads(OMPI_HAVE_THREAD_SUPPORT);
|
||||
|
||||
/* For malloc debugging */
|
||||
|
||||
|
@ -23,8 +23,16 @@
|
||||
|
||||
|
||||
static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
|
||||
static ompi_lock_t progress_lock;
|
||||
|
||||
|
||||
int
|
||||
ompi_progress_init(void)
|
||||
{
|
||||
ompi_atomic_init(&progress_lock, OMPI_ATOMIC_UNLOCKED);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
void ompi_progress_events(int flag)
|
||||
{
|
||||
ompi_progress_event_flag = flag;
|
||||
@ -35,7 +43,22 @@ void ompi_progress(void)
|
||||
{
|
||||
/* progress any outstanding communications */
|
||||
int ret, events = 0;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
/* NOTE: BWB - XXX - FIXME: Currently, there are some progress functions
|
||||
(the event library, for one) that are not reentrant. It is not known
|
||||
if the PTL progress functions are all reentrant or not. The I/O
|
||||
code should all be reentrant. Because of the uncertainty, we are
|
||||
preventing more than one thread of execution from progressing the
|
||||
via ompi_progress (which is usually only called when there are
|
||||
no PROGRESS_THREADS running). This should be made more fine-grained
|
||||
at some point in the future. */
|
||||
if (! ompi_atomic_trylock(&progress_lock)) {
|
||||
/* someone is already progressing - return */
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if OMPI_ENABLE_PROGRESS_THREADS == 0
|
||||
if (ompi_progress_event_flag != 0) {
|
||||
ret = ompi_event_loop(ompi_progress_event_flag);
|
||||
if (ret > 0) {
|
||||
@ -54,6 +77,9 @@ void ompi_progress(void)
|
||||
events += ret;
|
||||
}
|
||||
|
||||
/* release the lock before yielding, for obvious reasons */
|
||||
ompi_atomic_unlock(&progress_lock);
|
||||
|
||||
#if 1
|
||||
/* TSW - disable this until can validate that it doesn't impact SMP
|
||||
* performance
|
||||
|
@ -18,6 +18,8 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
OMPI_DECLSPEC extern int ompi_progress_init(void);
|
||||
|
||||
OMPI_DECLSPEC extern void ompi_progress_events(int);
|
||||
|
||||
OMPI_DECLSPEC extern void ompi_progress(void);
|
||||
|
@ -157,6 +157,13 @@ int ompi_rte_init(ompi_cmd_line_t *cmd_line, bool *allow_multi_user_threads, boo
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* And the progress code
|
||||
*/
|
||||
if (OMPI_SUCCESS != (ret = ompi_progress_init())) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Internal startup
|
||||
*/
|
||||
|
@ -145,7 +145,7 @@ static void register_sig_event(void);
|
||||
static int unregister_callback(pid_t pid);
|
||||
void ompi_rte_wait_signal_callback(int fd, short event, void *arg);
|
||||
static pid_t internal_waitpid(pid_t pid, int *status, int options);
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void internal_waitpid_callback(int fd, short event, void *arg);
|
||||
#endif
|
||||
|
||||
@ -252,7 +252,7 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
|
||||
ompi_condition_timedwait(data->cond,
|
||||
cond_mutex,
|
||||
&spintime);
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
if (ompi_event_progress_thread()) {
|
||||
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
|
||||
}
|
||||
@ -272,7 +272,7 @@ ompi_rte_waitpid(pid_t wpid, int *status, int options)
|
||||
pthreads gets really unhappy when we pull the rug out
|
||||
from under it. Yes, it's spinning. No, we won't spin
|
||||
for long */
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
if (ompi_event_progress_thread()) {
|
||||
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
|
||||
}
|
||||
@ -548,7 +548,7 @@ register_sig_event(void)
|
||||
|
||||
/* it seems that signal events are only added to the queue at the next
|
||||
progress call. So push the event library */
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
if (ompi_event_progress_thread()) {
|
||||
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
|
||||
}
|
||||
@ -564,7 +564,7 @@ register_sig_event(void)
|
||||
static pid_t
|
||||
internal_waitpid(pid_t pid, int *status, int options)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
waitpid_callback_data_t data;
|
||||
struct timeval tv;
|
||||
struct ompi_event ev;
|
||||
@ -606,7 +606,7 @@ internal_waitpid(pid_t pid, int *status, int options)
|
||||
}
|
||||
|
||||
|
||||
#if OMPI_HAVE_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT && OMPI_THREADS_HAVE_DIFFERENT_PIDS
|
||||
static void
|
||||
internal_waitpid_callback(int fd, short event, void *arg)
|
||||
{
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#if OMPI_HAVE_POSIX_THREADS
|
||||
#if OMPI_HAVE_POSIX_THREADS && OMPI_ENABLE_PROGRESS_THREADS
|
||||
#include "condition_pthread.h"
|
||||
#else
|
||||
#include "condition_spinlock.h"
|
||||
|
@ -17,7 +17,7 @@
|
||||
#include "threads/mutex.h"
|
||||
#include "threads/condition.h"
|
||||
|
||||
#if OMPI_HAVE_POSIX_THREADS
|
||||
#if OMPI_HAVE_POSIX_THREADS && OMPI_ENABLE_PROGRESS_THREADS
|
||||
|
||||
static void ompi_condition_construct(ompi_condition_t *c)
|
||||
{
|
||||
|
@ -17,7 +17,7 @@
|
||||
#include "threads/mutex.h"
|
||||
#include "threads/condition.h"
|
||||
|
||||
#if (OMPI_HAVE_THREADS == 0)
|
||||
#if (OMPI_HAVE_POSIX_THREADS == 0) || (OMPI_ENABLE_PROGRESS_THREADS == 0)
|
||||
|
||||
static void ompi_condition_construct(ompi_condition_t *c)
|
||||
{
|
||||
|
@ -19,7 +19,7 @@
|
||||
/*
|
||||
* Default to a safe value
|
||||
*/
|
||||
bool ompi_uses_threads = (bool) OMPI_HAVE_THREADS;
|
||||
bool ompi_uses_threads = (bool) OMPI_HAVE_THREAD_SUPPORT;
|
||||
|
||||
|
||||
#ifdef WIN32
|
||||
|
@ -148,7 +148,7 @@ static inline bool ompi_using_threads(void)
|
||||
*/
|
||||
static inline bool ompi_set_using_threads(bool have)
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
ompi_uses_threads = have;
|
||||
#else
|
||||
ompi_uses_threads = false;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user