diff --git a/src/class/ompi_list.c b/src/class/ompi_list.c index fd54b40680..6ff760a7a5 100644 --- a/src/class/ompi_list.c +++ b/src/class/ompi_list.c @@ -38,7 +38,6 @@ OBJ_CLASS_INSTANCE( static void ompi_list_item_construct(ompi_list_item_t *item) { item->ompi_list_next = item->ompi_list_prev = NULL; - item->ompi_list_type = 0; } static void ompi_list_item_destruct(ompi_list_item_t *item) @@ -58,7 +57,6 @@ static void ompi_list_construct(ompi_list_t *list) list->ompi_list_head.ompi_list_next = &list->ompi_list_tail; list->ompi_list_tail.ompi_list_prev = &list->ompi_list_head; list->ompi_list_tail.ompi_list_next = NULL; - list->ompi_list_type = 0; list->ompi_list_length = 0; } diff --git a/src/class/ompi_list.h b/src/class/ompi_list.h index 0fc11d3a2d..0d0a4e6914 100644 --- a/src/class/ompi_list.h +++ b/src/class/ompi_list.h @@ -16,7 +16,6 @@ extern ompi_class_t ompi_list_item_t_class; extern ompi_class_t ompi_list_t_class; -typedef int ompi_list_type_t; /* @@ -28,7 +27,6 @@ typedef int ompi_list_type_t; typedef struct ompi_list_item { ompi_object_t super; - ompi_list_type_t ompi_list_type; volatile struct ompi_list_item *ompi_list_next; volatile struct ompi_list_item *ompi_list_prev; } ompi_list_item_t; @@ -48,10 +46,9 @@ typedef struct ompi_list_item typedef struct ompi_list { - ompi_object_t super; - ompi_list_item_t ompi_list_head; - ompi_list_item_t ompi_list_tail; - ompi_list_type_t ompi_list_type; + ompi_object_t super; + ompi_list_item_t ompi_list_head; + ompi_list_item_t ompi_list_tail; volatile size_t ompi_list_length; } ompi_list_t; @@ -60,16 +57,6 @@ typedef struct ompi_list * Inlined accessor functions */ -static inline ompi_list_type_t ompi_list_get_type(ompi_list_t* list) -{ - return list->ompi_list_type; -} - -static inline void ompi_list_set_type(ompi_list_t* list, ompi_list_type_t type) -{ - list->ompi_list_type = type; -} - static inline size_t ompi_list_get_size(ompi_list_t* list) { return list->ompi_list_length; diff --git a/src/mca/pml/base/pml_base_request.h b/src/mca/pml/base/pml_base_request.h index 4cf22072da..0f3943614d 100644 --- a/src/mca/pml/base/pml_base_request.h +++ b/src/mca/pml/base/pml_base_request.h @@ -21,7 +21,9 @@ extern ompi_class_t mca_pml_base_request_t_class; typedef enum { MCA_PML_REQUEST_NULL, MCA_PML_REQUEST_SEND, - MCA_PML_REQUEST_RECV + MCA_PML_REQUEST_RECV, + MCA_PML_REQUEST_IPROBE, + MCA_PML_REQUEST_PROBE } mca_pml_base_request_type_t; diff --git a/src/mca/pml/pml.h b/src/mca/pml/pml.h index c049bdd74c..424ed2f28f 100644 --- a/src/mca/pml/pml.h +++ b/src/mca/pml/pml.h @@ -434,6 +434,66 @@ typedef int (*mca_pml_base_wait_all_fn_t)( ); +/** + * Probe to poll for pending recv. + * + * @param src (IN) Source rank w/in communicator. + * @param tag (IN) User defined tag. + * @param comm (IN) Communicator. + * @param matched (OUT) Flag indicating if matching recv exists. + * @param status (OUT) Completion statuses. + * @return OMPI_SUCCESS or failure status. + * + */ +typedef int (*mca_pml_base_iprobe_fn_t)( + int src, + int tag, + ompi_communicator_t* comm, + int *matched, + ompi_status_public_t *status +); + +/** + * Blocking probe to wait for pending recv. + * + * @param src (IN) Source rank w/in communicator. + * @param tag (IN) User defined tag. + * @param comm (IN) Communicator. + * @param status (OUT) Completion statuses. + * @return OMPI_SUCCESS or failure status. + * + */ +typedef int (*mca_pml_base_probe_fn_t)( + int src, + int tag, + ompi_communicator_t* comm, + ompi_status_public_t *status +); + +/** + * Cancel pending operation. + * + * @param request (IN) Request + * @return OMPI_SUCCESS or failure status. + * + */ +typedef int (*mca_pml_base_cancel_fn_t)( + ompi_request_t* request +); + + +/** + * Has a request been cancelled? + * + * @param request (IN) Request + * @return OMPI_SUCCESS or failure status. + * + */ +typedef int (*mca_pml_base_cancelled_fn_t)( + ompi_request_t* request, + int *flag +); + /** * Release resources held by a persistent mode request. * @@ -485,6 +545,10 @@ struct mca_pml_1_0_0_t { mca_pml_base_test_all_fn_t pml_test_all; mca_pml_base_wait_fn_t pml_wait; mca_pml_base_wait_all_fn_t pml_wait_all; + mca_pml_base_iprobe_fn_t pml_iprobe; + mca_pml_base_probe_fn_t pml_probe; + mca_pml_base_cancel_fn_t pml_cancel; + mca_pml_base_cancelled_fn_t pml_cancelled; mca_pml_base_free_fn_t pml_free; mca_pml_base_null_fn_t pml_null; }; diff --git a/src/mca/pml/teg/src/Makefile.am b/src/mca/pml/teg/src/Makefile.am index c757edd30b..3702aa0883 100644 --- a/src/mca/pml/teg/src/Makefile.am +++ b/src/mca/pml/teg/src/Makefile.am @@ -14,7 +14,9 @@ noinst_LTLIBRARIES = libmca_pml_teg.la libmca_pml_teg_la_SOURCES = \ pml_teg.c \ pml_teg.h \ + pml_teg_cancel.c \ pml_teg_free.c \ + pml_teg_iprobe.c \ pml_teg_irecv.c \ pml_teg_isend.c \ pml_teg_module.c \ diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c index c34dffe8e2..7620772feb 100644 --- a/src/mca/pml/teg/src/pml_teg.c +++ b/src/mca/pml/teg/src/pml_teg.c @@ -37,6 +37,10 @@ mca_pml_teg_t mca_pml_teg = { mca_pml_teg_test_all, mca_pml_teg_wait, mca_pml_teg_wait_all, + mca_pml_teg_iprobe, + mca_pml_teg_probe, + mca_pml_teg_cancel, + mca_pml_teg_cancelled, mca_pml_teg_free, mca_pml_teg_null } diff --git a/src/mca/pml/teg/src/pml_teg.h b/src/mca/pml/teg/src/pml_teg.h index b7ae721c2d..10eb1193fc 100644 --- a/src/mca/pml/teg/src/pml_teg.h +++ b/src/mca/pml/teg/src/pml_teg.h @@ -119,6 +119,31 @@ extern int mca_pml_teg_control( extern int mca_pml_teg_progress(void); +extern int mca_pml_teg_iprobe( + int dst, + int tag, + struct ompi_communicator_t* comm, + int *matched, + ompi_status_public_t* status +); + +extern int mca_pml_teg_probe( + int dst, + int tag, + struct ompi_communicator_t* comm, + ompi_status_public_t* status +); + +extern int mca_pml_teg_cancel( + ompi_request_t* request +); + +extern int mca_pml_teg_cancelled( + ompi_request_t* request, + int *flag +); + + extern int mca_pml_teg_isend_init( void *buf, size_t count, diff --git a/src/mca/pml/teg/src/pml_teg_cancel.c b/src/mca/pml/teg/src/pml_teg_cancel.c new file mode 100644 index 0000000000..55b0f32a35 --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_cancel.c @@ -0,0 +1,15 @@ +#include "pml_teg.h" + + +int mca_pml_teg_cancel(ompi_request_t* request) +{ + return OMPI_SUCCESS; +} + +int mca_pml_teg_cancelled(ompi_request_t* request, int* flag) +{ + if(NULL != flag) + *flag = 0; + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/teg/src/pml_teg_iprobe.c b/src/mca/pml/teg/src/pml_teg_iprobe.c new file mode 100644 index 0000000000..28e770c30c --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_iprobe.c @@ -0,0 +1,87 @@ +#include "pml_teg_recvreq.h" + + +int mca_pml_teg_iprobe( + int src, + int tag, + struct ompi_communicator_t* comm, + int* matched, + ompi_status_public_t* status) +{ + int rc; + + mca_ptl_base_recv_request_t recvreq; + OBJ_CONSTRUCT(&recvreq, mca_ptl_base_recv_request_t); + recvreq.super.req_type = MCA_PML_REQUEST_IPROBE; + MCA_PTL_BASE_RECV_REQUEST_INIT( + &recvreq, + NULL, + 0, + NULL, + src, + tag, + comm, + true); + + if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) { + OBJ_DESTRUCT(&recvreq); + return rc; + } + if((*matched = recvreq.super.req_mpi_done) == true && (NULL != status)) { + *status = recvreq.super.req_status; + } + + OBJ_DESTRUCT(&recvreq); + return OMPI_SUCCESS; +} + + +int mca_pml_teg_probe( + int src, + int tag, + struct ompi_communicator_t* comm, + ompi_status_public_t* status) +{ + int rc; + mca_ptl_base_recv_request_t recvreq; + OBJ_CONSTRUCT(&recvreq, mca_ptl_base_recv_request_t); + recvreq.super.req_type = MCA_PML_REQUEST_PROBE; + MCA_PTL_BASE_RECV_REQUEST_INIT( + &recvreq, + NULL, + 0, + NULL, + src, + tag, + comm, + true); + + if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) { + OBJ_DESTRUCT(&recvreq); + return rc; + } + + if(recvreq.super.req_mpi_done == false) { + /* give up and sleep until completion */ + if(ompi_using_threads()) { + ompi_mutex_lock(&mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting++; + while(recvreq.super.req_mpi_done == false) + ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + ompi_mutex_unlock(&mca_pml_teg.teg_request_lock); + } else { + mca_pml_teg.teg_request_waiting++; + while(recvreq.super.req_mpi_done == false) + ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + } + } + + if(NULL != status) { + *status = recvreq.super.req_status; + } + OBJ_DESTRUCT(&recvreq); + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.c b/src/mca/pml/teg/src/pml_teg_sendreq.c index 92045a9ea0..7a86d818c8 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/pml/teg/src/pml_teg_sendreq.c @@ -48,11 +48,12 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req) */ else { bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100; + if(ptl->ptl_max_frag_size != 0 && bytes_to_frag > ptl->ptl_max_frag_size) + bytes_to_frag = ptl->ptl_max_frag_size; } - rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, &bytes_to_frag, 0); + rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, bytes_to_frag, 0); if(rc == OMPI_SUCCESS) { - req->req_offset += bytes_to_frag; bytes_remaining = req->req_bytes_packed - req->req_offset; } } diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.h b/src/mca/pml/teg/src/pml_teg_sendreq.h index fcf5988dd4..0b8af8753e 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.h +++ b/src/mca/pml/teg/src/pml_teg_sendreq.h @@ -46,7 +46,7 @@ static inline int mca_pml_teg_send_request_start( int flags, rc; /* start the first fragment */ - if(first_fragment_size <= 0 || req->req_bytes_packed <= first_fragment_size) { + if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) { first_fragment_size = req->req_bytes_packed; flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? MCA_PTL_FLAGS_ACK_MATCHED : 0; diff --git a/src/mca/ptl/base/ptl_base_recvfrag.h b/src/mca/ptl/base/ptl_base_recvfrag.h index 4e81434a9f..f551b179e2 100644 --- a/src/mca/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/ptl/base/ptl_base_recvfrag.h @@ -54,11 +54,22 @@ static inline bool mca_ptl_base_recv_frag_match( request->super.req_peer = header->hdr_src; request->super.req_tag = header->hdr_tag; - /* notify ptl of match */ - ptl->ptl_matched(ptl, frag); + /* + * If probe - signal request is complete - but don't notify PTL + */ + if(request->super.req_type == MCA_PML_REQUEST_PROBE) { - /* process any additional fragments that arrived out of order */ - frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); + ptl->ptl_recv_progress(request, frag); + matched = mca_ptl_base_recv_frag_match(frag, header); + + } else { + + /* notify ptl of match */ + ptl->ptl_matched(ptl, frag); + + /* process any additional fragments that arrived out of order */ + frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); + }; }; return matched; } diff --git a/src/mca/ptl/base/ptl_base_recvreq.c b/src/mca/ptl/base/ptl_base_recvreq.c index fe8b6b56e1..c9051afd04 100644 --- a/src/mca/ptl/base/ptl_base_recvreq.c +++ b/src/mca/ptl/base/ptl_base_recvreq.c @@ -63,7 +63,8 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque /* We didn't find any matches. Record this irecv so we can match * it when the message comes in. */ - ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request); + if(request->super.req_type != MCA_PML_REQUEST_IPROBE) + ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request); THREAD_UNLOCK(&pml_comm->c_matching_lock); } @@ -111,7 +112,8 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) * it when the message comes in. */ - ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request); + if(request->super.req_type != MCA_PML_REQUEST_IPROBE) + ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request); THREAD_UNLOCK(&pml_comm->c_matching_lock); } diff --git a/src/mca/ptl/base/ptl_base_recvreq.h b/src/mca/ptl/base/ptl_base_recvreq.h index cacb163c18..7b4af4237f 100644 --- a/src/mca/ptl/base/ptl_base_recvreq.h +++ b/src/mca/ptl/base/ptl_base_recvreq.h @@ -47,21 +47,21 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t; comm, \ persistent) \ { \ - request->req_bytes_packed = 0; \ - request->req_bytes_received = 0; \ - request->req_bytes_delivered = 0; \ - request->super.req_sequence = 0; \ - request->super.req_addr = addr; \ - request->super.req_count = count; \ - request->super.req_datatype = datatype; \ - request->super.req_peer = src; \ - request->super.req_tag = tag; \ - request->super.req_comm = comm; \ - request->super.req_proc = NULL; \ - request->super.req_persistent = persistent; \ - request->super.req_mpi_done = false; \ - request->super.req_pml_done = false; \ - request->super.req_free_called = false; \ + (request)->req_bytes_packed = 0; \ + (request)->req_bytes_received = 0; \ + (request)->req_bytes_delivered = 0; \ + (request)->super.req_sequence = 0; \ + (request)->super.req_addr = addr; \ + (request)->super.req_count = count; \ + (request)->super.req_datatype = datatype; \ + (request)->super.req_peer = src; \ + (request)->super.req_tag = tag; \ + (request)->super.req_comm = comm; \ + (request)->super.req_proc = NULL; \ + (request)->super.req_persistent = persistent; \ + (request)->super.req_mpi_done = false; \ + (request)->super.req_pml_done = false; \ + (request)->super.req_free_called = false; \ } /** diff --git a/src/mpi/c/cancel.c b/src/mpi/c/cancel.c index 6632b77b49..281a0732bc 100644 --- a/src/mpi/c/cancel.c +++ b/src/mpi/c/cancel.c @@ -5,8 +5,10 @@ #include #include "mpi.h" +#include "runtime/runtime.h" #include "mpi/c/bindings.h" - +#include "mca/pml/pml.h" + #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #pragma weak MPI_Cancel = PMPI_Cancel #endif @@ -15,6 +17,23 @@ #include "mpi/c/profile/defines.h" #endif -int MPI_Cancel(MPI_Request *request) { - return MPI_SUCCESS; +int MPI_Cancel(MPI_Request *request) +{ + int rc; + if ( MPI_PARAM_CHECK ) { + rc = MPI_SUCCESS; + if ( OMPI_MPI_INVALID_STATE ) { + rc = MPI_ERR_INTERN; + } else if (request == NULL) { + rc = MPI_ERR_REQUEST; + } + OMPI_ERRHANDLER_CHECK(rc, (ompi_communicator_t*)NULL, rc, "MPI_Cancel"); + } + + if (NULL == *request) { + return MPI_SUCCESS; + } + rc = mca_pml.pml_cancel(*request); + OMPI_ERRHANDLER_RETURN(rc, (ompi_communicator_t*)NULL, rc, "MPI_Cancel"); } + diff --git a/src/mpi/c/iprobe.c b/src/mpi/c/iprobe.c index da45739d5e..e24858223a 100644 --- a/src/mpi/c/iprobe.c +++ b/src/mpi/c/iprobe.c @@ -5,7 +5,10 @@ #include "ompi_config.h" #include "mpi.h" +#include "runtime/runtime.h" #include "mpi/c/bindings.h" +#include "mca/pml/pml.h" + #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #pragma weak MPI_Iprobe = PMPI_Iprobe @@ -15,7 +18,34 @@ #include "mpi/c/profile/defines.h" #endif -int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, - MPI_Status *status) { - return MPI_SUCCESS; +int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status) +{ + int rc; + if (source == MPI_PROC_NULL) { + if (status) { + status->MPI_SOURCE = MPI_PROC_NULL; + status->MPI_TAG = MPI_ANY_TAG; + status->MPI_ERROR = MPI_SUCCESS; + status->_count = 0; + } + return MPI_SUCCESS; + } + + if ( MPI_PARAM_CHECK ) { + rc = MPI_SUCCESS; + if ( OMPI_MPI_INVALID_STATE ) { + rc = MPI_ERR_INTERN; + } else if (tag < 0 || tag > MPI_TAG_UB_VALUE) { + rc = MPI_ERR_TAG; + } else if (ompi_comm_invalid(comm)) { + rc = MPI_ERR_COMM; + } else if (source != MPI_ANY_SOURCE && ompi_comm_peer_invalid(comm, source)) { + rc = MPI_ERR_RANK; + } + OMPI_ERRHANDLER_CHECK(rc, comm, rc, "MPI_Iprobe"); + } + + rc = mca_pml.pml_iprobe(source, tag, comm, flag, status); + OMPI_ERRHANDLER_RETURN(rc, comm, rc, "MPI_Iprobe"); } + diff --git a/src/mpi/c/probe.c b/src/mpi/c/probe.c index 5aae8e7552..7847ae9c0c 100644 --- a/src/mpi/c/probe.c +++ b/src/mpi/c/probe.c @@ -5,7 +5,9 @@ #include #include "mpi.h" +#include "runtime/runtime.h" #include "mpi/c/bindings.h" +#include "mca/pml/pml.h" #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #pragma weak MPI_Probe = PMPI_Probe @@ -15,6 +17,34 @@ #include "mpi/c/profile/defines.h" #endif -int MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status *status) { - return MPI_SUCCESS; +int MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status *status) +{ + int rc; + if (source == MPI_PROC_NULL) { + if (status) { + status->MPI_SOURCE = MPI_PROC_NULL; + status->MPI_TAG = MPI_ANY_TAG; + status->MPI_ERROR = MPI_SUCCESS; + status->_count = 0; + } + return MPI_SUCCESS; + } + + if ( MPI_PARAM_CHECK ) { + rc = MPI_SUCCESS; + if ( OMPI_MPI_INVALID_STATE ) { + rc = MPI_ERR_INTERN; + } else if (tag < 0 || tag > MPI_TAG_UB_VALUE) { + rc = MPI_ERR_TAG; + } else if (ompi_comm_invalid(comm)) { + rc = MPI_ERR_COMM; + } else if (source != MPI_ANY_SOURCE && ompi_comm_peer_invalid(comm, source)) { + rc = MPI_ERR_RANK; + } + OMPI_ERRHANDLER_CHECK(rc, comm, rc, "MPI_Probe"); + } + + rc = mca_pml.pml_probe(source, tag, comm, status); + OMPI_ERRHANDLER_RETURN(rc, comm, rc, "MPI_Probe"); } + diff --git a/src/request/request.c b/src/request/request.c index f8d759b316..cc8698e990 100644 --- a/src/request/request.c +++ b/src/request/request.c @@ -12,12 +12,13 @@ ompi_class_t ompi_request_t_class = { }; -void ompi_request_construct(ompi_request_t* rq) -{ -} - - -void ompi_request_destruct(ompi_request_t* rq) +void ompi_request_construct(ompi_request_t* req) +{ + req->req_mode = OMPI_REQUEST_INVALID; +} + + +void ompi_request_destruct(ompi_request_t* req) { } diff --git a/src/request/request.h b/src/request/request.h index a58da42479..84974cf165 100644 --- a/src/request/request.h +++ b/src/request/request.h @@ -18,9 +18,17 @@ typedef enum { OMPI_REQUEST_MAX } ompi_request_type_t; +typedef enum { + OMPI_REQUEST_INVALID, + OMPI_REQUEST_INACTIVE, + OMPI_REQUEST_ACTIVE, + OMPI_REQUEST_CANCELLED +} ompi_request_mode_t; + struct ompi_request_t { ompi_list_item_t super; ompi_request_type_t req_type; + ompi_request_mode_t req_mode; }; typedef struct ompi_request_t ompi_request_t;