improve the scalability of MPI_Waitall ...
note that any code that sets a request to a completed state must now increment a counter for every completed request This commit was SVN r8073.
Этот коммит содержится в:
родитель
62fd74140b
Коммит
4c7c277b0a
@ -84,6 +84,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
|
||||
* broadcast the condition on the request in order to allow the other threa ds
|
||||
* to complete their test/wait functions.
|
||||
*/
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
@ -158,6 +159,7 @@ static void mca_pml_ob1_put_completion(
|
||||
recvreq->req_recv.req_base.req_pml_complete = true;
|
||||
recvreq->req_recv.req_base.req_ompi.req_complete = true;
|
||||
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
@ -326,6 +328,8 @@ static void mca_pml_ob1_rget_completion(
|
||||
recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
|
||||
recvreq->req_recv.req_base.req_pml_complete = true;
|
||||
recvreq->req_recv.req_base.req_ompi.req_complete = true;
|
||||
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
@ -529,6 +533,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
recvreq->req_recv.req_base.req_pml_complete = true;
|
||||
recvreq->req_recv.req_base.req_ompi.req_complete = true;
|
||||
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
@ -581,6 +586,7 @@ void mca_pml_ob1_recv_request_matched_probe(
|
||||
recvreq->req_recv.req_base.req_pml_complete = true;
|
||||
recvreq->req_recv.req_base.req_ompi.req_complete = true;
|
||||
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
|
@ -233,6 +233,7 @@ do {
|
||||
(sendreq)->req_send.req_base.req_ompi.req_status._count = \
|
||||
(sendreq)->req_send.req_bytes_packed; \
|
||||
(sendreq)->req_send.req_base.req_ompi.req_complete = true; \
|
||||
ompi_request_completed++; \
|
||||
if(ompi_request_waiting) { \
|
||||
opal_condition_broadcast(&ompi_request_cond); \
|
||||
} \
|
||||
|
@ -73,6 +73,7 @@ static int mca_pml_teg_recv_request_cancel(struct ompi_request_t* request, int c
|
||||
* broadcast the condition on the request in order to allow the other threa ds
|
||||
* to complete their test/wait functions.
|
||||
*/
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
@ -117,6 +118,7 @@ void mca_pml_teg_recv_request_progress(
|
||||
req->req_recv.req_base.req_ompi.req_status._count = req->req_bytes_delivered;
|
||||
req->req_recv.req_base.req_pml_complete = true;
|
||||
req->req_recv.req_base.req_ompi.req_complete = true;
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
|
@ -183,6 +183,7 @@ void mca_pml_teg_send_request_progress(
|
||||
req->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
req->req_send.req_base.req_ompi.req_status._count = req->req_bytes_sent;
|
||||
req->req_send.req_base.req_ompi.req_complete = true;
|
||||
ompi_request_completed++;
|
||||
if(ompi_request_waiting) {
|
||||
opal_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
|
@ -132,6 +132,7 @@ int ompi_request_wait_all(
|
||||
* unless required
|
||||
*/
|
||||
if (completed != count) {
|
||||
|
||||
/*
|
||||
* acquire lock and test for completion - if all requests are
|
||||
* not completed pend on condition variable until a request
|
||||
@ -139,7 +140,22 @@ int ompi_request_wait_all(
|
||||
*/
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
ompi_request_waiting++;
|
||||
|
||||
do {
|
||||
/* check number of pending requests */
|
||||
size_t start = ompi_request_completed;
|
||||
size_t pending = count - completed;
|
||||
|
||||
/*
|
||||
* wait until at least pending requests complete
|
||||
*/
|
||||
while (pending > ompi_request_completed - start) {
|
||||
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* confirm that all pending operations have completed
|
||||
*/
|
||||
completed = 0;
|
||||
rptr = requests;
|
||||
for (i = 0; i < count; i++) {
|
||||
@ -148,9 +164,6 @@ int ompi_request_wait_all(
|
||||
completed++;
|
||||
}
|
||||
}
|
||||
if (completed != count) {
|
||||
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
|
||||
}
|
||||
} while (completed != count);
|
||||
ompi_request_waiting--;
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
@ -22,13 +22,14 @@
|
||||
#include "request/request.h"
|
||||
#include "ompi/include/constants.h"
|
||||
|
||||
ompi_pointer_array_t ompi_request_f_to_c_table;
|
||||
OMPI_DECLSPEC volatile int ompi_request_waiting = 0;
|
||||
OMPI_DECLSPEC opal_mutex_t ompi_request_lock;
|
||||
OMPI_DECLSPEC opal_condition_t ompi_request_cond;
|
||||
OMPI_DECLSPEC ompi_request_t ompi_request_null;
|
||||
OMPI_DECLSPEC ompi_request_t ompi_request_empty;
|
||||
ompi_status_public_t ompi_status_empty;
|
||||
ompi_pointer_array_t ompi_request_f_to_c_table;
|
||||
OMPI_DECLSPEC size_t ompi_request_waiting = 0;
|
||||
OMPI_DECLSPEC size_t ompi_request_completed = 0;
|
||||
OMPI_DECLSPEC opal_mutex_t ompi_request_lock;
|
||||
OMPI_DECLSPEC opal_condition_t ompi_request_cond;
|
||||
OMPI_DECLSPEC ompi_request_t ompi_request_null;
|
||||
OMPI_DECLSPEC ompi_request_t ompi_request_empty;
|
||||
ompi_status_public_t ompi_status_empty;
|
||||
|
||||
|
||||
static void ompi_request_construct(ompi_request_t* req)
|
||||
@ -149,6 +150,7 @@ int ompi_request_finalize(void)
|
||||
int ompi_request_complete(ompi_request_t* request)
|
||||
{
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
ompi_request_completed++;
|
||||
request->req_complete = true;
|
||||
if(ompi_request_waiting)
|
||||
opal_condition_signal(&ompi_request_cond);
|
||||
|
@ -139,7 +139,8 @@ typedef struct ompi_request_t ompi_request_t;
|
||||
* Globals used for tracking requests and request completion.
|
||||
*/
|
||||
OMPI_DECLSPEC extern ompi_pointer_array_t ompi_request_f_to_c_table;
|
||||
OMPI_DECLSPEC extern volatile int ompi_request_waiting;
|
||||
OMPI_DECLSPEC extern size_t ompi_request_waiting;
|
||||
OMPI_DECLSPEC extern size_t ompi_request_completed;
|
||||
OMPI_DECLSPEC extern opal_mutex_t ompi_request_lock;
|
||||
OMPI_DECLSPEC extern opal_condition_t ompi_request_cond;
|
||||
OMPI_DECLSPEC extern int ompi_request_poll_iterations;
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user