And will put back .ompi_ignore to avoid confusion

This commit was SVN r1805.
This commit is contained in:
Weikuan Yu 2004-07-26 13:05:59 +00:00
parent 0c421ef645
commit 04f726852b
10 changed files with 396 additions and 340 deletions

View file

@ -6,28 +6,26 @@
include $(top_ompi_srcdir)/config/Makefile.options
#Assumption. We support only i686 for now.
#AM_CPPFLAGS = -fPIC -D_ELAN4 -D_REENTRANT -O3 -malign-double -march=i686
AM_CPPFLAGS = -fPIC -D_ELAN4 -D_REENTRANT -O3 -march=i686
AM_CPPFLAGS += -I$(top_ompi_builddir)/src/include \
AM_CPPFLAGS += -I$(top_ompi_builddir)/src/include \
-I$(top_ompi_srcdir)/src -I$(top_ompi_srcdir)/src/include \
-I/usr/lib/qsnet/elan4/include
noinst_LTLIBRARIES = libmca_ptl_elan.la
libmca_ptl_elan_la_SOURCES = \
elan_config.h \
ptl_elan.h \
ptl_elan_frag.h \
ptl_elan_req.h \
ptl_elan_proc.h \
ptl_elan_peer.h \
ptl_elan_priv.h \
ptl_elan_frag.c \
ptl_elan_req.c \
ptl_elan_proc.c \
ptl_elan_priv.c \
ptl_elan_peer.c \
ptl_elan_init.c \
elan_config.h \
ptl_elan.h \
ptl_elan_frag.h \
ptl_elan_proc.h \
ptl_elan_peer.h \
ptl_elan_priv.h \
ptl_elan_frag.c \
ptl_elan_proc.c \
ptl_elan_priv.c \
ptl_elan_peer.c \
ptl_elan_init.c \
ptl_elan_comm_init.c \
ptl_elan_module.c \
ptl_elan_module.c \
ptl_elan.c

View file

@ -16,7 +16,6 @@
#include "ptl_elan.h"
#include "ptl_elan_peer.h"
#include "ptl_elan_proc.h"
#include "ptl_elan_req.h"
#include "ptl_elan_frag.h"
#include "ptl_elan_priv.h"
@ -24,6 +23,8 @@
mca_ptl_elan_t mca_ptl_elan = {
{
&mca_ptl_elan_module.super,
4,
sizeof(mca_ptl_elan_desc_item_t),
0, /* ptl_exclusivity */
0, /* ptl_latency */
0, /* ptl_bandwidth */
@ -36,11 +37,12 @@ mca_ptl_elan_t mca_ptl_elan = {
mca_ptl_elan_add_procs,
mca_ptl_elan_del_procs,
mca_ptl_elan_finalize,
mca_ptl_elan_isend,
mca_ptl_elan_put,
mca_ptl_elan_get,
mca_ptl_elan_matched,
mca_ptl_elan_req_alloc,
mca_ptl_elan_req_return
mca_ptl_elan_req_init,
mca_ptl_elan_req_fini,
}
};
@ -146,9 +148,8 @@ mca_ptl_elan_finalize (struct mca_ptl_t *ptl)
elan_ptl = (struct mca_ptl_elan_t *) ptl;
/* XXX: Free all the lists, etc, hanged over PTL */
/* Free the PTL */
/* XXX: Free all the lists, etc., hung off the PTL
* before freeing the PTLs */
rail_index = elan_ptl->ptl_ni_local;
free (elan_ptl);
@ -160,28 +161,44 @@ mca_ptl_elan_finalize (struct mca_ptl_t *ptl)
}
int
mca_ptl_elan_req_alloc (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t **request)
mca_ptl_elan_req_init (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *request)
{
int rc = OMPI_SUCCESS;
mca_pml_base_send_request_t *sendreq;
ompi_list_item_t *item;
mca_ptl_elan_desc_item_t *sd;
mca_ptl_elan_send_request_t * elan_req;
OMPI_FREE_LIST_GET (&mca_ptl_elan_module.elan_reqs_free, item, rc);
if (NULL != (sendreq = (mca_pml_base_send_request_t *) item))
sendreq->req_owner = ptl;
*request = sendreq;
START_FUNC();
/*OBJ_CONSTRUCT (request, mca_pml_base_send_request_t);*/
sd = mca_ptl_elan_alloc_send_desc(ptl, request);
if (NULL == sd) {
ompi_output(0,
"[%s:%d] Unable to allocate an elan send descriptor \n",
__FILE__, __LINE__);
rc = OMPI_ERR_OUT_OF_RESOURCE;
} else {
/* XXX: Hope PML never writes into the fragment */
((mca_ptl_elan_send_request_t *) request)->req_frag = sd;
sd->desc->desc_status = MCA_PTL_ELAN_DESC_CACHED;
}
END_FUNC();
return rc;
}
void
mca_ptl_elan_req_return (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *request)
mca_ptl_elan_req_fini (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *request)
{
OMPI_FREE_LIST_RETURN (&mca_ptl_elan_module.elan_reqs_free,
(ompi_list_item_t *) request);
/* XXX: Lock to be added */
ompi_ptl_elan_queue_ctrl_t *queue;
mca_ptl_elan_desc_item_t *desc;
queue = ((struct mca_ptl_elan_t *) ptl)->queue;
/* return the fragment and update its status */
desc = ((mca_ptl_elan_send_request_t *) request)->req_frag;
desc->desc->desc_status = MCA_PTL_ELAN_DESC_LOCAL;
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free, (ompi_list_item_t *) desc);
return;
}
@ -203,6 +220,53 @@ mca_ptl_elan_send_frag_return (struct mca_ptl_t *ptl,
return;
}
/*
* Initiate an isend operation
*/
int
mca_ptl_elan_isend (struct mca_ptl_t *ptl,
struct mca_ptl_base_peer_t *ptl_peer,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t size,
int flags)
{
int rc = OMPI_SUCCESS;
mca_ptl_elan_desc_item_t *sd;
/* XXX:
* The PML extracts a request from the PTL module and then uses
* that request to ask for a fragment.
* Is it too deep across the stacks to get a request, with
* correspondingly multiple LOCKS to go through? */
START_FUNC();
if (offset == 0) { /* The first fragment uses a cached desc */
sd = ((mca_ptl_elan_send_request_t*)sendreq)->req_frag;
} else {
sd = mca_ptl_elan_alloc_send_desc(ptl, sendreq);
if (NULL == sd) {
ompi_output(0,
"[%s:%d] Unable to allocate an elan send descriptor \n",
__FILE__, __LINE__);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
((struct mca_ptl_elan_send_request_t *)sendreq)->req_frag = sd;
rc = mca_ptl_elan_start_desc(sd,
(struct mca_ptl_elan_peer_t *)ptl_peer,
sendreq, offset, &size, flags);
/* Update offset */
sendreq->req_offset += size;
END_FUNC();
return rc;
}
/*
* Initiate a put operation.
*/
@ -216,31 +280,6 @@ mca_ptl_elan_put (struct mca_ptl_t *ptl,
int flags)
{
int rc = OMPI_SUCCESS;
mca_ptl_elan_desc_item_t *sd;
/* XXX:
* PML extract an request from PTL module and then use this
* a request to ask for a fragment
* Is it too deep across stacks to get a request and
* correspondingly multiple LOCKS to go through*/
START_FUNC();
sd = mca_ptl_elan_alloc_send_desc(ptl, sendreq);
if (NULL == sd) {
ompi_output(0,
"[%s:%d] Unable to allocate an elan send descriptors \n",
__FILE__, __LINE__);
}
/* Update offset */
sendreq->req_offset += size;
((struct mca_ptl_elan_send_request_t *)sendreq)->req_frag = sd;
rc = mca_ptl_elan_start_desc(sd,
(struct mca_ptl_elan_peer_t *)ptl_peer,
sendreq, offset, size, flags);
END_FUNC();
return rc;
}
@ -261,34 +300,80 @@ mca_ptl_elan_get (struct mca_ptl_t *ptl,
}
/*
* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment.
* A posted receive has been matched
* + Copy the data into user buffer
* + Return an ack if need to
*/
void
mca_ptl_elan_matched (mca_ptl_t * ptl,
mca_ptl_base_recv_frag_t * frag)
{
mca_ptl_base_header_t* header = &frag->super.frag_header;
mca_pml_base_recv_request_t *request;
mca_ptl_base_header_t *header;
int set = 0;
/* if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) */
header = &frag->frag_base.frag_header;
request = frag->frag_request;
/* XXX: Pseudocode, for additional processing of header fragments
* a) (ACK:no, Get:No) Remove the frag. no need for further processing
* b) (ACK:yes, Get:No) Send an ACK only
* c) (ACK:yes, Get:yes) Get a message, update the fragment descriptor
* and then send an ACK,
* d) Consider moving time-consuming tasks to some BH-like mechanisms.
*/
/* Process the fragment */
set = fetchNset (&((mca_ptl_elan_recv_frag_t *) frag)->frag_progressed, 1);
/* FIXME: skip the acknowledgement part */
if (0) {
/* Allocate a send descriptor and send an ACK */
if (!set) {
/* In the TCP case, an IO_VEC is allocated first,
* then the data is received and copied if needed.
*
* In the ELAN case, we instead save the data into an unex buffer
* if the recv descriptor has not been posted (for too long) (TODO).
* We then need to copy from the unex buffer to the application buffer */
if (header->hdr_frag.hdr_frag_length > 0) {
struct iovec iov;
ompi_proc_t *proc;
/* XXX: if (frag->frag_is_buffered) */
iov.iov_base = frag->frag_base.frag_addr;
iov.iov_len = frag->frag_base.frag_size;
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_peer);
ompi_convertor_copy(proc->proc_convertor,
&frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv(
&frag->frag_base.frag_convertor,
0,
request->req_base.req_datatype,
request->req_base.req_count,
request->req_base.req_addr,
header->hdr_frag.hdr_frag_offset);
ompi_convertor_unpack(&frag->frag_base.frag_convertor, &iov, 1);
}
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
/* FIXME: Pseudocode, for additional processing of fragments
* a) (ACK:no, Get:No)
* Remove the frag. no need for further processing
* b) (ACK:yes, Get:No)
* Send an ACK only
* c) (ACK:yes, Get:yes)
* Get a message, update the fragment descriptor and
* then send an ACK,
* d) Consider moving time-consuming tasks to some BH-like
* mechanisms.
*/
}
frag->frag_base.frag_owner->ptl_recv_progress
(frag->frag_base.frag_owner, request, frag->frag_base.frag_size);
/* FIXME:
* To support the required ACK, do not return
* until the ack is out */
if (((mca_ptl_elan_recv_frag_t *) frag)->frag_ack_pending == false)
mca_ptl_elan_recv_frag_return (frag->frag_base.frag_owner,
(mca_ptl_elan_recv_frag_t *) frag);
}
/* process fragment if complete */
mca_ptl_elan_recv_frag_progress((mca_ptl_elan_recv_frag_t*)frag);
return;
}
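Taken together, req_init()/req_fini() above replace the old per-request free list: every send request now carries one pre-allocated elan descriptor, so the first fragment can be started without another free-list trip. A minimal, self-contained sketch of that caching idea, using simplified stand-in types (not the real OMPI structures; names here are illustrative only):

#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-ins for the types in this commit (illustrative only). */
enum { DESC_LOCAL = 0x10, DESC_CACHED = 0x20 };

struct send_desc { int status; };
struct send_req  { struct send_desc *req_frag; };

/* req_init: allocate one descriptor up front and cache it in the request. */
static int req_init(struct send_req *req)
{
    struct send_desc *sd = malloc(sizeof *sd);  /* real code: free-list get */
    if (sd == NULL)
        return -1;                              /* OMPI_ERR_OUT_OF_RESOURCE */
    sd->status = DESC_CACHED;
    req->req_frag = sd;
    return 0;
}

/* req_fini: mark the cached descriptor local again and hand it back. */
static void req_fini(struct send_req *req)
{
    req->req_frag->status = DESC_LOCAL;
    free(req->req_frag);                        /* real code: free-list return */
    req->req_frag = NULL;
}

int main(void)
{
    struct send_req req;
    if (req_init(&req) == 0) {
        printf("first fragment can reuse the cached descriptor\n");
        req_fini(&req);
    }
    return 0;
}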

View file

@ -69,11 +69,8 @@ struct mca_ptl_elan_module_1_0_0_t {
size_t elan_num_ptls; /**< number of ptls activated */
ompi_list_t elan_procs; /**< elan proc's */
ompi_list_t elan_reqs; /**< all elan requests */
ompi_list_t elan_recv_frags;
ompi_list_t elan_pending_acks;
ompi_free_list_t elan_reqs_free; /**< all elan requests */
ompi_free_list_t elan_recv_frags_free;
struct mca_ptl_elan_proc_t *elan_local;
@ -163,24 +160,24 @@ mca_ptl_elan_del_procs (struct mca_ptl_t *ptl,
struct mca_ptl_base_peer_t **peers);
/**
* PML->PTL Allocate a send request from the PTL modules free list.
* PML->PTL Acquire and initialize a send descriptor.
*
* @param ptl (IN) PTL instance
* @param request (IN) Pointer to the send request being initialized.
* @return Status indicating if initialization was successful.
*/
extern int
mca_ptl_elan_req_alloc (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t **);
mca_ptl_elan_req_init (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *req);
/**
* PML->PTL Return a send request to the PTL modules free list.
* PML->PTL Free the cached descriptor.
*
* @param ptl (IN) PTL instance
* @param request (IN) Pointer to allocated request.
*/
extern void mca_ptl_elan_req_return (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *);
extern void mca_ptl_elan_req_fini (struct mca_ptl_t *ptl,
struct mca_pml_base_send_request_t *);
/**
* PML->PTL Notification that a receive fragment has been matched.
@ -191,6 +188,28 @@ extern void mca_ptl_elan_req_return (struct mca_ptl_t *ptl,
extern void mca_ptl_elan_matched (struct mca_ptl_t *ptl,
struct mca_ptl_base_recv_frag_t *frag);
/**
* PML->PTL Initiate an isend operation
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param send_request (IN/OUT) Send request (allocated by PML via
* mca_ptl_base_request_alloc_fn_t)
* @param size (IN)
* Number of bytes PML is requesting PTL to deliver
* @param flags (IN)
* Flags that should be passed to the peer via the message header.
* @return
* OMPI_SUCCESS if the PTL was able to queue one or more fragments
*/
extern int
mca_ptl_elan_isend (struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_pml_base_send_request_t* request,
size_t offset,
size_t size,
int flags);
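As a usage illustration of the prototype above (assumed caller code, not taken from the PML; it requires this component's OMPI and elan headers): the offset-0 call sends the match fragment out of the descriptor cached by mca_ptl_elan_req_init(), and later calls cover the remaining bytes, with isend advancing req_offset by whatever it actually queued.

/* Hypothetical caller loop, for illustration only. */
static int send_whole_message(struct mca_ptl_t *ptl,
                              struct mca_ptl_base_peer_t *peer,
                              struct mca_pml_base_send_request_t *req,
                              size_t total_bytes,
                              size_t frag_size,
                              int flags)
{
    while (req->req_offset < total_bytes) {
        size_t prev   = req->req_offset;
        size_t remain = total_bytes - req->req_offset;
        size_t chunk  = (remain < frag_size) ? remain : frag_size;
        int rc = mca_ptl_elan_isend(ptl, peer, req, req->req_offset,
                                    chunk, flags);
        if (rc != OMPI_SUCCESS)
            return rc;
        if (req->req_offset == prev)    /* isend advances req_offset itself */
            break;                      /* nothing queued; avoid spinning */
    }
    return OMPI_SUCCESS;
}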
/**
* PML->PTL Initiate a put of the specified size.
*

View file

@ -12,7 +12,6 @@
#include "ptl_elan_peer.h"
#include "ptl_elan_proc.h"
#include "ptl_elan_frag.h"
#include "ptl_elan_req.h"
#include "ptl_elan_priv.h"
static void

View file

@ -23,7 +23,15 @@ struct mca_ptl_elan_peer_t;
struct ompi_ptl_elan_base_desc_t;
struct mca_ptl_elan_desc_item_t {
#if 0
mca_ptl_base_send_frag_t frag_send;
struct iovec *frag_vec_ptr;
size_t frag_vec_cnt;
struct iovec frag_vec[2];
volatile int frag_progressed;
#endif
ompi_list_item_t super;
volatile int frag_progressed;
struct ompi_ptl_elan_base_desc_t *desc;
};
typedef struct mca_ptl_elan_desc_item_t mca_ptl_elan_desc_item_t;
@ -32,10 +40,11 @@ typedef struct mca_ptl_elan_desc_item_t mca_ptl_elan_desc_item_t;
* ELAN received fragment derived type.
*/
struct mca_ptl_elan_recv_frag_t {
mca_ptl_base_recv_frag_t super;
mca_ptl_base_recv_frag_t frag_base;
size_t frag_hdr_cnt;
size_t frag_msg_cnt;
int frag_progressed;
bool frag_ack_pending;
union {
struct ompi_ptl_elan_qdma_frag_t *qdma;
struct ompi_ptl_elan_putget_frag_t *putget;
@ -52,27 +61,28 @@ mca_ptl_elan_alloc_send_desc( struct mca_ptl_t *ptl,
mca_ptl_elan_recv_frag_t *
mca_ptl_elan_alloc_recv_desc(struct mca_pml_base_recv_request_t *req);
#if 0
static inline void
mca_ptl_elan_recv_frag_progress(mca_ptl_elan_recv_frag_t* frag)
{
/* Up to this point, this only means the fragment has been
matched with a posted receive descriptor */
if (fetchNset (&frag->frag_progressed, 1) == 0) {
#if 1
if (0 == fetchNset (&frag->frag_progressed, 1)) {
/* make sure this only happens once for threaded case */
mca_pml_base_recv_request_t* request;
mca_ptl_base_recv_progress_fn_t progress;
progress = (frag)->super.super.frag_owner->ptl_recv_progress;
request = (frag)->super.frag_request;
progress = (frag)->frag_recv.frag_base.frag_owner->ptl_recv_progress;
request = (frag)->frag_recv.frag_request;
/* progress the request */
progress((frag)->super.super.frag_owner, request, &(frag)->super);
mca_ptl_elan_recv_frag_return((frag)->super.super.frag_owner, (frag));
#endif
progress((frag)->frag_recv.frag_base.frag_owner,
request, &(frag)->frag_recv);
mca_ptl_elan_recv_frag_return((frag)->frag_recv.frag_base.frag_owner,
(frag));
}
}
#endif
#endif
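The frag_progressed word guarded by fetchNset() here and in ptl_elan.c is what keeps fragment completion from running twice in the threaded case. A generic equivalent using C11 atomics, as a sketch only (the real code uses the Elan fetchNset() primitive):

#include <stdatomic.h>

typedef struct {
    atomic_int frag_progressed;   /* 0 = not yet progressed, 1 = done */
} frag_guard_t;

/* Returns 1 for exactly one caller per fragment; everyone else gets 0. */
static int progress_once(frag_guard_t *g)
{
    return atomic_exchange(&g->frag_progressed, 1) == 0;
}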

View file

@ -23,11 +23,8 @@
#include "ptl_elan.h"
#include "ptl_elan_proc.h"
#include "ptl_elan_frag.h"
#include "ptl_elan_req.h"
#include "ptl_elan_priv.h"
/*#define UNIT_TESTING 1*/
extern ompi_proc_t *ompi_proc_local_proc;
mca_ptl_elan_module_1_0_0_t mca_ptl_elan_module = {
@ -120,14 +117,10 @@ mca_ptl_elan_module_open (void)
elan_mp->elan_num_ptls = 0;
elan_mp->elan_local = NULL;
/* initialize list */
OBJ_CONSTRUCT (&elan_mp->elan_reqs, ompi_list_t);
/* initialize lists */
OBJ_CONSTRUCT (&elan_mp->elan_procs, ompi_list_t);
OBJ_CONSTRUCT (&elan_mp->elan_pending_acks, ompi_list_t);
OBJ_CONSTRUCT (&elan_mp->elan_recv_frags, ompi_list_t);
/* initialize free list */
OBJ_CONSTRUCT (&elan_mp->elan_reqs_free, ompi_free_list_t);
OBJ_CONSTRUCT (&elan_mp->elan_recv_frags_free, ompi_free_list_t);
/* initialize other objects */
@ -159,15 +152,6 @@ mca_ptl_elan_module_close (void)
}
}
/* Check whether all the entries are return to the free list */
if (elan_mp->elan_reqs_free.fl_num_allocated !=
elan_mp->elan_reqs_free.super.ompi_list_length) {
ompi_output (0, "[%s:%d] send_requests: %d allocated %d returned\n",
__FILE__, __LINE__,
elan_mp->elan_reqs_free.fl_num_allocated,
elan_mp->elan_reqs_free.super.ompi_list_length);
}
if (elan_mp->elan_recv_frags_free.fl_num_allocated !=
elan_mp->elan_recv_frags_free.super.ompi_list_length) {
ompi_output (0, "[%s:%d] recv_frags : %d allocated %d returned\n",
@ -176,10 +160,9 @@ mca_ptl_elan_module_close (void)
elan_mp->elan_recv_frags_free.super.ompi_list_length);
}
/* FIXME: free free list entries before destructing lists */
/* FIXME: free free_list entries before destructing lists */
/* Free the empty list holders */
OBJ_DESTRUCT (&(elan_mp->elan_reqs));
OBJ_DESTRUCT (&(elan_mp->elan_procs));
OBJ_DESTRUCT (&(elan_mp->elan_pending_acks));
OBJ_DESTRUCT (&(elan_mp->elan_recv_frags));
@ -187,8 +170,6 @@ mca_ptl_elan_module_close (void)
/* TODO:
* We need to free all the memory allocated for this list
* before destructing this free_list */
OBJ_DESTRUCT (&(elan_mp->elan_reqs_free));
OBJ_DESTRUCT (&(elan_mp->elan_recv_frags_free));
/* Destruct other structures */
@ -214,8 +195,7 @@ mca_ptl_elan_module_init (int *num_ptls,
START_FUNC();
if (CHECK_ELAN)
{
if (CHECK_ELAN) {
char hostname[32]; gethostname(hostname, 32);
fprintf(stderr, "[%s:%s:%d] debugging ...\n",
hostname, __FUNCTION__, __LINE__);
@ -226,13 +206,6 @@ mca_ptl_elan_module_init (int *num_ptls,
*allow_multi_user_threads = true;
*have_hidden_threads = OMPI_HAVE_THREADS;
ompi_free_list_init (&(elan_mp->elan_reqs_free),
sizeof (mca_ptl_elan_send_request_t),
OBJ_CLASS (mca_ptl_elan_send_request_t),
elan_mp->elan_free_list_num,
elan_mp->elan_free_list_max,
elan_mp->elan_free_list_inc, NULL);
ompi_free_list_init (&(elan_mp->elan_recv_frags_free),
sizeof (mca_ptl_elan_recv_frag_t),
OBJ_CLASS (mca_ptl_elan_recv_frag_t),
@ -295,7 +268,6 @@ int
mca_ptl_elan_module_progress (mca_ptl_tstamp_t tstamp)
{
START_FUNC();
/*if ( times == 5) exit(1); else times ++;*/
mca_ptl_elan_drain_recv(elan_mp);
mca_ptl_elan_update_send(elan_mp);
END_FUNC();
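The allocated-versus-returned check kept in mca_ptl_elan_module_close() (for elan_recv_frags_free) is a simple shutdown leak detector. A stand-alone sketch of the same idea with simplified fields (fl_num_allocated and the free list's length in the real code):

#include <stdio.h>

struct free_list_stats {
    int num_allocated;   /* ever handed out (fl_num_allocated in OMPI) */
    int num_on_list;     /* currently back on the list */
};

/* Report descriptors that were never returned before shutdown. */
static int check_leaks(const struct free_list_stats *fl, const char *name)
{
    if (fl->num_allocated != fl->num_on_list) {
        fprintf(stderr, "%s: %d allocated, %d returned\n",
                name, fl->num_allocated, fl->num_on_list);
        return -1;
    }
    return 0;
}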

View file

@ -9,7 +9,6 @@
#include "ptl_elan_peer.h"
#include "ptl_elan_proc.h"
#include "ptl_elan_frag.h"
#include "ptl_elan_req.h"
#include "ptl_elan_priv.h"
static void
@ -18,53 +17,109 @@ mca_ptl_elan_init_qdma_desc (struct ompi_ptl_elan_qdma_desc_t *desc,
struct mca_ptl_elan_peer_t *ptl_peer,
mca_pml_base_send_request_t *pml_req,
size_t offset,
size_t size,
size_t *size,
int flags)
{
char *app_buff;
int header_length;
int mesg_length;
mca_ptl_base_header_t *hdr;
int destvp;
int size_out;
int size_in;
int rc;
START_FUNC();
destvp = ptl_peer->peer_vp;
/* TODO: For now, assume data are contiguous and less than eager size */
app_buff = (char *) pml_req->super.req_addr;
header_length = sizeof (mca_ptl_base_match_header_t);
mesg_length = pml_req->req_bytes_packed;
size_in = *size;
hdr = (mca_ptl_base_header_t *) & desc->buff[0];
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
/*hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;*/
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = 0;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.pval = 0;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
if(offset == 0) {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = desc;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_match.hdr_contextid = pml_req->super.req_comm->c_contextid;
hdr->hdr_match.hdr_src = pml_req->super.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = pml_req->super.req_peer;
hdr->hdr_match.hdr_tag = pml_req->super.req_tag;
hdr->hdr_match.hdr_msg_length = mesg_length;
hdr->hdr_match.hdr_msg_seq = pml_req->super.req_sequence;
hdr->hdr_frag.hdr_frag_length = mesg_length;
hdr->hdr_match.hdr_contextid = pml_req->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = pml_req->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = pml_req->req_base.req_peer;
hdr->hdr_match.hdr_tag = pml_req->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = pml_req->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = pml_req->req_base.req_sequence;
header_length = sizeof (mca_ptl_base_match_header_t);
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0;
hdr->hdr_frag.hdr_src_ptr.pval = desc;
hdr->hdr_frag.hdr_dst_ptr = pml_req->req_peer_match;
header_length = sizeof (mca_ptl_base_frag_header_t);
}
/* initialize convertor */
if(size_in > 0) {
struct iovec iov;
ompi_convertor_t *convertor;
if( offset <= mca_ptl_elan.super.ptl_first_frag_size ) {
convertor = &pml_req->req_convertor;
} else {
convertor = &desc->frag_convertor;
ompi_convertor_copy(&pml_req->req_convertor, convertor);
ompi_convertor_init_for_send(
convertor,
0,
pml_req->req_base.req_datatype,
pml_req->req_base.req_count,
pml_req->req_base.req_addr,
offset);
}
/* For now, eager sends are always packed into the descriptor
* TODO: Inline up to 256 bytes (including the header), then
* do a chained send for mesg < first_frag_size */
iov.iov_base = &desc->buff[header_length];
iov.iov_len = size_in;
rc = ompi_convertor_pack(convertor, &iov, 1);
if (rc < 0) {
ompi_output (0, "[%s:%d] Unable to pack data\n",
__FILE__, __LINE__);
return OMPI_ERROR;
}
size_out = iov.iov_len;
} else {
size_out = size_in;
}
*size = size_out;
hdr->hdr_frag.hdr_frag_length = size_out;
/* fragment state */
#if 0
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_addr = sendfrag->frag_vec[1].iov_base;
sendfrag->frag_send.frag_base.frag_size = size_out;
sendfrag->frag_peer = ptl_peer;
/* XXX: Fragment state, is this going to be set anywhere in PML */
sendfrag->frag_progressed = 0;
#endif
/* Fill up all the necessary fields */
memcpy (&desc->buff[header_length], app_buff, mesg_length);
desc->main_dma.dma_srcAddr = MAIN2ELAN (desc->rail->r_ctx,
&desc->buff[0]);
/* XXXX Hardwired DMA retry count */
/* XXX: Hardcoded DMA retry count */
desc->main_dma.dma_typeSize = E4_DMA_TYPE_SIZE ((header_length +
mesg_length),
size_out),
DMA_DataTypeByte,
DMA_QueueWrite, 16);
@ -72,15 +127,12 @@ mca_ptl_elan_init_qdma_desc (struct ompi_ptl_elan_qdma_desc_t *desc,
elan4_local_cookie (ptl->queue->tx_cpool,
E4_COOKIE_TYPE_LOCAL_DMA, destvp);
if (CHECK_ELAN)
{
if (CHECK_ELAN) {
char hostname[32];
gethostname(hostname, 32);
fprintf(stderr,
"[%s send...] destvp %d type %d flag %d size %d\n",
hostname, destvp,
hdr->hdr_common.hdr_type,
gethostname(hostname, 32);
fprintf(stderr, "[%s send...] destvp %d type %d flag %d size %d\n",
hostname, destvp, hdr->hdr_common.hdr_type,
hdr->hdr_common.hdr_flags,
hdr->hdr_common.hdr_size);
}
@ -98,7 +150,7 @@ mca_ptl_elan_start_desc (mca_ptl_elan_desc_item_t * desc,
struct mca_ptl_elan_peer_t *ptl_peer,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t size,
size_t *size,
int flags)
{
mca_ptl_elan_t *ptl;
@ -111,9 +163,6 @@ mca_ptl_elan_start_desc (mca_ptl_elan_desc_item_t * desc,
qdma = (ompi_ptl_elan_qdma_desc_t *)desc->desc;
ptl = qdma->ptl;
/* Could assert the following is the same as sendreq */
/*req = qdma->req;*/
mca_ptl_elan_init_qdma_desc (qdma, ptl, ptl_peer, sendreq,
offset, size, flags);
@ -141,6 +190,7 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
mca_ptl_base_header_t * header)
{
/* For PML interfacing, refer to mca_ptl_tcp_recv_frag_match(frag, sd);*/
/* Allocate a recv frag descriptor */
mca_ptl_elan_recv_frag_t *recv_frag;
ompi_list_item_t *item;
@ -148,80 +198,60 @@ mca_ptl_elan_data_frag (struct mca_ptl_elan_t *ptl,
int rc;
OMPI_FREE_LIST_GET (&mca_ptl_elan_module.elan_recv_frags_free,
item, rc);
rc = OMPI_FREE_LIST_GET (&mca_ptl_elan_module.elan_recv_frags_free,
item, rc);
if (OMPI_SUCCESS != rc) {
while (OMPI_SUCCESS != rc) {
/* TODO: progress the recv state machine */
ompi_output (0,
"[%s:%d] Unable to allocate a recv fragment",
"[%s:%d] Retry to allocate a recv fragment",
__FILE__, __LINE__);
return;
/* TODO: progress the recv state machine */
}
rc = OMPI_FREE_LIST_GET (&mca_ptl_elan_module.elan_recv_frags_free,
item, rc);
}
recv_frag = (mca_ptl_elan_recv_frag_t *) item;
recv_frag->super.super.frag_owner = (mca_ptl_t *) ptl;
recv_frag->super.super.frag_addr = NULL;
recv_frag->super.super.frag_size = 0;
recv_frag->frag_recv.frag_base.frag_owner = (mca_ptl_t *) ptl;
/* XXX:
* Since elan is not connection oriented, we have no
* information about the peer until the header is checked.
*/
recv_frag->super.super.frag_peer = NULL;
recv_frag->super.frag_request = 0;
recv_frag->super.frag_is_buffered = false;
recv_frag->frag_recv.frag_base.frag_peer = NULL;
recv_frag->frag_recv.frag_request = NULL;
recv_frag->frag_recv.frag_is_buffered = false;
recv_frag->frag_hdr_cnt = 0;
recv_frag->frag_msg_cnt = 0;
recv_frag->frag_ack_pending = false;
recv_frag->frag_progressed = 0;
/* Take the header */
recv_frag->super.super.frag_header = *header;
/* Copy the header, mca_ptl_base_match() does not do what it claims */
recv_frag->frag_recv.frag_base.frag_header = *header;
/* By default, the data starts right after the header */
recv_frag->frag_recv.frag_base.frag_addr =
(char *) header + sizeof (mca_ptl_base_header_t);
recv_frag->frag_recv.frag_base.frag_size =
header->hdr_frag.hdr_frag_length;
/* match with preposted requests */
if (mca_ptl_base_recv_frag_match (recv_frag->super.super.frag_owner,
&recv_frag->super,
&recv_frag->super.super.frag_header.
hdr_match)) {
/*mca_ptl_tcp_recv_frag_matched(frag);*/
/* copy into the request buffer */
request = recv_frag->super.frag_request;
if (header->hdr_frag.hdr_frag_length > 0) {
memcpy (request->super.req_addr,
(char *) header + sizeof (mca_ptl_base_header_t),
header->hdr_frag.hdr_frag_length);
}
recv_frag->super.frag_is_buffered = false ;
recv_frag->super.super.frag_addr = request->super.req_addr;
recv_frag->super.super.frag_size = header->hdr_frag.hdr_frag_length;
} else {
/* XXX: Fragment is buffered */
recv_frag->super.frag_is_buffered = true;
recv_frag->super.super.frag_addr = recv_frag->unex_buff;
recv_frag->super.super.frag_size =
header->hdr_frag.hdr_frag_length;
matched = mca_ptl_base_recv_frag_match (
recv_frag->frag_recv.frag_base.frag_owner,
&recv_frag->frag_recv,
&recv_frag->frag_recv.frag_base.frag_header.hdr_match);
if (!matched) {
/* Buffer the fragment into the unex buffer
* TODO:
* The next version needs to do this only when it is
* blocking the recv queue */
memcpy (recv_frag->unex_buff,
(char *) header + sizeof (mca_ptl_base_header_t),
header->hdr_frag.hdr_frag_length);
}
/* Complete the fragment */
if (fetchNset (&recv_frag->frag_progressed, 1) == 0 &&
NULL != recv_frag->super.frag_request) {
mca_ptl_base_recv_progress_fn_t progress;
progress = recv_frag->super.super.frag_owner->ptl_recv_progress;
request = recv_frag->super.frag_request;
/* progress the request */
progress (recv_frag->super.super.frag_owner, request,
&recv_frag->super);
/* FIXME:
* To support the required ACK, do not return
* until the ack is out */
mca_ptl_elan_recv_frag_return (recv_frag->super.super.frag_owner,
recv_frag);
}
recv_frag->frag_recv.frag_is_buffered = true;
recv_frag->frag_recv.frag_base.frag_addr = recv_frag->unex_buff;
}
}
static void
@ -262,16 +292,19 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
OMPI_LOCK (&queue->rx_lock);
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 0);
#if 1
rc = *(int *) (&rxq->qr_doneWord);
#else
rc = elan4_pollevent_word (ctx, &rxq->qr_doneWord, 1);
#endif
if (rc) {
mca_ptl_base_header_t *header;
header = (mca_ptl_base_header_t *) rxq->qr_fptr;
if (CHECK_ELAN)
{
if (CHECK_ELAN) {
char hostname[32];
gethostname(hostname, 32);
@ -314,7 +347,6 @@ mca_ptl_elan_drain_recv (mca_ptl_elan_module_1_0_0_t * emp)
/* PCI Write */
queue->input->q_fptr = rxq->qr_efptr;
MEMBAR_STORESTORE ();
/* Reset the event */
RESETEVENT_WORD (&rxq->qr_doneWord);
@ -352,6 +384,7 @@ mca_ptl_elan_update_send (mca_ptl_elan_module_1_0_0_t * emp)
int num_ptls;
int i;
int rc = 0;
START_FUNC();
@ -359,6 +392,7 @@ mca_ptl_elan_update_send (mca_ptl_elan_module_1_0_0_t * emp)
/* Update the send request if any of send's is completed */
for (i = 0; i < num_ptls; i++) {
ptl = emp->elan_ptls[i];
queue = ptl->queue;
ctx = ptl->ptl_elan_ctx;
@ -366,49 +400,47 @@ mca_ptl_elan_update_send (mca_ptl_elan_module_1_0_0_t * emp)
while (ompi_list_get_size (&queue->tx_desc) > 0) {
desc = (mca_ptl_elan_desc_item_t *)
ompi_list_get_first (&queue->tx_desc);
#if 0
if ((int *) (&desc->desc->main_doneWord))
#if 1
rc = *(int *) (&desc->desc->main_doneWord);
#else
/* Poll the completion event for 1usec */
if (elan4_pollevent_word
(ctx, &desc->desc->main_doneWord, 1))
rc = elan4_pollevent_word(ctx, &desc->desc->main_doneWord, 1);
#endif
{
if (rc) {
mca_ptl_base_header_t *header;
mca_ptl_elan_send_request_t *req;
/* Remove the desc, update the request, put back to free list */
desc = (mca_ptl_elan_desc_item_t *)
ompi_list_remove_first (&queue->tx_desc);
req = desc->desc->req;
header = (mca_ptl_base_header_t *)&desc->desc->buff[0];
if(NULL == req ) { /*(IS_ACK)*/
if(NULL == req) { /* An ack descriptor */
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free,
(ompi_list_item_t *) desc);
} else if (1) { //NO_NEED_FOR_MATCH || ALREADY_MATCHED)
/* XXX:
* mca_pml_base_send_request_matched(request)
* hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
* Be sure to add an atomic lock for threaded cases */
} else if (0 == (header->hdr_common.hdr_flags
& MCA_PTL_FLAGS_ACK_MATCHED)
|| mca_pml_base_send_request_matched(req)) {
/* XXX: NO_NEED_FOR_MATCH || ALREADY_MATCHED */
mca_ptl_base_send_frag_t frag;
frag.super.frag_size = 4;
req->super.req_bytes_sent = req->super.req_bytes_packed;
ptl->super.ptl_send_progress(ptl, req, &frag);
/* the first fragment is allocated with the
* request, all others need to be returned
* to the free list */
if (1) { // NOT_FIRST_FRAG
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free,
(ompi_list_item_t *) desc);
if(fetchNset (&desc->frag_progressed, 1) == 0) {
ptl->super.ptl_send_progress(ptl, req,
header->hdr_frag.hdr_frag_length);
}
}
} else {
break;
}
}
}
/* Return the desc if it is a follow-up frag or if it is not cached */
if((header->hdr_frag.hdr_frag_offset != 0)
|| (desc->desc->desc_status
!= MCA_PTL_ELAN_DESC_CACHED))
OMPI_FREE_LIST_RETURN (&queue->tx_desc_free,
(ompi_list_item_t *) desc);
}
} else {
/* Stop at any incomplete send desc */
break;
}
} /* end of the while loop */
} /* end of the for loop */
END_FUNC();
return OMPI_SUCCESS;
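The reworked loop in mca_ptl_elan_update_send() amounts to: sweep the outstanding send descriptors in order, stop at the first one whose completion word is still clear, report progress to the PML, and recycle descriptors unless they are the cached first fragment. A simplified, self-contained sketch of that sweep (stand-in types, not the Elan or OMPI ones):

#include <stdbool.h>
#include <stddef.h>

struct tx_desc {
    volatile int    done_word;   /* set non-zero by the NIC on completion */
    bool            cached;      /* first-fragment desc kept with the request */
    struct tx_desc *next;
};

static void sweep_tx(struct tx_desc **head,
                     void (*progress)(struct tx_desc *),
                     void (*recycle)(struct tx_desc *))
{
    while (*head != NULL) {
        struct tx_desc *d = *head;
        if (!d->done_word)
            break;               /* stop at the first incomplete send */
        *head = d->next;
        progress(d);             /* tell the PML these bytes are delivered */
        if (!d->cached)
            recycle(d);          /* follow-up frags go back to the free list */
    }
}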

View file

@ -27,7 +27,6 @@
#include "ptl_elan.h"
#include "ptl_elan_proc.h"
#include "ptl_elan_frag.h"
#include "ptl_elan_req.h"
#define _TRACK_MALLOC 0
@ -73,6 +72,15 @@
#endif
enum {
/* the first four bits for type */
MCA_PTL_ELAN_DESC_NULL = 0x00,
MCA_PTL_ELAN_DESC_QDMA = 0x01,
MCA_PTL_ELAN_DESC_PUTGET = 0x02,
/* the next four bits for status */
MCA_PTL_ELAN_DESC_LOCAL = 0x10,
MCA_PTL_ELAN_DESC_CACHED = 0x20
};
/**
* Structure used to publish elan information to peers.
@ -111,6 +119,17 @@ typedef struct {
E4_Event32 event32; /* Local elan completion event */
} ompi_elan_event_t;
/**
* ELAN send request derived type. The send request contains
* the base send request and a pointer to the elan fragment descriptor.
*/
struct mca_ptl_elan_send_request_t {
mca_pml_base_send_request_t super;
mca_ptl_elan_desc_item_t *req_frag;
};
typedef struct mca_ptl_elan_send_request_t mca_ptl_elan_send_request_t;
/**
* ELAN descriptor for send
*/
@ -143,6 +162,7 @@ struct ompi_ptl_elan_qdma_desc_t {
uint8_t buff[INPUT_QUEUE_MAX]; /**< queue data */
/* 8 byte aligned */
ompi_convertor_t frag_convertor; /**< datatype convertor */
};
typedef struct ompi_ptl_elan_qdma_desc_t ompi_ptl_elan_qdma_desc_t;
@ -246,7 +266,7 @@ int mca_ptl_elan_start_desc(mca_ptl_elan_desc_item_t *desc,
struct mca_ptl_elan_peer_t *ptl_peer,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t size,
size_t *size,
int flags);
int mca_ptl_elan_poll_desc(mca_ptl_elan_desc_item_t *desc);
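Note the size parameter of mca_ptl_elan_start_desc() changing from size_t to size_t * above: the caller passes in the bytes it wants sent and gets back the bytes actually packed (the convertor may pack less). A small stand-in sketch of that in/out convention (hypothetical names, not the component's code):

#include <stddef.h>

/* Sketch of the in/out size convention: `*size` is the requested byte count
 * on entry and the byte count actually packed on return. */
static int start_frag_sketch(size_t *size, size_t max_payload)
{
    size_t requested = *size;
    size_t packed = (requested < max_payload) ? requested : max_payload;
    /* ... build the header and pack `packed` bytes here ... */
    *size = packed;            /* report back what was really scheduled */
    return 0;
}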

View file

@ -1,32 +0,0 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "types.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "ptl_elan_req.h"
#include "ptl_elan.h"
void
mca_ptl_elan_send_request_construct (mca_ptl_elan_send_request_t * request)
{
OBJ_CONSTRUCT (&request->super, mca_pml_base_send_request_t);
request->req_frag = NULL;
}
void
mca_ptl_elan_send_request_destruct (mca_ptl_elan_send_request_t * request)
{
OBJ_DESTRUCT (&request->super);
request->req_frag = NULL;
}
ompi_class_t mca_ptl_elan_send_request_t_class = {
"mca_ptl_elan_send_request_t",
OBJ_CLASS (mca_pml_base_send_request_t),
(ompi_construct_t) mca_ptl_elan_send_request_construct,
(ompi_destruct_t) mca_ptl_elan_send_request_destruct
};

View file

@ -1,47 +0,0 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_ELAN_REQ_H
#define MCA_PTL_ELAN_REQ_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "ompi_config.h"
#include "class/ompi_object.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/base/mca_base_param.h"
#include "mca/base/mca_base_module_exchange.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "mca/pml/base/pml_base_recvreq.h"
#include "ptl_elan.h"
#include "ptl_elan_frag.h"
enum {
MCA_PTL_ELAN_NULL_DESC,
MCA_PTL_ELAN_QDMA_DESC,
MCA_PTL_ELAN_PUTGET_DESC
};
OBJ_CLASS_DECLARATION(mca_ptl_elan_send_request_t);
OBJ_CLASS_DECLARATION(mca_ptl_elan_recv_request_t);
/**
* ELAN send request derived type. The send request contains
* the base send request and a point to the elan fragment descriptor
*/
struct mca_ptl_elan_send_request_t {
mca_pml_base_send_request_t super;
mca_ptl_elan_desc_item_t *req_frag;
};
typedef struct mca_ptl_elan_send_request_t mca_ptl_elan_send_request_t;
#endif