* Merge in changes from bwb-romio/copy-2 branch. This slightly changes
the way progress is done in the non-threaded case, adds a progress thread for the threaded progress case (which should be made to use George's progress engine when it is ready), and fixes some bugs with request handling, especially with freeing requests (IO requests, that is). This commit was SVN r4332.
Этот коммит содержится в:
родитель
46713d23df
Коммит
a8099eaa07
@ -261,6 +261,8 @@ extern "C" {
|
||||
*/
|
||||
OMPI_DECLSPEC int mca_io_base_close(void);
|
||||
|
||||
OMPI_DECLSPEC int mca_io_base_component_run_progress(void);
|
||||
|
||||
|
||||
/*
|
||||
* Globals
|
||||
|
@ -21,10 +21,14 @@
|
||||
#include "mca/base/base.h"
|
||||
#include "mca/io/io.h"
|
||||
#include "mca/io/base/base.h"
|
||||
#include "mca/io/base/io_base_request.h"
|
||||
|
||||
|
||||
int mca_io_base_close(void)
|
||||
{
|
||||
/* stop the progress engine */
|
||||
mca_io_base_request_progress_fini();
|
||||
|
||||
/* Destroy the freelist */
|
||||
|
||||
if (mca_io_base_requests_valid) {
|
||||
|
@ -20,10 +20,10 @@
|
||||
#include "mca/io/io.h"
|
||||
#include "mca/io/base/base.h"
|
||||
|
||||
|
||||
/*
|
||||
* Private variables
|
||||
*/
|
||||
static bool initialized = false;
|
||||
static ompi_list_t components_in_use;
|
||||
static ompi_mutex_t mutex;
|
||||
|
||||
@ -44,6 +44,8 @@ int mca_io_base_component_init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&components_in_use, ompi_list_t);
|
||||
|
||||
initialized = true;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -157,12 +159,15 @@ int mca_io_base_component_del(mca_io_base_components_t *comp)
|
||||
}
|
||||
|
||||
|
||||
int mca_io_base_progress(int *num_pending)
|
||||
/* in this file so that mutex can be static */
|
||||
int mca_io_base_component_run_progress(void)
|
||||
{
|
||||
int ret;
|
||||
int ret, count = 0;
|
||||
ompi_list_item_t *item;
|
||||
component_item_t *citem;
|
||||
|
||||
if (! initialized) return 0;
|
||||
|
||||
OMPI_THREAD_LOCK(&mutex);
|
||||
|
||||
/* Go through all the components and call their progress
|
||||
@ -175,10 +180,9 @@ int mca_io_base_progress(int *num_pending)
|
||||
|
||||
switch (citem->version) {
|
||||
case MCA_IO_BASE_V_1_0_0:
|
||||
ret = citem->component.v1_0_0.io_progress(num_pending);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
OMPI_THREAD_UNLOCK(&mutex);
|
||||
return ret;
|
||||
ret = citem->component.v1_0_0.io_progress();
|
||||
if (ret > 0) {
|
||||
count += ret;
|
||||
}
|
||||
break;
|
||||
|
||||
@ -189,7 +193,7 @@ int mca_io_base_progress(int *num_pending)
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mutex);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
@ -198,6 +202,8 @@ int mca_io_base_progress(int *num_pending)
|
||||
*/
|
||||
int mca_io_base_component_finalize(void)
|
||||
{
|
||||
initialized = false;
|
||||
|
||||
OBJ_DESTRUCT(&components_in_use);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -28,12 +28,6 @@
|
||||
#include "mca/io/base/io_base_request.h"
|
||||
|
||||
|
||||
/*
|
||||
* Global variables
|
||||
*/
|
||||
bool mca_io_base_components_available_valid = false;
|
||||
ompi_list_t mca_io_base_components_available;
|
||||
|
||||
|
||||
/*
|
||||
* Private functions
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "mca/base/mca_base_param.h"
|
||||
#include "mca/io/io.h"
|
||||
#include "mca/io/base/base.h"
|
||||
#include "mca/io/base/io_base_request.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -46,6 +47,9 @@ int mca_io_base_output = -1;
|
||||
bool mca_io_base_components_opened_valid = false;
|
||||
ompi_list_t mca_io_base_components_opened;
|
||||
|
||||
bool mca_io_base_components_available_valid = false;
|
||||
ompi_list_t mca_io_base_components_available;
|
||||
|
||||
|
||||
/*
|
||||
* Function for finding and opening either all MCA components, or the one
|
||||
@ -89,6 +93,10 @@ int mca_io_base_open(void)
|
||||
|
||||
mca_io_base_component_init();
|
||||
|
||||
/* Intialize the request progression code */
|
||||
|
||||
mca_io_base_request_progress_init();
|
||||
|
||||
/* All done */
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -14,6 +14,8 @@
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "class/ompi_object.h"
|
||||
#include "file/file.h"
|
||||
#include "mca/base/base.h"
|
||||
@ -27,6 +29,7 @@
|
||||
*/
|
||||
bool mca_io_base_requests_valid = false;
|
||||
ompi_free_list_t mca_io_base_requests;
|
||||
volatile int32_t mca_io_base_request_num_pending = 0;
|
||||
|
||||
|
||||
/*
|
||||
@ -44,6 +47,7 @@ OBJ_CLASS_INSTANCE(mca_io_base_request_t,
|
||||
static void io_base_request_constructor(mca_io_base_request_t *req)
|
||||
{
|
||||
req->super.req_type = OMPI_REQUEST_IO;
|
||||
req->free_called = false;
|
||||
}
|
||||
|
||||
|
||||
@ -119,8 +123,12 @@ int mca_io_base_request_alloc(ompi_file_t *file,
|
||||
|
||||
if (ompi_list_get_size(&file->f_io_requests) > 0) {
|
||||
OMPI_THREAD_LOCK(&file->f_io_requests_lock);
|
||||
*req = (mca_io_base_request_t*)
|
||||
ompi_list_remove_first(&file->f_io_requests);
|
||||
if (ompi_list_get_size(&file->f_io_requests) > 0) {
|
||||
*req = (mca_io_base_request_t*)
|
||||
ompi_list_remove_first(&file->f_io_requests);
|
||||
} else {
|
||||
*req = NULL;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
|
||||
} else {
|
||||
*req = NULL;
|
||||
@ -146,6 +154,7 @@ int mca_io_base_request_alloc(ompi_file_t *file,
|
||||
|
||||
(*req)->req_file = file;
|
||||
(*req)->req_ver = file->f_io_version;
|
||||
(*req)->free_called = false;
|
||||
(*req)->super.req_fini =
|
||||
file->f_io_selected_module.v1_0_0.io_module_request_fini;
|
||||
(*req)->super.req_free =
|
||||
@ -216,3 +225,102 @@ void mca_io_base_request_return(ompi_file_t *file)
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
|
||||
}
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
static volatile bool thread_running = false;
|
||||
static volatile bool thread_done = false;
|
||||
static ompi_thread_t progress_thread;
|
||||
static ompi_mutex_t progress_mutex;
|
||||
static ompi_condition_t progress_cond;
|
||||
|
||||
static void*
|
||||
request_progress_thread(ompi_object_t *arg)
|
||||
{
|
||||
struct timespec abstime;
|
||||
struct timeval tv;
|
||||
|
||||
while (! thread_done) {
|
||||
gettimeofday(&tv, NULL);
|
||||
abstime.tv_sec = tv.tv_sec + 1;
|
||||
abstime.tv_nsec = tv.tv_usec * 1000;
|
||||
while (mca_io_base_request_num_pending > 0) {
|
||||
/* do some progress, sleep, repeat */
|
||||
mca_io_base_component_run_progress();
|
||||
sleep(2);
|
||||
}
|
||||
ompi_condition_timedwait(&progress_cond, &progress_mutex, &abstime);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_init()
|
||||
{
|
||||
mca_io_base_request_num_pending = 0;
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
thread_running = false;
|
||||
thread_done = false;
|
||||
|
||||
OBJ_CONSTRUCT(&progress_mutex, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&progress_cond, ompi_condition_t);
|
||||
OBJ_CONSTRUCT(&progress_thread, ompi_thread_t);
|
||||
|
||||
progress_thread.t_run = request_progress_thread;
|
||||
progress_thread.t_arg = NULL;
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_add()
|
||||
{
|
||||
#if OMPI_HAVE_THREADS
|
||||
/* if we don't have a progress thread, make us have a progress
|
||||
thread */
|
||||
if (! thread_running) {
|
||||
OMPI_THREAD_LOCK(&progress_mutex);
|
||||
if (! thread_running) {
|
||||
thread_running = true;
|
||||
ompi_thread_start(&progress_thread);
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&progress_mutex);
|
||||
}
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
|
||||
OMPI_THREAD_ADD32(&mca_io_base_request_num_pending, 1);
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
ompi_condition_signal(&progress_cond);
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_del()
|
||||
{
|
||||
OMPI_THREAD_ADD32(&mca_io_base_request_num_pending, -1);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
mca_io_base_request_progress_fini()
|
||||
{
|
||||
void *ret;
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
/* make the helper thread die */
|
||||
thread_done = true;
|
||||
if (thread_running) {
|
||||
ompi_condition_signal(&progress_cond);
|
||||
ompi_thread_join(&progress_thread, &ret);
|
||||
}
|
||||
|
||||
/* clean up */
|
||||
OBJ_DESTRUCT(&progress_thread);
|
||||
OBJ_DESTRUCT(&progress_cond);
|
||||
OBJ_DESTRUCT(&progress_mutex);
|
||||
#endif /* OMPI_HAVE_THREADS */
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "class/ompi_object.h"
|
||||
#include "request/request.h"
|
||||
#include "file/file.h"
|
||||
#include "mca/io/base/base.h"
|
||||
|
||||
/**
|
||||
* Base request type.
|
||||
@ -38,6 +39,9 @@ struct mca_io_base_request_t {
|
||||
request (i.e., this defines what follows this entry in
|
||||
memory) */
|
||||
mca_io_base_version_t req_ver;
|
||||
/** True if free has been called on this request (before it has
|
||||
been finalized */
|
||||
volatile bool free_called;
|
||||
};
|
||||
/**
|
||||
* Convenience typedef
|
||||
@ -111,6 +115,47 @@ extern "C" {
|
||||
void mca_io_base_request_free(ompi_file_t *file,
|
||||
mca_io_base_request_t *req);
|
||||
|
||||
|
||||
/*
|
||||
* count of number of pending requests in the IO subsystem. Should
|
||||
* only be modified with OMPI_THREAD_ADD32. Probably should not be
|
||||
* used outside of IO components. Here only for the progress check
|
||||
* optimzation.
|
||||
*/
|
||||
OMPI_DECLSPEC extern volatile int32_t mca_io_base_request_num_pending;
|
||||
|
||||
/**
|
||||
* Initialize the request progress code
|
||||
*
|
||||
*/
|
||||
void mca_io_base_request_progress_init(void);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
void mca_io_base_request_progress_add(void);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
void mca_io_base_request_progress_del(void);
|
||||
|
||||
/**
|
||||
* Finalize the request progress code
|
||||
*/
|
||||
void mca_io_base_request_progress_fini(void);
|
||||
|
||||
/**
|
||||
* External progress function; invoked from ompi_progress()
|
||||
*/
|
||||
static inline int mca_io_base_request_progress(void)
|
||||
{
|
||||
if (mca_io_base_request_num_pending > 0) {
|
||||
return mca_io_base_component_run_progress();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -41,12 +41,6 @@ struct mca_io_base_module_1_0_0_t;
|
||||
union mca_io_base_modules_t;
|
||||
|
||||
|
||||
/**
|
||||
* External progress function; invoked from ompi_progress()
|
||||
*/
|
||||
int mca_io_base_progress(int *num_requests);
|
||||
|
||||
|
||||
/**
|
||||
* Version of IO component interface that we're using.
|
||||
*
|
||||
@ -102,7 +96,7 @@ typedef int (*mca_io_base_component_file_delete_unselect_fn_t)
|
||||
(char *filename, struct ompi_info_t *info,
|
||||
struct mca_io_base_delete_t *private_data);
|
||||
|
||||
typedef int (*mca_io_base_component_progress_fn_t)(int *num_pending);
|
||||
typedef int (*mca_io_base_component_progress_fn_t)(void);
|
||||
|
||||
/* IO component version and interface functions. */
|
||||
struct mca_io_base_component_1_0_0_t {
|
||||
|
@ -69,8 +69,11 @@ int mca_io_romio_request_cancel(ompi_request_t *req, int flag);
|
||||
* progressed. This macro is ONLY called when the ROMIO mutex is
|
||||
* already held!
|
||||
*/
|
||||
#define MCA_IO_ROMIO_REQUEST_ADD(request) \
|
||||
ompi_list_append(&mca_io_romio_pending_requests, (ompi_list_item_t *) request);
|
||||
#define MCA_IO_ROMIO_REQUEST_ADD(request) \
|
||||
((ompi_request_t*) request)->req_state = OMPI_REQUEST_ACTIVE; \
|
||||
ompi_list_append(&mca_io_romio_pending_requests, (ompi_list_item_t *) request); \
|
||||
mca_io_base_request_progress_add();
|
||||
|
||||
|
||||
/*
|
||||
* mca->ROMIO module routines:
|
||||
|
@ -41,7 +41,7 @@ static int delete_query(char *filename, struct ompi_info_t *info,
|
||||
bool *usable, int *priorty);
|
||||
static int delete_select(char *filename, struct ompi_info_t *info,
|
||||
struct mca_io_base_delete_t *private_data);
|
||||
static int progress(int *count);
|
||||
static int progress(void);
|
||||
|
||||
/*
|
||||
* Private variables
|
||||
@ -238,39 +238,53 @@ static int delete_select(char *filename, struct ompi_info_t *info,
|
||||
}
|
||||
|
||||
|
||||
static int progress(int *count)
|
||||
static int progress()
|
||||
{
|
||||
ompi_list_item_t *item, *next;
|
||||
int ret, flag;
|
||||
int ret, flag, count;
|
||||
ROMIO_PREFIX(MPIO_Request) romio_rq;
|
||||
mca_io_base_request_t *ioreq;
|
||||
|
||||
/* Troll through all pending requests and try to progress them.
|
||||
If a request finishes, remove it from the list. */
|
||||
|
||||
*count = 0;
|
||||
count = 0;
|
||||
OMPI_THREAD_LOCK (&mca_io_romio_mutex);
|
||||
for (item = ompi_list_get_first(&mca_io_romio_pending_requests);
|
||||
item != ompi_list_get_end(&mca_io_romio_pending_requests);
|
||||
item = next) {
|
||||
next = ompi_list_get_next(item);
|
||||
|
||||
ioreq = (mca_io_base_request_t*) item;
|
||||
romio_rq = ((mca_io_romio_request_t *) item)->romio_rq;
|
||||
ret = ROMIO_PREFIX(MPIO_Test)(&romio_rq, &flag,
|
||||
&(((ompi_request_t *) item)->req_status));
|
||||
if (ret < 0) {
|
||||
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
|
||||
return ret;
|
||||
} else if (1 == flag) {
|
||||
++(*count);
|
||||
++count;
|
||||
/* mark as complete (and make sure to wake up any waiters */
|
||||
ompi_request_complete((ompi_request_t*) item);
|
||||
OMPI_REQUEST_FINI((ompi_request_t*) item);
|
||||
/* we're done, so remove us from the pending list */
|
||||
ompi_list_remove_item(&mca_io_romio_pending_requests, item);
|
||||
mca_io_base_request_free(((mca_io_base_request_t *) item)->req_file,
|
||||
(mca_io_base_request_t *) item);
|
||||
mca_io_base_request_progress_del();
|
||||
/* if the request has been freed already, the user isn't
|
||||
* going to call test or wait on us, so we need to do it
|
||||
* here
|
||||
*/
|
||||
if (ioreq->free_called) {
|
||||
ret = ioreq->super.req_fini((ompi_request_t**) &ioreq);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
OMPI_THREAD_UNLOCK(&mca_io_romio_mutex);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
|
||||
|
||||
/* Return how many requests completed */
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return count;
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ mca_io_romio_file_iread_at (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t *request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
@ -139,7 +139,7 @@ mca_io_romio_file_iread (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t * request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
@ -186,7 +186,7 @@ mca_io_romio_file_iread_shared (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t * request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
|
@ -73,7 +73,7 @@ mca_io_romio_file_iwrite_at (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t * request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
@ -142,7 +142,7 @@ mca_io_romio_file_iwrite (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t * request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
@ -188,7 +188,7 @@ mca_io_romio_file_iwrite_shared (ompi_file_t *fh,
|
||||
ompi_datatype_t *datatype,
|
||||
mca_io_base_request_t * request)
|
||||
{
|
||||
int ret;
|
||||
int ret, flag;
|
||||
mca_io_romio_data_t *data;
|
||||
mca_io_romio_request_t *req;
|
||||
|
||||
|
@ -22,13 +22,41 @@
|
||||
|
||||
int mca_io_romio_request_fini(ompi_request_t **req)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req);
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_io_romio_mutex);
|
||||
|
||||
/* clean up the fortran stuff, mark us as invalid */
|
||||
OMPI_REQUEST_FINI(*req);
|
||||
/* and shove us back in the free list */
|
||||
mca_io_base_request_free(ioreq->req_file, ioreq);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_io_romio_mutex);
|
||||
|
||||
*req = MPI_REQUEST_NULL;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int mca_io_romio_request_free(ompi_request_t **req)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req);
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_io_romio_mutex);
|
||||
|
||||
ioreq->free_called = true;
|
||||
|
||||
/* if the thing is done already, finalize it and get out... */
|
||||
if (ioreq->super.req_complete) {
|
||||
ret = ioreq->super.req_fini(req);
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_io_romio_mutex);
|
||||
|
||||
*req = MPI_REQUEST_NULL;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -37,5 +65,7 @@ int mca_io_romio_request_free(ompi_request_t **req)
|
||||
*/
|
||||
int mca_io_romio_request_cancel(ompi_request_t *req, int flag)
|
||||
{
|
||||
/* BWB - do we really want to return an error here or just a bad
|
||||
flag? */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
@ -66,10 +66,6 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
|
||||
case MCA_IO_BASE_V_1_0_0:
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iread(fh, buf, count, datatype, io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -67,10 +67,6 @@ int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iread_at(fh, offset, buf, count, datatype,
|
||||
io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -66,10 +66,6 @@ int MPI_File_iread_shared(MPI_File fh, void *buf, int count,
|
||||
case MCA_IO_BASE_V_1_0_0:
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iread_shared(fh, buf, count, datatype, io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -66,10 +66,6 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
|
||||
case MCA_IO_BASE_V_1_0_0:
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iwrite(fh, buf, count, datatype, io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -68,10 +68,6 @@ int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iwrite_at(fh, offset, buf, count, datatype,
|
||||
io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -66,10 +66,6 @@ int MPI_File_iwrite_shared(MPI_File fh, void *buf, int count,
|
||||
case MCA_IO_BASE_V_1_0_0:
|
||||
rc = fh->f_io_selected_module.v1_0_0.
|
||||
io_module_file_iwrite_shared(fh, buf, count, datatype, io_request);
|
||||
if (MPI_SUCCESS == rc) {
|
||||
++ompi_progress_pending_io_reqs;
|
||||
(*request)->req_state = OMPI_REQUEST_ACTIVE;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -18,12 +18,11 @@
|
||||
#endif
|
||||
#include "event/event.h"
|
||||
#include "mca/pml/pml.h"
|
||||
#include "mca/io/io.h"
|
||||
#include "mca/io/base/io_base_request.h"
|
||||
#include "runtime/ompi_progress.h"
|
||||
|
||||
|
||||
static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
|
||||
int ompi_progress_pending_io_reqs = 0;
|
||||
|
||||
|
||||
void ompi_progress_events(int flag)
|
||||
@ -35,7 +34,7 @@ void ompi_progress_events(int flag)
|
||||
void ompi_progress(void)
|
||||
{
|
||||
/* progress any outstanding communications */
|
||||
int ret, temp, events = 0;
|
||||
int ret, events = 0;
|
||||
#if OMPI_HAVE_THREADS == 0
|
||||
if (ompi_progress_event_flag != 0) {
|
||||
ret = ompi_event_loop(ompi_progress_event_flag);
|
||||
@ -50,11 +49,9 @@ void ompi_progress(void)
|
||||
}
|
||||
|
||||
/* Progress IO requests, if there are any */
|
||||
|
||||
if (ompi_progress_pending_io_reqs > 0) {
|
||||
temp = ompi_progress_pending_io_reqs;
|
||||
mca_io_base_progress(&ompi_progress_pending_io_reqs);
|
||||
events += (temp - ompi_progress_pending_io_reqs);
|
||||
ret = mca_io_base_request_progress();
|
||||
if (ret > 0) {
|
||||
events += ret;
|
||||
}
|
||||
|
||||
#if 1
|
||||
|
@ -18,8 +18,6 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
OMPI_DECLSPEC extern int ompi_progress_pending_io_reqs;
|
||||
|
||||
OMPI_DECLSPEC extern void ompi_progress_events(int);
|
||||
|
||||
OMPI_DECLSPEC extern void ompi_progress(void);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user