Done:
a) Have PTL_Elan running for short messages <= 2048.
b) Have RMS_RANK setting the ompi:vpid for simple matching with RMS.
c) Fix add_procs.
d) Fix bugs related to the elan4 style of addressing elan memory.
e) Pending: -malign-double option to be added into the configuration.
f) Temporarily added a data integrity check and debugging macros.
TODO:
g) Add chained DMA support (2 days).
h) Test support for arbitrary message sizes in polling mode (1 day).
i) Test combined functionality with TCP and Elan (1 day).
j) Get the threaded progress engine working and ready for release (a couple of days).
This commit was SVN r1760.
Этот коммит содержится в:
родитель
93115f589e
Коммит
a48d5a3075
@ -10,7 +10,7 @@ AM_CPPFLAGS = -I$(top_ompi_builddir)/src/include \
|
||||
-I$(top_ompi_srcdir)/src -I$(top_ompi_srcdir)/src/include \
|
||||
-I/usr/lib/qsnet/elan4/include
|
||||
|
||||
SUBDIRS = src
|
||||
SUBDIRS = src tests
|
||||
|
||||
EXTRA_DIST = VERSION
|
||||
|
||||
|
@ -5,7 +5,11 @@
|
||||
|
||||
include $(top_ompi_srcdir)/config/Makefile.options
|
||||
|
||||
AM_CPPFLAGS = -I$(top_ompi_builddir)/src/include \
|
||||
#Assumption. We support only i686 for now.
|
||||
|
||||
#AM_CPPFLAGS = -fPIC -D_ELAN4 -D_REENTRANT -O3 -malign-double -march=i686
|
||||
AM_CPPFLAGS = -fPIC -D_ELAN4 -D_REENTRANT -O3 -march=i686
|
||||
AM_CPPFLAGS += -I$(top_ompi_builddir)/src/include \
|
||||
-I$(top_ompi_srcdir)/src -I$(top_ompi_srcdir)/src/include \
|
||||
-I/usr/lib/qsnet/elan4/include
|
||||
|
||||
@ -17,14 +21,13 @@ libmca_ptl_elan_la_SOURCES = \
|
||||
ptl_elan_req.h \
|
||||
ptl_elan_proc.h \
|
||||
ptl_elan_peer.h \
|
||||
ptl_elan_priv.h \
|
||||
ptl_elan_frag.c \
|
||||
ptl_elan_req.c \
|
||||
ptl_elan_proc.c \
|
||||
ptl_elan_module.c \
|
||||
ptl_elan_priv.c \
|
||||
ptl_elan_peer.c \
|
||||
ptl_elan_init.c \
|
||||
ptl_elan_comm_init.c \
|
||||
ptl_elan_module.c \
|
||||
ptl_elan.c
|
||||
|
||||
|
||||
|
@ -55,7 +55,6 @@ mca_ptl_elan_add_procs (struct mca_ptl_t *ptl,
|
||||
mca_ptl_elan_proc_t *ptl_proc;
|
||||
mca_ptl_elan_peer_t *ptl_peer;
|
||||
|
||||
int rc;
|
||||
int i;
|
||||
|
||||
/* Here nprocs is the number of peer processes */
|
||||
@ -83,16 +82,10 @@ mca_ptl_elan_add_procs (struct mca_ptl_t *ptl,
|
||||
return OMPI_ERR_UNREACH;
|
||||
}
|
||||
|
||||
if (ptl_proc == mca_ptl_elan_module.elan_local) {
|
||||
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* The ptl_proc datastructure is shared by all PTL
|
||||
* instances that are trying to reach this destination.
|
||||
* Cache the peer instance on the ptl_proc.
|
||||
*/
|
||||
|
||||
ptl_peer = OBJ_NEW (mca_ptl_elan_peer_t);
|
||||
if (NULL == ptl_peer) {
|
||||
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
|
||||
@ -101,17 +94,31 @@ mca_ptl_elan_add_procs (struct mca_ptl_t *ptl,
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* TODO: Make the add_procs function cleaner and simpler
|
||||
* 1) Since elan_proc_t will only have one vp to address the
|
||||
* remote processes, there will be only one ptl_peer_t per proc.
|
||||
* The information to be stored there should be just
|
||||
* the addressing information, which is the elan_vp.
|
||||
* If needed, the information can be expanded with memory-handle,
|
||||
* queue-handle, put/get-handle, memory-statics.
|
||||
* 2) XXX: Consider matching elan_vp and ompi_proc_name_t.
|
||||
*/
|
||||
ptl_peer->peer_ptl = (mca_ptl_elan_t *) ptl;
|
||||
ptl_peer->peer_proc = ptl_proc;
|
||||
ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer;
|
||||
ptl_proc->proc_peer_count++;
|
||||
|
||||
/* XXX XXX: There might be no order on this ptl_proc's,
|
||||
* But one-to-one corresponding is still there */
|
||||
ptl_peer->peer_addr = ptl_proc->proc_addrs + i;
|
||||
ptl_peer->peer_addr->addr_inuse++;
|
||||
/* There is only one peer per elan_peer_proc_t. */
|
||||
ptl_proc->proc_peers[0] = ptl_peer;
|
||||
if (ptl_proc == mca_ptl_elan_module.elan_local) {
|
||||
ptl_peer->peer_vp = ((mca_ptl_elan_t *)ptl)->elan_vp;
|
||||
} else {
|
||||
ptl_peer->peer_vp = ptl_proc->proc_addrs->elan_vp;
|
||||
ptl_proc->proc_addrs->inuse = 1;
|
||||
}
|
||||
ptl_peer->peer_rails = ((mca_ptl_elan_t *)ptl)->ptl_ni_total;
|
||||
|
||||
/* There is only one peer per elan_peer_proc_t */
|
||||
ptl_proc->proc_peer_count = 1;
|
||||
ompi_bitmap_set_bit (reachable, i);
|
||||
|
||||
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
|
||||
peers[i] = (struct mca_ptl_base_peer_t *) ptl_peer;
|
||||
}
|
||||
@ -160,10 +167,7 @@ mca_ptl_elan_req_alloc (struct mca_ptl_t *ptl,
|
||||
mca_pml_base_send_request_t *sendreq;
|
||||
ompi_list_item_t *item;
|
||||
|
||||
/* FIXME, Error here, rc is passed in by value
|
||||
* Which will not bring any output from this allocation request */
|
||||
OMPI_FREE_LIST_GET (&mca_ptl_elan_module.elan_reqs_free, item, rc);
|
||||
|
||||
if (NULL != (sendreq = (mca_pml_base_send_request_t *) item))
|
||||
sendreq->req_owner = ptl;
|
||||
*request = sendreq;
|
||||
@ -211,32 +215,33 @@ mca_ptl_elan_put (struct mca_ptl_t *ptl,
|
||||
size_t size,
|
||||
int flags)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
mca_ptl_elan_desc_item_t *sd;
|
||||
|
||||
/* XXX: fix pml_send?
|
||||
* Why presenting so many arguments while each of them is already
|
||||
* contained in the request descriptors,
|
||||
*
|
||||
* XXX:
|
||||
/* XXX:
|
||||
* PML extract an request from PTL module and then use this
|
||||
* a request to ask for a fragment
|
||||
* Is it too deep across stacks to get a request and
|
||||
* correspondingly multiple LOCKS to go through*/
|
||||
|
||||
sd = mca_ptl_elan_alloc_send_desc(sendreq);
|
||||
START_FUNC();
|
||||
sd = mca_ptl_elan_alloc_send_desc(ptl, sendreq);
|
||||
if (NULL == sd) {
|
||||
ompi_output(0,
|
||||
"[%s:%d] Unable to allocate an elan send descriptors \n",
|
||||
__FILE__, __LINE__);
|
||||
}
|
||||
|
||||
/* Update offset, in TCP case, this is a must.
|
||||
* XXX: Not sure how it is going to be here */
|
||||
/* Update offset */
|
||||
sendreq->req_offset += size;
|
||||
((struct mca_ptl_elan_send_request_t *)sendreq)->req_frag = sd;
|
||||
|
||||
return mca_ptl_elan_start_desc(
|
||||
((struct mca_ptl_elan_send_request_t *)sendreq)->desc_type, sd);
|
||||
rc = mca_ptl_elan_start_desc(sd,
|
||||
(struct mca_ptl_elan_peer_t *)ptl_peer,
|
||||
sendreq, offset, size, flags);
|
||||
|
||||
END_FUNC();
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -264,5 +269,26 @@ void
|
||||
mca_ptl_elan_matched (mca_ptl_t * ptl,
|
||||
mca_ptl_base_recv_frag_t * frag)
|
||||
{
|
||||
mca_ptl_base_header_t* header = &frag->super.frag_header;
|
||||
|
||||
/* if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) */
|
||||
|
||||
/* XXX: Pseudocode, for additional processing of header fragments
|
||||
* a) (ACK:no, Get:No) Remove the frag. no need for further processing
|
||||
* b) (ACK:yes, Get:No) Send an ACK only
|
||||
* c) (ACK:yes, Get:yes) Get a message, update the fragment descriptor
|
||||
* and then send an ACK,
|
||||
* d) Consider moving time-consuming tasks to some BH-like mechanisms.
|
||||
*/
|
||||
|
||||
/* FIXME: skip the acknowledgement part */
|
||||
if (0) {
|
||||
/* Allocate a send descriptor and send an ACK */
|
||||
}
|
||||
|
||||
/* process fragment if complete */
|
||||
mca_ptl_elan_recv_frag_progress((mca_ptl_elan_recv_frag_t*)frag);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -3,9 +3,6 @@
|
||||
#include <unistd.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#define _ELAN4
|
||||
#define __elan4__
|
||||
|
||||
#include "ptl_elan.h"
|
||||
#include "ptl_elan_priv.h"
|
||||
|
||||
@ -33,10 +30,11 @@ ompi_init_elan_queue_events (mca_ptl_elan_t * ptl,
|
||||
ompi_ptl_elan_qdma_desc_t *ptr;
|
||||
ompi_elan_event_t *elan_ptr;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
rail = (RAIL *) ptl->ptl_elan_rail;
|
||||
ctx = (ELAN4_CTX *) ptl->ptl_elan_ctx;
|
||||
|
||||
|
||||
/* initialize list */
|
||||
OBJ_CONSTRUCT (&queue->tx_desc, ompi_list_t);
|
||||
OBJ_CONSTRUCT (&queue->tx_desc_free, ompi_free_list_t);
|
||||
@ -78,8 +76,9 @@ ompi_init_elan_queue_events (mca_ptl_elan_t * ptl,
|
||||
ompi_list_item_t *item;
|
||||
|
||||
ptr->rail = rail;
|
||||
ptr->ptl = ptl;
|
||||
ptr->elan_data_event = elan_ptr;
|
||||
desc->item = (mca_ptl_elan_send_desc_t)ptr;
|
||||
desc->desc = (ompi_ptl_elan_base_desc_t *)ptr;
|
||||
|
||||
/* Initialize some of the dma structures */
|
||||
{
|
||||
@ -103,6 +102,7 @@ ompi_init_elan_queue_events (mca_ptl_elan_t * ptl,
|
||||
}
|
||||
flist->fl_num_allocated += flist->fl_num_per_alloc;
|
||||
|
||||
END_FUNC();
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -125,6 +125,8 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
ELAN4_CTX *ctx;
|
||||
struct mca_ptl_elan_t *ptl;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
/* Init the Transmit Queue structure */
|
||||
for (i = 0; i < num_rails; i++) {
|
||||
|
||||
@ -152,6 +154,7 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
CQ_WriteEnableBit |
|
||||
CQ_DmaStartEnableBit |
|
||||
CQ_STENEnableBit, NULL);
|
||||
|
||||
OMPI_PTL_ELAN_CHECK_UNEX (queue->tx_cmdq, NULL, OMPI_ERROR, 0);
|
||||
|
||||
/*
|
||||
@ -193,6 +196,7 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
|
||||
/* Set the top et al */
|
||||
rxq->qr_efitem = (E4_uint64) elan4_main2elan (ctx, rxq->qr_fptr);
|
||||
assert(rxq->qr_efitem != ELAN_BAD_ADDR);
|
||||
rxq->qr_base = rxq->qr_fptr;
|
||||
rxq->qr_top = (void *) ((uintptr_t) rxq->qr_base
|
||||
+ (queue->rx_slotsize * (nslots - 1)));
|
||||
@ -200,11 +204,11 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
rxq->qr_elitem =
|
||||
rxq->qr_efitem + (queue->rx_slotsize * (nslots - 1));
|
||||
|
||||
/* Event to wait/block on */
|
||||
rxq->qr_qEvent = &rxq->qr_elanDone;
|
||||
/* Event to wait/block on, Bug here for the event */
|
||||
rxq->qr_qEvent = rxq->qr_elanDone;
|
||||
|
||||
queue->input->q_event =
|
||||
SDRAM2ELAN (ctx, (void *) &rxq->qr_elanDone);
|
||||
SDRAM2ELAN (ctx, (void *) rxq->qr_elanDone);
|
||||
queue->input->q_fptr = rxq->qr_efitem;
|
||||
queue->input->q_bptr = rxq->qr_efitem;
|
||||
queue->input->q_control =
|
||||
@ -212,15 +216,17 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
queue->rx_slotsize);
|
||||
|
||||
/* The event */
|
||||
INITEVENT_WORD (ctx, (EVENT *) & rxq->qr_elanDone,
|
||||
INITEVENT_WORD (ctx, (EVENT *) rxq->qr_elanDone,
|
||||
&rxq->qr_doneWord);
|
||||
RESETEVENT_WORD (&rxq->qr_doneWord);
|
||||
PRIMEEVENT_WORD (ctx, (EVENT *) & rxq->qr_elanDone, 1);
|
||||
PRIMEEVENT_WORD (ctx, (EVENT *) rxq->qr_elanDone, 1);
|
||||
|
||||
rxq->qr_cmdq = elan4_alloc_cmdq (ctx, rail->r_alloc,
|
||||
CQ_Size1K,
|
||||
CQ_WriteEnableBit |
|
||||
CQ_WaitEventEnableBit, NULL);
|
||||
|
||||
/*elan4_disp_cmdq_params (rxq->qr_cmdq);*/
|
||||
OMPI_PTL_ELAN_CHECK_UNEX (rxq->qr_cmdq, NULL, OMPI_ERROR, 0);
|
||||
|
||||
/* Allocate a sleepDesc for threads to block on */
|
||||
@ -231,6 +237,7 @@ ompi_init_elan_qdma (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
OBJ_CONSTRUCT (&queue->rx_lock, ompi_mutex_t);
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return (OMPI_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -15,33 +15,13 @@
|
||||
#include "ptl_elan_req.h"
|
||||
#include "ptl_elan_priv.h"
|
||||
|
||||
#if 0
|
||||
static void
|
||||
mca_ptl_elan_send_frag_construct (mca_ptl_elan_send_frag_t * frag)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
static void
|
||||
mca_ptl_elan_send_frag_destruct (mca_ptl_elan_send_frag_t * frag)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
ompi_class_t mca_ptl_elan_send_frag_t_class = {
|
||||
"mca_ptl_elan_send_frag_t",
|
||||
OBJ_CLASS (mca_ptl_base_send_frag_t),
|
||||
(ompi_construct_t) mca_ptl_elan_send_frag_construct,
|
||||
(ompi_destruct_t) mca_ptl_elan_send_frag_destruct
|
||||
};
|
||||
#endif
|
||||
|
||||
static void
|
||||
mca_ptl_elan_recv_frag_construct (mca_ptl_elan_recv_frag_t * frag)
|
||||
{
|
||||
/*OBJ_CONSTRUCT (frag, mca_ptl_elan_recv_frag_t);*/
|
||||
frag->frag_hdr_cnt = 0;
|
||||
frag->frag_msg_cnt = 0;
|
||||
frag->frag_progressed = 0;
|
||||
|
||||
frag->frag.qdma = NULL;
|
||||
frag->alloc_buff = (char *) malloc (sizeof (char) * 2048 + 32);
|
||||
if (NULL == frag->alloc_buff) {
|
||||
@ -59,11 +39,12 @@ mca_ptl_elan_recv_frag_destruct (mca_ptl_elan_recv_frag_t * frag)
|
||||
* works only for non-dynamically allocated objects */
|
||||
frag->frag_hdr_cnt = 0;
|
||||
frag->frag_msg_cnt = 0;
|
||||
frag->frag_progressed = 0;
|
||||
|
||||
frag->frag.qdma = NULL;
|
||||
free (frag->alloc_buff);
|
||||
frag->alloc_buff = NULL;
|
||||
frag->unex_buff = NULL;
|
||||
/*OBJ_DESTRUCT (frag);*/
|
||||
}
|
||||
|
||||
ompi_class_t mca_ptl_elan_recv_frag_t_class = {
|
||||
@ -76,31 +57,27 @@ ompi_class_t mca_ptl_elan_recv_frag_t_class = {
|
||||
extern mca_ptl_elan_state_t mca_ptl_elan_global_state;
|
||||
|
||||
mca_ptl_elan_desc_item_t *
|
||||
mca_ptl_elan_alloc_send_desc (struct mca_pml_base_send_request_t *req)
|
||||
mca_ptl_elan_alloc_send_desc (struct mca_ptl_t *ptl_ptr,
|
||||
struct mca_pml_base_send_request_t *sendreq)
|
||||
{
|
||||
struct ompi_ptl_elan_queue_ctrl_t *queue;
|
||||
mca_ptl_elan_t *ptl;
|
||||
struct mca_ptl_elan_peer_t *peer;
|
||||
size_t offset;
|
||||
size_t size;
|
||||
|
||||
ompi_free_list_t *flist;
|
||||
ompi_list_item_t *item;
|
||||
mca_ptl_elan_desc_item_t *desc;
|
||||
|
||||
ptl = (mca_ptl_elan_t *) req->req_owner;
|
||||
peer = (mca_ptl_elan_peer_t *) req->req_peer;
|
||||
offset = req->req_offset;
|
||||
size = ptl->super.ptl_first_frag_size;
|
||||
START_FUNC();
|
||||
|
||||
/* For now, bind to queue DMA directly */
|
||||
{
|
||||
queue = ptl->queue;
|
||||
queue = ((mca_ptl_elan_t *) ptl_ptr)->queue;
|
||||
flist = &queue->tx_desc_free;
|
||||
|
||||
if (ompi_using_threads ()) {
|
||||
|
||||
ompi_mutex_lock (&((flist)->fl_lock));
|
||||
ompi_mutex_lock(&flist->fl_lock);
|
||||
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
|
||||
/* Progress this PTL module to get back a descriptor,
|
||||
@ -108,15 +85,15 @@ mca_ptl_elan_alloc_send_desc (struct mca_pml_base_send_request_t *req)
|
||||
while (NULL == item) {
|
||||
mca_ptl_tstamp_t tstamp = 0;
|
||||
|
||||
ptl->super.ptl_module->ptlm_progress (tstamp);
|
||||
ptl_ptr->ptl_module->ptlm_progress (tstamp);
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
}
|
||||
ompi_mutex_unlock (&((flist)->fl_lock));
|
||||
ompi_mutex_unlock(&flist->fl_lock);
|
||||
} else {
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
|
||||
/* Progress this PTL module to get back a descriptor,
|
||||
* Is it OK to progress with ptl->ptl_send_progress? */
|
||||
* Is it OK to progress with ptl->ptl_send_progress()? */
|
||||
while (NULL == item) {
|
||||
mca_ptl_tstamp_t tstamp = 0;
|
||||
|
||||
@ -125,16 +102,16 @@ mca_ptl_elan_alloc_send_desc (struct mca_pml_base_send_request_t *req)
|
||||
* PTL's from other modules. Wait for PML to change.
|
||||
* Otherwise have to trigger PML progress from PTL. Ouch..
|
||||
*/
|
||||
ptl->super.ptl_module->ptlm_progress (tstamp);
|
||||
ptl_ptr->ptl_module->ptlm_progress (tstamp);
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
}
|
||||
}
|
||||
|
||||
((struct mca_ptl_elan_send_request_t *) req)->desc_type
|
||||
= MCA_PTL_ELAN_QDMA_DESC;
|
||||
desc = (mca_ptl_elan_desc_item_t *) item;
|
||||
desc->desc->desc_type = MCA_PTL_ELAN_QDMA_DESC;
|
||||
}
|
||||
desc->desc->req = (struct mca_pml_base_send_request_t *)sendreq;
|
||||
|
||||
END_FUNC();
|
||||
return desc;
|
||||
}
|
||||
|
||||
|
@ -20,21 +20,11 @@
|
||||
extern ompi_class_t mca_ptl_elan_recv_frag_t_class;
|
||||
|
||||
struct mca_ptl_elan_peer_t;
|
||||
struct ompi_ptl_elan_qdma_frag_t;
|
||||
struct ompi_ptl_elan_putget_frag_t;
|
||||
|
||||
/**
|
||||
* ELAN descriptor for send
|
||||
*/
|
||||
union mca_ptl_elan_send_desc_t {
|
||||
struct ompi_ptl_elan_qdma_desc_t *qdma;
|
||||
struct ompi_ptl_elan_putget_desc_t *putget;
|
||||
};
|
||||
typedef union mca_ptl_elan_send_desc_t mca_ptl_elan_send_desc_t;
|
||||
struct ompi_ptl_elan_base_desc_t;
|
||||
|
||||
struct mca_ptl_elan_desc_item_t {
|
||||
ompi_list_item_t super;
|
||||
mca_ptl_elan_send_desc_t item;
|
||||
struct ompi_ptl_elan_base_desc_t *desc;
|
||||
};
|
||||
typedef struct mca_ptl_elan_desc_item_t mca_ptl_elan_desc_item_t;
|
||||
|
||||
@ -45,6 +35,7 @@ struct mca_ptl_elan_recv_frag_t {
|
||||
mca_ptl_base_recv_frag_t super;
|
||||
size_t frag_hdr_cnt;
|
||||
size_t frag_msg_cnt;
|
||||
int frag_progressed;
|
||||
union {
|
||||
struct ompi_ptl_elan_qdma_frag_t *qdma;
|
||||
struct ompi_ptl_elan_putget_frag_t *putget;
|
||||
@ -55,7 +46,8 @@ struct mca_ptl_elan_recv_frag_t {
|
||||
typedef struct mca_ptl_elan_recv_frag_t mca_ptl_elan_recv_frag_t;
|
||||
|
||||
mca_ptl_elan_desc_item_t *
|
||||
mca_ptl_elan_alloc_send_desc(struct mca_pml_base_send_request_t *req);
|
||||
mca_ptl_elan_alloc_send_desc( struct mca_ptl_t *ptl,
|
||||
struct mca_pml_base_send_request_t *sendreq);
|
||||
|
||||
mca_ptl_elan_recv_frag_t *
|
||||
mca_ptl_elan_alloc_recv_desc(struct mca_pml_base_recv_request_t *req);
|
||||
@ -63,6 +55,10 @@ mca_ptl_elan_alloc_recv_desc(struct mca_pml_base_recv_request_t *req);
|
||||
static inline void
|
||||
mca_ptl_elan_recv_frag_progress(mca_ptl_elan_recv_frag_t* frag)
|
||||
{
|
||||
/* Upto this point, this only means the fragment has been
|
||||
matched with a posted receive descriptor */
|
||||
if (fetchNset (&frag->frag_progressed, 1) == 0) {
|
||||
#if 1
|
||||
/* make sure this only happens once for threaded case */
|
||||
mca_pml_base_recv_request_t* request;
|
||||
mca_ptl_base_recv_progress_fn_t progress;
|
||||
@ -73,6 +69,9 @@ mca_ptl_elan_recv_frag_progress(mca_ptl_elan_recv_frag_t* frag)
|
||||
/* progress the request */
|
||||
progress((frag)->super.super.frag_owner, request, &(frag)->super);
|
||||
mca_ptl_elan_recv_frag_return((frag)->super.super.frag_owner, (frag));
|
||||
#endif
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -21,7 +21,11 @@ ompi_mca_ptl_elan_setup (mca_ptl_elan_state_t * ems)
|
||||
mca_ptl_elan_module_1_0_0_t *emp;
|
||||
int rail_count;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
rail_count = ems->elan_nrails;
|
||||
emp = ems->elan_module;
|
||||
emp->elan_num_ptls = 0;
|
||||
emp->elan_ptls = malloc (rail_count * sizeof (mca_ptl_elan_t *));
|
||||
if (NULL == emp->elan_ptls) {
|
||||
ompi_output (0,
|
||||
@ -50,21 +54,22 @@ ompi_mca_ptl_elan_setup (mca_ptl_elan_state_t * ems)
|
||||
|
||||
ptl->ptl_ni_local = emp->elan_num_ptls;
|
||||
ptl->ptl_ni_total = rail_count;
|
||||
emp->elan_num_ptls++;
|
||||
|
||||
/* allow user to specify per rail bandwidth and latency */
|
||||
sprintf (param, "bandwidth_elanrail%d", emp->elan_num_ptls);
|
||||
ptl->super.ptl_bandwidth =
|
||||
mca_ptl_elan_param_register_int (param, 1000);
|
||||
sprintf (param, "latency_elanrail%d", emp->elan_num_ptls);
|
||||
|
||||
ptl->super.ptl_latency =
|
||||
mca_ptl_elan_param_register_int (param, 1);
|
||||
|
||||
/* Setup elan related structures such as ctx, rail */
|
||||
ptl->ptl_elan_rail = ems->elan_rail[rail_count];
|
||||
ptl->ptl_elan_ctx = ems->elan_rail[rail_count]->rail_ctx;
|
||||
ptl->ptl_elan_rail = ems->elan_rail[emp->elan_num_ptls];
|
||||
ptl->ptl_elan_ctx = ems->elan_rail[emp->elan_num_ptls]->rail_ctx;
|
||||
ptl->elan_vp = ems->elan_vp;
|
||||
ptl->elan_nvp = ems->elan_nvp;
|
||||
emp->elan_num_ptls++;
|
||||
} while (emp->elan_num_ptls < rail_count);
|
||||
|
||||
/* Allocating all the communication structures for PTL's, */
|
||||
@ -86,6 +91,7 @@ ompi_mca_ptl_elan_setup (mca_ptl_elan_state_t * ems)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return (OMPI_SUCCESS);
|
||||
}
|
||||
|
||||
@ -307,6 +313,8 @@ ompi_mca_ptl_elan_init (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
|
||||
mca_ptl_elan_state_t *ems;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
ems = &mca_ptl_elan_global_state;
|
||||
|
||||
/* Hook two of them together */
|
||||
@ -523,6 +531,7 @@ ompi_mca_ptl_elan_init (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return (OMPI_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,8 @@
|
||||
|
||||
/*#define UNIT_TESTING 1*/
|
||||
|
||||
extern ompi_proc_t *ompi_proc_local_proc;
|
||||
|
||||
mca_ptl_elan_module_1_0_0_t mca_ptl_elan_module = {
|
||||
{
|
||||
/* Base module information about itself */
|
||||
@ -75,7 +77,8 @@ static int mca_ptl_elan_module_register (mca_ptl_elan_module_1_0_0_t *emp)
|
||||
for(i=0; i<emp->elan_num_ptls; i++) {
|
||||
mca_ptl_elan_t * ptl = emp->elan_ptls[i];
|
||||
addrs[i].elan_vp = ptl->elan_vp;
|
||||
addrs[i].addr_inuse = 0;
|
||||
addrs[i].inuse = 0;
|
||||
addrs[i].gid = ompi_proc_local_proc->proc_name;
|
||||
}
|
||||
|
||||
rc = mca_base_modex_send(&emp->super.ptlm_version, addrs, size);
|
||||
@ -159,14 +162,16 @@ mca_ptl_elan_module_close (void)
|
||||
/* Check whether all the entries are returned to the free list */
|
||||
if (elan_mp->elan_reqs_free.fl_num_allocated !=
|
||||
elan_mp->elan_reqs_free.super.ompi_list_length) {
|
||||
ompi_output (0, "elan requests: %d allocated %d returned\n",
|
||||
ompi_output (0, "[%s:%d] send_requests: %d allocated %d returned\n",
|
||||
__FILE__, __LINE__,
|
||||
elan_mp->elan_reqs_free.fl_num_allocated,
|
||||
elan_mp->elan_reqs_free.super.ompi_list_length);
|
||||
}
|
||||
|
||||
if (elan_mp->elan_recv_frags_free.fl_num_allocated !=
|
||||
elan_mp->elan_recv_frags_free.super.ompi_list_length) {
|
||||
ompi_output (0, "elan requests: %d allocated %d returned\n",
|
||||
ompi_output (0, "[%s:%d] recv_frags : %d allocated %d returned\n",
|
||||
__FILE__, __LINE__,
|
||||
elan_mp->elan_recv_frags_free.fl_num_allocated,
|
||||
elan_mp->elan_recv_frags_free.super.ompi_list_length);
|
||||
}
|
||||
@ -207,6 +212,15 @@ mca_ptl_elan_module_init (int *num_ptls,
|
||||
|
||||
*num_ptls = 0;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
if (CHECK_ELAN)
|
||||
{
|
||||
char hostname[32]; gethostname(hostname, 32);
|
||||
fprintf(stderr, "[%s:%s:%d] debugging ...\n",
|
||||
hostname, __FUNCTION__, __LINE__);
|
||||
}
|
||||
|
||||
/* TODO: support multiple threads */
|
||||
|
||||
*allow_multi_user_threads = true;
|
||||
@ -234,14 +248,12 @@ mca_ptl_elan_module_init (int *num_ptls,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#ifndef UNIT_TESTING
|
||||
if (OMPI_SUCCESS != mca_ptl_elan_module_register(&mca_ptl_elan_module)) {
|
||||
ompi_output(0,
|
||||
"[%s:%d] error in registering with Runtime/OOB \n",
|
||||
__FILE__, __LINE__);
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
ptls = (mca_ptl_t **) malloc (elan_mp->elan_num_ptls *
|
||||
sizeof (mca_ptl_elan_t *));
|
||||
@ -257,6 +269,7 @@ mca_ptl_elan_module_init (int *num_ptls,
|
||||
*num_ptls = elan_mp->elan_num_ptls;
|
||||
mca_ptl_elan_module_initialized = 1;
|
||||
|
||||
END_FUNC();
|
||||
return ptls;
|
||||
}
|
||||
|
||||
@ -277,10 +290,14 @@ mca_ptl_elan_module_control (int param,
|
||||
|
||||
/* TODO: to support event-based module progress later. */
|
||||
|
||||
static int times = 0;
|
||||
int
|
||||
mca_ptl_elan_module_progress (mca_ptl_tstamp_t tstamp)
|
||||
{
|
||||
START_FUNC();
|
||||
/*if ( times == 5) exit(1); else times ++;*/
|
||||
mca_ptl_elan_drain_recv(elan_mp);
|
||||
mca_ptl_elan_update_send(elan_mp);
|
||||
END_FUNC();
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -15,13 +15,12 @@ mca_ptl_elan_peer_construct (mca_ptl_elan_peer_t * ptl_peer)
|
||||
{
|
||||
ptl_peer->peer_ptl = NULL;
|
||||
ptl_peer->peer_proc = NULL;
|
||||
ptl_peer->peer_addr = NULL;
|
||||
ptl_peer->peer_state = MCA_PTL_ELAN_CLOSED;
|
||||
ptl_peer->peer_vp = -1;
|
||||
ptl_peer->peer_rails = 0;
|
||||
ptl_peer->num_credits = 0; /* Number of credits for the local PTL */
|
||||
ptl_peer->max_credits = 0; /* Number of credits for the local PTL */
|
||||
ptl_peer->resending = 0; /* A resending stage, no more new dma's */
|
||||
ptl_peer->num_resend = 0; /* How many times I have retried */
|
||||
ptl_peer->known_alive_time = 0;
|
||||
ptl_peer->num_resends = 0; /* How many times I have retried */
|
||||
}
|
||||
|
||||
/* Cleanup any resources held by the peer. */
|
||||
|
@ -16,15 +16,6 @@
|
||||
#include "mca/ptl/ptl.h"
|
||||
#include "ptl_elan_frag.h"
|
||||
|
||||
typedef enum {
|
||||
MCA_PTL_ELAN_CLOSED,
|
||||
MCA_PTL_ELAN_CONNECTED,
|
||||
MCA_PTL_ELAN_FAILED,
|
||||
NUM_MCA_PTL_ELAN_STAT
|
||||
} mca_ptl_elan_status_t;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* An abstraction that represents a connection to a peer process.
|
||||
*/
|
||||
@ -32,16 +23,14 @@ struct mca_ptl_elan_peer_t {
|
||||
ompi_list_item_t super;
|
||||
|
||||
struct mca_ptl_elan_t* peer_ptl;
|
||||
/*struct mca_ptl_elan_remote_t *peer_ptl; */
|
||||
struct mca_ptl_elan_proc_t* peer_proc;
|
||||
struct mca_ptl_elan_addr_t* peer_addr; /**< address of peer */
|
||||
|
||||
int peer_state;
|
||||
int num_credits;
|
||||
int max_credits;
|
||||
int peer_vp;
|
||||
int peer_rails;
|
||||
int num_credits; /* Got to be an arry for rails */
|
||||
int max_credits; /* Got to be an arry for rails */
|
||||
int resending;
|
||||
int num_resend;
|
||||
double known_alive_time;
|
||||
int num_resends;
|
||||
};
|
||||
typedef struct mca_ptl_elan_peer_t mca_ptl_elan_peer_t;
|
||||
|
||||
|
@ -15,25 +15,33 @@
|
||||
static void
|
||||
mca_ptl_elan_init_qdma_desc (struct ompi_ptl_elan_qdma_desc_t *desc,
|
||||
mca_ptl_elan_t * ptl,
|
||||
mca_ptl_elan_send_request_t * req)
|
||||
struct mca_ptl_elan_peer_t *ptl_peer,
|
||||
mca_pml_base_send_request_t *pml_req,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
int flags)
|
||||
{
|
||||
char *app_buff;
|
||||
int header_length;
|
||||
int mesg_length;
|
||||
int flags = 0; /* FIXME: now */
|
||||
mca_ptl_base_header_t *hdr;
|
||||
|
||||
int destvp = ptl->elan_vp;
|
||||
int destvp;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
destvp = ptl_peer->peer_vp;
|
||||
|
||||
/* TODO: For now, assume data are contiguous and less than eager size */
|
||||
app_buff = (char *) req->super.super.req_addr;
|
||||
app_buff = (char *) pml_req->super.req_addr;
|
||||
|
||||
header_length = sizeof (mca_ptl_base_match_header_t);
|
||||
mesg_length = req->super.req_bytes_packed;
|
||||
mesg_length = pml_req->req_bytes_packed;
|
||||
|
||||
hdr = (mca_ptl_base_header_t *) & desc->buff[0];
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
|
||||
/*hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;*/
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_match_header_t);
|
||||
hdr->hdr_frag.hdr_frag_offset = 0;
|
||||
@ -41,12 +49,12 @@ mca_ptl_elan_init_qdma_desc (struct ompi_ptl_elan_qdma_desc_t *desc,
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = 0;
|
||||
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
|
||||
|
||||
hdr->hdr_match.hdr_contextid = req->super.super.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = req->super.super.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = req->super.super.req_peer;
|
||||
hdr->hdr_match.hdr_tag = req->super.super.req_tag;
|
||||
hdr->hdr_match.hdr_contextid = pml_req->super.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = pml_req->super.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = pml_req->super.req_peer;
|
||||
hdr->hdr_match.hdr_tag = pml_req->super.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = mesg_length;
|
||||
hdr->hdr_match.hdr_msg_seq = req->super.super.req_sequence;
|
||||
hdr->hdr_match.hdr_msg_seq = pml_req->super.req_sequence;
|
||||
hdr->hdr_frag.hdr_frag_length = mesg_length;
|
||||
|
||||
/* Fill up all the necessary fields */
|
||||
@ -59,45 +67,60 @@ mca_ptl_elan_init_qdma_desc (struct ompi_ptl_elan_qdma_desc_t *desc,
|
||||
mesg_length),
|
||||
DMA_DataTypeByte,
|
||||
DMA_QueueWrite, 16);
|
||||
desc->main_dma.dma_cookie =
|
||||
elan4_local_cookie (ptl->queue->tx_cpool, E4_COOKIE_TYPE_LOCAL_DMA,
|
||||
destvp);
|
||||
desc->main_dma.dma_vproc = destvp;
|
||||
|
||||
ompi_output (0,
|
||||
"elan_queueTx(%p): DMA: typeSize %Lx vproc %lx srcAddr %Lx "
|
||||
"dstAddr %Lx srcEvent %Lx dstEvent %Lx\n",
|
||||
desc->rail,
|
||||
desc->main_dma.dma_typeSize,
|
||||
desc->main_dma.dma_vproc,
|
||||
desc->main_dma.dma_srcAddr,
|
||||
desc->main_dma.dma_dstAddr,
|
||||
desc->main_dma.dma_srcEvent, desc->main_dma.dma_dstEvent);
|
||||
desc->main_dma.dma_cookie =
|
||||
elan4_local_cookie (ptl->queue->tx_cpool,
|
||||
E4_COOKIE_TYPE_LOCAL_DMA, destvp);
|
||||
|
||||
if (CHECK_ELAN)
|
||||
{
|
||||
char hostname[32];
|
||||
gethostname(hostname, 32);
|
||||
|
||||
fprintf(stderr,
|
||||
"[%s send...] destvp %d type %d flag %d size %d\n",
|
||||
hostname, destvp,
|
||||
hdr->hdr_common.hdr_type,
|
||||
hdr->hdr_common.hdr_flags,
|
||||
hdr->hdr_common.hdr_size);
|
||||
}
|
||||
|
||||
desc->main_dma.dma_vproc = destvp;
|
||||
|
||||
/* Make main memory coherent with IO domain (IA64) */
|
||||
MEMBAR_VISIBLE ();
|
||||
END_FUNC();
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
mca_ptl_elan_start_desc (int type,
|
||||
mca_ptl_elan_desc_item_t * desc)
|
||||
mca_ptl_elan_start_desc (mca_ptl_elan_desc_item_t * desc,
|
||||
struct mca_ptl_elan_peer_t *ptl_peer,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
int flags)
|
||||
{
|
||||
mca_ptl_elan_t *ptl;
|
||||
mca_ptl_elan_send_request_t *req;
|
||||
|
||||
if (type == MCA_PTL_ELAN_QDMA_DESC) {
|
||||
START_FUNC();
|
||||
|
||||
if (desc->desc->desc_type == MCA_PTL_ELAN_QDMA_DESC) {
|
||||
struct ompi_ptl_elan_qdma_desc_t *qdma;
|
||||
|
||||
qdma = desc->item.qdma;
|
||||
qdma = (ompi_ptl_elan_qdma_desc_t *)desc->desc;
|
||||
ptl = qdma->ptl;
|
||||
req = qdma->req;
|
||||
|
||||
mca_ptl_elan_init_qdma_desc (qdma, ptl, req);
|
||||
/* Could assert the following is the same as sendreq */
|
||||
/*req = qdma->req;*/
|
||||
|
||||
mca_ptl_elan_init_qdma_desc (qdma, ptl, ptl_peer, sendreq,
|
||||
offset, size, flags);
|
||||
|
||||
elan4_run_dma_cmd (ptl->queue->tx_cmdq, (DMA *) & qdma->main_dma);
|
||||
ptl->queue->tx_cmdq->cmdq_flush (ptl->queue->tx_cmdq);
|
||||
ompi_output (0, "elan_queueTx(%p) returning %p\n",
|
||||
ptl->queue, desc);
|
||||
|
||||
/*ptl->queue->tx_cmdq->cmdq_flush */
|
||||
elan4_flush_cmdq_reorder (ptl->queue->tx_cmdq);
|
||||
|
||||
/* Insert desc into the list of outstanding DMA's */
|
||||
ompi_list_append (&ptl->queue->tx_desc, (ompi_list_item_t *) desc);
|
||||
@ -107,6 +130,8 @@ mca_ptl_elan_start_desc (int type,
|
||||
"Other types of DMA are not supported right now \n");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -115,6 +140,7 @@ static void
|
||||
mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
|
||||
mca_ptl_base_header_t * header)
|
||||
{
|
||||
/* For PML interfacing, refer to mca_ptl_tcp_recv_frag_match(frag, sd);*/
|
||||
/* Allocate a recv frag descriptor */
|
||||
mca_ptl_elan_recv_frag_t *recv_frag;
|
||||
ompi_list_item_t *item;
|
||||
@ -134,11 +160,15 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
|
||||
}
|
||||
|
||||
recv_frag = (mca_ptl_elan_recv_frag_t *) item;
|
||||
|
||||
recv_frag->super.super.frag_owner = (mca_ptl_t *) ptl;
|
||||
recv_frag->super.super.frag_addr = NULL;
|
||||
recv_frag->super.super.frag_size = 0;
|
||||
recv_frag->super.super.frag_peer = NULL; /* FIXME: peer; */
|
||||
|
||||
/* XXX:
|
||||
* Since elan is not connection oriented,
|
||||
* No information about which peer until checking the header
|
||||
*/
|
||||
recv_frag->super.super.frag_peer = NULL;
|
||||
recv_frag->super.frag_request = 0;
|
||||
recv_frag->super.frag_is_buffered = false;
|
||||
recv_frag->frag_hdr_cnt = 0;
|
||||
@ -152,12 +182,19 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
|
||||
&recv_frag->super,
|
||||
&recv_frag->super.super.frag_header.
|
||||
hdr_match)) {
|
||||
/*mca_ptl_tcp_recv_frag_matched(frag);*/
|
||||
/* copy into the request buffer */
|
||||
request = recv_frag->super.frag_request;
|
||||
if (header->hdr_frag.hdr_frag_length > 0) {
|
||||
memcpy (request->super.req_addr,
|
||||
(char *) header + sizeof (mca_ptl_base_header_t),
|
||||
header->hdr_frag.hdr_frag_length);
|
||||
}
|
||||
recv_frag->super.frag_is_buffered = false ;
|
||||
recv_frag->super.super.frag_addr = request->super.req_addr;
|
||||
recv_frag->super.super.frag_size = header->hdr_frag.hdr_frag_length;
|
||||
} else {
|
||||
/* XXX: Fragment is buffered */
|
||||
recv_frag->super.frag_is_buffered = true;
|
||||
recv_frag->super.super.frag_addr = recv_frag->unex_buff;
|
||||
recv_frag->super.super.frag_size =
|
||||
@ -168,7 +205,8 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
|
||||
}
|
||||
|
||||
/* Complete the fragment */
|
||||
if (NULL != recv_frag->super.frag_request) {
|
||||
if (fetchNset (&recv_frag->frag_progressed, 1) == 0 &&
|
||||
NULL != recv_frag->super.frag_request) {
|
||||
mca_ptl_base_recv_progress_fn_t progress;
|
||||
|
||||
progress = recv_frag->super.super.frag_owner->ptl_recv_progress;
|
||||
@ -177,9 +215,12 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
|
||||
/* progress the request */
|
||||
progress (recv_frag->super.super.frag_owner, request,
|
||||
&recv_frag->super);
|
||||
|
||||
/* FIXME:
|
||||
* To support the required ACK, do not return
|
||||
* until the ack is out */
|
||||
mca_ptl_elan_recv_frag_return (recv_frag->super.super.frag_owner,
|
||||
recv_frag);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -208,10 +249,12 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
int i;
|
||||
int rc;
|
||||
|
||||
START_FUNC();
|
||||
num_ptls = emp->elan_num_ptls;
|
||||
|
||||
/* Iterate over all the PTL input Queues */
|
||||
for (i = 0; i < num_ptls; i++) {
|
||||
|
||||
ptl = emp->elan_ptls[i];
|
||||
queue = emp->elan_ptls[i]->queue;
|
||||
rxq = queue->rxq;
|
||||
@ -219,7 +262,7 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
|
||||
OMPI_LOCK (&queue->rx_lock);
|
||||
|
||||
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1);
|
||||
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 0);
|
||||
|
||||
if (rc) {
|
||||
|
||||
@ -227,6 +270,19 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
|
||||
header = (mca_ptl_base_header_t *) rxq->qr_fptr;
|
||||
|
||||
if (CHECK_ELAN)
|
||||
{
|
||||
char hostname[32];
|
||||
gethostname(hostname, 32);
|
||||
|
||||
fprintf(stderr,
|
||||
"[%s recv...] type %x flag %x size %x\n",
|
||||
hostname,
|
||||
header->hdr_common.hdr_type,
|
||||
header->hdr_common.hdr_flags,
|
||||
header->hdr_common.hdr_size);
|
||||
}
|
||||
|
||||
switch (header->hdr_common.hdr_type) {
|
||||
case MCA_PTL_HDR_TYPE_MATCH:
|
||||
case MCA_PTL_HDR_TYPE_FRAG:
|
||||
@ -239,9 +295,10 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
mca_ptl_elan_ctrl_frag (ptl, header);
|
||||
break;
|
||||
default:
|
||||
ompi_output (0, "[%s:%d] unknow fragment type %d\n"
|
||||
fprintf(stdout, "[%s:%d] unknow fragment type %d\n",
|
||||
__FILE__, __LINE__,
|
||||
header->hdr_common.hdr_type);
|
||||
fflush(stdout);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -268,17 +325,20 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
/* Re-prime queue event by issuing a waitevent(1) on it */
|
||||
elan4_wait_event_cmd (rxq->qr_cmdq,
|
||||
/* Is qr_elanDone really a main memory address? */
|
||||
MAIN2ELAN (ctx, &rxq->qr_elanDone),
|
||||
MAIN2ELAN (ctx, rxq->qr_elanDone),
|
||||
E4_EVENT_INIT_VALUE (-32, E4_EVENT_WRITE,
|
||||
E4_EVENT_DTYPE_LONG, 0),
|
||||
MAIN2ELAN (ctx, (void *) &rxq-> qr_doneWord),
|
||||
MAIN2ELAN (ctx, (void *) &rxq->qr_doneWord),
|
||||
0xfeedfacedeadbeef);
|
||||
rxq->qr_cmdq->cmdq_flush (rxq->qr_cmdq);
|
||||
|
||||
/*rxq->qr_cmdq->cmdq_flush */
|
||||
elan4_flush_cmdq_reorder (rxq->qr_cmdq);
|
||||
|
||||
}
|
||||
OMPI_UNLOCK (&queue->rx_lock);
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -293,6 +353,8 @@ mca_ptl_elan_update_send (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
int num_ptls;
|
||||
int i;
|
||||
|
||||
START_FUNC();
|
||||
|
||||
num_ptls = emp->elan_num_ptls;
|
||||
|
||||
/* Update the send request if any of send's is completed */
|
||||
@ -301,31 +363,53 @@ mca_ptl_elan_update_send (mca_ptl_elan_module_1_0_0_t * emp)
|
||||
queue = ptl->queue;
|
||||
ctx = ptl->ptl_elan_ctx;
|
||||
|
||||
do {
|
||||
while (ompi_list_get_size (&queue->tx_desc) > 0) {
|
||||
desc = (mca_ptl_elan_desc_item_t *)
|
||||
ompi_list_get_first (&queue->tx_desc);
|
||||
#if 1
|
||||
if ((int *) (&desc->item.qdma->main_doneWord))
|
||||
|
||||
#if 0
|
||||
if ((int *) (&desc->desc->main_doneWord))
|
||||
#else
|
||||
/* Poll the completion event for 1usec */
|
||||
if (elan4_pollevent_word
|
||||
(ctx, desc->item.qdma->main_doneWord, 1))
|
||||
(ctx, &desc->desc->main_doneWord, 1))
|
||||
#endif
|
||||
{
|
||||
mca_ptl_elan_send_request_t *req;
|
||||
/* Remove the desc, update the request, put back to free list */
|
||||
desc = (mca_ptl_elan_desc_item_t *)
|
||||
ompi_list_remove_first (&queue->tx_desc);
|
||||
req = desc->item.qdma->req;
|
||||
req->super.super.req_mpi_done = true;
|
||||
req->super.super.req_pml_done = true;
|
||||
req = desc->desc->req;
|
||||
|
||||
if(NULL == req ) { /*(IS_ACK)*/
|
||||
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free,
|
||||
(ompi_list_item_t *) desc);
|
||||
} else if (1) { //NO_NEED_FOR_MATCH || ALREADY_MATCHED)
|
||||
/* XXX:
|
||||
* mca_pml_base_send_request_matched(request)
|
||||
* hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
|
||||
* Be sure to add an atomic lock for threaded cases */
|
||||
|
||||
mca_ptl_base_send_frag_t frag;
|
||||
frag.super.frag_size = 4;
|
||||
req->super.req_bytes_sent = req->super.req_bytes_packed;
|
||||
ptl->super.ptl_send_progress(ptl, req, &frag);
|
||||
|
||||
/* the first fragment is allocated with the
|
||||
* request, all others need to be returned
|
||||
* to the free list */
|
||||
if (1) { // NOT_FIRST_FRAG
|
||||
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free,
|
||||
(ompi_list_item_t *) desc);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while (ompi_list_get_size (&queue->tx_desc) > 0);
|
||||
}
|
||||
}
|
||||
|
||||
END_FUNC();
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -29,7 +29,6 @@
|
||||
#include "ptl_elan_frag.h"
|
||||
#include "ptl_elan_req.h"
|
||||
|
||||
#define __elan4__
|
||||
#define _TRACK_MALLOC 0
|
||||
|
||||
#include <elan/elan.h>
|
||||
@ -44,18 +43,44 @@
|
||||
do { \
|
||||
if (value == unexp) { \
|
||||
ompi_output(output, \
|
||||
"[%s:%d] received unexpect allocated value \n", \
|
||||
"[%s:%d] received unexpect allocated value \n",\
|
||||
__FILE__, __LINE__); \
|
||||
return errno; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_ELAN 0
|
||||
|
||||
#if CHECK_ELAN

/*
 * Function entry/exit tracing, compiled in only when CHECK_ELAN is
 * non-zero.  Each macro prints the local host name, the enclosing
 * function name and the source line to stderr.
 *
 * The expansions intentionally do NOT end in a semicolon: the call
 * site supplies it ("START_FUNC();"), so each macro acts as exactly
 * one statement and stays safe inside an unbraced if/else.
 *
 * The host name buffer is zero-initialised and gethostname() is given
 * one byte less than its size, so the string is always NUL-terminated
 * even when the name is truncated or the call fails.
 */
#define START_FUNC()                                         \
do {                                                         \
    char hostname[32] = "";                                  \
    gethostname(hostname, sizeof(hostname) - 1);             \
    fprintf(stderr, "[%s:%s:%d] Entering ...\n",             \
            hostname, __FUNCTION__, __LINE__);               \
} while (0)

#define END_FUNC()                                           \
do {                                                         \
    char hostname[32] = "";                                  \
    gethostname(hostname, sizeof(hostname) - 1);             \
    fprintf(stderr, "[%s:%s:%d] Completes ...\n",            \
            hostname, __FUNCTION__, __LINE__);               \
} while (0)

#else

/* Tracing disabled: both macros expand to nothing, leaving only the
 * caller's semicolon (an empty statement). */
#define START_FUNC()
#define END_FUNC()

#endif
|
||||
|
||||
|
||||
/**
|
||||
* Structure used to publish elan information to peers.
|
||||
*/
|
||||
struct mca_ptl_elan_addr_t {
|
||||
int elan_vp; /* Right now only elan_vp is needed */
|
||||
int addr_inuse;
|
||||
int elan_vp;
|
||||
int inuse;
|
||||
ompi_process_name_t gid;
|
||||
};
|
||||
typedef struct mca_ptl_elan_addr_t mca_ptl_elan_addr_t;
|
||||
|
||||
@ -86,17 +111,34 @@ typedef struct {
|
||||
E4_Event32 event32; /* Local elan completion event */
|
||||
} ompi_elan_event_t;
|
||||
|
||||
struct ompi_ptl_elan_qdma_desc_t {
|
||||
E4_DMA64 main_dma; /**< Must be 8-byte aligned */
|
||||
/**
|
||||
* ELAN descriptor for send
|
||||
*/
|
||||
#define ELAN_BASE_DESC_FIELDS \
|
||||
E4_DMA64 main_dma; /**< Must be 8-byte aligned */ \
|
||||
/* 8 byte aligned */ \
|
||||
volatile E4_uint64 main_doneWord; \
|
||||
/* 8 byte aligned */ \
|
||||
ompi_elan_event_t *elan_data_event; \
|
||||
mca_ptl_elan_send_request_t *req; \
|
||||
/* 8 byte aligned */ \
|
||||
int desc_type; \
|
||||
int desc_status; \
|
||||
/* 8 byte aligned */
|
||||
|
||||
volatile E4_uint64 main_doneWord; /**< main memory location to poll */
|
||||
ompi_elan_event_t *elan_data_event; /**< 128-byte aligned copy event */
|
||||
RAIL *rail;
|
||||
struct ompi_ptl_elan_base_desc_t {
|
||||
ELAN_BASE_DESC_FIELDS
|
||||
/* 8 byte aligned */
|
||||
};
|
||||
typedef struct ompi_ptl_elan_base_desc_t ompi_ptl_elan_base_desc_t;
|
||||
|
||||
struct ompi_ptl_elan_qdma_desc_t {
|
||||
|
||||
ELAN_BASE_DESC_FIELDS
|
||||
/* 8 byte aligned */
|
||||
|
||||
mca_ptl_elan_t *ptl;
|
||||
mca_ptl_elan_send_request_t *req;
|
||||
RAIL *rail;
|
||||
/* 8 byte aligned */
|
||||
|
||||
uint8_t buff[INPUT_QUEUE_MAX]; /**< queue data */
|
||||
@ -200,9 +242,15 @@ int ompi_init_elan_stat (mca_ptl_elan_module_1_0_0_t * emp,
|
||||
int num_rails);
|
||||
|
||||
/* communication prototypes */
|
||||
int mca_ptl_elan_start_desc(int type, mca_ptl_elan_desc_item_t *desc);
|
||||
int mca_ptl_elan_poll_desc(int type, mca_ptl_elan_desc_item_t *desc);
|
||||
int mca_ptl_elan_wait_desc(int type, mca_ptl_elan_desc_item_t *desc);
|
||||
int mca_ptl_elan_start_desc(mca_ptl_elan_desc_item_t *desc,
|
||||
struct mca_ptl_elan_peer_t *ptl_peer,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
int flags);
|
||||
|
||||
int mca_ptl_elan_poll_desc(mca_ptl_elan_desc_item_t *desc);
|
||||
int mca_ptl_elan_wait_desc(mca_ptl_elan_desc_item_t *desc);
|
||||
|
||||
/* control, synchronization and state prototypes */
|
||||
int mca_ptl_elan_drain_recv(mca_ptl_elan_module_1_0_0_t *emp);
|
||||
|
@ -37,7 +37,7 @@ mca_ptl_elan_proc_construct (mca_ptl_elan_proc_t * proc)
|
||||
proc->proc_peer_count = 0;
|
||||
proc->proc_guid.cellid = 0;
|
||||
proc->proc_guid.jobid = 0;
|
||||
proc->proc_guid.procid = 0;
|
||||
proc->proc_guid.vpid = 0;
|
||||
|
||||
OBJ_CONSTRUCT (&proc->proc_lock, ompi_mutex_t);
|
||||
|
||||
@ -173,52 +173,6 @@ mca_ptl_elan_proc_lookup (void *guid,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Note that this routine must be called with the lock on the process already
|
||||
* held. Insert a ptl instance into the proc array and assign it an address.
|
||||
*/
|
||||
int
|
||||
mca_ptl_elan_proc_insert (mca_ptl_elan_proc_t * ptl_proc,
|
||||
mca_ptl_elan_peer_t * ptl_peer)
|
||||
{
|
||||
int i;
|
||||
struct mca_ptl_elan_t *ptl_elan;
|
||||
|
||||
ptl_elan = ptl_peer->peer_ptl;
|
||||
ptl_peer->peer_proc = ptl_proc;
|
||||
|
||||
ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer;
|
||||
ptl_proc->proc_peer_count++;
|
||||
|
||||
/* Look through the proc instance for an address that is on the
|
||||
* directly attached network. If we don't find one, pick the first
|
||||
* unused address. */
|
||||
|
||||
for (i = 0; i < ptl_proc->proc_addr_count; i++) {
|
||||
|
||||
unsigned vp_local;
|
||||
unsigned vp_remote;
|
||||
mca_ptl_elan_addr_t *peer_addr;
|
||||
|
||||
peer_addr = ptl_proc->proc_addrs + i;
|
||||
|
||||
if (peer_addr->addr_inuse != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
vp_local = ptl_elan->elan_vp;
|
||||
vp_remote = peer_addr->elan_vp;
|
||||
|
||||
assert (vp_local != vp_remote);
|
||||
ptl_peer->peer_addr = peer_addr;
|
||||
}
|
||||
|
||||
ptl_peer->peer_addr->addr_inuse++;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove a peer from the proc array and indicate the address is
|
||||
* no longer in use.
|
||||
|
@ -45,10 +45,8 @@ static inline mca_ptl_elan_proc_t* mca_ptl_elan_proc_local(void)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int mca_ptl_elan_proc_insert(mca_ptl_elan_proc_t *, mca_ptl_elan_peer_t *);
|
||||
int mca_ptl_elan_proc_remove(mca_ptl_elan_proc_t *, mca_ptl_elan_peer_t *);
|
||||
bool mca_ptl_elan_proc_accept(mca_ptl_elan_proc_t *,
|
||||
struct sockaddr_in *, int sd);
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -13,7 +13,6 @@ void
|
||||
mca_ptl_elan_send_request_construct (mca_ptl_elan_send_request_t * request)
|
||||
{
|
||||
OBJ_CONSTRUCT (&request->super, mca_pml_base_send_request_t);
|
||||
request->desc_type = 0;
|
||||
request->req_frag = NULL;
|
||||
}
|
||||
|
||||
@ -22,7 +21,6 @@ void
|
||||
mca_ptl_elan_send_request_destruct (mca_ptl_elan_send_request_t * request)
|
||||
{
|
||||
OBJ_DESTRUCT (&request->super);
|
||||
request->desc_type = 0;
|
||||
request->req_frag = NULL;
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,6 @@ OBJ_CLASS_DECLARATION(mca_ptl_elan_recv_request_t);
|
||||
*/
|
||||
struct mca_ptl_elan_send_request_t {
|
||||
mca_pml_base_send_request_t super;
|
||||
int desc_type;
|
||||
mca_ptl_elan_desc_item_t *req_frag;
|
||||
};
|
||||
typedef struct mca_ptl_elan_send_request_t mca_ptl_elan_send_request_t;
|
||||
|
@ -4,17 +4,20 @@
|
||||
|
||||
int main (int argc, char ** argv)
|
||||
{
|
||||
char hostname[32];
|
||||
int proc, nproc;
|
||||
|
||||
/* Get some environmental variables set for Open MPI, OOB */
|
||||
env_init_for_elan();
|
||||
|
||||
proc = 0;
|
||||
/*while (proc < 1) ;*/
|
||||
gethostname(hostname, 32);
|
||||
|
||||
MPI_Init(&argc, &argv);
|
||||
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
|
||||
MPI_Barrier(MPI_COMM_WORLD);
|
||||
fprintf(stdout, "[%s:%s:%d] done with init \n",
|
||||
hostname, __FUNCTION__, __LINE__);
|
||||
fflush(stdout);
|
||||
MPI_Finalize();
|
||||
return 0;
|
||||
}
|
||||
|
@ -1,8 +1,9 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
static void env_init_for_elan()
|
||||
{
|
||||
char hostname[32];
|
||||
char *rms_rank;
|
||||
|
||||
setenv("OMPI_MCA_oob_cofs_dir", "/home/1/yuw/tmp", 1);
|
||||
/*setenv("OMPI_MCA_oob_cofs_dir", "/tmp/COFS", 1);*/
|
||||
@ -10,15 +11,10 @@ static void env_init_for_elan()
|
||||
setenv("OMPI_MCA_pcm_cofs_jobid", "1", 1);
|
||||
setenv("OMPI_MCA_pcm_cofs_num_procs", "2", 1);
|
||||
|
||||
gethostname(hostname, 32);
|
||||
|
||||
if ( strcmp("quad0", hostname) == 0) {
|
||||
fprintf(stdout, "I am %s rank %d\n", hostname, 0);
|
||||
fflush(stdout);
|
||||
setenv("OMPI_MCA_pcm_cofs_procid", "0", 1);
|
||||
if (NULL != (rms_rank = getenv("RMS_RANK"))) {
|
||||
/* RMS_JOBID:RMS_NNODES:RMS_NPROCS:RMS_NODEID:RMS_RESOURCEID */
|
||||
setenv("OMPI_MCA_pcm_cofs_procid", rms_rank, 1);
|
||||
} else {
|
||||
fprintf(stdout, "I am %s rank %d\n", hostname, 1);
|
||||
fflush(stdout);
|
||||
setenv("OMPI_MCA_pcm_cofs_procid", "1", 1);
|
||||
fprintf(stderr, "Hi, please test elan4 from RMS for now\n");
|
||||
}
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user