From e2a66e7a819b39857385afe0250118db974e230f Mon Sep 17 00:00:00 2001
From: Gopal Santhanaraman
Date: Thu, 12 Aug 2004 23:37:37 +0000
Subject: [PATCH] Commit a temporary version. George wants to give it a try.

This commit was SVN r2101.
---
 src/mca/ptl/gm/src/Makefile.am        |   2 +
 src/mca/ptl/gm/src/ptl_gm.c           | 308 +++++++++++++++++---------
 src/mca/ptl/gm/src/ptl_gm.h           |  39 +++-
 src/mca/ptl/gm/src/ptl_gm_component.c | 276 ++++++++++++-----------
 src/mca/ptl/gm/src/ptl_gm_peer.h      |  17 +-
 src/mca/ptl/gm/src/ptl_gm_priv.c      | 302 +++++++++++++++++++++++++
 src/mca/ptl/gm/src/ptl_gm_priv.h      |  35 ++-
 src/mca/ptl/gm/src/ptl_gm_proc.c      |  24 +-
 src/mca/ptl/gm/src/ptl_gm_proc.h      |   1 +
 src/mca/ptl/gm/src/ptl_gm_req.h       |   2 +-
 src/mca/ptl/gm/src/ptl_gm_sendfrag.c  | 176 +++++++++++----
 src/mca/ptl/gm/src/ptl_gm_sendfrag.h  |  53 ++++-
 12 files changed, 931 insertions(+), 304 deletions(-)
 create mode 100755 src/mca/ptl/gm/src/ptl_gm_priv.c

diff --git a/src/mca/ptl/gm/src/Makefile.am b/src/mca/ptl/gm/src/Makefile.am
index 2e0188bd88..4575b833b2 100644
--- a/src/mca/ptl/gm/src/Makefile.am
+++ b/src/mca/ptl/gm/src/Makefile.am
@@ -14,7 +14,9 @@ libmca_ptl_gm_la_SOURCES = \
 	ptl_gm.h \
 	ptl_gm_component.c \
 	ptl_gm_peer.h \
+	ptl_gm_peer.c \
 	ptl_gm_priv.h \
+	ptl_gm_priv.c \
 	ptl_gm_proc.c \
 	ptl_gm_proc.h \
 	ptl_gm_req.c \
diff --git a/src/mca/ptl/gm/src/ptl_gm.c b/src/mca/ptl/gm/src/ptl_gm.c
index 92c35a5f93..d0db87a2a2 100644
--- a/src/mca/ptl/gm/src/ptl_gm.c
+++ b/src/mca/ptl/gm/src/ptl_gm.c
@@ -19,39 +19,41 @@
 #include "ptl_gm_req.h"
 #include "ptl_gm_req.c"
 #include "ptl_gm_peer.h"
+#include "ptl_gm_priv.h"
 
 mca_ptl_gm_module_t mca_ptl_gm_module = {
     {
-    &mca_ptl_gm_component.super,
-    1, /* max size of request cache */
-    sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */
-    0, /* max size of first fragment */
-    0, /* min fragment size */
-    0, /* max fragment size */
-    0, /* exclusivity */
-    0, /* latency */
-    0, /* bandwidth */
-    MCA_PTL_PUT, /* ptl flags */
+    &mca_ptl_gm_component.super,
+    1, /* max size of request cache */
+    sizeof(mca_ptl_gm_send_frag_t), /* bytes required by ptl for a request */
+    0, /* max size of first fragment */
+    0, /* min fragment size */
+    0, /* max fragment size */
+    0, /* exclusivity */
+    0, /* latency */
+    0, /* bandwidth */
+    MCA_PTL_PUT, /* ptl flags */
 
-    /* collection of interfaces */
-    mca_ptl_gm_add_procs,
-    mca_ptl_gm_del_procs,
-    mca_ptl_gm_finalize,
-    NULL, /* JMS: Need send here */
-    mca_ptl_gm_put,
-    mca_ptl_gm_get,
-    mca_ptl_gm_matched,
-    NULL, /* JMS need request init here */
-    NULL, /* JMS need request fini here */
-    NULL, /* JMS need match here */
-    NULL, /* JMS need send_progress here */
-    NULL, /* JMS need recv_progress here */
+    /* collection of interfaces */
+    mca_ptl_gm_add_procs,
+    mca_ptl_gm_del_procs,
+    mca_ptl_gm_finalize,
+    mca_ptl_gm_send, /* JMS: Need send here */
+    mca_ptl_gm_put,
+    mca_ptl_gm_get,
+    mca_ptl_gm_matched,
+    mca_ptl_gm_request_init, /* JMS need request init here */
+    mca_ptl_gm_request_fini, /* JMS need request fini here */
+    NULL, /* JMS need match here */
+    NULL, /* JMS need send_progress here */
+    NULL  /* JMS need recv_progress here */
     }
 };
 
-OBJ_CLASS_INSTANCE (mca_ptl_gm_recv_frag_t,
-                    mca_ptl_base_recv_frag_t, NULL, NULL);
+
+/*OBJ_CLASS_INSTANCE (mca_ptl_gm_recv_frag_t,*/
+                    /*mca_ptl_base_recv_frag_t, NULL, NULL);*/
 
 OBJ_CLASS_INSTANCE (mca_ptl_gm_send_request_t,
                     mca_pml_base_send_request_t, NULL, NULL);
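For orientation: the table above is the PTL's function-pointer interface, and this commit swaps real implementations into the previously NULL send and request init/fini slots. A minimal sketch of how the PML layer drives a PTL through that table (the wrapper and its name are hypothetical; it assumes the base module exposes the slot as ptl_send, matching the interface order above):

    /* hypothetical illustration -- not part of this commit */
    static int pml_start_first_fragment(mca_ptl_base_module_t *ptl,
                                        struct mca_ptl_base_peer_t *peer,
                                        mca_pml_base_send_request_t *sendreq,
                                        size_t bytes)
    {
        /* the first fragment always starts at offset 0;
           flags 0 selects a plain eager send */
        return ptl->ptl_send(ptl, peer, sendreq, 0, bytes, 0);
    }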
@@ -72,10 +74,12 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
                       struct mca_ptl_base_peer_t **peers,
                       ompi_bitmap_t * reachable)
 {
-    int i;
+    int i,j;
+    int num_peer_ptls = 1;
     struct ompi_proc_t *ompi_proc;
     mca_ptl_gm_proc_t *ptl_proc;
     mca_ptl_gm_peer_t *ptl_peer;
+    unsigned int lid;
 
     for (i = 0; i < nprocs; i++) {
         ompi_proc = ompi_procs[i];
@@ -87,31 +91,60 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
 
-        OMPI_THREAD_LOCK (&ptl_proc->proc_lock);
         if (ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) {
             OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
             return OMPI_ERR_UNREACH;
         }
 
+        /* TODO: make this extensible to multiple nics */
+        /* XXX: */
+        /* FIXME: */
+
+        for (j=0; j < num_peer_ptls; j++)
+        {
+            /*XXX: check for self */
         ptl_peer = OBJ_NEW (mca_ptl_gm_peer_t);
         if (NULL == ptl_peer) {
             OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
-
+        ptl_peer->peer_ptl = (mca_ptl_gm_module_t *) ptl;
         ptl_peer->peer_proc = ptl_proc;
-        ptl_proc->peer_arr[ptl_proc->proc_peer_count] = ptl_peer;
-        ptl_proc->proc_peer_count++;
+        ptl_peer->global_id = ptl_proc->proc_addrs->global_id;
+        ptl_peer->port_number = ptl_proc->proc_addrs->port_id;
+        if (GM_SUCCESS !=
+            gm_global_id_to_node_id (((mca_ptl_gm_module_t *) ptl)->my_port,
+                                     ptl_proc->proc_addrs[j].global_id,
+                                     &lid)) {
+            ompi_output (0,
+                         "[%s:%d] error in converting global to local id \n",
+                         __FILE__, __LINE__);
+        }
+        ptl_peer->local_id = lid;
+
+        ptl_proc->peer_arr[0] = ptl_peer;
+        ptl_proc->proc_peer_count++;
         ptl_peer->peer_addr = ptl_proc->proc_addrs + i;
+        }
         ompi_bitmap_set_bit (reachable, i);
         OMPI_THREAD_UNLOCK (&ptl_proc->proc_lock);
-        peers[i] = ptl_peer;
+        peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer;
+
+        /*printf ("Global_id\t local_id\t port_number\t process name \n");*/
+        /*fflush (stdout);*/
+        /*printf ("%u %d %d %d\n", ptl_proc->peer_arr[0]->global_id,*/
+                /*ptl_proc->peer_arr[0]->local_id,*/
+                /*ptl_proc->peer_arr[0]->port_number,
+                 * ptl_proc->proc_guid);*/
+        /*fflush (stdout);*/
     }
 
+    printf ("returning with success from gm_add_procs\n");
     return OMPI_SUCCESS;
 }
 
@@ -148,49 +181,99 @@ mca_ptl_gm_finalize (struct mca_ptl_base_module_t *ptl)
     return OMPI_SUCCESS;
 }
 
-
-
-
-/*
- *
- */
-
 int
-mca_ptl_gm_request_alloc (struct mca_ptl_base_module_t *ptl,
-                          struct mca_pml_base_send_request_t **request)
+mca_ptl_gm_request_init(struct mca_ptl_base_module_t *ptl,
+                        struct mca_pml_base_send_request_t *request)
 {
-    int rc;
-    mca_pml_base_send_request_t *sendreq;
-    ompi_list_item_t *item;
-#if 0
-    OMPI_FREE_LIST_GET (&mca_ptl_gm_module.gm_send_req, item, rc);
-
-    if (NULL != (sendreq = (mca_pml_base_send_request_t *) item))
-        sendreq->req_owner = ptl;
-    *request = sendreq;   /* the allocated memory must be registered */
-#endif
-    return rc;
+    mca_ptl_gm_send_frag_t *frag;
+    struct mca_ptl_gm_send_request_t *req;
+
+    frag = mca_ptl_gm_alloc_send_frag(ptl, request);
+    if (NULL == frag)
+    {
+        ompi_output(0,"[%s:%d] Unable to allocate a gm send fragment\n",
+                    __FILE__, __LINE__);
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    else
+    {
+        req = (mca_ptl_gm_send_request_t *)request;
+        /*((mca_ptl_gm_send_request_t *)request)->req_frag = frag;*/
+        req->req_frag = frag;
+        frag->status = 0; /*MCA_PTL_GM_FRAG_CACHED;*/
+        frag->ptl = (mca_ptl_gm_module_t*)ptl;
+        /*frag->peer = request->req_peer;*/
+    }
+
+    return OMPI_SUCCESS;
 }
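mca_ptl_gm_request_init above pre-attaches a send fragment to the request, so the first fragment of a message pays no allocation cost on the critical path; mca_ptl_gm_request_fini (next hunk) hands that fragment back. A sketch of the intended pairing (the caller is hypothetical; in the real flow the PML reaches these through the ptl_request_init and ptl_send slots):

    /* hypothetical illustration -- not part of this commit */
    static int send_with_cached_frag(struct mca_ptl_base_module_t *ptl,
                                     struct mca_ptl_base_peer_t *peer,
                                     struct mca_pml_base_send_request_t *req,
                                     size_t bytes)
    {
        int rc = mca_ptl_gm_request_init(ptl, req);  /* caches req->req_frag */
        if (OMPI_SUCCESS != rc) {
            return rc;
        }
        /* offset 0 makes mca_ptl_gm_send reuse the cached fragment */
        rc = mca_ptl_gm_send(ptl, peer, req, 0, bytes, 0);
        /* when the request is recycled, mca_ptl_gm_request_fini returns
           the cached fragment to the module's gm_send_frags free list */
        return rc;
    }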
+
+
+
 /*
  *
  */
 void
-mca_ptl_gm_request_return (struct mca_ptl_base_module_t *ptl,
+mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl,
                          struct mca_pml_base_send_request_t *request)
 {
-    /*OMPI_FREE_LIST_RETURN(&mca_ptl_gm_module.gm_send_req,
-                            (ompi_list_item_t*)request); */
-    return;
+    mca_ptl_gm_send_frag_t *frag;
+
+    frag = ((mca_ptl_gm_send_request_t *)request)->req_frag;
+    OMPI_FREE_LIST_RETURN(&(((mca_ptl_gm_module_t *)ptl)->gm_send_frags),
+                          (ompi_list_item_t *)frag);
+    frag->status = 0; /*XXX: MCA_PTL_GM_FRAG_LOCAL; */
 }
 
+int
+mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl,
+                 struct mca_ptl_base_peer_t *ptl_peer,
+                 struct mca_pml_base_send_request_t *sendreq,
+                 size_t offset, size_t size, int flags)
+{
+    mca_ptl_gm_send_frag_t *sendfrag;
+    mca_ptl_gm_peer_t *gm_ptl_peer;
+    mca_ptl_gm_module_t * gm_ptl;
+    int rc;
+
+    gm_ptl = (mca_ptl_gm_module_t *)ptl;
+    if (offset == 0) {
+        sendfrag = ((mca_ptl_gm_send_request_t *)sendreq)->req_frag;
+    } else {
+        sendfrag = mca_ptl_gm_alloc_send_frag (ptl,sendreq);
+        if (NULL == sendfrag) {
+            ompi_output(0,"[%s:%d] Unable to allocate a gm send frag\n",
+                        __FILE__, __LINE__);
+            return 0; /*XXX: return error */
+        }
+    }
+
+    ((struct mca_ptl_gm_send_request_t *)sendreq)->req_frag = sendfrag;
+    rc = mca_ptl_gm_send_frag_init (sendfrag, (mca_ptl_gm_peer_t*)ptl_peer,
+                                    sendreq, offset, &size, flags);
+
+    /*initiate the send */
+    gm_ptl_peer = (mca_ptl_gm_peer_t *)ptl_peer;
+    rc = mca_ptl_gm_peer_send (gm_ptl_peer,sendfrag,sendreq,
+                               offset,&size,flags);
+
+    gm_ptl->num_send_tokens--;
+    /*Update offset */
+    sendreq->req_offset += size; /* XXX: should be what convertor packs */
+
+    /*append to the send_fragments_queue. */
+    ompi_list_append (&(gm_ptl->gm_send_frags_queue),
+                      (ompi_list_item_t *) sendfrag);
+    return rc;
+}
 
 /*
  * Initiate a put
  */
 
@@ -202,28 +285,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
                 struct mca_pml_base_send_request_t *sendreq,
                 size_t offset, size_t size, int flags)
 {
-#if 0
-    mca_ptl_gm_send_frag_t *sendfrag;
-    int rc;
-
-    if (offset == 0) {
-        sendfrag = &((mca_ptl_gm_send_request_t *) sendreq)->req_frag;
-    } else {
-        ompi_list_item_t *item;
-        OMPI_FREE_LIST_GET (&mca_ptl_gm_module.gm_send_frags, item, rc);
-        if (NULL == (sendfrag = (mca_ptl_gm_send_frag_t *) item))
-            return rc;
-    }
-
-    rc = mca_ptl_gm_send_frag_init (sendfrag, ptl_peer, sendreq, offset,
-                                    &size, flags);
-
-    if (rc != OMPI_SUCCESS)
-        return rc;
-
-    sendreq->req_offset += size;
-    return mca_ptl_gm_peer_send (ptl_peer, sendfrag);
-#endif
     return OMPI_SUCCESS;
 }
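mca_ptl_gm_send above advances sendreq->req_offset by the fragment size after each call (with the caveat, flagged in its XXX comment, that it should really be the number of bytes the convertor actually packed). A sketch of the driver loop this implies (hypothetical function; frag_bytes would normally come from the PTL's fragment-size limits):

    /* hypothetical illustration -- not part of this commit */
    static int send_whole_message(struct mca_ptl_base_module_t *ptl,
                                  struct mca_ptl_base_peer_t *peer,
                                  struct mca_pml_base_send_request_t *req,
                                  size_t total_bytes, size_t frag_bytes)
    {
        int rc = OMPI_SUCCESS;

        /* req->req_offset is advanced inside mca_ptl_gm_send */
        while (req->req_offset < total_bytes && OMPI_SUCCESS == rc) {
            rc = mca_ptl_gm_send(ptl, peer, req, req->req_offset,
                                 frag_bytes, 0);
        }
        return rc;
    }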
@@ -257,32 +318,77 @@ mca_ptl_gm_matched (mca_ptl_base_module_t * ptl,
                     mca_ptl_base_recv_frag_t * frag)
 {
-/* might need to send an ack back */
+    /* might need to send an ack back */
+#if 1
+    mca_pml_base_recv_request_t *request;
+    /*mca_ptl_base_recv_request_t *request;*/
+    mca_ptl_base_header_t *header;
+    int bytes_recv, rc;
+    mca_ptl_gm_module_t *gm_ptl;
+    struct iovec iov[1];
+
+    header = &frag->frag_base.frag_header;
+    request = frag->frag_request;
+
+    if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
 #if 0
-    mca_ptl_base_header_t *header = &frag->super.frag_header;
-
-    if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
-        int rc;
-        mca_ptl_gm_send_frag_t *ack;
-        mca_ptl_gm_recv_frag_t *recv_frag =
-            (mca_ptl_gm_recv_frag_t *) frag;
-        ompi_list_item_t *item;
-        MCA_PTL_GM_SEND_FRAG_ALLOC (item, rc);
-        ack = (mca_ptl_gm_send_frag_t *) item;
-
-        if (NULL == ack) {
-            OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
-            recv_frag->frag_ack_pending = true;
-            ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
-                              (ompi_list_item_t *) frag);
-            OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
-        } else {
-            mca_ptl_gm_send_frag_init_ack (ack, ptl,
-                                           recv_frag->super.super.
-                                           frag_peer, recv_frag);
+        int rc;
+        mca_ptl_gm_send_frag_t *ack;
+        recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
+        ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
+        if (NULL == ack) {
+            ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
+                        __FILE__,__LINE__);
+            OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
+            recv_frag->frag_ack_pending = true;
+            ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
+                              (ompi_list_item_t *) frag);
+            OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
+        } else {
+            mca_ptl_gm_send_frag_init_ack (ack, ptl,
+                                           recv_frag->super.super.
+                                           frag_peer, recv_frag);
+            /*XXX: check this*/
+            mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 );
         }
-    /* process fragment if complete */
+#endif
+
+    }
+
+    iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
+    bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
+    iov[0].iov_len = bytes_recv;
+
+    /*process fragment if complete */
+    if (header->hdr_frag.hdr_frag_length > 0) {
+        ompi_proc_t *proc;
+
+        proc = ompi_comm_peer_lookup(request->req_base.req_comm,
+                                     request->req_base.req_peer);
+
+        ompi_convertor_copy(proc->proc_convertor,
+                            &frag->frag_base.frag_convertor);
+        ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor,
+                                      0,
+                                      request->req_base.req_datatype,
+                                      request->req_base.req_count,
+                                      request->req_base.req_addr,
+                                      header->hdr_frag.hdr_frag_offset);
+        rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
+        assert( rc == 1 );
+    }
+
+    /*update progress*/ /* XXX : check this */
+    ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
+
+    /*return to free list */
+    gm_ptl = (mca_ptl_gm_module_t *)ptl;
+    OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags),(ompi_list_item_t*)frag);
+
+#endif
 }
diff --git a/src/mca/ptl/gm/src/ptl_gm.h b/src/mca/ptl/gm/src/ptl_gm.h
index 404d31885c..ecfc813063 100644
--- a/src/mca/ptl/gm/src/ptl_gm.h
+++ b/src/mca/ptl/gm/src/ptl_gm.h
@@ -16,11 +16,18 @@
 #include "mca/ptl/ptl.h"
 #include "gm.h"
 
+
 #define MCA_PTL_GM_STATISTICS 0
-#define SIZE 30
+#define GM_SIZE 30
 #define THRESHOLD 16384
 #define MAX_GM_PORTS 16
 #define MAX_RECV_TOKENS 256
+#define PTL_GM_ADMIN_SEND_TOKENS 0
+#define PTL_GM_ADMIN_RECV_TOKENS 0
+#define GM_SEND_BUF_SIZE 16384
+#define GM_RECV_BUF_SIZE 16384
+#define NUM_RECV_FRAGS 100
+#define MCA_PTL_GM_FRAG_CACHED
 
 /**
  * GM PTL component
@@ -33,7 +40,7 @@ struct mca_ptl_gm_component_t {
     int gm_free_list_num; /**< initial size of free lists */
     int gm_free_list_max; /**< maximum size of free lists */
     int gm_free_list_inc; /**< number of elements to alloc when growing free lists */
-
+    struct mca_ptl_gm_proc_t* gm_local;
     ompi_list_t gm_procs;
     ompi_list_t gm_send_req;
@@ -51,16 +58,19 @@ extern mca_ptl_gm_component_t mca_ptl_gm_component;
 struct mca_ptl_gm_module_t {
     mca_ptl_base_module_t super; /**< base PTL module interface */
     struct gm_port *my_port;
-    unsigned int my_lid;
-    unsigned int my_gid;
+    unsigned int my_local_id;
+    unsigned int my_global_id;
     unsigned int my_port_id;
     unsigned int num_send_tokens;
     unsigned int num_recv_tokens;
     unsigned int max_send_tokens;
     unsigned int max_recv_tokens;
-    struct mca_ptl_gm_addr_t *proc_id_table;
+    /*struct mca_ptl_gm_addr_t *proc_id_table;*/
 
     ompi_free_list_t gm_send_frags;
+    ompi_free_list_t gm_recv_frags;
+    ompi_free_list_t gm_recv_frags_free;
+    ompi_list_t gm_send_frags_queue;
 
     ompi_list_t gm_pending_acks;
 #if MCA_PTL_GM_STATISTICS
@@ -111,6 +121,19 @@ extern int
mca_ptl_gm_component_control (int param, extern int mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp); + +/** + * GM send + */ + +extern int mca_ptl_gm_send (struct mca_ptl_base_module_t *ptl, + struct mca_ptl_base_peer_t *ptl_peer, + struct mca_pml_base_send_request_t *sendreq, + size_t offset, size_t size, int flags); + + + + /** * GM put */ @@ -173,16 +196,16 @@ extern int mca_ptl_gm_del_procs (struct mca_ptl_base_module_t *ptl, * @return Status indicating if allocation was successful. * */ -extern int mca_ptl_gm_request_alloc (struct mca_ptl_base_module_t *ptl, +extern int mca_ptl_gm_request_init (struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t - **); + *req); /** * */ -extern void mca_ptl_gm_request_return (struct mca_ptl_base_module_t *ptl, +extern void mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t *); diff --git a/src/mca/ptl/gm/src/ptl_gm_component.c b/src/mca/ptl/gm/src/ptl_gm_component.c index 511c258c4d..1714628198 100644 --- a/src/mca/ptl/gm/src/ptl_gm_component.c +++ b/src/mca/ptl/gm/src/ptl_gm_component.c @@ -24,7 +24,7 @@ #include "ptl_gm_addr.h" #include "ptl_gm_proc.h" #include "ptl_gm_req.h" - +#include "ptl_gm_priv.h" mca_ptl_gm_component_t mca_ptl_gm_component = { { @@ -118,10 +118,10 @@ mca_ptl_gm_component_open (void) mca_ptl_gm_module.super.ptl_first_frag_size = mca_ptl_gm_param_register_int ("first_frag_size", 16 * 1024); mca_ptl_gm_module.super.ptl_min_frag_size = - mca_ptl_gm_param_register_int ("min_frag_size", 0); - mca_ptl_gm_module.super.ptl_min_frag_size = + mca_ptl_gm_param_register_int ("min_frag_size", 1<<16); + mca_ptl_gm_component.gm_free_list_num = mca_ptl_gm_param_register_int ("free_list_num", 32); - mca_ptl_gm_module.super.ptl_min_frag_size = + mca_ptl_gm_component.gm_free_list_inc = mca_ptl_gm_param_register_int ("free_list_inc", 32); return OMPI_SUCCESS; @@ -137,38 +137,36 @@ mca_ptl_gm_component_open (void) int mca_ptl_gm_component_close (void) { - - /* if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) { +#ifdef GOPAL_TODO + if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) { ompi_output(0, "[%s:%d] error in finalizing gm state and PTL's.\n", __FILE__, __LINE__); return NULL; - } */ + } +#endif if (NULL != mca_ptl_gm_component.gm_ptl_modules) free (mca_ptl_gm_component.gm_ptl_modules); OBJ_DESTRUCT (&mca_ptl_gm_component.gm_procs); OBJ_DESTRUCT (&mca_ptl_gm_component.gm_send_req); + OBJ_DESTRUCT (&mca_ptl_gm_component.gm_lock); - return ompi_event_fini (); + return OMPI_SUCCESS; } - - - /* * Create a ptl instance and add to components list. 
 */
 static int
-mca_ptl_gm_create (void)
+mca_ptl_gm_create (int i)
 {
     mca_ptl_gm_module_t *ptl;
-    char param[256];
 
-    ptl = malloc (sizeof (mca_ptl_gm_module_t));
+    ptl = (mca_ptl_gm_module_t *)malloc (sizeof (mca_ptl_gm_module_t));
     if (NULL == ptl) {
         ompi_output (0, " ran out of resource to allocate ptl_instance \n");
@@ -176,7 +174,7 @@ mca_ptl_gm_create (void)
     }
 
     memcpy (ptl, &mca_ptl_gm_module, sizeof (mca_ptl_gm_module));
-    mca_ptl_gm_component.gm_ptl_modules[mca_ptl_gm_component.gm_num_ptl_modules++] = ptl;
+    mca_ptl_gm_component.gm_ptl_modules[i] = ptl;
     return OMPI_SUCCESS;
 }
 
@@ -192,20 +190,7 @@ mca_ptl_gm_create (void)
 static int
 mca_ptl_gm_module_create_instances (void)
 {
-    int i;
-    int maxptls = 1; /* maxptls set to 1 */
-
-    /* allocate memory for ptls */
-    mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
-    mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls * sizeof (mca_ptl_gm_module_t *));
-    if (NULL == mca_ptl_gm_component.gm_ptl_modules)
-        return OMPI_ERR_OUT_OF_RESOURCE;
-
-    for (i = 0; i < maxptls; i++) {
-        mca_ptl_gm_create ();
-    }
-    return OMPI_SUCCESS;
+    return 0;
 }
 
@@ -225,7 +210,7 @@ mca_ptl_gm_module_store_data_toexchange (void)
     mca_ptl_gm_addr_t *addrs;
 
     size = mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_addr_t);
-    addrs = malloc (size);
+    addrs = (mca_ptl_gm_addr_t *)malloc (size); /*XXX: check this out */
 
     if (NULL == addrs) {
         return OMPI_ERR_OUT_OF_RESOURCE;
@@ -233,8 +218,8 @@ mca_ptl_gm_module_store_data_toexchange (void)
 
     for (i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++) {
         mca_ptl_gm_module_t *ptl = mca_ptl_gm_component.gm_ptl_modules[i];
-        addrs[i].local_id = ptl->my_lid;
-        addrs[i].global_id = ptl->my_gid;
+        addrs[i].local_id = ptl->my_local_id;
+        addrs[i].global_id = ptl->my_global_id;
         addrs[i].port_id = ptl->my_port_id;
     }
     rc = mca_base_modex_send (&mca_ptl_gm_component.super.ptlm_version, addrs,
@@ -243,68 +228,61 @@ mca_ptl_gm_module_store_data_toexchange (void)
     return rc;
 }
 
-
-
-
-/*
- * initialize a ptl interface
- *
- */
-
 static int
 ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
 {
     mca_ptl_gm_module_t *ptl;
     unsigned int board_no, port_no;
-    char *buffer_ptr;
     gm_status_t status;
-    int buf_len;
     int i;
+    int maxptls = 1; /* maxptls set to 1 */
 
-    if (OMPI_SUCCESS != mca_ptl_gm_module_create_instances ()) {
-        return 0;
+    mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
+    mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls *
+                                                  sizeof (mca_ptl_gm_module_t *));
+    if (NULL == mca_ptl_gm_component.gm_ptl_modules)
+        return OMPI_ERR_OUT_OF_RESOURCE;
+
+    for (i = 0; i < maxptls; i++) {
+        mca_ptl_gm_create (i);
     }
 
-    /*hack : we have set the gm_max_ptl_modules to 1 */
+    /*Hack : we have set the gm_max_ptl_modules to 1 */
     for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
         ptl = mca_ptl_gm_component.gm_ptl_modules[i];
 
         /* open the first available gm port for this board */
         board_no = i;
-        for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) {
+        for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) {
+            printf ("about to call open port\n");
+            if (port_no == 3) continue;
+            /* port 0,1,3 reserved */
+            status = gm_open (&(ptl->my_port), board_no,
+                              port_no, "OMPI-GM", GM_API_VERSION_2_0);
 
-            printf ("about to call open port\n");
-            if (port_no != 3) {
-                status = gm_open (&(ptl->my_port), board_no, port_no, "OMPI-GM", GM_API_VERSION_2_0); /* port 0,1,3 reserved */
-
-                if (GM_SUCCESS == status) {
-                    ptl->my_port_id = port_no;
-                    break;
-                }
-            }
-        }
+            if (GM_SUCCESS == status) {
+                ptl->my_port_id = port_no;
+                break;
+            }
+        }
 
+#if 1
        /* Get node local Id */
-        if (GM_SUCCESS != gm_get_node_id (ptl->my_port, &(ptl->my_lid))) {
+        if (GM_SUCCESS != gm_get_node_id (ptl->my_port, &(ptl->my_local_id))) {
             ompi_output (0, " failure to get local_id \n");
             return 0;
         }
+#endif
 
         /* Convert local id to global id */
         if (GM_SUCCESS !=
-            gm_node_id_to_global_id (ptl->my_port, ptl->my_lid,
-                                     &(ptl->my_gid))) {
+            gm_node_id_to_global_id (ptl->my_port, ptl->my_local_id,
+                                     &(ptl->my_global_id))) {
             ompi_output (0, " Error: Unable to get my GM global id \n");
             return 0;
         }
-
     }
 
-    /* publish GM parameters with the MCA framework */
-    if (OMPI_SUCCESS != mca_ptl_gm_module_store_data_toexchange ())
-        return 0;
-
     return OMPI_SUCCESS;
 }
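The port-opening loop above is the heart of ompi_mca_ptl_gm_init; isolated, the idiom looks like this (helper name hypothetical; it assumes gm.h's gm_open and GM_FAILURE plus the MAX_GM_PORTS constant from ptl_gm.h — GM reserves ports 0, 1 and 3, hence the probe starts at 2 and skips 3):

    /* hypothetical illustration -- not part of this commit */
    static gm_status_t open_first_free_port(struct gm_port **port,
                                            unsigned int board_no,
                                            unsigned int *port_id)
    {
        unsigned int port_no;
        gm_status_t status = GM_FAILURE;

        for (port_no = 2; port_no < MAX_GM_PORTS; port_no++) {
            if (3 == port_no) {
                continue;               /* reserved by GM */
            }
            status = gm_open(port, board_no, port_no,
                             "OMPI-GM", GM_API_VERSION_2_0);
            if (GM_SUCCESS == status) {
                *port_id = port_no;     /* remember the port we won */
                break;
            }
        }
        return status;
    }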
@@ -315,26 +293,56 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
 static int
 ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
 {
-    int i;
+    int i, rc;
     mca_ptl_gm_module_t *ptl;
     gm_status_t status;
-    int buf_len;
-    void *buffer_ptr;
+    void *gm_send_reg_memory, *gm_recv_reg_memory;
+    ompi_free_list_t *fslist, *frlist, *free_rlist;
+    ompi_list_item_t *item;
+    mca_ptl_gm_send_frag_t *sfragment;
+    mca_ptl_gm_recv_frag_t *rfragment, *frag, *free_rfragment;
 
     for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
         ptl = mca_ptl_gm_component.gm_ptl_modules[i];
-#if 0
-        /* initialise the free lists */
-        ompi_free_list_init (&(mca_ptl_gm_component.gm_send_req),
-                             sizeof (mca_ptl_gm_send_request_t),
-                             OBJ_CLASS (mca_ptl_gm_send_request_t),
-                             mca_ptl_gm_component.gm_free_list_num,
-                             mca_ptl_gm_component.gm_free_list_max,
-                             mca_ptl_gm_component.gm_free_list_inc, NULL);
-#endif
+        ptl->num_send_tokens = gm_num_send_tokens (ptl->my_port);
+        ptl->num_send_tokens -= PTL_GM_ADMIN_SEND_TOKENS;
+        ptl->num_recv_tokens = gm_num_receive_tokens (ptl->my_port);
+        ptl->num_recv_tokens -= PTL_GM_ADMIN_RECV_TOKENS;
 
-        /** Receive part **/
+        /****************SEND****************************/
+        /* construct a list of send fragments */
+        OBJ_CONSTRUCT (&(ptl->gm_send_frags), ompi_free_list_t);
+        OBJ_CONSTRUCT (&(ptl->gm_send_frags_queue), ompi_list_t);
+        fslist = &(ptl->gm_send_frags);
+
+        ompi_free_list_init (&(ptl->gm_send_frags),
+                             sizeof (mca_ptl_gm_send_frag_t),
+                             OBJ_CLASS (mca_ptl_gm_send_frag_t),
+                             32, 32, 1, NULL); /* not using mpool */
+
+        /* allocate the elements */
+        sfragment = (mca_ptl_gm_send_frag_t *)
+            malloc (sizeof(mca_ptl_gm_send_frag_t) *
+                    (ptl->num_send_tokens));
+
+        /* allocate the registered memory */
+        gm_send_reg_memory = gm_dma_malloc ( ptl->my_port,
+                                             (GM_SEND_BUF_SIZE * ptl->num_send_tokens) );
+
+        for (i = 0; i < ptl->num_send_tokens; i++) {
+            ompi_list_item_t *item;
+            sfragment->send_buf = gm_send_reg_memory;
+            item = (ompi_list_item_t *) sfragment;
+            ompi_list_append (&(fslist->super), item);
+
+            gm_send_reg_memory = ((char *) gm_send_reg_memory +
+                                  GM_SEND_BUF_SIZE);
+            sfragment++;
+        }
+
+        /*****************RECEIVE*****************************/
         /*allow remote memory access */
         status = gm_allow_remote_memory_access (ptl->my_port);
         if (GM_SUCCESS != status) {
@@ -342,42 +350,65 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
         }
 
-        ptl->num_send_tokens = gm_num_send_tokens (ptl->my_port);
-        ptl->num_recv_tokens = gm_num_receive_tokens (ptl->my_port);
-        /* set acceptable sizes */
-        /*status = gm_set_acceptable_sizes(ptl->my_port, GM_LOW_PRIORITY,
-         * MASK);*/
+        /* construct the list of recv fragments free */
+        OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
+        free_rlist = &(ptl->gm_recv_frags_free);
 
+        /*allocate the elements */
+        free_rfragment = (mca_ptl_gm_recv_frag_t
*) + malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS); - /* post receive buffers for each available token */ - buf_len = THRESHOLD; - /*TODO need to provide buffers with two different sizes to distinguish - * between header and data */ - - for (i = 0; i < ptl->num_recv_tokens; i++) { - buffer_ptr = gm_dma_malloc (ptl->my_port, buf_len); - gm_provide_receive_buffer (ptl->my_port, buffer_ptr, - SIZE, GM_LOW_PRIORITY); + for (i = 0; i < NUM_RECV_FRAGS; i++) { + ompi_list_item_t *item; + item = (ompi_list_item_t *) free_rfragment; + ompi_list_append (&(free_rlist->super), item); /* XXX: check this */ + free_rfragment++; + } + + + /*construct the list of recv fragments*/ + OBJ_CONSTRUCT (&(ptl->gm_recv_frags), ompi_free_list_t); + frlist = &(ptl->gm_recv_frags); + + /*allocate the elements */ + rfragment = (mca_ptl_gm_recv_frag_t *) + malloc (sizeof (mca_ptl_gm_recv_frag_t) * + (ptl->num_recv_tokens - 1)); + + /*allocate the registered memory */ + gm_recv_reg_memory = + gm_dma_malloc (ptl->my_port, + (GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) ); + + for (i = 0; i < ptl->num_recv_tokens ; i++) { + ompi_list_item_t *item; + rfragment->alloc_recv_buffer = gm_recv_reg_memory; + item = (ompi_list_item_t *) rfragment; + ompi_list_append (&(frlist->super), item); /* XXX: check this */ + gm_recv_reg_memory = ((char *) + gm_recv_reg_memory + GM_RECV_BUF_SIZE); + rfragment++; + } + + /*TODO : need to provide buffers with two different sizes + * to distinguish between header and data + * post receive buffers */ + for (i = 0; i < (ptl->num_recv_tokens-1) ; i++) { + OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc); + assert( rc == OMPI_SUCCESS ); + frag = (mca_ptl_gm_recv_frag_t*)item; + gm_provide_receive_buffer (ptl->my_port,frag->alloc_recv_buffer, + GM_SIZE, GM_LOW_PRIORITY); } -#if 0 - /** Send Part **/ - OBJ_CONSTRUCT (&mca_ptl_gm_module.gm_send_frag, ompi_free_list_t); - ompi_free_list_init (&(mca_ptl_gm_component.gm_send_frag), - sizeof (mca_ptl_gm_send_frag_t), - OBJ_CLASS (mca_ptl_gm_send_frag_t)); - /* allocate send buffers */ - total_registered_memory = max_send_buf * SIZE; - ptl->send_req->req_frag->head = - (struct send_buf *) gm_dma_malloc (ptl->my_port, - total_registered_memory); -#endif } return OMPI_SUCCESS; } + /* @@ -391,22 +422,11 @@ mca_ptl_gm_component_init (int *num_ptl_modules, bool * have_hidden_threads) { mca_ptl_base_module_t **ptls; - int rc; - unsigned int board_id, port_id; *num_ptl_modules = 0; *allow_multi_user_threads = false; *have_hidden_threads = false; - -/* - ompi_free_list_init (&(mca_ptl_gm_component.gm_send_req), - sizeof (mca_ptl_gm_send_request_t), - OBJ_CLASS (mca_ptl_gm_send_request_t), - mca_ptl_gm_component.gm_free_list_num, - mca_ptl_gm_component.gm_free_list_max, - mca_ptl_gm_component.gm_free_list_inc, NULL); -*/ if (OMPI_SUCCESS != ompi_mca_ptl_gm_init (&mca_ptl_gm_component)) { ompi_output (0, "[%s:%d] error in initializing gm state and PTL's.\n", @@ -421,14 +441,19 @@ mca_ptl_gm_component_init (int *num_ptl_modules, return NULL; } + /* publish GM parameters with the MCA framework */ + if (OMPI_SUCCESS != mca_ptl_gm_module_store_data_toexchange ()) + return 0; + /* return array of PTLs */ - ptls = malloc (mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_base_module_t *)); + ptls = (mca_ptl_base_module_t**) malloc ( + mca_ptl_gm_component.gm_num_ptl_modules * + sizeof (mca_ptl_base_module_t *)); if (NULL == ptls) { return NULL; } - memcpy (ptls, mca_ptl_gm_component.gm_ptl_modules, mca_ptl_gm_component.gm_num_ptl_modules * sizeof 
(mca_ptl_gm_module_t *));
 
     *num_ptl_modules = mca_ptl_gm_component.gm_num_ptl_modules;
@@ -453,16 +478,17 @@ mca_ptl_gm_component_control (int param, void *value, size_t size)
 int
 mca_ptl_gm_component_progress (mca_ptl_tstamp_t tstamp)
 {
-
-
+    int rc;
 
     /* check the send queue to see if any pending send can proceed */
-
-
-    /* check for recieve and , call ptl_match to send it to the upper
+    /* check for a receive and call ptl_match to send it to the upper
        level */
-
-
-    /* in case matched, do the appropriate queuing. */
+    rc = mca_ptl_gm_incoming_recv(&mca_ptl_gm_component);
 
     return OMPI_SUCCESS;
+
+    /* check the send queue to see if any pending send can proceed */
+    /* check for a receive and call ptl_match to send it to the upper
+       level */
+    /* in case matched, do the appropriate queuing. */
 }
diff --git a/src/mca/ptl/gm/src/ptl_gm_peer.h b/src/mca/ptl/gm/src/ptl_gm_peer.h
index aa7c2b7cbf..5e95887323 100644
--- a/src/mca/ptl/gm/src/ptl_gm_peer.h
+++ b/src/mca/ptl/gm/src/ptl_gm_peer.h
@@ -14,26 +14,31 @@
 #include "event/event.h"
 #include "mca/pml/pml.h"
 #include "mca/ptl/ptl.h"
-
-
-
+#include "include/types.h"
+/*#include "ptl_gm_sendfrag.h"*/
+#include "ptl_gm_proc.h"
+#include "ptl_gm_addr.h"
+#include "ptl_gm.h"
 
 /**
  * An abstraction that represents a connection to a peer process.
  */
 struct mca_ptl_gm_peer_t {
     ompi_list_item_t super;
-
     struct mca_ptl_gm_module_t *peer_ptl;
     struct mca_ptl_gm_proc_t *peer_proc;
     struct mca_ptl_gm_addr_t *peer_addr; /**< address of peer */
-
+    unsigned int global_id;
+    unsigned int port_number;
+    unsigned int local_id;
     int num_credits;
     int max_credits;
     int resending;
     int num_resend;
 };
 typedef struct mca_ptl_gm_peer_t mca_ptl_gm_peer_t;
+/*extern omp_class_t mca_ptl_gm_peer_t_class;*/
+
+OBJ_CLASS_DECLARATION(mca_ptl_gm_peer_t);
 
-OBJ_CLASS_DECLARATION (mca_ptl_gm_peer_t);
 #endif
diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.c b/src/mca/ptl/gm/src/ptl_gm_priv.c
new file mode 100755
index 0000000000..4f659f68fe
--- /dev/null
+++ b/src/mca/ptl/gm/src/ptl_gm_priv.c
@@ -0,0 +1,302 @@
+
+/*
+ * $HEADER$
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+/*#include */
+#include
+#include
+
+#include "include/types.h"
+#include "mca/pml/base/pml_base_sendreq.h"
+#include "mca/ns/base/base.h"
+#include "ptl_gm.h"
+#include "ptl_gm_addr.h"
+#include "ptl_gm_peer.h"
+#include "ptl_gm_proc.h"
+#include "ptl_gm_sendfrag.h"
+#include "ptl_gm_priv.h"
+
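mca_ptl_gm_peer_send, defined next, writes the PTL header at the front of a pre-registered GM buffer and packs user data behind it with the convertor, so a single gm_send_with_callback covers header and payload. A condensed sketch of that packing step (helper name hypothetical; it mirrors this patch's own call pattern, in which the iovec's iov_len is read back as the packed size after ompi_convertor_pack):

    /* hypothetical illustration -- not part of this commit */
    static int pack_behind_header(ompi_convertor_t *convertor,
                                  mca_ptl_gm_send_frag_t *frag,
                                  size_t header_length,
                                  size_t *bytes_packed)
    {
        struct iovec iov;

        /* pack directly behind the header in the registered buffer */
        iov.iov_base = ((char *) frag->send_buf) + header_length;
        iov.iov_len  = GM_SEND_BUF_SIZE - header_length;
        if (ompi_convertor_pack(convertor, &iov, 1) < 0) {
            return OMPI_ERROR;
        }
        *bytes_packed = iov.iov_len;  /* iov_len holds the result, as used below */
        return OMPI_SUCCESS;
    }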
+int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
+                         mca_ptl_gm_send_frag_t *fragment,
+                         struct mca_pml_base_send_request_t *sendreq,
+                         size_t offset,
+                         size_t *size,
+                         int flags)
+{
+    struct iovec outvec[1];
+    size_t size_in,size_out;
+    int header_length;
+    mca_ptl_base_frag_header_t* header;
+
+    header = (mca_ptl_base_frag_header_t*)fragment->send_buf;
+    header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
+
+    size_in = *size;
+
+    outvec[0].iov_base = (char*)fragment->send_buf;
+    if( (size_in + header_length) < GM_SEND_BUF_SIZE )
+        outvec[0].iov_len = size_in;
+    else
+        outvec[0].iov_len = GM_SEND_BUF_SIZE;
+
+    /*header_length = sizeof(mca_ptl_base_frag_header_t);*/
+
+    /* copy the header in the buffer */
+    /**header = fragment->send_frag.frag_base.frag_header;*/
+
+    if(size_in > 0) {
+        ompi_convertor_t *convertor;
+        int rc;
+
+        /* first fragment (eager send) and first fragment of long protocol
+         * can use the convertor initialized on the request, remaining
+         * fragments must copy/reinit the convertor as the transfer could
+         * be in parallel.
+         */
+        if( offset <= mca_ptl_gm_module.super.ptl_first_frag_size ) {
+            convertor = &sendreq->req_convertor;
+        } else {
+            convertor = &(fragment->send_frag.frag_base.frag_convertor);
+            ompi_convertor_copy(&sendreq->req_convertor, convertor);
+            ompi_convertor_init_for_send( convertor,
+                                          0,
+                                          sendreq->req_base.req_datatype,
+                                          sendreq->req_base.req_count,
+                                          sendreq->req_base.req_addr,
+                                          offset);
+        }
+
+        /* if data is contiguous the convertor will return an offset
+         * into the user's buffer - otherwise it will return an allocated
+         * buffer that holds the packed data
+         */
+
+        /*XXX: need to add the header */
+
+        /*copy the data to the registered buffer*/
+        outvec[0].iov_base = ((char*)fragment->send_buf) + header_length;
+        outvec[0].iov_len -= header_length; /*XXX check this */
+
+        if((rc = ompi_convertor_pack(convertor, &(outvec[0]), 1)) < 0)
+            return OMPI_ERROR;
+    }
+
+    /* update the fields */
+    outvec[0].iov_len += header_length;
+    outvec[0].iov_base = fragment->send_buf;
+
+    /* adjust size and request offset to reflect actual number of bytes
+     * packed by convertor */
+    size_out = outvec[0].iov_len;
+
+    /* initiate the gm send */
+    gm_send_with_callback(ptl_peer->peer_ptl->my_port, fragment->send_buf,
+                          GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
+                          ptl_peer->port_number, send_callback, (void *)fragment );
+
+    fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
+    fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
+    fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base;
+    fragment->send_frag.frag_base.frag_size = size_out; /*XXX: should this be size_out */
+
+    return OMPI_SUCCESS;
+}
+
+
+void send_callback(struct gm_port *port,void * context, gm_status_t status)
+{
+#if 1
+    mca_ptl_gm_module_t *ptl;
+    mca_ptl_gm_send_frag_t *frag;
+    ompi_list_t *list;
+    int bytes;
+    mca_pml_base_send_request_t *gm_send_req;
+
+    frag = (mca_ptl_gm_send_frag_t *)context;
+    ptl = (mca_ptl_gm_module_t *)frag->ptl;
+    gm_send_req = frag->req;
+    bytes = frag->send_frag.frag_base.frag_size; /* bytes handed to GM for this fragment */
+
+    switch (status)
+    {
+    case GM_SUCCESS:
+        /* send completed, can reuse the user buffer */
+        ptl->num_send_tokens++;
+        ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
+                                      gm_send_req, bytes);
+        /*ompi_list_append(&(ptl->gm_send_frags_queue),(ompi_list_item_t *)frag); */
+        list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
+        ompi_list_remove_first(list);
+        break;
+
+    case GM_SEND_TIMED_OUT:
+        /* need to take care of retransmission */
+        break;
+
+    case GM_SEND_DROPPED:
+        /* need to handle this case */
+        break;
+
+    default:
+        ompi_output(0,
+                    "[%s:%d] error in message completion\n",__FILE__,__LINE__);
+        break;
+    }
+#endif
+}
+
+void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
+                      mca_ptl_base_header_t * header)
+{
+#if 1
+    mca_ptl_gm_send_frag_t * frag;
+    mca_pml_base_send_request_t *req;
+
+    frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
+    req = (mca_pml_base_send_request_t *) frag->req;
+    req->req_peer_match = header->hdr_ack.hdr_dst_match;
+
+    /* return the send fragment to the free list */
+#endif
+}
+
+
+void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
+                       gm_recv_event_t* event )
+{
+#if 1
+    mca_ptl_gm_recv_frag_t * recv_frag;
+    bool matched;
+    mca_ptl_base_header_t *header;
+
+    header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
+
+    /* allocate a receive fragment */
+    recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
+
+    recv_frag->frag_recv.frag_base.frag_peer = NULL;
+    recv_frag->frag_recv.frag_request = NULL;
+
recv_frag->frag_recv.frag_is_buffered = false; + recv_frag->frag_hdr_cnt = 0; + recv_frag->frag_msg_cnt = 0; + recv_frag->frag_ack_pending = false; + recv_frag->frag_progressed = 0; + + recv_frag->frag_recv.frag_base.frag_header = *header; + + recv_frag->frag_recv.frag_base.frag_addr = + (char *)header + sizeof(mca_ptl_base_header_t); + recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length); + /* header->hdr_frag.hdr_frag_length; */ + + matched = mca_ptl_base_match( + &recv_frag->frag_recv.frag_base.frag_header.hdr_match, + &(recv_frag->frag_recv), + NULL ); + + + if (!matched) + { + + ompi_output(0,"matching receive not yet posted\n"); + } + /**/ + #endif + +} + + + +int ptl_gm_handle_recv(mca_ptl_gm_module_t *ptl, gm_recv_event_t* event ) +{ + + #if 1 + /*int matched;*/ + mca_ptl_base_header_t *header; + + header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message); + + switch(header->hdr_common.hdr_type) + { + case MCA_PTL_HDR_TYPE_MATCH: + case MCA_PTL_HDR_TYPE_FRAG: + ptl_gm_data_frag( ptl, event ); + break; + + case MCA_PTL_HDR_TYPE_ACK: + case MCA_PTL_HDR_TYPE_NACK: + ptl_gm_ctrl_frag(ptl,header); + break; + default: + ompi_output(0,"[%s:%d] unexpected frag type %d\n", + __FILE__,__LINE__,header->hdr_common.hdr_type); + break; + } + #endif + return OMPI_SUCCESS; + +} + + +int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp) +{ +/*#if 0*/ + int i,rc; + int num_ptls; + gm_recv_event_t *event; + void * mesg; + mca_ptl_gm_module_t *ptl; + mca_ptl_gm_recv_frag_t * frag; + ompi_list_item_t* item; + num_ptls = gm_comp->gm_num_ptl_modules; + + for (i=0; i< num_ptls; i++) + { + ptl = gm_comp->gm_ptl_modules[i]; + + { + event = gm_receive(ptl->my_port); + + switch (gm_ntohc(event->recv.type)) + { + case GM_RECV_EVENT: + case GM_HIGH_RECV_EVENT: + case GM_PEER_RECV_EVENT: + case GM_HIGH_PEER_RECV_EVENT: + mesg = gm_ntohp(event->recv.message); + ptl_gm_handle_recv( ptl, event ); + + /* post a replacement buffer */ /*XXX: do this after frag done*/ + OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc ); + if(rc != OMPI_SUCCESS) + ompi_output(0,"unable to allocate a buffer\n"); + frag = (mca_ptl_gm_recv_frag_t*)item; + /*frag =(mca_ptl_gm_recv_frag_t *) st_remove_first (*/ + /*&ptl->gm_recv_frags);*/ + gm_provide_receive_buffer(ptl->my_port, + frag->alloc_recv_buffer, + GM_SIZE, + GM_LOW_PRIORITY + ); + case GM_NO_RECV_EVENT: + break; + + default: + gm_unknown(ptl->my_port, event); + + } + } + } + return 0; +/* #endif*/ +} + + diff --git a/src/mca/ptl/gm/src/ptl_gm_priv.h b/src/mca/ptl/gm/src/ptl_gm_priv.h index ab71070178..d316ec89e1 100644 --- a/src/mca/ptl/gm/src/ptl_gm_priv.h +++ b/src/mca/ptl/gm/src/ptl_gm_priv.h @@ -5,11 +5,38 @@ #include "event/event.h" #include "mca/pml/pml.h" #include "mca/ptl/ptl.h" +#include "ptl_gm_peer.h" +#include "ptl_gm_sendfrag.h" #include "gm.h" /* maintain list of registered buffers for send and receive */ -struct reg_buf { - void *start; /* pointer to registered memory */ - int length; -}; +/*struct reg_buf {*/ + /*void *start; pointer to registered memory */ + /*int length;*/ +/*};*/ + + +void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl, + mca_ptl_base_header_t * header); + +void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl, + gm_recv_event_t* event ); + +int ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_event_t* event ); + +int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp); + +int +mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer, + mca_ptl_gm_send_frag_t *fragment, + struct 
mca_pml_base_send_request_t *sendreq, + size_t offset, + size_t *size, + int flags); + + + +void send_callback(struct gm_port *port,void * context, gm_status_t +status); + diff --git a/src/mca/ptl/gm/src/ptl_gm_proc.c b/src/mca/ptl/gm/src/ptl_gm_proc.c index 7786eaf624..ce2d9f9a79 100644 --- a/src/mca/ptl/gm/src/ptl_gm_proc.c +++ b/src/mca/ptl/gm/src/ptl_gm_proc.c @@ -83,10 +83,18 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc) int i; mca_ptl_gm_proc_t *ptl_proc; + ptl_proc = mca_ptl_gm_proc_lookup_ompi (ompi_proc); + if (ptl_proc != NULL) + { + return ptl_proc; + } + + /* only gm ptl opened */ ptl_proc = OBJ_NEW (mca_ptl_gm_proc_t); ptl_proc->proc_ompi = ompi_proc; + /* Extract exposed addresses from remote proc */ rc = mca_base_modex_recv (&mca_ptl_gm_component.super.ptlm_version, ompi_proc, (void **) &ptl_proc->proc_addrs, @@ -107,18 +115,6 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc) } ptl_proc->proc_addr_count = size / sizeof (mca_ptl_gm_addr_t); - for (i = 0; i < ptl_proc->proc_addr_count; i++) { - /*convert from global id to local id */ - if (GM_SUCCESS != - gm_global_id_to_node_id (ptl->my_port, - ptl_proc->proc_addrs[i].global_id, - ptl_proc->proc_addrs[i].local_id)) { - ompi_output (0, - "[%s:%d] error in converting global to local id \n", - __FILE__, __LINE__); - } - } - /* allocate space for peer array - one for each exported address */ ptl_proc->peer_arr = (mca_ptl_gm_peer_t **) @@ -131,6 +127,10 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc) return NULL; } + if(NULL == mca_ptl_gm_component.gm_local && ompi_proc == +ompi_proc_local() ) + mca_ptl_gm_component.gm_local = ptl_proc; + return ptl_proc; } diff --git a/src/mca/ptl/gm/src/ptl_gm_proc.h b/src/mca/ptl/gm/src/ptl_gm_proc.h index 644a6cdc14..1ebb548df0 100644 --- a/src/mca/ptl/gm/src/ptl_gm_proc.h +++ b/src/mca/ptl/gm/src/ptl_gm_proc.h @@ -24,6 +24,7 @@ struct mca_ptl_gm_proc_t { size_t proc_peer_count; size_t proc_addr_count; struct mca_ptl_gm_peer_t **peer_arr; + ompi_process_name_t proc_guid; }; typedef struct mca_ptl_gm_proc_t mca_ptl_gm_proc_t; diff --git a/src/mca/ptl/gm/src/ptl_gm_req.h b/src/mca/ptl/gm/src/ptl_gm_req.h index 276ed9f5f6..476936debc 100644 --- a/src/mca/ptl/gm/src/ptl_gm_req.h +++ b/src/mca/ptl/gm/src/ptl_gm_req.h @@ -21,7 +21,7 @@ OBJ_CLASS_DECLARATION (mca_ptl_gm_send_request_t); struct mca_ptl_gm_send_request_t { mca_pml_base_send_request_t super; /* add stuff here */ - mca_ptl_gm_send_frag_t req_frag; + mca_ptl_gm_send_frag_t *req_frag; }; typedef struct mca_ptl_gm_send_request_t mca_ptl_gm_send_request_t; diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c index a76ecc8673..5987eca1a6 100644 --- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.c +++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.c @@ -7,10 +7,15 @@ #include "include/types.h" #include "datatype/datatype.h" #include "mca/pml/base/pml_base_sendreq.h" +#include "mca/pml/base/pml_base_recvreq.h" +#include "mca/ptl/base/ptl_base_sendfrag.h" +#include "mca/ptl/base/ptl_base_recvfrag.h" #include "ptl_gm.h" #include "ptl_gm_peer.h" #include "ptl_gm_proc.h" #include "ptl_gm_sendfrag.h" +#include "ptl_gm_priv.h" + #define frag_header super.super.frag_header #define frag_owner super.super.frag_owner @@ -21,6 +26,8 @@ static void mca_ptl_gm_send_frag_construct (mca_ptl_gm_send_frag_t * frag); static void mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag); +static void mca_ptl_gm_recv_frag_construct 
(mca_ptl_gm_recv_frag_t * frag);
+static void mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t * frag);
 
 ompi_class_t mca_ptl_gm_send_frag_t_class = {
     "mca_ptl_gm_send_frag_t",
@@ -45,53 +52,140 @@ mca_ptl_gm_send_frag_destruct (mca_ptl_gm_send_frag_t * frag)
 }
 
+
+
+/*XXX : take care of multi threading*/
+
+mca_ptl_gm_send_frag_t *
+mca_ptl_gm_alloc_send_frag(struct mca_ptl_base_module_t *ptl,
+                           struct mca_pml_base_send_request_t * sendreq)
+{
+    ompi_free_list_t *flist;
+    ompi_list_item_t *item;
+    mca_ptl_gm_send_frag_t *frag;
+    mca_ptl_tstamp_t tstamp = 0;
+
+    flist = &( ((mca_ptl_gm_module_t *)ptl)->gm_send_frags );
+    item = ompi_list_remove_first(&((flist)->super));
+
+    while(NULL == item)
+    {
+        ptl->ptl_component->ptlm_progress(tstamp);
+        item = ompi_list_remove_first (&((flist)->super));
+    }
+
+    frag = (mca_ptl_gm_send_frag_t *)item;
+    frag->req = (struct mca_pml_base_send_request_t *)sendreq;
+    frag->type = 0; /* XXX: should be EAGER_SEND; */
+    return frag;
+}
+
+
+int mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
+                               mca_ptl_gm_peer_t * ptl_peer,
+                               mca_pml_base_send_request_t * sendreq,
+                               size_t offset,
+                               size_t* size,
+                               int flags)
+{
+    int header_length;
+    mca_ptl_base_header_t *hdr;
+    void *buffer;
+
+    buffer = sendfrag->send_buf;
+    hdr = (mca_ptl_base_header_t *)sendfrag->send_buf;
+
+    if (offset == 0) {
+        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
+        hdr->hdr_common.hdr_flags = flags;
+        hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
+
+        hdr->hdr_frag.hdr_frag_offset = offset;
+        hdr->hdr_frag.hdr_frag_seq = 0;
+        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* pointer to the frag */
+        hdr->hdr_frag.hdr_dst_ptr.lval = 0;
+
+        hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
+        hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
+        hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
+        hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
+        hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
+        hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
+        header_length = sizeof (mca_ptl_base_match_header_t);
+    } else {
+        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
+        hdr->hdr_common.hdr_flags = flags;
+        hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t);
+        hdr->hdr_frag.hdr_frag_offset = offset;
+        hdr->hdr_frag.hdr_frag_seq = 0;
+        hdr->hdr_frag.hdr_src_ptr.lval = 0;
+        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
+        hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
+        header_length = sizeof (mca_ptl_base_frag_header_t);
+    }
+
+    /*initialize convertor */
+
+#if 0
+    /*fragment state*/
+    sendfrag->frag_base.frag_owner = &ptl_peer->peer_ptl->super;
+    sendfrag->frag_base.frag_peer = ptl_peer;
+    sendfrag->frag_base.frag_addr = NULL;
+    sendfrag->frag_base.frag_size = *size;
+#endif
+
+    return OMPI_SUCCESS;
+}
+
+ompi_class_t mca_ptl_gm_recv_frag_t_class = {
+    "mca_ptl_gm_recv_frag_t",
+    OBJ_CLASS (mca_ptl_base_recv_frag_t),
+    (ompi_construct_t) mca_ptl_gm_recv_frag_construct,
+    (ompi_construct_t) mca_ptl_gm_recv_frag_destruct
+};
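mca_ptl_gm_send_frag_init above writes a full match header for offset 0 and a shorter frag header for continuation fragments; the receive side (ptl_gm_handle_recv in ptl_gm_priv.c) tells them apart by hdr_common.hdr_type. A minimal sketch of that dispatch test (helper name hypothetical):

    /* hypothetical illustration -- not part of this commit */
    static int gm_frag_is_first(const mca_ptl_base_header_t *hdr)
    {
        /* offset-0 fragments carry MCA_PTL_HDR_TYPE_MATCH plus the full
           match header; later fragments carry MCA_PTL_HDR_TYPE_FRAG */
        return MCA_PTL_HDR_TYPE_MATCH == hdr->hdr_common.hdr_type;
    }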
 /*
-static void send_callback(struct gm_port *, gm_status)
+ * recv fragment constructor/destructors.
+ */
+
+static void
+mca_ptl_gm_recv_frag_construct (mca_ptl_gm_recv_frag_t * frag)
 {
-    gm_status_t status;
+    frag->frag_hdr_cnt = 0;
+    frag->frag_msg_cnt = 0;
 
-    switch (status)
-    {
-    case GM_SUCCESS:
-
-        break;
-
-    case GM_SEND_TIMED_OUT:
-        break;
-
-    case GM_SEND_DROPPED:
-        break;
-
-    default:
-        break;
-
-    }
 }
 
-
-
-
-static void put_callback(struct gm_port *, gm_status)
+static void
+mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t *frag)
 {
+}
+
+mca_ptl_gm_recv_frag_t *
+mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl)
+{
+    ompi_free_list_t *flist;
+    ompi_list_item_t *item;
+    mca_ptl_gm_recv_frag_t *frag;
+    mca_ptl_tstamp_t tstamp = 0;
+
+    flist = &( ((mca_ptl_gm_module_t *)ptl)->gm_recv_frags_free);
+    item = ompi_list_remove_first(&((flist)->super));
+
+    while(NULL == item)
+    {
+        ptl->ptl_component->ptlm_progress(tstamp);
+        item = ompi_list_remove_first (&((flist)->super));
+    }
+
+    frag = (mca_ptl_gm_recv_frag_t *)item;
+    return frag;
-    gm_status_t status;
-
-    switch (status)
-    {
-    case GM_SUCCESS:
-        break;
-
-    case GM_SEND_TIMED_OUT:
-        break;
-
-    case GM_SEND_DROPPED:
-        break;
-
-    default:
-        break;
-
-    }
 }
-*/
diff --git a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h
index bbe557a7e7..4d0d67e7f1 100644
--- a/src/mca/ptl/gm/src/ptl_gm_sendfrag.h
+++ b/src/mca/ptl/gm/src/ptl_gm_sendfrag.h
@@ -13,26 +13,67 @@
 #include "os/atomic.h"
 #include "ompi_config.h"
 #include "mca/pml/base/pml_base_sendreq.h"
+#include "mca/pml/base/pml_base_recvreq.h"
 #include "mca/ptl/base/ptl_base_sendfrag.h"
+#include "mca/ptl/base/ptl_base_recvfrag.h"
 
 #include "ptl_gm.h"
-#include "ptl_gm_priv.h"
+/*#include "ptl_gm_priv.h"*/
+#include "ptl_gm_peer.h"
 
 OBJ_CLASS_DECLARATION (mca_ptl_gm_send_frag_t);
+OBJ_CLASS_DECLARATION (mca_ptl_gm_recv_frag_t);
 
-struct mca_ptl_base_peer_t;
+/*struct mca_ptl_base_peer_t;*/
 
 /**
  * GM send fragment derived type.
  */
 struct mca_ptl_gm_send_frag_t {
-    mca_ptl_base_send_frag_t super; /**< base send fragment descriptor */
-    struct reg_buf *sbuf;
+    mca_ptl_base_send_frag_t send_frag; /**< base send fragment descriptor */
+    void * send_buf;
+    mca_pml_base_send_request_t *req;
+    mca_ptl_gm_module_t *ptl;
+    /*mca_ptl_gm_peer_t *peer;*/
+    int status;
+    int type;
 };
 typedef struct mca_ptl_gm_send_frag_t mca_ptl_gm_send_frag_t;
 
-#define MCA_PTL_GM_SEND_FRAG_ALLOC(item, rc)  \
-    OMPI_FREE_LIST_GET(&mca_ptl_gm_module.gm_send_frags, item, rc);
+/*#define MCA_PTL_GM_SEND_FRAG_ALLOC(item, rc)  \*/
+    /*OMPI_FREE_LIST_GET(&mca_ptl_gm_module.gm_send_frags, item, rc);*/
 
+struct mca_ptl_gm_recv_frag_t {
+    mca_ptl_base_recv_frag_t frag_recv;
+    size_t frag_hdr_cnt;
+    size_t frag_msg_cnt;
+    volatile int frag_progressed;
+    bool frag_ack_pending;
+    void *alloc_recv_buffer;
+    void *unex_recv_buffer;
+};
+
+typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
+
+
+mca_ptl_gm_send_frag_t *
+mca_ptl_gm_alloc_send_frag ( struct mca_ptl_base_module_t *ptl,
+                             struct mca_pml_base_send_request_t *sendreq);
+
+int
+mca_ptl_gm_send_frag_init( mca_ptl_gm_send_frag_t* sendfrag,
+                           mca_ptl_gm_peer_t * ptl_peer,
+                           mca_pml_base_send_request_t * sendreq,
+                           size_t offset,
+                           size_t* size,
+                           int flags);
+
+mca_ptl_gm_recv_frag_t *
+mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl);
+
 #endif
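A pattern that recurs throughout this patch is the free-list discipline around fragments: take one with OMPI_FREE_LIST_GET (or ompi_list_remove_first plus a progress spin, as in the allocators above), and hand it back with OMPI_FREE_LIST_RETURN, as request_fini and the matched path do. Condensed into one place (function names hypothetical; the non-blocking variant is shown):

    /* hypothetical illustration -- not part of this commit */
    static mca_ptl_gm_send_frag_t *take_send_frag(mca_ptl_gm_module_t *gm_ptl)
    {
        ompi_list_item_t *item;
        int rc;

        OMPI_FREE_LIST_GET(&gm_ptl->gm_send_frags, item, rc);
        return (OMPI_SUCCESS == rc) ? (mca_ptl_gm_send_frag_t *) item : NULL;
    }

    static void give_back_send_frag(mca_ptl_gm_module_t *gm_ptl,
                                    mca_ptl_gm_send_frag_t *frag)
    {
        OMPI_FREE_LIST_RETURN(&gm_ptl->gm_send_frags,
                              (ompi_list_item_t *) frag);
    }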