The vprotocol pessimist component benefits from customizable requests: waitany, waitsome, test, testany, testall, and testsome can now be hooked and are therefore logged correctly.
This commit was SVN r16885.
This commit is contained in:
Parent: a80cfe0941
Commit: 859169214c
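
For context, a minimal standalone sketch of the pattern this commit enables is shown below: the host layer exposes a table of request-completion functions (test, testany, waitany, ...) that a fault-tolerance protocol can overload with logging wrappers which still delegate to the original routines. All names in the sketch (request_t, completion_fns_t, host_wait_any, logged_wait_any) are hypothetical simplifications, not the Open MPI API.

/* Sketch: a hookable request-completion table, assuming simplified types. */
#include <stdio.h>
#include <stddef.h>

typedef struct request { int id; int complete; } request_t;

/* Table of completion hooks; plays the role of the ompi_request_*_fn_t
 * slots added to the module structure in the diff below. */
typedef struct completion_fns {
    int (*wait_any)(size_t count, request_t **reqs, int *index);
} completion_fns_t;

/* "Host" implementation: return the first completed request, or -1. */
static int host_wait_any(size_t count, request_t **reqs, int *index)
{
    for (size_t i = 0; i < count; i++) {
        if (reqs[i] && reqs[i]->complete) { *index = (int)i; return 0; }
    }
    *index = -1;
    return 0;
}

static completion_fns_t host_fns = { host_wait_any };

/* Protocol wrapper: log the delivery, then delegate to the saved host function. */
static int logged_wait_any(size_t count, request_t **reqs, int *index)
{
    int rc = host_fns.wait_any(count, reqs, index);
    if (*index >= 0)
        printf("log: delivered request %d\n", reqs[*index]->id);
    else
        printf("log: no request delivered\n");
    return rc;
}

int main(void)
{
    request_t r0 = { 0, 0 }, r1 = { 1, 1 };
    request_t *reqs[] = { &r0, &r1 };
    completion_fns_t active = host_fns;   /* copy the host table */
    active.wait_any = logged_wait_any;    /* hook the slot with the logger */

    int index;
    active.wait_any(2, reqs, &index);     /* prints: log: delivered request 1 */
    return 0;
}

In the actual diff below, the corresponding hook points are the ompi_request_*_fn_t slots added to mca_vprotocol_pessimist and the mca_pml_v.host_request_fns table used to reach the host implementation.
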
@@ -17,7 +17,6 @@ mca_vprotocol_pessimist_module_t mca_vprotocol_pessimist =
/* mca_pml_base_module_del_procs_fn_t */ NULL,
/* mca_pml_base_module_enable_fn_f */ mca_vprotocol_pessimist_enable,
/* mca_pml_base_module_progress_fn_t */ NULL, /*mca_vprotocol_pessimist_progress,*/

/* mca_pml_base_module_add_comm_fn_t */ NULL,
/* mca_pml_base_module_del_comm_fn_t */ NULL,
/* mca_pml_base_module_irecv_init_fn_t */ NULL,

@@ -29,9 +28,17 @@ mca_vprotocol_pessimist_module_t mca_vprotocol_pessimist =
/* mca_pml_base_module_iprobe_fn_t */ mca_vprotocol_pessimist_iprobe,
/* mca_pml_base_module_probe_fn_t */ mca_vprotocol_pessimist_probe,
/* mca_pml_base_module_start_fn_t */ mca_vprotocol_pessimist_start,

/* mca_pml_base_module_dump_fn_t */ mca_vprotocol_pessimist_dump,

/* ompi_request_test_fn_t */ mca_vprotocol_pessimist_test,
/* ompi_request_testany_fn_t */ mca_vprotocol_pessimist_test_any,
/* ompi_request_testall_fn_t */ mca_vprotocol_pessimist_test_all,
/* ompi_request_testsome_fn_t */ mca_vprotocol_pessimist_test_some,
/* ompi_request_wait_fn_t */ NULL,
/* ompi_request_waitany_fn_t */ mca_vprotocol_pessimist_wait_any,
/* ompi_request_waitall_fn_t */ NULL,
/* ompi_request_waitsome_fn_t */ mca_vprotocol_pessimist_wait_some,

/* opal_class_t * */ OBJ_CLASS(mca_vprotocol_pessimist_recv_request_t),
/* opal_class_t * */ OBJ_CLASS(mca_vprotocol_pessimist_send_request_t),
},

@@ -92,9 +92,9 @@ int mca_vprotocol_pessimist_probe(int src, int tag,
struct ompi_communicator_t *comm,
ompi_status_public_t * status );

#include "vprotocol_pessimist_wait.h"
#include "vprotocol_pessimist_request.h"
#include "vprotocol_pessimist_start.h"
#include "vprotocol_pessimist_wait.h"
#include "vprotocol_pessimist_eventlog.h"

#endif /* __INCLUDE_VPROTOCOL_PESSIMIST_H__ */
@@ -47,7 +47,8 @@ void vprotocol_pessimist_matching_replay(int *src) {
}

void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
int *index, ompi_status_public_t *status) {
int *outcount, int *index,
ompi_status_public_t *status) {
mca_vprotocol_pessimist_event_t *event;

for(event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(&mca_vprotocol_pessimist.replay_events);

@@ -63,6 +64,7 @@ void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
/* this particular test have to return no request completed yet */
V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
*index = MPI_UNDEFINED;
*outcount = 0;
mca_vprotocol_pessimist.clock++;
/* This request have to stay in the queue until probeid matches */
return;

@@ -79,6 +81,7 @@ void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
(opal_list_item_t *) event);
VPESSIMIST_EVENT_RETURN(event);
*index = i;
*outcount = 1;
mca_vprotocol_pessimist.clock++;
ompi_request_wait(&reqs[i], status);
return;

@@ -87,6 +90,7 @@ void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
assert(devent->reqid == 0); /* make sure we don't missed a request */
*index = MPI_UNDEFINED;
*outcount = 0;
mca_vprotocol_pessimist.clock++;
opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
(opal_list_item_t *) event);
@@ -61,7 +61,7 @@
#define VPROTOCOL_PESSIMIST_SEND_BUFFER() do { \
if(mca_vprotocol_pessimist.event_buffer_length) \
{ \
/* write(2, mca_vprotocol_pessimist.event_buffer, \
/* pml_v.host_pml.send(mca_vprotocol_pessimist.event_buffer, MPI_BYTE, \
mca_vprotocol_pessimist.event_buffer_length * \
sizeof(vprotocol_pessimist_mem_event_t)); */ \
mca_vprotocol_pessimist.event_buffer_length = 0; \

@@ -139,17 +139,16 @@ void vprotocol_pessimist_matching_replay(int *src);
vprotocol_pessimist_delivery_event_t *devent; \
\
if(req == NULL) \
{ \
{ /* No request delivered to this probe, we need to count howmany times*/ \
V_OUTPUT_VERBOSE(70, "pessimist:\tlog\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock); \
event = (mca_vprotocol_pessimist_event_t*)opal_list_get_last(&mca_vprotocol_pessimist.pending_events); \
if(event->type == VPROTOCOL_PESSIMIST_EVENT_TYPE_DELIVERY && \
event->u_event.e_delivery.reqid == 0) \
{ \
/* consecutive probes not delivering anything are merged */ \
{ /* consecutive probes not delivering anything are merged */ \
event->u_event.e_delivery.probeid = mca_vprotocol_pessimist.clock++; \
} \
else \
{ \
{ /* Last event is not a failed probe, lets create a new event then */ \
VPESSIMIST_DELIVERY_EVENT_NEW(event); \
devent = &(event->u_event.e_delivery); \
devent->probeid = mca_vprotocol_pessimist.clock++; \

@@ -159,7 +158,7 @@ void vprotocol_pessimist_matching_replay(int *src);
} \
} \
else \
{ \
{ /* A request have been delivered, log which one it is */ \
V_OUTPUT_VERBOSE(70, "pessimist:\tlog\tdeliver\t%"PRIpclock"\treq %"PRIpclock, mca_vprotocol_pessimist.clock, VPESSIMIST_REQ(req)->reqid); \
VPESSIMIST_DELIVERY_EVENT_NEW(event); \
devent = &(event->u_event.e_delivery); \

@@ -174,14 +173,15 @@ void vprotocol_pessimist_matching_replay(int *src);
* event clock
* n (IN): the number of input requests
* reqs (IN): the set of considered requests (pml_base_request_t *)
* i (IN/OUT): index(es) of the delivered request (currently always 1 at a time)
* status (IN/OUT): status of the delivered request
* outcount (OUT): number of delivered requests
* i (OUT): index(es) of the delivered request
* status (OUT): status of the delivered request
*/
#define VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(n, reqs, i, status) do { \
#define VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(n, reqs, outcount, i, status) do {\
if(mca_vprotocol_pessimist.replay) \
vprotocol_pessimist_delivery_replay(n, reqs, i, status); \
vprotocol_pessimist_delivery_replay(n, reqs, outcount, i, status); \
} while(0)
void vprotocol_pessimist_delivery_replay(size_t, ompi_request_t **,
int *, ompi_status_public_t *);
int *, int *, ompi_status_public_t *);

#endif /* __VPROTOCOL_PESSIMIST_EVENTLOG_H__ */
@@ -13,53 +13,160 @@
#include "vprotocol_pessimist_wait.h"


int vprotocol_pessimist_request_null_free(ompi_request_t **req)
{
return OMPI_SUCCESS;
/* Helpers to prevent requests from being freed by the real wait/test */
static int vprotocol_pessimist_request_no_free(ompi_request_t **req) {
return OMPI_SUCCESS;
}

#define PREPARE_REQUESTS_WITH_NO_FREE(count, requests) do { \
size_t i; \
for(i = 0; i < count; i++) \
{ \
if(requests[i] == MPI_REQUEST_NULL) continue; \
requests[i]->req_free = vprotocol_pessimist_request_no_free; \
} \
} while(0)


int mca_vprotocol_pessimist_wait_any(size_t count, ompi_request_t ** requests, int *index, ompi_status_public_t * status)
int mca_vprotocol_pessimist_test(ompi_request_t ** rptr, int *completed,
ompi_status_public_t * status)
{
int ret;
size_t i;

VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(count, requests, index, status);
int ret;
int index;

# define pml_req ((mca_pml_base_request_t *) requests[i])
/* Avoid the request to be disposed by waitall */
for(i = 0; i < count; i++)
{
if(requests[i] == MPI_REQUEST_NULL) continue;
requests[i]->req_free = vprotocol_pessimist_request_null_free;
}

ret = ompi_request_wait_any(count, requests, index, status);

/* Parse the result */
for(i = 0; i < count; i++)
{
if(requests[i] == MPI_REQUEST_NULL) continue;
/* Restore requests and store they've been probed for termination */
pml_req->req_ompi.req_free = mca_vprotocol_pessimist_request_free;

if(i == (size_t) *index)
{
VPROTOCOL_PESSIMIST_DELIVERY_LOG(pml_req);

/* only free request without error status */
if(pml_req->req_ompi.req_status.MPI_ERROR == MPI_SUCCESS)
ompi_request_free(&(requests[i]));
else
ret = pml_req->req_ompi.req_status.MPI_ERROR;
}
# undef pml_req
}
return ret;
VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(1, rptr, completed, &index, status);

ret = mca_pml_v.host_request_fns.req_test(rptr, completed, status);
if(completed)
VPROTOCOL_PESSIMIST_DELIVERY_LOG(rptr);
else
VPROTOCOL_PESSIMIST_DELIVERY_LOG(NULL);
return ret;
}

int mca_vprotocol_pessimist_wait_some(size_t count, ompi_request_t ** requests, int *indexes, ompi_status_public_t * statuses)
int mca_vprotocol_pessimist_test_all(size_t count, ompi_request_t ** requests,
int *completed,
ompi_status_public_t * statuses)
{
return mca_vprotocol_pessimist_wait_any(count, requests, indexes, statuses);
int ret;
int index;

/* /!\ this is not correct until I upgrade DELIVERY_REPLAY to manage several requests at once */
VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(1, requests, completed, &index, statuses);

ret = mca_pml_v.host_request_fns.req_test_all(count, requests, completed,
statuses);
if(completed)
VPROTOCOL_PESSIMIST_DELIVERY_LOG(requests); /* /!\ need to make sure this is correct: what if first is request_null ? */
else
VPROTOCOL_PESSIMIST_DELIVERY_LOG(NULL);
return ret;
}


/* TESTANY and WAITANY */

int mca_vprotocol_pessimist_test_any(size_t count, ompi_request_t ** requests,
int *index, int *completed,
ompi_status_public_t * status)
{
int ret;
size_t i;

VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(count, requests, completed, index, status);

PREPARE_REQUESTS_WITH_NO_FREE(count, requests);

/* Call the real one to do the job */
ret = mca_pml_v.host_request_fns.req_test_any(count, requests, index, completed,
status);

if(completed)
{ /* Parse the result */
# define pml_req ((mca_pml_base_request_t *) requests[i])
for(i = 0; i < count; i++)
{
if(requests[i] == MPI_REQUEST_NULL) continue;
/* Restore requests and store they've been delivered */
pml_req->req_ompi.req_free = mca_vprotocol_pessimist_request_free;

if(i == (size_t) *index)
{
VPROTOCOL_PESSIMIST_DELIVERY_LOG(pml_req);

/* only free request without error status */
if(pml_req->req_ompi.req_status.MPI_ERROR == MPI_SUCCESS)
ompi_request_free(&(requests[i]));
else
ret = pml_req->req_ompi.req_status.MPI_ERROR;
}
}
# undef pml_req
}
else
{ /* No request delivered this time, log it */
VPROTOCOL_PESSIMIST_DELIVERY_LOG(NULL);
}
return ret;
}

int mca_vprotocol_pessimist_wait_any(size_t count, ompi_request_t ** requests,
int *index, ompi_status_public_t * status)
{
int ret;
size_t i;
int dummy;

VPROTOCOL_PESSIMIST_DELIVERY_REPLAY(count, requests, &dummy, index, status);

PREPARE_REQUESTS_WITH_NO_FREE(count, requests);

/* Call the real one to do the job */
ret = mca_pml_v.host_request_fns.req_wait_any(count, requests, index, status);

/* Parse the result */
# define pml_req ((mca_pml_base_request_t *) requests[i])
for(i = 0; i < count; i++)
{
if(requests[i] == MPI_REQUEST_NULL) continue;
/* Restore requests and store they've been delivered */
pml_req->req_ompi.req_free = mca_vprotocol_pessimist_request_free;

if(i == (size_t) *index)
{
VPROTOCOL_PESSIMIST_DELIVERY_LOG(pml_req);

/* only free request without error status */
if(pml_req->req_ompi.req_status.MPI_ERROR == MPI_SUCCESS)
ompi_request_free(&(requests[i]));
else
ret = pml_req->req_ompi.req_status.MPI_ERROR;
}
# undef pml_req
}
return ret;
}


/* TESTSOME and WAITSOME */

int mca_vprotocol_pessimist_test_some(size_t count, ompi_request_t ** requests,
int * outcount, int * indices,
ompi_status_public_t * statuses)
{
int ret;
ret = mca_vprotocol_pessimist_test_any(count, requests, indices, outcount, statuses);
if(*outcount) *outcount = 1;
return ret;
}

int mca_vprotocol_pessimist_wait_some(size_t count, ompi_request_t ** requests,
int *outcount, int *indexes,
ompi_status_public_t * statuses)
{
int ret;
ret = mca_vprotocol_pessimist_wait_any(count, requests, indexes, statuses);
if(MPI_UNDEFINED == *indexes) *outcount = 0;
else *outcount = 1;
return ret;
}
@@ -13,33 +13,70 @@

#include "ompi_config.h"
#include "vprotocol_pessimist.h"

int vprotocol_pessimist_request_null_free(ompi_request_t **req);
#include "ompi/request/request.h"

#define VPROTOCOL_PESSIMIST_WAIT(req, status, rc) \
((rc) = ompi_request_wait(req, status))

int mca_vprotocol_pessimist_test(ompi_request_t ** rptr, int *completed,
ompi_status_public_t * status);

int mca_vprotocol_pessimist_test_all(size_t count, ompi_request_t ** requests,
int *completed,
ompi_status_public_t * statuses);

int mca_vprotocol_pessimist_test_any(size_t count, ompi_request_t ** requests,
int *index, int *completed,
ompi_status_public_t * status);

int mca_vprotocol_pessimist_test_some(size_t count, ompi_request_t ** requests,
int * outcount, int * indices,
ompi_status_public_t * statuses);

int mca_vprotocol_pessimist_wait_any(size_t count, ompi_request_t ** requests,
int *index, ompi_status_public_t * status);

int mca_vprotocol_pessimist_wait_some(size_t count, ompi_request_t ** requests,
int *outcount, int *indexes,
ompi_status_public_t * statuses);

#endif /* __VPROTOCOL_PESSIMIST_WAIT_H__ */

#if 0
**** Previous implementation of wait, now outdated but kept for future references *****
int vprotocol_pessimist_request_null_free(ompi_request_t **req);


do { \
if(*(req) == MPI_REQUEST_NULL) (rc) = ompi_request_wait(req, status); \
else \
{ \
mca_pml_base_request_t *pml_req = (mca_pml_base_request_t *) *(req); \
ompi_request_free_fn_t free_fn = pml_req->req_ompi.req_free; \
pml_req->req_ompi.req_free = vprotocol_pessimist_request_null_free; \
V_OUTPUT_VERBOSE(50, "pessimist:\twait\tdeliver\t%d:%llx\tpeer %d\ttag %d\tsize %d", pml_req->req_comm->c_contextid, pml_req->req_sequence, pml_req->req_peer, pml_req->req_tag, pml_req->req_count); \
(rc) = ompi_request_wait(req, status); \
VPROTOCOL_PESSIMIST_MATCHING_LOG(pml_req); \
pml_req->req_ompi.req_free = free_fn; \
ompi_request_free(req); \
} \
if(*(req) == MPI_REQUEST_NULL) (rc) = ompi_request_wait(req, status); \
else \
{ \
mca_pml_base_request_t *pml_req = (mca_pml_base_request_t *) *(req); \
ompi_request_free_fn_t free_fn = pml_req->req_ompi.req_free; \
pml_req->req_ompi.req_free = vprotocol_pessimist_request_null_free; \
V_OUTPUT_VERBOSE(50, "pessimist:\twait\tdeliver\t%d:%llx\tpeer %d\ttag %d\tsize %d", pml_req->req_comm->c_contextid, pml_req->req_sequence, pml_req->req_peer, pml_req->req_tag, pml_req->req_count); \
(rc) = ompi_request_wait(req, status); \
VPROTOCOL_PESSIMIST_MATCHING_LOG(pml_req); \
pml_req->req_ompi.req_free = free_fn; \
ompi_request_free(req); \
} \
} while(0)

OMPI_DECLSPEC int mca_vprotocol_pessimist_wait(ompi_request_t **request, ompi_status_public_t *status);
OMPI_DECLSPEC int mca_vprotocol_pessimist_wait_all(size_t count, ompi_request_t ** requests, ompi_status_public_t * statuses);
#endif

int mca_vprotocol_pessimist_wait_any(size_t count, ompi_request_t ** requests, int *index, ompi_status_public_t * status);
int mca_vprotocol_pessimist_wait_some(size_t count, ompi_request_t ** requests, int *indexes, ompi_status_public_t * statuses);

#endif /* __VPROTOCOL_PESSIMIST_WAIT_H__ */
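
The wrappers in this diff keep a pending request alive across the host wait/test by temporarily pointing its req_free callback at a no-op (PREPARE_REQUESTS_WITH_NO_FREE), logging the delivery, restoring the real callback, and freeing the request themselves. A standalone sketch of that pattern, using hypothetical types rather than the Open MPI request structures, could look like this:

/* Sketch: swap req_free to a no-op so the host routine cannot dispose the
 * request before the delivery is logged; assumes simplified types. */
#include <stdio.h>
#include <stdlib.h>

typedef struct request request_t;
typedef int (*req_free_fn_t)(request_t **req);

struct request {
    int id;
    req_free_fn_t req_free;
};

static int real_free(request_t **req)
{
    printf("freeing request %d\n", (*req)->id);
    free(*req);
    *req = NULL;
    return 0;
}

/* No-op free, analogous in spirit to vprotocol_pessimist_request_no_free(). */
static int no_free(request_t **req)
{
    (void)req;
    return 0;
}

/* Stand-in for the host wait: it would normally complete and free the request. */
static void host_wait(request_t **req)
{
    (*req)->req_free(req);              /* with no_free installed, req survives */
}

int main(void)
{
    request_t *req = malloc(sizeof(*req));
    req->id = 42;
    req->req_free = real_free;

    req_free_fn_t saved = req->req_free;
    req->req_free = no_free;            /* prepare: block disposal by the host */
    host_wait(&req);                    /* host completes, request stays alive */

    printf("log: delivered request %d\n", req->id);  /* delivery logging step */
    req->req_free = saved;              /* restore the real callback */
    req->req_free(&req);                /* free it ourselves, as the wrapper does */
    return 0;
}
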