1
1

Clean up request handling in the I/O framework to be more consistent with

other request-using frameworks.

 - Rather than having mpi/c/* functions allocate requests explicitly,
   pass the MPI_Request* down to the I/O component and have it 
   perform the allocation.
 - While the I/O base provides a base request which can be used,
   it is not required and all request management occurs within
   the component.
 - Push progress management into the component, rather than having it
   happen in the base.  Progress functions are now easily registered,
   and not all (ie, the one existing) components use progress functions
   in any rational way.

ROMIO switched to generalized requests instead of MPIO_Requests many
moons ago, and Open MPI now uses ROMIO's generalized requests, so there
is no reason to wrap those requests (which are OMPI requests) in another
level of request.

Now the file function passes the MPI_Request* to the ROMIO component,
which passes it to the underlying ROMIO function, which calls 
MPI_Grequest_start to create an OMPI request, which is what gets set
as the request to the user.  Much cleaner.

This patch has two motivations.  One, a whole heck of a lot of code
just got removed, and request handling is now much cleaner for I/O
components.  Two, by adding support for Argonne's proposed generalized
request extensions, we can allow ROMIO to provide async I/O through
generalized requests, which we couldn't rationally do in the old
setup due to the crazy request completion rules.

This commit was SVN r22235.
Этот коммит содержится в:
Brian Barrett 2009-11-26 05:13:43 +00:00
родитель 70a69e796f
Коммит b57b8c5b3f
25 изменённых файлов: 49 добавлений и 1134 удалений

Просмотреть файл

@ -143,8 +143,6 @@ int ompi_file_open(struct ompi_communicator_t *comm, char *filename,
int ompi_file_close(ompi_file_t **file)
{
(*file)->f_flags |= OMPI_FILE_ISCLOSED;
mca_io_base_component_del(&((*file)->f_io_selected_component));
mca_io_base_request_return(*file);
OBJ_RELEASE(*file);
*file = &ompi_mpi_file_null.file;
@ -254,12 +252,6 @@ static void file_constructor(ompi_file_t *file)
sizeof(file->f_io_selected_module));
file->f_io_selected_data = NULL;
/* Construct the io request freelist */
OBJ_CONSTRUCT(&file->f_io_requests, opal_list_t);
/* Construct the per-module io request freelist */
OBJ_CONSTRUCT(&file->f_io_requests_lock, opal_mutex_t);
/* If the user doesn't want us to ever free it, then add an extra
RETAIN here */
@ -280,7 +272,6 @@ static void file_destructor(ompi_file_t *file)
case MCA_IO_BASE_V_2_0_0:
file->f_io_selected_module.v2_0_0.io_module_file_close(file);
break;
default:
/* Should never get here */
break;
@ -316,12 +307,6 @@ static void file_destructor(ompi_file_t *file)
#endif
}
/* Destruct the io request freelist */
OBJ_DESTRUCT(&file->f_io_requests);
/* Destruct the io requests lock */
OBJ_DESTRUCT(&file->f_io_requests_lock);
/* Reset the f_to_c table entry */
if (MPI_UNDEFINED != file->f_f_to_c_index &&

Просмотреть файл

@ -86,12 +86,6 @@ struct ompi_file_t {
/** Allow the selected module to cache data on the file */
struct mca_io_base_file_t *f_io_selected_data;
/** Per-module io request freelist */
opal_list_t f_io_requests;
/** Lock for the per-module io request freelist */
opal_mutex_t f_io_requests_lock;
};
/**
* Convenience typedef

Просмотреть файл

@ -21,7 +21,6 @@ headers += \
base/io_base_request.h
libmca_io_la_SOURCES += \
base/io_base_component_list.c \
base/io_base_close.c \
base/io_base_delete.c \
base/io_base_file_select.c \

Просмотреть файл

@ -179,78 +179,6 @@ BEGIN_C_DECLS
OMPI_DECLSPEC int mca_io_base_delete(char *filename,
struct ompi_info_t *info);
/**
* Initialize the components-in-use list.
*
* @returns OMPI_SUCCESS Always
*
* Creates resources associated with the io framework's
* currently-in-use list.
*/
OMPI_DECLSPEC int mca_io_base_component_init(void);
/**
* Add a component to the io framework's currently-in-use list,
* or increase its refcount if it's already in the list.
*
* @param comp The component being added (union)
*
* Add a component to the list of components that is monitored
* every time we go to make progress on asynchronous requests. If
* the component is already in the list, its reference count will
* be increased.
*
* For asynchronous progress, opal_progress() will call
* mca_io_base_progress(), which will go down the list of active
* io components and call their progress() function.
*
* Since components on this list are refcounted; they must be
* removed with mca_io_base_component_del() for each time that
* they are added with mca_io_base_component_add(). Once their
* refcount reaches 0, they are removed from the list and will not
* be called for asynchronous progress.
*
* This function is protected by a mutex; it is safe to call this
* function from multiple threads simultaneously.
*/
OMPI_DECLSPEC int mca_io_base_component_add(mca_io_base_components_t *comp);
/**
* Decrease the refcount of a component in the io framework's
* currently-in-use list.
*
* @param comp The component to be [potentially] removed from the
* currently-in-use list.
*
* Find a component in the currently-in-use list and decrease its
* refcount. If its refcount goes to 0, it will be removed from
* the list and will not be polled for asynchronous progress.
*
* This function is protected by a mutex; it is safe to call this
* function from multiple threads simultaneously.
*/
OMPI_DECLSPEC int mca_io_base_component_del(mca_io_base_components_t *comp);
/**
* Return all the cached requests on an MPI_File to the global IO
* request freelist.
*
* @param file MPI_File to flush the cache
*
*
*/
OMPI_DECLSPEC void mca_io_base_request_return(struct ompi_file_t *file);
/**
* Shut down the component list
*
* @returns OMPI_SUCCESS Always
*
* Destroys resources associated with the io framework's
* currently-in-use list.
*/
OMPI_DECLSPEC int mca_io_base_component_finalize(void);
/**
* Shut down the io MCA framework.
*
@ -264,8 +192,6 @@ BEGIN_C_DECLS
*/
OMPI_DECLSPEC int mca_io_base_close(void);
OMPI_DECLSPEC int mca_io_base_component_run_progress(void);
OMPI_DECLSPEC int mca_io_base_register_datarep(char *,
MPI_Datarep_conversion_function*,
MPI_Datarep_conversion_function*,
@ -309,15 +235,6 @@ OMPI_DECLSPEC extern bool mca_io_base_components_available_valid;
* process.
*/
OMPI_DECLSPEC extern opal_list_t mca_io_base_components_available;
/**
* Indicator as to whether the freelist of IO requests is valid or
* not.
*/
OMPI_DECLSPEC extern bool mca_io_base_requests_valid;
/**
* Free list of IO requests
*/
OMPI_DECLSPEC extern ompi_free_list_t mca_io_base_requests;
END_C_DECLS
#endif /* MCA_BASE_IO_H */

Просмотреть файл

@ -30,16 +30,6 @@
int mca_io_base_close(void)
{
/* stop the progress engine */
mca_io_base_request_progress_fini();
/* Destroy the freelist */
if (mca_io_base_requests_valid) {
OBJ_DESTRUCT(&mca_io_base_requests);
mca_io_base_requests_valid = false;
}
/* Close all components that are still open. This may be the opened
list (if we're in ompi_info), or it may be the available list (if
we're anywhere else). */
@ -56,10 +46,6 @@ int mca_io_base_close(void)
mca_io_base_components_available_valid = false;
}
/* Destroy some io framework resources */
mca_io_base_component_finalize();
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -1,219 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/mca/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/io/base/base.h"
#include "opal/runtime/opal_progress.h"
/*
* Private variables
*/
static bool initialized = false;
static opal_list_t components_in_use;
static opal_mutex_t mutex;
/* One entry on the currently-in-use component list: a refcounted copy
   of the component union plus its detected interface version. */
struct component_item_t {
opal_list_item_t super;              /* must be first: list linkage */
int refcount;                        /* balanced by _add() / _del() */
mca_io_base_version_t version;       /* detected interface version */
mca_io_base_components_t component;  /* copy of the component union */
};
typedef struct component_item_t component_item_t;
/* No constructor/destructor: all fields are filled in explicitly by
   mca_io_base_component_add(). */
static OBJ_CLASS_INSTANCE(component_item_t, opal_list_item_t, NULL, NULL);
/*
* Initialize this interface
*/
int mca_io_base_component_init(void)
{
OBJ_CONSTRUCT(&components_in_use, opal_list_t);
initialized = true;
opal_progress_register(mca_io_base_component_run_progress);
return OMPI_SUCCESS;
}
/*
 * Add a component to the io framework's currently-in-use list, or
 * increase its refcount if it's already in the list.
 *
 * @param comp  The component (union) to add.
 * @returns OMPI_SUCCESS always.
 *
 * Thread safe: the in-use list is protected by the file-scope mutex.
 * Each call must eventually be balanced by mca_io_base_component_del().
 */
int mca_io_base_component_add(mca_io_base_components_t *comp)
{
opal_list_item_t *item;
component_item_t *citem;
mca_base_component_t *c;
OPAL_THREAD_LOCK(&mutex);
/* Save the component in the ref-counted list of components in use.
This is used for the progression of non-blocking IO requests.
If the component is already on the list, just bump up the
refcount. Otherwise, add it to the list with a refcount of
1. */
for (item = opal_list_get_first(&components_in_use);
item != opal_list_get_end(&components_in_use);
item = opal_list_get_next(item)) {
citem = (component_item_t *) item;
/* Note the memory / pointer trickery here: we don't care what
IO version this component is -- all the members of the
mca_io_base_components union have a base of
mca_base_component_t. mca_base_component_compare() will do
all the Right Things to ensure that the components are the
same. */
if (mca_base_component_compare(
(const mca_base_component_t *) &(citem->component),
(const mca_base_component_t *) comp) == 0) {
/* OBJ_RETAIN mirrors the OBJ_RELEASE in _del(), keeping the
object refcount in step with the explicit refcount field. */
++citem->refcount;
OBJ_RETAIN(citem);
break;
}
}
/* If we didn't find it, save it */
if (opal_list_get_end(&components_in_use) == item) {
citem = OBJ_NEW(component_item_t);
citem->refcount = 1;
citem->component = *comp;
c = (mca_base_component_t *) (&citem->component);
if (2 == c->mca_type_major_version &&
0 == c->mca_type_minor_version &&
0 == c->mca_type_release_version) {
citem->version = MCA_IO_BASE_V_2_0_0;
} else {
/* Unknown interface version: keep the entry, but
_run_progress() will never call through it. */
citem->version = MCA_IO_BASE_V_NONE;
}
opal_list_append(&components_in_use, (opal_list_item_t *) citem);
}
OPAL_THREAD_UNLOCK(&mutex);
/* All done */
return OMPI_SUCCESS;
}
/*
 * Find a component in the currently-in-use list and decrease its
 * refcount. If the refcount goes to 0, remove it from the list.
 *
 * @param comp  The component (union) to release.
 * @returns OMPI_SUCCESS always (including when comp is not found).
 *
 * Thread safe: protected by the same file-scope mutex as _add().
 */
int mca_io_base_component_del(mca_io_base_components_t *comp)
{
opal_list_item_t *item;
component_item_t *citem;
OPAL_THREAD_LOCK(&mutex);
/* Find the component in the list */
for (item = opal_list_get_first(&components_in_use);
item != opal_list_get_end(&components_in_use);
item = opal_list_get_next(item)) {
citem = (component_item_t *) item;
/* Note the memory / pointer trickery here: we don't care what
IO version this component is -- all the members of the
mca_io_base_components union have a base of
mca_base_component_t. */
if (mca_base_component_compare(
(const mca_base_component_t *) &(citem->component),
(const mca_base_component_t *) comp) == 0) {
--citem->refcount;
if (0 == citem->refcount) {
opal_list_remove_item(&components_in_use,
(opal_list_item_t *) citem);
}
/* Balances the OBJ_NEW / OBJ_RETAIN performed in _add();
the final release (refcount 0) frees the item. */
OBJ_RELEASE(citem);
break;
}
}
OPAL_THREAD_UNLOCK(&mutex);
/* All done */
return OMPI_SUCCESS;
}
/* Lives in this file so that the guarding mutex can stay static.
 *
 * Walk every in-use component and invoke its progress function,
 * accumulating the number of completions reported.  Returns 0 when
 * the interface has not been initialized.
 */
int mca_io_base_component_run_progress(void)
{
    int completed = 0;
    opal_list_item_t *it;

    if (!initialized) {
        return 0;
    }

    OPAL_THREAD_LOCK(&mutex);
    /* Poll each registered component's progress hook */
    for (it = opal_list_get_first(&components_in_use);
         it != opal_list_get_end(&components_in_use);
         it = opal_list_get_next(it)) {
        component_item_t *ci = (component_item_t *) it;
        /* Only the v2.0.0 interface is known; anything else is skipped */
        if (MCA_IO_BASE_V_2_0_0 == ci->version) {
            int rc = ci->component.v2_0_0.io_progress();
            if (rc > 0) {
                completed += rc;
            }
        }
    }
    OPAL_THREAD_UNLOCK(&mutex);

    return completed;
}
/*
 * Finalize this interface -- the mirror of mca_io_base_component_init().
 *
 * Unregisters the progress callback and destroys the in-use list.
 * @returns OMPI_SUCCESS always.
 *
 * NOTE(review): the file-scope mutex that guards the list is never
 * destructed here -- confirm whether an OBJ_DESTRUCT(&mutex) is
 * needed for symmetry with its construction.
 */
int mca_io_base_component_finalize(void)
{
initialized = false;
opal_progress_unregister(mca_io_base_component_run_progress);
OBJ_DESTRUCT(&components_in_use);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -215,11 +215,6 @@ int mca_io_base_file_select(ompi_file_t *file,
return err;
}
/* Add the component to the list of components that the io
framework is maintaining */
mca_io_base_component_add(&selected.ai_component);
/* Announce the winner */
opal_output_verbose(10, mca_io_base_output,

Просмотреть файл

@ -63,7 +63,6 @@ static int init_query_2_0_0(const mca_base_component_t *ls,
int mca_io_base_find_available(bool enable_progress_threads,
bool enable_mpi_threads)
{
int err;
mca_base_component_priority_list_item_t *entry;
opal_list_item_t *p;
const mca_base_component_t *component;
@ -120,12 +119,6 @@ int mca_io_base_find_available(bool enable_progress_threads,
OBJ_DESTRUCT(&mca_io_base_components_opened);
mca_io_base_components_opened_valid = false;
/* Setup the freelist */
if (OMPI_SUCCESS != (err = mca_io_base_request_create_freelist())) {
return err;
}
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -97,14 +97,6 @@ int mca_io_base_open(void)
mca_io_base_param = mca_base_param_find("io", "base", NULL);
/* Initialize some io framework resources */
mca_io_base_component_init();
/* Intialize the request progression code */
mca_io_base_request_progress_init();
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -35,314 +35,16 @@
#include "ompi/mca/io/base/io_base_request.h"
/*
* Public variables
*/
bool mca_io_base_requests_valid = false;
ompi_free_list_t mca_io_base_requests;
volatile int32_t mca_io_base_request_num_pending = 0;
/*
* Private functions
*/
static void io_base_request_constructor(mca_io_base_request_t *req);
OBJ_CLASS_INSTANCE(mca_io_base_request_t,
ompi_request_t,
io_base_request_constructor,
NULL);
/* Class constructor: runs once when a request object is first created
   for the freelist, NOT on each reuse from the cache. */
static void io_base_request_constructor(mca_io_base_request_t *req)
{
req->super.req_type = OMPI_REQUEST_IO;  /* mark as an IO request */
req->free_called = false;
}
/*
 * Setup the freelist of IO requests. This does not need to be
 * protected with a lock because it's called during MPI_INIT.
 *
 * Each freelist element is sized as the base request plus the largest
 * io_request_bytes value advertised by any available v2.0.0 io
 * component, so every component's private payload fits in one
 * contiguous allocation.
 *
 * @returns OMPI_SUCCESS on success (error paths come from the
 *          freelist/parameter machinery).
 */
int mca_io_base_request_create_freelist(void)
{
opal_list_item_t *p;
const mca_base_component_t *component;
const mca_io_base_component_2_0_0_t *v200;
size_t size = 0;
int i, init, incr;
/* Find the maximum additional number of bytes required by all io
components for requests and make that the request size */
for (p = opal_list_get_first(&mca_io_base_components_available);
p != opal_list_get_end(&mca_io_base_components_available);
p = opal_list_get_next(p)) {
component = ((mca_base_component_priority_list_item_t *)
p)->super.cli_component;
/* Only know how to handle v2.0.0 components for now */
if (component->mca_type_major_version == 2 &&
component->mca_type_minor_version == 0 &&
component->mca_type_release_version == 0) {
v200 = (mca_io_base_component_2_0_0_t *) component;
if (v200->io_request_bytes > size) {
size = v200->io_request_bytes;
}
}
}
/* Construct and initialize the freelist of IO requests. */
OBJ_CONSTRUCT(&mca_io_base_requests, ompi_free_list_t);
mca_io_base_requests_valid = true;
/* Initial size / growth increment come from MCA parameters */
i = mca_base_param_find("io", NULL, "base_freelist_initial_size");
mca_base_param_lookup_int(i, &init);
i = mca_base_param_find("io", NULL, "base_freelist_increment");
mca_base_param_lookup_int(i, &incr);
ompi_free_list_init_new(&mca_io_base_requests,
sizeof(mca_io_base_request_t) + size,
CACHE_LINE_SIZE,
OBJ_CLASS(mca_io_base_request_t),
0,CACHE_LINE_SIZE,
init, -1, incr,
NULL);
/* All done */
return OMPI_SUCCESS;
}
/*
 * Allocate a module-specific IO MPI_Request for the given file.
 *
 * @param file  File handle the request will be used with.
 * @param req   [OUT] Set to the allocated/initialized request.
 * @returns OMPI_SUCCESS on success, OMPI_ERR_NOT_IMPLEMENTED for an
 *          unknown io interface version, or the module's once-init
 *          error code.
 *
 * First tries the file's cached per-module freelist (such requests are
 * already module-initialized); otherwise takes a fresh request off the
 * global freelist and runs the module's once-per-request init.
 */
int mca_io_base_request_alloc(ompi_file_t *file,
mca_io_base_request_t **req)
{
int err;
mca_io_base_module_request_once_init_fn_t func;
ompi_free_list_item_t *item;
/* See if we've got a request on the module's freelist (which is
cached on the file, since there's only one module per
MPI_File). Use a quick-but-not-entirely-accurate (but good
enough) check as a slight optimization to potentially avoid
locking and unlocking. */
if (opal_list_get_size(&file->f_io_requests) > 0) {
OPAL_THREAD_LOCK(&file->f_io_requests_lock);
/* Re-check under the lock: the unlocked check above may race */
if (opal_list_get_size(&file->f_io_requests) > 0) {
*req = (mca_io_base_request_t*)
opal_list_remove_first(&file->f_io_requests);
(*req)->free_called = false;
} else {
*req = NULL;
}
OPAL_THREAD_UNLOCK(&file->f_io_requests_lock);
} else {
*req = NULL;
}
/* Nope, we didn't have one on the file freelist, so let's get one
off the global freelist */
if (NULL == *req) {
OMPI_FREE_LIST_GET(&mca_io_base_requests, item, err);
*req = (mca_io_base_request_t*) item;
/* Call the per-use init function, if it exists */
switch (file->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
/* These can be set once for this request since this
request will always be used with the same module (and
therefore, the same MPI_File). Note that
(*req)->req_ompi.rq_type is already set by the
constructor. */
(*req)->req_file = file;
(*req)->req_ver = file->f_io_version;
(*req)->free_called = false;
(*req)->super.req_free =
file->f_io_selected_module.v2_0_0.io_module_request_free;
(*req)->super.req_cancel =
file->f_io_selected_module.v2_0_0.io_module_request_cancel;
/* Call the module's once-per process init, if it
exists */
func =
file->f_io_selected_module.v2_0_0.io_module_request_once_init;
if (NULL != func) {
if (OMPI_SUCCESS !=
(err = func(&file->f_io_selected_module, *req))) {
/* Init failed: give the request back before bailing */
OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item);
return err;
}
}
break;
default:
/* Unknown io interface version: cannot initialize */
OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item);
return OMPI_ERR_NOT_IMPLEMENTED;
break;
}
}
/* Initialize the request */
OMPI_REQUEST_INIT(&((*req)->super), false);
(*req)->super.req_mpi_object.file = file;
/*
* Copied from ompi/mca/pml/base/pml_base_recvreq.h:
* always set the req_status.MPI_TAG to ANY_TAG before starting the
* request. This field is used if cancelled to find out if the request
* has been matched or not.
*/
(*req)->super.req_status.MPI_TAG = MPI_ANY_TAG;
(*req)->super.req_status.MPI_ERROR = OMPI_SUCCESS;
(*req)->super.req_status._count = 0;
(*req)->super.req_status._cancelled = 0;
/* All done */
return OMPI_SUCCESS;
}
/*
 * "Free" a module-specific IO MPI_Request.
 *
 * The request is not actually released: it is cached on the file's
 * per-module freelist (it has already been initialized for that
 * module), so the next mca_io_base_request_alloc() on the same file
 * can reuse it cheaply.  Thread safe via the file's request lock.
 */
OMPI_DECLSPEC void mca_io_base_request_free(ompi_file_t *file,
mca_io_base_request_t *req)
{
/* Put the request back on the per-module freelist, since it's
been initialized for that module */
OPAL_THREAD_LOCK(&file->f_io_requests_lock);
opal_list_prepend(&file->f_io_requests, (opal_list_item_t*) req);
OPAL_THREAD_UNLOCK(&file->f_io_requests_lock);
}
/*
 * Return all the requests in the per-file freelist to the global list.
 *
 * Called when a file is closed: drains the file's cached request list
 * and hands every request back to the global freelist.  Thread safe
 * via the file's request lock.
 */
void mca_io_base_request_return(ompi_file_t *file)
{
ompi_free_list_item_t *next;
OPAL_THREAD_LOCK(&file->f_io_requests_lock);
while (NULL != (next = (ompi_free_list_item_t*)
opal_list_remove_first(&file->f_io_requests))) {
OMPI_FREE_LIST_RETURN(&mca_io_base_requests, next);
}
OPAL_THREAD_UNLOCK(&file->f_io_requests_lock);
}
#if OPAL_ENABLE_PROGRESS_THREADS
static volatile bool thread_running = false;
static volatile bool thread_done = false;
static opal_thread_t progress_thread;
static opal_mutex_t progress_mutex;
static opal_condition_t progress_cond;
/*
 * Body of the asynchronous progress thread (built only when
 * OPAL_ENABLE_PROGRESS_THREADS is set).
 *
 * While requests are pending, repeatedly drive component progress and
 * sleep; when nothing is pending, block on progress_cond for up to one
 * second so that a newly added request (which signals the condition in
 * mca_io_base_request_progress_add) wakes us promptly.  Exits once
 * thread_done is set by mca_io_base_request_progress_fini.
 */
static void*
request_progress_thread(opal_object_t *arg)
{
    struct timespec abstime;
    struct timeval tv;

    while (! thread_done) {
        gettimeofday(&tv, NULL);
        abstime.tv_sec = tv.tv_sec + 1;
        abstime.tv_nsec = tv.tv_usec * 1000;
        while (mca_io_base_request_num_pending > 0) {
            /* do some progress, sleep, repeat */
            mca_io_base_component_run_progress();
            sleep(2);
        }
        /* A condition variable may only be waited on with its mutex
           held (POSIX pthread_cond_timedwait contract); the previous
           code waited on an unlocked progress_mutex, which is
           undefined behavior. */
        OPAL_THREAD_LOCK(&progress_mutex);
        opal_condition_timedwait(&progress_cond, &progress_mutex, &abstime);
        OPAL_THREAD_UNLOCK(&progress_mutex);
    }

    return NULL;
}
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
/*
 * Initialize the request progress machinery.
 *
 * Resets the pending-request counter and, in progress-thread builds,
 * constructs the mutex/condition/thread objects.  The thread is only
 * prepared here; it is started lazily by the first call to
 * mca_io_base_request_progress_add().
 */
void
mca_io_base_request_progress_init(void)
{
mca_io_base_request_num_pending = 0;
#if OPAL_ENABLE_PROGRESS_THREADS
thread_running = false;
thread_done = false;
OBJ_CONSTRUCT(&progress_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&progress_cond, opal_condition_t);
OBJ_CONSTRUCT(&progress_thread, opal_thread_t);
progress_thread.t_run = request_progress_thread;
progress_thread.t_arg = NULL;
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
}
/*
 * Note that one more IO request is pending.
 *
 * In progress-thread builds this also lazily starts the progress
 * thread (double-checked under progress_mutex) and signals the
 * condition so a sleeping thread wakes up to service the request.
 *
 * NOTE(review): the condition is signaled without progress_mutex
 * held -- legal for pthreads, but a wakeup can be missed if the
 * thread is between its pending-count check and its wait; the
 * one-second timedwait bounds that window.  Confirm this is the
 * intended trade-off.
 */
OMPI_DECLSPEC void
mca_io_base_request_progress_add(void)
{
#if OPAL_ENABLE_PROGRESS_THREADS
/* if we don't have a progress thread, make us have a progress
thread */
if (! thread_running) {
OPAL_THREAD_LOCK(&progress_mutex);
/* re-check under the lock so only one caller starts the thread */
if (! thread_running) {
thread_running = true;
opal_thread_start(&progress_thread);
}
OPAL_THREAD_UNLOCK(&progress_mutex);
}
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
OPAL_THREAD_ADD32(&mca_io_base_request_num_pending, 1);
#if OPAL_ENABLE_PROGRESS_THREADS
opal_condition_signal(&progress_cond);
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
}
/*
 * Note that one pending IO request has completed: atomically
 * decrement the pending-request counter (pairs with _progress_add).
 */
OMPI_DECLSPEC void
mca_io_base_request_progress_del(void)
{
OPAL_THREAD_ADD32(&mca_io_base_request_num_pending, -1);
}
/*
 * Finalize the request progress machinery.
 *
 * In progress-thread builds: tell the helper thread to exit, wake it
 * if it is running, join it, then destroy the thread/condition/mutex
 * objects constructed in _progress_init().  A no-op otherwise.
 */
void
mca_io_base_request_progress_fini(void)
{
#if OPAL_ENABLE_PROGRESS_THREADS
void *ret;
/* make the helper thread die */
thread_done = true;
if (thread_running) {
opal_condition_signal(&progress_cond);
opal_thread_join(&progress_thread, &ret);
}
/* clean up */
OBJ_DESTRUCT(&progress_thread);
OBJ_DESTRUCT(&progress_cond);
OBJ_DESTRUCT(&progress_mutex);
#endif /* OPAL_ENABLE_PROGRESS_THREADS */
}
OBJ_CLASS_INSTANCE(mca_io_base_request_t,
ompi_request_t,
io_base_request_constructor,
NULL);

Просмотреть файл

@ -54,109 +54,12 @@ struct mca_io_base_request_t {
typedef struct mca_io_base_request_t mca_io_base_request_t;
BEGIN_C_DECLS
/**
* Declare the class
*/
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_io_base_request_t);
/**
* Setup freelist of IO requests
*
* @returns OMPI_SUCCESS upon success
*
* Initialize the IO freelist of requests, making each request be
* the size of the base IO request plus the maximum number of
* bytes from all the available IO components.
*/
int mca_io_base_request_create_freelist(void);
/**
* Get a new IO request
*
* @param fh The file handle
* @param req Pointer to an IO base request
*
* @returns OMPI_SUCCESS on success
* @returns err otherwise
*
* This function allocates an MPI_Request (ompi_request_t)
* according to the io module that is selected on the file handle.
* Specifically, it mallocs enough contiguous space for an
* ompi_request_t, an mca_io_base_request_t, and the value of
* io_cache_bytes from the selected module. This allows all three
* entities to have space allocated in one contiguous chunk.
*
* This function will either allocate an initialize a new request
* (in which case it will call the module's request init
* function), or it will take an old request off a cache of
* already-created requests. Either way, the return is a new IO
* request that is suitable for use by the selected module.
*
* For optimization reasons, only minimal error checking is
* performed.
*/
int mca_io_base_request_alloc(ompi_file_t *file,
mca_io_base_request_t **req);
/**
* Return a module-specific IO MPI_Request.
*
* @param fh The file handle
* @param req The request to return
*
* Returns a module-specific IO request when it has completed.
* This request may actually be freed (in which case it will call
* the IO module's fini function before freeing it) or it may be
* placed on a freelist.
*
* The req argument is set to MPI_REQUEST_NULL upon return.
*
* For optimization reasons, \em no error checking is performed.
*/
OMPI_DECLSPEC void mca_io_base_request_free(ompi_file_t *file,
mca_io_base_request_t *req);
/*
* count of number of pending requests in the IO subsystem. Should
* only be modified with OPAL_THREAD_ADD32. Probably should not be
* used outside of IO components. Here only for the progress check
* optimization.
*/
OMPI_DECLSPEC extern volatile int32_t mca_io_base_request_num_pending;
/**
* Initialize the request progress code
*
*/
void mca_io_base_request_progress_init(void);
/**
*
*/
OMPI_DECLSPEC void mca_io_base_request_progress_add(void);
/**
*
*/
OMPI_DECLSPEC void mca_io_base_request_progress_del(void);
/**
* Finalize the request progress code
*/
void mca_io_base_request_progress_fini(void);
/**
 * External progress hook, invoked from opal_progress().
 *
 * Cheap fast path: only enter the component progress loop when at
 * least one io request is pending; otherwise report zero completions.
 */
static inline int mca_io_base_request_progress(void)
{
    return (mca_io_base_request_num_pending > 0)
        ? mca_io_base_component_run_progress()
        : 0;
}
END_C_DECLS
#endif

Просмотреть файл

@ -32,12 +32,6 @@ struct mca_io_base_file_t;
struct mca_io_base_delete_t;
/*
* Forward declaration so that we don't create an include file loop.
*/
struct mca_io_base_request_t;
/*
* Forward declarations of things declared in this file
*/
@ -100,8 +94,6 @@ typedef int (*mca_io_base_component_file_delete_unselect_fn_t)
(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data);
typedef int (*mca_io_base_component_progress_fn_t)(void);
typedef int (*mca_io_base_component_register_datarep_fn_t)(
char *,
MPI_Datarep_conversion_function*,
@ -115,11 +107,6 @@ struct mca_io_base_component_2_0_0_t {
mca_base_component_t io_version;
mca_base_component_data_t io_data;
/** Additional bytes that this module needs to be allocated when
an MPI_Request (ompi_request_t) is allocated. */
size_t io_request_bytes;
mca_io_base_component_init_query_fn_t io_init_query;
mca_io_base_component_file_query_2_0_0_fn_t io_file_query;
mca_io_base_component_file_unquery_fn_t io_file_unquery;
@ -128,8 +115,6 @@ struct mca_io_base_component_2_0_0_t {
mca_io_base_component_file_delete_unselect_fn_t io_delete_unquery;
mca_io_base_component_file_delete_select_fn_t io_delete_select;
mca_io_base_component_progress_fn_t io_progress;
mca_io_base_component_register_datarep_fn_t io_register_datarep;
};
typedef struct mca_io_base_component_2_0_0_t mca_io_base_component_2_0_0_t;
@ -148,55 +133,6 @@ typedef union mca_io_base_components_t mca_io_base_components_t;
* Module v2.0.0
*/
/**
* Initialize a request for use by the io module.
*
* @param module (IN) IO module
* @param request (IN) Pointer to allocated request.
*
* To reduce latency (number of required allocations), the io base
* allocates additional space along with each request that may be used
* by the io module for additional control information. If the io
* module intends to use this space, the io_request_bytes attribute
* should be set to reflect the number of bytes above
* sizeof(mca_io_base_request_t) are needed by the io component. This
* space is allocated contiguously along with the
* mca_io_base_request_t with the space immediately following the base
* request available to the io module.
*
* This init function is called the first time the request is created
* by the io base. On completion of the request, the io base will
* cache the request for later use by the same io module. When the
* request is re-used from the cache, this init function is NOT called
* again.
*/
typedef int (*mca_io_base_module_request_once_init_fn_t)(
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
/**
* Cleanup any resources that may have been associated with the
* request by the io module.
*
* @param module (IN) io module
* @param request (IN) Pointer to allocated request.
*
* This function is called when the io base removes a request from the
* io module's cache (due to resource constraints) or the cache limit
* has been reached, prior to re-using the request for another io
* module. This provides the io module the chance to cleanup/release
* any resources cached on the request by the io module.
*/
typedef int (*mca_io_base_module_request_once_finalize_fn_t)(
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
typedef int (*mca_io_base_module_request_fini_fn_t)(
struct ompi_file_t *file,
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
typedef int (*mca_io_base_module_file_open_fn_t)
(struct ompi_communicator_t *comm, char *filename, int amode,
struct ompi_info_t *info, struct ompi_file_t *fh);
@ -244,11 +180,11 @@ typedef int (*mca_io_base_module_file_write_at_all_fn_t)
typedef int (*mca_io_base_module_file_iread_at_fn_t)
(struct ompi_file_t *fh, MPI_Offset offset, void *buf,
int count, struct ompi_datatype_t *datatype,
struct mca_io_base_request_t *request);
struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_iwrite_at_fn_t)
(struct ompi_file_t *fh, MPI_Offset offset, void *buf,
int count, struct ompi_datatype_t *datatype,
struct mca_io_base_request_t *request);
struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_read_fn_t)
(struct ompi_file_t *fh, void *buf, int count, struct ompi_datatype_t *
@ -264,11 +200,11 @@ typedef int (*mca_io_base_module_file_write_all_fn_t)
datatype, struct ompi_status_public_t *status);
typedef int (*mca_io_base_module_file_iread_fn_t)
(struct ompi_file_t *fh, void *buf, int count, struct ompi_datatype_t *
datatype, struct mca_io_base_request_t *request);
(struct ompi_file_t *fh, void *buf, int count,
struct ompi_datatype_t *datatype, struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_iwrite_fn_t)
(struct ompi_file_t *fh, void *buf, int count,
struct ompi_datatype_t *datatype, struct mca_io_base_request_t *request);
struct ompi_datatype_t *datatype, struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_seek_fn_t)
(struct ompi_file_t *fh, MPI_Offset offset, int whence);
@ -285,10 +221,10 @@ typedef int (*mca_io_base_module_file_write_shared_fn_t)
struct ompi_datatype_t *datatype, struct ompi_status_public_t *status);
typedef int (*mca_io_base_module_file_iread_shared_fn_t)
(struct ompi_file_t *fh, void *buf, int count,
struct ompi_datatype_t *datatype, struct mca_io_base_request_t *request);
struct ompi_datatype_t *datatype, struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_iwrite_shared_fn_t)
(struct ompi_file_t *fh, void *buf, int count,
struct ompi_datatype_t *datatype, struct mca_io_base_request_t *request);
struct ompi_datatype_t *datatype, struct ompi_request_t **request);
typedef int (*mca_io_base_module_file_read_ordered_fn_t)
(struct ompi_file_t *fh, void *buf, int count,
struct ompi_datatype_t *datatype, struct ompi_status_public_t *status);
@ -343,22 +279,6 @@ typedef int (*mca_io_base_module_file_sync_fn_t)(struct ompi_file_t *fh);
struct mca_io_base_module_2_0_0_t {
/** Once-per-process request initialization function */
mca_io_base_module_request_once_init_fn_t io_module_request_once_init;
/** Once-per-process request finalization function */
mca_io_base_module_request_once_finalize_fn_t io_module_request_once_finalize;
/** Free a request (per usage) */
ompi_request_free_fn_t io_module_request_free;
/** Cancel a request (per usage) */
ompi_request_cancel_fn_t io_module_request_cancel;
/* Back-ends to MPI API calls (pretty much a 1-to-1 mapping) */
mca_io_base_module_file_open_fn_t io_module_file_open;

Просмотреть файл

@ -42,6 +42,5 @@ sources += \
src/io_romio_file_open.c \
src/io_romio_file_read.c \
src/io_romio_file_write.c \
src/io_romio_module.c \
src/io_romio_request.c
src/io_romio_module.c

Просмотреть файл

@ -25,7 +25,6 @@
#include "ompi/request/request.h"
#include "ompi/file/file.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/io/base/io_base_request.h"
#include "romio/adio/include/romioconf.h"
#include "romio/include/mpio.h"
@ -39,20 +38,8 @@ OMPI_DECLSPEC extern mca_io_base_component_2_0_0_t mca_io_romio_component;
*/
extern opal_mutex_t mca_io_romio_mutex;
extern mca_io_base_module_2_0_0_t mca_io_romio_module;
extern opal_list_t mca_io_romio_pending_requests;
OMPI_DECLSPEC extern mca_io_base_component_2_0_0_t mca_io_romio_component;
/*
* The romio component will "inherit" from the mca_io_base_request_t,
* adding its own stuff on to the end.
*/
struct mca_io_romio_request_t {
mca_io_base_request_t super;
MPI_Request romio_rq;
};
typedef struct mca_io_romio_request_t mca_io_romio_request_t;
/*
* Private data for ROMIO modules
*/
@ -66,20 +53,6 @@ typedef struct mca_io_romio_data_t mca_io_romio_data_t;
* Module functions
*/
int mca_io_romio_progress(void);
int mca_io_romio_request_free(ompi_request_t **req);
int mca_io_romio_request_cancel(ompi_request_t *req, int flag);
/**
* Macro to add a request to the list of pending requests to be
* progressed. This macro is ONLY called when the ROMIO mutex is
* already held!
*/
#define MCA_IO_ROMIO_REQUEST_ADD(request) \
((ompi_request_t*) request)->req_state = OMPI_REQUEST_ACTIVE; \
opal_list_append(&mca_io_romio_pending_requests, (opal_list_item_t *) request); \
mca_io_base_request_progress_add();
/*
* mca->ROMIO module routines:
@ -152,13 +125,13 @@ int mca_io_romio_file_iread_at (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
int mca_io_romio_file_iwrite_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
/* Section 9.4.3 */
int mca_io_romio_file_read (struct ompi_file_t *fh,
@ -185,12 +158,12 @@ int mca_io_romio_file_iread (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
int mca_io_romio_file_iwrite (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
int mca_io_romio_file_seek (struct ompi_file_t *fh,
MPI_Offset offset,
int whence);
@ -215,12 +188,12 @@ int mca_io_romio_file_iread_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
int mca_io_romio_file_iwrite_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
ompi_request_t **request);
int mca_io_romio_file_read_ordered (struct ompi_file_t *fh,
void *buf,
int count,

Просмотреть файл

@ -47,7 +47,6 @@ static int delete_query(char *filename, struct ompi_info_t *info,
bool *usable, int *priorty);
static int delete_select(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data);
static int progress(void);
static int register_datarep(char *,
MPI_Datarep_conversion_function*,
@ -68,12 +67,6 @@ static int delete_priority_param = -1;
opal_mutex_t mca_io_romio_mutex;
/*
* Global list of requests for this component
*/
opal_list_t mca_io_romio_pending_requests;
/*
* Public string showing this component's version number
*/
@ -99,11 +92,6 @@ mca_io_base_component_2_0_0_t mca_io_romio_component = {
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
/* Additional number of bytes required for this component's
requests */
sizeof(mca_io_romio_request_t) - sizeof(mca_io_base_request_t),
/* Initial configuration / Open a new file */
init_query,
@ -116,9 +104,6 @@ mca_io_base_component_2_0_0_t mca_io_romio_component = {
NULL,
delete_select,
/* Progression of non-blocking requests */
(mca_io_base_component_progress_fn_t) progress,
register_datarep
};
@ -157,22 +142,12 @@ static int open_component(void)
/* Create the mutex */
OBJ_CONSTRUCT(&mca_io_romio_mutex, opal_mutex_t);
/* Create the list of pending requests */
OBJ_CONSTRUCT(&mca_io_romio_pending_requests, opal_list_t);
return OMPI_SUCCESS;
}
static int close_component(void)
{
/* Destroy the list of pending requests */
/* JMS: Good opprotunity here to list out all the IO requests that
were not destroyed / completed upon MPI_FINALIZE */
OBJ_DESTRUCT(&mca_io_romio_pending_requests);
OBJ_DESTRUCT(&mca_io_romio_mutex);
return OMPI_SUCCESS;
@ -266,57 +241,6 @@ static int delete_select(char *filename, struct ompi_info_t *info,
}
static int progress()
{
opal_list_item_t *item, *next;
int ret, flag, count;
MPI_Request romio_rq;
mca_io_base_request_t *ioreq;
/* Troll through all pending requests and try to progress them.
If a request finishes, remove it from the list. */
count = 0;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
for (item = opal_list_get_first(&mca_io_romio_pending_requests);
item != opal_list_get_end(&mca_io_romio_pending_requests);
item = next) {
next = opal_list_get_next(item);
ioreq = (mca_io_base_request_t*) item;
romio_rq = ((mca_io_romio_request_t *) item)->romio_rq;
ret = MPI_Test(&romio_rq, &flag,
&(((ompi_request_t *) item)->req_status));
if ((0 != ret) || (0 != flag)) {
ioreq->super.req_status.MPI_ERROR = ret;
++count;
/* we're done, so remove us from the pending list */
opal_list_remove_item(&mca_io_romio_pending_requests, item);
/* mark as complete (and make sure to wake up any waiters */
ompi_request_complete((ompi_request_t*) item, true);
mca_io_base_request_progress_del();
/* if the request has been freed already, the user isn't
* going to call test or wait on us, so we need to do it
* here
*/
if (ioreq->free_called) {
ret = ompi_request_free((ompi_request_t**) &ioreq);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&mca_io_romio_mutex);
return count;
}
}
}
}
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
/* Return how many requests completed */
return count;
}
static int
register_datarep(char * datarep,
MPI_Datarep_conversion_function* read_fn,

Просмотреть файл

@ -72,21 +72,16 @@ mca_io_romio_file_iread_at (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t *request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iread_at) (data->romio_fh, offset, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
datatype, request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -140,21 +135,16 @@ mca_io_romio_file_iread (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iread) (data->romio_fh, buf, count, datatype,
&req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -187,21 +177,16 @@ mca_io_romio_file_iread_shared (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iread_shared) (data->romio_fh, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
datatype, request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;

Просмотреть файл

@ -74,21 +74,16 @@ mca_io_romio_file_iwrite_at (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iwrite_at) (data->romio_fh, offset, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
datatype, request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -143,21 +138,16 @@ mca_io_romio_file_iwrite (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iwrite) (data->romio_fh, buf, count, datatype,
&req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -189,21 +179,16 @@ mca_io_romio_file_iwrite_shared (ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request)
ompi_request_t **request)
{
int ret;
mca_io_romio_data_t *data;
mca_io_romio_request_t *req;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
req = (mca_io_romio_request_t *) request;
OPAL_THREAD_LOCK (&mca_io_romio_mutex);
ret =
ROMIO_PREFIX(MPI_File_iwrite_shared) (data->romio_fh, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
datatype, request);
OPAL_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;

Просмотреть файл

@ -38,17 +38,6 @@ void ADIOI_Datatype_iscontig(MPI_Datatype datatype, int *flag);
* The ROMIO module operations
*/
mca_io_base_module_2_0_0_t mca_io_romio_module = {
/* Once-per-process request init / finalize functions */
NULL,
NULL,
/* Finalize, free, cancel */
mca_io_romio_request_free,
mca_io_romio_request_cancel,
/* Back end to MPI API calls (pretty much a 1-to-1 mapping) */
mca_io_romio_file_open,

Просмотреть файл

@ -1,57 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/request/request.h"
#include "ompi/mca/io/base/io_base_request.h"
#include "io_romio.h"
int mca_io_romio_request_free(ompi_request_t **req)
{
mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req);
OPAL_THREAD_LOCK(&mca_io_romio_mutex);
/* clean up the fortran stuff, mark us as invalid */
OMPI_REQUEST_FINI(*req);
ioreq->free_called = true;
/* if the thing is done already, finalize it and get out... */
if (ioreq->super.req_complete) {
mca_io_base_request_free(ioreq->req_file, ioreq);
}
OPAL_THREAD_UNLOCK(&mca_io_romio_mutex);
*req = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
/*
* ROMIO doesn't allow anything to be cancelled
*/
int mca_io_romio_request_cancel(ompi_request_t *req, int flag)
{
/* BWB - do we really want to return an error here or just a bad
flag? */
return OMPI_ERROR;
}

Просмотреть файл

@ -43,7 +43,6 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
MPI_Datatype datatype, MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -57,6 +56,8 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_RECV(rc, datatype, count);
}
@ -65,20 +66,11 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iread(fh, buf, count, datatype, io_request);
io_module_file_iread(fh, buf, count, datatype, request);
break;
default:
@ -87,6 +79,5 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
}
/* All done */
OMPI_ERRHANDLER_RETURN(rc, fh, rc, FUNC_NAME);
}

Просмотреть файл

@ -43,7 +43,6 @@ int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -57,6 +56,8 @@ int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_RECV(rc, datatype, count);
}
@ -65,21 +66,12 @@ int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iread_at(fh, offset, buf, count, datatype,
io_request);
request);
break;
default:

Просмотреть файл

@ -43,7 +43,6 @@ int MPI_File_iread_shared(MPI_File fh, void *buf, int count,
MPI_Datatype datatype, MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -57,6 +56,8 @@ int MPI_File_iread_shared(MPI_File fh, void *buf, int count,
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_RECV(rc, datatype, count);
}
@ -65,20 +66,11 @@ int MPI_File_iread_shared(MPI_File fh, void *buf, int count,
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iread_shared(fh, buf, count, datatype, io_request);
io_module_file_iread_shared(fh, buf, count, datatype, request);
break;
default:

Просмотреть файл

@ -43,7 +43,6 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
datatype, MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -58,6 +57,8 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_SEND(rc, datatype, count);
}
@ -66,20 +67,11 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iwrite(fh, buf, count, datatype, io_request);
io_module_file_iwrite(fh, buf, count, datatype, request);
break;
default:
@ -88,6 +80,5 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
}
/* All done */
OMPI_ERRHANDLER_RETURN(rc, fh, rc, FUNC_NAME);
}

Просмотреть файл

@ -44,7 +44,6 @@ int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -59,6 +58,8 @@ int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_SEND(rc, datatype, count);
}
@ -67,21 +68,12 @@ int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iwrite_at(fh, offset, buf, count, datatype,
io_request);
request);
break;
default:

Просмотреть файл

@ -43,7 +43,6 @@ int MPI_File_iwrite_shared(MPI_File fh, void *buf, int count,
MPI_Datatype datatype, MPI_Request *request)
{
int rc;
mca_io_base_request_t *io_request;
MEMCHECKER(
memchecker_datatype(datatype);
@ -57,6 +56,8 @@ int MPI_File_iwrite_shared(MPI_File fh, void *buf, int count,
rc = MPI_ERR_FILE;
} else if (count < 0) {
rc = MPI_ERR_COUNT;
} else if (NULL == request) {
rc = MPI_ERR_REQUEST;
} else {
OMPI_CHECK_DATATYPE_FOR_SEND(rc, datatype, count);
}
@ -65,20 +66,11 @@ int MPI_File_iwrite_shared(MPI_File fh, void *buf, int count,
OPAL_CR_ENTER_LIBRARY();
/* Get a request */
if (OMPI_SUCCESS != mca_io_base_request_alloc(fh, &io_request)) {
OPAL_CR_EXIT_LIBRARY();
return OMPI_ERRHANDLER_INVOKE(fh, MPI_ERR_NO_MEM, FUNC_NAME);
}
*request = (ompi_request_t*) io_request;
/* Call the back-end io component function */
switch (fh->f_io_version) {
case MCA_IO_BASE_V_2_0_0:
rc = fh->f_io_selected_module.v2_0_0.
io_module_file_iwrite_shared(fh, buf, count, datatype, io_request);
io_module_file_iwrite_shared(fh, buf, count, datatype, request);
break;
default: