1
1

Some cleanup to remove calls to opal_progress when running with orte progress threads, and to ensure that all orte-related events are in the orte event base.

This commit was SVN r26591.
Этот коммит содержится в:
Ralph Castain 2012-06-11 19:59:53 +00:00
родитель 75e66ad51e
Коммит 269cb2b8d9
16 изменённых файлов: 127 добавлений и 94 удалений

Просмотреть файл

@ -58,7 +58,7 @@
/* Local static variables */
static opal_mutex_t ompi_dpm_port_mutex;
static orte_rml_tag_t next_tag;
static bool recv_completed;
static bool waiting_for_recv = false;
static opal_buffer_t *cabuf=NULL;
static orte_process_name_t carport;
@ -205,14 +205,12 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
/* send the request - doesn't have to include any data */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, nbuf, ORTE_RML_TAG_COLL_ID_REQ, 0, rml_cbfunc, NULL);
/* wait for the id */
recv_completed = false;
waiting_for_recv = true;
cabuf = OBJ_NEW(opal_buffer_t);
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
/* wait for response */
while (!recv_completed) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(cabuf, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
@ -232,14 +230,12 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
rc = orte_rml.send_buffer_nb(&port, nbuf, tag, 0, rml_cbfunc, NULL);
} else {
/* wait to recv the collective id */
recv_completed = false;
waiting_for_recv = true;
cabuf = OBJ_NEW(opal_buffer_t);
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
/* wait for response */
while (!recv_completed) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(cabuf, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
@ -316,13 +312,11 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm:orte:connect_accept waiting for response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
recv_completed = false;
waiting_for_recv = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
/* wait for response */
while (!recv_completed) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm:orte:connect_accept got data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -333,13 +327,11 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
"%s dpm:orte:connect_accept recving first",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup to recv */
recv_completed = false;
waiting_for_recv = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
/* wait for response */
while (!recv_completed) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
/* now send our info */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm:orte:connect_accept sending info to %s",
@ -1634,6 +1626,6 @@ static void recv_cb(int status, orte_process_name_t* sender,
carport.vpid = sender->vpid;
/* flag complete */
recv_completed = true;
waiting_for_recv = false;
}

Просмотреть файл

@ -37,7 +37,6 @@
#include "opal/util/output.h"
#include "opal/runtime/opal.h"
#include "opal/runtime/opal_cr.h"
#include "opal/runtime/opal_progress.h"
#include "orte/mca/rml/base/base.h"
#include "orte/mca/routed/base/base.h"
@ -272,9 +271,7 @@ int orte_ess_base_app_setup(void)
error = "orte barrier";
goto error;
}
while (coll.active) {
opal_progress(); /* block in progress pending events */
}
ORTE_WAIT_FOR_COMPLETION(coll.active);
OBJ_DESTRUCT(&coll);
}
@ -339,14 +336,14 @@ int orte_ess_base_app_finalize(void)
* to prevent the abort file from being created. This allows the
* session directory tree to cleanly be eliminated.
*/
static bool sync_recvd;
static bool sync_waiting = false;
static void report_sync(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
/* flag as complete */
sync_recvd = true;
sync_waiting = false;
}
void orte_ess_base_app_abort(int status, bool report)
@ -379,14 +376,12 @@ void orte_ess_base_app_abort(int status, bool report)
* gets serviced by the event library on the orted prior to the
* process exiting
*/
sync_recvd = false;
sync_waiting = true;
if (ORTE_SUCCESS != orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ABORT,
ORTE_RML_NON_PERSISTENT, report_sync, NULL)) {
exit(status);
}
while (!sync_recvd) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(sync_waiting);
}
/* - Clean out the global structures

Просмотреть файл

@ -636,6 +636,7 @@ static int rte_init(void)
}
}
#if !ORTE_ENABLE_PROGRESS_THREADS
/* We actually do *not* want an HNP to voluntarily yield() the
processor more than necessary. Orterun already blocks when
it is doing nothing, so it doesn't use any more CPU cycles than
@ -655,6 +656,7 @@ static int rte_init(void)
problematic in some scenarios (e.g., COMM_SPAWN, BTL's that
require OOB messages for wireup, etc.). */
opal_progress_set_yield_when_idle(false);
#endif
return ORTE_SUCCESS;

Просмотреть файл

@ -30,6 +30,12 @@
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
@ -279,7 +285,17 @@ int orte_filem_rsh_module_finalize(void)
*/
if( orte_filem_base_is_active ) {
while(0 < opal_list_get_size(&work_pool_active) ) {
#if ORTE_ENABLE_PROGRESS_THREADS
{
/* provide a very short quiet period so we
* don't hammer the cpu while we wait
*/
struct timespec tp = {0, 100};
nanosleep(&tp, NULL);
}
#else
opal_progress();
#endif
}
}

Просмотреть файл

@ -50,8 +50,10 @@ static bool event_started = false;
void mca_oob_ud_event_start_monitor (mca_oob_ud_device_t *device)
{
if (!event_started) {
#if !ORTE_ENABLE_PROGRESS_THREADS
opal_progress_event_users_increment ();
opal_event_set (opal_event_base, &device->event, device->ib_channel->fd,
#endif
opal_event_set (orte_event_base, &device->event, device->ib_channel->fd,
OPAL_EV_READ, mca_oob_ud_event_dispatch, (void *) device);
opal_event_add (&device->event, NULL);
event_started = true;
@ -61,7 +63,9 @@ void mca_oob_ud_event_start_monitor (mca_oob_ud_device_t *device)
void mca_oob_ud_event_stop_monitor (mca_oob_ud_device_t *device)
{
if (event_started) {
#if !ORTE_ENABLE_PROGRESS_THREADS
opal_progress_event_users_decrement ();
#endif
opal_event_del (&device->event);
mca_oob_ud_stop_events (device);
event_started = false;
@ -385,7 +389,7 @@ void mca_oob_ud_event_queue_completed (mca_oob_ud_req_t *req)
mca_oob_ud_req_append_to_list (req, &mca_oob_ud_component.ud_event_queued_reqs);
if (!opal_event_evtimer_pending (&mca_oob_ud_component.ud_complete_event, &now)) {
opal_event_evtimer_set (opal_event_base, &mca_oob_ud_component.ud_complete_event,
opal_event_evtimer_set (orte_event_base, &mca_oob_ud_component.ud_complete_event,
mca_oob_ud_complete_dispatch, NULL);
opal_event_add (&mca_oob_ud_component.ud_complete_event, &now);
}

Просмотреть файл

@ -363,7 +363,7 @@ void mca_oob_ud_peer_start_timer (mca_oob_ud_peer_t *peer)
if (!peer->peer_timer.active && opal_list_get_size (&peer->peer_flying_messages)) {
peer->peer_timer.active = true;
opal_event_evtimer_set (opal_event_base, &peer->peer_timer.event,
opal_event_evtimer_set (orte_event_base, &peer->peer_timer.event,
mca_oob_ud_peer_msg_timeout, (void *) peer);
opal_event_evtimer_add (&peer->peer_timer.event, &peer->peer_timer.value);
}

Просмотреть файл

@ -71,7 +71,7 @@ static void mca_oob_ud_req_destruct (mca_oob_ud_req_t *req)
void mca_oob_ud_req_timer_set (mca_oob_ud_req_t *req, const struct timeval *timeout,
int max_tries, void (*cb)(evutil_socket_t, short, void *))
{
opal_event_evtimer_set (opal_event_base, &req->timer.event, cb, (void *) req);
opal_event_evtimer_set (orte_event_base, &req->timer.event, cb, (void *) req);
req->timer.value.tv_sec = timeout->tv_sec;
req->timer.value.tv_usec = timeout->tv_usec;
opal_event_evtimer_add (&req->timer.event, &req->timer.value);

Просмотреть файл

@ -33,7 +33,6 @@
#include <ctype.h>
#include "opal/util/argv.h"
#include "opal/runtime/opal_progress.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss.h"
#include "opal/mca/base/mca_base_param.h"

Просмотреть файл

@ -63,7 +63,6 @@
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/runtime/opal_progress.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
@ -544,7 +543,7 @@ static int plm_tm_connect(void)
{
int ret;
struct tm_roots tm_root;
int count, progress;
int count;
/* try a couple times to connect - might get busy signals every
now and then */
@ -554,12 +553,27 @@ static int plm_tm_connect(void)
return ORTE_SUCCESS;
}
for (progress = 0 ; progress < 10 ; ++progress) {
opal_progress();
#if ORTE_ENABLE_PROGRESS_THREADS
{
/* provide a very short quiet period so we
* don't hammer the cpu while we wait
*/
struct timespec tp = {0, 100};
nanosleep(&tp, NULL);
#if HAVE_SCHED_YIELD
sched_yield();
#endif
}
#else
{
int progress;
for (progress = 0 ; progress < 10 ; ++progress) {
opal_progress();
#if HAVE_SCHED_YIELD
sched_yield();
#endif
}
}
}
return ORTE_ERR_RESOURCE_BUSY;

Просмотреть файл

@ -23,21 +23,14 @@
#include "orte/constants.h"
#include "orte/types.h"
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "opal/dss/dss.h"
#include "opal/runtime/opal_progress.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/routed/base/base.h"
@ -233,7 +226,7 @@ void orte_routed_base_coll_peers(orte_grpcomm_collective_t *coll,
}
static bool sync_recvd;
static bool sync_waiting = false;
static void report_sync(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
@ -242,7 +235,7 @@ static void report_sync(int status, orte_process_name_t* sender,
/* just copy the payload to the sync_buf */
opal_dss.copy_payload(orte_process_info.sync_buf, buffer);
/* flag as complete */
sync_recvd = true;
sync_waiting = false;
}
int orte_routed_base_register_sync(bool setup)
@ -304,7 +297,7 @@ int orte_routed_base_register_sync(bool setup)
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s registering sync waiting for ack",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
sync_recvd = false;
sync_waiting = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SYNC,
ORTE_RML_NON_PERSISTENT, report_sync, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
@ -313,19 +306,7 @@ int orte_routed_base_register_sync(bool setup)
}
/* it is okay to block here as we are -not- in an event */
while (!sync_recvd) {
#if !ORTE_ENABLE_PROGRESS_THREADS
opal_progress();
#else
{
/* provide a very short quiet period so we
* don't hammer the cpu while we wait
*/
struct timespec tp = {0, 100};
nanosleep(&tp, NULL);
}
#endif
}
ORTE_WAIT_FOR_COMPLETION(sync_waiting);
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s registering sync ack recvd",

Просмотреть файл

@ -1,10 +1,8 @@
/*
* Copyright (c) 2007-2011 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2012 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@ -21,7 +19,6 @@
#include "opal/dss/dss.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/class/opal_bitmap.h"
#include "opal/runtime/opal_progress.h"
#include "opal/util/bit_ops.h"
#include "opal/util/output.h"
@ -87,7 +84,7 @@ static orte_process_name_t *lifeline=NULL;
static orte_process_name_t local_lifeline;
static int num_children;
static opal_list_t my_children;
static bool ack_recvd;
static bool ack_waiting = false;
static bool hnp_direct=true;
static int init(void)
@ -450,7 +447,7 @@ static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
ack_recvd = true;
ack_waiting = false;
}
@ -617,13 +614,11 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
ack_waiting = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
while (!ack_recvd) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(ack_waiting);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_binomial_init_routes: ack recvd",

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2007-2011 Los Alamos National Security, LLC.
* Copyright (c) 2007-2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
@ -22,7 +22,6 @@
#include "opal/dss/dss.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_bitmap.h"
#include "opal/runtime/opal_progress.h"
#include "opal/util/bit_ops.h"
#include "opal/util/output.h"
@ -85,7 +84,7 @@ orte_routed_module_t orte_routed_cm_module = {
/* local globals */
static orte_process_name_t *lifeline=NULL;
static orte_process_name_t local_lifeline;
static bool ack_recvd;
static bool ack_waiting = false;
static int init(void)
@ -411,7 +410,7 @@ static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
ack_recvd = true;
ack_waiting = false;
}
@ -627,13 +626,11 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
ack_waiting = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
while (!ack_recvd) {
opal_progress();
}
ORTE_WAIT_FOR_COMPLETION(ack_waiting);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_init_routes: ack recvd",

Просмотреть файл

@ -83,7 +83,7 @@ orte_routed_module_t orte_routed_debruijn_module = {
static orte_process_name_t *lifeline=NULL;
static orte_process_name_t local_lifeline;
static opal_list_t my_children;
static bool ack_recvd;
static bool ack_waiting = false;
static bool hnp_direct=true;
static int log_nranks;
static int log_npeers;
@ -433,7 +433,7 @@ static void recv_ack(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
ack_recvd = true;
ack_waiting = false;
}
@ -599,7 +599,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
ack_waiting = true;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);

Просмотреть файл

@ -81,7 +81,14 @@ const char orte_version_string[] = ORTE_IDENT_STRING;
#if !ORTE_DISABLE_FULL_SUPPORT && ORTE_ENABLE_PROGRESS_THREADS
static void ignore_callback(int fd, short args, void *cbdata)
{
/* nothing to do here */
if (NULL == cbdata) {
/* nothing to do here */
} else {
opal_event_t *ev = (opal_event_t*)cbdata;
struct timeval tv = {1, 0};
opal_output(0, "TIMER FIRED");
opal_event_evtimer_add(ev, &tv);
}
}
#endif
@ -151,6 +158,16 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags)
*/
opal_event_set(orte_event_base, &orte_finalize_event, -1, OPAL_EV_WRITE, ignore_callback, NULL);
opal_event_set_priority(&orte_finalize_event, ORTE_ERROR_PRI);
{
/* seems strange, but wake us up once a second just so we can check for new events */
opal_event_t *ev;
struct timeval tv = {1,0};
ev = opal_event_alloc();
opal_event_evtimer_set(orte_event_base,
ev, ignore_callback, ev);
opal_event_set_priority(ev, ORTE_INFO_PRI);
opal_event_evtimer_add(ev, &tv);
}
/* construct the thread object */
OBJ_CONSTRUCT(&orte_progress_thread, opal_thread_t);
/* fork off a thread to progress it */

Просмотреть файл

@ -34,7 +34,10 @@
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_TIME_H
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
@ -42,6 +45,7 @@
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "opal/mca/event/event.h"
#include "opal/runtime/opal_progress.h"
#include "orte/types.h"
#include "orte/mca/rml/rml_types.h"
@ -107,6 +111,33 @@ typedef struct {
} orte_timer_t;
OBJ_CLASS_DECLARATION(orte_timer_t);
/* In a few places, we need to barrier until something happens
* that changes a flag to indicate we can release - e.g., waiting
* for a specific message to arrive. If no progress thread is running,
* we cycle across opal_progress - however, if a progress thread
* is active, then we need to just nanosleep to avoid cross-thread
* confusion
*/
#if ORTE_ENABLE_PROGRESS_THREADS
#define ORTE_WAIT_FOR_COMPLETION(flg) \
do { \
while ((flg)) { \
/* provide a very short quiet period so we \
* don't hammer the cpu while we wait \
*/ \
struct timespec tp = {0, 100}; \
nanosleep(&tp, NULL); \
} \
}while(0);
#else
#define ORTE_WAIT_FOR_COMPLETION(flg) \
do { \
while ((flg)) { \
opal_progress(); \
} \
}while(0);
#endif
/**
* In a number of places within the code, we want to setup a timer
* to detect when some procedure failed to complete. For example,

Просмотреть файл

@ -784,16 +784,6 @@ int orterun(int argc, char *argv[])
}
}
/* Change the default behavior of libevent such that we want to
continually block rather than blocking for the default timeout
and then looping around the progress engine again. There
should be nothing in the orted that cannot block in libevent
until "something" happens (i.e., there's no need to keep
cycling through progress because the only things that should
happen will happen in libevent). This is a minor optimization,
but what the heck... :-) */
opal_progress_set_event_flag(OPAL_EVLOOP_ONCE);
/* If we have a prefix, then modify the PATH and
LD_LIBRARY_PATH environment variables in our copy. This
will ensure that any locally-spawned children will