diff --git a/src/lam/event/epoll.c b/src/lam/event/epoll.c index ae82ba3a0a..fa6cd6aac7 100644 --- a/src/lam/event/epoll.c +++ b/src/lam/event/epoll.c @@ -55,9 +55,11 @@ #include "event.h" #include "evsignal.h" +#include "lam/threads/mutex.h" extern struct lam_event_list lam_eventqueue; extern volatile sig_atomic_t lam_evsignal_caught; +extern lam_mutex_t lam_event_mutex; /* due to limitations in the epoll interface, we need to keep track of * all file descriptors outself. @@ -174,7 +176,9 @@ epoll_dispatch(void *arg, struct timeval *tv) return (-1); timeout = tv->tv_sec * 1000 + tv->tv_usec / 1000; + lam_mutex_unlock(&lam_event_lock); res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); + lam_mutex_lock(&lam_event_lock); if (lam_evsignal_recalc(&epollop->evsigmask) == -1) return (-1); @@ -218,15 +222,15 @@ epoll_dispatch(void *arg, struct timeval *tv) continue; if (evread != NULL && !(evread->ev_events & EV_PERSIST)) - event_del(evread); + lam_event_del(evread); if (evwrite != NULL && evwrite != evread && !(evwrite->ev_events & EV_PERSIST)) - event_del(evwrite); + lam_event_del(evwrite); if (evread != NULL) - event_active(evread, EV_READ, 1); + lam_event_active(evread, EV_READ, 1); if (evwrite != NULL) - event_active(evwrite, EV_WRITE, 1); + lam_event_active(evwrite, EV_WRITE, 1); } return (0); diff --git a/src/mca/mpi/pml/base/pml_base_request.h b/src/mca/mpi/pml/base/pml_base_request.h index 3cc6f99bce..c9c39a3c05 100644 --- a/src/mca/mpi/pml/base/pml_base_request.h +++ b/src/mca/mpi/pml/base/pml_base_request.h @@ -47,6 +47,8 @@ struct mca_pml_base_request_t { volatile bool req_mpi_done; /* flag indicating if the pt-2-pt layer is done with this request */ volatile bool req_pml_done; + /* flag indicating if the user has freed this request */ + volatile bool req_free_called; }; typedef struct mca_pml_base_request_t mca_pml_base_request_t; diff --git a/src/mca/mpi/pml/pml.h b/src/mca/mpi/pml/pml.h index 5510e7aad6..3588416fb0 100644 --- a/src/mca/mpi/pml/pml.h +++ b/src/mca/mpi/pml/pml.h @@ -85,7 +85,6 @@ typedef int (*mca_pml_base_irecv_init_fn_t)( struct lam_datatype_t *datatype, int src, int tag, - bool persistent, struct lam_communicator_t* comm, struct lam_request_t **request ); @@ -117,7 +116,6 @@ typedef int (*mca_pml_base_isend_init_fn_t)( int dst, int tag, mca_pml_base_send_mode_t mode, - bool persistent, struct lam_communicator_t* comm, struct lam_request_t **request ); @@ -148,8 +146,8 @@ typedef int (*mca_pml_base_start_fn_t)( ); typedef int (*mca_pml_base_test_fn_t)( - lam_request_t* request, - bool *completed, + lam_request_t** request, + int *completed, lam_status_public_t* status ); diff --git a/src/mca/mpi/pml/teg/src/pml_teg.c b/src/mca/mpi/pml/teg/src/pml_teg.c index 7af51a8041..41970b7062 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg.c +++ b/src/mca/mpi/pml/teg/src/pml_teg.c @@ -276,8 +276,25 @@ int mca_pml_teg_module_fini(void) int mca_pml_teg_null(lam_request_t** request) { - *request = &mca_pml_teg.teg_null; + *request = (lam_request_t*)&mca_pml_teg.teg_null; return LAM_SUCCESS; } +void mca_pml_teg_request_return(mca_pml_base_request_t* request) +{ + switch(request->req_type) { + case MCA_PML_REQUEST_SEND: + { + mca_ptl_base_send_request_t* sendreq = (mca_ptl_base_send_request_t*)request; + sendreq->req_owner->ptl_request_return(sendreq->req_owner, sendreq); + break; + } + case MCA_PML_REQUEST_RECV: + { + mca_ptl_base_recv_request_t* recvreq = (mca_ptl_base_recv_request_t*)request; + lam_free_list_return(&mca_pml_teg.teg_recv_requests,(lam_list_item_t*)recvreq); + break; + } + } +} diff --git a/src/mca/mpi/pml/teg/src/pml_teg.h b/src/mca/mpi/pml/teg/src/pml_teg.h index 6da8c132af..ae0b6e64b0 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg.h +++ b/src/mca/mpi/pml/teg/src/pml_teg.h @@ -113,7 +113,6 @@ extern int mca_pml_teg_isend_init( int dst, int tag, mca_pml_base_send_mode_t mode, - bool persistent, struct lam_communicator_t* comm, struct lam_request_t **request ); @@ -145,7 +144,6 @@ extern int mca_pml_teg_irecv_init( struct lam_datatype_t *datatype, int src, int tag, - bool persistent, struct lam_communicator_t* comm, struct lam_request_t **request ); @@ -173,12 +171,12 @@ extern int mca_pml_teg_recv( extern int mca_pml_teg_progress(void); extern int mca_pml_teg_start( - lam_request_t* request + lam_request_t** request ); extern int mca_pml_teg_test( - lam_request_t* request, - bool *completed, + lam_request_t** request, + int *completed, lam_status_public_t* status ); @@ -191,5 +189,9 @@ extern int mca_pml_teg_null( lam_request_t** request ); +extern void mca_pml_teg_request_return( + mca_pml_base_request_t* +); + #endif diff --git a/src/mca/mpi/pml/teg/src/pml_teg_irecv.c b/src/mca/mpi/pml/teg/src/pml_teg_irecv.c index a536b1e223..554af139e0 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_irecv.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_irecv.c @@ -7,16 +7,10 @@ int mca_pml_teg_irecv_init( struct lam_datatype_t *datatype, int src, int tag, - bool persistent, struct lam_communicator_t* comm, struct lam_request_t **request) { int rc; - -#if 0 - lam_output(0, "mca_pml_teg_irecv_init: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid); -#endif - mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); if(NULL == recvreq) return rc; @@ -29,7 +23,7 @@ int mca_pml_teg_irecv_init( src, tag, comm, - persistent); + true); *request = (lam_request_t*)recvreq; return LAM_SUCCESS; diff --git a/src/mca/mpi/pml/teg/src/pml_teg_isend.c b/src/mca/mpi/pml/teg/src/pml_teg_isend.c index 49f1e6e0c4..c84424499f 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_isend.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_isend.c @@ -14,7 +14,6 @@ int mca_pml_teg_isend_init( int dst, int tag, mca_pml_base_send_mode_t sendmode, - bool persistent, lam_communicator_t* comm, lam_request_t **request) { @@ -33,7 +32,7 @@ int mca_pml_teg_isend_init( tag, comm, sendmode, - false + true ); *request = (lam_request_t*)sendreq; diff --git a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c index 323b799743..c2a1733a5d 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c @@ -10,6 +10,7 @@ void mca_pml_teg_recv_request_progress( req->req_bytes_recvd += frag->super.frag_size; if (req->req_bytes_recvd >= req->super.req_status._count) { req->super.req_mpi_done = true; + req->super.req_pml_done = true; if(mca_pml_teg.teg_request_waiting) { lam_condition_broadcast(&mca_pml_teg.teg_request_cond); } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c index 7b565551c0..2e28a14a8d 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c @@ -77,6 +77,7 @@ void mca_pml_teg_send_request_progress( req->req_bytes_sent += frag->super.frag_size; if (req->req_bytes_sent >= req->super.req_length) { req->super.req_mpi_done = true; + req->super.req_pml_done = true; if(mca_pml_teg.teg_request_waiting) { lam_condition_broadcast(&mca_pml_teg.teg_request_cond); } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_start.c b/src/mca/mpi/pml/teg/src/pml_teg_start.c index 92a1c871b0..bc4c396bd3 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_start.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_start.c @@ -3,13 +3,14 @@ #include "pml_teg_sendreq.h" -int mca_pml_teg_start(lam_request_t* request) +int mca_pml_teg_start(lam_request_t** request) { - switch(request->req_type) { + mca_pml_base_request_t *pml_request = *(mca_pml_base_request_t**)request; + switch(pml_request->req_type) { case MCA_PML_REQUEST_SEND: - return mca_pml_teg_send_request_start((mca_ptl_base_send_request_t*)request); + return mca_pml_teg_send_request_start((mca_ptl_base_send_request_t*)pml_request); case MCA_PML_REQUEST_RECV: - return mca_pml_teg_recv_request_start((mca_ptl_base_recv_request_t*)request); + return mca_pml_teg_recv_request_start((mca_ptl_base_recv_request_t*)pml_request); default: return LAM_ERROR; } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_test.c b/src/mca/mpi/pml/teg/src/pml_teg_test.c index 1ad29b5493..b41b866f89 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_test.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_test.c @@ -2,10 +2,20 @@ int mca_pml_teg_test( - lam_request_t* request, - bool *completed, + lam_request_t** request, + int *completed, lam_status_public_t* status) { - return LAM_ERROR; + mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)request; + if(pml_request->req_mpi_done) { + *completed = true; + mca_pml_teg_request_return(pml_request); + *request = NULL; + if (status != NULL) + *status = pml_request->req_status; + } else { + *completed = false; + } + return LAM_SUCCESS; } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_wait.c b/src/mca/mpi/pml/teg/src/pml_teg_wait.c index 1975fb01f3..022859637e 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_wait.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_wait.c @@ -11,7 +11,7 @@ int mca_pml_teg_wait( mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)request; if(pml_request->req_mpi_done == false) { - /* poll status - primarily for benchmarks */ + /* poll for completion - primarily for benchmarks */ int i; for(i=0; ireq_mpi_done == false; i++) ; /* do nothing */ @@ -26,6 +26,14 @@ int mca_pml_teg_wait( lam_mutex_unlock(&mca_pml_teg.teg_request_lock); } } + + /* return request to pool */ + if(pml_request->req_persistent == false) { + if(pml_request->req_pml_done == true) + mca_pml_teg_request_return(pml_request); + *request = NULL; + } + if (status != NULL) *status = pml_request->req_status; return LAM_SUCCESS; diff --git a/src/mca/mpi/ptl/base/ptl_base_recvreq.h b/src/mca/mpi/ptl/base/ptl_base_recvreq.h index 379b7b5bb4..dae649768a 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvreq.h +++ b/src/mca/mpi/ptl/base/ptl_base_recvreq.h @@ -48,6 +48,9 @@ static inline void mca_ptl_base_recv_request_init( request->super.req_persistent = persistent; request->super.req_mpi_done = false; request->super.req_pml_done = false; + request->super.req_free_called = false; + request->super.super.req_type = LAM_REQUEST_PML; } + #endif diff --git a/src/mca/mpi/ptl/base/ptl_base_sendreq.h b/src/mca/mpi/ptl/base/ptl_base_sendreq.h index 6b85cdd1fc..86bbc10e17 100644 --- a/src/mca/mpi/ptl/base/ptl_base_sendreq.h +++ b/src/mca/mpi/ptl/base/ptl_base_sendreq.h @@ -78,6 +78,8 @@ static inline void mca_ptl_base_send_request_init( request->super.req_persistent = persistent; request->super.req_mpi_done = false; request->super.req_pml_done = false; + request->super.req_free_called = false; + request->super.super.req_type = LAM_REQUEST_PML; } diff --git a/src/mpi/interface/c/bsend.c b/src/mpi/interface/c/bsend.c index 8fb33c102c..d6f63536b4 100644 --- a/src/mpi/interface/c/bsend.c +++ b/src/mpi/interface/c/bsend.c @@ -5,14 +5,44 @@ #include #include "mpi.h" +#include "mpi/runtime/runtime.h" #include "mpi/interface/c/bindings.h" +#include "mca/mpi/pml/pml.h" + #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Bsend = PMPI_Bsend #endif -int MPI_Bsend(void *buf, int count, MPI_Datatype datatype, - int dest, int tag, MPI_Comm comm) { - return MPI_SUCCESS; +int MPI_Bsend(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) +{ + if (dest == MPI_PROC_NULL) { + return MPI_SUCCESS; + } + + if ( MPI_PARAM_CHECK ) { + int rc = MPI_SUCCESS; + if (lam_mpi_finalized) { + rc = MPI_ERR_INTERN; + } else if (count < 0) { + rc = MPI_ERR_COUNT; +#if 0 + } else if (type == MPI_DATATYPE_NULL) { + rc = MPI_ERR_TYPE; +#endif + } else if (tag < 0 || tag > MPI_TAG_UB_VALUE) { + rc = MPI_ERR_TAG; + } else if (lam_comm_invalid(comm)) { + rc = MPI_ERR_COMM; + } else if (lam_comm_peer_invalid(comm, dest)) { + rc = MPI_ERR_RANK; + } + if (rc != MPI_SUCCESS) { + return rc; + } + } + + return mca_pml.pml_send(buf, count, type, dest, tag, MCA_PML_BASE_SEND_BUFFERED, comm); } + diff --git a/src/mpi/interface/c/rsend.c b/src/mpi/interface/c/rsend.c index e59c6ae9a8..1a591730cd 100644 --- a/src/mpi/interface/c/rsend.c +++ b/src/mpi/interface/c/rsend.c @@ -5,13 +5,42 @@ #include #include "mpi.h" +#include "mpi/runtime/runtime.h" #include "mpi/interface/c/bindings.h" +#include "mca/mpi/pml/pml.h" + #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Rsend = PMPI_Rsend #endif -int MPI_Rsend(void *ibuf, int count, MPI_Datatype datatype, int dest, - int tag, MPI_Comm comm) { - return MPI_SUCCESS; +int MPI_Rsend(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) +{ + if (dest == MPI_PROC_NULL) { + return MPI_SUCCESS; + } + + if ( MPI_PARAM_CHECK ) { + int rc = MPI_SUCCESS; + if (lam_mpi_finalized) { + rc = MPI_ERR_INTERN; + } else if (count < 0) { + rc = MPI_ERR_COUNT; +#if 0 + } else if (type == MPI_DATATYPE_NULL) { + rc = MPI_ERR_TYPE; +#endif + } else if (tag < 0 || tag > MPI_TAG_UB_VALUE) { + rc = MPI_ERR_TAG; + } else if (lam_comm_invalid(comm)) { + rc = MPI_ERR_COMM; + } else if (lam_comm_peer_invalid(comm, dest)) { + rc = MPI_ERR_RANK; + } + if (rc != MPI_SUCCESS) { + return rc; + } + } + + return mca_pml.pml_send(buf, count, type, dest, tag, MCA_PML_BASE_SEND_READY, comm); } diff --git a/src/mpi/interface/c/ssend.c b/src/mpi/interface/c/ssend.c index b78b7f60bc..5068b187b0 100644 --- a/src/mpi/interface/c/ssend.c +++ b/src/mpi/interface/c/ssend.c @@ -5,13 +5,43 @@ #include #include "mpi.h" +#include "mpi/runtime/runtime.h" #include "mpi/interface/c/bindings.h" +#include "mca/mpi/pml/pml.h" + #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Ssend = PMPI_Ssend #endif -int MPI_Ssend(void *buf, int count, MPI_Datatype datatype, int dest, - int tag, MPI_Comm comm) { - return MPI_SUCCESS; +int MPI_Ssend(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm) +{ + if (dest == MPI_PROC_NULL) { + return MPI_SUCCESS; + } + + if ( MPI_PARAM_CHECK ) { + int rc = MPI_SUCCESS; + if (lam_mpi_finalized) { + rc = MPI_ERR_INTERN; + } else if (count < 0) { + rc = MPI_ERR_COUNT; +#if 0 + } else if (type == MPI_DATATYPE_NULL) { + rc = MPI_ERR_TYPE; +#endif + } else if (tag < 0 || tag > MPI_TAG_UB_VALUE) { + rc = MPI_ERR_TAG; + } else if (lam_comm_invalid(comm)) { + rc = MPI_ERR_COMM; + } else if (lam_comm_peer_invalid(comm, dest)) { + rc = MPI_ERR_RANK; + } + if (rc != MPI_SUCCESS) { + return rc; + } + } + + return mca_pml.pml_send(buf, count, type, dest, tag, MCA_PML_BASE_SEND_SYNCHRONOUS, comm); } + diff --git a/src/mpi/interface/c/start.c b/src/mpi/interface/c/start.c index 90d6174e57..df2a76a01f 100644 --- a/src/mpi/interface/c/start.c +++ b/src/mpi/interface/c/start.c @@ -5,13 +5,28 @@ #include #include "mpi.h" +#include "mpi/runtime/runtime.h" #include "mpi/interface/c/bindings.h" +#include "mca/mpi/pml/pml.h" + #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Start = PMPI_Start #endif -int MPI_Start(MPI_Request *request) { - return MPI_SUCCESS; +int MPI_Start(MPI_Request *request) +{ + if ( MPI_PARAM_CHECK ) { + int rc = MPI_SUCCESS; + if (lam_mpi_finalized) { + rc = MPI_ERR_INTERN; + } else if (request == NULL || *request == NULL) { + rc = MPI_ERR_REQUEST; + } + if (rc != MPI_SUCCESS) { + return rc; + } + } + return mca_pml.pml_start(request); } diff --git a/src/mpi/interface/c/test.c b/src/mpi/interface/c/test.c index cc64238ace..f8297d9a88 100644 --- a/src/mpi/interface/c/test.c +++ b/src/mpi/interface/c/test.c @@ -5,12 +5,39 @@ #include #include "mpi.h" +#include "mpi/runtime/runtime.h" #include "mpi/interface/c/bindings.h" +#include "mca/mpi/pml/pml.h" + #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Test = PMPI_Test #endif -int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status) { - return MPI_SUCCESS; +int MPI_Test(MPI_Request *request, int *completed, MPI_Status *status) +{ + if ( MPI_PARAM_CHECK ) { + int rc = MPI_SUCCESS; + if ( lam_mpi_finalized ) { + rc = MPI_ERR_INTERN; + } else if (request == NULL) { + rc = MPI_ERR_REQUEST; + } else if (completed == NULL) { + rc = MPI_ERR_ARG; + } + if (rc != MPI_SUCCESS) { + return rc; + } + } + + if(*request == NULL) { + *completed = true; + status->MPI_SOURCE = MPI_PROC_NULL; + status->MPI_TAG = MPI_ANY_TAG; + status->MPI_ERROR = MPI_SUCCESS; + status->_count = 0; + return MPI_SUCCESS; + } + return mca_pml.pml_test(request,completed,status); } + diff --git a/src/mpi/interface/c/wait.c b/src/mpi/interface/c/wait.c index 79df740d8f..5bc0751059 100644 --- a/src/mpi/interface/c/wait.c +++ b/src/mpi/interface/c/wait.c @@ -19,13 +19,21 @@ int MPI_Wait(MPI_Request *request, MPI_Status *status) int rc = MPI_SUCCESS; if (lam_mpi_finalized) { rc = MPI_ERR_INTERN; - } else if (request == NULL || *request == NULL) { + } else if (request == NULL) { rc = MPI_ERR_REQUEST; } if (rc != MPI_SUCCESS) { return rc; } } + + if (*request == NULL) { + status->MPI_SOURCE = MPI_PROC_NULL; + status->MPI_TAG = MPI_ANY_TAG; + status->MPI_ERROR = MPI_SUCCESS; + status->_count = 0; + return MPI_SUCCESS; + } return mca_pml.pml_wait(request,status); }