From be253609acc59cb49ddc1abe525b18e7d6aa7d5b Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Thu, 18 Nov 2004 01:58:30 +0000 Subject: [PATCH] changes to support running a non-threaded build on oversubscribed processors This commit was SVN r3613. --- src/event/event.c | 6 ++++-- src/mca/oob/tcp/oob_tcp_msg.c | 4 ++-- src/mca/pml/teg/src/pml_teg_progress.c | 6 +++++- src/mca/ptl/mx/ptl_mx_component.c | 7 +++++-- src/mca/ptl/sm/src/ptl_sm_component.c | 13 ++++++++----- src/runtime/ompi_progress.c | 21 +++++++++++++++++++-- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/event/event.c b/src/event/event.c index 821e435031..8fad476921 100644 --- a/src/event/event.c +++ b/src/event/event.c @@ -191,7 +191,7 @@ static int ompi_timeout_next(struct timeval *tv) static void* ompi_event_run(ompi_object_t* arg) { int rc = ompi_event_loop(0); - assert(rc == 0); + assert(rc >= 0); return NULL; } @@ -355,6 +355,7 @@ ompi_event_loop(int flags) { struct timeval tv; int res, done; + int num_active = 0; if (ompi_event_inited == false) return(0); @@ -421,6 +422,7 @@ ompi_event_loop(int flags) } if (TAILQ_FIRST(&ompi_activequeue)) { + num_active++; ompi_event_process_active(); if (flags & OMPI_EVLOOP_ONCE) done = 1; @@ -434,7 +436,7 @@ ompi_event_loop(int flags) } } OMPI_THREAD_UNLOCK(&ompi_event_lock); - return (0); + return (num_active); } diff --git a/src/mca/oob/tcp/oob_tcp_msg.c b/src/mca/oob/tcp/oob_tcp_msg.c index e04c06aa86..1ba817c6a2 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.c +++ b/src/mca/oob/tcp/oob_tcp_msg.c @@ -51,7 +51,7 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc) int rc; OMPI_THREAD_UNLOCK(&msg->msg_lock); rc = ompi_event_loop(OMPI_EVLOOP_ONCE); - assert(rc == 0); + assert(rc >= 0); OMPI_THREAD_LOCK(&msg->msg_lock); } else { ompi_condition_wait(&msg->msg_condition, &msg->msg_lock); @@ -95,7 +95,7 @@ int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* rc, struct timespec* int rc; OMPI_THREAD_UNLOCK(&msg->msg_lock); rc = ompi_event_loop(OMPI_EVLOOP_ONCE); - assert(rc == 0); + assert(rc >= 0); OMPI_THREAD_LOCK(&msg->msg_lock); } else { ompi_condition_timedwait(&msg->msg_condition, &msg->msg_lock, abstime); diff --git a/src/mca/pml/teg/src/pml_teg_progress.c b/src/mca/pml/teg/src/pml_teg_progress.c index 1445a22ff9..70da5d3226 100644 --- a/src/mca/pml/teg/src/pml_teg_progress.c +++ b/src/mca/pml/teg/src/pml_teg_progress.c @@ -12,6 +12,7 @@ int mca_pml_teg_progress(void) { mca_ptl_tstamp_t tstamp = 0; size_t i; + int count; /* * Progress each of the PTL modules @@ -19,7 +20,10 @@ int mca_pml_teg_progress(void) for(i=0; iptlm_progress; if(NULL != progress) { - progress(tstamp); + int rc = progress(tstamp); + if(rc < 0) + return rc; + count += rc; } } return OMPI_SUCCESS; diff --git a/src/mca/ptl/mx/ptl_mx_component.c b/src/mca/ptl/mx/ptl_mx_component.c index 27796ed098..b0eb52211f 100644 --- a/src/mca/ptl/mx/ptl_mx_component.c +++ b/src/mca/ptl/mx/ptl_mx_component.c @@ -225,6 +225,7 @@ int mca_ptl_mx_component_control(int param, void* value, size_t size) int mca_ptl_mx_component_progress(mca_ptl_tstamp_t tstamp) { + int num_progressed = 0; size_t i; for(i=0; imx_endpoint, &mx_request, @@ -263,6 +264,7 @@ int mca_ptl_mx_component_progress(mca_ptl_tstamp_t tstamp) ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%dn", mx_return); } + num_progressed++; #else /* pre-post receive */ if(ptl->mx_recvs_posted == 0) { @@ -283,8 +285,9 @@ int mca_ptl_mx_component_progress(mca_ptl_tstamp_t tstamp) if(mx_result > 0) { MCA_PTL_MX_PROGRESS(ptl, mx_status); } + num_progressed++; #endif } - return OMPI_SUCCESS; + return num_progressed; } diff --git a/src/mca/ptl/sm/src/ptl_sm_component.c b/src/mca/ptl/sm/src/ptl_sm_component.c index 1024c50260..8e1129c960 100644 --- a/src/mca/ptl/sm/src/ptl_sm_component.c +++ b/src/mca/ptl/sm/src/ptl_sm_component.c @@ -263,7 +263,7 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size) int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) { /* local variables */ - int my_local_smp_rank, proc, return_status; + int my_local_smp_rank, proc; unsigned int peer_local_smp_rank ; mca_ptl_sm_frag_t *header_ptr; volatile ompi_fifo_t *send_fifo; @@ -271,6 +271,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) mca_ptl_base_match_header_t *matching_header; mca_pml_base_send_request_t *base_send_req; ompi_list_item_t *item; + int return_status = 0; my_local_smp_rank=mca_ptl_sm_component.my_smp_rank; @@ -321,6 +322,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) } /* figure out what type of message this is */ + return_status++; switch (header_ptr->super.frag_base.frag_header.hdr_common.hdr_type) { @@ -430,6 +432,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) /* figure out what type of message this is */ + return_status++; switch (header_ptr->super.frag_base.frag_header.hdr_common.hdr_type) { @@ -506,7 +509,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) * on a different list */ item = ompi_list_remove_first(&(mca_ptl_sm_component.sm_pending_ack)); while ( item != ompi_list_get_end(&(mca_ptl_sm_component.sm_pending_ack)) ) { - + int rc; /* get fragment pointer */ header_ptr = (mca_ptl_sm_frag_t *)item; @@ -516,11 +519,11 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) /* fragment already marked as an ack */ - return_status=ompi_fifo_write_to_head_same_base_addr(header_ptr, + rc=ompi_fifo_write_to_head_same_base_addr(header_ptr, send_fifo, mca_ptl_sm_component.sm_mpool); /* if ack failed, break */ - if( 0 > return_status ) { + if( 0 > rc ) { /* put the descriptor back on the list */ ompi_list_prepend(&(mca_ptl_sm_component.sm_pending_ack),item); break; @@ -534,7 +537,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) OMPI_THREAD_UNLOCK(&(mca_ptl_sm_component.sm_pending_ack_lock)); } - return OMPI_SUCCESS; + return return_status; } diff --git a/src/runtime/ompi_progress.c b/src/runtime/ompi_progress.c index c5a3e5feee..4636ba8c25 100644 --- a/src/runtime/ompi_progress.c +++ b/src/runtime/ompi_progress.c @@ -1,4 +1,5 @@ #include "ompi_config.h" +#include #include "event/event.h" #include "mca/pml/pml.h" #include "runtime/ompi_progress.h" @@ -15,8 +16,24 @@ void ompi_progress_events(int flag) void ompi_progress(void) { + /* progress any outstanding communications */ + int events = 0; if(ompi_progress_event_flag != 0) - ompi_event_loop(ompi_progress_event_flag); - mca_pml.pml_progress(); + events += ompi_event_loop(ompi_progress_event_flag); + events += mca_pml.pml_progress(); + +#if 0 + /* TSW - disable this until can validate that it doesn't impact SMP + * performance + */ + /* + * if there is nothing to do - yield the processor - otherwise + * we could consume the processor for the entire time slice. If + * the processor is oversubscribed - this will result in a best-case + * latency equivalent to the time-slice. + */ + if(events == 0) + sched_yield(); +#endif }