1
1

-- Add shared completion queue support

-- To add threads for asynchronous progression

This commit was SVN r2266.
Этот коммит содержится в:
Weikuan Yu 2004-08-23 13:48:21 +00:00
родитель c61d124c76
Коммит a30d508423
5 изменённых файлов: 170 добавлений и 40 удалений

Просмотреть файл

@ -334,7 +334,7 @@ mca_ptl_elan_get (struct mca_ptl_base_module_t *ptl,
{
int rc = OMPI_SUCCESS;
#if OMPI_PTL_ELAN_ENABLE_GET
#if OMPI_PTL_ELAN_ENABLE_GET && defined (HAVE_GET_INTERFACE)
mca_ptl_elan_send_frag_t *desc;
/* TODO:

Просмотреть файл

@ -45,8 +45,9 @@ struct mca_ptl_elan_module_t {
ompi_list_t recv_frags; /**< outstanding recv's */
ompi_list_t pending_acks;
struct ompi_ptl_elan_comp_queue_t *comp; /**< completion queue */
struct ompi_ptl_elan_queue_ctrl_t *queue; /**< Queue ctrl struct*/
struct ompi_ptl_elan_putget_ctrl_t *putget; /**< putget ctrl struct */
struct ompi_ptl_elan_putget_ctrl_t *putget;/**< putget ctrl struct */
};
typedef struct mca_ptl_elan_module_t mca_ptl_elan_module_t;
extern mca_ptl_elan_module_t mca_ptl_elan_module;

Просмотреть файл

@ -85,11 +85,15 @@ ompi_init_elan_queue_events (mca_ptl_elan_module_t * ptl,
/* Initialize some of the dma structures */
desc->main_dma.dma_dstAddr = 0;
#if OMPI_PTL_ELAN_COMP_QUEUE
/* Have all the source event fired to the Queue */
#else
desc->main_dma.dma_srcEvent = SDRAM2ELAN (ctx, desc->elan_event);
desc->main_dma.dma_dstEvent = SDRAM2ELAN (ctx, queue->input);
INITEVENT_WORD (ctx, desc->elan_event, &desc->main_doneWord);
RESETEVENT_WORD (&desc->main_doneWord);
PRIMEEVENT_WORD (ctx, desc->elan_event, 1);
#endif
item = (ompi_list_item_t *) frag;
ompi_list_append (&flist->super, item);
@ -134,6 +138,9 @@ mca_ptl_elan_putget_desc_contruct (
desc->main_dma.dma_srcAddr = src_elan4_addr;
desc->main_dma.dma_dstAddr = dst_elan4_addr;
#if OMPI_PTL_ELAN_COMP_QUEUE
/* Have all the source event fired to the Queue */
#else
if (local) {
desc->main_dma.dma_srcEvent = elan4_main2elan(ctx, elan_event);
} else {
@ -144,6 +151,7 @@ mca_ptl_elan_putget_desc_contruct (
INITEVENT_WORD (ctx, elan_event, &desc->main_doneWord);
RESETEVENT_WORD (&desc->main_doneWord);
PRIMEEVENT_WORD (ctx, elan_event, 1);
#endif
/* Make PCI write visible */
mb();
@ -285,7 +293,7 @@ ompi_init_elan_qdma (mca_ptl_elan_component_t * emp,
OMPI_PTL_ELAN_CHECK_UNEX (queue, NULL, OMPI_ERROR, 0);
memset (queue, 0, sizeof (ompi_ptl_elan_queue_ctrl_t));
/* Allocate input queue */
/* TODO: move the input queue into ptl->comp */
queue->input = (E4_InputQueue *) elan4_allocElan (rail->r_alloc,
INPUT_QUEUE_ALIGN,
INPUT_QUEUE_SIZE);
@ -377,6 +385,92 @@ ompi_init_elan_qdma (mca_ptl_elan_component_t * emp,
OBJ_CONSTRUCT (&queue->rx_lock, ompi_mutex_t);
}
#if OMPI_PTL_ELAN_COMP_QUEUE || 1
    /*
     * Create one completion queue per rail and attach it to the module as
     * ptl->comp.  Each queue consists of an Elan-side input queue plus a
     * main-memory receive ring that the input queue delivers into.
     *
     * The "|| 1" keeps this section compiled in regardless of the value of
     * OMPI_PTL_ELAN_COMP_QUEUE; later the queue above is meant to be used
     * directly.
     *
     * NOTE(review): OMPI_PTL_ELAN_CHECK_UNEX presumably bails out of the
     * function with OMPI_ERROR when the check fails; resources allocated
     * earlier in the iteration would then not be released -- confirm.
     */
    for (i = 0; i < num_rails; i++) {
        ompi_ptl_elan_recv_queue_t *rxq;
        ompi_ptl_elan_comp_queue_t *comp;

        ptl = emp->modules[i];
        rail = (RAIL *) ptl->ptl_elan_rail;
        ctx = (ELAN4_CTX *) ptl->ptl_elan_ctx;

        comp = ptl->comp = (ompi_ptl_elan_comp_queue_t *)
            malloc (sizeof (ompi_ptl_elan_comp_queue_t));
        OMPI_PTL_ELAN_CHECK_UNEX (comp, NULL, OMPI_ERROR, 0);
        memset (comp, 0, sizeof (ompi_ptl_elan_comp_queue_t));

        /* Allocate the input queue */
        comp->input = (E4_InputQueue *) elan4_allocElan (rail->r_alloc,
                                                         INPUT_QUEUE_ALIGN,
                                                         INPUT_QUEUE_SIZE);
        OMPI_PTL_ELAN_CHECK_UNEX (comp->input, NULL, OMPI_ERROR, 0);

        /*
         * Init the receive queue geometry.
         *
         * nslots is deliberately NOT modified here: bumping it inside the
         * loop made every subsequent rail see a larger slot count than the
         * previous one.  The ring is instead sized explicitly as
         * nslots + OMPI_PTL_ELAN_LOST_QSLOTS slots, of which nslots are
         * usable.
         */
        comp->rx_nslots = nslots;
        comp->rx_buffsize = (slotsize > INPUT_QUEUE_MAX) ?
            INPUT_QUEUE_MAX : slotsize;
        comp->rx_slotsize = ELAN_ALIGNUP (slotsize, OMPI_PTL_ELAN_SLOT_ALIGN);

        rxq = comp->rxq = (ompi_ptl_elan_recv_queue_t *)
            elan4_allocMain (rail->r_alloc, 64,
                             sizeof (ompi_ptl_elan_recv_queue_t));
        OMPI_PTL_ELAN_CHECK_UNEX (rxq, NULL, OMPI_ERROR, 0);
        memset (rxq, 0, sizeof (ompi_ptl_elan_recv_queue_t));

        rxq->qr_rail = rail;
        rxq->qr_fptr = elan4_allocMain (rail->r_alloc, 128,
                                        (nslots + OMPI_PTL_ELAN_LOST_QSLOTS)
                                        * comp->rx_slotsize);
        OMPI_PTL_ELAN_CHECK_UNEX (rxq->qr_fptr, NULL, OMPI_ERROR, 0);
        /* Fill the ring with a recognizable byte pattern */
        memset (rxq->qr_fptr, 0xeb,
                (nslots + OMPI_PTL_ELAN_LOST_QSLOTS) * comp->rx_slotsize);

        rxq->qr_elanDone = ALLOC_ELAN (rail,
                                       OMPI_PTL_ELAN_SLOT_ALIGN,
                                       sizeof (EVENT32));
        OMPI_PTL_ELAN_CHECK_UNEX (rxq->qr_elanDone, NULL, OMPI_ERROR, 0);

        /* Set the top et al: main-memory and Elan views of the same ring */
        rxq->qr_efitem = (E4_uint64) elan4_main2elan (ctx, rxq->qr_fptr);
        assert(rxq->qr_efitem != ELAN_BAD_ADDR);
        rxq->qr_base = rxq->qr_fptr;
        rxq->qr_top = (void *) ((uintptr_t) rxq->qr_base +
                                (comp->rx_slotsize * nslots));
        rxq->qr_efptr = rxq->qr_efitem;
        rxq->qr_elitem = rxq->qr_efitem + (comp->rx_slotsize * nslots);

        /* Event to wait/block on.  Original author's note: "Bug here for
         * the event" -- the event setup is suspected to be incorrect. */
        rxq->qr_qEvent = rxq->qr_elanDone;
        comp->input->q_event =
            SDRAM2ELAN (ctx, (void *) rxq->qr_elanDone);
        comp->input->q_fptr = rxq->qr_efitem;
        comp->input->q_bptr = rxq->qr_efitem;
        comp->input->q_control =
            E4_InputQueueControl (rxq->qr_efitem, rxq->qr_elitem,
                                  comp->rx_slotsize);

        /* The event: bind it to the done word, clear the word, prime it */
        INITEVENT_WORD (ctx, (EVENT *) rxq->qr_elanDone,
                        &rxq->qr_doneWord);
        RESETEVENT_WORD (&rxq->qr_doneWord);
        PRIMEEVENT_WORD (ctx, (EVENT *) rxq->qr_elanDone, 1);

        rxq->qr_cmdq = elan4_alloc_cmdq (ctx, rail->r_alloc,
                                         CQ_Size1K,
                                         CQ_WriteEnableBit |
                                         CQ_WaitEventEnableBit, NULL);
        OMPI_PTL_ELAN_CHECK_UNEX (rxq->qr_cmdq, NULL, OMPI_ERROR, 0);

        /* Allocate a sleepDesc for threads to block on */
        rxq->qr_es = ompi_init_elan_sleepdesc (&mca_ptl_elan_global_state,
                                               rxq->qr_rail);
        OMPI_PTL_ELAN_CHECK_UNEX (rxq->qr_es, NULL, OMPI_ERROR, 0);

        OBJ_CONSTRUCT (&comp->rx_lock, ompi_mutex_t);
    }
#endif
END_FUNC(PTL_ELAN_DEBUG_INIT);
return (OMPI_SUCCESS);
}

Просмотреть файл

@ -49,11 +49,9 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_module_t *ptl,
* No information about which peer until checking the header
* Somewhere after the frag is matched, this peer information needs
* to be filled in so that ACK can be sent out.
*
* b) Possibly, another drawback of hooking the ack to the particular
* recv fragment. If the ack fragment is not hooked this way,
* PML will provide the peer information when the ack is requested.
*
* c) What if the recv request specifies MPI_ANY_SOURCE, then
* for the handshaking to complete, peer should be fixed the
* handshaking. Then in this case, PML needs information from
@ -190,8 +188,9 @@ mca_ptl_elan_init_qdma_desc (struct mca_ptl_elan_send_frag_t *frag,
hdr->hdr_frag.hdr_src_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = frag;
/* Stash local buffer address into the header, for ptl_elan_get */
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_frag.hdr_dst_ptr.pval = pml_req->req_base.req_addr,
hdr->hdr_frag.hdr_dst_ptr.pval = 0;
hdr->hdr_frag.hdr_dst_ptr.lval = elan4_main2elan(
ptl->ptl_elan_ctx, pml_req->req_base.req_addr);
hdr->hdr_match.hdr_contextid = pml_req->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = pml_req->req_base.req_comm->c_my_rank;
@ -376,17 +375,21 @@ mca_ptl_elan_init_putget_desc (struct mca_ptl_elan_send_frag_t *frag,
desc->chain_dma.dma_vproc = destvp;
desc->chain_dma.dma_srcAddr = elan4_main2elan (ctx, (void *) hdr);
desc->chain_dma.dma_dstAddr = 0x0ULL;
desc->chain_dma.dma_srcEvent = elan4_main2elan (ctx, desc->elan_event);
/* causes the inputter to redirect the dma to the inputq */
desc->chain_dma.dma_dstEvent = elan4_main2elan (ctx,
(void *) ptl->queue->input);
#if OMPI_PTL_ELAN_COMP_QUEUE
/* Have all the source event fired to the Queue */
#else
desc->chain_dma.dma_srcEvent = elan4_main2elan (ctx, desc->elan_event);
INITEVENT_WORD (ctx, (E4_Event *) desc->elan_event, &desc->main_doneWord);
RESETEVENT_WORD (&desc->main_doneWord);
/* Be sure that padding E4_Event is not causing problems */
PRIMEEVENT_WORD (ctx, (E4_Event *)desc->elan_event, 1);
#endif
desc->chain_dma.dma_typeSize |= RUN_DMA_CMD;
desc->chain_dma.dma_pad = NOP_CMD;
@ -444,31 +447,37 @@ mca_ptl_elan_init_putget_desc (struct mca_ptl_elan_send_frag_t *frag,
#if OMPI_PTL_ELAN_ENABLE_GET
static void
mca_ptl_elan_init_get_desc (struct mca_ptl_elan_send_frag_t *frag,
mca_ptl_elan_module_t * ptl,
struct mca_ptl_elan_peer_t *ptl_peer,
mca_ptl_elan_init_get_desc (mca_ptl_elan_module_t *ptl,
struct mca_ptl_elan_send_frag_t *frag,
mca_ptl_elan_recv_frag_t * recv_frag,
mca_pml_base_recv_request_t *pml_req,
size_t offset,
size_t *size,
int flags)
{
int destvp;
int size_out;
int size_in;
size_t offset;
ELAN4_CTX *ctx;
struct ompi_ptl_elan_putget_desc_t * desc;
mca_ptl_base_header_t *hdr;
mca_ptl_base_header_t *recv_header;
struct mca_ptl_elan_peer_t *ptl_peer;
START_FUNC(PTL_ELAN_DEBUG_GET);
ctx = ptl->ptl_elan_ctx;
hdr = &frag->frag_base.frag_header;
recv_header= &recv_frag->frag_recv.frag_base.frag_header;
ptl_peer = recv_frag->frag_recv.frag_base.frag_peer,
offset = pml_req->req_bytes_received,
desc = (ompi_ptl_elan_putget_desc_t *)frag->desc;
destvp = ptl_peer->peer_vp;
size_in = *size;
desc->src_elan_addr = elan4_main2elan(ctx, pml_req->req_base.req_addr);
desc->dst_elan_addr = 0; // (E4_Addr)pml_req->req_peer_addr.lval;
/* XXX: If doing get, the first frag will be left as 0-byte */
desc->src_elan_addr = recv_header->hdr_frag.hdr_dst_ptr.lval;
desc->dst_elan_addr = elan4_main2elan(ctx, pml_req->req_base.req_addr);
desc->desc_buff = hdr;
/* FIXME:
@ -485,12 +494,17 @@ mca_ptl_elan_init_get_desc (struct mca_ptl_elan_send_frag_t *frag,
desc->chain_dma.dma_vproc = destvp;
desc->chain_dma.dma_srcAddr = elan4_main2elan (ctx, (void *) hdr);
desc->chain_dma.dma_dstAddr = 0x0ULL;
desc->chain_dma.dma_srcEvent = elan4_main2elan (ctx, desc->elan_event);
desc->chain_dma.dma_dstEvent = elan4_main2elan (ctx,
(void *) ptl->queue->input);
#if OMPI_PTL_ELAN_COMP_QUEUE
/* Have all the source event fired to the Queue */
#else
desc->chain_dma.dma_srcEvent = elan4_main2elan (ctx, desc->elan_event);
INITEVENT_WORD (ctx, (E4_Event *) desc->elan_event, &desc->main_doneWord);
RESETEVENT_WORD (&desc->main_doneWord);
PRIMEEVENT_WORD (ctx, (E4_Event *)desc->elan_event, 1);
#endif
desc->chain_dma.dma_typeSize |= RUN_DMA_CMD;
desc->chain_dma.dma_pad = NOP_CMD;
@ -515,7 +529,6 @@ mca_ptl_elan_init_get_desc (struct mca_ptl_elan_send_frag_t *frag,
* Allocate space from the command queues hanging off the CTX. */
desc->chain_event->ev_Params[1] = elan4_alloccq_space (ctx, 8, CQ_Size8K);
/* FIXME: Find the correct addresses to fill in */
desc->main_dma.dma_srcAddr = desc->src_elan_addr;
desc->main_dma.dma_dstAddr = desc->dst_elan_addr;
desc->main_dma.dma_srcEvent= 0x0ULL; /*disable remote event */
@ -541,10 +554,11 @@ mca_ptl_elan_init_get_desc (struct mca_ptl_elan_send_frag_t *frag,
END_FUNC(PTL_ELAN_DEBUG_SEND);
}
#if OMPI_PTL_ELAN_ENABLE_GET && defined (HAVE_GET_INTERFACE)
int
mca_ptl_elan_start_get (mca_ptl_elan_send_frag_t * frag,
struct mca_ptl_elan_peer_t *ptl_peer,
struct mca_pml_base_recv_request_t *req,
struct mca_pml_base_recv_request_t *request,
size_t offset,
size_t *size,
int flags)
@ -552,30 +566,39 @@ mca_ptl_elan_start_get (mca_ptl_elan_send_frag_t * frag,
mca_ptl_elan_module_t *ptl;
struct ompi_ptl_elan_putget_desc_t *gdesc;
START_FUNC(PTL_ELAN_DEBUG_SEND);
START_FUNC(PTL_ELAN_DEBUG_GET);
ptl = ptl_peer->peer_ptl;
gdesc = (ompi_ptl_elan_putget_desc_t *)frag->desc;
/*mca_ptl_elan_init_putget_desc */
mca_ptl_elan_init_get_desc (frag, ptl, ptl_peer,
mca_ptl_elan_init_get_desc (ptl, frag, ptl_peer,
req, offset, size, flags);
elan4_run_dma_cmd (ptl->putget->get_cmdq, (E4_DMA *) &gdesc->main_dma);
elan4_flush_cmdq_reorder (ptl->putget->get_cmdq);
ompi_list_append (&ptl->send_frags, (ompi_list_item_t *) frag);
frag->frag_base.frag_owner = (struct mca_ptl_base_module_t *)
&ptl_peer->peer_ptl->super;
frag->frag_base.frag_peer = (struct mca_ptl_base_peer_t *) ptl_peer;
frag->frag_base.frag_addr = NULL;
frag->frag_base.frag_size = *size;
frag->frag_progressed = 0;
frag->frag_ack_pending = 0; /* this is ack for internal elan */
/* XXX:
* Trigger a STEN packet to the remote side and then from there
* a elan_get is triggered
* Not sure which remote queue is being used by GET_DMA here */
elan4_remote_dma(elan_ptl->putget->get_cmdq,
(E4_DMA *)&gdesc->main_dma, destvp,
elan4_local_cookie(elan_ptl->putget->pg_cpool,
E4_COOKIE_TYPE_STEN , destvp));
elan4_flush_cmdq_reorder (elan_ptl->putget->get_cmdq);
MEMBAR_DRAIN();
ompi_list_append (&elan_ptl->send_frags, (ompi_list_item_t *) frag);
END_FUNC(PTL_ELAN_DEBUG_SEND);
/* XXX: fragment state, remember the recv_frag may be gone */
frag->desc->req = (mca_pml_base_request_t *) request ; /*recv req*/
frag->desc->desc_status = MCA_PTL_ELAN_DESC_LOCAL;
frag->frag_base.frag_owner= &ptl_peer->peer_ptl->super;
frag->frag_base.frag_peer = recv_frag->frag_recv.frag_base.frag_peer;
frag->frag_base.frag_addr = req->req_base.req_addr;/*final buff*/
frag->frag_base.frag_size = *size;
frag->frag_progressed = 0;
END_FUNC(PTL_ELAN_DEBUG_GET);
return OMPI_SUCCESS;
}
#endif
#endif /* End of OMPI_PTL_ELAN_ENABLE_GET */
int
mca_ptl_elan_start_desc (mca_ptl_elan_send_frag_t * frag,
struct mca_ptl_elan_peer_t *ptl_peer,
@ -649,7 +672,7 @@ mca_ptl_elan_get_with_ack ( mca_ptl_base_module_t * ptl,
flags = 0; /* XXX: No special flags for get */
elan_ptl = (mca_ptl_elan_module_t *) ptl;
request = recv_frag->frag_recv.frag_request;
request = recv_frag->frag_recv.frag_request;
destvp = ((mca_ptl_elan_peer_t *)
recv_frag->frag_recv.frag_base.frag_peer)->peer_vp;
frag->desc->desc_type = MCA_PTL_ELAN_DESC_PUT;
@ -672,16 +695,15 @@ mca_ptl_elan_get_with_ack ( mca_ptl_base_module_t * ptl,
hdr->hdr_ack.hdr_dst_size = remain_len;
LOG_PRINT(PTL_ELAN_DEBUG_ACK,
"remote frag %p local req %p buffer %p size %d \n",
"remote buff %x frag %p local req %p buffer %p size %d \n",
hdr->hdr_frag.hdr_dst_ptr.lval,
hdr->hdr_ack.hdr_src_ptr.pval,
hdr->hdr_ack.hdr_dst_match.pval,
hdr->hdr_ack.hdr_dst_addr.pval,
hdr->hdr_ack.hdr_dst_size);
mca_ptl_elan_init_get_desc (frag, ptl,
recv_frag->frag_recv.frag_base.frag_peer,
request, request->req_bytes_received,
remain_len, flags);
recv_frag, request, remain_len, flags);
/* Trigger a STEN packet to the remote side and then from there
* a elan_get is triggered */

Просмотреть файл

@ -52,12 +52,16 @@
/* Debug category bits; these are passed to START_FUNC / END_FUNC /
 * LOG_PRINT by the get and chained-DMA code paths. */
#define PTL_ELAN_DEBUG_GET (0x400)
#define PTL_ELAN_DEBUG_CHAIN (0x800)
/* Compile-time switch tested with #if around the get-related code. */
#define OMPI_PTL_ELAN_ENABLE_GET (1)
/* Compile-time switch: when non-zero, the per-descriptor source-event
 * setup is compiled out in favour of the shared completion queue. */
#define OMPI_PTL_ELAN_COMP_QUEUE (0)
/* Queue sizing limits. */
#define OMPI_PTL_ELAN_MAX_QSIZE (2048)
#define OMPI_PTL_ELAN_MAX_QSLOTS (128)
/* Extra slots allocated in a receive ring beyond the usable slot count. */
#define OMPI_PTL_ELAN_LOST_QSLOTS (1)
/* QDMA descriptor counts and retry count.
 * NOTE(review): exact semantics of MAX vs NUM are not visible here. */
#define OMPI_PTL_ELAN_MAX_QDESCS (128)
#define OMPI_PTL_ELAN_NUM_QDESCS (4)
#define OMPI_PTL_ELAN_QDMA_RETRY (16)
/* Put/get descriptor counts (same MAX/NUM caveat as above). */
#define OMPI_PTL_ELAN_MAX_PUTGET (32)
#define OMPI_PTL_ELAN_NUM_PUTGET (8)
@ -69,9 +73,6 @@
/*
 * Smaller of a and b.  Every use of an argument is parenthesized so that
 * expressions containing low-precedence operators (|, &, ?:, ...) are
 * compared and returned as a whole.  Arguments are still evaluated more
 * than once, so do not pass expressions with side effects.
 */
#define OMPI_PTL_ELAN_GET_MIN(a,b) (((a) < (b)) ? (a) : (b))
/*
 * Round x up to the next multiple of a; a must be a power of two.
 * The value is computed in unsigned int, so x is truncated to that width.
 */
#define OMPI_PTL_ELAN_ALIGNUP(x,a) (((unsigned int)(x) + ((a)-1)) & (-(a)))
#define OMPI_PTL_ELAN_ENABLE_GET (1)
#define OMPI_PTL_ELAN_QDMA_RETRY (16)
/* For now only debug send's */
#if 1
#define PTL_ELAN_DEBUG_FLAG PTL_ELAN_DEBUG_NONE
@ -159,6 +160,18 @@ struct ompi_ptl_elan_recv_queue_t {
};
typedef struct ompi_ptl_elan_recv_queue_t ompi_ptl_elan_recv_queue_t;
/**
 * Completion queue control structure.
 *
 * One instance is allocated per PTL module by the queue initialization
 * code and stored in the module's `comp` pointer.
 */
struct ompi_ptl_elan_comp_queue_t {
/** Input queue located in Elan memory, INPUT_QUEUE_ALIGN'ed, INPUT_QUEUE_SIZE bytes */
E4_InputQueue *input;
/** Mutex for this queue; presumably guards the receive side -- TODO confirm */
ompi_mutex_t rx_lock;
/** Slot size, capped at INPUT_QUEUE_MAX by the initialization code */
int rx_buffsize;
/** Slot size rounded up to OMPI_PTL_ELAN_SLOT_ALIGN; used as the ring stride */
int rx_slotsize;
/** Number of usable slots in the receive ring */
int rx_nslots;
/* Recv Queue has to be well-aligned (allocated from Elan main memory on a
 * 64-byte boundary by the initialization code) */
ompi_ptl_elan_recv_queue_t *rxq;
};
typedef struct ompi_ptl_elan_comp_queue_t ompi_ptl_elan_comp_queue_t;
/**
* ELAN descriptor for send
*/