1
1

Refactor the request completion (#1422)

* Remodel the request.
Added the wait sync primitive and integrate it into the PML and MTL
infrastructure. The multi-threaded requests are now significantly
less heavy and less noisy (only the threads associated with completed
requests are signaled).

* Fix the condition to release the request.
Этот коммит содержится в:
bosilca 2016-05-24 18:20:51 -05:00
родитель 1d3110471c
Коммит b90c83840f
32 изменённых файлов: 525 добавлений и 444 удалений

Просмотреть файл

@ -4,6 +4,9 @@
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -114,7 +117,7 @@ static int ompi_comm_request_progress (void)
/* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
while (request_item->subreq_count) {
ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
if (true == subreq->req_complete) {
if( REQUEST_COMPLETE(subreq) ) {
ompi_request_free (&subreq);
request_item->subreq_count--;
} else {
@ -204,7 +207,7 @@ static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
{
ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
if (!(*ompi_req)->req_complete) {
if( !REQUEST_COMPLETE(*ompi_req) ) {
return MPI_ERR_REQUEST;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -300,7 +300,7 @@ request_free(struct ompi_request_t **ompi_req)
ompi_coll_libnbc_request_t *request =
(ompi_coll_libnbc_request_t*) *ompi_req;
if (true != request->super.req_complete) {
if( !REQUEST_COMPLETE(&request->super) ) {
return MPI_ERR_REQUEST;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -29,7 +29,7 @@ static int mca_io_ompio_request_free ( struct ompi_request_t **req)
{
mca_ompio_request_t *ompio_req = ( mca_ompio_request_t *)*req;
if ( NULL != ompio_req->req_free_fn ) {
ompio_req->req_free_fn (ompio_req );
ompio_req->req_free_fn (ompio_req );
}
opal_list_remove_item (&mca_io_ompio_pending_requests, &ompio_req->req_item);
@ -65,7 +65,7 @@ void mca_io_ompio_request_destruct(mca_ompio_request_t* req)
OMPI_REQUEST_FINI ( &(req->req_ompi));
OBJ_DESTRUCT (&req->req_item);
if ( NULL != req->req_data ) {
free (req->req_data);
free (req->req_data);
}
return;
@ -79,16 +79,16 @@ int mca_io_ompio_component_progress ( void )
OPAL_LIST_FOREACH(litem, &mca_io_ompio_pending_requests, opal_list_item_t) {
req = GET_OMPIO_REQ_FROM_ITEM(litem);
if ( true == req->req_ompi.req_complete ) {
continue;
}
if( REQUEST_COMPLETE(&req->req_ompi) ) {
continue;
}
if ( NULL != req->req_progress_fn ) {
if ( req->req_progress_fn(req) ) {
completed++;
ompi_request_complete (&req->req_ompi, 1);
/* The fbtl progress function is expected to set the
** status elements
*/
* status elements
*/
}
}

Просмотреть файл

@ -3,6 +3,9 @@
* Copyright (c) 2011-2012 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -32,7 +35,7 @@ request_free(struct ompi_request_t **ompi_req)
ompi_osc_pt2pt_request_t *request =
(ompi_osc_pt2pt_request_t*) *ompi_req;
if (true != request->super.req_complete) {
if (REQUEST_COMPLETED != request->super.req_complete) {
return MPI_ERR_REQUEST;
}

Просмотреть файл

@ -3,6 +3,9 @@
* Copyright (c) 2011-2012 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -30,7 +33,7 @@ static int request_free(struct ompi_request_t **ompi_req)
ompi_osc_rdma_request_t *request =
(ompi_osc_rdma_request_t*) *ompi_req;
if (true != request->super.req_complete) {
if( REQUEST_COMPLETE(&request->super) ) {
return MPI_ERR_REQUEST;
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2010 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -78,7 +78,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t);
(request)->req_base.req_sequence = 0; \
(request)->req_base.req_datatype = datatype; \
/* What about req_type ? */ \
(request)->req_base.req_pml_complete = OPAL_INT_TO_BOOL(persistent); \
(request)->req_base.req_pml_complete = false; \
(request)->req_base.req_free_called = false; \
}
/**
@ -99,7 +99,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t);
(request)->req_ompi.req_status._ucount = 0; \
(request)->req_ompi.req_status._cancelled = 0; \
\
(request)->req_ompi.req_complete = false; \
(request)->req_ompi.req_complete = REQUEST_PENDING; \
(request)->req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
} while (0)

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -62,11 +62,11 @@ struct mca_pml_base_request_t {
/* START: These fields have to match the definition of the mca_pml_cm_request_t */
ompi_request_t req_ompi; /**< base request */
volatile bool req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile int32_t req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile int32_t req_free_called; /**< flag indicating if the user has freed this request */
mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
struct ompi_communicator_t *req_comm; /**< communicator pointer */
struct ompi_datatype_t *req_datatype; /**< pointer to data type */
volatile bool req_free_called; /**< flag indicating if the user has freed this request */
opal_convertor_t req_convertor; /**< always need the convertor */
/* END: These field have to match the definition of the mca_pml_cm_request_t */

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -88,7 +88,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION( mca_pml_base_send_request_t );
(request)->req_base.req_tag = (int32_t)tag; \
(request)->req_base.req_comm = comm; \
/* (request)->req_base.req_proc is set on request allocation */ \
(request)->req_base.req_pml_complete = OPAL_INT_TO_BOOL(persistent); \
(request)->req_base.req_pml_complete = false; \
(request)->req_base.req_free_called = false; \
(request)->req_base.req_ompi.req_status._cancelled = 0; \
(request)->req_bytes_packed = 0; \
@ -119,7 +119,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION( mca_pml_base_send_request_t );
#define MCA_PML_BASE_SEND_START( request ) \
do { \
(request)->req_pml_complete = false; \
(request)->req_ompi.req_complete = false; \
(request)->req_ompi.req_complete = REQUEST_PENDING; \
(request)->req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
(request)->req_ompi.req_status._cancelled = 0; \
} while (0)

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -42,11 +42,11 @@ struct mca_pml_cm_request_t {
/* START: These fields have to match the definition of the mca_pml_base_request_t */
ompi_request_t req_ompi; /**< base request */
volatile bool req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile int32_t req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile int32_t req_free_called; /**< flag indicating if the user has freed this request */
mca_pml_cm_request_type_t req_pml_type;
struct ompi_communicator_t *req_comm; /**< communicator pointer */
struct ompi_datatype_t *req_datatype; /**< pointer to data type */
volatile bool req_free_called; /**< flag indicating if the user has freed this request */
opal_convertor_t req_convertor; /**< convertor that describes the memory layout */
/* END: These fields have to match the definition of the mca_pml_base_request_t */
};

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -39,7 +39,7 @@ int mca_pml_ob1_iprobe(int src,
MCA_PML_OB1_RECV_REQUEST_INIT(&recvreq, NULL, 0, &ompi_mpi_char.dt, src, tag, comm, true);
MCA_PML_OB1_RECV_REQUEST_START(&recvreq);
if( recvreq.req_recv.req_base.req_ompi.req_complete == true ) {
if( REQUEST_COMPLETE( &(recvreq.req_recv.req_base.req_ompi)) ) {
if( NULL != status ) {
*status = recvreq.req_recv.req_base.req_ompi.req_status;
}
@ -106,7 +106,7 @@ mca_pml_ob1_improbe(int src,
src, tag, comm, false);
MCA_PML_OB1_RECV_REQUEST_START(recvreq);
if( recvreq->req_recv.req_base.req_ompi.req_complete == true ) {
if( REQUEST_COMPLETE( &(recvreq->req_recv.req_base.req_ompi)) ) {
if( NULL != status ) {
*status = recvreq->req_recv.req_base.req_ompi.req_status;
}

Просмотреть файл

@ -71,28 +71,26 @@ static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
{
mca_pml_ob1_recv_request_t* recvreq = *(mca_pml_ob1_recv_request_t**)request;
assert( false == recvreq->req_recv.req_base.req_free_called );
if(false == recvreq->req_recv.req_base.req_free_called){
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_recv.req_base.req_free_called = true;
recvreq->req_recv.req_base.req_free_called = true;
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_NOTIFY,
&(recvreq->req_recv.req_base), PERUSE_RECV );
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_NOTIFY,
&(recvreq->req_recv.req_base), PERUSE_RECV );
if( true == recvreq->req_recv.req_base.req_pml_complete ) {
/* make buffer defined when the request is completed,
and before releasing the objects. */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_defined,
recvreq->req_recv.req_base.req_addr,
recvreq->req_recv.req_base.req_count,
recvreq->req_recv.req_base.req_datatype);
);
if( true == recvreq->req_recv.req_base.req_pml_complete ) {
/* make buffer defined when the request is completed,
and before releasing the objects. */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_defined,
recvreq->req_recv.req_base.req_addr,
recvreq->req_recv.req_base.req_count,
recvreq->req_recv.req_base.req_datatype);
);
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
}
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
@ -126,14 +124,12 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
request->req_recv.req_base.req_pml_complete = true;
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request->req_status._cancelled = true;
/* This macro will set the req_complete to true so the MPI Test/Wait* functions
* on this request will be able to complete. As the status is marked as
* cancelled the cancel state will be detected.
*/
MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE(request);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/*
* Receive request cancelled, make user buffer accessible.
*/

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2014 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@ -158,47 +158,48 @@ recv_request_pml_complete(mca_pml_ob1_recv_request_t *recvreq)
{
size_t i;
assert(false == recvreq->req_recv.req_base.req_pml_complete);
if(false == recvreq->req_recv.req_base.req_pml_complete){
if(recvreq->req_recv.req_bytes_packed > 0) {
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
&recvreq->req_recv.req_base, PERUSE_RECV );
}
for(i = 0; i < recvreq->req_rdma_cnt; i++) {
struct mca_btl_base_registration_handle_t *handle = recvreq->req_rdma[i].btl_reg;
mca_bml_base_btl_t *bml_btl = recvreq->req_rdma[i].bml_btl;
if (NULL != handle) {
mca_bml_base_deregister_mem (bml_btl, handle);
if(recvreq->req_recv.req_bytes_packed > 0) {
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
&recvreq->req_recv.req_base, PERUSE_RECV );
}
}
recvreq->req_rdma_cnt = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
if(true == recvreq->req_recv.req_base.req_free_called) {
if( MPI_SUCCESS != recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR ) {
ompi_mpi_abort(&ompi_mpi_comm_world.comm, MPI_ERR_REQUEST);
for(i = 0; i < recvreq->req_rdma_cnt; i++) {
struct mca_btl_base_registration_handle_t *handle = recvreq->req_rdma[i].btl_reg;
mca_bml_base_btl_t *bml_btl = recvreq->req_rdma[i].bml_btl;
if (NULL != handle) {
mca_bml_base_deregister_mem (bml_btl, handle);
}
}
MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq);
} else {
/* initialize request status */
recvreq->req_recv.req_base.req_pml_complete = true;
recvreq->req_recv.req_base.req_ompi.req_status._ucount =
recvreq->req_bytes_received;
if (recvreq->req_recv.req_bytes_packed > recvreq->req_bytes_expected) {
recvreq->req_rdma_cnt = 0;
if(true == recvreq->req_recv.req_base.req_free_called) {
if( MPI_SUCCESS != recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR ) {
ompi_mpi_abort(&ompi_mpi_comm_world.comm, MPI_ERR_REQUEST);
}
MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq);
} else {
/* initialize request status */
recvreq->req_recv.req_base.req_pml_complete = true;
recvreq->req_recv.req_base.req_ompi.req_status._ucount =
recvreq->req_recv.req_bytes_packed;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR =
MPI_ERR_TRUNCATE;
recvreq->req_bytes_received;
if (recvreq->req_recv.req_bytes_packed > recvreq->req_bytes_expected) {
recvreq->req_recv.req_base.req_ompi.req_status._ucount =
recvreq->req_recv.req_bytes_packed;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR =
MPI_ERR_TRUNCATE;
}
if (OPAL_UNLIKELY(recvreq->local_handle)) {
mca_bml_base_deregister_mem (recvreq->rdma_bml, recvreq->local_handle);
recvreq->local_handle = NULL;
}
MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE(recvreq);
}
if (OPAL_UNLIKELY(recvreq->local_handle)) {
mca_bml_base_deregister_mem (recvreq->rdma_bml, recvreq->local_handle);
recvreq->local_handle = NULL;
}
MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE(recvreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
static inline bool

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -37,6 +37,7 @@
#include "ompi/mca/bml/base/base.h"
#include "ompi/memchecker.h"
OBJ_CLASS_INSTANCE(mca_pml_ob1_send_range_t, opal_free_list_item_t,
NULL, NULL);
@ -97,31 +98,26 @@ void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl)
static int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
{
mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)request;
if(false == sendreq->req_send.req_base.req_free_called) {
assert( false == sendreq->req_send.req_base.req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
sendreq->req_send.req_base.req_free_called = true;
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_NOTIFY,
sendreq->req_send.req_base.req_free_called = true;
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_NOTIFY,
&(sendreq->req_send.req_base), PERUSE_SEND );
if( true == sendreq->req_send.req_base.req_pml_complete ) {
/* make buffer defined when the request is completed,
and before releasing the objects. */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_defined,
sendreq->req_send.req_base.req_addr,
sendreq->req_send.req_base.req_count,
sendreq->req_send.req_base.req_datatype);
);
if( true == sendreq->req_send.req_base.req_pml_complete ) {
/* make buffer defined when the request is completed,
and before releasing the objects. */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_defined,
sendreq->req_send.req_base.req_addr,
sendreq->req_send.req_base.req_count,
sendreq->req_send.req_base.req_datatype);
);
MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
}
*request = MPI_REQUEST_NULL;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
@ -446,9 +442,7 @@ int mca_pml_ob1_send_request_start_buffered(
sendreq->req_state = 2;
/* request is complete at mpi level */
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_RNDV);
@ -573,7 +567,7 @@ int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq,
/* send */
rc = mca_bml_base_send_status(bml_btl, des, MCA_PML_OB1_HDR_TYPE_MATCH);
if( OPAL_LIKELY( rc >= OMPI_SUCCESS ) ) {
if( OPAL_LIKELY( rc >= OPAL_SUCCESS ) ) {
if( OPAL_LIKELY( 1 == rc ) ) {
mca_pml_ob1_match_completion_free_request( bml_btl, sendreq );
}
@ -633,7 +627,7 @@ int mca_pml_ob1_send_request_start_prepare( mca_pml_ob1_send_request_t* sendreq,
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_MATCH);
if( OPAL_LIKELY( rc >= 0 ) ) {
if( OPAL_LIKELY( rc >= OPAL_SUCCESS ) ) {
if( OPAL_LIKELY( 1 == rc ) ) {
mca_pml_ob1_match_completion_free_request( bml_btl, sendreq );
}
@ -941,7 +935,7 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)
prev_bytes_remaining = range->range_send_length;
if( OPAL_UNLIKELY(num_fail == range->range_btl_cnt) ) {
assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE);
/*TODO : assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE); */
add_request_to_send_pending(sendreq,
MCA_PML_OB1_SEND_PENDING_SCHEDULE, true);
/* Note that request remains locked. send_request_process_pending()
@ -1125,7 +1119,7 @@ static void mca_pml_ob1_put_completion (mca_btl_base_module_t* btl, struct mca_b
/* check completion status */
if( OPAL_UNLIKELY(OMPI_SUCCESS == status) ) {
/* TODO -- readd ordering */
/* TODO -- read ordering */
mca_pml_ob1_send_fin (sendreq->req_send.req_base.req_proc, bml_btl,
frag->rdma_hdr.hdr_rdma.hdr_frag, frag->rdma_length,
0, 0);

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2014 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -248,36 +248,34 @@ static inline void mca_pml_ob1_send_request_fini (mca_pml_ob1_send_request_t *se
static inline void
send_request_pml_complete(mca_pml_ob1_send_request_t *sendreq)
{
assert(false == sendreq->req_send.req_base.req_pml_complete);
if(false == sendreq->req_send.req_base.req_pml_complete) {
if(sendreq->req_send.req_bytes_packed > 0) {
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
&(sendreq->req_send.req_base), PERUSE_SEND);
}
if(sendreq->req_send.req_bytes_packed > 0) {
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
&(sendreq->req_send.req_base), PERUSE_SEND);
}
/* return mpool resources */
mca_pml_ob1_free_rdma_resources(sendreq);
/* return mpool resources */
mca_pml_ob1_free_rdma_resources(sendreq);
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
}
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
}
sendreq->req_send.req_base.req_pml_complete = true;
OPAL_THREAD_LOCK(&ompi_request_lock);
if(false == sendreq->req_send.req_base.req_ompi.req_complete) {
/* Should only be called for long messages (maybe synchronous) */
MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq, true);
} else {
if( MPI_SUCCESS != sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR ) {
ompi_mpi_abort(&ompi_mpi_comm_world.comm, MPI_ERR_REQUEST);
if( !REQUEST_COMPLETE( &((sendreq->req_send).req_base.req_ompi)) ) {
/* Should only be called for long messages (maybe synchronous) */
MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq, true);
} else {
if( MPI_SUCCESS != sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR ) {
ompi_mpi_abort(&ompi_mpi_comm_world.comm, MPI_ERR_REQUEST);
}
}
if(true == sendreq->req_send.req_base.req_free_called) {
MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq);
}
}
sendreq->req_send.req_base.req_pml_complete = true;
if(sendreq->req_send.req_base.req_free_called) {
MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/* returns true if request was completed on PML level */

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -48,6 +48,9 @@ int mca_pml_ob1_start(size_t count, ompi_request_t** requests)
* completes - and create a new request.
*/
#if OPAL_ENABLE_MULTI_THREADS
opal_atomic_rmb();
#endif
reuse_old_request = true;
switch(pml_request->req_ompi.req_state) {
case OMPI_REQUEST_INACTIVE:
@ -57,13 +60,11 @@ int mca_pml_ob1_start(size_t count, ompi_request_t** requests)
case OMPI_REQUEST_ACTIVE: {
ompi_request_t *request;
OPAL_THREAD_LOCK(&ompi_request_lock);
if (pml_request->req_pml_complete == false) {
/* free request after it completes */
pml_request->req_free_called = true;
} else {
/* can reuse the existing request */
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
@ -98,7 +99,6 @@ int mca_pml_ob1_start(size_t count, ompi_request_t** requests)
rc = OMPI_ERR_REQUEST;
break;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(OMPI_SUCCESS != rc)
return rc;
pml_request = (mca_pml_base_request_t*)request;

Просмотреть файл

@ -1,5 +1,8 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* Copyright (c) 2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -402,8 +405,8 @@ mca_pml_ucx_blocking_recv_completion(void *request, ucs_status_t status,
OPAL_THREAD_LOCK(&ompi_request_lock);
mca_pml_ucx_set_recv_status(&req->req_status, status, info);
PML_UCX_ASSERT(!req->req_complete);
req->req_complete = true;
PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
req->req_complete = REQUEST_COMPLETED;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -427,7 +430,7 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src
}
ucp_worker_progress(ompi_pml_ucx.ucp_worker);
while (!req->req_complete) {
while ( !REQUEST_COMPLETE(req) ) {
opal_progress();
}
@ -435,7 +438,7 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src
*mpi_status = req->req_status;
}
req->req_complete = false;
req->req_complete = REQUEST_PENDING;
ucp_request_release(req);
return OMPI_SUCCESS;
}
@ -750,7 +753,7 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else if (!UCS_PTR_IS_ERR(tmp_req)) {
OPAL_THREAD_LOCK(&ompi_request_lock);
if (tmp_req->req_complete) {
if (REQUEST_COMPLETE(tmp_req)) {
/* tmp_req is already completed */
PML_UCX_VERBOSE(8, "completing persistent request %p", (void*)preq);
mca_pml_ucx_persistent_request_complete(preq, tmp_req);

Просмотреть файл

@ -1,5 +1,8 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* Copyright (c) 2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -40,7 +43,7 @@ void mca_pml_ucx_send_completion(void *request, ucs_status_t status)
OPAL_THREAD_LOCK(&ompi_request_lock);
mca_pml_ucx_set_send_status(&req->req_status, status);
PML_UCX_ASSERT(!req->req_complete);
PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
ompi_request_complete(req, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
@ -56,7 +59,7 @@ void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
OPAL_THREAD_LOCK(&ompi_request_lock);
mca_pml_ucx_set_recv_status(&req->req_status, status, info);
PML_UCX_ASSERT(!req->req_complete);
PML_UCX_ASSERT( !(REQUEST_COMPLETE(req)));
ompi_request_complete(req, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}

Просмотреть файл

@ -1,5 +1,8 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
* Copyright (c) 2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -137,7 +140,7 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
{
req->req_complete = false;
req->req_complete = REQUEST_PENDING;
req->req_status._cancelled = false;
}

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -79,7 +79,7 @@ int MPI_Bsend_init(const void *buf, int count, MPI_Datatype type,
ompi_request_t */
(*request)->req_type = OMPI_REQUEST_NOOP;
(*request)->req_status = ompi_request_empty.req_status;
(*request)->req_complete = true;
(*request)->req_complete = REQUEST_COMPLETED;
(*request)->req_state = OMPI_REQUEST_INACTIVE;
(*request)->req_persistent = true;
(*request)->req_free = ompi_request_persistent_proc_null_free;

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -75,7 +75,7 @@ int MPI_Recv_init(void *buf, int count, MPI_Datatype type, int source,
ompi_request_t */
(*request)->req_type = OMPI_REQUEST_NOOP;
(*request)->req_status = ompi_request_empty.req_status;
(*request)->req_complete = true;
(*request)->req_complete = REQUEST_COMPLETED;
(*request)->req_state = OMPI_REQUEST_INACTIVE;
(*request)->req_persistent = true;
(*request)->req_free = ompi_request_persistent_proc_null_free;

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -80,7 +80,7 @@ int MPI_Rsend_init(const void *buf, int count, MPI_Datatype type,
ompi_request_t */
(*request)->req_type = OMPI_REQUEST_NOOP;
(*request)->req_status = ompi_request_empty.req_status;
(*request)->req_complete = true;
(*request)->req_complete = REQUEST_COMPLETED;
(*request)->req_state = OMPI_REQUEST_INACTIVE;
(*request)->req_persistent = true;
(*request)->req_free = ompi_request_persistent_proc_null_free;

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -80,7 +80,7 @@ int MPI_Send_init(const void *buf, int count, MPI_Datatype type,
ompi_request_t */
(*request)->req_type = OMPI_REQUEST_NOOP;
(*request)->req_status = ompi_request_empty.req_status;
(*request)->req_complete = true;
(*request)->req_complete = REQUEST_COMPLETED;
(*request)->req_state = OMPI_REQUEST_INACTIVE;
(*request)->req_persistent = true;
(*request)->req_free = ompi_request_persistent_proc_null_free;

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -80,7 +80,7 @@ int MPI_Ssend_init(const void *buf, int count, MPI_Datatype type,
ompi_request_t */
(*request)->req_type = OMPI_REQUEST_NOOP;
(*request)->req_status = ompi_request_empty.req_status;
(*request)->req_complete = true;
(*request)->req_complete = REQUEST_COMPLETED;
(*request)->req_state = OMPI_REQUEST_INACTIVE;
(*request)->req_persistent = true;
(*request)->req_free = ompi_request_persistent_proc_null_free;

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -51,9 +51,9 @@ static int ompi_grequest_cancel(ompi_request_t* req, int flag)
if (greq->greq_cancel.c_cancel != NULL) {
if (greq->greq_funcs_are_c) {
rc = greq->greq_cancel.c_cancel(greq->greq_state,
greq->greq_base.req_complete);
REQUEST_COMPLETE(&greq->greq_base));
} else {
fflag = (ompi_fortran_logical_t) greq->greq_base.req_complete;
fflag = (ompi_fortran_logical_t) REQUEST_COMPLETE(&greq->greq_base);
greq->greq_cancel.f_cancel((MPI_Aint*)greq->greq_state, &fflag, &ierr);
rc = OMPI_FINT_2_INT(ierr);
}
@ -181,9 +181,7 @@ int ompi_grequest_complete(ompi_request_t *req)
{
int rc;
OPAL_THREAD_LOCK(&ompi_request_lock);
rc = ompi_request_complete(req, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
OBJ_RELEASE(req);
return rc;
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -295,7 +295,7 @@ int ompi_request_default_test_some(
num_requests_null_inactive++;
continue;
}
if (true == request->req_complete) {
if( REQUEST_COMPLETE(request) ) {
OMPI_CRCP_REQUEST_COMPLETE(request);
indices[num_requests_done++] = i;
}

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
@ -29,9 +29,6 @@
#include "ompi/mca/crcp/crcp.h"
#include "ompi/mca/pml/base/pml_base_request.h"
#if OPAL_ENABLE_PROGRESS_THREADS
static int ompi_progress_thread_count=0;
#endif
int ompi_request_default_wait(
ompi_request_t ** req_ptr,
@ -83,91 +80,61 @@ int ompi_request_default_wait(
}
int ompi_request_default_wait_any(
size_t count,
ompi_request_t ** requests,
int *index,
ompi_status_public_t * status)
int ompi_request_default_wait_any(size_t count,
ompi_request_t ** requests,
int *index,
ompi_status_public_t * status)
{
#if OPAL_ENABLE_PROGRESS_THREADS
int c;
#endif
size_t i=0, num_requests_null_inactive=0;
size_t i = 0, completed = count, num_requests_null_inactive = 0;
int rc = OMPI_SUCCESS;
int completed = -1;
ompi_request_t **rptr=NULL;
ompi_request_t *request=NULL;
ompi_wait_sync_t sync;
#if OPAL_ENABLE_PROGRESS_THREADS
/* poll for completion */
OPAL_THREAD_ADD32(&ompi_progress_thread_count,1);
for (c = 0; completed < 0 && c < opal_progress_spin_count; c++) {
rptr = requests;
num_requests_null_inactive = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE
*/
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
if (true == request->req_complete) {
completed = i;
OPAL_THREAD_ADD32(&ompi_progress_thread_count,-1);
goto finished;
}
WAIT_SYNC_INIT(&sync, 1);
rptr = requests;
num_requests_null_inactive = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/* Sanity test */
if( NULL == request) {
continue;
}
if( num_requests_null_inactive == count ) {
OPAL_THREAD_ADD32(&ompi_progress_thread_count,-1);
goto finished;
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
*/
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
opal_progress();
}
OPAL_THREAD_ADD32(&ompi_progress_thread_count,-1);
#endif
/* give up and sleep until completion */
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
do {
rptr = requests;
num_requests_null_inactive = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/* Sanity test */
if( NULL == request) {
continue;
}
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
*/
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
if (request->req_complete == true) {
completed = i;
break;
}
}
if(num_requests_null_inactive == count)
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync)) {
assert(REQUEST_COMPLETE(request));
wait_sync_update( &sync, 1, request->req_status.MPI_ERROR);
completed = i;
break;
if (completed < 0) {
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
} while (completed < 0);
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
SYNC_WAIT(&sync);
/* recheck the complete status and clean up the sync primitives */
rptr = requests;
for(i = 0; i < completed; i++, rptr++) {
request = *rptr;
#if OPAL_ENABLE_PROGRESS_THREADS
finished:
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
continue;
}
/* Atomically mark the request as pending. If this succeed then the
* request was not completed, and it is now marked as pending. Otherwise,
* the request has been completed meanwhile, and it has been atomically
* marked as REQUEST_COMPLETE.
*/
OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING);
}
if(num_requests_null_inactive == count) {
*index = MPI_UNDEFINED;
@ -175,7 +142,7 @@ finished:
*status = ompi_status_empty;
}
} else {
assert( true == request->req_complete );
assert( REQUEST_COMPLETE(request) );
/* Per note above, we have to call gen request query_fn even
if STATUS_IGNORE was provided */
if (OMPI_REQUEST_GEN == request->req_type) {
@ -206,13 +173,13 @@ finished:
rptr = requests;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
if( true == request->req_complete) {
if( REQUEST_COMPLETE(request) ) {
OMPI_CRCP_REQUEST_COMPLETE(request);
}
}
}
#endif
WAIT_SYNC_RELEASE(&sync);
return rc;
}
@ -225,102 +192,40 @@ int ompi_request_default_wait_all( size_t count,
ompi_request_t **rptr;
ompi_request_t *request;
int mpi_error = OMPI_SUCCESS;
ompi_wait_sync_t sync;
WAIT_SYNC_INIT(&sync, count);
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr++;
if (request->req_complete == true) {
if (!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync)) {
if( OPAL_UNLIKELY( MPI_SUCCESS != request->req_status.MPI_ERROR ) ) {
failed++;
}
completed++;
}
}
if( 0 != completed ) {
wait_sync_update(&sync, completed, OPAL_SUCCESS);
}
if( failed > 0 ) {
goto finish;
}
/* if all requests have not completed -- defer acquiring lock
* unless required
/*
* acquire lock and test for completion - if all requests are
* not completed pend on condition variable until a request
* completes
*/
if (completed != count) {
/*
* acquire lock and test for completion - if all requests are
* not completed pend on condition variable until a request
* completes
*/
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
#if OPAL_ENABLE_MULTI_THREADS
/*
* confirm the status of the pending requests. We have to do it before
* taking the condition or otherwise we can miss some requests completion (the
* one that happpens between our initial test and the aquisition of the lock).
*/
rptr = requests;
for( completed = i = 0; i < count; i++ ) {
request = *rptr++;
if (request->req_complete == true) {
if( MPI_SUCCESS != request->req_status.MPI_ERROR ) {
failed++;
}
completed++;
}
}
if( failed > 0 ) {
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
goto finish;
}
#endif /* OPAL_ENABLE_MULTI_THREADS */
while( completed != count ) {
/* check number of pending requests */
size_t start = ompi_request_completed;
size_t pending = count - completed;
size_t start_failed = ompi_request_failed;
/*
* wait until at least pending requests complete
*/
while (pending > ompi_request_completed - start) {
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
/*
* Check for failed requests. If one request fails, then
* this operation completes in error marking the remaining
* requests as PENDING.
*/
if( OPAL_UNLIKELY( 0 < (ompi_request_failed - start_failed) ) ) {
failed += (ompi_request_failed - start_failed);
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
goto finish;
}
}
/*
* confirm that all pending operations have completed.
*/
rptr = requests;
for( failed = completed = i = 0; i < count; i++ ) {
request = *rptr++;
if (request->req_complete == true) {
if( MPI_SUCCESS != request->req_status.MPI_ERROR ) {
failed++;
}
completed++;
}
}
}
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
SYNC_WAIT(&sync);
#if OPAL_ENABLE_FT_CR == 1
if( opal_cr_is_enabled) {
rptr = requests;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
if( true == request->req_complete) {
if( REQUEST_COMPLETE(request) ) {
OMPI_CRCP_REQUEST_COMPLETE(request);
}
}
@ -339,7 +244,7 @@ int ompi_request_default_wait_all( size_t count,
* Since some may still be pending.
*/
if( 0 >= failed ) {
assert( true == request->req_complete );
assert( REQUEST_COMPLETE(request) );
}
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
@ -395,12 +300,12 @@ int ompi_request_default_wait_all( size_t count,
* Since some may still be pending.
*/
if( 0 >= failed ) {
assert( true == request->req_complete );
assert( REQUEST_COMPLETE(request) );
} else {
/* If the request is still pending due to a failed request
* then skip it in this loop.
*/
if( !request->req_complete ) {
if( !REQUEST_COMPLETE(request) ) {
continue;
}
}
@ -436,105 +341,62 @@ int ompi_request_default_wait_all( size_t count,
}
}
}
WAIT_SYNC_RELEASE(&sync);
return mpi_error;
}
int ompi_request_default_wait_some(
size_t count,
ompi_request_t ** requests,
int * outcount,
int * indices,
ompi_status_public_t * statuses)
int ompi_request_default_wait_some(size_t count,
ompi_request_t ** requests,
int * outcount,
int * indices,
ompi_status_public_t * statuses)
{
#if OPAL_ENABLE_PROGRESS_THREADS
int c;
#endif
size_t i, num_requests_null_inactive=0, num_requests_done=0;
int rc = MPI_SUCCESS;
ompi_request_t **rptr=NULL;
ompi_request_t *request=NULL;
ompi_request_t **rptr = NULL;
ompi_request_t *request = NULL;
ompi_wait_sync_t sync;
WAIT_SYNC_INIT(&sync, 1);
*outcount = 0;
for (i = 0; i < count; i++){
indices[i] = 0;
}
#if OPAL_ENABLE_PROGRESS_THREADS
/* poll for completion */
OPAL_THREAD_ADD32(&ompi_progress_thread_count,1);
for (c = 0; c < opal_progress_spin_count; c++) {
rptr = requests;
num_requests_null_inactive = 0;
num_requests_done = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE
*/
if (request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
if (true == request->req_complete) {
indices[i] = 1;
num_requests_done++;
}
}
if (num_requests_null_inactive == count ||
num_requests_done > 0) {
OPAL_THREAD_ADD32(&ompi_progress_thread_count,-1);
goto finished;
}
opal_progress();
}
OPAL_THREAD_ADD32(&ompi_progress_thread_count,-1);
#endif
/*
* We only get here when outcount still is 0.
* give up and sleep until completion
*/
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
do {
rptr = requests;
num_requests_null_inactive = 0;
num_requests_done = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
*/
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
if (request->req_complete == true) {
indices[i] = 1;
num_requests_done++;
}
rptr = requests;
num_requests_null_inactive = 0;
num_requests_done = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
/*
* Check for null or completed persistent request.
* For MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
*/
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
num_requests_null_inactive++;
continue;
}
if (num_requests_null_inactive == count ||
num_requests_done > 0)
break;
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
} while (1);
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
#if OPAL_ENABLE_PROGRESS_THREADS
finished:
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync) ) {
assert( REQUEST_COMPLETE(request) );
num_requests_done++;
}
}
if( 0 != num_requests_done ) {
/* As we only expect one trigger update the sync with count 1 */
wait_sync_update(&sync, 1, request->req_status.MPI_ERROR);
}
SYNC_WAIT(&sync);
#if OPAL_ENABLE_FT_CR == 1
if( opal_cr_is_enabled) {
rptr = requests;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
if( true == request->req_complete) {
if( REQUEST_COMPLETE(request) ) {
OMPI_CRCP_REQUEST_COMPLETE(request);
}
}
@ -544,20 +406,33 @@ finished:
if(num_requests_null_inactive == count) {
*outcount = MPI_UNDEFINED;
} else {
/*
* Compress the index array.
*/
for (i = 0, num_requests_done = 0; i < count; i++) {
if (0 != indices[i]) {
indices[num_requests_done++] = i;
/* Do the final counting and */
/* Clean up the synchronization primitives */
rptr = requests;
num_requests_done = 0;
for (i = 0; i < count; i++, rptr++) {
request = *rptr;
if( request->req_state == OMPI_REQUEST_INACTIVE ) {
continue;
}
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
assert(REQUEST_COMPLETE(request));
indices[num_requests_done] = i;
num_requests_done++;
}
}
WAIT_SYNC_RELEASE(&sync);
*outcount = num_requests_done;
for (i = 0; i < num_requests_done; i++) {
request = requests[indices[i]];
assert( true == request->req_complete );
assert( REQUEST_COMPLETE(request) );
/* Per note above, we have to call gen request query_fn even
if STATUS_IGNORE was provided */
if (OMPI_REQUEST_GEN == request->req_type) {

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
@ -125,7 +125,7 @@ int ompi_request_init(void)
ompi_request_null.request.req_status._ucount = 0;
ompi_request_null.request.req_status._cancelled = 0;
ompi_request_null.request.req_complete = true;
ompi_request_null.request.req_complete = REQUEST_COMPLETED;
ompi_request_null.request.req_state = OMPI_REQUEST_INACTIVE;
ompi_request_null.request.req_persistent = false;
ompi_request_null.request.req_f_to_c_index =
@ -157,7 +157,7 @@ int ompi_request_init(void)
ompi_request_empty.req_status._ucount = 0;
ompi_request_empty.req_status._cancelled = 0;
ompi_request_empty.req_complete = true;
ompi_request_empty.req_complete = REQUEST_COMPLETED;
ompi_request_empty.req_state = OMPI_REQUEST_ACTIVE;
ompi_request_empty.req_persistent = false;
ompi_request_empty.req_f_to_c_index =

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -35,6 +35,7 @@
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/threads/condition.h"
#include "opal/threads/wait_sync.h"
#include "ompi/constants.h"
BEGIN_C_DECLS
@ -102,7 +103,7 @@ struct ompi_request_t {
opal_free_list_item_t super; /**< Base type */
ompi_request_type_t req_type; /**< Enum indicating the type of the request */
ompi_status_public_t req_status; /**< Completion status */
volatile bool req_complete; /**< Flag indicating wether request has completed */
volatile void *req_complete; /**< Flag indicating wether request has completed */
volatile ompi_request_state_t req_state; /**< enum indicate state of the request */
bool req_persistent; /**< flag indicating if the this is a persistent request */
int req_f_to_c_index; /**< Index in Fortran <-> C translation array */
@ -118,6 +119,7 @@ struct ompi_request_t {
*/
typedef struct ompi_request_t ompi_request_t;
/**
* Padded struct to maintain back compatibiltiy.
* See ompi/communicator/communicator.h comments with struct ompi_communicator_t
@ -140,11 +142,15 @@ typedef struct ompi_predefined_request_t ompi_predefined_request_t;
*/
#define OMPI_REQUEST_INIT(request, persistent) \
do { \
(request)->req_complete = false; \
(request)->req_complete = REQUEST_PENDING; \
(request)->req_state = OMPI_REQUEST_INACTIVE; \
(request)->req_persistent = (persistent); \
(request)->req_complete_cb = NULL; \
(request)->req_complete_cb_data = NULL; \
} while (0);
#define REQUEST_COMPLETE(req) (REQUEST_COMPLETED == (req)->req_complete)
/**
* Finalize a request. This is a macro to avoid function call
* overhead, since this is typically invoked in the critical
@ -365,28 +371,31 @@ static inline int ompi_request_free(ompi_request_t** request)
#define ompi_request_wait_all (ompi_request_functions.req_wait_all)
#define ompi_request_wait_some (ompi_request_functions.req_wait_some)
/**
* Wait a particular request for completion
*/
#if OPAL_ENABLE_MULTI_THREADS
static inline void ompi_request_wait_completion(ompi_request_t *req)
{
if(false == req->req_complete) {
#if OPAL_ENABLE_PROGRESS_THREADS
if(opal_progress_spin(&req->req_complete)) {
return;
}
#endif
OPAL_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
while(false == req->req_complete) {
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
ompi_request_waiting--;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
ompi_wait_sync_t sync;
WAIT_SYNC_INIT(&sync, 1);
if(OPAL_ATOMIC_CMPSET_PTR(&req->req_complete, REQUEST_PENDING, &sync)) {
SYNC_WAIT(&sync);
}
assert(REQUEST_COMPLETE(req));
WAIT_SYNC_RELEASE(&sync);
}
#else
static inline void ompi_request_wait_completion(ompi_request_t *req)
{
while(!REQUEST_COMPLETE(req)) {
opal_progress();
}
}
#endif
/**
* Signal or mark a request as complete. If with_signal is true this will
* wake any thread pending on the request and ompi_request_lock should be
@ -398,22 +407,23 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
*/
static inline int ompi_request_complete(ompi_request_t* request, bool with_signal)
{
ompi_request_complete_fn_t tmp = request->req_complete_cb;
if( NULL != tmp ) {
if( NULL != request->req_complete_cb) {
request->req_complete_cb( request );
request->req_complete_cb = NULL;
tmp( request );
}
ompi_request_completed++;
request->req_complete = true;
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
ompi_wait_sync_t *tmp_sync = OPAL_ATOMIC_SWP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
}
if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
ompi_request_failed++;
}
if(with_signal && ompi_request_waiting) {
/* Broadcast the condition, otherwise if there is already a thread
* waiting on another request it can use all signals.
*/
opal_condition_broadcast(&ompi_request_cond);
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -3,7 +3,7 @@
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# Copyright (c) 2004-2016 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -28,9 +28,11 @@ headers += \
threads/mutex.h \
threads/mutex_unix.h \
threads/threads.h \
threads/tsd.h
threads/tsd.h \
threads/wait_sync.h
lib@OPAL_LIB_PREFIX@open_pal_la_SOURCES += \
threads/condition.c \
threads/mutex.c \
threads/thread.c
threads/thread.c \
threads/wait_sync.c

Просмотреть файл

@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@ -318,6 +318,11 @@ OPAL_THREAD_ADD_SIZE_T(volatile size_t *addr, int delta)
#define OPAL_ATOMIC_CMPSET(x, y, z) \
(OPAL_UNLIKELY(opal_using_threads()) ? opal_atomic_cmpset(x, y, z) : OPAL_CMPSET(x, y, z))
#endif
#if OPAL_HAVE_ATOMIC_CMPSET_32 || OPAL_HAVE_ATOMIC_CMPSET_64
#define OPAL_ATOMIC_CMPSET_PTR(x, y, z) \
(opal_using_threads() ? opal_atomic_cmpset_ptr(x, y, z) : OPAL_CMPSET(x, y, z))
#endif
END_C_DECLS

95
opal/threads/wait_sync.c Обычный файл
Просмотреть файл

@ -0,0 +1,95 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "wait_sync.h"
/* Lock protecting the global list of pending sync objects and its head
 * pointer. The list is circular and doubly linked; the sync at the head
 * belongs to the thread currently in charge of driving opal_progress(). */
static opal_mutex_t wait_sync_lock = OPAL_MUTEX_STATIC_INIT;
static ompi_wait_sync_t* wait_sync_list = NULL;

/* Wake the thread blocked on 'who' so it takes over the progress-manager
 * role (see sync_wait_mt). The lock/signal/unlock sequence guarantees the
 * target is actually inside pthread_cond_wait when the signal lands. */
#define WAIT_SYNC_PASS_OWNERSHIP(who)                  \
    do {                                               \
        pthread_mutex_lock( &(who)->lock);             \
        pthread_cond_signal( &(who)->condition );      \
        pthread_mutex_unlock( &(who)->lock);           \
    } while(0)
/*
 * Single-threaded wait: nobody else can wake us up, so simply keep
 * driving the progress engine until every expected completion has been
 * accounted for on the sync object. Always returns OPAL_SUCCESS.
 */
int sync_wait_st(ompi_wait_sync_t *sync)
{
    while( 0 < sync->count ) {
        opal_progress();
    }
    return OPAL_SUCCESS;
}
/**
 * Multi-threaded wait: block the calling thread until all completions
 * expected by this sync object have been delivered (sync->count <= 0).
 *
 * All pending syncs are chained on the circular doubly-linked
 * wait_sync_list (guarded by wait_sync_lock). Exactly one thread — the
 * one whose sync sits at the head of the list — spins in opal_progress();
 * every other thread sleeps on its own condition variable until either
 * its sync completes (wait_sync_update) or it is promoted to progress
 * manager (WAIT_SYNC_PASS_OWNERSHIP).
 *
 * Returns OPAL_SUCCESS.
 * NOTE(review): sync->status is never consulted here, so an error
 * reported through wait_sync_update() does not change the return value —
 * confirm that callers inspect sync->status themselves.
 */
int sync_wait_mt(ompi_wait_sync_t *sync)
{
    /* Fast path: already complete before we got here.
     * NOTE(review): unsynchronized read of count — assumes the updater
     * published its decrements before this call; confirm ordering. */
    if(sync->count <= 0)
        return OPAL_SUCCESS;

    /* lock so nobody can signal us during the list updating */
    pthread_mutex_lock(&sync->lock);

    /* Insert sync into the circular pending list (becomes the head, i.e.
     * the progress manager, only if the list was empty). */
    OPAL_THREAD_LOCK(&wait_sync_lock);
    if( NULL == wait_sync_list ) {
        sync->next = sync->prev = sync;
        wait_sync_list = sync;
    } else {
        sync->prev = wait_sync_list->prev;
        sync->prev->next = sync;
        sync->next = wait_sync_list;
        wait_sync_list->prev = sync;
    }
    OPAL_THREAD_UNLOCK(&wait_sync_lock);

    /**
     * If we are not responsible for progressing, go silent until something
     * worth noticing happens:
     * - this thread has been promoted to take care of the progress
     * - our sync has been triggered.
     */
    if( sync != wait_sync_list ) {
        /* Safe against lost wakeups: we still hold sync->lock, and both
         * wait_sync_update and WAIT_SYNC_PASS_OWNERSHIP take it before
         * signaling. */
        pthread_cond_wait(&sync->condition, &sync->lock);

        /**
         * At this point either the sync was completed in which case we should
         * remove it from the wait list, or/and I was promoted as the progress
         * manager.
         */
        if( sync->count <= 0 ) { /* Completed? */
            pthread_mutex_unlock(&sync->lock);
            goto i_am_done;
        }
        /* promoted ! */
        assert(sync == wait_sync_list);
    }
    pthread_mutex_unlock(&sync->lock);

    while(sync->count > 0) { /* progress till completion */
        opal_progress(); /* don't progress with the sync lock locked or you'll deadlock */
    }
    assert(sync == wait_sync_list);

 i_am_done:
    /* My sync is now complete. Trim the list: remove self, wake next */
    OPAL_THREAD_LOCK(&wait_sync_lock);
    sync->prev->next = sync->next;
    sync->next->prev = sync->prev;
    /* In case I am the progress manager, pass the duties on */
    if( sync == wait_sync_list ) {
        wait_sync_list = (sync == sync->next) ? NULL : sync->next;
        if( NULL != wait_sync_list )
            WAIT_SYNC_PASS_OWNERSHIP(wait_sync_list);
    }
    OPAL_THREAD_UNLOCK(&wait_sync_lock);

    return OPAL_SUCCESS;
}

89
opal/threads/wait_sync.h Обычный файл
Просмотреть файл

@ -0,0 +1,89 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "opal/sys/atomic.h"
#include "opal/threads/condition.h"
#include <pthread.h>
BEGIN_C_DECLS
/**
 * Synchronization object a thread blocks on while waiting for one or more
 * requests to complete. Completers account for their completion through
 * wait_sync_update(); the waiting thread sits in SYNC_WAIT().
 */
typedef struct ompi_wait_sync_t {
    int32_t count;                 /**< completions still expected; waiter released at 0 */
    int32_t status;                /**< 0 on success, -1 once any completion reported an error */
    pthread_cond_t condition;      /**< the waiting thread sleeps here (MT builds) */
    pthread_mutex_t lock;          /**< protects the condition wait/signal handoff */
    struct ompi_wait_sync_t *next; /**< circular pending list links (see wait_sync.c) */
    struct ompi_wait_sync_t *prev;
} ompi_wait_sync_t;
/* Sentinel values stored in a request's req_complete field. Any other
 * value found there is a pointer to the ompi_wait_sync_t of the thread
 * currently waiting on that request. */
#define REQUEST_PENDING (void*)0L
#define REQUEST_COMPLETED (void*)1L

#if OPAL_ENABLE_MULTI_THREADS
/* Multi-threaded build: real atomics and pthread primitives. */
#define OPAL_ATOMIC_ADD_32(a,b) opal_atomic_add_32(a,b)
#define OPAL_ATOMIC_SWP_PTR(a,b) opal_atomic_swap_ptr(a,b)
#define SYNC_WAIT(sync) sync_wait_mt(sync)
#define PTHREAD_COND_INIT(a,b) pthread_cond_init(a,b)
#define PTHREAD_MUTEX_INIT(a,b) pthread_mutex_init(a,b)

/* Tear down the pthread primitives embedded in a sync object. */
#define WAIT_SYNC_RELEASE(sync)                       \
    do {                                              \
        pthread_cond_destroy(&(sync)->condition);     \
        pthread_mutex_destroy(&(sync)->lock);         \
    } while(0)

/* Wake the (single) thread blocked on this sync object. Taking the lock
 * around the signal avoids racing a waiter that is about to block. */
#define WAIT_SYNC_SIGNAL(sync)                        \
    do {                                              \
        pthread_mutex_lock(&(sync->lock));            \
        pthread_cond_signal(&sync->condition);        \
        pthread_mutex_unlock(&(sync->lock));          \
    } while(0)
#else
/* Single-threaded build: plain operations, no signaling required.
 * NOTE(review): this OPAL_ATOMIC_SWP_PTR evaluates to the NEW value,
 * whereas the MT opal_atomic_swap_ptr returns the PREVIOUS one — confirm
 * no ST caller depends on the returned (old) value. */
#define OPAL_ATOMIC_ADD_32(a,b) (*(a) += (b))
#define OPAL_ATOMIC_SWP_PTR(a,b) *(a) = (b)
#define PTHREAD_COND_INIT(a,b)
#define PTHREAD_MUTEX_INIT(a,b)
#define SYNC_WAIT(sync) sync_wait_st(sync)
#define WAIT_SYNC_RELEASE(sync)
#define WAIT_SYNC_SIGNAL(sync)
#endif /* OPAL_ENABLE_MULTI_THREADS */

OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
OPAL_DECLSPEC int sync_wait_st(ompi_wait_sync_t *sync);

/* Prepare a sync object to wait for 'c' completions. */
#define WAIT_SYNC_INIT(sync,c)                            \
    do {                                                  \
        (sync)->count = c;                                \
        (sync)->next = NULL;                              \
        (sync)->prev = NULL;                              \
        (sync)->status = 0;                               \
        PTHREAD_COND_INIT(&(sync)->condition, NULL);      \
        PTHREAD_MUTEX_INIT(&(sync)->lock, NULL);          \
    } while(0)
/**
 * Account for 'updates' completed events on the sync object.
 *
 * @param sync        the synchronization object to update
 * @param updates     number of completions to subtract from the count
 * @param req_status  OPAL_SUCCESS, or the error reported by the request
 *
 * Success path: the updater that drives the count to exactly 0 signals
 * the waiter. Error path: the error is published in sync->status and the
 * count is forced to 0 so the waiter unblocks immediately and can observe
 * the failure.
 *
 * Fix vs. original: the error path used OPAL_ATOMIC_CMPSET_32(&count, 0, 0),
 * which is a no-op unless the count was already zero — a waiter blocked
 * with count > 0 was never released — and it set sync->status only after
 * touching the count. We now publish the status first (with a write
 * barrier) and atomically force the count to zero with a CAS loop.
 */
static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int req_status)
{
    if( OPAL_LIKELY(OPAL_SUCCESS == req_status) ) {
        /* Last updater (count reaches 0) wakes the waiter. */
        if( 0 == (OPAL_ATOMIC_ADD_32(&sync->count, -updates)) ) {
            WAIT_SYNC_SIGNAL(sync);
        }
    } else {
        /* Publish the error before releasing the waiter so it cannot
         * observe count == 0 with a stale (success) status. */
        sync->status = -1;
        opal_atomic_wmb();
        {
            int32_t oldv;
            do {
                oldv = sync->count;
            } while( !OPAL_ATOMIC_CMPSET_32(&sync->count, oldv, 0) );
        }
        WAIT_SYNC_SIGNAL(sync);
    }
}
END_C_DECLS