diff --git a/src/mca/mpi/pml/teg/src/pml_teg.c b/src/mca/mpi/pml/teg/src/pml_teg.c index 6f43c43067..44134603bb 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg.c +++ b/src/mca/mpi/pml/teg/src/pml_teg.c @@ -1,17 +1,22 @@ -#include "mca/mpi/pml/pml.h" -#include "mca/mpi/pml/teg/teg.h" +#include "mca/mpi/pml/base/pml_base_sendreq.h" +#include "mca/mpi/pml/base/pml_base_recvreq.h" +#include "pml_teg.h" + +#define mca_pml_teg_register_int(n,v) \ + mca_base_param_lookup_int( \ + mca_base_param_register_int("pml","teg",n,0,v)) mca_pml_base_module_1_0_0_t mca_pml_teg_module = { /* First, the mca_base_module_t struct containing meta information about the module itself */ - + { /* Indicate that we are a pml v1.0.0 module (which also implies a specific MCA version) */ - + MCA_PML_BASE_VERSION_1_0_0, - + "teg", /* MCA module name */ 1, /* MCA module major version */ 0, /* MCA module minor version */ @@ -19,29 +24,46 @@ mca_pml_base_module_1_0_0_t mca_pml_teg_module = { mca_pml_teg_open, /* module open */ mca_pml_teg_close /* module close */ }, - + /* Next the MCA v1.0.0 module meta data */ - + { /* Whether the module is checkpointable or not */ - + false }, - + mca_pml_teg_query, /* module query */ mca_pml_teg_init /* module init */ }; - + mca_pml_teg_1_0_0_t mca_pml_teg = { { mca_pml_teg_addprocs, mca_pml_teg_isend, mca_pml_teg_progress - }, + } }; +int mca_pml_teg_open(lam_cmd_line_t* cmd_line) +{ + mca_pml_teg.teg_send_free_list_min_pages = + mca_pml_teg_register_int("send_free_list_min_pages", 10); + mca_pml_teg.teg_send_free_list_max_pages = + mca_pml_teg_register_int("send_free_list_max_pages", -1); + mca_pml_teg.teg_send_free_list_inc_pages = + mca_pml_teg_register_int("send_free_list_inc_pages", 1); + mca_pml_teg.teg_recv_free_list_min_pages = + mca_pml_teg_register_int("recv_free_list_min_pages", 10); + mca_pml_teg.teg_recv_free_list_max_pages = + mca_pml_teg_register_int("recv_free_list_max_pages", -1); + mca_pml_teg.teg_recv_free_list_inc_pages = + 
mca_pml_teg_register_int("recv_free_list_inc_pages", 1); + return LAM_SUCCESS; +} + mca_pml_1_0_0_t* mca_pml_teg_init( struct lam_proc_t **procs, @@ -49,7 +71,33 @@ mca_pml_1_0_0_t* mca_pml_teg_init( int *max_tag, int *max_cid) { - lam_frl_init(&mca_pml_teg.teg_send_requests); - lam_frl_init(&mca_pml_teg.teg_recv_requests); + int rc; + lam_free_list_init(&mca_pml_teg.teg_send_free_list); + if((rc = lam_free_list_init_with( + &mca_pml_teg.teg_send_free_list, + sizeof(mca_pml_base_send_request_t), + mca_pml_teg.teg_send_free_list_min_pages, + mca_pml_teg.teg_send_free_list_max_pages, + mca_pml_teg.teg_send_free_list_inc_pages, + NULL)) != LAM_SUCCESS) + { + return NULL; + } + + lam_free_list_init(&mca_pml_teg.teg_recv_free_list); + if((rc = lam_free_list_init_with( + &mca_pml_teg.teg_recv_free_list, + sizeof(mca_pml_base_recv_request_t), + mca_pml_teg.teg_recv_free_list_min_pages, + mca_pml_teg.teg_recv_free_list_max_pages, + mca_pml_teg.teg_recv_free_list_inc_pages, + NULL)) != LAM_SUCCESS) + { + return NULL; + } + + lam_free_list_init(&mca_pml_teg.teg_incomplete_sends); return &mca_pml_teg.super; } + + diff --git a/src/mca/mpi/pml/teg/src/pml_teg.h b/src/mca/mpi/pml/teg/src/pml_teg.h index 2afbae8398..2bd22da444 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg.h +++ b/src/mca/mpi/pml/teg/src/pml_teg.h @@ -52,11 +52,27 @@ extern mca_pml_1_0_0_t* mca_pml_teg_init( struct mca_pml_teg_1_0_0_t { mca_pml_1_0_0_t super; - lam_free_list_t teg_send_requests; - lam_free_list_t teg_recv_requests; + + /* send request free list and parameters */ + lam_free_list_t teg_send_free_list; + int teg_send_free_list_min_pages; + int teg_send_free_list_max_pages; + int teg_send_free_list_inc_pages; + + /* recv request free list and parameters */ + lam_free_list_t teg_recv_free_list; + int teg_recv_free_list_min_pages; + int teg_recv_free_list_max_pages; + int teg_recv_free_list_inc_pages; + + /* incomplete posted sends */ + lam_list_t teg_incomplete_sends; + lam_mutex_t teg_lock; }; typedef 
struct mca_pml_teg_1_0_0_t mca_pml_teg_1_0_0_t; +extern mca_pml_teg_1_0_0_t mca_pml_teg; + /* * PML interface functions. */ diff --git a/src/mca/mpi/pml/teg/src/pml_teg_isend.c b/src/mca/mpi/pml/teg/src/pml_teg_isend.c new file mode 100644 index 0000000000..0b673388fd --- /dev/null +++ b/src/mca/mpi/pml/teg/src/pml_teg_isend.c @@ -0,0 +1,42 @@ +/* + * $HEADER$ + */ + +#include "mca/mpi/pml/base/pml_base_sendreq.h" +#include "pml_teg.h" + + +int mca_pml_teg_isend( + void *buf, + size_t size, + lam_datatype_t *datatype, + int dest, + int tag, + lam_communicator_t* comm, + mca_pml_base_request_type_t req_type, + lam_request_t **request) +{ + int rc; + mca_pml_base_send_request_t* sendreq = + (mca_pml_base_send_request_t*)lam_free_list_get(&mca_pml_teg.teg_send_free_list, &rc); + if(sendreq == 0) + return rc; + + mca_pml_base_send_request_rinit( + sendreq, + buf, + size, + datatype, + dest, + tag, + comm, + req_type, + false + ); + + if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) + return rc; + *request = (lam_request_t*)sendreq; + return LAM_SUCCESS; +} + diff --git a/src/mca/mpi/pml/teg/src/pml_teg_proc.c b/src/mca/mpi/pml/teg/src/pml_teg_proc.c index eafdae9025..ca760ba838 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_proc.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_proc.c @@ -2,9 +2,9 @@ * $HEADER$ */ -#include "mca/mpi/pml/teg/proc.h" #include "lam/atomic.h" -#include "mca/mpi/pml/teg/ptl_array.h" +#include "pml_teg_proc.h" +#include "pml_teg_ptl_array.h" lam_class_info_t mca_pml_teg_proc_cls = { "mca_pml_teg_proc_t", diff --git a/src/mca/mpi/pml/teg/src/pml_teg_proc.h b/src/mca/mpi/pml/teg/src/pml_teg_proc.h index c3f02509c7..6000b29347 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_proc.h +++ b/src/mca/mpi/pml/teg/src/pml_teg_proc.h @@ -6,7 +6,7 @@ #define MCA_PML_PROC_H #include "mpi/proc/proc.h" -#include "mca/mpi/pml/teg/ptl_array.h" +#include "pml_teg_ptl_array.h" extern lam_class_info_t mca_pml_teg_proc_cls; diff --git 
a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c new file mode 100644 index 0000000000..367348a5ec --- /dev/null +++ b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c @@ -0,0 +1,137 @@ +/* + * $HEADER$ + */ +/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ + +#include "lam/constants.h" +#include "mca/mpi/ptl/ptl.h" +#include "mca/mpi/pml/base/pml_base_sendreq.h" + + +/* + * Start a send request by calling the scheduler to stripe + * the message across available PTLs. If the scheduler cannot + * fragment the entire message due to resource constraints, + * queue for later delivery. + */ + +int mca_pml_teg_send_request_start(mca_pml_base_send_request_t* req) +{ + bool complete; + int rc = mca_pml_teg_send_request_schedule(req, &complete); + if(rc != LAM_SUCCESS) + return rc; + if(complete == false) { + THREAD_LOCK(&mca_pml_teg.teg_lock); + lam_list_append(&mca_pml_teg.teg_incomplete_sends, (lam_list_item_t*)req); + THREAD_UNLOCK(&mca_pml_teg.teg_lock); + } + return LAM_SUCCESS; +} + + +/* + * Schedule message delivery across potentially multiple PTLs. Use + * two separate PTL pools. One for the first fragment (low latency) + * and a second for the remaining fragments (high bandwidth). Note that + * the same PTL could exist in both pools. + * + * If the first fragment cannot be scheduled due to a resource constraint, + * the entire message is queued for later delivery by the progress engine. + * Likewise, if the entire message cannot be fragmented, the message will + * be queued and the remainder of the message scheduled/fragmented + * by the progress engine. + * + */ + +int mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req, bool* complete) +{ + lam_proc_t *proc = lam_comm_get_remote_proc( + req->super.req_communicator, req->super.req_peer); + + /* allocate first fragment, if the first PTL in the list + * cannot allocate resources for the fragment, try the next + * available. 
+ */ + if(req->req_frags_allocated == 0) { + size_t num_ptl_avail = proc->proc_ptl_first.ptl_size; + size_t i; + for(i = 0; i < num_ptl_avail; i++) { + lam_p2p_ptl_info_t* ptl_info = lam_p2p_ptl_array_get_next(&proc->proc_ptl_first); + lam_p2p_ptl_t* ptl = ptl_info->ptl; + int rc = ptl->ptl_fragment(ptl, req, ptl->ptl_frag_first_size); + if (rc == LAM_SUCCESS) + break; + else if (rc != LAM_ERR_TEMP_OUT_OF_RESOURCE) + return rc; + } + /* has first fragment been allocated? */ + if(req->req_frags_allocated == 0) { + *complete = false; + return LAM_SUCCESS; + } + } + + /* allocate remaining bytes to PTLs */ + size_t bytes_remaining = req->req_length - req->req_bytes_fragmented; + size_t num_ptl_avail = proc->proc_ptl_next.ptl_size; + size_t num_ptl = 0; + while(bytes_remaining > 0 && num_ptl++ < num_ptl_avail) { + lam_p2p_ptl_info_t* ptl_info = lam_p2p_ptl_array_get_next(&proc->proc_ptl_next); + lam_p2p_ptl_t* ptl = ptl_info->ptl; + + /* if this is the last PTL that is available to use, or the number of + * bytes remaining in the message is less than the PTLs minimum fragment + * size, then go ahead and give the rest of the message to this PTL. + */ + size_t bytes_to_frag; + if(num_ptl == num_ptl_avail || bytes_remaining < ptl->ptl_frag_min_size) + bytes_to_frag = bytes_remaining; + + /* otherwise attempt to give the PTL a percentage of the message + * based on a weighting factor. 
for simplicity calculate this as + * a percentage of the overall message length (regardless of amount + * previously assigned) + */ + else { + bytes_to_frag = ptl_info->ptl_weight * req->req_length; + if(bytes_to_frag > bytes_remaining) + bytes_to_frag = bytes_remaining; + } + + int rc = ptl->ptl_fragment(ptl, req, bytes_to_frag); + if(rc != LAM_SUCCESS && rc != LAM_ERR_TEMP_OUT_OF_RESOURCE) + return rc; + bytes_remaining = req->req_length - req->req_bytes_fragmented; + } + *complete = (req->req_length == req->req_bytes_fragmented); + return LAM_SUCCESS; +} + + +/* + * Check for queued messages that need to be scheduled. + * + */ + +void mca_pml_teg_send_request_push() +{ + mca_pml_base_send_request_t *req; + for(req = (mca_pml_base_send_request_t*)lam_list_get_first(&mca_pml_teg.teg_incomplete_sends); + req != (mca_pml_base_send_request_t*)lam_list_get_end(&mca_pml_teg.teg_incomplete_sends); + req = (mca_pml_base_send_request_t*)lam_list_get_next(req)) { + + bool complete; + int rc = mca_pml_teg_send_request_schedule(req, &complete); + if(rc != LAM_SUCCESS) { + // FIX + exit(-1); + } + if(complete) { + req = (mca_pml_base_send_request_t*)lam_dbl_remove( + &mca_pml_teg.teg_incomplete_sends, (lam_list_item_t*)req); + } + } +} + + diff --git a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h new file mode 100644 index 0000000000..8c29a89ec8 --- /dev/null +++ b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h @@ -0,0 +1,16 @@ +/* + * $HEADER$ + */ + +#ifndef LAM_PML_TEG_SEND_REQUEST_H +#define LAM_PML_TEG_SEND_REQUEST_H + +#include "mca/mpi/pml/base/pml_base_sendreq.h" + +int mca_pml_teg_send_request_start(mca_pml_base_send_request_t* req); +int mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req, bool* complete); +int mca_pml_teg_send_request_progress(); + + +#endif