
The request management framework has been redesigned. The main idea is
to let the PML (or io; more generally, the low-level request manager)
have its own release function (what was previously req_fini). This
function is only ever called from the low level, while req_free is
called from the upper level (the MPI layer) to mark the request as no
longer used by the user.
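Whichever side finishes last is the one that hands the request back to
the free list. Below is a minimal, self-contained sketch of that
hand-off; the toy_* names and the plain pthread mutex are illustrative
placeholders, not the actual Open MPI API:

    #include <stdbool.h>
    #include <stdio.h>
    #include <pthread.h>

    /* Toy model of the split release protocol: hypothetical names only. */
    typedef struct {
        bool pml_complete;   /* the low level (PML/io) is done with the request */
        bool free_called;    /* the MPI layer has released its reference */
    } toy_request_t;

    static pthread_mutex_t toy_lock = PTHREAD_MUTEX_INITIALIZER;

    static void toy_return_to_free_list(toy_request_t *req)
    {
        printf("request %p recycled\n", (void*)req);
    }

    /* Upper level: the req_free role (e.g. from MPI_Request_free). */
    static void toy_request_free(toy_request_t *req)
    {
        pthread_mutex_lock(&toy_lock);
        req->free_called = true;
        if (req->pml_complete)            /* low level already done: recycle now */
            toy_return_to_free_list(req);
        pthread_mutex_unlock(&toy_lock);
    }

    /* Low level: the release (old req_fini) role, run at PML completion. */
    static void toy_pml_complete(toy_request_t *req)
    {
        pthread_mutex_lock(&toy_lock);
        req->pml_complete = true;
        if (req->free_called)             /* user already released: recycle now */
            toy_return_to_free_list(req);
        pthread_mutex_unlock(&toy_lock);
    }

    int main(void)
    {
        toy_request_t a = { false, false }, b = { false, false };
        toy_request_free(&a); toy_pml_complete(&a);   /* user frees first */
        toy_pml_complete(&b); toy_request_free(&b);   /* transfer ends first */
        return 0;
    }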

From the request point of view, requests are now marked as inactive
every time their status is read (this holds for persistent requests as
well). As MPI_REQUEST_NULL is already marked as inactive, the test and
wait functions become simpler. The drawback is that
ompi_request_{test|wait} now has to update the state of the request
once its status has been read.
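Concretely, the common tail of the test/wait functions reduces to the
following (simplified from the ompi_request_wait change further down;
error handling omitted):

    if (MPI_STATUS_IGNORE != status) {
        *status = req->req_status;                /* read the status first */
    }
    if (req->req_state == OMPI_REQUEST_INACTIVE) {
        return OMPI_SUCCESS;                      /* covers MPI_REQUEST_NULL too */
    }
    if (req->req_persistent) {
        req->req_state = OMPI_REQUEST_INACTIVE;   /* stays allocated for restart */
        return OMPI_SUCCESS;
    }
    return ompi_request_free(req_ptr);            /* non-persistent: release it */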

This commit was SVN r9290.
This commit is contained in:
George Bosilca 2006-03-15 22:53:41 +00:00
parent fd01a23e7f
commit 612570134f
31 changed files with 487 additions and 528 deletions

View file

@@ -164,8 +164,6 @@ int mca_io_base_request_alloc(ompi_file_t *file,
     (*req)->req_file = file;
     (*req)->req_ver = file->f_io_version;
     (*req)->free_called = false;
-    (*req)->super.req_fini =
-        file->f_io_selected_module.v1_0_0.io_module_request_fini;
     (*req)->super.req_free =
         file->f_io_selected_module.v1_0_0.io_module_request_free;
     (*req)->super.req_cancel =
@@ -195,7 +193,7 @@ int mca_io_base_request_alloc(ompi_file_t *file,
     /* Initialize the request */
-    OMPI_REQUEST_INIT(&((*req)->super));
+    OMPI_REQUEST_INIT(&((*req)->super), false);
     /* All done */

View file

@@ -350,10 +350,6 @@ struct mca_io_base_module_1_0_0_t {
     mca_io_base_module_request_once_finalize_fn_t io_module_request_once_finalize;
-    /** Finalize a request (per usage) */
-    ompi_request_free_fn_t io_module_request_fini;
     /** Free a request (per usage) */
     ompi_request_free_fn_t io_module_request_free;

View file

@@ -64,7 +64,6 @@ typedef struct mca_io_romio_data_t mca_io_romio_data_t;
  */
 int mca_io_romio_progress(void);
-int mca_io_romio_request_fini(ompi_request_t **req);
 int mca_io_romio_request_free(ompi_request_t **req);
 int mca_io_romio_request_cancel(ompi_request_t *req, int flag);

View file

@@ -289,7 +289,7 @@ static int progress()
      * here
      */
     if (ioreq->free_called) {
-        ret = ioreq->super.req_fini((ompi_request_t**) &ioreq);
+        ret = ompi_request_free((ompi_request_t**) &ioreq);
         if (OMPI_SUCCESS != ret) {
             OPAL_THREAD_UNLOCK(&mca_io_romio_mutex);
             return ret;

View file

@@ -43,7 +43,6 @@ mca_io_base_module_1_0_0_t mca_io_romio_module = {
     /* Finalize, free, cancel */
-    mca_io_romio_request_fini,
     mca_io_romio_request_free,
     mca_io_romio_request_cancel,

View file

@@ -24,43 +24,26 @@
 #include "io_romio.h"

-int mca_io_romio_request_fini(ompi_request_t **req)
+int mca_io_romio_request_free(ompi_request_t **req)
 {
     mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req);
-    int ret = OMPI_SUCCESS;

     OPAL_THREAD_LOCK(&mca_io_romio_mutex);

     /* clean up the fortran stuff, mark us as invalid */
     OMPI_REQUEST_FINI(*req);
-    /* and shove us back in the free list */
-    mca_io_base_request_free(ioreq->req_file, ioreq);
-    OPAL_THREAD_UNLOCK(&mca_io_romio_mutex);
-    *req = MPI_REQUEST_NULL;
-    return ret;
-}
-
-int mca_io_romio_request_free(ompi_request_t **req)
-{
-    mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req);
-    int ret = OMPI_SUCCESS;
-    OPAL_THREAD_LOCK(&mca_io_romio_mutex);
     ioreq->free_called = true;

     /* if the thing is done already, finalize it and get out... */
     if (ioreq->super.req_complete) {
-        ret = ioreq->super.req_fini(req);
+        mca_io_base_request_free(ioreq->req_file, ioreq);
     }

     OPAL_THREAD_UNLOCK(&mca_io_romio_mutex);
     *req = MPI_REQUEST_NULL;
-    return ret;
+    return OMPI_SUCCESS;
 }

View file

@@ -63,8 +63,8 @@ int mca_mpool_base_close(void)
     if(mca_mpool_base_use_mem_hooks &&
        0 != (OPAL_MEMORY_FREE_SUPPORT & opal_mem_hooks_support_level())) {
         opal_mem_hooks_unregister_release(mca_mpool_base_mem_cb);
-        OBJ_DESTRUCT(&mca_mpool_base_mem_cb_array);
     }
+    OBJ_DESTRUCT(&mca_mpool_base_mem_cb_array);

     /* All done */
     return OMPI_SUCCESS;

View file

@@ -93,7 +93,6 @@ mca_mpool_base_module_t* mca_mpool_base_module_create(
         if(mca_mpool_base_use_mem_hooks &&
            0 != (OPAL_MEMORY_FREE_SUPPORT & opal_mem_hooks_support_level())) {
             opal_mem_hooks_register_release(mca_mpool_base_mem_cb, NULL);
-            OBJ_CONSTRUCT(&mca_mpool_base_mem_cb_array, ompi_pointer_array_t);
         }
 #if defined(HAVE_MALLOPT)
@@ -103,6 +102,7 @@ mca_mpool_base_module_t* mca_mpool_base_module_create(
         }
 #endif  /* defined(HAVE_MALLOPT) */
     }
+    OBJ_CONSTRUCT(&mca_mpool_base_mem_cb_array, ompi_pointer_array_t);
     return module;
 }

View file

@@ -66,7 +66,6 @@ ompi_osc_pt2pt_sendreq_alloc_init(ompi_osc_pt2pt_req_type_t req_type,
 static void ompi_osc_pt2pt_sendreq_construct(ompi_osc_pt2pt_sendreq_t *req)
 {
     req->super.req_type = OMPI_REQUEST_WIN;
-    req->super.req_fini = NULL;
     req->super.req_free = NULL;
     req->super.req_cancel = NULL;
     OBJ_CONSTRUCT(&(req->req_origin_convertor), ompi_convertor_t);

View file

@@ -66,7 +66,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t);
     OBJ_RETAIN(comm); \
     OBJ_RETAIN(datatype); \
     \
-    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
+    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, persistent); \
     (request)->req_bytes_packed = 0; \
     (request)->req_base.req_sequence = 0; \
     (request)->req_base.req_addr = addr; \
@@ -76,7 +76,6 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t);
     (request)->req_base.req_tag = tag; \
     (request)->req_base.req_comm = comm; \
     (request)->req_base.req_proc = NULL; \
-    (request)->req_base.req_persistent = persistent; \
     (request)->req_base.req_pml_complete = (persistent ? true : false); \
     (request)->req_base.req_free_called = false; \
 }

View file

@@ -56,7 +56,6 @@ struct mca_pml_base_request_t {
     uint64_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
     struct ompi_datatype_t *req_datatype; /**< pointer to data type */
     mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
-    bool req_persistent; /**< flag indicating if the this is a persistent request */
     volatile bool req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */
     volatile bool req_free_called; /**< flag indicating if the user has freed this request */
 };

View file

@@ -80,7 +80,7 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
     OBJ_RETAIN(comm); \
     OBJ_RETAIN(datatype); \
     \
-    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
+    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, persistent); \
     (request)->req_addr = addr; \
     (request)->req_count = count; \
     (request)->req_datatype = datatype; \
@@ -92,7 +92,6 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
     (request)->req_base.req_tag = (int32_t)tag; \
     (request)->req_base.req_comm = comm; \
     /* (request)->req_base.req_proc is set on request allocation */ \
-    (request)->req_base.req_persistent = persistent; \
     (request)->req_base.req_pml_complete = (persistent ? true : false); \
     (request)->req_base.req_free_called = false; \
     (request)->req_base.req_ompi.req_status._cancelled = 0; \

View file

@@ -88,6 +88,12 @@ int mca_pml_dr_recv(void *addr,
     MCA_PML_DR_RECV_REQUEST_START(recvreq);
     if (recvreq->req_recv.req_base.req_ompi.req_complete == false) {
+#if OMPI_ENABLE_PROGRESS_THREADS
+        if(opal_progress_spin(&recvreq->req_recv.req_base.req_ompi.req_complete)) {
+            goto finished;
+        }
+#endif
         /* give up and sleep until completion */
         if (opal_using_threads()) {
             opal_mutex_lock(&ompi_request_lock);
@@ -104,11 +110,15 @@ int mca_pml_dr_recv(void *addr,
         }
     }
+#if OMPI_ENABLE_PROGRESS_THREADS
+finished:
+#endif
     if (NULL != status) {  /* return status */
         *status = recvreq->req_recv.req_base.req_ompi.req_status;
     }
     rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
-    MCA_PML_DR_RECV_REQUEST_RETURN(recvreq);
+    ompi_request_free( (ompi_request_t**)&recvreq );
     return rc;
 }

View file

@@ -108,6 +108,13 @@ int mca_pml_dr_send(void *buf,
     }
     if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
+#if OMPI_ENABLE_PROGRESS_THREADS
+        if(opal_progress_spin(&sendreq->req_send.req_base.req_ompi.req_complete)) {
+            ompi_request_free( (ompi_request_t**)&sendreq );
+            return OMPI_SUCCESS;
+        }
+#endif
         /* give up and sleep until completion */
         if (opal_using_threads()) {
             opal_mutex_lock(&ompi_request_lock);
@@ -125,7 +132,8 @@ int mca_pml_dr_send(void *buf,
     }
     /* return request to pool */
-    MCA_PML_DR_FREE((ompi_request_t **) & sendreq);
-    return OMPI_SUCCESS;
+    rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR;
+    ompi_request_free((ompi_request_t **) & sendreq);
+    return rc;
 }

View file

@@ -34,24 +34,19 @@ static mca_pml_dr_recv_frag_t* mca_pml_dr_recv_request_match_specific_proc(
     mca_pml_dr_recv_request_t* request, mca_pml_dr_comm_proc_t* proc);

-static int mca_pml_dr_recv_request_fini(struct ompi_request_t** request)
+static inline int mca_pml_dr_recv_request_free(struct ompi_request_t** request)
 {
     mca_pml_dr_recv_request_t* recvreq = *(mca_pml_dr_recv_request_t**)request;
-    if(recvreq->req_recv.req_base.req_persistent) {
-        if(recvreq->req_recv.req_base.req_free_called) {
-            MCA_PML_DR_FREE(request);
-        } else {
-            recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-        }
-    } else {
-        MCA_PML_DR_FREE(request);
-    }
-    return OMPI_SUCCESS;
-}
-
-static int mca_pml_dr_recv_request_free(struct ompi_request_t** request)
-{
-    MCA_PML_DR_FREE(request);
+    assert( false == recvreq->req_recv.req_base.req_free_called );
+
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    recvreq->req_recv.req_base.req_free_called = true;
+    if( true == recvreq->req_recv.req_base.req_pml_complete ) {
+        MCA_PML_DR_RECV_REQUEST_RETURN( recvreq );
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
@@ -78,16 +73,11 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i
     OPAL_THREAD_LOCK(&ompi_request_lock);
     ompi_request->req_status._cancelled = true;
-    ompi_request->req_complete = true;  /* mark it as completed so all the test/wait functions
-                                         * on this particular request will finish */
-    /* Now we have a problem if we are in a multi-threaded environment. We should
-     * broadcast the condition on the request in order to allow the other threads
-     * to complete their test/wait functions.
+    /* This macro will set the req_complete to true so the MPI Test/Wait* functions
+     * on this request will be able to complete. As the status is marked as
+     * cancelled the cancel state will be detected.
      */
-    ompi_request_completed++;
-    if(ompi_request_waiting) {
-        opal_condition_broadcast(&ompi_request_cond);
-    }
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE(ompi_request);
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
     return OMPI_SUCCESS;
 }
@@ -95,7 +85,6 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i
 static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request)
 {
     request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
-    request->req_recv.req_base.req_ompi.req_fini = mca_pml_dr_recv_request_fini;
     request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free;
     request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel;
     OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t);
@@ -358,16 +347,7 @@ void mca_pml_dr_recv_request_progress(
     recvreq->req_bytes_received += bytes_received;
     recvreq->req_bytes_delivered += bytes_delivered;
     if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
-        /* initialize request status */
-        recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
-        recvreq->req_recv.req_base.req_pml_complete = true;
-        recvreq->req_recv.req_base.req_ompi.req_complete = true;
-        ompi_request_completed++;
-        if(ompi_request_waiting) {
-            opal_condition_broadcast(&ompi_request_cond);
-        }
+        MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq);
     }
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }
@@ -401,19 +381,8 @@ void mca_pml_dr_recv_request_matched_probe(
         break;
     }
-    /* check completion status */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
-    recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src;
-    recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed;
-    recvreq->req_recv.req_base.req_pml_complete = true;
-    recvreq->req_recv.req_base.req_ompi.req_complete = true;
-    ompi_request_completed++;
-    if(ompi_request_waiting) {
-        opal_condition_broadcast(&ompi_request_cond);
-    }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    /* mark probe request completed */
+    MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq);
 }

 /*

View file

@@ -105,6 +105,37 @@ do { \
                                    persistent); \
 } while(0)

+/**
+ * Mark a recv request complete.
+ *
+ * @param request (IN)  Receive request.
+ */
+#define MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq) \
+do { \
+    assert( false == recvreq->req_recv.req_base.req_pml_complete ); \
+    \
+    OPAL_THREAD_LOCK(&ompi_request_lock); \
+    /* initialize request status */ \
+    recvreq->req_recv.req_base.req_pml_complete = true; \
+    recvreq->req_recv.req_base.req_ompi.req_status._count = \
+        (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? \
+         recvreq->req_bytes_received : recvreq->req_bytes_delivered); \
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \
+    \
+    if( true == recvreq->req_recv.req_base.req_free_called ) { \
+        MCA_PML_DR_RECV_REQUEST_RETURN( recvreq ); \
+    } else { \
+        if(recvreq->req_recv.req_base.req_ompi.req_persistent) { \
+            if( !recvreq->req_recv.req_base.req_free_called ) { \
+                recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
+            } \
+        } \
+    } \
+    OPAL_THREAD_UNLOCK(&ompi_request_lock); \
+} while(0)
+
 /**
  * Return a recv request to the modules free list.
  *

View file

@@ -34,34 +34,26 @@
 #include "ompi/mca/bml/base/base.h"

-static int mca_pml_dr_send_request_fini(struct ompi_request_t** request)
+/*
+ * The free call mark the final stage in a request life-cycle. Starting from this
+ * point the request is completed at both PML and user level, and can be used
+ * for others p2p communications. Therefore, in the case of the DR PML it should
+ * be added to the free request list.
+ */
+static inline int mca_pml_dr_send_request_free(struct ompi_request_t** request)
 {
-    mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)(request);
-    if(sendreq->req_send.req_base.req_persistent) {
-        if(sendreq->req_send.req_base.req_free_called) {
-            MCA_PML_DR_FREE(request);
-        } else {
-            sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-            /* rewind convertor */
-            if(sendreq->req_send.req_bytes_packed) {
-                size_t offset = 0;
-                ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
-            }
-            /* if buffered send - release any resources */
-            if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
-                sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
-                mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
-            }
-        }
-    } else {
-        MCA_PML_DR_FREE(request);
-    }
-    return OMPI_SUCCESS;
-}
-
-static int mca_pml_dr_send_request_free(struct ompi_request_t** request)
-{
-    MCA_PML_DR_FREE(request);
+    mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)request;
+
+    assert( false == sendreq->req_send.req_base.req_free_called );
+
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    sendreq->req_send.req_base.req_free_called = true;
+    if( true == sendreq->req_send.req_base.req_pml_complete ) {
+        MCA_PML_DR_SEND_REQUEST_RETURN( sendreq );
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
@@ -73,7 +65,6 @@ static int mca_pml_dr_send_request_cancel(struct ompi_request_t* request, int co
 static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
 {
     OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t);
     OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);
@@ -84,10 +75,8 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
     req->req_vfrag0.vf_mask_processed = 0;
     req->req_vfrag0.vf_send.pval = req;
     req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
-    req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini;
     req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
     req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel;
 }

 static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req)

View file

@@ -58,7 +58,6 @@ struct mca_pml_dr_send_request_t {
     mca_pml_dr_vfrag_t req_vfrag0;
     opal_list_t req_retrans;
     mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */
 };
 typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@@ -190,43 +189,47 @@ do {
     (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
     (sendreq)->req_send.req_base.req_ompi.req_status._count = \
         (sendreq)->req_send.req_bytes_packed; \
-    (sendreq)->req_send.req_base.req_ompi.req_complete = true; \
-    ompi_request_completed++; \
-    if(ompi_request_waiting) { \
-        opal_condition_broadcast(&ompi_request_cond); \
+    if( (sendreq)->req_send.req_base.req_ompi.req_persistent ) { \
+        (sendreq)->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
     } \
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \
 } while(0)

 /*
- * The PML has completed a send request. Note that this request
- * may have been orphaned by the user or have already completed
- * at the MPI level.
+ * The request fini is responsible for releasing all ressources at the PML
+ * level. It will never be called directly from the upper level, as it should
+ * only be an internal call to the PML. However, in the case when the user
+ * already lost the MPI reference to the request (MPI_Request_free was called)
+ * fini should completely free the MPI request.
  */
 #define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \
 do { \
-    /* request completed at pml level */ \
-    assert((sendreq)->req_send.req_base.req_pml_complete == false); \
-    (sendreq)->req_send.req_base.req_pml_complete = true; \
+    assert( false == sendreq->req_send.req_base.req_pml_complete ); \
+    if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
+        sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
+        mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
+    } \
     \
-    /* user has already released the request so simply free it */ \
-    if((sendreq)->req_send.req_base.req_free_called) { \
-        /* if buffered send - release any resources */ \
-        if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
-            (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
-            mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
-        } \
-        MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \
-    /* is request complete at mpi level */ \
-    } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \
+    OPAL_THREAD_LOCK(&ompi_request_lock); \
+    if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \
+        /* Should only be called for long messages (maybe synchronous) */ \
         MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \
-    /* buffered send - release any resources */ \
-    } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
-               (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
-        mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
     } \
+    sendreq->req_send.req_base.req_pml_complete = true; \
+    \
+    if( sendreq->req_send.req_base.req_free_called ) { \
+        MCA_PML_DR_SEND_REQUEST_RETURN( sendreq ); \
+    } else { \
+        if(sendreq->req_send.req_base.req_ompi.req_persistent) { \
+            /* rewind convertor */ \
+            size_t offset = 0; \
+            ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); \
+        } \
+    } \
+    OPAL_THREAD_UNLOCK(&ompi_request_lock); \
 } while (0)

 /*
  * Release resources associated with a request
  */
@@ -235,8 +238,8 @@ do {
 do { \
     /* Let the base handle the reference counts */ \
     MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \
-    OMPI_FREE_LIST_RETURN( \
-        &mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \
+    OMPI_FREE_LIST_RETURN( &mca_pml_dr.send_requests, \
+                           (opal_list_item_t*)sendreq ); \
 } while(0)
@@ -303,7 +306,6 @@ do { \
     } \
 } while(0)
-
 /*
  * Update bytes delivered on request based on supplied descriptor
  */
@@ -312,7 +314,6 @@ do { \
 do { \
     sendreq->req_bytes_delivered += vfrag->vf_size; \
 } while(0)
-
 /*
  * Attempt to process any pending requests
  */

View file

@@ -117,7 +117,6 @@ finished:
         *status = recvreq->req_recv.req_base.req_ompi.req_status;
     }
     rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
-    MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq);
+    ompi_request_free( (ompi_request_t**)&recvreq );
    return rc;
 }

View file

@@ -103,16 +103,14 @@ int mca_pml_ob1_send(void *buf,
     MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc);
     if (rc != OMPI_SUCCESS) {
-        MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
+        MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
         return rc;
     }
     if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
 #if OMPI_ENABLE_PROGRESS_THREADS
         if(opal_progress_spin(&sendreq->req_send.req_base.req_ompi.req_complete)) {
-            opal_mutex_lock(&ompi_request_lock);
-            MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
-            opal_mutex_unlock(&ompi_request_lock);
+            ompi_request_free( (ompi_request_t**)&sendreq );
             return OMPI_SUCCESS;
         }
 #endif
@@ -124,16 +122,16 @@ int mca_pml_ob1_send(void *buf,
             while (sendreq->req_send.req_base.req_ompi.req_complete == false)
                 opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
             ompi_request_waiting--;
-            MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
             opal_mutex_unlock(&ompi_request_lock);
         } else {
             ompi_request_waiting++;
             while (sendreq->req_send.req_base.req_ompi.req_complete == false)
                 opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
             ompi_request_waiting--;
-            MCA_PML_OB1_SEND_REQUEST_FREE( sendreq );
         }
     }
-    return OMPI_SUCCESS;
+    rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR;
+    ompi_request_free( (ompi_request_t**)&sendreq );
+    return rc;
 }

View file

@@ -35,26 +35,19 @@ static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
     mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc);

-static int mca_pml_ob1_recv_request_fini(struct ompi_request_t** request)
+static inline int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
 {
     mca_pml_ob1_recv_request_t* recvreq = *(mca_pml_ob1_recv_request_t**)request;
-    if(recvreq->req_recv.req_base.req_persistent) {
-        if(recvreq->req_recv.req_base.req_free_called) {
-            MCA_PML_OB1_RECV_REQUEST_FREE(recvreq);
-            *request = MPI_REQUEST_NULL;
-        } else {
-            recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-        }
-    } else {
-        MCA_PML_OB1_RECV_REQUEST_FREE(recvreq);
-        *request = MPI_REQUEST_NULL;
-    }
-    return OMPI_SUCCESS;
-}
-
-static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
-{
-    MCA_PML_OB1_RECV_REQUEST_FREE( *(mca_pml_ob1_recv_request_t**)request );
+    assert( false == recvreq->req_recv.req_base.req_free_called );
+
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    recvreq->req_recv.req_base.req_free_called = true;
+    if( true == recvreq->req_recv.req_base.req_pml_complete ) {
+        MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
     *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
@@ -94,9 +87,9 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
 static void mca_pml_ob1_recv_request_construct(mca_pml_ob1_recv_request_t* request)
 {
     request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
-    request->req_recv.req_base.req_ompi.req_fini = mca_pml_ob1_recv_request_fini;
     request->req_recv.req_base.req_ompi.req_free = mca_pml_ob1_recv_request_free;
     request->req_recv.req_base.req_ompi.req_cancel = mca_pml_ob1_recv_request_cancel;
+    request->req_rdma_cnt = 0;
 }

 static void mca_pml_ob1_recv_request_destruct(mca_pml_ob1_recv_request_t* request)
@@ -147,14 +140,7 @@ static void mca_pml_ob1_put_completion(
     /* check completion status */
     if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
         >= recvreq->req_recv.req_bytes_packed ) {
-        /* initialize request status */
-        recvreq->req_recv.req_base.req_pml_complete = true;
-        recvreq->req_recv.req_base.req_ompi.req_status._count =
-            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
-             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
-        OPAL_THREAD_LOCK(&ompi_request_lock);
-        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
+        MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
@@ -310,7 +296,6 @@ static void mca_pml_ob1_fin_completion(
     struct mca_btl_base_descriptor_t* des,
     int status)
 {
-
     mca_pml_ob1_rdma_frag_t* frag = (mca_pml_ob1_rdma_frag_t*)des->des_cbdata;
     mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) des->des_context;
     MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
@@ -335,18 +320,6 @@ static void mca_pml_ob1_rget_completion(
     mca_btl_base_descriptor_t *fin;
     int rc;

-    /* is receive request complete */
-    if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length)
-        == recvreq->req_recv.req_bytes_packed ) {
-        recvreq->req_recv.req_base.req_ompi.req_status._count =
-            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
-             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
-        recvreq->req_recv.req_base.req_pml_complete = true;
-        OPAL_THREAD_LOCK(&ompi_request_lock);
-        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
-    }
-
     /* return descriptor */
     mca_bml_base_free(bml_btl, des);
@@ -379,9 +352,14 @@ static void mca_pml_ob1_rget_completion(
     }
 #endif
 #endif
+
+    /* is receive request complete */
+    if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length)
+        == recvreq->req_recv.req_bytes_packed ) {
+        MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
+    }

     /* queue request */
-    rc = mca_bml_base_send(
-        bml_btl,
+    rc = mca_bml_base_send( bml_btl,
         fin,
         MCA_BTL_TAG_PML
         );
@@ -549,14 +527,7 @@ void mca_pml_ob1_recv_request_progress(
     /* check completion status */
     if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
         >= recvreq->req_recv.req_bytes_packed ) {
-        /* initialize request status */
-        recvreq->req_recv.req_base.req_ompi.req_status._count =
-            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ?
-             recvreq->req_bytes_received : recvreq->req_bytes_delivered);
-        recvreq->req_recv.req_base.req_pml_complete = true;
-        OPAL_THREAD_LOCK(&ompi_request_lock);
-        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
+        MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
@@ -595,11 +566,9 @@ void mca_pml_ob1_recv_request_matched_probe(
     /* set completion status */
     recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
     recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src;
-    recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed;
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    recvreq->req_recv.req_base.req_pml_complete = true;
-    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+    recvreq->req_bytes_received = bytes_packed;
+    recvreq->req_bytes_delivered = bytes_packed;
+    MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
 }

View file

@@ -81,8 +81,7 @@ do { \
  * @param comm (IN) Communicator.
  * @param persistent (IN) Is this a ersistent request.
  */
-#define MCA_PML_OB1_RECV_REQUEST_INIT( \
-    request, \
+#define MCA_PML_OB1_RECV_REQUEST_INIT( request, \
     addr, \
     count, \
     datatype, \
@@ -91,9 +90,7 @@ do { \
     comm, \
     persistent) \
 do { \
-    (request)->req_rdma_cnt = 0; \
-    MCA_PML_BASE_RECV_REQUEST_INIT( \
-        &(request)->req_recv, \
+    MCA_PML_BASE_RECV_REQUEST_INIT( &(request)->req_recv, \
         addr, \
         count, \
         datatype, \
@@ -108,29 +105,43 @@ do { \
  *
  * @param request (IN) Receive request.
  */
-#define MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq) \
+#define MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE(recvreq) \
 do { \
     size_t r; \
+    \
+    assert( false == recvreq->req_recv.req_base.req_pml_complete ); \
+    \
     for( r = 0; r < recvreq->req_rdma_cnt; r++ ) { \
         mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[r].btl_reg; \
         if( NULL != btl_reg ) { \
             btl_reg->mpool->mpool_release( btl_reg->mpool, btl_reg ); \
         } \
     } \
-    MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \
-    OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (opal_list_item_t*)(recvreq)); \
+    recvreq->req_rdma_cnt = 0; \
+    \
+    OPAL_THREAD_LOCK(&ompi_request_lock); \
+    \
+    if( true == recvreq->req_recv.req_base.req_free_called ) { \
+        MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq ); \
+    } else { \
+        /* initialize request status */ \
+        recvreq->req_recv.req_base.req_pml_complete = true; \
+        recvreq->req_recv.req_base.req_ompi.req_status._count = \
+            (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? \
+             recvreq->req_bytes_received : recvreq->req_bytes_delivered); \
+        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \
+    } \
+    OPAL_THREAD_UNLOCK(&ompi_request_lock); \
 } while(0)

 /*
  * Free the PML receive request
  */
-#define MCA_PML_OB1_RECV_REQUEST_FREE(recvreq) \
+#define MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq) \
 { \
-    mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)(recvreq); \
-    pml_request->req_free_called = true; \
-    if( pml_request->req_pml_complete == true) { \
-        MCA_PML_OB1_RECV_REQUEST_RETURN((recvreq)); \
-    } \
+    MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \
+    OMPI_FREE_LIST_RETURN( &mca_pml_ob1.recv_requests, \
+                           (opal_list_item_t*)(recvreq)); \
 }

 /**
@@ -166,7 +177,6 @@ do {
     (request)->req_bytes_delivered = 0; \
     (request)->req_lock = 0; \
     (request)->req_pipeline_depth = 0; \
-    (request)->req_rdma_cnt = 0; \
     (request)->req_rdma_idx = 0; \
     (request)->req_recv.req_base.req_pml_complete = false; \
     (request)->req_recv.req_base.req_ompi.req_complete = false; \

View file

@@ -2,7 +2,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation.  All rights reserved.
- * Copyright (c) 2004-2005 The University of Tennessee and The University
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
  *                         of Tennessee Research Foundation.  All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -35,41 +35,25 @@
 #include "ompi/mca/bml/base/base.h"
 #include "ompi/datatype/dt_arch.h"

-static int mca_pml_ob1_send_request_fini(struct ompi_request_t** request)
+/*
+ * The free call mark the final stage in a request life-cycle. Starting from this
+ * point the request is completed at both PML and user level, and can be used
+ * for others p2p communications. Therefore, in the case of the OB1 PML it should
+ * be added to the free request list.
+ */
+static inline int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
 {
-    mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)(request);
+    mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)request;
+
+    assert( false == sendreq->req_send.req_base.req_free_called );
+
     OPAL_THREAD_LOCK(&ompi_request_lock);
-    if(sendreq->req_send.req_base.req_persistent) {
-        if(sendreq->req_send.req_base.req_free_called) {
-            MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
-            *request = MPI_REQUEST_NULL;
-        } else {
-            sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-            /* rewind convertor */
-            if(sendreq->req_send.req_bytes_packed) {
-                size_t offset = 0;
-                ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
-            }
-            /* if buffered send - release any resources */
-            if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
-                sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
-                mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
-            }
-        }
-    } else {
-        MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
-        *request = MPI_REQUEST_NULL;
+    sendreq->req_send.req_base.req_free_called = true;
+    if( true == sendreq->req_send.req_base.req_pml_complete ) {
+        MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
     }
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
-    return OMPI_SUCCESS;
-}
-
-static int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
-{
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    MCA_PML_OB1_SEND_REQUEST_FREE( *(mca_pml_ob1_send_request_t**)request );
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
     *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
@@ -83,9 +67,9 @@ static int mca_pml_ob1_send_request_cancel(struct ompi_request_t* request, int c
 static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req)
 {
     req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
-    req->req_send.req_base.req_ompi.req_fini = mca_pml_ob1_send_request_fini;
     req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free;
     req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel;
+    req->req_rdma_cnt = 0;
 }

 static void mca_pml_ob1_send_request_destruct(mca_pml_ob1_send_request_t* req)
@@ -100,7 +84,8 @@ OBJ_CLASS_INSTANCE(
     mca_pml_ob1_send_request_destruct);

 /**
- * Completion of a short message - nothing left to schedule.
+ * Completion of a short message - nothing left to schedule. Note that this
+ * function is only called for 0 sized messages.
  */

 void mca_pml_ob1_match_completion_cache(
@@ -123,9 +108,7 @@ void mca_pml_ob1_match_completion_cache(
     MCA_BML_BASE_BTL_DES_RETURN( bml_btl, descriptor );

     /* signal request completion */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
     MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }

 /**
@@ -152,9 +135,7 @@ void mca_pml_ob1_match_completion_free(
     mca_bml_base_free( bml_btl, descriptor );

     /* signal request completion */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
     MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }

 /*
@@ -215,9 +196,7 @@ static void mca_pml_ob1_rget_completion(
         0, req_bytes_delivered );
     if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered )
         == sendreq->req_send.req_bytes_packed ) {
-        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }

     /* release resources */
@@ -273,9 +252,7 @@ static void mca_pml_ob1_frag_completion(
         req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
         /*if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered )
             == sendreq->req_send.req_bytes_packed) {*/
-        OPAL_THREAD_LOCK(&ompi_request_lock);
         MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
     } else {
         mca_pml_ob1_send_request_schedule(sendreq);
     }
@@ -389,7 +366,10 @@ int mca_pml_ob1_send_request_start_buffered(
                                  sendreq->req_send.req_datatype,
                                  sendreq->req_send.req_count,
                                  sendreq->req_send.req_addr);
+#if 0
+    ompi_convertor_set_position( &sendreq->req_send.req_convertor,
+                                 &sendreq->req_send_offset );
+#endif
     /* request is complete at mpi level */
     OPAL_THREAD_LOCK(&ompi_request_lock);
     MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq);
@@ -406,7 +386,8 @@ int mca_pml_ob1_send_request_start_buffered(

 /**
  * BTL requires "specially" allocated memory. Request a segment that
- * is used for initial hdr and any eager data.
+ * is used for initial hdr and any eager data. This is used only from
+ * the _START macro.
  */

 int mca_pml_ob1_send_request_start_copy(
@@ -495,7 +476,7 @@ int mca_pml_ob1_send_request_start_copy(

 /**
  * BTL can send directly from user buffer so allow the BTL
- * to prepare the segment list.
+ * to prepare the segment list. Start sending a small message.
  */

 int mca_pml_ob1_send_request_start_prepare(
@@ -816,13 +797,12 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
             if(num_btl_avail == 1 || bytes_remaining < bml_btl->btl_min_send_size) {
                 size = bytes_remaining;
-            } else {
             /* otherwise attempt to give the BTL a percentage of the message
              * based on a weighting factor. for simplicity calculate this as
              * a percentage of the overall message length (regardless of amount
              * previously assigned)
              */
+            } else {
                 size = (size_t)(bml_btl->btl_weight * bytes_remaining);
             }
@@ -830,7 +810,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
             if (bml_btl->btl_max_send_size != 0 &&
                 size > bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t)) {
                 size = bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t);
+#if defined(GEORGE_HAVE_TO_MAKE_SURE_THAT_WE_DONT_NEED_IT)
                 /* very expensive - however for buffered sends we need to send on a
                  * boundary that the receiver will be able to unpack completely
                  * using the native datatype
@@ -854,9 +834,9 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
                     OBJ_DESTRUCT( &convertor );
                     size = position - sendreq->req_send_offset;
                 }
+#endif
             }

             /* pack into a descriptor */
             ompi_convertor_set_position(&sendreq->req_send.req_convertor,
                                         &sendreq->req_send_offset);
@@ -971,14 +951,6 @@ static void mca_pml_ob1_put_completion(
         orte_errmgr.abort();
     }

-    /* check for request completion */
-    if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length)
-        >= sendreq->req_send.req_bytes_packed) {
-        OPAL_THREAD_LOCK(&ompi_request_lock);
-        MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
-    }
-
     /* allocate descriptor for fin control message - note that
      * the rdma descriptor cannot be reused as it points directly
      * at the user buffer
@@ -1033,6 +1005,13 @@ static void mca_pml_ob1_put_completion(
             ORTE_ERROR_LOG(rc);
             orte_errmgr.abort();
         }
+        goto cleanup;
+    }
+
+    /* check for request completion */
+    if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length)
+        >= sendreq->req_send.req_bytes_packed) {
+        MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
     }

 cleanup:

View file

@@ -142,7 +142,6 @@ do {
         sendreq->req_lock = 0; \
         sendreq->req_pipeline_depth = 0; \
         sendreq->req_bytes_delivered = 0; \
-        sendreq->req_rdma_cnt = 0; \
         sendreq->req_state = 0; \
         sendreq->req_send_offset = 0; \
         sendreq->req_send.req_base.req_pml_complete = false; \
@@ -258,13 +257,15 @@ do {
  * The PML has completed a send request. Note that this request
  * may have been orphaned by the user or have already completed
  * at the MPI level.
+ * This macro will never be called directly from the upper level, as it should
+ * only be an internal call to the PML.
  */
 #define MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq) \
 do { \
     size_t r; \
-    /* request completed at pml level */ \
-    assert( false == (sendreq)->req_send.req_base.req_pml_complete ); \
-    (sendreq)->req_send.req_base.req_pml_complete = true; \
+    \
+    assert( false == sendreq->req_send.req_base.req_pml_complete ); \
     \
     /* return mpool resources */ \
     for( r = 0; r < sendreq->req_rdma_cnt; r++ ) { \
@@ -275,22 +276,28 @@ do {
     } \
     sendreq->req_rdma_cnt = 0; \
     \
-    /* user has already released the request so simply free it */ \
-    if((sendreq)->req_send.req_base.req_free_called) { \
-        /* if buffered send - release any resources */ \
-        if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
-            (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
-            mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
-        } \
-        MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq); \
-    /* is request complete at mpi level */ \
-    } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \
+    if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
+        sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
+        mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
+    } \
+    \
+    OPAL_THREAD_LOCK(&ompi_request_lock); \
+    if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \
+        /* Should only be called for long messages (maybe synchronous) */ \
         MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq); \
-    /* buffered send - release any resources */ \
-    } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
-               (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
-        mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
     } \
+    sendreq->req_send.req_base.req_pml_complete = true; \
+    \
+    if( sendreq->req_send.req_base.req_free_called ) { \
+        MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq ); \
+    } else { \
+        if(sendreq->req_send.req_base.req_ompi.req_persistent) { \
+            /* rewind convertor */ \
+            size_t offset = 0; \
+            ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); \
+        } \
+    } \
+    OPAL_THREAD_UNLOCK(&ompi_request_lock); \
 } while (0)
@@ -305,9 +312,7 @@ do {
     /* has an acknowledgment been received */ \
     if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \
         if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \
-            OPAL_THREAD_LOCK(&ompi_request_lock); \
             MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); \
-            OPAL_THREAD_UNLOCK(&ompi_request_lock); \
         } else { \
             /* additional data to schedule */ \
             mca_pml_ob1_send_request_schedule(sendreq); \
@@ -327,22 +332,6 @@ do {
         &mca_pml_ob1.send_requests, (opal_list_item_t*)sendreq); \
 }

-/*
- * Free a send request
- */
-#define MCA_PML_OB1_SEND_REQUEST_FREE(sendreq) \
-{ \
-    mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)(sendreq); \
-    pml_request->req_free_called = true; \
-    if( pml_request->req_pml_complete == true) { \
-        if((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
-           (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
-            mca_pml_base_bsend_request_fini((ompi_request_t*)(sendreq)); \
-        } \
-        MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq); \
-    } \
-}

 /*
  * Attempt to process any pending requests
  */

View file

@@ -44,14 +44,12 @@ static int ompi_grequest_cancel(ompi_request_t* req, int flag)

 static void ompi_grequest_construct(ompi_grequest_t* greq)
 {
-    OMPI_REQUEST_INIT(&greq->greq_base);
-    greq->greq_base.req_fini = ompi_grequest_free;
+    OMPI_REQUEST_INIT(&greq->greq_base, false);
     greq->greq_base.req_free = ompi_grequest_free;
     greq->greq_base.req_cancel = ompi_grequest_cancel;
     greq->greq_base.req_type = OMPI_REQUEST_GEN;
 }

 static void ompi_grequest_destruct(ompi_grequest_t* greq)
 {
     OMPI_REQUEST_FINI(&greq->greq_base);

View file

@@ -37,8 +37,7 @@ int ompi_request_test_any(
     rptr = requests;
     for (i = 0; i < count; i++, rptr++) {
         request = *rptr;
-        if (request == MPI_REQUEST_NULL ||
-            request->req_state == OMPI_REQUEST_INACTIVE) {
+        if( request->req_state == OMPI_REQUEST_INACTIVE ) {
             num_requests_null_inactive++;
             continue;
         }
@@ -48,7 +47,11 @@ int ompi_request_test_any(
         if (MPI_STATUS_IGNORE != status) {
             *status = request->req_status;
         }
-        return request->req_fini(rptr);
+        if( request->req_persistent ) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+            return OMPI_SUCCESS;
+        }
+        return ompi_request_free(rptr);
     }
 }
@@ -84,8 +87,7 @@ int ompi_request_test_all(
     rptr = requests;
     for (i = 0; i < count; i++, rptr++) {
         request = *rptr;
-        if (request == MPI_REQUEST_NULL ||
-            request->req_state == OMPI_REQUEST_INACTIVE ||
+        if( request->req_state == OMPI_REQUEST_INACTIVE ||
             request->req_complete) {
             num_completed++;
         }
@@ -99,34 +101,41 @@ int ompi_request_test_all(
         return OMPI_SUCCESS;
     }
 
+    rptr = requests;
     *completed = true;
     if (MPI_STATUSES_IGNORE != statuses) {
         /* fill out completion status and free request if required */
-        rptr = requests;
-        for (i = 0; i < count; i++) {
+        for( i = 0; i < count; i++, rptr++ ) {
             int rc;
             request = *rptr;
-            if(request == MPI_REQUEST_NULL ||
-               request->req_state == OMPI_REQUEST_INACTIVE) {
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
                 statuses[i] = ompi_status_empty;
-            } else {
-                statuses[i] = request->req_status;
+                continue;
             }
-            rc = request->req_fini(rptr);
+            statuses[i] = request->req_status;
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+                continue;
+            }
+            rc = ompi_request_free(rptr);
             if(rc != OMPI_SUCCESS)
                 return rc;
-            rptr++;
         }
     } else {
         /* free request if required */
-        rptr = requests;
-        for (i = 0; i < count; i++) {
+        for( i = 0; i < count; i++, rptr++ ) {
             int rc;
             request = *rptr;
-            rc = request->req_fini(rptr);
+            if( request->req_state == OMPI_REQUEST_INACTIVE) {
+                continue;
+            }
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+                continue;
+            }
+            rc = ompi_request_free(rptr);
             if(rc != OMPI_SUCCESS)
                 return rc;
-            rptr++;
         }
     }
     return OMPI_SUCCESS;
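The same three-way decision now recurs in every test and wait path: inactive requests report an empty status, completed persistent requests are merely marked inactive, and anything else is freed. A hypothetical helper (not in this commit) capturing the tail of that pattern:

    static inline int ompi_request_mark_or_free(ompi_request_t **rptr)
    {
        ompi_request_t *request = *rptr;
        if (request->req_persistent) {
            /* keep the request alive so MPI_Start can reuse it */
            request->req_state = OMPI_REQUEST_INACTIVE;
            return OMPI_SUCCESS;
        }
        /* non-persistent: release it; *rptr becomes MPI_REQUEST_NULL */
        return ompi_request_free(rptr);
    }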

View file

@@ -2,7 +2,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation.  All rights reserved.
- * Copyright (c) 2004-2005 The University of Tennessee and The University
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
  *                         of Tennessee Research Foundation.  All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -26,6 +26,7 @@ int ompi_request_wait(
     ompi_status_public_t * status)
 {
     ompi_request_t *req = *req_ptr;
+
     if(req->req_complete == false) {
 #if OMPI_ENABLE_PROGRESS_THREADS
@@ -52,9 +53,16 @@ finished:
     if( MPI_STATUS_IGNORE != status ) {
         *status = req->req_status;
     }
+    if( req->req_state == OMPI_REQUEST_INACTIVE ) {
+        return OMPI_SUCCESS;
+    }
+    if( req->req_persistent ) {
+        req->req_state = OMPI_REQUEST_INACTIVE;
+        return OMPI_SUCCESS;
+    }
 
     /* return request to pool */
-    return req->req_fini(req_ptr);
+    return ompi_request_free(req_ptr);
 }
@@ -144,13 +152,17 @@ finished:
             *status = ompi_status_empty;
         }
     } else {
+        assert( true == request->req_complete );
         /* return status */
         if (MPI_STATUS_IGNORE != status) {
             *status = request->req_status;
         }
+        if( request->req_persistent ) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+        } else {
             /* return request to pool */
-            rc = request->req_fini(rptr);
+            rc = ompi_request_free(rptr);
+        }
         *index = completed;
     }
     return rc;
@@ -165,6 +177,7 @@ int ompi_request_wait_all(
     size_t completed = 0, i;
     ompi_request_t **rptr;
     ompi_request_t *request;
+    int mpi_error = OMPI_SUCCESS;
 
     rptr = requests;
     for (i = 0; i < count; i++) {
@@ -225,38 +238,48 @@ int ompi_request_wait_all(
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
 
+    rptr = requests;
     if (MPI_STATUSES_IGNORE != statuses) {
         /* fill out status and free request if required */
-        rptr = requests;
-        for (i = 0; i < count; i++) {
-            int rc;
+        for( i = 0; i < count; i++, rptr++ ) {
             request = *rptr;
-            if(request == MPI_REQUEST_NULL ||
-               request->req_state == OMPI_REQUEST_INACTIVE) {
+
+            assert( true == request->req_complete );
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
                 statuses[i] = ompi_status_empty;
             } else {
                 statuses[i] = request->req_status;
-                rc = request->req_fini(rptr);
-                if (rc != OMPI_SUCCESS)
-                    return rc;
             }
-            rptr++;
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+            } else {
+                (void)ompi_request_free(rptr);
+            }
+            if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
+                mpi_error = OMPI_ERR_IN_ERRNO;
+            }
         }
     } else {
         /* free request if required */
-        rptr = requests;
         for( i = 0; i < count; i++, rptr++ ) {
             int rc;
             request = *rptr;
-            if(request == MPI_REQUEST_NULL ||
-               request->req_state == OMPI_REQUEST_INACTIVE) {
-                continue;
+
+            assert( true == request->req_complete );
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
+                rc = ompi_status_empty.MPI_ERROR;
+            } else {
+                rc = request->req_status.MPI_ERROR;
             }
-            rc = request->req_fini(rptr);
-            if (rc != OMPI_SUCCESS)
-                return rc;
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+            } else {
+                (void)ompi_request_free(rptr);
+            }
+            if( rc != OMPI_SUCCESS) {
+                mpi_error = rc;
+            }
         }
     }
-    return OMPI_SUCCESS;
+    return mpi_error;
 }
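With the per-request error collection above, a failing MPI_Waitall can report an error while still filling every status entry. A caller-side sketch using only standard MPI; reqs is assumed to hold two previously started requests:

    #include <stdio.h>
    #include <mpi.h>

    static void waitall_with_error_check(MPI_Request reqs[2])
    {
        MPI_Status statuses[2];
        if (MPI_SUCCESS != MPI_Waitall(2, reqs, statuses)) {
            /* inspect the per-request error fields */
            for (int i = 0; i < 2; i++) {
                if (MPI_SUCCESS != statuses[i].MPI_ERROR) {
                    fprintf(stderr, "request %d failed: %d\n",
                            i, statuses[i].MPI_ERROR);
                }
            }
        }
    }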

View file

@@ -34,8 +34,7 @@ ompi_status_public_t ompi_status_empty;
 
 static void ompi_request_construct(ompi_request_t* req)
 {
-    OMPI_REQUEST_INIT(req);
-    req->req_fini = NULL;
+    OMPI_REQUEST_INIT(req, false);
     req->req_free = NULL;
     req->req_cancel = NULL;
     req->req_f_to_c_index = MPI_UNDEFINED;
@@ -43,7 +42,8 @@ static void ompi_request_construct(ompi_request_t* req)
 
 static void ompi_request_destruct(ompi_request_t* req)
 {
-    OMPI_REQUEST_FINI(req);
+    assert( MPI_UNDEFINED == req->req_f_to_c_index );
+    assert( OMPI_REQUEST_INVALID == req->req_state );
 }
 
 static int ompi_request_null_free(ompi_request_t** request)
@@ -87,7 +87,6 @@ int ompi_request_init(void)
     ompi_request_null.req_state = OMPI_REQUEST_INACTIVE;
     ompi_request_null.req_complete = true;
     ompi_request_null.req_type = OMPI_REQUEST_NULL;
-    ompi_request_null.req_fini = ompi_request_null_free;
     ompi_request_null.req_free = ompi_request_null_free;
     ompi_request_null.req_cancel = ompi_request_null_cancel;
     ompi_request_null.req_f_to_c_index =
@@ -104,7 +103,7 @@ int ompi_request_init(void)
      * The main difference to ompi_request_null is
      * req_state being OMPI_REQUEST_ACTIVE, so that MPI_Waitall
      * does not set the status to ompi_status_empty and the different
-     * req_fini and req_free function, which resets the
+     * req_free function, which resets the
      * request to MPI_REQUEST_NULL.
      * The req_cancel function need not be changed.
      */
@@ -117,7 +116,6 @@ int ompi_request_init(void)
     ompi_request_empty.req_state = OMPI_REQUEST_ACTIVE;
     ompi_request_empty.req_complete = true;
     ompi_request_empty.req_type = OMPI_REQUEST_NULL;
-    ompi_request_empty.req_fini = ompi_request_empty_free;
     ompi_request_empty.req_free = ompi_request_empty_free;
     ompi_request_empty.req_cancel = ompi_request_null_cancel;
     ompi_request_empty.req_f_to_c_index =
@@ -139,7 +137,10 @@ int ompi_request_init(void)
 int ompi_request_finalize(void)
 {
+    OMPI_REQUEST_FINI( &ompi_request_null );
     OBJ_DESTRUCT( &ompi_request_null );
+    OMPI_REQUEST_FINI( &ompi_request_empty );
     OBJ_DESTRUCT( &ompi_request_empty );
     OBJ_DESTRUCT( &ompi_request_cond );
     OBJ_DESTRUCT( &ompi_request_lock );
     OBJ_DESTRUCT( &ompi_request_f_to_c_table );
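Because ompi_request_null is created in the inactive state, a test on MPI_REQUEST_NULL now falls through the same inactive branch as a dormant persistent request. A minimal sketch of that caller-visible behaviour (place inside any function after MPI_Init; assert.h and mpi.h assumed):

    MPI_Request req = MPI_REQUEST_NULL;
    int flag = 0;
    MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
    /* completes immediately with an empty status; req stays MPI_REQUEST_NULL */
    assert(1 == flag && MPI_REQUEST_NULL == req);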

View file

@@ -88,8 +88,8 @@ struct ompi_request_t {
     ompi_status_public_t req_status;         /**< Completion status */
     volatile bool req_complete;              /**< Flag indicating whether the request has completed */
     volatile ompi_request_state_t req_state; /**< enum indicating the state of the request */
+    bool req_persistent;                     /**< flag indicating whether this is a persistent request */
     int req_f_to_c_index;                    /**< Index in Fortran <-> C translation array */
-    ompi_request_free_fn_t req_fini;         /**< Called by test/wait */
     ompi_request_free_fn_t req_free;         /**< Called by free */
     ompi_request_cancel_fn_t req_cancel;     /**< Optional function to cancel the request */
 };
@@ -106,11 +106,11 @@ typedef struct ompi_request_t ompi_request_t;
  * performance path (since requests may be re-used, it is possible
  * that we will have to initialize a request multiple times).
  */
-#define OMPI_REQUEST_INIT(request)                        \
+#define OMPI_REQUEST_INIT(request, persistent)            \
     do {                                                  \
         (request)->req_state = OMPI_REQUEST_INACTIVE;     \
         (request)->req_complete = false;                  \
-        (request)->req_f_to_c_index = MPI_UNDEFINED;      \
+        (request)->req_persistent = (persistent);         \
     } while (0);
 
 /**
@@ -122,8 +122,12 @@ typedef struct ompi_request_t ompi_request_t;
  * When finalizing a request, if MPI_Request_f2c() was previously
  * invoked on that request, then this request was added to the f2c
  * table, and we need to remove it.
+ *
+ * This macro should be called only from the MPI layer, never from
+ * the PML: it takes care of the upper-level clean-up. Since
+ * MPI_Request_free must release all MPI-level resources, it has to
+ * be called from there as well.
  */
 #define OMPI_REQUEST_FINI(request)                        \
     do {                                                  \
         (request)->req_state = OMPI_REQUEST_INVALID;      \
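With the extra argument, every allocation site now has to state whether the request is persistent. Hypothetical call sites, for illustration only (the field path mirrors the PML requests touched earlier in this diff):

    /* persistent receive, e.g. behind MPI_Recv_init */
    OMPI_REQUEST_INIT(&recvreq->req_recv.req_base.req_ompi, true);
    /* one-shot receive, e.g. behind MPI_Irecv */
    OMPI_REQUEST_INIT(&recvreq->req_recv.req_base.req_ompi, false);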
@@ -205,35 +209,36 @@ static inline int ompi_request_free(ompi_request_t** request)
  */
-static inline int ompi_request_test(
-    ompi_request_t ** rptr,
-    int *completed,
-    ompi_status_public_t * status )
+static inline int ompi_request_test( ompi_request_t ** rptr,
+                                     int *completed,
+                                     ompi_status_public_t * status )
 {
     ompi_request_t *request = *rptr;
 
     opal_atomic_mb();
-    if (request == MPI_REQUEST_NULL ||
-        request->req_state == OMPI_REQUEST_INACTIVE) {
+    if( request->req_state == OMPI_REQUEST_INACTIVE ) {
         *completed = true;
         if (MPI_STATUS_IGNORE != status) {
             *status = ompi_status_empty;
         }
         return OMPI_SUCCESS;
     }
-    else if (request->req_complete) {
+    if (request->req_complete) {
         *completed = true;
         if (MPI_STATUS_IGNORE != status) {
             *status = request->req_status;
         }
-        return request->req_fini(rptr);
-    } else {
+        if( request->req_persistent ) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+            return OMPI_SUCCESS;
+        }
+        return ompi_request_free(rptr);
+    }
     *completed = false;
 #if OMPI_ENABLE_PROGRESS_THREADS == 0
     opal_progress();
 #endif
     return OMPI_SUCCESS;
 }
-}
 
 /**
  * Non-blocking test for request completion.
@@ -249,7 +254,7 @@ static inline int ompi_request_test(
  * request handle at index set to NULL.
  */
-int ompi_request_test_any(
+OMPI_DECLSPEC int ompi_request_test_any(
     size_t count,
     ompi_request_t ** requests,
     int *index,