From 44febd0a8637b2d326413c82b0a5e72057773792 Mon Sep 17 00:00:00 2001 From: Weikuan Yu Date: Tue, 31 Aug 2004 17:45:33 +0000 Subject: [PATCH] -- Checkin more code to detect events being fired -- To double check thread-triggered progress and join This commit was SVN r2413. --- src/mca/ptl/elan/src/ptl_elan_init.c | 12 ++-- src/mca/ptl/elan/src/ptl_elan_priv.c | 87 ++++++++++++++++++++++++---- src/mca/ptl/elan/src/ptl_elan_priv.h | 14 +++-- 3 files changed, 92 insertions(+), 21 deletions(-) diff --git a/src/mca/ptl/elan/src/ptl_elan_init.c b/src/mca/ptl/elan/src/ptl_elan_init.c index b2f993f60c..4a75532cac 100644 --- a/src/mca/ptl/elan/src/ptl_elan_init.c +++ b/src/mca/ptl/elan/src/ptl_elan_init.c @@ -484,11 +484,10 @@ mca_ptl_elan_state_init (mca_ptl_elan_component_t * emp) /* Allocate a Sleep Desc */ es = ompi_init_elan_sleepdesc (ems, rail); - - /* XXX: put a lock and hold a lock */ + OMPI_LOCK(&mca_ptl_elan_component.elan_lock); es->es_next = rail->r_sleepDescs; rail->r_sleepDescs = es; - /* XXX: release the lock */ + OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock); estate->alloc = rail->r_alloc; estate->vp = ems->elan_vp; @@ -651,14 +650,17 @@ mca_ptl_elan_thread_close (mca_ptl_elan_component_t * emp) START_FUNC(PTL_ELAN_DEBUG_FIN); num_rails = emp->num_modules; + for (i = 0; i < num_rails; i ++) { + /* FIXME: Generate a QUEUE DMA to each thread */ + } + + /* Join all threads */ for (i = 0; i < num_rails; i ++) { ompi_ptl_elan_thread_t * tsend, *trecv; tsend = emp->send_threads[i]; trecv = emp->recv_threads[i]; - /* FIXME: Generate a QUEUE DMA to each thread */ - ompi_thread_join(&tsend->thread, NULL); ompi_thread_join(&trecv->thread, NULL); } diff --git a/src/mca/ptl/elan/src/ptl_elan_priv.c b/src/mca/ptl/elan/src/ptl_elan_priv.c index 57b0fbbc6a..1d0f24f0c7 100644 --- a/src/mca/ptl/elan/src/ptl_elan_priv.c +++ b/src/mca/ptl/elan/src/ptl_elan_priv.c @@ -705,6 +705,69 @@ mca_ptl_elan_init_get_desc (mca_ptl_elan_module_t *ptl, } #endif /* End of OMPI_PTL_ELAN_ENABLE_GET */ + +/* XXX: a customized elan_waitWord function that do without a ELAN_STATE input, + * Need to replace with elan4_waitevent_word () */ +int +mca_ptl_elan_wait_queue(mca_ptl_elan_module_t * ptl, + ompi_ptl_elan_recv_queue_t *rxq, long usecs) +{ + int ret = 1; + RAIL *rail; + ELAN4_CTX *ctx; + ADDR_SDRAM ready; + EVENT_WORD *readyWord; + + START_FUNC(PTL_ELAN_DEBUG_THREAD); + + rail = (RAIL *)ptl->ptl_elan_rail; + ctx = ptl->ptl_elan_ctx; + ready = rxq->qr_qEvent; + readyWord = &rxq->qr_doneWord; + + /* FIXME: Make sure the event and doneWord are correctly initialized */ + LOG_PRINT(PTL_ELAN_DEBUG_THREAD, + "rail %p ctx %p ready %p readyWord %p\n", + rail, ctx, ready, ready); + + /* Poll for usec (at least one), then go to sleep. */ + if (ret = elan4_pollevent_word(ctx, readyWord, usecs)) { + return ret; + } + + LOG_PRINT(PTL_ELAN_DEBUG_THREAD, + "eventWord(%p) TIMED_OUT: ready %lx [%d.%x] readyWord %p [%d]\n", + ready, + EVENT_COUNT(((EVENT32 *)(ready))), + EVENT_TYPE(((EVENT32 *)(ready))), + readyWord, *readyWord); + + /* XXXX Temporary Elan4 blocking wait code */ + { + ELAN_SLEEP *es; + OMPI_LOCK(&mca_ptl_elan_component.elan_lock); + if ((es = rail->r_sleepDescs) == NULL) + es = ompi_init_elan_sleepdesc (&mca_ptl_elan_global_state, rail); + else + rail->r_sleepDescs = es->es_next; + OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock); + + LOG_PRINT(PTL_ELAN_DEBUG_THREAD, + "eventWord(%p): es %p cookie %x cmdq %p ecmdq %p\n", + ready, es, es->es_cookie, es->es_cmdq, es->es_ecmdq); + WAITEVENT_WORD(ctx, es->es_cmdq, es->es_ecmdq, es->es_cmdBlk, + es->es_cookie, ready, readyWord, usecs); + + OMPI_LOCK(&mca_ptl_elan_component.elan_lock); + es->es_next = rail->r_sleepDescs; + rail->r_sleepDescs = es; + OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock); + } + END_FUNC(PTL_ELAN_DEBUG_THREAD); + return ret; +} + + #if OMPI_PTL_ELAN_ENABLE_GET && defined (HAVE_GET_INTERFACE) int mca_ptl_elan_start_get (mca_ptl_elan_send_frag_t * frag, @@ -996,17 +1059,17 @@ mca_ptl_elan_drain_recv (struct mca_ptl_elan_module_t *ptl) ELAN_CTX *ctx; int rc; + START_FUNC(PTL_ELAN_DEBUG_THREAD); queue = ptl->queue; rxq = queue->rxq; ctx = ptl->ptl_elan_ctx; OMPI_LOCK (&queue->rx_lock); -#if 1 - rc = (*(int *) (&rxq->qr_doneWord)); +#if OMPI_PTL_ELAN_THREADING + rc = mca_ptl_elan_wait_queue(ptl, rxq, 1); #else - rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1); + rc = (*(int *) (&rxq->qr_doneWord)); #endif - if (rc) { mca_ptl_base_header_t *header; @@ -1064,6 +1127,7 @@ mca_ptl_elan_drain_recv (struct mca_ptl_elan_module_t *ptl) } OMPI_UNLOCK (&queue->rx_lock); + END_FUNC(PTL_ELAN_DEBUG_THREAD); return OMPI_SUCCESS; } @@ -1078,22 +1142,24 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl) ompi_ptl_elan_recv_queue_t *rxq; + START_FUNC(PTL_ELAN_DEBUG_THREAD); comp = ptl->comp; ctx = ptl->ptl_elan_ctx; rxq = comp->rxq; -#if 1 + OMPI_LOCK (&comp->rx_lock); +#if OMPI_PTL_ELAN_THREADING + /* XXX: block on the recv queue without holding a lock */ + rc = mca_ptl_elan_wait_queue(ptl, rxq, 1); +#else /* XXX: Just test and go */ rc = (*(int *) (&rxq->qr_doneWord)); -#else - /* XXX: block on the event without holding a lock */ - rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1); #endif + if (rc) { mca_ptl_elan_send_frag_t *frag; mca_ptl_base_header_t *header; ompi_ptl_elan_base_desc_t *basic; - OMPI_LOCK (&comp->rx_lock); header = (mca_ptl_base_header_t *) rxq->qr_fptr; LOG_PRINT(PTL_ELAN_DEBUG_MAC, @@ -1141,8 +1207,8 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl) MAIN2ELAN (ctx, (void *) &rxq->qr_doneWord), 0xfeedfacedeadbeef); elan4_flush_cmdq_reorder (rxq->qr_cmdq); - OMPI_UNLOCK (&comp->rx_lock); } + OMPI_UNLOCK (&comp->rx_lock); #else ctx = ptl->ptl_elan_ctx; while (ompi_list_get_size (&ptl->send_frags) > 0) { @@ -1171,6 +1237,7 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl) } /* end of the while loop */ #endif + END_FUNC(PTL_ELAN_DEBUG_THREAD); return OMPI_SUCCESS; } diff --git a/src/mca/ptl/elan/src/ptl_elan_priv.h b/src/mca/ptl/elan/src/ptl_elan_priv.h index 93003749df..f82f61f936 100644 --- a/src/mca/ptl/elan/src/ptl_elan_priv.h +++ b/src/mca/ptl/elan/src/ptl_elan_priv.h @@ -42,7 +42,7 @@ #define PTL_ELAN_DEBUG_INIT (0x001) #define PTL_ELAN_DEBUG_FIN (0x002) #define PTL_ELAN_DEBUG_QDESC (0x004) -#define PTL_ELAN_DEBUG_RDESC (0x008) +#define PTL_ELAN_DEBUG_THREAD (0x008) #define PTL_ELAN_DEBUG_SEND (0x010) #define PTL_ELAN_DEBUG_RECV (0x020) #define PTL_ELAN_DEBUG_ACK (0x040) @@ -53,7 +53,7 @@ #define PTL_ELAN_DEBUG_CHAIN (0x800) #define PTL_ELAN_DEBUG_FLAG \ -(PTL_ELAN_DEBUG_PUT|PTL_ELAN_DEBUG_GET) + (PTL_ELAN_DEBUG_FIN | PTL_ELAN_DEBUG_INIT | PTL_ELAN_DEBUG_THREAD) #define START_FUNC(flag) \ do { \ @@ -86,7 +86,8 @@ do { \ /* PTL_ELAN related MACROS, expose some as configurable options if needed */ #define OMPI_PTL_ELAN_ENABLE_GET (0) #define OMPI_PTL_ELAN_COMP_QUEUE (1) -#define OMPI_PTL_ELAN_THREADING (OMPI_HAVE_POSIX_THREADS) +#define OMPI_PTL_ELAN_THREADING \ + (OMPI_PTL_ELAN_COMP_QUEUE && OMPI_HAVE_POSIX_THREADS) #define OMPI_PTL_ELAN_MAX_QSIZE (2048) #define OMPI_PTL_ELAN_MAX_QSLOTS (128) @@ -377,9 +378,10 @@ int mca_ptl_elan_get_with_ack (mca_ptl_base_module_t * ptl, mca_ptl_elan_send_frag_t * frag, mca_ptl_elan_recv_frag_t * recv_frag); -int mca_ptl_elan_poll_desc(mca_ptl_elan_send_frag_t *desc); -int mca_ptl_elan_wait_desc(mca_ptl_elan_send_frag_t *desc); - +int mca_ptl_elan_poll_queue(ompi_ptl_elan_recv_queue_t *rxq); +int mca_ptl_elan_wait_queue(mca_ptl_elan_module_t * ptl, + ompi_ptl_elan_recv_queue_t *rxq, + long usecs); /* control, synchronization and state prototypes */ int mca_ptl_elan_drain_recv(mca_ptl_elan_module_t * ptl); int mca_ptl_elan_update_desc(mca_ptl_elan_module_t * ptl);