1
1

Commit a temporary version. George want to give a try ... )

This commit was SVN r2101.
Этот коммит содержится в:
Gopal Santhanaraman 2004-08-12 23:37:37 +00:00
родитель 71391bed98
Коммит e2a66e7a81
12 изменённых файлов: 931 добавлений и 304 удалений

Просмотреть файл

@ -14,7 +14,9 @@ libmca_ptl_gm_la_SOURCES = \
ptl_gm.h \ ptl_gm.h \
ptl_gm_component.c \ ptl_gm_component.c \
ptl_gm_peer.h \ ptl_gm_peer.h \
ptl_gm_peer.c \
ptl_gm_priv.h \ ptl_gm_priv.h \
ptl_gm_priv.c \
ptl_gm_proc.c \ ptl_gm_proc.c \
ptl_gm_proc.h \ ptl_gm_proc.h \
ptl_gm_req.c \ ptl_gm_req.c \

Просмотреть файл

@ -19,39 +19,41 @@
#include "ptl_gm_req.h" #include "ptl_gm_req.h"
#include "ptl_gm_req.c" #include "ptl_gm_req.c"
#include "ptl_gm_peer.h" #include "ptl_gm_peer.h"
#include "ptl_gm_priv.h"
mca_ptl_gm_module_t mca_ptl_gm_module = { mca_ptl_gm_module_t mca_ptl_gm_module = {
{ {
&mca_ptl_gm_component.super, &mca_ptl_gm_component.super,
1, /* max size of request cache */ 1, /* max size of request cache */
sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */ sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */
0, /* max size of first fragment */ 0, /* max size of first fragment */
0, /* min fragment size */ 0, /* min fragment size */
0, /* max fragment size */ 0, /* max fragment size */
0, /* exclusivity */ 0, /* exclusivity */
0, /* latency */ 0, /* latency */
0, /* bandwidth */ 0, /* bandwidth */
MCA_PTL_PUT, /* ptl flags */ MCA_PTL_PUT, /* ptl flags */
/* collection of interfaces */ /* collection of interfaces */
mca_ptl_gm_add_procs, mca_ptl_gm_add_procs,
mca_ptl_gm_del_procs, mca_ptl_gm_del_procs,
mca_ptl_gm_finalize, mca_ptl_gm_finalize,
NULL, /* JMS: Need send here */ mca_ptl_gm_send, /* JMS: Need send here */
mca_ptl_gm_put, mca_ptl_gm_put,
mca_ptl_gm_get, mca_ptl_gm_get,
mca_ptl_gm_matched, mca_ptl_gm_matched,
NULL, /* JMS need request init here */ mca_ptl_gm_request_init, /* JMS need request init here */
NULL, /* JMS need request fini here */ mca_ptl_gm_request_fini, /* JMS need request fini here */
NULL, /* JMS need match here */ NULL, /* JMS need match here */
NULL, /* JMS need send_progress here */ NULL, /* JMS need send_progress here */
NULL, /* JMS need recv_progress here */ NULL /* JMS need recv_progress here */
} }
}; };
OBJ_CLASS_INSTANCE (mca_ptl_gm_recv_frag_t,
mca_ptl_base_recv_frag_t, NULL, NULL); /*OBJ_CLASS_INSTANCE (mca_ptl_gm_recv_frag_t,*/
/*mca_ptl_base_recv_frag_t, NULL, NULL);*/
OBJ_CLASS_INSTANCE (mca_ptl_gm_send_request_t, OBJ_CLASS_INSTANCE (mca_ptl_gm_send_request_t,
mca_pml_base_send_request_t, NULL, NULL); mca_pml_base_send_request_t, NULL, NULL);
@ -72,10 +74,12 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
struct mca_ptl_base_peer_t **peers, struct mca_ptl_base_peer_t **peers,
ompi_bitmap_t * reachable) ompi_bitmap_t * reachable)
{ {
int i; int i,j;
int num_peer_ptls = 1;
struct ompi_proc_t *ompi_proc; struct ompi_proc_t *ompi_proc;
mca_ptl_gm_proc_t *ptl_proc; mca_ptl_gm_proc_t *ptl_proc;
mca_ptl_gm_peer_t *ptl_peer; mca_ptl_gm_peer_t *ptl_peer;
unsigned int lid;
for (i = 0; i < nprocs; i++) { for (i = 0; i < nprocs; i++) {
ompi_proc = ompi_procs[i]; ompi_proc = ompi_procs[i];
@ -87,31 +91,60 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
OMPI_THREAD_LOCK (&ptl_proc->proc_lock); OMPI_THREAD_LOCK (&ptl_proc->proc_lock);
if (ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) { if (ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) {
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock); OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
return OMPI_ERR_UNREACH; return OMPI_ERR_UNREACH;
} }
/* TODO: make this extensible to multiple nics */
/* XXX: */
/* FIXME: */
for (j=0; j < num_peer_ptls; j++)
{
/*XXX: check for self */
ptl_peer = OBJ_NEW (mca_ptl_gm_peer_t); ptl_peer = OBJ_NEW (mca_ptl_gm_peer_t);
if (NULL == ptl_peer) { if (NULL == ptl_peer) {
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock); OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
ptl_peer->peer_ptl = (mca_ptl_gm_module_t *) ptl; ptl_peer->peer_ptl = (mca_ptl_gm_module_t *) ptl;
ptl_peer->peer_proc = ptl_proc; ptl_peer->peer_proc = ptl_proc;
ptl_proc->peer_arr[ptl_proc->proc_peer_count] = ptl_peer; ptl_peer->global_id = ptl_proc->proc_addrs->global_id;
ptl_proc->proc_peer_count++; ptl_peer->port_number = ptl_proc->proc_addrs->port_id;
if (GM_SUCCESS !=
gm_global_id_to_node_id (((mca_ptl_gm_module_t *) ptl)->my_port,
ptl_proc->proc_addrs[j].global_id,
&lid)) {
ompi_output (0,
"[%s:%d] error in converting global to local id \n", __FILE__, __LINE__);
}
ptl_peer->local_id = lid;
ptl_proc->peer_arr[0] = ptl_peer;
ptl_proc->proc_peer_count++;
ptl_peer->peer_addr = ptl_proc->proc_addrs + i; ptl_peer->peer_addr = ptl_proc->proc_addrs + i;
}
ompi_bitmap_set_bit (reachable, i); ompi_bitmap_set_bit (reachable, i);
OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock); OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
peers[i] = ptl_peer; peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer;
/*printf ("Global_id\t local_id\t port_number\t process name \n");*/
/*fflush (stdout);*/
/*printf ("%u %d %d %d\n", ptl_proc->peer_arr[0]->global_id,*/
/*ptl_proc->peer_arr[0]->local_id,*/
/*ptl_proc->peer_arr[0]->port_number,
* ptl_proc->proc_guid);*/
/*fflush (stdout);*/
} }
printf ("returning with success from gm_add_procs\n");
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -148,49 +181,99 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *ptl)
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/*
*
*/
int int
mca_ptl_gm_request_alloc (struct mca_ptl_base_module_t *ptl, mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t **request) struct mca_pml_base_send_request_t *request)
{ {
int rc;
mca_pml_base_send_request_t *sendreq;
ompi_list_item_t *item;
#if 0 mca_ptl_gm_send_frag_t *frag;
OMPI_FREE_LIST_GET (&mca_ptl_gm_module.gm_send_req, item, rc); struct mca_ptl_gm_send_request_t *req;
frag = mca_ptl_gm_alloc_send_frag(ptl, request);
if (NULL != (sendreq = (mca_pml_base_send_request_t *) item))
sendreq->req_owner = ptl; if (NULL == frag)
*request = sendreq; /* the allocated memory must be registered */ {
#endif ompi_output(0,"[%s:%d] Unable to allocate a gm send fragment\n");
return rc; return OMPI_ERR_OUT_OF_RESOURCE;
}
else
{
req = (mca_ptl_gm_send_request_t *)request;
/*((mca_ptl_gm_send_request_t *)request)->req_frag = frag;*/
req->req_frag = frag;
frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/
frag->ptl = (mca_ptl_gm_module_t*)ptl;
/*frag->peer = request->req_peer;*/
}
return OMPI_SUCCESS;
} }
/* /*
* *
*/ */
void void
mca_ptl_gm_request_return (struct mca_ptl_base_module_t *ptl, mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t *request) struct mca_pml_base_send_request_t *request)
{ {
/*OMPI_FREE_LIST_RETURN(&mca_ptl_gm_module.gm_send_req,
(ompi_list_item_t*)request); */
return; mca_ptl_gm_send_frag_t *frag;
frag = ((mca_ptl_gm_send_request_t *)request)->req_frag;
OMPI_FREE_LIST_RETURN(&(((mca_ptl_gm_module_t *)ptl)->gm_send_frags),
(ompi_list_item_t *)frag);
frag->status = 0;/*XXX: MCA_PTL_GM_FRAG_LOCAL; */
} }
int
mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
struct mca_ptl_base_peer_t *ptl_peer,
struct mca_pml_base_send_request_t *sendreq,
size_t offset, size_t size, int flags)
{
mca_ptl_gm_send_frag_t *sendfrag;
mca_ptl_gm_peer_t *gm_ptl_peer;
mca_ptl_gm_module_t * gm_ptl;
int rc;
gm_ptl = (mca_ptl_gm_module_t *)ptl;
if (offset == 0) {
sendfrag = ((mca_ptl_gm_send_request_t *)sendreq)->req_frag;
} else {
sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);
if (NULL == sendfrag) {
ompi_output(0,"[%s:%d] Unable to allocate a gm send frag\n",
__FILE__, __LINE__);
return 0; /*XXX: return error */
}
}
((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag =sendfrag;
rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer, sendreq, offset,
&size, flags);
/*initiate the send */
gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;
rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,
offset,&size,flags);
gm_ptl->num_send_tokens--;
/*Update offset */
sendreq->req_offset += size; /* XXX: should be what convertor packs */
/*append to the send_fragments_queue. */
ompi_list_append (&(gm_ptl->gm_send_frags_queue),
(ompi_list_item_t *) sendfrag);
return rc;
}
/* /*
* Initiate a put * Initiate a put
@ -202,28 +285,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t *sendreq, struct mca_pml_base_send_request_t *sendreq,
size_t offset, size_t size, int flags) size_t offset, size_t size, int flags)
{ {
#if 0
mca_ptl_gm_send_frag_t *sendfrag;
int rc;
if (offset == 0) {
sendfrag = &((mca_ptl_gm_send_request_t *) sendreq)->req_frag;
} else {
ompi_list_item_t *item;
OMPI_FREE_LIST_GET (&mca_ptl_gm_module.gm_send_frags, item, rc);
if (NULL == (sendfrag = (mca_ptl_gm_send_frag_t *) item))
return rc;
}
rc = mca_ptl_gm_send_frag_init (sendfrag, ptl_peer, sendreq, offset,
&size, flags);
if (rc != OMPI_SUCCESS)
return rc;
sendreq->req_offset += size;
return mca_ptl_gm_peer_send (ptl_peer, sendfrag);
#endif
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -257,32 +318,77 @@ mca_ptl_gm_matched (mca_ptl_base_module_t * ptl,
mca_ptl_base_recv_frag_t * frag) mca_ptl_base_recv_frag_t * frag)
{ {
/* might need to send an ack back */ /* might need to send an ack back */
#if 1
mca_pml_base_recv_request_t *request;
/*mca_ptl_base_recv_request_t *request;*/
mca_ptl_base_header_t *header;
int bytes_recv, rc;
mca_ptl_gm_module_t *gm_ptl;
struct iovec iov[1];
header = &frag->frag_base.frag_header;
request = frag->frag_request;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
#if 0 #if 0
mca_ptl_base_header_t *header = &frag->super.frag_header; int rc;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { mca_ptl_gm_send_frag_t *ack;
int rc; recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
mca_ptl_gm_send_frag_t *ack; ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
mca_ptl_gm_recv_frag_t *recv_frag =
(mca_ptl_gm_recv_frag_t *) frag;
ompi_list_item_t *item;
MCA_PTL_GM_SEND_FRAG_ALLOC (item, rc);
ack = (mca_ptl_gm_send_frag_t *) item;
if (NULL == ack) {
OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
recv_frag->frag_ack_pending = true;
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
(ompi_list_item_t *) frag);
OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
} else {
mca_ptl_gm_send_frag_init_ack (ack, ptl,
recv_frag->super.super.
frag_peer, recv_frag);
mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack);
}
if (NULL == ack) {
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
__FILE__,___LINE__);
OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
recv_frag->frag_ack_pending = true;
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
(ompi_list_item_t *) frag);
OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
} else {
mca_ptl_gm_send_frag_init_ack (ack, ptl,
recv_frag->super.super.
frag_peer, recv_frag);
/*XXX: check this*/
mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 );
} }
/* process fragment if complete */ #endif
}
iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
iov[0].iov_len = bytes_recv;
/*process fragment if complete */
if (header->hdr_frag.hdr_frag_length > 0) {
ompi_proc_t *proc;
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
request->req_base.req_peer);
ompi_convertor_copy(proc->proc_convertor,
&frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv(
&frag->frag_base.frag_convertor,
0,
request->req_base.req_datatype,
request->req_base.req_count,
request->req_base.req_addr,
header->hdr_frag.hdr_frag_offset);
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
assert( rc == 1 );
}
/*update progress*/ /* XXX : check this */
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
/*return to free list */
gm_ptl = (mca_ptl_gm_module_t *)ptl;
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags),(ompi_list_item_t*)frag);
#endif #endif
} }

Просмотреть файл

@ -16,11 +16,18 @@
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#include "gm.h" #include "gm.h"
#define MCA_PTL_GM_STATISTICS 0 #define MCA_PTL_GM_STATISTICS 0
#define SIZE 30 #define GM_SIZE 30
#define THRESHOLD 16384 #define THRESHOLD 16384
#define MAX_GM_PORTS 16 #define MAX_GM_PORTS 16
#define MAX_RECV_TOKENS 256 #define MAX_RECV_TOKENS 256
#define PTL_GM_ADMIN_SEND_TOKENS 0
#define PTL_GM_ADMIN_RECV_TOKENS 0
#define GM_SEND_BUF_SIZE 16384
#define GM_RECV_BUF_SIZE 16384
#define NUM_RECV_FRAGS 100
#define MCA_PTL_GM_FRAG_CACHED
/** /**
* GM PTL component * GM PTL component
@ -33,7 +40,7 @@ struct mca_ptl_gm_component_t {
int gm_free_list_num; /**< initial size of free lists */ int gm_free_list_num; /**< initial size of free lists */
int gm_free_list_max; /**< maximum size of free lists */ int gm_free_list_max; /**< maximum size of free lists */
int gm_free_list_inc; /**< number of elements to alloc when growing free lists */ int gm_free_list_inc; /**< number of elements to alloc when growing free lists */
struct mca_ptl_gm_proc_t* gm_local;
ompi_list_t gm_procs; ompi_list_t gm_procs;
ompi_list_t gm_send_req; ompi_list_t gm_send_req;
@ -51,16 +58,19 @@ extern mca_ptl_gm_component_t mca_ptl_gm_component;
struct mca_ptl_gm_module_t { struct mca_ptl_gm_module_t {
mca_ptl_base_module_t super; /**< base PTL module interface */ mca_ptl_base_module_t super; /**< base PTL module interface */
struct gm_port *my_port; struct gm_port *my_port;
unsigned int my_lid; unsigned int my_local_id;
unsigned int my_gid; unsigned int my_global_id;
unsigned int my_port_id; unsigned int my_port_id;
unsigned int num_send_tokens; unsigned int num_send_tokens;
unsigned int num_recv_tokens; unsigned int num_recv_tokens;
unsigned int max_send_tokens; unsigned int max_send_tokens;
unsigned int max_recv_tokens; unsigned int max_recv_tokens;
struct mca_ptl_gm_addr_t *proc_id_table; /*struct mca_ptl_gm_addr_t *proc_id_table;*/
ompi_free_list_t gm_send_frags; ompi_free_list_t gm_send_frags;
ompi_free_list_t gm_recv_frags;
ompi_free_list_t gm_recv_frags_free;
ompi_list_t gm_send_frags_queue;
ompi_list_t gm_pending_acks; ompi_list_t gm_pending_acks;
#if MCA_PTL_GM_STATISTICS #if MCA_PTL_GM_STATISTICS
@ -111,6 +121,19 @@ extern int mca_ptl_gm_component_control (int param,
extern int mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp); extern int mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp);
/**
* GM send
*/
extern int mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
struct mca_ptl_base_peer_t *ptl_peer,
struct mca_pml_base_send_request_t *sendreq,
size_t offset, size_t size, int flags);
/** /**
* GM put * GM put
*/ */
@ -173,16 +196,16 @@ extern int mca_ptl_gm_del_procs (struct mca_ptl_base_module_t *ptl,
* @return Status indicating if allocation was successful. * @return Status indicating if allocation was successful.
* *
*/ */
extern int mca_ptl_gm_request_alloc (struct mca_ptl_base_module_t *ptl, extern int mca_ptl_gm_request_init (struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t struct mca_pml_base_send_request_t
**); *req);
/** /**
* *
*/ */
extern void mca_ptl_gm_request_return (struct mca_ptl_base_module_t *ptl, extern void mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t struct mca_pml_base_send_request_t
*); *);

Просмотреть файл

@ -24,7 +24,7 @@
#include "ptl_gm_addr.h" #include "ptl_gm_addr.h"
#include "ptl_gm_proc.h" #include "ptl_gm_proc.h"
#include "ptl_gm_req.h" #include "ptl_gm_req.h"
#include "ptl_gm_priv.h"
mca_ptl_gm_component_t mca_ptl_gm_component = { mca_ptl_gm_component_t mca_ptl_gm_component = {
{ {
@ -118,10 +118,10 @@ mca_ptl_gm_component_open (void)
mca_ptl_gm_module.super.ptl_first_frag_size = mca_ptl_gm_module.super.ptl_first_frag_size =
mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024); mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024);
mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_module.super.ptl_min_frag_size =
mca_ptl_gm_param_register_int ("min_frag_size", 0); mca_ptl_gm_param_register_int ("min_frag_size", 1<<16);
mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_component.gm_free_list_num =
mca_ptl_gm_param_register_int ("free_list_num", 32); mca_ptl_gm_param_register_int ("free_list_num", 32);
mca_ptl_gm_module.super.ptl_min_frag_size = mca_ptl_gm_component.gm_free_list_inc =
mca_ptl_gm_param_register_int ("free_list_inc", 32); mca_ptl_gm_param_register_int ("free_list_inc", 32);
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -137,38 +137,36 @@ mca_ptl_gm_component_open (void)
int int
mca_ptl_gm_component_close (void) mca_ptl_gm_component_close (void)
{ {
#ifdef GOPAL_TODO
/* if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) { if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) {
ompi_output(0, ompi_output(0,
"[%s:%d] error in finalizing gm state and PTL's.\n", "[%s:%d] error in finalizing gm state and PTL's.\n",
__FILE__, __LINE__); __FILE__, __LINE__);
return NULL; return NULL;
} */ }
#endif
if (NULL != mca_ptl_gm_component.gm_ptl_modules) if (NULL != mca_ptl_gm_component.gm_ptl_modules)
free (mca_ptl_gm_component.gm_ptl_modules); free (mca_ptl_gm_component.gm_ptl_modules);
OBJ_DESTRUCT (&mca_ptl_gm_component.gm_procs); OBJ_DESTRUCT (&mca_ptl_gm_component.gm_procs);
OBJ_DESTRUCT (&mca_ptl_gm_component.gm_send_req); OBJ_DESTRUCT (&mca_ptl_gm_component.gm_send_req);
OBJ_DESTRUCT (&mca_ptl_gm_component.gm_lock);
return ompi_event_fini (); return OMPI_SUCCESS;
} }
/* /*
* Create a ptl instance and add to components list. * Create a ptl instance and add to components list.
*/ */
static int static int
mca_ptl_gm_create (void) mca_ptl_gm_create (int i)
{ {
mca_ptl_gm_module_t *ptl; mca_ptl_gm_module_t *ptl;
char param[256];
ptl = malloc (sizeof (mca_ptl_gm_module_t)); ptl = (mca_ptl_gm_module_t *)malloc (sizeof (mca_ptl_gm_module_t));
if (NULL == ptl) { if (NULL == ptl) {
ompi_output (0, ompi_output (0,
" ran out of resource to allocate ptl_instance \n"); " ran out of resource to allocate ptl_instance \n");
@ -176,7 +174,7 @@ mca_ptl_gm_create (void)
} }
memcpy (ptl, &mca_ptl_gm_module, sizeof (mca_ptl_gm_module)); memcpy (ptl, &mca_ptl_gm_module, sizeof (mca_ptl_gm_module));
mca_ptl_gm_component.gm_ptl_modules[mca_ptl_gm_component.gm_num_ptl_modules++] = ptl; mca_ptl_gm_component.gm_ptl_modules[i] = ptl;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -192,20 +190,7 @@ mca_ptl_gm_create (void)
static int static int
mca_ptl_gm_module_create_instances (void) mca_ptl_gm_module_create_instances (void)
{ {
return 0;
int i;
int maxptls = 1; /* maxptls set to 1 */
/* allocate memory for ptls */
mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls * sizeof (mca_ptl_gm_module_t *));
if (NULL == mca_ptl_gm_component.gm_ptl_modules)
return OMPI_ERR_OUT_OF_RESOURCE;
for (i = 0; i < maxptls; i++) {
mca_ptl_gm_create ();
}
return OMPI_SUCCESS;
} }
@ -225,7 +210,7 @@ mca_ptl_gm_module_store_data_toexchange (void)
mca_ptl_gm_addr_t *addrs; mca_ptl_gm_addr_t *addrs;
size = mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_addr_t); size = mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_addr_t);
addrs = malloc (size); addrs = (mca_ptl_gm_addr_t *)malloc (size);/*XXX: check this out */
if (NULL == addrs) { if (NULL == addrs) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
@ -233,8 +218,8 @@ mca_ptl_gm_module_store_data_toexchange (void)
for (i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++) { for (i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++) {
mca_ptl_gm_module_t *ptl = mca_ptl_gm_component.gm_ptl_modules[i]; mca_ptl_gm_module_t *ptl = mca_ptl_gm_component.gm_ptl_modules[i];
addrs[i].local_id = ptl->my_lid; addrs[i].local_id = ptl->my_local_id;
addrs[i].global_id = ptl->my_gid; addrs[i].global_id = ptl->my_global_id;
addrs[i].port_id = ptl->my_port_id; addrs[i].port_id = ptl->my_port_id;
} }
rc = mca_base_modex_send (&mca_ptl_gm_component.super.ptlm_version, addrs, rc = mca_base_modex_send (&mca_ptl_gm_component.super.ptlm_version, addrs,
@ -243,68 +228,61 @@ mca_ptl_gm_module_store_data_toexchange (void)
return rc; return rc;
} }
/*
* initialize a ptl interface
*
*/
static int static int
ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm) ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
{ {
mca_ptl_gm_module_t *ptl; mca_ptl_gm_module_t *ptl;
unsigned int board_no, port_no; unsigned int board_no, port_no;
char *buffer_ptr;
gm_status_t status; gm_status_t status;
int buf_len;
int i; int i;
int maxptls = 1; /* maxptls set to 1 */
if (OMPI_SUCCESS != mca_ptl_gm_module_create_instances ()) { mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
return 0; mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls *
sizeof (mca_ptl_gm_module_t *));
if (NULL == mca_ptl_gm_component.gm_ptl_modules)
return OMPI_ERR_OUT_OF_RESOURCE;
for (i = 0; i < maxptls; i++) {
mca_ptl_gm_create (i);
} }
/*hack : we have set the gm_max_ptl_modules to 1 */ /*Hack : we have set the gm_max_ptl_modules to 1 */
for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) { for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
ptl = mca_ptl_gm_component.gm_ptl_modules[i]; ptl = mca_ptl_gm_component.gm_ptl_modules[i];
/* open the first available gm port for this board */ /* open the first available gm port for this board */
board_no = i; board_no = i;
for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) { for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) {
printf ("about to call open port\n");
if (port_no == 3) continue;
/* port 0,1,3 reserved */
status = gm_open (&(ptl->my_port), board_no,
port_no, "OMPI-GM", GM_API_VERSION_2_0);
printf ("about to call open port\n"); if (GM_SUCCESS == status) {
if (port_no != 3) { ptl->my_port_id = port_no;
status = gm_open (&(ptl->my_port), board_no, port_no, "OMPI-GM", GM_API_VERSION_2_0); /* port 0,1,3 reserved */ break;
}
if (GM_SUCCESS == status) { }
ptl->my_port_id = port_no;
break;
}
}
}
#if 1
/* Get node local Id */ /* Get node local Id */
if (GM_SUCCESS != gm_get_node_id (ptl->my_port, &(ptl->my_lid))) { if (GM_SUCCESS != gm_get_node_id (ptl->my_port, &(ptl->my_local_id))) {
ompi_output (0, " failure to get local_id \n"); ompi_output (0, " failure to get local_id \n");
return 0; return 0;
} }
#endif
/* Convert local id to global id */ /* Convert local id to global id */
if (GM_SUCCESS != if (GM_SUCCESS !=
gm_node_id_to_global_id (ptl->my_port, ptl->my_lid, gm_node_id_to_global_id (ptl->my_port, ptl->my_local_id,
&(ptl->my_gid))) { &(ptl->my_global_id))) {
ompi_output (0, " Error: Unable to get my GM global id \n"); ompi_output (0, " Error: Unable to get my GM global id \n");
return 0; return 0;
} }
} }
/* publish GM parameters with the MCA framework */
if (OMPI_SUCCESS != mca_ptl_gm_module_store_data_toexchange ())
return 0;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -315,26 +293,56 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
static int static int
ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm) ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
{ {
int i; int i, rc;
mca_ptl_gm_module_t *ptl; mca_ptl_gm_module_t *ptl;
gm_status_t status; gm_status_t status;
int buf_len; void *gm_send_reg_memory , *gm_recv_reg_memory;
void *buffer_ptr; ompi_free_list_t *fslist, *frlist, *free_rlist;
ompi_list_item_t *item;
mca_ptl_gm_send_frag_t *sfragment;
mca_ptl_gm_recv_frag_t *rfragment, *frag, *free_rfragment;
for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) { for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
ptl = mca_ptl_gm_component.gm_ptl_modules[i]; ptl = mca_ptl_gm_component.gm_ptl_modules[i];
#if 0
/* initialise the free lists */
ompi_free_list_init (&(mca_ptl_gm_component.gm_send_req),
sizeof (mca_ptl_gm_send_request_t),
OBJ_CLASS (mca_ptl_gm_send_request_t),
mca_ptl_gm_component.gm_free_list_num,
mca_ptl_gm_component.gm_free_list_max,
mca_ptl_gm_component.gm_free_list_inc, NULL);
#endif ptl->num_send_tokens = gm_num_send_tokens (ptl->my_port);
ptl->num_send_tokens -= PTL_GM_ADMIN_SEND_TOKENS;
ptl->num_recv_tokens = gm_num_receive_tokens (ptl->my_port);
ptl->num_recv_tokens -= PTL_GM_ADMIN_RECV_TOKENS;
/** Receive part **/ /****************SEND****************************/
/* construct a list of send fragments */
OBJ_CONSTRUCT (&(ptl->gm_send_frags), ompi_free_list_t);
OBJ_CONSTRUCT (&(ptl->gm_send_frags_queue), ompi_list_t);
fslist = &(ptl->gm_send_frags);
ompi_free_list_init (&(ptl->gm_send_frags),
sizeof (mca_ptl_gm_send_frag_t),
OBJ_CLASS (mca_ptl_gm_send_frag_t),
32, 32, 1, NULL); /* not using mpool */
/* allocate the elements */
sfragment = (mca_ptl_gm_send_frag_t *)
malloc (sizeof(mca_ptl_gm_send_frag_t) *
(ptl->num_send_tokens));
/* allocate the registered memory */
gm_send_reg_memory = gm_dma_malloc ( ptl->my_port,
(GM_SEND_BUF_SIZE * ptl->num_send_tokens) );
for (i = 0; i < ptl->num_send_tokens; i++) {
ompi_list_item_t *item;
sfragment->send_buf = gm_send_reg_memory;
item = (ompi_list_item_t *) sfragment;
ompi_list_append (&(fslist->super), item);
gm_send_reg_memory = ((char *) gm_send_reg_memory +
GM_SEND_BUF_SIZE);
sfragment++;
}
/*****************RECEIVE*****************************/
/*allow remote memory access */ /*allow remote memory access */
status = gm_allow_remote_memory_access (ptl->my_port); status = gm_allow_remote_memory_access (ptl->my_port);
if (GM_SUCCESS != status) { if (GM_SUCCESS != status) {
@ -342,42 +350,65 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
} }
ptl->num_send_tokens = gm_num_send_tokens (ptl->my_port);
ptl->num_recv_tokens = gm_num_receive_tokens (ptl->my_port);
/* set acceptable sizes */ /* construct the list of recv fragments free */
/*status = gm_set_acceptable_sizes(ptl->my_port, GM_LOW_PRIORITY, OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
* MASK);*/ free_rlist = &(ptl->gm_recv_frags_free);
/*allocate the elements */
free_rfragment = (mca_ptl_gm_recv_frag_t *)
malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS);
/* post receive buffers for each available token */ for (i = 0; i < NUM_RECV_FRAGS; i++) {
buf_len = THRESHOLD; ompi_list_item_t *item;
/*TODO need to provide buffers with two different sizes to distinguish item = (ompi_list_item_t *) free_rfragment;
* between header and data */ ompi_list_append (&(free_rlist->super), item); /* XXX: check this */
free_rfragment++;
for (i = 0; i < ptl->num_recv_tokens; i++) { }
buffer_ptr = gm_dma_malloc (ptl->my_port, buf_len);
gm_provide_receive_buffer (ptl->my_port, buffer_ptr,
SIZE, GM_LOW_PRIORITY); /*construct the list of recv fragments*/
OBJ_CONSTRUCT (&(ptl->gm_recv_frags), ompi_free_list_t);
frlist = &(ptl->gm_recv_frags);
/*allocate the elements */
rfragment = (mca_ptl_gm_recv_frag_t *)
malloc (sizeof (mca_ptl_gm_recv_frag_t) *
(ptl->num_recv_tokens - 1));
/*allocate the registered memory */
gm_recv_reg_memory =
gm_dma_malloc (ptl->my_port,
(GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
for (i = 0; i < ptl->num_recv_tokens ; i++) {
ompi_list_item_t *item;
rfragment->alloc_recv_buffer = gm_recv_reg_memory;
item = (ompi_list_item_t *) rfragment;
ompi_list_append (&(frlist->super), item); /* XXX: check this */
gm_recv_reg_memory = ((char *)
gm_recv_reg_memory + GM_RECV_BUF_SIZE);
rfragment++;
}
/*TODO : need to provide buffers with two different sizes
* to distinguish between header and data
* post receive buffers */
for (i = 0; i < (ptl->num_recv_tokens-1) ; i++) {
OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc);
assert( rc == OMPI_SUCCESS );
frag = (mca_ptl_gm_recv_frag_t*)item;
gm_provide_receive_buffer (ptl->my_port,frag->alloc_recv_buffer,
GM_SIZE, GM_LOW_PRIORITY);
} }
#if 0
/** Send Part **/
OBJ_CONSTRUCT (&mca_ptl_gm_module.gm_send_frag, ompi_free_list_t);
ompi_free_list_init (&(mca_ptl_gm_component.gm_send_frag),
sizeof (mca_ptl_gm_send_frag_t),
OBJ_CLASS (mca_ptl_gm_send_frag_t));
/* allocate send buffers */
total_registered_memory = max_send_buf * SIZE;
ptl->send_req->req_frag->head =
(struct send_buf *) gm_dma_malloc (ptl->my_port,
total_registered_memory);
#endif
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* /*
@ -391,22 +422,11 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
bool * have_hidden_threads) bool * have_hidden_threads)
{ {
mca_ptl_base_module_t **ptls; mca_ptl_base_module_t **ptls;
int rc;
unsigned int board_id, port_id;
*num_ptl_modules = 0; *num_ptl_modules = 0;
*allow_multi_user_threads = false; *allow_multi_user_threads = false;
*have_hidden_threads = false; *have_hidden_threads = false;
/*
ompi_free_list_init (&(mca_ptl_gm_component.gm_send_req),
sizeof (mca_ptl_gm_send_request_t),
OBJ_CLASS (mca_ptl_gm_send_request_t),
mca_ptl_gm_component.gm_free_list_num,
mca_ptl_gm_component.gm_free_list_max,
mca_ptl_gm_component.gm_free_list_inc, NULL);
*/
if (OMPI_SUCCESS != ompi_mca_ptl_gm_init (&mca_ptl_gm_component)) { if (OMPI_SUCCESS != ompi_mca_ptl_gm_init (&mca_ptl_gm_component)) {
ompi_output (0, ompi_output (0,
"[%s:%d] error in initializing gm state and PTL's.\n", "[%s:%d] error in initializing gm state and PTL's.\n",
@ -421,14 +441,19 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
return NULL; return NULL;
} }
/* publish GM parameters with the MCA framework */
if (OMPI_SUCCESS != mca_ptl_gm_module_store_data_toexchange ())
return 0;
/* return array of PTLs */ /* return array of PTLs */
ptls = malloc (mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_base_module_t *)); ptls = (mca_ptl_base_module_t**) malloc (
mca_ptl_gm_component.gm_num_ptl_modules *
sizeof (mca_ptl_base_module_t *));
if (NULL == ptls) { if (NULL == ptls) {
return NULL; return NULL;
} }
memcpy (ptls, mca_ptl_gm_component.gm_ptl_modules, memcpy (ptls, mca_ptl_gm_component.gm_ptl_modules,
mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_module_t *)); mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_module_t *));
*num_ptl_modules = mca_ptl_gm_component.gm_num_ptl_modules; *num_ptl_modules = mca_ptl_gm_component.gm_num_ptl_modules;
@ -453,16 +478,17 @@ mca_ptl_gm_component_control (int param, void *value, size_t size)
int int
mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp) mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp)
{ {
int rc;
/* check the send queue to see if any pending send can proceed */ /* check the send queue to see if any pending send can proceed */
/* check for receive and , call ptl_match to send it to the upper
/* check for recieve and , call ptl_match to send it to the upper
level */ level */
rc = mca_ptl_gm_incoming_recv(&mca_ptl_gm_component);
/* in case matched, do the appropriate queuing. */
return OMPI_SUCCESS; return OMPI_SUCCESS;
/* check the send queue to see if any pending send can proceed */
/* check for recieve and , call ptl_match to send it to the upper
level */
/* in case matched, do the appropriate queuing. */
} }

Просмотреть файл

@ -14,26 +14,31 @@
#include "event/event.h" #include "event/event.h"
#include "mca/pml/pml.h" #include "mca/pml/pml.h"
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#include "include/types.h"
/*#include "ptl_gm_sendfrag.h"*/
#include "ptl_gm_proc.h"
#include "ptl_gm_addr.h"
#include "ptl_gm.h"
/** /**
* An abstraction that represents a connection to a peer process. * An abstraction that represents a connection to a peer process.
*/ */
struct mca_ptl_gm_peer_t { struct mca_ptl_gm_peer_t {
ompi_list_item_t super; ompi_list_item_t super;
struct mca_ptl_gm_module_t *peer_ptl; struct mca_ptl_gm_module_t *peer_ptl;
struct mca_ptl_gm_proc_t *peer_proc; struct mca_ptl_gm_proc_t *peer_proc;
struct mca_ptl_gm_addr_t *peer_addr; /**< address of peer */ struct mca_ptl_gm_addr_t *peer_addr; /**< address of peer */
unsigned int global_id;
unsigned int port_number;
unsigned int local_id;
int num_credits; int num_credits;
int max_credits; int max_credits;
int resending; int resending;
int num_resend; int num_resend;
}; };
typedef struct mca_ptl_gm_peer_t mca_ptl_gm_peer_t; typedef struct mca_ptl_gm_peer_t mca_ptl_gm_peer_t;
/*extern omp_class_t mca_ptl_gm_peer_t_class;*/
OBJ_CLASS_DECLARATION(mca_ptl_gm_peer_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_peer_t);
#endif #endif

302
src/mca/ptl/gm/src/ptl_gm_priv.c Исполняемый файл
Просмотреть файл

@ -0,0 +1,302 @@
/*
* $HEADER$
*/
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/errno.h>
#include <sys/types.h>
#include <sys/fcntl.h>
/*#include <netinet/gm.h>*/
#include <netinet/in.h>
#include <arpa/inet.h>
#include "include/types.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "mca/ns/base/base.h"
#include "ptl_gm.h"
#include "ptl_gm_addr.h"
#include "ptl_gm_peer.h"
#include "ptl_gm_proc.h"
#include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h"
/* Pack up to one GM buffer worth of the send request's data behind the
 * PTL header already written into the fragment's registered send buffer
 * (by mca_ptl_gm_send_frag_init) and post the buffer to GM.
 *
 * @param ptl_peer  peer the fragment is addressed to (provides port/node ids)
 * @param fragment  send fragment whose send_buf holds the header
 * @param sendreq   originating PML send request (source of data/convertor)
 * @param offset    byte offset of this fragment within the message
 * @param size      in: requested payload bytes for this fragment;
 *                  NOTE(review): never updated with the bytes actually
 *                  packed by the convertor — confirm callers expect that
 * @param flags     send flags (currently unused here)
 * @return OMPI_SUCCESS, or OMPI_ERROR if the convertor pack fails
 */
int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t *size,
int flags)
{
struct iovec outvec[1];
size_t size_in,size_out;
int header_length;
mca_ptl_base_frag_header_t* header;
/* the header occupies the first hdr_common.hdr_size bytes of send_buf */
header = (mca_ptl_base_frag_header_t*)fragment->send_buf;
header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
size_in = *size;
outvec[0].iov_base = (char*)fragment->send_buf;
/* clamp the payload so header + data fits in one registered GM buffer */
if( (size_in + header_length) < GM_SEND_BUF_SIZE ) outvec[0].iov_len = size_in;
else outvec[0].iov_len = GM_SEND_BUF_SIZE;
/*header_length = sizeof(mca_ptl_base_frag_header_t);*/
/* copy the header in the buffer */
/**header = fragment->send_frag.frag_base.frag_header;*/
if(size_in > 0) {
ompi_convertor_t *convertor;
int rc;
/* first fragment (eager send) and first fragment of long protocol
* can use the convertor initialized on the request, remaining
* fragments
* must copy/reinit the convertor as the transfer could be in
* parallel.
*/
if( offset <= mca_ptl_gm_module.super.ptl_first_frag_size ) {
convertor = &sendreq->req_convertor;
} else {
convertor = &(fragment->send_frag.frag_base.frag_convertor);
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send(
convertor,
0,
sendreq->req_base.req_datatype,
sendreq->req_base.req_count,
sendreq->req_base.req_addr,
offset);
}
/* if data is contigous convertor will return an offset
* into users buffer - otherwise will return an allocated buffer
* that holds the packed data
*/
/*XXX: need to add the header */
/* pack the user data into the registered buffer just past the header */
outvec[0].iov_base = ((char*)fragment->send_buf) + header_length;
outvec[0].iov_len -= header_length; /*XXXcheck this */
if((rc = ompi_convertor_pack(convertor, &(outvec[0]), 1)) < 0)
return OMPI_ERROR;
}
/* rebase the iovec on the buffer start: the wire message is header + data */
outvec[0].iov_len += header_length;
outvec[0].iov_base = fragment->send_buf;
/* adjust size and request offset to reflect actual number of bytes
* packed by convertor */
size_out = outvec[0].iov_len;
/* initiate the gm send; send_callback fires on completion with
* 'fragment' as its context */
gm_send_with_callback(ptl_peer->peer_ptl->my_port, fragment->send_buf,
GM_SIZE,size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
ptl_peer->port_number,send_callback, (void *)fragment );
/* record fragment state; frag_size includes the header bytes here */
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base;
fragment->send_frag.frag_base.frag_size = size_out; /*XXX: should this be size_out */
return OMPI_SUCCESS;
}
/* Completion callback invoked by GM when a send posted via
 * gm_send_with_callback (see mca_ptl_gm_peer_send) finishes.
 *
 * @param port    GM port the send was posted on (unused here)
 * @param context the mca_ptl_gm_send_frag_t passed when the send was posted
 * @param status  GM completion status of the send
 */
void send_callback(struct gm_port *port,void * context, gm_status_t status)
{
#if 1
    mca_ptl_gm_module_t *ptl;
    mca_ptl_gm_send_frag_t *frag;
    ompi_list_t *list;
    int bytes;
    mca_pml_base_send_request_t *gm_send_req;

    frag = (mca_ptl_gm_send_frag_t *)context;
    ptl = (mca_ptl_gm_module_t *)frag->ptl;
    gm_send_req = frag->req;
    /* bytes handed to GM for this fragment (header included), as recorded
     * by mca_ptl_gm_peer_send.  Previously 'bytes' was read uninitialized
     * in the GM_SUCCESS branch, which is undefined behavior. */
    bytes = frag->send_frag.frag_base.frag_size;

    switch (status)
    {
    case GM_SUCCESS:
        /* send completed, can reuse the user buffer */
        ptl->num_send_tokens++;
        ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
            gm_send_req,bytes);
        /* NOTE(review): this pops the head of the pending-send queue,
         * which is only correct if GM completes sends in FIFO order;
         * otherwise 'frag' itself should be removed from the list. */
        list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
        ompi_list_remove_first(list);
        break;
    case GM_SEND_TIMED_OUT:
        /* need to take care of retransmission */
        break;
    case GM_SEND_DROPPED:
        /* need to handle this case */
        break;
    default:
        ompi_output(0,
            "[%s:%d] error in message completion\n",__FILE__,__LINE__);
        break;
    }
#endif
}
/* Handle an incoming ACK/NACK control fragment: recover the local send
 * fragment referenced by the ack header and record the peer's matched
 * request handle on the originating send request. */
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header)
{
#if 1
    mca_ptl_gm_send_frag_t *send_frag =
        (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
    mca_pml_base_send_request_t *request =
        (mca_pml_base_send_request_t *) send_frag->req;

    request->req_peer_match = header->hdr_ack.hdr_dst_match;
    /* TODO: return the send fragment to the free list */
#endif
}
/* Handle an incoming data fragment (MATCH or FRAG header): wrap the GM
 * receive event in a freshly allocated recv fragment and hand it to the
 * PTL matching logic.
 *
 * @param ptl   GM module the event arrived on
 * @param event GM receive event whose message starts with a PTL header
 */
void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event )
{
#if 1
mca_ptl_gm_recv_frag_t * recv_frag;
bool matched;
mca_ptl_base_header_t *header;
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
/* allocate a receive fragment (may spin on progress until one is free) */
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
/* reset the fragment state before handing it to the match logic */
recv_frag->frag_recv.frag_base.frag_peer = NULL;
recv_frag->frag_recv.frag_request = NULL;
recv_frag->frag_recv.frag_is_buffered = false;
recv_frag->frag_hdr_cnt = 0;
recv_frag->frag_msg_cnt = 0;
recv_frag->frag_ack_pending = false;
recv_frag->frag_progressed = 0;
/* copy the wire header and point frag_addr at the payload;
 * NOTE(review): skips sizeof(mca_ptl_base_header_t) regardless of the
 * actual hdr_common.hdr_size on the wire — confirm this is intended */
recv_frag->frag_recv.frag_base.frag_header = *header;
recv_frag->frag_recv.frag_base.frag_addr =
(char *)header + sizeof(mca_ptl_base_header_t);
/* NOTE(review): GM reports the full message length (header included) */
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
/* header->hdr_frag.hdr_frag_length; */
matched = mca_ptl_base_match(
&recv_frag->frag_recv.frag_base.frag_header.hdr_match,
&(recv_frag->frag_recv),
NULL );
if (!matched)
{
/* fragment stays queued as unexpected until the receive is posted */
ompi_output(0,"matching receive not yet posted\n");
}
/**/
#endif
}
/* Dispatch a GM receive event by the PTL header type at the front of the
 * message: data fragments go to the matching path, ACK/NACK fragments to
 * the control path.  Unknown types are logged and dropped.
 *
 * @return OMPI_SUCCESS always.
 */
int ptl_gm_handle_recv(mca_ptl_gm_module_t *ptl, gm_recv_event_t* event )
{
#if 1
    mca_ptl_base_header_t *header =
        (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
    int hdr_type = header->hdr_common.hdr_type;

    if( (MCA_PTL_HDR_TYPE_MATCH == hdr_type) ||
        (MCA_PTL_HDR_TYPE_FRAG == hdr_type) ) {
        ptl_gm_data_frag( ptl, event );
    } else if( (MCA_PTL_HDR_TYPE_ACK == hdr_type) ||
               (MCA_PTL_HDR_TYPE_NACK == hdr_type) ) {
        ptl_gm_ctrl_frag(ptl,header);
    } else {
        ompi_output(0,"[%s:%d] unexpected frag type %d\n",
                    __FILE__,__LINE__,header->hdr_common.hdr_type);
    }
#endif
    return OMPI_SUCCESS;
}
/* Poll each GM module's port once for an incoming event.  Receive events
 * are dispatched to ptl_gm_handle_recv and the consumed buffer is replaced
 * with a fresh one; non-receive events are handed back to gm_unknown.
 *
 * @param gm_comp the GM component holding the list of PTL modules
 * @return 0 always
 */
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
{
    int i,rc;
    int num_ptls;
    gm_recv_event_t *event;
    mca_ptl_gm_module_t *ptl;
    mca_ptl_gm_recv_frag_t * frag;
    ompi_list_item_t* item;

    num_ptls = gm_comp->gm_num_ptl_modules;
    for (i=0; i< num_ptls; i++)
    {
        ptl = gm_comp->gm_ptl_modules[i];
        event = gm_receive(ptl->my_port);
        switch (gm_ntohc(event->recv.type))
        {
        case GM_RECV_EVENT:
        case GM_HIGH_RECV_EVENT:
        case GM_PEER_RECV_EVENT:
        case GM_HIGH_PEER_RECV_EVENT:
            ptl_gm_handle_recv( ptl, event );
            /* post a replacement buffer so GM always has one available
             * (XXX: ideally deferred until the fragment is consumed) */
            OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc );
            if(rc != OMPI_SUCCESS || NULL == item) {
                /* previously the NULL item was dereferenced anyway */
                ompi_output(0,"unable to allocate a buffer\n");
            } else {
                frag = (mca_ptl_gm_recv_frag_t*)item;
                gm_provide_receive_buffer(ptl->my_port,
                                          frag->alloc_recv_buffer,
                                          GM_SIZE,
                                          GM_LOW_PRIORITY);
            }
            break; /* was an implicit fallthrough into GM_NO_RECV_EVENT */
        case GM_NO_RECV_EVENT:
            break;
        default:
            /* not a receive event: let GM run its own handling */
            gm_unknown(ptl->my_port, event);
        }
    }
    return 0;
}

Просмотреть файл

@ -5,11 +5,38 @@
#include "event/event.h" #include "event/event.h"
#include "mca/pml/pml.h" #include "mca/pml/pml.h"
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#include "ptl_gm_peer.h"
#include "ptl_gm_sendfrag.h"
#include "gm.h" #include "gm.h"
/* maintain list of registered buffers for send and receive */ /* maintain list of registered buffers for send and receive */
struct reg_buf { /*struct reg_buf {*/
void *start; /* pointer to registered memory */ /*void *start; pointer to registered memory */
int length; /*int length;*/
}; /*};*/
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header);
void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event );
int ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_event_t* event );
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp);
int
mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
mca_ptl_gm_send_frag_t *fragment,
struct mca_pml_base_send_request_t *sendreq,
size_t offset,
size_t *size,
int flags);
void send_callback(struct gm_port *port,void * context, gm_status_t
status);

Просмотреть файл

@ -83,10 +83,18 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc)
int i; int i;
mca_ptl_gm_proc_t *ptl_proc; mca_ptl_gm_proc_t *ptl_proc;
ptl_proc = mca_ptl_gm_proc_lookup_ompi (ompi_proc);
if (ptl_proc != NULL)
{
return ptl_proc;
}
/* only gm ptl opened */ /* only gm ptl opened */
ptl_proc = OBJ_NEW (mca_ptl_gm_proc_t); ptl_proc = OBJ_NEW (mca_ptl_gm_proc_t);
ptl_proc->proc_ompi = ompi_proc; ptl_proc->proc_ompi = ompi_proc;
/* Extract exposed addresses from remote proc */ /* Extract exposed addresses from remote proc */
rc = mca_base_modex_recv (&mca_ptl_gm_component.super.ptlm_version, rc = mca_base_modex_recv (&mca_ptl_gm_component.super.ptlm_version,
ompi_proc, (void **) &ptl_proc->proc_addrs, ompi_proc, (void **) &ptl_proc->proc_addrs,
@ -107,18 +115,6 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc)
} }
ptl_proc->proc_addr_count = size / sizeof (mca_ptl_gm_addr_t); ptl_proc->proc_addr_count = size / sizeof (mca_ptl_gm_addr_t);
for (i = 0; i < ptl_proc->proc_addr_count; i++) {
/*convert from global id to local id */
if (GM_SUCCESS !=
gm_global_id_to_node_id (ptl->my_port,
ptl_proc->proc_addrs[i].global_id,
ptl_proc->proc_addrs[i].local_id)) {
ompi_output (0,
"[%s:%d] error in converting global to local id \n",
__FILE__, __LINE__);
}
}
/* allocate space for peer array - one for each exported address */ /* allocate space for peer array - one for each exported address */
ptl_proc->peer_arr = (mca_ptl_gm_peer_t **) ptl_proc->peer_arr = (mca_ptl_gm_peer_t **)
@ -131,6 +127,10 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc)
return NULL; return NULL;
} }
if(NULL == mca_ptl_gm_component.gm_local && ompi_proc ==
ompi_proc_local() )
mca_ptl_gm_component.gm_local = ptl_proc;
return ptl_proc; return ptl_proc;
} }

Просмотреть файл

@ -24,6 +24,7 @@ struct mca_ptl_gm_proc_t {
size_t proc_peer_count; size_t proc_peer_count;
size_t proc_addr_count; size_t proc_addr_count;
struct mca_ptl_gm_peer_t **peer_arr; struct mca_ptl_gm_peer_t **peer_arr;
ompi_process_name_t proc_guid;
}; };
typedef struct mca_ptl_gm_proc_t mca_ptl_gm_proc_t; typedef struct mca_ptl_gm_proc_t mca_ptl_gm_proc_t;

Просмотреть файл

@ -21,7 +21,7 @@ OBJ_CLASS_DECLARATION (mca_ptl_gm_send_request_t);
struct mca_ptl_gm_send_request_t { struct mca_ptl_gm_send_request_t {
mca_pml_base_send_request_t super; mca_pml_base_send_request_t super;
/* add stuff here */ /* add stuff here */
mca_ptl_gm_send_frag_t req_frag; mca_ptl_gm_send_frag_t *req_frag;
}; };
typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t; typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t;

Просмотреть файл

@ -7,10 +7,15 @@
#include "include/types.h" #include "include/types.h"
#include "datatype/datatype.h" #include "datatype/datatype.h"
#include "mca/pml/base/pml_base_sendreq.h" #include "mca/pml/base/pml_base_sendreq.h"
#include "mca/pml/base/pml_base_recvreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_gm.h" #include "ptl_gm.h"
#include "ptl_gm_peer.h" #include "ptl_gm_peer.h"
#include "ptl_gm_proc.h" #include "ptl_gm_proc.h"
#include "ptl_gm_sendfrag.h" #include "ptl_gm_sendfrag.h"
#include "ptl_gm_priv.h"
#define frag_header super.super.frag_header #define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner #define frag_owner super.super.frag_owner
@ -21,6 +26,8 @@
static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag);
static void mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag);
static void mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag);
static void mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t * frag);
ompi_class_t mca_ptl_gm_send_frag_t_class = { ompi_class_t mca_ptl_gm_send_frag_t_class = {
"mca_ptl_gm_send_frag_t", "mca_ptl_gm_send_frag_t",
@ -45,53 +52,140 @@ mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag)
} }
/*XXX : take care of multi threading*/
mca_ptl_gm_send_frag_t *
mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t * sendreq)
{
ompi_free_list_t *flist;
ompi_list_item_t *item;
mca_ptl_gm_send_frag_t *frag;
mca_ptl_tstamp_t tstamp = 0;
flist =&( ((mca_ptl_gm_module_t *)ptl)->gm_send_frags );
item = ompi_list_remove_first(&((flist)->super));
while(NULL == item)
{
ptl->ptl_component->ptlm_progress(tstamp);
item = ompi_list_remove_first (&((flist)->super));
}
frag = (mca_ptl_gm_send_frag_t *)item;
frag->req = (struct mca_pml_base_send_request_t *)sendreq;
frag->type = 0 ;/* XXX: should be EAGER_SEND; */
return frag;
}
/* Write the PTL header at the start of the fragment's registered send
 * buffer.  The first fragment (offset == 0) carries a full match header
 * so the receiver can run MPI matching; later fragments carry only a
 * frag header pointing at the peer's already-matched request.
 *
 * Fixes vs. previous revision: removed a duplicate assignment of
 * hdr_frag.hdr_dst_ptr.lval and the unused locals 'buffer' and
 * 'header_length'.
 *
 * @return OMPI_SUCCESS always
 */
int mca_ptl_gm_send_frag_init(
mca_ptl_gm_send_frag_t* sendfrag,
mca_ptl_gm_peer_t * ptl_peer,
mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags)
{
    mca_ptl_base_header_t *hdr;

    hdr = (mca_ptl_base_header_t *)sendfrag->send_buf;

    if (offset == 0) {
        /* first fragment: full match header */
        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
        hdr->hdr_common.hdr_flags = flags;
        hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);

        hdr->hdr_frag.hdr_frag_offset = offset;
        hdr->hdr_frag.hdr_frag_seq = 0;
        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */
        hdr->hdr_frag.hdr_dst_ptr.lval = 0; /* peer match not yet known */

        hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
        hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
        hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
        hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
        hdr->hdr_match.hdr_msg_length= sendreq->req_bytes_packed;
        hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
    } else {
        /* continuation fragment: frag header only, pointing back at the
         * request the peer matched for the first fragment */
        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
        hdr->hdr_common.hdr_flags = flags;
        hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t);
        hdr->hdr_frag.hdr_frag_offset = offset;
        hdr->hdr_frag.hdr_frag_seq = 0;
        hdr->hdr_frag.hdr_src_ptr.lval = 0;
        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
        hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
    }

    /*initialize convertor */
#if 0
    /*fragment state*/
    sendfrag->frag_base.frag_owner = &ptl_peer->peer_ptl->super;
    sendfrag->frag_base.frag_peer = ptl_peer;
    sendfrag->frag_base.frag_addr = NULL;
    sendfrag->frag_base.frag_size = *size;
#endif

    return OMPI_SUCCESS;
}
/* OBJ class descriptor for mca_ptl_gm_recv_frag_t: derives from
 * mca_ptl_base_recv_frag_t with the GM-specific construct/destruct
 * hooks defined in this file. */
ompi_class_t mca_ptl_gm_recv_frag_t_class = {
"mca_ptl_gm_recv_frag_t",
OBJ_CLASS (mca_ptl_base_recv_frag_t),
(ompi_construct_t) mca_ptl_gm_recv_frag_construct,
(ompi_construct_t) mca_ptl_gm_recv_frag_destruct
};
/* /*
static void send_callback(struct gm_port *, gm_status) * recv fragment constructor/destructors.
*/
static void
mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag)
{ {
gm_status_t status; frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0;
switch (status)
{
case GM_SUCCESS:
break;
case GM_SEND_TIMED_OUT:
break;
case GM_SEND_DROPPED:
break;
default:
break;
}
} }
static void
mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t *frag)
static void put_callback(struct gm_port *, gm_status)
{ {
;
}
mca_ptl_gm_recv_frag_t *
mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl)
{
ompi_free_list_t *flist;
ompi_list_item_t *item;
mca_ptl_gm_recv_frag_t *frag;
mca_ptl_tstamp_t tstamp = 0;
flist =&( ((mca_ptl_gm_module_t *)ptl)->gm_recv_frags_free);
item = ompi_list_remove_first(&((flist)->super));
while(NULL == item)
{
ptl->ptl_component->ptlm_progress(tstamp);
item = ompi_list_remove_first (&((flist)->super));
}
frag = (mca_ptl_gm_recv_frag_t *)item;
return frag;
gm_status_t status;
switch (status)
{
case GM_SUCCESS:
break;
case GM_SEND_TIMED_OUT:
break;
case GM_SEND_DROPPED:
break;
default:
break;
}
} }
*/

Просмотреть файл

@ -13,26 +13,67 @@
#include "os/atomic.h" #include "os/atomic.h"
#include "ompi_config.h" #include "ompi_config.h"
#include "mca/pml/base/pml_base_sendreq.h" #include "mca/pml/base/pml_base_sendreq.h"
#include "mca/pml/base/pml_base_recvreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h" #include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_gm.h" #include "ptl_gm.h"
#include "ptl_gm_priv.h" /*#include "ptl_gm_priv.h"*/
#include "ptl_gm_peer.h"
OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t); OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
struct mca_ptl_base_peer_t; /*struct mca_ptl_base_peer_t;*/
/** /**
* GM send fragment derived type. * GM send fragment derived type.
*/ */
struct mca_ptl_gm_send_frag_t { struct mca_ptl_gm_send_frag_t {
mca_ptl_base_send_frag_t super; /**< base send fragment descriptor */ mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
struct reg_buf *sbuf; void * send_buf;
mca_pml_base_send_request_t *req;
mca_ptl_gm_module_t *ptl;
/*mca_ptl_gm_peer_t *peer;*/
int status;
int type;
}; };
typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t; typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
#define MCA_PTL_GM_SEND_FRAG_ALLOC(item, rc) \ /*#define MCA_PTL_GM_SEND_FRAG_ALLOC(item, rc) \*/
OMPI_FREE_LIST_GET(&mca_ptl_gm_module.gm_send_frags, item, rc); /*OMPI_FREE_LIST_GET(&mca_ptl_gm_module.gm_send_frags, item, rc);*/
struct mca_ptl_gm_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv;
size_t frag_hdr_cnt;
size_t frag_msg_cnt;
volatile int frag_progressed;
bool frag_ack_pending;
void *alloc_recv_buffer;
void *unex_recv_buffer;
};
typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
mca_ptl_gm_send_frag_t *
mca_ptl_gm_alloc_send_frag ( struct mca_ptl_base_module_t *ptl,
struct mca_pml_base_send_request_t *sendreq);
int
mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
mca_ptl_gm_peer_t * ptl_peer,
mca_pml_base_send_request_t * sendreq,
size_t offset,
size_t* size,
int flags);
mca_ptl_gm_recv_frag_t *
mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl);
#endif #endif