
small re-org of request completion - moved test/wait out of pml

This commit was SVN r3066.
This commit is contained in:
Tim Woodall 2004-10-12 15:50:01 +00:00
parent aa07811da5
commit 0df8ca90ea
64 changed files with 944 additions and 748 deletions
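In short: the request-completion primitives (test, test_all, wait, wait_all, free, cancel) move out of the PML module's function table into a generic request layer (src/request/req_test.c, src/request/req_wait.c), driven by the req_complete flag and per-request callbacks on ompi_request_t. A minimal before/after sketch of the call-site change, using only names that appear in the diff below:

/* before: completion dispatched through the PML function table */
rc = mca_pml.pml_wait_all(count, reqs, MPI_STATUSES_IGNORE);

/* after: completion is generic; the request layer polls req_complete
 * and releases each request through its own req_free() callback */
rc = ompi_request_wait_all(count, reqs, MPI_STATUSES_IGNORE);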

View File

@ -296,7 +296,7 @@ enum {
*/
#define MPI_GROUP_NULL (&ompi_mpi_group_null)
#define MPI_COMM_NULL (&ompi_mpi_comm_null)
#define MPI_REQUEST_NULL ((MPI_Request) 0)
#define MPI_REQUEST_NULL (&ompi_request_null)
#define MPI_OP_NULL (&ompi_mpi_op_null)
#define MPI_ERRHANDLER_NULL (&ompi_mpi_errhandler_null)
#define MPI_INFO_NULL (&ompi_mpi_info_null)
@ -391,6 +391,8 @@ extern struct ompi_communicator_t ompi_mpi_comm_null;
extern struct ompi_group_t ompi_mpi_group_empty;
extern struct ompi_group_t ompi_mpi_group_null;
extern struct ompi_request_t ompi_request_null;
extern struct ompi_op_t ompi_mpi_op_null;
extern struct ompi_op_t ompi_mpi_op_max, ompi_mpi_op_min;
extern struct ompi_op_t ompi_mpi_op_sum, ompi_mpi_op_prod;
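Note the switch from a casted null pointer to a real sentinel object: with MPI_REQUEST_NULL pointing at ompi_request_null, the generic completion code can read a well-defined status out of a null request instead of special-casing the handle. The pattern this enables (it appears verbatim in req_test.c/req_wait.c below):

if (NULL == request) {
    /* freed slot: report the sentinel's status, nothing to release */
    statuses[i] = ompi_request_null.req_status;
}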

View File

@ -627,13 +627,13 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
}
if ( 0 == rank ) {
rc = mca_pml.pml_wait_all (rsize, req, MPI_STATUSES_IGNORE);
rc = ompi_request_wait_all (rsize, req, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
}
rc = mca_pml.pml_wait_all (1, &sendreq, MPI_STATUS_IGNORE);
rc = ompi_request_wait_all (1, &sendreq, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -656,7 +656,7 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
}
}
rc = mca_pml.pml_wait_all (1, &sendreq, MPI_STATUS_IGNORE );
rc = ompi_request_wait_all (1, &sendreq, MPI_STATUS_IGNORE );
exit:
if ( NULL != req ) {
@ -737,7 +737,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
if ( OMPI_SUCCESS != rc ) {
goto err_exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( OMPI_SUCCESS != rc ) {
goto err_exit;
}
@ -770,7 +770,7 @@ ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
if ( OMPI_SUCCESS != rc ) {
goto err_exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( OMPI_SUCCESS != rc ) {
goto err_exit;
}

View File

@ -432,7 +432,7 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -537,7 +537,7 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE);
rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}

View File

@ -101,7 +101,7 @@ int ompi_ddt_sndrcv(void *sbuf, int scount, MPI_Datatype sdtype, void *rbuf,
if (MPI_SUCCESS != err) {
return err;
}
err = mca_pml.pml_wait(1, &req, NULL, MPI_STATUS_IGNORE);
err = ompi_request_wait(1, &req, NULL, MPI_STATUS_IGNORE);
}
return err;

View File

@ -231,7 +231,7 @@ static inline void mca_coll_basic_free_reqs(ompi_request_t **reqs, int count)
{
int i;
for (i = 0; i < count; ++i)
mca_pml.pml_free(&reqs[i]);
ompi_request_free(&reqs[i]);
}

View File

@ -125,7 +125,7 @@ int mca_coll_basic_allgather_inter(void *sbuf, int scount,
}
}
err = mca_pml.pml_wait_all (rsize+1, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (rsize+1, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
return err;
}
@ -151,7 +151,7 @@ int mca_coll_basic_allgather_inter(void *sbuf, int scount,
goto exit;
}
err = mca_pml.pml_wait_all(1, &req, MPI_STATUS_IGNORE);
err = ompi_request_wait_all(1, &req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
@ -184,7 +184,7 @@ int mca_coll_basic_allgather_inter(void *sbuf, int scount,
}
}
err = mca_pml.pml_wait_all (rsize-1, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (rsize-1, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}

View File

@ -101,7 +101,7 @@ int mca_coll_basic_allreduce_inter(void *sbuf, void *rbuf, int count,
goto exit;
}
err = mca_pml.pml_wait_all(2, req, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(2, req, MPI_STATUSES_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
@ -168,7 +168,7 @@ int mca_coll_basic_allreduce_inter(void *sbuf, void *rbuf, int count,
}
}
err = mca_pml.pml_wait_all (rsize+1, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (rsize+1, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}

View File

@ -122,7 +122,7 @@ int mca_coll_basic_alltoall_intra(void *sbuf, int scount,
So free them anyway -- even if there was an error, and return
the error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
/* Free the reqs */
@ -212,7 +212,7 @@ int mca_coll_basic_alltoall_inter(void *sbuf, int scount,
i.e., by the end of this call, all the requests are free-able.
So free them anyway -- even if there was an error, and return
the error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
/* All done */
return err;

View File

@ -118,7 +118,7 @@ mca_coll_basic_alltoallv_intra(void *sbuf, int *scounts, int *sdisps,
So free them anyway -- even if there was an error, and return the
error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
err = ompi_request_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
MPI_STATUSES_IGNORE);
/* Free the requests. */
@ -200,7 +200,7 @@ mca_coll_basic_alltoallv_inter(void *sbuf, int *scounts, int *sdisps,
}
}
err = mca_pml.pml_wait_all(nreqs, preq, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(nreqs, preq, MPI_STATUSES_IGNORE);
/* All done */
return err;

View File

@ -109,7 +109,7 @@ int mca_coll_basic_alltoallw_intra(void *sbuf, int *scounts, int *sdisps,
So free them anyway -- even if there was an error, and return the
error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
err = ompi_request_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
MPI_STATUSES_IGNORE);
/* Free the requests. */
@ -185,7 +185,7 @@ int mca_coll_basic_alltoallw_inter(void *sbuf, int *scounts, int *sdisps,
i.e., by the end of this call, all the requests are free-able.
So free them anyway -- even if there was an error, and return the
error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
err = ompi_request_wait_all(nreqs, comm->c_coll_basic_data->mccb_reqs,
MPI_STATUSES_IGNORE);
/* Free the requests. */

View File

@ -73,7 +73,7 @@ int mca_coll_basic_bcast_lin_intra(void *buff, int count,
So free them anyway -- even if there was an error, and return
the error after we free everything. */
err = mca_pml.pml_wait_all(i, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(i, reqs, MPI_STATUSES_IGNORE);
/* Free the reqs */
@ -167,7 +167,7 @@ int mca_coll_basic_bcast_log_intra(void *buff, int count,
free-able. So free them anyway -- even if there was an
error, and return the error after we free everything. */
err = mca_pml.pml_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
/* Free the reqs */
@ -221,7 +221,7 @@ int mca_coll_basic_bcast_lin_inter(void *buff, int count,
return err;
}
}
err = mca_pml.pml_wait_all(rsize, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all(rsize, reqs, MPI_STATUSES_IGNORE);
}

View File

@ -101,7 +101,7 @@ int mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
/* Finally, wait for the receive to complete (so that we can do
the reduction). */
err = mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
err = ompi_request_wait(1, &req, &index, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
goto error;
}
@ -114,7 +114,7 @@ int mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
/* If we're not commutative, we have to wait for the receive to
complete and then copy it into the reduce buffer */
err = mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
err = ompi_request_wait(1, &req, &index, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
goto error;
}
@ -142,8 +142,8 @@ int mca_coll_basic_exscan_intra(void *sbuf, void *rbuf, int count,
error:
free(free_buffer);
if (MPI_REQUEST_NULL != req) {
mca_pml.pml_cancel(req);
mca_pml.pml_wait(1, &req, &index, MPI_STATUS_IGNORE);
ompi_request_cancel(req);
ompi_request_wait(1, &req, &index, MPI_STATUS_IGNORE);
}
/* All done */

View File

@ -148,7 +148,7 @@ int mca_coll_basic_gatherv_inter(void *sbuf, int scount,
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
/* All done */

View File

@ -171,7 +171,7 @@ int mca_coll_basic_reduce_scatter_inter(void *sbuf, void *rbuf, int *rcounts,
goto exit;
}
err = mca_pml.pml_wait_all (1, &req, MPI_STATUS_IGNORE);
err = ompi_request_wait_all (1, &req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != err ) {
goto exit;
}
@ -226,7 +226,7 @@ int mca_coll_basic_reduce_scatter_inter(void *sbuf, void *rbuf, int *rcounts,
goto exit;
}
err = mca_pml.pml_wait_all (1, &req, MPI_STATUS_IGNORE);
err = ompi_request_wait_all (1, &req, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
@ -253,12 +253,12 @@ int mca_coll_basic_reduce_scatter_inter(void *sbuf, void *rbuf, int *rcounts,
tcount += rcounts[i];
}
err = mca_pml.pml_wait_all (rsize, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (rsize, reqs, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}
err = mca_pml.pml_wait_all (1, &req, MPI_STATUS_IGNORE);
err = ompi_request_wait_all (1, &req, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != err ) {
goto exit;
}

View File

@ -135,7 +135,7 @@ int mca_coll_basic_scatter_inter(void *sbuf, int scount,
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
return err;

View File

@ -144,7 +144,7 @@ int mca_coll_basic_scatterv_inter(void *sbuf, int *scounts,
}
}
err = mca_pml.pml_wait_all (size, reqs, MPI_STATUSES_IGNORE);
err = ompi_request_wait_all (size, reqs, MPI_STATUSES_IGNORE);
}
/* All done */

View File

@ -236,7 +236,6 @@ int mca_oob_tcp_recv_cancel(
ompi_process_name_t* name,
int tag)
{
extern ompi_mutex_t ompi_event_lock;
int matched = 0;
ompi_list_item_t *item, *next;

View File

@ -227,7 +227,7 @@ int mca_pml_base_bsend_request_init(ompi_request_t* request, bool persistent)
/* set flag indicating mpi layer is done */
sendreq->req_base.req_persistent = persistent;
sendreq->req_base.req_mpi_done = true;
sendreq->req_base.req_ompi.req_complete = true;
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
return OMPI_SUCCESS;
}

View File

@ -31,30 +31,22 @@ static int mca_pml_base_progress(void)
*/
int mca_pml_base_output = -1;
mca_pml_base_module_t mca_pml = {
NULL,
NULL,
NULL,
NULL,
mca_pml_base_progress,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
NULL, /* pml_add_procs */
NULL, /* pml_del_procs */
NULL, /* pml_add_ptls */
NULL, /* pml_control */
mca_pml_base_progress, /* pml_progress */
NULL, /* pml_add_comm */
NULL, /* pml_del_comm */
NULL, /* pml_irecv_init */
NULL, /* pml_irecv */
NULL, /* pml_recv */
NULL, /* pml_isend_init */
NULL, /* pml_isend */
NULL, /* pml_send */
NULL, /* pml_iprobe */
NULL, /* pml_probe */
NULL /* pml_start */
};
ompi_list_t mca_pml_base_components_available;

View File

@ -58,8 +58,7 @@ typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
(request)->req_base.req_comm = comm; \
(request)->req_base.req_proc = NULL; \
(request)->req_base.req_persistent = persistent; \
(request)->req_base.req_mpi_done = false; \
(request)->req_base.req_pml_done = false; \
(request)->req_base.req_pml_complete = false; \
(request)->req_base.req_free_called = false; \
\
/* increment reference count on communicator */ \

View File

@ -41,10 +41,8 @@ struct mca_pml_base_request_t {
mca_ptl_sequence_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
ompi_datatype_t *req_datatype; /**< pointer to data type */
mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
ompi_status_public_t req_status; /**< completion status */
bool req_persistent; /**< flag indicating if this is a persistent request */
volatile bool req_mpi_done; /**< flag indicating if MPI is done with this request */
volatile bool req_pml_done; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile bool req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
volatile bool req_free_called; /**< flag indicating if the user has freed this request */
};
typedef struct mca_pml_base_request_t mca_pml_base_request_t;
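The old req_mpi_done/req_pml_done pair is thus split across layers: req_ompi.req_complete, on the generic ompi_request_t, is what test/wait observe, while req_pml_complete records that the point-to-point layer is finished with its own resources. A sketch of the completion sequence this commit establishes (it is the pattern in pml_teg_recvreq.c and pml_teg_sendreq.c further down):

OMPI_THREAD_LOCK(&ompi_request_lock);
req->req_base.req_pml_complete = true;       /* PML done with the request */
req->req_base.req_ompi.req_complete = true;  /* now visible to test/wait */
if (ompi_request_waiting)
    ompi_condition_broadcast(&ompi_request_cond);
OMPI_THREAD_UNLOCK(&ompi_request_lock);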

View File

@ -48,58 +48,58 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
* @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY)
* @param persistent (IN) Is request persistent.
*/
#define MCA_PML_BASE_SEND_REQUEST_INIT( request, \
addr, \
count, \
datatype, \
peer, \
tag, \
comm, \
mode, \
persistent) \
{ \
/* increment reference count on communicator */ \
OBJ_RETAIN(comm); \
\
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
request->req_offset = 0; \
request->req_bytes_sent = 0; \
request->req_send_mode = mode; \
request->req_peer_match.lval = 0; \
request->req_peer_addr.lval = 0; \
request->req_peer_size = 0; \
request->req_base.req_addr = addr; \
request->req_base.req_count = count; \
request->req_base.req_datatype = datatype; \
request->req_base.req_peer = peer; \
request->req_base.req_tag = tag; \
request->req_base.req_comm = comm; \
request->req_base.req_proc = ompi_comm_peer_lookup(comm,peer); \
request->req_base.req_persistent = persistent; \
request->req_base.req_mpi_done = false; \
request->req_base.req_pml_done = false; \
request->req_base.req_free_called = false; \
\
/* initialize datatype convertor for this request */ \
if(count > 0) { \
#define MCA_PML_BASE_SEND_REQUEST_INIT( request, \
addr, \
count, \
datatype, \
peer, \
tag, \
comm, \
mode, \
persistent) \
{ \
/* increment reference count on communicator */ \
OBJ_RETAIN(comm); \
\
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
request->req_offset = 0; \
request->req_bytes_sent = 0; \
request->req_send_mode = mode; \
request->req_peer_match.lval = 0; \
request->req_peer_addr.lval = 0; \
request->req_peer_size = 0; \
request->req_base.req_addr = addr; \
request->req_base.req_count = count; \
request->req_base.req_datatype = datatype; \
request->req_base.req_peer = peer; \
request->req_base.req_tag = tag; \
request->req_base.req_comm = comm; \
request->req_base.req_proc = ompi_comm_peer_lookup(comm,peer); \
request->req_base.req_persistent = persistent; \
request->req_base.req_pml_complete = false; \
request->req_base.req_free_called = false; \
\
/* initialize datatype convertor for this request */ \
if(count > 0) { \
ompi_convertor_copy( request->req_base.req_proc->proc_convertor, \
&request->req_convertor); \
/* We will create a convertor specialized for send */ \
/* just to be able to get the packed size. This size */ \
/* depends on the conversion used on the sender or the receiver */ \
/* side. BEWARE: this convertor is not suitable for the */ \
/* sending operation !! */ \
ompi_convertor_init_for_send( &request->req_convertor, \
0, \
request->req_base.req_datatype, \
request->req_base.req_count, \
request->req_base.req_addr, \
0, NULL ); \
ompi_convertor_get_packed_size( &request->req_convertor, \
&(request->req_bytes_packed) ); \
} else { \
request->req_bytes_packed = 0; \
} \
&request->req_convertor); \
/* We will create a convertor specialized for send */ \
/* just to be able to get the packed size. This size */ \
/* depends on the conversion used on the sender or the receiver */ \
/* side. BEWARE: this convertor is not suitable for the */ \
/* sending operation !! */ \
ompi_convertor_init_for_send( &request->req_convertor, \
0, \
request->req_base.req_datatype, \
request->req_base.req_count, \
request->req_base.req_addr, \
0, NULL ); \
ompi_convertor_get_packed_size( &request->req_convertor, \
&(request->req_bytes_packed) ); \
} else { \
request->req_bytes_packed = 0; \
} \
}
@ -109,6 +109,7 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
* @param request (IN) Send request.
* return TRUE if an ack/match has been received from peer.
*/
static inline bool mca_pml_base_send_request_matched(
mca_pml_base_send_request_t* request)
{

View File

@ -74,11 +74,12 @@ typedef enum {
MCA_PML_BASE_SEND_READY
} mca_pml_base_send_mode_t;
/* renamed MPI constants */
#define OMPI_ANY_TAG MPI_ANY_TAG
#define OMPI_ANY_SOURCE MPI_ANY_SOURCE
#define OMPI_PROC_NULL MPI_PROC_NULL
/**
* MCA->PML Called by MCA framework to initialize the component.
*
@ -359,84 +360,6 @@ typedef int (*mca_pml_base_module_start_fn_t)(
ompi_request_t** requests
);
/**
* Non-blocking test for request completion.
*
* @param count (IN) Number of requests
* @param request (IN) Array of requests
* @param index (OUT) Index of first completed request.
* @param complete (OUT) Flag indicating if index is valid (a request completed).
* @param status (OUT) Status of completed request.
* @return OMPI_SUCCESS or failure status.
*
* Note that upon completion, the request is freed, and the
* request handle at index set to NULL.
*/
typedef int (*mca_pml_base_module_test_fn_t)(
size_t count,
ompi_request_t** requests,
int *index,
int *completed,
ompi_status_public_t* status
);
/**
* Non-blocking test for request completion.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param completed (OUT) Flag indicating whether all requests completed.
* @param statuses (OUT) Array of completion statuses.
* @return OMPI_SUCCESS or failure status.
*
* This routine returns completed==true if all requests have completed.
* The statuses parameter is only updated if all requests completed. Likewise,
* the requests array is not modified (no requests freed), unless all requests
* have completed.
*/
typedef int (*mca_pml_base_module_test_all_fn_t)(
size_t count,
ompi_request_t** requests,
int *completed,
ompi_status_public_t* statuses
);
/**
* Wait (blocking-mode) for one of N requests to complete.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param index (OUT) Index into request array of completed request.
* @param status (OUT) Status of completed request.
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_module_wait_fn_t)(
size_t count,
ompi_request_t** request,
int *index,
ompi_status_public_t* status
);
/**
* Wait (blocking-mode) for all of N requests to complete.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param statuses (OUT) Array of completion statuses.
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_module_wait_all_fn_t)(
size_t count,
ompi_request_t** request,
ompi_status_public_t *status
);
/**
* Probe to poll for pending recv.
*
@ -543,17 +466,9 @@ struct mca_pml_base_module_1_0_0_t {
mca_pml_base_module_isend_init_fn_t pml_isend_init;
mca_pml_base_module_isend_fn_t pml_isend;
mca_pml_base_module_send_fn_t pml_send;
mca_pml_base_module_start_fn_t pml_start;
mca_pml_base_module_test_fn_t pml_test;
mca_pml_base_module_test_all_fn_t pml_test_all;
mca_pml_base_module_wait_fn_t pml_wait;
mca_pml_base_module_wait_all_fn_t pml_wait_all;
mca_pml_base_module_iprobe_fn_t pml_iprobe;
mca_pml_base_module_probe_fn_t pml_probe;
mca_pml_base_module_cancel_fn_t pml_cancel;
mca_pml_base_module_cancelled_fn_t pml_cancelled;
mca_pml_base_module_free_fn_t pml_free;
mca_pml_base_module_null_fn_t pml_null;
mca_pml_base_module_start_fn_t pml_start;
};
typedef struct mca_pml_base_module_1_0_0_t mca_pml_base_module_1_0_0_t;
typedef mca_pml_base_module_1_0_0_t mca_pml_base_module_t;

View File

@ -10,7 +10,6 @@ libmca_pml_teg_la_SOURCES = \
pml_teg.h \
pml_teg_cancel.c \
pml_teg_component.c \
pml_teg_free.c \
pml_teg_iprobe.c \
pml_teg_irecv.c \
pml_teg_isend.c \
@ -26,7 +25,5 @@ libmca_pml_teg_la_SOURCES = \
pml_teg_sendreq.c \
pml_teg_sendreq.h \
pml_teg_start.c \
pml_teg_test.c \
pml_teg_wait.c \
pml_ptl_array.c \
pml_ptl_array.h

View File

@ -36,17 +36,9 @@ mca_pml_teg_t mca_pml_teg = {
mca_pml_teg_isend_init,
mca_pml_teg_isend,
mca_pml_teg_send,
mca_pml_teg_start,
mca_pml_teg_test,
mca_pml_teg_test_all,
mca_pml_teg_wait,
mca_pml_teg_wait_all,
mca_pml_teg_iprobe,
mca_pml_teg_probe,
mca_pml_teg_cancel,
mca_pml_teg_cancelled,
mca_pml_teg_free,
mca_pml_teg_null
mca_pml_teg_start
}
};
@ -136,8 +128,8 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls)
/* setup send fragments based on largest required send request */
ompi_free_list_init(
&mca_pml_teg.teg_send_requests,
sizeof(mca_pml_base_send_request_t) + cache_bytes,
OBJ_CLASS(mca_pml_base_send_request_t),
sizeof(mca_pml_teg_send_request_t) + cache_bytes,
OBJ_CLASS(mca_pml_teg_send_request_t),
mca_pml_teg.teg_free_list_num,
mca_pml_teg.teg_free_list_max,
mca_pml_teg.teg_free_list_inc,
@ -374,10 +366,3 @@ int mca_pml_teg_component_fini(void)
return OMPI_SUCCESS;
}
int mca_pml_teg_null(ompi_request_t** request)
{
*request = (ompi_request_t*)&mca_pml_teg.teg_request_null;
return OMPI_SUCCESS;
}

View File

@ -52,15 +52,7 @@ struct mca_pml_teg_t {
long teg_sends;
long teg_recvs;
long teg_waits;
long teg_condition_waits;
long teg_condition_broadcasts;
#endif
/* request completion */
ompi_mutex_t teg_request_lock;
ompi_condition_t teg_request_cond;
volatile int teg_request_waiting;
mca_pml_base_request_t teg_request_null;
};
typedef struct mca_pml_teg_t mca_pml_teg_t;
@ -215,52 +207,15 @@ extern int mca_pml_teg_start(
ompi_request_t** requests
);
extern int mca_pml_teg_test(
size_t count,
ompi_request_t** request,
int *index,
int *completed,
ompi_status_public_t* status
);
extern int mca_pml_teg_test_all(
size_t count,
ompi_request_t** request,
int *completed,
ompi_status_public_t* status
);
extern int mca_pml_teg_wait(
size_t count,
ompi_request_t** request,
int *index,
ompi_status_public_t* status
);
extern int mca_pml_teg_wait_all(
size_t count,
ompi_request_t** request,
ompi_status_public_t* status
);
extern int mca_pml_teg_null(
ompi_request_t** request
);
extern int mca_pml_teg_free(
ompi_request_t** request
);
#define MCA_PML_TEG_FINI(request) \
{ \
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \
if(pml_request->req_persistent) { \
if(pml_request->req_free_called) { \
MCA_PML_TEG_FREE(request); \
} else { \
} else { \
pml_request->req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
} \
} \
} else { \
MCA_PML_TEG_FREE(request); \
} \
@ -271,7 +226,7 @@ extern int mca_pml_teg_free(
{ \
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \
pml_request->req_free_called = true; \
if(pml_request->req_pml_done == true) \
if(pml_request->req_pml_complete == true) \
{ \
OMPI_REQUEST_FINI(*(request)); \
switch(pml_request->req_type) { \

View File

@ -10,10 +10,10 @@
#include "mca/ptl/ptl.h"
#include "mca/base/mca_base_param.h"
#include "mca/pml/base/pml_base_bsend.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "mca/pml/base/pml_base_recvreq.h"
#include "pml_teg.h"
#include "pml_teg_proc.h"
#include "pml_teg_sendreq.h"
#include "pml_teg_recvreq.h"
mca_pml_base_component_1_0_0_t mca_pml_teg_component = {
@ -61,20 +61,10 @@ static inline int mca_pml_teg_param_register_int(
int mca_pml_teg_component_open(void)
{
mca_pml_base_request_t* teg_null = &mca_pml_teg.teg_request_null;
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_recv_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_cond, ompi_condition_t);
OBJ_CONSTRUCT(teg_null, mca_pml_base_request_t);
teg_null->req_type = MCA_PML_REQUEST_NULL;
teg_null->req_status.MPI_SOURCE = OMPI_PROC_NULL;
teg_null->req_status.MPI_TAG = OMPI_ANY_TAG;
teg_null->req_status.MPI_ERROR = OMPI_SUCCESS;
teg_null->req_status._count = 0;
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_waits = 0;
@ -82,8 +72,6 @@ int mca_pml_teg_component_open(void)
mca_pml_teg.teg_recvs = 0;
mca_pml_teg.teg_isends = 0;
mca_pml_teg.teg_irecvs = 0;
mca_pml_teg.teg_condition_waits = 0;
mca_pml_teg.teg_condition_broadcasts = 0;
#endif
mca_pml_teg.teg_free_list_num =
@ -111,10 +99,6 @@ int mca_pml_teg_component_close(void)
mca_pml_teg.teg_isends);
ompi_output(0, "mca_pml_teg.teg_irecvs = %d\n",
mca_pml_teg.teg_irecvs);
ompi_output(0, "mca_pml_teg.teg_condition_waits = %d\n",
mca_pml_teg.teg_condition_waits);
ompi_output(0, "mca_pml_teg.teg_condition_broadcast = %d\n",
mca_pml_teg.teg_condition_broadcasts);
#endif
if (mca_pml_teg.teg_recv_requests.fl_num_allocated !=
@ -127,8 +111,6 @@ int mca_pml_teg_component_close(void)
if(NULL != mca_pml_teg.teg_ptl_components) {
free(mca_pml_teg.teg_ptl_components);
}
OBJ_DESTRUCT(&mca_pml_teg.teg_request_lock);
OBJ_DESTRUCT(&mca_pml_teg.teg_request_cond);
OBJ_DESTRUCT(&mca_pml_teg.teg_send_requests);
OBJ_DESTRUCT(&mca_pml_teg.teg_recv_requests);
OBJ_DESTRUCT(&mca_pml_teg.teg_procs);
@ -148,13 +130,12 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority,
mca_pml_teg.teg_num_ptl_components = 0;
mca_pml_teg.teg_ptl_components = NULL;
mca_pml_teg.teg_num_ptl_components = 0;
mca_pml_teg.teg_request_waiting = 0;
/* recv requests */
ompi_free_list_init(
&mca_pml_teg.teg_recv_requests,
sizeof(mca_pml_base_recv_request_t),
OBJ_CLASS(mca_pml_base_recv_request_t),
sizeof(mca_pml_teg_recv_request_t),
OBJ_CLASS(mca_pml_teg_recv_request_t),
mca_pml_teg.teg_free_list_num,
mca_pml_teg.teg_free_list_max,
mca_pml_teg.teg_free_list_inc,

View File

@ -1,18 +0,0 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include "pml_teg.h"
#include "pml_teg_sendreq.h"
#include "pml_teg_recvreq.h"
#include "mca/pml/base/pml_base_request.h"
int mca_pml_teg_free(ompi_request_t** request)
{
MCA_PML_TEG_FREE(request);
return OMPI_SUCCESS;
}

View File

@ -3,7 +3,7 @@
*/
#include "ompi_config.h"
#include "request/request.h"
#include "pml_teg_recvreq.h"
@ -17,16 +17,15 @@ int mca_pml_teg_iprobe(int src,
mca_pml_base_recv_request_t recvreq;
recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
recvreq.req_base.req_type = MCA_PML_REQUEST_IPROBE;
MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq,
NULL, 0, NULL, src, tag, comm, true);
MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq, NULL, 0, NULL, src, tag, comm, true);
if ((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
OBJ_DESTRUCT(&recvreq);
return rc;
}
if ((*matched = recvreq.req_base.req_mpi_done) == true
if ((*matched = recvreq.req_base.req_ompi.req_complete) == true
&& (NULL != status)) {
*status = recvreq.req_base.req_status;
*status = recvreq.req_base.req_ompi.req_status;
}
return OMPI_SUCCESS;
}
@ -41,35 +40,33 @@ int mca_pml_teg_probe(int src,
mca_pml_base_recv_request_t recvreq;
recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
recvreq.req_base.req_type = MCA_PML_REQUEST_PROBE;
MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq,
NULL, 0, NULL, src, tag, comm, true);
MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq, NULL, 0, NULL, src, tag, comm, true);
if ((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
OBJ_DESTRUCT(&recvreq);
return rc;
}
if (recvreq.req_base.req_mpi_done == false) {
if (recvreq.req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (ompi_using_threads()) {
ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while (recvreq.req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
ompi_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (recvreq.req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
ompi_mutex_unlock(&ompi_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while (recvreq.req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_request_waiting++;
while (recvreq.req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}
if (NULL != status) {
*status = recvreq.req_base.req_status;
*status = recvreq.req_base.req_ompi.req_status;
}
return OMPI_SUCCESS;
}

View File

@ -3,7 +3,7 @@
*/
#include "ompi_config.h"
#include "request/request.h"
#include "pml_teg_recvreq.h"
@ -86,28 +86,26 @@ int mca_pml_teg_recv(void *addr,
return rc;
}
if (recvreq->req_base.req_mpi_done == false) {
if (recvreq->req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (ompi_using_threads()) {
ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while (recvreq->req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
ompi_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (recvreq->req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
ompi_mutex_unlock(&ompi_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while (recvreq->req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_request_waiting++;
while (recvreq->req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}
/* return status */
if (NULL != status) {
*status = recvreq->req_base.req_status;
*status = recvreq->req_base.req_ompi.req_status;
}
MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);

View File

@ -95,22 +95,20 @@ int mca_pml_teg_send(void *buf,
return rc;
}
if (sendreq->req_base.req_mpi_done == false) {
if (sendreq->req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (ompi_using_threads()) {
ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while (sendreq->req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
ompi_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (sendreq->req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
ompi_mutex_unlock(&ompi_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while (sendreq->req_base.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_request_waiting++;
while (sendreq->req_base.req_ompi.req_complete == false)
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}

View File

@ -7,11 +7,43 @@
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_comm.h"
#include "pml_teg_recvreq.h"
#include "pml_teg_sendreq.h"
static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
mca_pml_base_recv_request_t* request, int proc);
static int mca_pml_teg_recv_request_free(struct ompi_request_t* request)
{
MCA_PML_TEG_FINI(&request);
return OMPI_SUCCESS;
}
static int mca_pml_teg_recv_request_cancel(struct ompi_request_t* request, int complete)
{
return OMPI_SUCCESS;
}
static void mca_pml_teg_recv_request_construct(mca_pml_base_recv_request_t* request)
{
request->req_base.req_type = MCA_PML_REQUEST_RECV;
request->req_base.req_ompi.req_query = NULL;
request->req_base.req_ompi.req_free = mca_pml_teg_recv_request_free;
request->req_base.req_ompi.req_cancel = mca_pml_teg_recv_request_cancel;
}
static void mca_pml_teg_recv_request_destruct(mca_pml_base_recv_request_t* request)
{
}
OBJ_CLASS_INSTANCE(
mca_pml_teg_recv_request_t,
mca_pml_base_request_t,
mca_pml_teg_recv_request_construct,
mca_pml_teg_recv_request_destruct);
/*
* Update the recv request status to reflect the number of bytes
* received and actually delivered to the application.
@ -23,25 +55,22 @@ void mca_pml_teg_recv_request_progress(
size_t bytes_received,
size_t bytes_delivered)
{
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_LOCK(&ompi_request_lock);
req->req_bytes_received += bytes_received;
req->req_bytes_delivered += bytes_delivered;
if (req->req_bytes_received >= req->req_bytes_packed) {
/* initialize request status */
req->req_base.req_status.MPI_SOURCE = req->req_base.req_peer;
req->req_base.req_status.MPI_TAG = req->req_base.req_tag;
req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS;
req->req_base.req_status._count = req->req_bytes_delivered;
req->req_base.req_pml_done = true;
req->req_base.req_mpi_done = true;
if(mca_pml_teg.teg_request_waiting) {
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_condition_broadcasts++;
#endif
ompi_condition_broadcast(&mca_pml_teg.teg_request_cond);
req->req_base.req_ompi.req_status.MPI_SOURCE = req->req_base.req_peer;
req->req_base.req_ompi.req_status.MPI_TAG = req->req_base.req_tag;
req->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
req->req_base.req_ompi.req_status._count = req->req_bytes_delivered;
req->req_base.req_pml_complete = true;
req->req_base.req_ompi.req_complete = true;
if(ompi_request_waiting) {
ompi_condition_broadcast(&ompi_request_cond);
}
}
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
}
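The construct hooks above are the heart of the re-org: each request type registers its own req_free and req_cancel, so the generic layer can release a completed request without knowing whether it is a send, a recv, or a generalized request. The dispatch, as req_test.c and req_wait.c below perform it:

/* generic release: the request supplies its own cleanup */
rc = request->req_free(request);  /* e.g. mca_pml_teg_recv_request_free */
*rptr = NULL;                     /* the caller's slot becomes a null request */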

View File

@ -12,6 +12,10 @@
#include "mca/pml/base/pml_base_recvreq.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
typedef mca_pml_base_recv_request_t mca_pml_teg_recv_request_t;
OBJ_CLASS_DECLARATION(mca_pml_teg_recv_request_t);
/**
* Allocate a recv request from the modules free list.

View File

@ -14,6 +14,42 @@
#include "pml_teg_recvreq.h"
static int mca_pml_teg_send_request_free(struct ompi_request_t* request)
{
MCA_PML_TEG_FINI(&request);
return OMPI_SUCCESS;
}
static int mca_pml_teg_send_request_cancel(struct ompi_request_t* request, int complete)
{
return OMPI_SUCCESS;
}
static void mca_pml_teg_send_request_construct(mca_pml_base_send_request_t* req)
{
req->req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_base.req_ompi.req_query = NULL;
req->req_base.req_ompi.req_free = mca_pml_teg_send_request_free;
req->req_base.req_ompi.req_cancel = mca_pml_teg_send_request_cancel;
}
static void mca_pml_teg_send_request_destruct(mca_pml_base_send_request_t* req)
{
}
OBJ_CLASS_INSTANCE(
mca_pml_teg_send_request_t,
mca_pml_base_request_t,
mca_pml_teg_send_request_construct,
mca_pml_teg_send_request_destruct);
/**
* Schedule message delivery across potentially multiple PTLs.
*
@ -63,12 +99,12 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req)
/* unable to complete send - signal request failed */
if(bytes_remaining > 0) {
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
req->req_base.req_mpi_done = true;
OMPI_THREAD_LOCK(&ompi_request_lock);
req->req_base.req_ompi.req_complete = true;
/* FIX - set status correctly */
if(mca_pml_teg.teg_request_waiting)
ompi_condition_broadcast(&mca_pml_teg.teg_request_cond);
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
if(ompi_request_waiting)
ompi_condition_broadcast(&ompi_request_cond);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
}
}
@ -89,27 +125,27 @@ void mca_pml_teg_send_request_progress(
size_t bytes_sent)
{
bool first_frag;
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_LOCK(&ompi_request_lock);
first_frag = (req->req_bytes_sent == 0 && req->req_bytes_packed > 0);
req->req_bytes_sent += bytes_sent;
if (req->req_bytes_sent >= req->req_bytes_packed) {
req->req_base.req_pml_done = true;
if (req->req_base.req_mpi_done == false) {
req->req_base.req_status.MPI_SOURCE = req->req_base.req_comm->c_my_rank;
req->req_base.req_status.MPI_TAG = req->req_base.req_tag;
req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS;
req->req_base.req_status._count = req->req_bytes_sent;
req->req_base.req_mpi_done = true;
if(mca_pml_teg.teg_request_waiting) {
ompi_condition_broadcast(&mca_pml_teg.teg_request_cond);
req->req_base.req_pml_complete = true;
if (req->req_base.req_ompi.req_complete == false) {
req->req_base.req_ompi.req_status.MPI_SOURCE = req->req_base.req_comm->c_my_rank;
req->req_base.req_ompi.req_status.MPI_TAG = req->req_base.req_tag;
req->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
req->req_base.req_ompi.req_status._count = req->req_bytes_sent;
req->req_base.req_ompi.req_complete = true;
if(ompi_request_waiting) {
ompi_condition_broadcast(&ompi_request_cond);
}
} else if (req->req_base.req_free_called) {
MCA_PML_TEG_FREE((ompi_request_t**)&req);
}
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
return;
}
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
/* if first fragment - schedule remaining fragments */
if(first_frag == true) {

View File

@ -14,6 +14,11 @@
#include "pml_teg_proc.h"
#include "pml_teg_ptl.h"
typedef mca_pml_base_send_request_t mca_pml_teg_send_request_t;
OBJ_CLASS_DECLARATION(mca_pml_teg_send_request_t);
#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \
comm, \
dst, \

View File

@ -26,23 +26,21 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
*/
switch(pml_request->req_ompi.req_state) {
case OMPI_REQUEST_INVALID:
return OMPI_ERR_REQUEST;
case OMPI_REQUEST_INACTIVE:
break;
case OMPI_REQUEST_ACTIVE: {
ompi_request_t *request;
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
if (pml_request->req_pml_done == false) {
OMPI_THREAD_LOCK(&ompi_request_lock);
if (pml_request->req_pml_complete == false) {
/* free request after it completes */
pml_request->req_free_called = true;
} else {
/* can reuse the existing request */
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
break;
}
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
/* allocate a new request */
switch(pml_request->req_type) {
@ -83,6 +81,8 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
requests[i] = request;
break;
}
default:
return OMPI_ERR_REQUEST;
}
/* start the request */

View File

@ -1,86 +0,0 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include "pml_teg.h"
#include "pml_teg_sendreq.h"
#include "pml_teg_recvreq.h"
int mca_pml_teg_test(size_t count,
ompi_request_t ** requests,
int *index,
int *completed, ompi_status_public_t * status)
{
size_t i;
ompi_atomic_mb();
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (pml_request == NULL)
continue;
if (pml_request->req_mpi_done) {
*index = i;
*completed = true;
if (NULL != status)
*status = pml_request->req_status;
MCA_PML_TEG_FINI(requests + i);
return OMPI_SUCCESS;
}
}
*index = MPI_UNDEFINED;
*completed = false;
if (NULL != status)
*status = mca_pml_teg.teg_request_null.req_status;
return OMPI_SUCCESS;
}
int mca_pml_teg_test_all(size_t count,
ompi_request_t ** requests,
int *completed, ompi_status_public_t * statuses)
{
size_t i;
size_t num_completed = 0;
ompi_atomic_mb();
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (pml_request == NULL || pml_request->req_mpi_done)
num_completed++;
}
if (num_completed != count) {
*completed = false;
return OMPI_SUCCESS;
}
*completed = true;
if (NULL != statuses) {
/* fill out completion status and free request if required */
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (NULL == pml_request) {
statuses[i] = mca_pml_teg.teg_request_null.req_status;
} else {
statuses[i] = pml_request->req_status;
MCA_PML_TEG_FINI(requests + i);
}
}
} else {
/* free request if required */
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (NULL != pml_request)
MCA_PML_TEG_FINI(requests + i);
}
}
return OMPI_SUCCESS;
}

View File

@ -1,168 +0,0 @@
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* $HEADER$
*/
#include "ompi_config.h"
#include "pml_teg.h"
#include "pml_teg_sendreq.h"
#include "pml_teg_recvreq.h"
#include "mca/ptl/base/ptl_base_comm.h"
#include "mca/pml/base/pml_base_request.h"
#include "mpi.h" /* we need at least MPI_UNDEFINED and MPI_REQUEST_NULL */
int mca_pml_teg_wait(size_t count,
ompi_request_t ** request,
int *index, ompi_status_public_t * status)
{
#if OMPI_HAVE_THREADS
int c;
#endif
int i;
int null_requests = 0;
int completed = -1;
mca_pml_base_request_t *pml_request = NULL;
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_waits++;
#endif
#if OMPI_HAVE_THREADS
/* poll for completion */
ompi_atomic_mb();
for (c = 0; c < mca_pml_teg.teg_poll_iterations; c++) {
null_requests = 0;
for (i = 0; i < count; i++) {
pml_request = (mca_pml_base_request_t *) request[i];
if (MPI_REQUEST_NULL == (ompi_request_t*)pml_request) {
++null_requests;
if( null_requests == count ) goto out_of_loop;
}
if (true == pml_request->req_mpi_done) {
completed = i;
goto out_of_loop;
}
}
}
out_of_loop:
#endif
if ((completed < 0) || (null_requests != count)) {
/* give up and sleep until completion */
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
do {
null_requests = 0;
for (i = 0; i < count; i++) {
pml_request = (mca_pml_base_request_t *) request[i];
if (MPI_REQUEST_NULL == (ompi_request_t*)pml_request) {
++null_requests;
continue;
}
if (pml_request->req_mpi_done == true) {
completed = i;
break;
}
}
/* for performance reasons, make this test only once rather than
* in all the ifs and whiles around here.
*/
if (null_requests == count) break;
if (completed < 0) {
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_condition_waits++;
#endif
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
}
} while (completed < 0);
mca_pml_teg.teg_request_waiting--;
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
}
if( null_requests == count ) {
*index = MPI_UNDEFINED;
*status = mca_pml_teg.teg_request_null.req_status;
} else {
/* return status */
if (MPI_STATUS_IGNORE != status) {
*status = pml_request->req_status;
}
*index = completed;
/* return request to pool */
MCA_PML_TEG_FINI( &(request[completed]) );
}
return OMPI_SUCCESS;
}
int mca_pml_teg_wait_all(size_t count,
ompi_request_t ** requests,
ompi_status_public_t * statuses)
{
int completed = 0, i;
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if ((MPI_REQUEST_NULL == (ompi_request_t*)pml_request) ||
(pml_request->req_mpi_done == true)) {
completed++;
}
}
/* if all requests have not completed -- defer requiring lock
* unless required */
if (completed != count) {
/*
* acquire lock and test for completion - if all requests are
* not completed pend on condition variable until a request
* completes
*/
OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
do {
completed = 0;
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if ((MPI_REQUEST_NULL == (ompi_request_t*)pml_request) ||
(pml_request->req_mpi_done == true)) {
completed++;
continue;
}
}
if (completed != count)
ompi_condition_wait(&mca_pml_teg.teg_request_cond,
&mca_pml_teg.teg_request_lock);
} while (completed != count);
mca_pml_teg.teg_request_waiting--;
OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
}
if (MPI_STATUSES_IGNORE != statuses) {
/* fill out status and free request if required */
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (MPI_REQUEST_NULL == (ompi_request_t*)pml_request) {
statuses[i] = mca_pml_teg.teg_request_null.req_status;
} else {
statuses[i] = pml_request->req_status;
MCA_PML_TEG_FINI(requests + i);
}
}
} else {
/* free request if required */
for (i = 0; i < count; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t *) requests[i];
if (MPI_REQUEST_NULL != (ompi_request_t*)pml_request) {
MCA_PML_TEG_FINI(requests + i);
}
}
}
return OMPI_SUCCESS;
}

View File

@ -179,6 +179,9 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
ompi_event_add(&ptl_peer->peer_send_event, 0);
}
} else {
/* after the first fragment - delay sending subsequent fragments to
* enable better overlap by the scheduler
*/
ptl_peer->peer_send_frag = frag;
ompi_event_add(&ptl_peer->peer_send_event, 0);
}

View File

@ -59,9 +59,9 @@ int MPI_Bsend(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Co
if(OMPI_SUCCESS != rc)
goto error_return;
rc = mca_pml.pml_wait(1, &request, &index, NULL);
rc = ompi_request_wait(1, &request, &index, NULL);
if(OMPI_SUCCESS != rc) {
mca_pml.pml_free(&request);
ompi_request_free(&request);
return rc;
}

View File

@ -39,9 +39,7 @@ int MPI_Cancel(MPI_Request *request)
if (NULL == *request) {
return MPI_SUCCESS;
}
rc = mca_pml.pml_cancel(*request);
/* JMS: Tim will fix to invoke on the communicator/window/file on
the request (i.e., not COMM_WORLD) */
rc = ompi_request_cancel(*request);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, "MPI_Cancel");
}

View File

@ -7,6 +7,7 @@
#include "mpi.h"
#include "mpi/c/bindings.h"
#include "communicator/communicator.h"
#include "request/request.h"
#include "errhandler/errhandler.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
@ -22,11 +23,16 @@ static const char FUNC_NAME[] = "MPI_Grequest_complete";
int MPI_Grequest_complete(MPI_Request request)
{
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
}
int rc = MPI_SUCCESS;
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (request == MPI_REQUEST_NULL) {
rc = MPI_ERR_REQUEST;
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
/* This function is not yet implemented */
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_INTERN, FUNC_NAME);
rc = ompi_request_complete(request);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, MPI_ERR_INTERN, FUNC_NAME);
}

View File

@ -7,6 +7,7 @@
#include "mpi.h"
#include "mpi/c/bindings.h"
#include "communicator/communicator.h"
#include "request/grequest.h"
#include "errhandler/errhandler.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
@ -25,11 +26,12 @@ int MPI_Grequest_start(MPI_Grequest_query_function *query_fn,
MPI_Grequest_cancel_function *cancel_fn,
void *extra_state, MPI_Request *request)
{
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
}
int rc;
if (MPI_PARAM_CHECK) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
}
/* This function is not yet implemented */
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_INTERN, FUNC_NAME);
rc = ompi_grequest_start(query_fn,free_fn,cancel_fn,extra_state,request);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -89,11 +89,10 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader,
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE);
rc = ompi_request_wait_all ( 1, &req, MPI_STATUS_IGNORE);
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
}
/* bcast size and list of remote processes to all processes in local_comm */

View File

@ -25,7 +25,8 @@ int MPI_Irecv(void *buf, int count, MPI_Datatype type, int source,
{
int rc;
if (source == MPI_PROC_NULL) {
return mca_pml.pml_null(request);
*request = &ompi_request_null;
return OMPI_SUCCESS;
}
if ( MPI_PARAM_CHECK ) {

View File

@ -24,8 +24,9 @@ int MPI_Recv_init(void *buf, int count, MPI_Datatype type, int source,
int tag, MPI_Comm comm, MPI_Request *request)
{
int rc;
if (source == MPI_PROC_NULL) {
return mca_pml.pml_null(request);
if (source == MPI_PROC_NULL) {
*request = &ompi_request_null;
return MPI_SUCCESS;
}
if ( MPI_PARAM_CHECK ) {

View File

@ -33,7 +33,7 @@ int MPI_Request_free(MPI_Request *request)
if( *request == NULL ) {
return MPI_SUCCESS;
}
rc = mca_pml.pml_free(request);
rc = ompi_request_free(request);
error_return:
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);

View File

@ -66,8 +66,8 @@ int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
}
if (source != MPI_PROC_NULL) { /* wait for recv */
int useless; /* this one is just used to keep the pml_wait happy */
rc = mca_pml.pml_wait(1, &req, &useless, status);
int index;
rc = ompi_request_wait(1, &req, &index, status);
} else {
if (MPI_STATUS_IGNORE != status) {
status->MPI_ERROR = MPI_SUCCESS;

View File

@ -42,7 +42,7 @@ int MPI_Test(MPI_Request *request, int *completed, MPI_Status *status)
status->_count = 0;
return MPI_SUCCESS;
}
rc = mca_pml.pml_test(1, request, &index, completed, status);
rc = ompi_request_test(1, request, &index, completed, status);
if(*completed < 0) {
*completed = 0;
}

View File

@ -33,7 +33,7 @@ int MPI_Testall(int count, MPI_Request requests[], int *flag,
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
rc = mca_pml.pml_test_all(count, requests, flag, statuses);
rc = ompi_request_test_all(count, requests, flag, statuses);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -33,7 +33,7 @@ int MPI_Testany(int count, MPI_Request requests[], int *index, int *completed, M
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
rc = mca_pml.pml_test(count, requests, index, completed, status);
rc = ompi_request_test(count, requests, index, completed, status);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -36,7 +36,7 @@ int MPI_Testsome(int incount, MPI_Request requests[],
}
/* optimize this in the future */
rc = mca_pml.pml_test(incount, requests, &index, &completed, statuses);
rc = ompi_request_test(incount, requests, &index, &completed, statuses);
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
if(completed) {
*outcount = 1;

View File

@ -21,7 +21,8 @@ static const char FUNC_NAME[] = "MPI_Wait";
int MPI_Wait(MPI_Request *request, MPI_Status *status)
{
int index, rc;
int rc;
int index;
if ( MPI_PARAM_CHECK ) {
rc = MPI_SUCCESS;
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
@ -41,7 +42,7 @@ int MPI_Wait(MPI_Request *request, MPI_Status *status)
return MPI_SUCCESS;
}
rc = mca_pml.pml_wait(1, request, &index, status);
rc = ompi_request_wait(1, request, &index, status);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -30,7 +30,7 @@ int MPI_Waitall(int count, MPI_Request *requests, MPI_Status *statuses)
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
rc = mca_pml.pml_wait_all(count, requests, statuses);
rc = ompi_request_wait_all(count, requests, statuses);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -32,7 +32,7 @@ int MPI_Waitany(int count, MPI_Request *requests, int *index, MPI_Status *status
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
rc = mca_pml.pml_wait(count, requests, index, status);
rc = ompi_request_wait(count, requests, index, status);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

View File

@ -40,7 +40,7 @@ int MPI_Waitsome(int incount, MPI_Request *requests,
}
/* optimize this in the future */
rc = mca_pml.pml_wait(incount, requests, &index, &status);
rc = ompi_request_wait(incount, requests, &index, statuses);
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
if( MPI_UNDEFINED ==index ) {
*outcount = MPI_UNDEFINED;

View File

@ -10,11 +10,15 @@ noinst_LTLIBRARIES = librequest.la
# Source code files
headers = \
request.h
grequest.h \
request.h
librequest_la_SOURCES = \
$(headers) \
request.c
$(headers) \
grequest.c \
request.c \
req_test.c \
req_wait.c
# Conditionally install the header files

src/request/grequest.c (new file, 74 lines)

@ -0,0 +1,74 @@
#include "request/grequest.h"
static int ompi_grequest_query(ompi_request_t* req, ompi_status_public_t* status)
{
int rc = OMPI_SUCCESS;
ompi_grequest_t* greq = (ompi_grequest_t*)req;
if(greq->greq_query != NULL)
rc = greq->greq_query(greq->greq_state, status);
return rc;
}
static int ompi_grequest_free(ompi_request_t* req)
{
int rc = OMPI_SUCCESS;
ompi_grequest_t* greq = (ompi_grequest_t*)req;
if(greq->greq_free != NULL)
rc = greq->greq_free(greq->greq_state);
OBJ_RELEASE(req);
return rc;
}
static int ompi_grequest_cancel(ompi_request_t* req, int flag)
{
int rc = OMPI_SUCCESS;
ompi_grequest_t* greq = (ompi_grequest_t*)req;
if(greq->greq_cancel != NULL)
rc = greq->greq_cancel(greq->greq_state, flag);
return rc;
}
static void ompi_grequest_construct(ompi_grequest_t* greq)
{
OMPI_REQUEST_INIT(&greq->greq_base);
greq->greq_base.req_query = ompi_grequest_query;
greq->greq_base.req_free = ompi_grequest_free;
greq->greq_base.req_cancel = ompi_grequest_cancel;
greq->greq_base.req_type = OMPI_REQUEST_GEN;
}
static void ompi_grequest_destruct(ompi_grequest_t* greq)
{
OMPI_REQUEST_FINI(&greq->greq_base);
}
OBJ_CLASS_INSTANCE(
ompi_grequest_t,
ompi_request_t,
ompi_grequest_construct,
ompi_grequest_destruct);
int ompi_grequest_start(
MPI_Grequest_query_function *gquery_fn,
MPI_Grequest_free_function *gfree_fn,
MPI_Grequest_cancel_function *gcancel_fn,
void* gstate,
ompi_request_t** request)
{
ompi_grequest_t *greq = OBJ_NEW(ompi_grequest_t);
if(greq == NULL) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
greq->greq_state = gstate;
greq->greq_query = gquery_fn;
greq->greq_free = gfree_fn;
greq->greq_cancel = gcancel_fn;
*request = &greq->greq_base;
return OMPI_SUCCESS;
}
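For context, a hedged sketch of how an application would exercise this new generalized-request path end to end; the callback names are illustrative, while the MPI calls are the ones wired up in the bindings above:

#include "mpi.h"

static int my_query(void *extra_state, MPI_Status *status)
{
    status->MPI_SOURCE = MPI_UNDEFINED;  /* not a point-to-point operation */
    status->MPI_TAG = MPI_UNDEFINED;
    status->MPI_ERROR = MPI_SUCCESS;
    return MPI_SUCCESS;
}
static int my_free(void *extra_state) { return MPI_SUCCESS; }
static int my_cancel(void *extra_state, int complete) { return MPI_SUCCESS; }

static void example(void)
{
    MPI_Request req;
    MPI_Grequest_start(my_query, my_free, my_cancel, NULL, &req);
    /* ... hand req to a worker; when the work finishes: */
    MPI_Grequest_complete(req);         /* routed to ompi_request_complete */
    MPI_Wait(&req, MPI_STATUS_IGNORE);  /* frees via ompi_grequest_free */
}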

src/request/grequest.h (new file, 33 lines)

@ -0,0 +1,33 @@
/*
* $HEADER$
*/
#ifndef OMPI_GEN_REQUEST_H
#define OMPI_GEN_REQUEST_H
#include "ompi_config.h"
#include "request/request.h"
OBJ_CLASS_DECLARATION(ompi_grequest_t);
struct ompi_grequest_t {
ompi_request_t greq_base;
MPI_Grequest_query_function *greq_query;
MPI_Grequest_free_function *greq_free;
MPI_Grequest_cancel_function *greq_cancel;
void *greq_state;
};
typedef struct ompi_grequest_t ompi_grequest_t;
int ompi_grequest_start(
MPI_Grequest_query_function *gquery,
MPI_Grequest_free_function *gfree,
MPI_Grequest_cancel_function *gcancel,
void* gstate,
ompi_request_t** request);
#endif

src/request/req_test.c (new file, 111 lines)

@ -0,0 +1,111 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "include/constants.h"
#include "request/request.h"
int ompi_request_test(
size_t count,
ompi_request_t ** requests,
int *index,
int *completed,
ompi_status_public_t * status)
{
int rc;
size_t i;
ompi_request_t **rptr;
ompi_request_t *request;
ompi_atomic_mb();
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request == NULL)
continue;
if (request->req_complete) {
*index = i;
*completed = true;
if (NULL != status) {
*status = request->req_status;
}
rc = request->req_free(request);
if(rc != OMPI_SUCCESS)
return rc;
*rptr = NULL;
return OMPI_SUCCESS;
}
rptr++;
}
*index = MPI_UNDEFINED;
*completed = false;
if (NULL != status)
*status = ompi_request_null.req_status;
return OMPI_SUCCESS;
}
int ompi_request_test_all(
size_t count,
ompi_request_t ** requests,
int *completed,
ompi_status_public_t * statuses)
{
size_t i;
ompi_request_t **rptr;
size_t num_completed = 0;
ompi_request_t *request;
ompi_atomic_mb();
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request == NULL || request->req_complete) {
num_completed++;
}
rptr++;
}
if (num_completed != count) {
*completed = false;
return OMPI_SUCCESS;
}
*completed = true;
if (NULL != statuses) {
/* fill out completion status and free request if required */
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (NULL == request) {
statuses[i] = ompi_request_null.req_status;
} else {
int rc;
statuses[i] = request->req_status;
rc = request->req_free(request);
if(rc != OMPI_SUCCESS)
return rc;
*rptr = NULL;
}
rptr++;
}
} else {
/* free request if required */
rptr = requests;
for (i = 0; i < count; i++) {
ompi_request_t *request = *rptr;
if (NULL != request) {
int rc = request->req_free(request);
if(rc != OMPI_SUCCESS)
return rc;
}
rptr++;
}
}
return OMPI_SUCCESS;
}
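Note the all-or-nothing semantics carried over from the removed pml_test_all (and documented in the deleted typedef comments above): statuses are filled in, and requests freed, only once every request has completed. A caller can therefore retest safely:

int flag;
rc = ompi_request_test_all(count, reqs, &flag, statuses);
if (OMPI_SUCCESS == rc && 0 == flag) {
    /* nothing was freed and reqs[] is untouched; test again later */
}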

154
src/request/req_wait.c Обычный файл

@ -0,0 +1,154 @@
/*
* $HEADER$
*/
#include "ompi_config.h"
#include "include/constants.h"
#include "request/request.h"
int ompi_request_wait(
size_t count,
ompi_request_t ** requests,
int *index,
ompi_status_public_t * status)
{
#if OMPI_HAVE_THREADS
int c;
#endif
size_t i;
int rc;
int completed = -1;
ompi_request_t **rptr;
ompi_request_t *request;
#if OMPI_HAVE_THREADS
/* poll for completion */
ompi_atomic_mb();
for (c = 0; completed < 0 && c < ompi_request_poll_iterations; c++) {
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request->req_complete == true) {
completed = i;
break;
}
rptr++;
}
}
#endif
if (completed < 0) {
/* give up and sleep until completion */
OMPI_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
do {
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request->req_complete == true) {
completed = i;
break;
}
rptr++;
}
if (completed < 0) {
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
} while (completed < 0);
ompi_request_waiting--;
OMPI_THREAD_UNLOCK(&ompi_request_lock);
}
/* return status */
if (NULL != status) {
*status = request->req_status;
}
/* return request to pool */
rc = request->req_free(request);
*rptr = NULL;
*index = completed;
return rc;
}
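
Illustrative wait-any caller (the two-request setup is hypothetical):

    static int wait_either(ompi_request_t *reqs[2],
                           ompi_status_public_t *status)
    {
        int which;
        int rc = ompi_request_wait(2, reqs, &which, status);
        /* reqs[which] has been freed and set to NULL; *status describes it */
        return rc;
    }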
int ompi_request_wait_all(
size_t count,
ompi_request_t ** requests,
ompi_status_public_t * statuses)
{
size_t completed = 0, i;
ompi_request_t **rptr;
ompi_request_t *request;
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request == NULL || request->req_complete == true) {
completed++;
}
rptr++;
}
/* if all requests have not completed -- defer acquiring lock
* unless required
*/
if (completed != count) {
/*
* acquire lock and test for completion - if all requests are
* not completed pend on condition variable until a request
* completes
*/
OMPI_THREAD_LOCK(&ompi_request_lock);
ompi_request_waiting++;
do {
completed = 0;
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (request == NULL || request->req_complete == true) {
completed++;
}
rptr++;
}
if (completed != count) {
ompi_condition_wait(&ompi_request_cond, &ompi_request_lock);
}
} while (completed != count);
ompi_request_waiting--;
OMPI_THREAD_UNLOCK(&ompi_request_lock);
}
if (NULL != statuses) {
/* fill out status and free request if required */
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (NULL == request) {
statuses[i] = ompi_request_null.req_status;
} else {
int rc;
statuses[i] = request->req_status;
rc = request->req_free(request);
if(rc != OMPI_SUCCESS)
return rc;
*rptr = NULL;
}
rptr++;
}
} else {
/* free request if required */
rptr = requests;
for (i = 0; i < count; i++) {
request = *rptr;
if (NULL != request) {
int rc = request->req_free(request);
if(rc != OMPI_SUCCESS)
return rc;
*rptr = NULL;
}
rptr++;
}
}
return OMPI_SUCCESS;
}
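
Sketch of a typical caller draining a batch of started requests; the NULL-check above suggests a NULL statuses pointer (what MPI_STATUSES_IGNORE appears to map to here) simply skips the status copy-out:

    static int drain_all(size_t n, ompi_request_t **reqs)
    {
        /* no statuses wanted: each completed request is still freed
           and its handle reset to NULL */
        return ompi_request_wait_all(n, reqs, NULL);
    }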

src/request/request.c

@ -8,28 +8,37 @@
#include "request/request.h"
#include "include/constants.h"
/*
* Table for Fortran <-> C Request handle conversion
*/
ompi_pointer_array_t ompi_request_f_to_c_table;
/*
* MPI_REQUEST_NULL
*/
ompi_request_t ompi_mpi_request_null;
ompi_pointer_array_t ompi_request_f_to_c_table;
volatile int ompi_request_waiting = 0;
int ompi_request_poll_iterations = 20000;
ompi_mutex_t ompi_request_lock;
ompi_condition_t ompi_request_cond;
ompi_request_t ompi_request_null;
static void ompi_request_construct(ompi_request_t* req)
{
OMPI_REQUEST_INIT(req);
req->req_query = NULL;
req->req_free = NULL;
req->req_cancel = NULL;
}
static void ompi_request_destruct(ompi_request_t* req)
{
OMPI_REQUEST_FINI(req);
}
static int ompi_request_null_free(ompi_request_t* request)
{
return OMPI_SUCCESS;
}
static int ompi_request_null_cancel(ompi_request_t* request, int flag)
{
return OMPI_SUCCESS;
}
OBJ_CLASS_INSTANCE(
ompi_request_t,
@ -40,7 +49,22 @@ OBJ_CLASS_INSTANCE(
int ompi_request_init(void)
{
OBJ_CONSTRUCT(&ompi_mpi_request_null, ompi_request_t);
OBJ_CONSTRUCT(&ompi_request_f_to_c_table, ompi_pointer_array_t);
OBJ_CONSTRUCT(&ompi_request_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&ompi_request_cond, ompi_condition_t);
OBJ_CONSTRUCT(&ompi_request_null, ompi_request_t);
ompi_request_null.req_status.MPI_SOURCE = MPI_PROC_NULL;
ompi_request_null.req_status.MPI_TAG = MPI_ANY_TAG;
ompi_request_null.req_status.MPI_ERROR = MPI_SUCCESS;
ompi_request_null.req_status._count = 0;
ompi_request_null.req_complete = true;
ompi_request_null.req_type = OMPI_REQUEST_NULL;
ompi_request_null.req_query = NULL;
ompi_request_null.req_free = ompi_request_null_free;
ompi_request_null.req_cancel = ompi_request_null_cancel;
if (0 != ompi_pointer_array_add(&ompi_request_f_to_c_table,
MPI_REQUEST_NULL)) {
return OMPI_ERR_REQUEST;
@ -51,6 +75,28 @@ int ompi_request_init(void)
int ompi_request_finalize(void)
{
OBJ_DESTRUCT(&ompi_mpi_request_null);
OBJ_DESTRUCT(&ompi_request_f_to_c_table);
OBJ_DESTRUCT(&ompi_request_lock);
OBJ_DESTRUCT(&ompi_request_cond);
OBJ_DESTRUCT(&ompi_request_null);
return OMPI_SUCCESS;
}
int ompi_request_complete(ompi_request_t* request)
{
OMPI_THREAD_LOCK(&ompi_request_lock);
request->req_complete = true;
if (request->req_query != NULL) {
int rc = request->req_query(request, &request->req_status);
if(rc != OMPI_SUCCESS) {
OMPI_THREAD_UNLOCK(&ompi_request_lock);
return rc;
}
}
if(ompi_request_waiting)
ompi_condition_signal(&ompi_request_cond);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
return OMPI_SUCCESS;
}
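
For illustration, the producer side, e.g. a transport's completion handler, would look roughly like this (the handler name is hypothetical):

    static void my_transport_done(ompi_request_t *req,
                                  const ompi_status_public_t *st)
    {
        req->req_status = *st;        /* record the final status first */
        ompi_request_complete(req);   /* then mark complete and wake waiters */
    }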

src/request/request.h

@ -13,8 +13,10 @@
#include "mpi.h"
#include "class/ompi_list.h"
#include "class/ompi_pointer_array.h"
#include "errhandler/errhandler.h"
#include "threads/mutex.h"
#include "threads/condition.h"
/**
* Request class
*/
@ -24,14 +26,11 @@ OBJ_CLASS_DECLARATION(ompi_request_t);
* Enum indicating the type of the request
*/
typedef enum {
/** MPI point-to-point request */
OMPI_REQUEST_PML,
/** MPI-2 IO request */
OMPI_REQUEST_IO,
/** MPI-2 generalized request */
OMPI_REQUEST_GEN,
/** Maximum request type */
OMPI_REQUEST_MAX
OMPI_REQUEST_PML, /**< MPI point-to-point request */
OMPI_REQUEST_IO, /**< MPI-2 IO request */
OMPI_REQUEST_GEN, /**< MPI-2 generalized request */
OMPI_REQUEST_NULL, /**< NULL request */
OMPI_REQUEST_MAX /**< Maximum request type */
} ompi_request_type_t;
/**
@ -49,36 +48,50 @@ typedef enum {
OMPI_REQUEST_CANCELLED
} ompi_request_state_t;
struct ompi_request_t;
/*
* Optional function that is called on request completion to update
* the request status.
*/
typedef int (*ompi_request_query_fn_t)(
struct ompi_request_t* request,
ompi_status_public_t* status);
/*
* Required function to free the request and any associated resources.
*/
typedef int (*ompi_request_free_fn_t)(struct ompi_request_t* request);
/*
* Optional function to cancel a pending request.
*/
typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag);
/**
* Main top-level request struct definition
*/
struct ompi_request_t {
/** Base type */
ompi_list_item_t super;
/** Enum indicating the type of the request */
ompi_request_type_t req_type;
/** Enum indicating the state of the request */
volatile ompi_request_state_t req_state;
/** Index in Fortran <-> C translation array */
int req_f_to_c_index;
ompi_list_item_t super; /**< Base type */
ompi_request_type_t req_type; /**< Enum indicating the type of the request */
ompi_status_public_t req_status; /**< Completion status */
volatile bool req_complete; /**< Flag indicating whether the request has completed */
volatile ompi_request_state_t req_state; /**< Enum indicating the state of the request */
int req_f_to_c_index; /**< Index in Fortran <-> C translation array */
ompi_request_query_fn_t req_query; /**< Optional query function to retrieve status */
ompi_request_free_fn_t req_free; /**< Required function to free request */
ompi_request_cancel_fn_t req_cancel; /**< Optional function to cancel the request */
};
/**
* Convenience typedef
*/
typedef struct ompi_request_t ompi_request_t;
/**
* Table for Fortran <-> C request handle conversion
*/
extern ompi_pointer_array_t ompi_request_f_to_c_table;
/**
* MPI_REQUEST_NULL
*/
extern ompi_request_t ompi_mpi_request_null;
/**
* Initialize a request. This is a macro to avoid function call
* overhead, since this is typically invoked in the critical
@ -88,7 +101,8 @@ extern ompi_request_t ompi_mpi_request_null;
#define OMPI_REQUEST_INIT(request) \
do { \
(request)->req_state = OMPI_REQUEST_INACTIVE; \
(request)->req_f_to_c_index = -1; \
(request)->req_complete = false; \
(request)->req_f_to_c_index = -1; \
} while(0);
/**
@ -101,6 +115,7 @@ extern ompi_request_t ompi_mpi_request_null;
* invoked on that request, then this request was added to the f2c
* table, and we need to remove it
*/
#define OMPI_REQUEST_FINI(request) \
do { \
(request)->req_state = OMPI_REQUEST_INVALID; \
@ -110,22 +125,150 @@ extern ompi_request_t ompi_mpi_request_null;
} \
} while (0);
/**
* Globals used for tracking requests and request completion.
*/
extern ompi_pointer_array_t ompi_request_f_to_c_table;
extern volatile int ompi_request_waiting;
extern ompi_mutex_t ompi_request_lock;
extern ompi_condition_t ompi_request_cond;
extern int ompi_request_poll_iterations;
extern ompi_request_t ompi_request_null;
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Initialize the MPI_Request subsystem; invoked during MPI_INIT.
*/
int ompi_request_init(void);
/**
* Shut down the MPI_Request subsystem; invoked during MPI_FINALIZE.
*/
int ompi_request_finalize(void);
/**
* Cancel a pending request.
*/
static inline int ompi_request_cancel(ompi_request_t* request)
{
if(request->req_cancel != NULL)
return request->req_cancel(request, true);
return OMPI_SUCCESS;
}
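
Requests that never supplied a req_cancel callback are treated as not cancellable and the call succeeds as a no-op, e.g.:

    static int try_cancel(ompi_request_t *req)
    {
        /* returns OMPI_SUCCESS even when req->req_cancel is NULL */
        return ompi_request_cancel(req);
    }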
/**
* Signal a request as complete. Note this will
* wake any thread pending on the request.
*/
int ompi_request_complete(ompi_request_t* request);
/**
* Free a request.
*
* @param request (IN) Pointer to request.
*/
static inline int ompi_request_free(ompi_request_t** request)
{
int rc = OMPI_SUCCESS;
if(*request != NULL) {
rc = (*request)->req_free(*request);
if(rc == OMPI_SUCCESS)
*request = NULL;
}
return rc;
}
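
The handle is passed by reference so a successful free also resets it, which makes a repeated free harmless, e.g.:

    static void release(ompi_request_t **req)
    {
        (void) ompi_request_free(req);   /* frees and sets *req = NULL */
        (void) ompi_request_free(req);   /* now a harmless no-op */
    }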
/**
* Non-blocking test for request completion.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param index (OUT) Index of the first completed request.
* @param completed (OUT) Flag indicating whether index is valid (a request completed).
* @param status (OUT) Status of the completed request.
* @return OMPI_SUCCESS or failure status.
*
* Note that upon completion the request is freed and the
* request handle at that index is set to NULL.
*/
int ompi_request_test(
size_t count,
ompi_request_t ** requests,
int *index,
int *completed,
ompi_status_public_t * status);
/**
* Non-blocking test for completion of all requests.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param completed (OUT) Flag indicating whether all requests completed.
* @param statuses (OUT) Array of completion statuses.
* @return OMPI_SUCCESS or failure status.
*
* This routine returns completed==true if all requests have completed.
* The statuses parameter is only updated if all requests completed. Likewise,
* the requests array is not modified (no requests freed), unless all requests
* have completed.
*/
int ompi_request_test_all(
size_t count,
ompi_request_t ** requests,
int *completed,
ompi_status_public_t * statuses);
/**
* Wait (blocking-mode) for one of N requests to complete.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param index (OUT) Index into request array of completed request.
* @param status (OUT) Status of completed request.
* @return OMPI_SUCCESS or failure status.
*
*/
int ompi_request_wait(
size_t count,
ompi_request_t ** requests,
int *index,
ompi_status_public_t * status);
/**
* Wait (blocking-mode) for all of N requests to complete.
*
* @param count (IN) Number of requests
* @param requests (IN) Array of requests
* @param statuses (OUT) Array of completion statuses.
* @return OMPI_SUCCESS or failure status.
*
*/
int ompi_request_wait_all(
size_t count,
ompi_request_t ** requests,
ompi_status_public_t * statuses);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

src/threads/condition.h

@ -44,15 +44,16 @@ static inline int ompi_condition_timedwait(ompi_condition_t *c,
const struct timespec *abstime)
{
struct timeval tv;
uint32_t secs = abstime->tv_sec;
uint32_t usecs = abstime->tv_nsec * 1000;
struct timeval abs;
abs.tv_sec = abstime->tv_sec;
abs.tv_usec = abstime->tv_nsec * 1000;
gettimeofday(&tv,NULL);
c->c_waiting++;
if (ompi_using_threads()) {
while (c->c_signaled == 0 &&
(tv.tv_sec <= secs ||
(tv.tv_sec == secs && tv.tv_usec < usecs))) {
(tv.tv_sec < abs.tv_sec ||
(tv.tv_sec == abs.tv_sec && tv.tv_usec < abs.tv_usec))) {
ompi_mutex_unlock(m);
ompi_progress();
gettimeofday(&tv,NULL);
@ -60,8 +61,8 @@ static inline int ompi_condition_timedwait(ompi_condition_t *c,
}
} else {
while (c->c_signaled == 0 &&
(tv.tv_sec <= secs ||
(tv.tv_sec == secs && tv.tv_usec < usecs))) {
(tv.tv_sec < abs.tv_sec ||
(tv.tv_sec == abs.tv_sec && tv.tv_usec < abs.tv_usec))) {
ompi_progress();
gettimeofday(&tv,NULL);
}
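
The switch from 32-bit scratch variables to a struct timeval avoids truncating tv_sec on platforms where time_t is 64 bits. A hedged sketch of how a caller might build the absolute timespec this routine consumes (the helper name and 1 us granularity are assumptions):

    #include <sys/time.h>
    #include <time.h>

    static void make_abstime(struct timespec *abstime, long timeout_usec)
    {
        struct timeval now;
        gettimeofday(&now, NULL);
        abstime->tv_sec  = now.tv_sec + timeout_usec / 1000000;
        abstime->tv_nsec = (now.tv_usec + timeout_usec % 1000000) * 1000;
        if (abstime->tv_nsec >= 1000000000L) {  /* carry into seconds */
            abstime->tv_sec  += 1;
            abstime->tv_nsec -= 1000000000L;
        }
    }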