- removed req_query from base request - only needed for generalized requests
- fix for persistent recvs This commit was SVN r3235.
Этот коммит содержится в:
родитель
2adc1fd062
Коммит
cad5b2c923
@ -44,8 +44,8 @@ bool mca_pml_teg_recv_frag_match(
|
||||
* Initialize request status.
|
||||
*/
|
||||
request->req_bytes_packed = header->hdr_msg_length;
|
||||
request->req_base.req_peer = header->hdr_src;
|
||||
request->req_base.req_tag = header->hdr_tag;
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE = header->hdr_src;
|
||||
request->req_base.req_ompi.req_status.MPI_TAG = header->hdr_tag;
|
||||
|
||||
/*
|
||||
* If probe - signal request is complete - but don't notify PTL
|
||||
|
@ -35,7 +35,6 @@ static int mca_pml_teg_recv_request_cancel(struct ompi_request_t* request, int c
|
||||
static void mca_pml_teg_recv_request_construct(mca_pml_base_recv_request_t* request)
|
||||
{
|
||||
request->req_base.req_type = MCA_PML_REQUEST_RECV;
|
||||
request->req_base.req_ompi.req_query = NULL;
|
||||
request->req_base.req_ompi.req_fini = mca_pml_teg_recv_request_fini;
|
||||
request->req_base.req_ompi.req_free = mca_pml_teg_recv_request_free;
|
||||
request->req_base.req_ompi.req_cancel = mca_pml_teg_recv_request_cancel;
|
||||
@ -68,9 +67,6 @@ void mca_pml_teg_recv_request_progress(
|
||||
req->req_bytes_delivered += bytes_delivered;
|
||||
if (req->req_bytes_received >= req->req_bytes_packed) {
|
||||
/* initialize request status */
|
||||
req->req_base.req_ompi.req_status.MPI_SOURCE = req->req_base.req_peer;
|
||||
req->req_base.req_ompi.req_status.MPI_TAG = req->req_base.req_tag;
|
||||
req->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
req->req_base.req_ompi.req_status._count = req->req_bytes_delivered;
|
||||
req->req_base.req_pml_complete = true;
|
||||
req->req_base.req_ompi.req_complete = true;
|
||||
@ -200,8 +196,8 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
|
||||
}
|
||||
ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag);
|
||||
request->req_bytes_packed = header->hdr_msg_length;
|
||||
request->req_base.req_tag = header->hdr_tag;
|
||||
request->req_base.req_peer = header->hdr_src;
|
||||
request->req_base.req_ompi.req_status.MPI_TAG = header->hdr_tag;
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE = header->hdr_src;
|
||||
frag->frag_request = request;
|
||||
return frag;
|
||||
}
|
||||
|
@ -71,6 +71,7 @@ static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* re
|
||||
request->req_base.req_pml_complete = false;
|
||||
request->req_base.req_ompi.req_complete = false;
|
||||
request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
|
||||
request->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
|
||||
/* attempt to match posted recv */
|
||||
if(request->req_base.req_peer == OMPI_ANY_SOURCE) {
|
||||
|
@ -36,7 +36,6 @@ static int mca_pml_teg_send_request_cancel(struct ompi_request_t* request, int c
|
||||
static void mca_pml_teg_send_request_construct(mca_pml_base_send_request_t* req)
|
||||
{
|
||||
req->req_base.req_type = MCA_PML_REQUEST_SEND;
|
||||
req->req_base.req_ompi.req_query = NULL;
|
||||
req->req_base.req_ompi.req_fini = mca_pml_teg_send_request_fini;
|
||||
req->req_base.req_ompi.req_free = mca_pml_teg_send_request_free;
|
||||
req->req_base.req_ompi.req_cancel = mca_pml_teg_send_request_cancel;
|
||||
|
@ -72,7 +72,8 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
|
||||
if(header->hdr_frag_length > 0) {
|
||||
/* initialize receive convertor */
|
||||
ompi_proc_t *proc =
|
||||
ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer);
|
||||
ompi_comm_peer_lookup(request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE);
|
||||
ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor);
|
||||
ompi_convertor_init_for_recv(
|
||||
&frag->frag_recv.frag_base.frag_convertor, /* convertor */
|
||||
@ -120,7 +121,8 @@ static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
|
||||
*/
|
||||
struct iovec iov;
|
||||
ompi_proc_t *proc =
|
||||
ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer);
|
||||
ompi_comm_peer_lookup(request->req_base.req_comm,
|
||||
request->req_base.req_ompi.req_status.MPI_SOURCE);
|
||||
ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor);
|
||||
ompi_convertor_init_for_recv(
|
||||
&frag->frag_recv.frag_base.frag_convertor, /* convertor */
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include "mpi.h"
|
||||
#include "mpi/c/bindings.h"
|
||||
#include "communicator/communicator.h"
|
||||
#include "request/request.h"
|
||||
#include "request/grequest.h"
|
||||
#include "errhandler/errhandler.h"
|
||||
|
||||
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
|
||||
@ -32,7 +32,14 @@ int MPI_Grequest_complete(MPI_Request request)
|
||||
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
|
||||
}
|
||||
|
||||
rc = ompi_request_complete(request);
|
||||
switch(request->req_type) {
|
||||
OMPI_REQUEST_GEN:
|
||||
rc = ompi_grequest_complete((ompi_grequest_t*)request);
|
||||
break;
|
||||
default:
|
||||
rc = MPI_ERR_REQUEST;
|
||||
break;
|
||||
}
|
||||
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, MPI_ERR_INTERN, FUNC_NAME);
|
||||
}
|
||||
|
||||
|
@ -2,15 +2,6 @@
|
||||
#include "request/grequest.h"
|
||||
|
||||
|
||||
static int ompi_grequest_query(ompi_request_t* req, ompi_status_public_t* status)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
ompi_grequest_t* greq = (ompi_grequest_t*)req;
|
||||
if(greq->greq_query != NULL)
|
||||
rc = greq->greq_query(greq->greq_state, status);
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int ompi_grequest_free(ompi_request_t** req)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
@ -36,7 +27,6 @@ static int ompi_grequest_cancel(ompi_request_t* req, int flag)
|
||||
static void ompi_grequest_construct(ompi_grequest_t* greq)
|
||||
{
|
||||
OMPI_REQUEST_INIT(&greq->greq_base);
|
||||
greq->greq_base.req_query = ompi_grequest_query;
|
||||
greq->greq_base.req_fini = ompi_grequest_free;
|
||||
greq->greq_base.req_free = ompi_grequest_free;
|
||||
greq->greq_base.req_cancel = ompi_grequest_cancel;
|
||||
@ -77,3 +67,16 @@ int ompi_grequest_start(
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_grequest_complete(ompi_grequest_t* grequest)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
grequest->greq_base.req_complete = true;
|
||||
if(grequest->greq_query != NULL)
|
||||
rc = grequest->greq_query(grequest->greq_state, &grequest->greq_base.req_status);
|
||||
if(ompi_request_waiting)
|
||||
ompi_condition_signal(&ompi_request_cond);
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,10 @@ struct ompi_grequest_t {
|
||||
typedef struct ompi_grequest_t ompi_grequest_t;
|
||||
|
||||
|
||||
/*
|
||||
* Start a generalized request.
|
||||
*/
|
||||
|
||||
int ompi_grequest_start(
|
||||
MPI_Grequest_query_function *gquery,
|
||||
MPI_Grequest_free_function *gfree,
|
||||
@ -28,6 +32,11 @@ int ompi_grequest_start(
|
||||
void* gstate,
|
||||
ompi_request_t** request);
|
||||
|
||||
/*
|
||||
* Mark a generalized request as complete.
|
||||
*/
|
||||
int ompi_grequest_complete(ompi_grequest_t*);
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
@ -16,8 +16,9 @@ int ompi_request_wait_any(
|
||||
#if OMPI_HAVE_THREADS
|
||||
int c;
|
||||
#endif
|
||||
unsigned int i, null_requests;
|
||||
int rc, completed = -1;
|
||||
size_t i, null_requests;
|
||||
int rc;
|
||||
int completed = -1;
|
||||
ompi_request_t **rptr;
|
||||
ompi_request_t *request;
|
||||
|
||||
|
@ -19,7 +19,6 @@ ompi_request_t ompi_request_null;
|
||||
static void ompi_request_construct(ompi_request_t* req)
|
||||
{
|
||||
OMPI_REQUEST_INIT(req);
|
||||
req->req_query = NULL;
|
||||
req->req_fini = NULL;
|
||||
req->req_free = NULL;
|
||||
req->req_cancel = NULL;
|
||||
@ -62,7 +61,6 @@ int ompi_request_init(void)
|
||||
|
||||
ompi_request_null.req_complete = true;
|
||||
ompi_request_null.req_type = OMPI_REQUEST_NULL;
|
||||
ompi_request_null.req_query = NULL;
|
||||
ompi_request_null.req_fini = ompi_request_null_free;
|
||||
ompi_request_null.req_free = ompi_request_null_free;
|
||||
ompi_request_null.req_cancel = ompi_request_null_cancel;
|
||||
@ -89,13 +87,6 @@ int ompi_request_complete(ompi_request_t* request)
|
||||
{
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
request->req_complete = true;
|
||||
if (request->req_query != NULL) {
|
||||
int rc = request->req_query(request, &request->req_status);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
if(ompi_request_waiting)
|
||||
ompi_condition_signal(&ompi_request_cond);
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
@ -51,15 +51,6 @@ typedef enum {
|
||||
|
||||
struct ompi_request_t;
|
||||
|
||||
|
||||
/*
|
||||
* Optional function that is called on request completion to update
|
||||
* the request status.
|
||||
*/
|
||||
typedef int (*ompi_request_query_fn_t)(
|
||||
struct ompi_request_t* request,
|
||||
ompi_status_public_t* status);
|
||||
|
||||
/*
|
||||
* Required function to free the request and any associated resources.
|
||||
*/
|
||||
@ -81,7 +72,6 @@ struct ompi_request_t {
|
||||
volatile bool req_complete; /**< Flag indicating wether request has completed */
|
||||
volatile ompi_request_state_t req_state; /**< enum indicate state of the request */
|
||||
int req_f_to_c_index; /**< Index in Fortran <-> C translation array */
|
||||
ompi_request_query_fn_t req_query; /**< Optional query function to retrieve status */
|
||||
ompi_request_free_fn_t req_fini; /**< Called by test/wait */
|
||||
ompi_request_free_fn_t req_free; /**< Called by free */
|
||||
ompi_request_cancel_fn_t req_cancel; /**< Optional function to cancel the request */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user