1
1

-- Checkin more code to detect events being fired

-- To double check thread-triggered progress and join 

This commit was SVN r2413.
Этот коммит содержится в:
Weikuan Yu 2004-08-31 17:45:33 +00:00
родитель a691dfbb4e
Коммит 44febd0a86
3 изменённых файлов: 92 добавлений и 21 удалений

Просмотреть файл

@ -484,11 +484,10 @@ mca_ptl_elan_state_init (mca_ptl_elan_component_t * emp)
/* Allocate a Sleep Desc */
es = ompi_init_elan_sleepdesc (ems, rail);
/* XXX: put a lock and hold a lock */
OMPI_LOCK(&mca_ptl_elan_component.elan_lock);
es->es_next = rail->r_sleepDescs;
rail->r_sleepDescs = es;
/* XXX: release the lock */
OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock);
estate->alloc = rail->r_alloc;
estate->vp = ems->elan_vp;
@ -651,14 +650,17 @@ mca_ptl_elan_thread_close (mca_ptl_elan_component_t * emp)
START_FUNC(PTL_ELAN_DEBUG_FIN);
num_rails = emp->num_modules;
for (i = 0; i < num_rails; i ++) {
/* FIXME: Generate a QUEUE DMA to each thread */
}
/* Join all threads */
for (i = 0; i < num_rails; i ++) {
ompi_ptl_elan_thread_t * tsend, *trecv;
tsend = emp->send_threads[i];
trecv = emp->recv_threads[i];
/* FIXME: Generate a QUEUE DMA to each thread */
ompi_thread_join(&tsend->thread, NULL);
ompi_thread_join(&trecv->thread, NULL);
}

Просмотреть файл

@ -705,6 +705,69 @@ mca_ptl_elan_init_get_desc (mca_ptl_elan_module_t *ptl,
}
#endif /* End of OMPI_PTL_ELAN_ENABLE_GET */
/* XXX: a customized elan_waitWord function that do without a ELAN_STATE input,
* Need to replace with elan4_waitevent_word () */
int
mca_ptl_elan_wait_queue(mca_ptl_elan_module_t * ptl,
ompi_ptl_elan_recv_queue_t *rxq, long usecs)
{
int ret = 1;
RAIL *rail;
ELAN4_CTX *ctx;
ADDR_SDRAM ready;
EVENT_WORD *readyWord;
START_FUNC(PTL_ELAN_DEBUG_THREAD);
rail = (RAIL *)ptl->ptl_elan_rail;
ctx = ptl->ptl_elan_ctx;
ready = rxq->qr_qEvent;
readyWord = &rxq->qr_doneWord;
/* FIXME: Make sure the event and doneWord are correctly initialized */
LOG_PRINT(PTL_ELAN_DEBUG_THREAD,
"rail %p ctx %p ready %p readyWord %p\n",
rail, ctx, ready, ready);
/* Poll for usec (at least one), then go to sleep. */
if (ret = elan4_pollevent_word(ctx, readyWord, usecs)) {
return ret;
}
LOG_PRINT(PTL_ELAN_DEBUG_THREAD,
"eventWord(%p) TIMED_OUT: ready %lx [%d.%x] readyWord %p [%d]\n",
ready,
EVENT_COUNT(((EVENT32 *)(ready))),
EVENT_TYPE(((EVENT32 *)(ready))),
readyWord, *readyWord);
/* XXXX Temporary Elan4 blocking wait code */
{
ELAN_SLEEP *es;
OMPI_LOCK(&mca_ptl_elan_component.elan_lock);
if ((es = rail->r_sleepDescs) == NULL)
es = ompi_init_elan_sleepdesc (&mca_ptl_elan_global_state, rail);
else
rail->r_sleepDescs = es->es_next;
OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock);
LOG_PRINT(PTL_ELAN_DEBUG_THREAD,
"eventWord(%p): es %p cookie %x cmdq %p ecmdq %p\n",
ready, es, es->es_cookie, es->es_cmdq, es->es_ecmdq);
WAITEVENT_WORD(ctx, es->es_cmdq, es->es_ecmdq, es->es_cmdBlk,
es->es_cookie, ready, readyWord, usecs);
OMPI_LOCK(&mca_ptl_elan_component.elan_lock);
es->es_next = rail->r_sleepDescs;
rail->r_sleepDescs = es;
OMPI_UNLOCK(&mca_ptl_elan_component.elan_lock);
}
END_FUNC(PTL_ELAN_DEBUG_THREAD);
return ret;
}
#if OMPI_PTL_ELAN_ENABLE_GET && defined (HAVE_GET_INTERFACE)
int
mca_ptl_elan_start_get (mca_ptl_elan_send_frag_t * frag,
@ -996,17 +1059,17 @@ mca_ptl_elan_drain_recv (struct mca_ptl_elan_module_t *ptl)
ELAN_CTX *ctx;
int rc;
START_FUNC(PTL_ELAN_DEBUG_THREAD);
queue = ptl->queue;
rxq = queue->rxq;
ctx = ptl->ptl_elan_ctx;
OMPI_LOCK (&queue->rx_lock);
#if 1
rc = (*(int *) (&rxq->qr_doneWord));
#if OMPI_PTL_ELAN_THREADING
rc = mca_ptl_elan_wait_queue(ptl, rxq, 1);
#else
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1);
rc = (*(int *) (&rxq->qr_doneWord));
#endif
if (rc) {
mca_ptl_base_header_t *header;
@ -1064,6 +1127,7 @@ mca_ptl_elan_drain_recv (struct mca_ptl_elan_module_t *ptl)
}
OMPI_UNLOCK (&queue->rx_lock);
END_FUNC(PTL_ELAN_DEBUG_THREAD);
return OMPI_SUCCESS;
}
@ -1078,22 +1142,24 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl)
ompi_ptl_elan_recv_queue_t *rxq;
START_FUNC(PTL_ELAN_DEBUG_THREAD);
comp = ptl->comp;
ctx = ptl->ptl_elan_ctx;
rxq = comp->rxq;
#if 1
OMPI_LOCK (&comp->rx_lock);
#if OMPI_PTL_ELAN_THREADING
/* XXX: block on the recv queue without holding a lock */
rc = mca_ptl_elan_wait_queue(ptl, rxq, 1);
#else
/* XXX: Just test and go */
rc = (*(int *) (&rxq->qr_doneWord));
#else
/* XXX: block on the event without holding a lock */
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1);
#endif
if (rc) {
mca_ptl_elan_send_frag_t *frag;
mca_ptl_base_header_t *header;
ompi_ptl_elan_base_desc_t *basic;
OMPI_LOCK (&comp->rx_lock);
header = (mca_ptl_base_header_t *) rxq->qr_fptr;
LOG_PRINT(PTL_ELAN_DEBUG_MAC,
@ -1141,8 +1207,8 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl)
MAIN2ELAN (ctx, (void *) &rxq->qr_doneWord),
0xfeedfacedeadbeef);
elan4_flush_cmdq_reorder (rxq->qr_cmdq);
OMPI_UNLOCK (&comp->rx_lock);
}
OMPI_UNLOCK (&comp->rx_lock);
#else
ctx = ptl->ptl_elan_ctx;
while (ompi_list_get_size (&ptl->send_frags) > 0) {
@ -1171,6 +1237,7 @@ mca_ptl_elan_update_desc (struct mca_ptl_elan_module_t *ptl)
} /* end of the while loop */
#endif
END_FUNC(PTL_ELAN_DEBUG_THREAD);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -42,7 +42,7 @@
#define PTL_ELAN_DEBUG_INIT (0x001)
#define PTL_ELAN_DEBUG_FIN (0x002)
#define PTL_ELAN_DEBUG_QDESC (0x004)
#define PTL_ELAN_DEBUG_RDESC (0x008)
#define PTL_ELAN_DEBUG_THREAD (0x008)
#define PTL_ELAN_DEBUG_SEND (0x010)
#define PTL_ELAN_DEBUG_RECV (0x020)
#define PTL_ELAN_DEBUG_ACK (0x040)
@ -53,7 +53,7 @@
#define PTL_ELAN_DEBUG_CHAIN (0x800)
#define PTL_ELAN_DEBUG_FLAG \
(PTL_ELAN_DEBUG_PUT|PTL_ELAN_DEBUG_GET)
(PTL_ELAN_DEBUG_FIN | PTL_ELAN_DEBUG_INIT | PTL_ELAN_DEBUG_THREAD)
#define START_FUNC(flag) \
do { \
@ -86,7 +86,8 @@ do { \
/* PTL_ELAN related MACROS, expose some as configurable options if needed */
#define OMPI_PTL_ELAN_ENABLE_GET (0)
#define OMPI_PTL_ELAN_COMP_QUEUE (1)
#define OMPI_PTL_ELAN_THREADING (OMPI_HAVE_POSIX_THREADS)
#define OMPI_PTL_ELAN_THREADING \
(OMPI_PTL_ELAN_COMP_QUEUE && OMPI_HAVE_POSIX_THREADS)
#define OMPI_PTL_ELAN_MAX_QSIZE (2048)
#define OMPI_PTL_ELAN_MAX_QSLOTS (128)
@ -377,9 +378,10 @@ int mca_ptl_elan_get_with_ack (mca_ptl_base_module_t * ptl,
mca_ptl_elan_send_frag_t * frag,
mca_ptl_elan_recv_frag_t * recv_frag);
int mca_ptl_elan_poll_desc(mca_ptl_elan_send_frag_t *desc);
int mca_ptl_elan_wait_desc(mca_ptl_elan_send_frag_t *desc);
int mca_ptl_elan_poll_queue(ompi_ptl_elan_recv_queue_t *rxq);
int mca_ptl_elan_wait_queue(mca_ptl_elan_module_t * ptl,
ompi_ptl_elan_recv_queue_t *rxq,
long usecs);
/* control, synchronization and state prototypes */
int mca_ptl_elan_drain_recv(mca_ptl_elan_module_t * ptl);
int mca_ptl_elan_update_desc(mca_ptl_elan_module_t * ptl);