1
1

First cut at non-blocking IO progress. No asynchronous progress

support is included because ROMIO is inherently thread-unsafe.

One possible way to have true asynchronous progress would be to use a
progress thread that wakes up and polls at some frequency when there
are non-blocking IO requests pending.  This is pretty icky, though --
it should definitely have an MCA parameter to enable/disable this
functionality, as well as another to control the polling frequency.

This also strengthens the argument that we need a v2 of the io
framework -- one that is not designed to exclusively support ROMIO --
one that does something unimaginably "better" for the parallel MPI-2
IO interface.  :-)

This commit was SVN r3786.
Этот коммит содержится в:
Jeff Squyres 2004-12-12 15:29:29 +00:00
родитель d6e0552080
Коммит ac5f313af8
30 изменённых файлов: 1073 добавлений и 468 удалений

Просмотреть файл

@ -518,11 +518,17 @@ OMPI_DECLSPEC extern MPI_Fint *MPI_F_STATUSES_IGNORE;
#define MPI_ERRORS_RETURN (&ompi_mpi_errors_return)
/*
* For applications still using the old [non-standard] ROMIO conventions
*/
#define MPIO_Request MPI_Request
#define MPIO_Test MPI_Test
#define MPIO_Wait MPI_Wait
/*
* MPI API
*/
/* JMS: Look for missing functions (e.g., MPI_File_*) */
/*
* MPI API
*/
OMPI_DECLSPEC int MPI_Abort(MPI_Comm comm, int errorcode);
OMPI_DECLSPEC int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_datatype,

Просмотреть файл

@ -82,9 +82,6 @@ int ompi_file_open(struct ompi_communicator_t *comm, char *filename,
int ret;
ompi_file_t *file;
/* JMS: This is problematic -- need this to be as large as ROMIO
needs to to be, or we need an additional pointer to hang stuff
off :-( */
file = OBJ_NEW(ompi_file_t);
if (NULL == file) {
return OMPI_ERR_OUT_OF_RESOURCE;
@ -132,6 +129,8 @@ int ompi_file_open(struct ompi_communicator_t *comm, char *filename,
int ompi_file_close(ompi_file_t **file)
{
(*file)->f_flags |= OMPI_FILE_ISCLOSED;
mca_io_base_component_del(&((*file)->f_io_selected_component));
mca_io_base_request_return(*file);
OBJ_RELEASE(*file);
*file = MPI_FILE_NULL;
@ -241,6 +240,9 @@ static void file_constructor(ompi_file_t *file)
sizeof(file->f_io_selected_module));
file->f_io_selected_data = NULL;
/* Construct the io request freelist */
OBJ_CONSTRUCT(&file->f_io_requests, ompi_list_t);
/* If the user doesn't want us to ever free it, then add an extra
RETAIN here */
@ -297,6 +299,10 @@ static void file_destructor(ompi_file_t *file)
#endif
}
/* Destruct the io request freelist */
OBJ_DESTRUCT(&file->f_io_requests);
/* Reset the f_to_c table entry */
if (MPI_UNDEFINED != file->f_f_to_c_index &&

Просмотреть файл

@ -16,6 +16,7 @@
#define OMPI_FILE_H
#include "mpi.h"
#include "class/ompi_list.h"
#include "errhandler/errhandler.h"
#include "threads/mutex.h"
#include "mca/io/io.h"
@ -68,13 +69,23 @@ struct ompi_file_t {
indicates what member to look at in the union, below) */
mca_io_base_version_t f_io_version;
/** The selected component (note that this is a union) -- we need
this to add and remove the component from the list of
components currently in use by the io framework for
progression porpoises. */
mca_io_base_components_t f_io_selected_component;
/** The selected module (note that this is a union) */
mca_io_base_modules_t f_io_selected_module;
/** Allow the selected module to cache data on the file */
struct mca_io_base_file_t *f_io_selected_data;
/* JMS: also put io request freelist here */
/** Per-module io request freelist */
ompi_list_t f_io_requests;
/** Lock for the per-module io request freelist */
ompi_mutex_t f_io_requests_lock;
};
/**
* Convenience typedef

Просмотреть файл

@ -22,6 +22,7 @@ headers = \
libmca_io_base_la_SOURCES = \
$(headers) \
io_base_component_list.c \
io_base_close.c \
io_base_delete.c \
io_base_file_select.c \

Просмотреть файл

@ -25,6 +25,7 @@
#include "mpi.h"
#include "class/ompi_list.h"
#include "class/ompi_free_list.h"
#include "mca/io/io.h"
@ -55,7 +56,7 @@ extern "C" {
* public interface member -- and is only mentioned here for
* completeness.
*/
OMPI_DECLSPEC int mca_io_base_open(void);
OMPI_DECLSPEC int mca_io_base_open(void);
/**
* Create list of available io components.
@ -82,8 +83,8 @@ OMPI_DECLSPEC int mca_io_base_open(void);
* functions -- it is not considered a public interface member --
* and is only mentioned here for completeness.
*/
OMPI_DECLSPEC int mca_io_base_find_available(bool *allow_multi_user_threads,
bool *have_hidden_threads);
OMPI_DECLSPEC int mca_io_base_find_available(bool *allow_multi_user_threads,
bool *have_hidden_threads);
/**
* Select an available component for a new file handle.
@ -135,8 +136,8 @@ OMPI_DECLSPEC int mca_io_base_find_available(bool *allow_multi_user_threads,
* file handle, or no component was selected and an error is
* returned up the stack.
*/
OMPI_DECLSPEC int mca_io_base_file_select(struct ompi_file_t *file,
struct mca_base_component_t *preferred);
OMPI_DECLSPEC int mca_io_base_file_select(struct ompi_file_t *file,
struct mca_base_component_t *preferred);
/**
* Finalize a io component on a specific file handle.
@ -158,7 +159,7 @@ OMPI_DECLSPEC int mca_io_base_file_select(struct ompi_file_t *file,
* mca_io_base_select(), as result of this function, other
* file handles may also be destroyed.
*/
OMPI_DECLSPEC int mca_io_base_file_unselect(struct ompi_file_t *file);
OMPI_DECLSPEC int mca_io_base_file_unselect(struct ompi_file_t *file);
/**
* Invoke a back-end component to delete a file.
@ -172,7 +173,80 @@ OMPI_DECLSPEC int mca_io_base_file_unselect(struct ompi_file_t *file);
* the available components (rather than some pre-selected
* module). See io.h for details.
*/
OMPI_DECLSPEC int mca_io_base_delete(char *filename, struct ompi_info_t *info);
OMPI_DECLSPEC int mca_io_base_delete(char *filename,
struct ompi_info_t *info);
/**
* Initialize the components-in-use list.
*
* @returns OMPI_SUCCESS Always
*
* Creates resources associated with the io framework's
* currently-in-use list.
*/
OMPI_DECLSPEC int mca_io_base_component_init(void);
/**
* Add a component to the io framework's currently-in-use list,
* or increase its refcount if it's already in the list.
*
* @param comp The component being added (union)
*
* Add a component to the list of components that is monitored
* every time we go to make progress on asynchronous requests. If
* the component is already in the list, its reference count will
* be increased.
*
* For asynchronous progress, ompi_progress() will call
* mca_io_base_progress(), which will go down the list of active
* io components and call their progress() function.
*
* Since components on this list are refcounted, they must be
* removed with mca_io_base_component_del() for each time that
* they are added with mca_io_base_component_add(). Once their
* refcount reaches 0, they are removed from the list and will not
* be called for asynchronous progress.
*
* This function is protected by a mutex; it is safe to call this
* function from multiple threads simultaneously.
*/
OMPI_DECLSPEC int mca_io_base_component_add(mca_io_base_components_t *comp);
/**
* Decrease the refcount of a component in the io framework's
* currently-in-use list.
*
* @param comp The component to be [potentially] removed from the
* currently-in-use list.
*
* Find a component in the currently-in-use list and decrease its
* refcount. If its refcount goes to 0, it will be removed from
* the list and will not be polled for asynchronous progress.
*
* This function is protected by a mutex; it is safe to call this
* function from multiple threads simultaneously.
*/
OMPI_DECLSPEC int mca_io_base_component_del(mca_io_base_components_t *comp);
/**
* Return all the cached requests on an MPI_File to the global IO
* request freelist.
*
* @param file MPI_File to flush the cache
*
*
*/
OMPI_DECLSPEC void mca_io_base_request_return(struct ompi_file_t *file);
/**
* Shut down the component list
*
* @returns OMPI_SUCCESS Always
*
* Destroys resources associated with the io framework's
* currently-in-use list.
*/
OMPI_DECLSPEC int mca_io_base_component_finalize(void);
/**
* Shut down the io MCA framework.
@ -185,7 +259,7 @@ OMPI_DECLSPEC int mca_io_base_delete(char *filename, struct ompi_info_t *info);
*
* It must be the last function invoked on the io MCA framework.
*/
OMPI_DECLSPEC int mca_io_base_close(void);
OMPI_DECLSPEC int mca_io_base_close(void);
/*
@ -225,6 +299,15 @@ OMPI_DECLSPEC extern bool mca_io_base_components_available_valid;
* process.
*/
OMPI_DECLSPEC extern ompi_list_t mca_io_base_components_available;
/**
* Indicator as to whether the freelist of IO requests is valid or
* not.
*/
OMPI_DECLSPEC extern bool mca_io_base_requests_valid;
/**
* Free list of IO requests
*/
OMPI_DECLSPEC extern ompi_free_list_t mca_io_base_requests;
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -25,6 +25,13 @@
int mca_io_base_close(void)
{
/* Destroy the freelist */
if (mca_io_base_requests_valid) {
OBJ_DESTRUCT(&mca_io_base_requests);
mca_io_base_requests_valid = false;
}
/* Close all components that are still open. This may be the opened
list (if we're in ompi_info), or it may be the available list (if
we're anywhere else). */
@ -41,6 +48,10 @@ int mca_io_base_close(void)
mca_io_base_components_available_valid = false;
}
/* Destroy some io framework resources */
mca_io_base_component_finalize();
/* All done */
return OMPI_SUCCESS;

203
src/mca/io/base/io_base_component_list.c Обычный файл
Просмотреть файл

@ -0,0 +1,203 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "class/ompi_list.h"
#include "threads/mutex.h"
#include "mca/base/base.h"
#include "mca/io/io.h"
#include "mca/io/base/base.h"
/*
* Private variables
*/
/* List of component_item_t entries, one per io component currently in
   use; walked by mca_io_base_progress(). */
static ompi_list_t components_in_use;
/* Lock taken by the add / del / progress functions in this file
   around their accesses to components_in_use. */
static ompi_mutex_t mutex;
/* One entry on the components_in_use list. */
struct component_item_t {
/* List linkage (base class) */
ompi_list_item_t super;
/* Number of outstanding mca_io_base_component_add() calls for this
   component; maintained separately from the object reference count
   held in super. */
int refcount;
/* Which member of the "component" union is valid */
mca_io_base_version_t version;
/* Copy of the component struct (a union over io versions) */
mca_io_base_components_t component;
};
typedef struct component_item_t component_item_t;
/* No constructor or destructor: the fields are filled in by
   mca_io_base_component_add(). */
static OBJ_CLASS_INSTANCE(component_item_t, ompi_list_item_t, NULL, NULL);
/*
* Initialize this interface
*/
int mca_io_base_component_init(void)
{
OBJ_CONSTRUCT(&components_in_use, ompi_list_t);
return OMPI_SUCCESS;
}
/*
 * Add a component to the io framework's currently-in-use list, or
 * increase its refcount if it's already in the list.
 *
 * comp: the component (a union over io versions) to register.
 *
 * Returns OMPI_SUCCESS on success, or OMPI_ERR_OUT_OF_RESOURCE if a
 * new list entry could not be allocated.
 */
int mca_io_base_component_add(mca_io_base_components_t *comp)
{
    ompi_list_item_t *item;
    component_item_t *citem;
    mca_base_component_t *c;
    int ret = OMPI_SUCCESS;

    OMPI_THREAD_LOCK(&mutex);

    /* Save the component in the ref-counted list of components in use.
       This is used for the progression of non-blocking IO requests.
       If the component is already on the list, just bump up the
       refcount.  Otherwise, add it to the list with a refcount of
       1. */

    for (item = ompi_list_get_first(&components_in_use);
         item != ompi_list_get_end(&components_in_use);
         item = ompi_list_get_next(item)) {
        citem = (component_item_t *) item;

        /* Note the memory / pointer trickery here: we don't care what
           IO version this component is -- all the members of the
           mca_io_base_components union have a base of
           mca_base_component_t.  mca_base_component_compare() will do
           all the Right Things to ensure that the components are the
           same. */

        if (mca_base_component_compare(
                (const mca_base_component_t *) &(citem->component),
                (const mca_base_component_t *) comp) == 0) {
            ++citem->refcount;
            /* Keep the object reference count in step with refcount;
               mca_io_base_component_del() releases once per call. */
            OBJ_RETAIN(citem);
            break;
        }
    }

    /* If we didn't find it (the loop ran off the end), save it */

    if (ompi_list_get_end(&components_in_use) == item) {
        citem = OBJ_NEW(component_item_t);
        if (NULL == citem) {
            /* Allocation failed: report it instead of dereferencing
               NULL, and fall through so the lock is released. */
            ret = OMPI_ERR_OUT_OF_RESOURCE;
        } else {
            citem->refcount = 1;
            citem->component = *comp;

            /* Record which union member is valid, based on the
               version numbers stored in the base component struct */
            c = (mca_base_component_t *) (&citem->component);
            if (1 == c->mca_type_major_version &&
                0 == c->mca_type_minor_version &&
                0 == c->mca_type_release_version) {
                citem->version = MCA_IO_BASE_V_1_0_0;
            } else {
                citem->version = MCA_IO_BASE_V_NONE;
            }
            ompi_list_append(&components_in_use,
                             (ompi_list_item_t *) citem);
        }
    }

    OMPI_THREAD_UNLOCK(&mutex);

    /* All done */

    return ret;
}
/*
 * Find a component in the currently-in-use list and decrease its
 * refcount.  If the refcount goes to 0, remove it from the list.
 *
 * comp: the component (a union over io versions) to look up.
 *
 * Always returns OMPI_SUCCESS, including when the component is not on
 * the list.
 */
int mca_io_base_component_del(mca_io_base_components_t *comp)
{
    ompi_list_item_t *item;
    component_item_t *citem;

    OMPI_THREAD_LOCK(&mutex);

    /* Find the component in the list */

    for (item = ompi_list_get_first(&components_in_use);
         item != ompi_list_get_end(&components_in_use);
         item = ompi_list_get_next(item)) {
        citem = (component_item_t *) item;

        /* Note the memory / pointer trickery here: we don't care what
           IO version this component is -- all the members of the
           mca_io_base_components union have a base of
           mca_base_component_t. */

        if (mca_base_component_compare(
                (const mca_base_component_t *) &(citem->component),
                (const mca_base_component_t *) comp) == 0) {
            --citem->refcount;
            if (0 == citem->refcount) {
                ompi_list_remove_item(&components_in_use,
                                      (ompi_list_item_t *) citem);
            }
            /* Drops the reference taken by the matching add call */
            OBJ_RELEASE(citem);
            break;
        }
    }

    /* Release the lock taken above; without this every later add /
       del / progress call would block on the mutex. */
    OMPI_THREAD_UNLOCK(&mutex);

    /* All done */

    return OMPI_SUCCESS;
}
/*
 * Call the progress function of every io component on the in-use
 * list.
 *
 * Returns the sum of the non-negative values returned by the
 * components' io_progress() functions (presumably a count of
 * completed events -- confirm against the component contract), or the
 * first negative value returned by a component.  Entries whose
 * version is not MCA_IO_BASE_V_1_0_0 are treated as an error (-1).
 */
int mca_io_base_progress(void)
{
    int ret, count = 0;
    ompi_list_item_t *item;
    component_item_t *citem;

    OMPI_THREAD_LOCK(&mutex);

    /* Go through all the components and call their progress
       function */

    for (item = ompi_list_get_first(&components_in_use);
         item != ompi_list_get_end(&components_in_use);
         item = ompi_list_get_next(item)) {
        citem = (component_item_t *) item;

        switch (citem->version) {
        case MCA_IO_BASE_V_1_0_0:
            ret = citem->component.v1_0_0.io_progress();
            break;

        default:
            ret = -1;
            break;
        }

        if (ret < 0) {
            /* Error: stop here, but never leave with the lock held */
            OMPI_THREAD_UNLOCK(&mutex);
            return ret;
        }

        /* A return of 0 just means this component had nothing to
           report; keep going so the remaining components still get
           polled and the running total is preserved. */
        count += ret;
    }

    OMPI_THREAD_UNLOCK(&mutex);

    return count;
}
/*
* Finalize this interface: destruct the list of components in use.
*
* Always returns OMPI_SUCCESS.
*/
int mca_io_base_component_finalize(void)
{
OBJ_DESTRUCT(&components_in_use);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -28,6 +28,7 @@
#include "mca/base/base.h"
#include "mca/io/io.h"
#include "mca/io/base/base.h"
#include "mca/io/base/io_base_request.h"
/*
@ -199,6 +200,7 @@ int mca_io_base_file_select(ompi_file_t *file,
/* Save the pointers of the selected module on the ompi_file_t */
file->f_io_version = selected.ai_version;
file->f_io_selected_component = selected.ai_component;
file->f_io_selected_module = selected.ai_module;
file->f_io_selected_data = selected.ai_module_data;
@ -207,7 +209,12 @@ int mca_io_base_file_select(ompi_file_t *file,
if (OMPI_SUCCESS != (err = module_init(file))) {
return err;
}
/* Add the component to the list of components that the io
framework is maintaining */
mca_io_base_component_add(&selected.ai_component);
/* Announce the winner */
ompi_output_verbose(10, mca_io_base_output,
@ -219,11 +226,11 @@ int mca_io_base_file_select(ompi_file_t *file,
/*
* For each module in the list, if it is in the list of names (or the
* list of names is NULL), then check and see if it wants to run, and
* do the resulting priority comparison. Make a list of modules to be
* only those who returned that they want to run, and put them in
* priority order.
* For each component in the list, if it is in the list of names (or
* the list of names is NULL), then check and see if it wants to run,
* and do the resulting priority comparison. Make a list of
* (component, module) tuples (of type avail_io_t) to be only those
* who returned that they want to run, and put them in priority order.
*/
static ompi_list_t *check_components(ompi_list_t *components,
ompi_file_t *file,

Просмотреть файл

@ -25,6 +25,7 @@
#include "mca/base/base.h"
#include "mca/io/io.h"
#include "mca/io/base/base.h"
#include "mca/io/base/io_base_request.h"
/*
@ -32,7 +33,6 @@
*/
bool mca_io_base_components_available_valid = false;
ompi_list_t mca_io_base_components_available;
const mca_io_base_component_1_0_0_t *mca_io_base_basic_component = NULL;
/*
@ -59,6 +59,7 @@ static int init_query_1_0_0(const mca_base_component_t *ls,
int mca_io_base_find_available(bool *allow_multi_user_threads,
bool *have_hidden_threads)
{
int err;
mca_base_component_priority_list_item_t *entry;
ompi_list_item_t *p;
const mca_base_component_t *component;
@ -113,6 +114,12 @@ int mca_io_base_find_available(bool *allow_multi_user_threads,
OBJ_DESTRUCT(&mca_io_base_components_opened);
mca_io_base_components_opened_valid = false;
/* Setup the freelist */
if (OMPI_SUCCESS != (err = mca_io_base_request_create_freelist())) {
return err;
}
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -16,6 +16,7 @@
#include "ompi_config.h"
#include <stdio.h>
#include "class/ompi_free_list.h"
#include "util/output.h"
#include "mca/mca.h"
#include "mca/base/base.h"
@ -56,6 +57,20 @@ int mca_io_base_open(void)
mca_io_base_output = ompi_output_open(NULL);
/* Create some parameters */
if (0 >
mca_base_param_register_int("io", "base", "freelist_initial_size",
"", 16) ||
0 >
mca_base_param_register_int("io", "base", "freelist_max_size",
"", 64) ||
0 >
mca_base_param_register_int("io", "base", "freelist_increment",
"", 16)) {
return OMPI_ERROR;
}
/* Open up all available components */
if (OMPI_SUCCESS !=
@ -69,7 +84,11 @@ int mca_io_base_open(void)
/* Find the index of the MCA "io" param for selection */
mca_io_base_param = mca_base_param_find("io", "base", NULL);
/* Initialize some io framework resources */
mca_io_base_component_init();
/* All done */
return OMPI_SUCCESS;

Просмотреть файл

@ -16,9 +16,19 @@
#include "class/ompi_object.h"
#include "file/file.h"
#include "mca/base/base.h"
#include "mca/base/mca_base_param.h"
#include "mca/io/base/base.h"
#include "mca/io/base/io_base_request.h"
/*
* Public variables
*/
bool mca_io_base_requests_valid = false;
ompi_free_list_t mca_io_base_requests;
/*
* Private functions
*/
@ -33,7 +43,61 @@ OBJ_CLASS_INSTANCE(mca_io_base_request_t,
static void io_base_request_constructor(mca_io_base_request_t *req)
{
req->req_ompi.req_type = OMPI_REQUEST_IO;
req->super.req_type = OMPI_REQUEST_IO;
}
/*
* Setup the freelist of IO requests. This does not need to be
* protected with a lock because it's called during MPI_INIT.
*/
int mca_io_base_request_create_freelist(void)
{
ompi_list_item_t *p;
const mca_base_component_t *component;
const mca_io_base_component_1_0_0_t *v100;
size_t size = 0;
int i, init, incr;
/* Find the maximum additional number of bytes required by all io
components for requests and make that the request size */
for (p = ompi_list_get_first(&mca_io_base_components_available);
p != ompi_list_get_end(&mca_io_base_components_available);
p = ompi_list_get_next(p)) {
component = ((mca_base_component_priority_list_item_t *)
p)->super.cli_component;
/* Only know how to handle v1.0.0 components for now */
if (component->mca_type_major_version == 1 &&
component->mca_type_minor_version == 0 &&
component->mca_type_release_version == 0) {
v100 = (mca_io_base_component_1_0_0_t *) component;
if (v100->io_request_bytes > size) {
size = v100->io_request_bytes;
}
}
}
/* Construct and initialized the freelist of IO requests. */
OBJ_CONSTRUCT(&mca_io_base_requests, ompi_free_list_t);
mca_io_base_requests_valid = true;
i = mca_base_param_find("io", "base", "freelist_initial_size");
mca_base_param_lookup_int(i, &init);
i = mca_base_param_find("io", "base", "freelist_increment");
mca_base_param_lookup_int(i, &incr);
ompi_free_list_init(&mca_io_base_requests,
sizeof(mca_io_base_request_t) + size,
OBJ_CLASS(mca_io_base_request_t),
init, -1, incr,
NULL);
/* All done */
return OMPI_SUCCESS;
}
@ -44,41 +108,79 @@ int mca_io_base_request_alloc(ompi_file_t *file,
mca_io_base_request_t **req)
{
int err;
size_t extra;
mca_io_base_module_request_init_fn_t func;
mca_io_base_module_request_once_init_fn_t func;
ompi_list_item_t *item;
/* JMS For the moment, no freelisting */
/* See if we've got a request on the module's freelist (which is
cached on the file, since there's only one module per
MPI_File). Use a quick-but-not-entirely-accurate (but good
enough) check as a slight optimization to potentially avoid
having to lock and unlock. */
switch (file->f_io_version) {
case MCA_IO_BASE_V_1_0_0:
extra = file->f_io_selected_module.v1_0_0.io_module_cache_bytes;
func = file->f_io_selected_module.v1_0_0.io_module_request_init;
break;
default:
extra = 0;
func = NULL;
break;
if (ompi_list_get_size(&file->f_io_requests) > 0) {
OMPI_THREAD_LOCK(&file->f_io_requests_lock);
*req = (mca_io_base_request_t*)
ompi_list_remove_first(&file->f_io_requests);
OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
} else {
*req = NULL;
}
/* Nope, we didn't have one on the file freelist, so let's get one
off the global freelist */
/* Malloc out enough space */
*req = malloc(sizeof(mca_io_base_request_t) + extra);
if (NULL == *req) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
OMPI_FREE_LIST_GET(&mca_io_base_requests, item, err);
*req = (mca_io_base_request_t*) item;
/* Construct the object and call the module's request init
function, if it exists */
/* Call the per-use init function, if it exists */
OBJ_CONSTRUCT(req, mca_io_base_request_t);
if (NULL != func) {
if (OMPI_SUCCESS != (err = func(&file->f_io_selected_module, *req))) {
OBJ_RELEASE(*req);
return err;
switch (file->f_io_version) {
case MCA_IO_BASE_V_1_0_0:
/* These can be set once for this request since this
request will always be used with the same module (and
therefore, the same MPI_File). Note that
(*req)->super.req_type is already set by the
constructor. */
(*req)->req_file = file;
(*req)->req_ver = file->f_io_version;
(*req)->super.req_fini =
file->f_io_selected_module.v1_0_0.io_module_request_fini;
(*req)->super.req_free =
file->f_io_selected_module.v1_0_0.io_module_request_free;
(*req)->super.req_cancel =
file->f_io_selected_module.v1_0_0.io_module_request_cancel;
/* Call the module's once-per process init, if it
exists */
func =
file->f_io_selected_module.v1_0_0.io_module_request_once_init;
if (NULL != func) {
if (OMPI_SUCCESS !=
(err = func(&file->f_io_selected_module, *req))) {
OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item);
return err;
}
}
break;
default:
OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item);
return OMPI_ERR_NOT_IMPLEMENTED;
break;
}
}
/* Initialize the request */
OMPI_REQUEST_INIT(&((*req)->super));
/* All done */
return OMPI_SUCCESS;
}
@ -87,25 +189,30 @@ int mca_io_base_request_alloc(ompi_file_t *file,
* Free a module-specific IO MPI_Request
*/
void mca_io_base_request_free(ompi_file_t *file,
mca_io_base_request_t **req)
mca_io_base_request_t *req)
{
mca_io_base_module_request_finalize_fn_t func;
/* Put the request back on the per-module freelist, since it's
been initialized for that module */
/* JMS For the moment, no freelisting */
switch (file->f_io_version) {
case MCA_IO_BASE_V_1_0_0:
func = file->f_io_selected_module.v1_0_0.io_module_request_finalize;
if (NULL != func) {
func(&file->f_io_selected_module, *req);
}
break;
default:
break;
}
OBJ_RELEASE(*req);
*req = (mca_io_base_request_t*) MPI_REQUEST_NULL;
OMPI_THREAD_LOCK(&file->f_io_requests_lock);
ompi_list_prepend(&file->f_io_requests, (ompi_list_item_t*) req);
OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
}
/*
 * Return all the requests in the per-file freelist to the global list.
 *
 * file: the file whose cached requests (file->f_io_requests) are
 * handed back to mca_io_base_requests.  The per-file list is empty
 * when this function returns.
 */
void mca_io_base_request_return(ompi_file_t *file)
{
    ompi_list_item_t *p;

    OMPI_THREAD_LOCK(&file->f_io_requests_lock);

    /* Unlink each request from the per-file list before giving it to
       the global freelist, so that an item is never on two lists at
       once and the per-file list does not keep referring to items it
       no longer owns. */
    while (ompi_list_get_size(&file->f_io_requests) > 0) {
        p = ompi_list_remove_first(&file->f_io_requests);
        OMPI_FREE_LIST_RETURN(&mca_io_base_requests, p);
    }

    OMPI_THREAD_UNLOCK(&file->f_io_requests_lock);
}

Просмотреть файл

@ -29,24 +29,22 @@
*/
struct mca_io_base_request_t {
/** Base request */
ompi_request_t req_ompi;
/** io component version number of the module that owns this
request */
mca_io_base_version_t req_ver;
ompi_request_t super;
/** ompi_file_t of the file that owns this request */
ompi_file_t *req_file;
/* JMS ...nothing more needed for io v1.x, but will likely need
more for io v2.x -- probably need to keep other things,
analogout to the pml_base_request_t */
/** io component version number of the module that owns this
request (i.e., this defines what follows this entry in
memory) */
mca_io_base_version_t req_ver;
};
/**
* Convenience typedef
*/
typedef struct mca_io_base_request_t mca_io_base_request_t;
/**
* Declare the class
*/
@ -56,13 +54,24 @@ OBJ_CLASS_DECLARATION(mca_io_base_request_t);
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Setup freelist of IO requests
*
* @returns OMPI_SUCCESS upon success
*
* Initialize the IO freelist of requests, making each request be
* the size of the base IO request plus the maximum number of
* bytes from all the available IO components.
*/
int mca_io_base_request_create_freelist(void);
/**
* Get a new IO request
*
* @param fh The file handle
* @param req Pointer to an IO base request
*
* @returns MPI_SUCCESS on success
* @returns OMPI_SUCCESS on success
* @returns err otherwise
*
* This function allocates an MPI_Request (ompi_request_t)
@ -100,7 +109,7 @@ extern "C" {
* For optimization reasons, \em no error checking is performed.
*/
void mca_io_base_request_free(ompi_file_t *file,
mca_io_base_request_t **req);
mca_io_base_request_t *req);
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -16,25 +16,37 @@
#define MCA_IO_H
#include "mca/mca.h"
#include "request/request.h"
#include "datatype/datatype.h"
/*
* Forward declaration for private data on io components and modules.
*/
struct ompi_file_t;
struct mca_io_base_file_t;
struct mca_io_base_delete_t;
/*
* Forward declaration so that we don't create an include file loop.
*/
struct mca_io_base_request_t;
/* * Forward declarations of things declared in this file
/*
* Forward declarations of things declared in this file
*/
struct mca_io_base_module_1_0_0_t;
union mca_io_base_modules_t;
/**
* External progress function; invoked from ompi_progress()
*/
int mca_io_base_progress(void);
/**
* Version of IO component interface that we're using.
*
@ -90,11 +102,18 @@ typedef int (*mca_io_base_component_file_delete_unselect_fn_t)
(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data);
typedef int (*mca_io_base_component_progress_fn_t)(void);
/* IO component version and interface functions. */
struct mca_io_base_component_1_0_0_t {
mca_base_component_t io_version;
mca_base_component_data_1_0_0_t io_data;
/** Additional bytes that this module needs to be allocated when
an MPI_Request (ompi_request_t) is allocated. */
size_t io_request_bytes;
mca_io_base_component_init_query_fn_t io_init_query;
mca_io_base_component_file_query_1_0_0_fn_t io_file_query;
mca_io_base_component_file_unquery_fn_t io_file_unquery;
@ -102,6 +121,8 @@ struct mca_io_base_component_1_0_0_t {
mca_io_base_component_file_delete_query_fn_t io_delete_query;
mca_io_base_component_file_delete_unselect_fn_t io_delete_unquery;
mca_io_base_component_file_delete_select_fn_t io_delete_select;
mca_io_base_component_progress_fn_t io_progress;
};
typedef struct mca_io_base_component_1_0_0_t mca_io_base_component_1_0_0_t;
@ -128,11 +149,12 @@ typedef union mca_io_base_components_t mca_io_base_components_t;
* To reduce latency (number of required allocations), the io base
* allocates additional space along with each request that may be used
* by the io module for additional control information. If the io
* module intends to use this space, the io_module_cache_bytes
* attributes should be set to reflect the number of bytes needed by
* the io module on a per-request basis. This space is allocated
* contiguously along with the mca_io_base_request_t with the space
* immediately following the base request available to the io module.
* module intends to use this space, the io_request_bytes attribute
* should be set to reflect the number of bytes above
* sizeof(mca_io_base_request_t) that are needed by the io component. This
* space is allocated contiguously along with the
* mca_io_base_request_t with the space immediately following the base
* request available to the io module.
*
* This init function is called the first time the request is created
* by the io base. On completion of the request, the io base will
@ -140,7 +162,7 @@ typedef union mca_io_base_components_t mca_io_base_components_t;
* request is re-used from the cache, this init function is NOT called
* again.
*/
typedef int (*mca_io_base_module_request_init_fn_t)(
typedef int (*mca_io_base_module_request_once_init_fn_t)(
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
@ -152,14 +174,18 @@ typedef int (*mca_io_base_module_request_init_fn_t)(
* @param module (IN) io module
* @param request (IN) Pointer to allocated request.
*
* The fini function is called when the io base removes a request from
* the io module's cache (due to resource constraints) or the cache
* limit has been reached, prior to re-using the request for another
* io module. This provides the io module the chance to
* cleanup/release any resources cached on the request by the io
* module.
* This function is called when the io base removes a request from the
* io module's cache (due to resource constraints) or the cache limit
* has been reached, prior to re-using the request for another io
* module. This provides the io module the chance to cleanup/release
* any resources cached on the request by the io module.
*/
typedef int (*mca_io_base_module_request_finalize_fn_t)(
typedef int (*mca_io_base_module_request_once_finalize_fn_t)(
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
typedef int (*mca_io_base_module_request_fini_fn_t)(
struct ompi_file_t *file,
union mca_io_base_modules_t* module_union,
struct mca_io_base_request_t* request);
@ -307,26 +333,27 @@ typedef int (*mca_io_base_module_file_get_atomicity_fn_t)
(struct ompi_file_t *fh, int *flag);
typedef int (*mca_io_base_module_file_sync_fn_t)(struct ompi_file_t *fh);
typedef int (*mca_io_base_module_test_fn_t)
(struct mca_io_base_request_t *request, int *flag,
struct ompi_status_public_t *status);
typedef int (*mca_io_base_module_wait_fn_t)
(struct mca_io_base_request_t *request,
struct ompi_status_public_t *status);
struct mca_io_base_module_1_0_0_t {
/* Additional data */
/** Once-per-process request initialization function */
/** Maximum number of requests to hold in the cache */
size_t io_module_cache_size;
mca_io_base_module_request_once_init_fn_t io_module_request_once_init;
/** Additional bytes that this module needs to be allocated when
an MPI_Request (ompi_request_t) is allocated. */
size_t io_module_cache_bytes;
/** Once-per-process request finalization function */
mca_io_base_module_request_init_fn_t io_module_request_init;
mca_io_base_module_request_finalize_fn_t io_module_request_finalize;
mca_io_base_module_request_once_finalize_fn_t io_module_request_once_finalize;
/** Finalize a request (per usage) */
ompi_request_free_fn_t io_module_request_fini;
/** Free a request (per usage) */
ompi_request_free_fn_t io_module_request_free;
/** Cancel a request (per usage) */
ompi_request_cancel_fn_t io_module_request_cancel;
/* Back-ends to MPI API calls (pretty much a 1-to-1 mapping) */
@ -390,9 +417,6 @@ struct mca_io_base_module_1_0_0_t {
mca_io_base_module_file_set_atomicity_fn_t io_module_file_set_atomicity;
mca_io_base_module_file_get_atomicity_fn_t io_module_file_get_atomicity;
mca_io_base_module_file_sync_fn_t io_module_file_sync;
mca_io_base_module_test_fn_t io_module_file_fn_test;
mca_io_base_module_wait_fn_t io_module_file_wait;
};
typedef struct mca_io_base_module_1_0_0_t mca_io_base_module_1_0_0_t;

Просмотреть файл

@ -29,7 +29,7 @@ libmca_io_romio_la_SOURCES = \
io_romio_component.c \
io_romio_file_open.c \
io_romio_file_read.c \
io_romio_file_request.c \
io_romio_file_write.c \
io_romio_module.c
io_romio_module.c \
io_romio_request.c

Просмотреть файл

@ -32,6 +32,7 @@ extern "C" {
*/
extern ompi_mutex_t mca_io_romio_mutex;
extern mca_io_base_module_1_0_0_t mca_io_romio_module;
extern ompi_list_t mca_io_romio_pending_requests;
/*
@ -56,230 +57,237 @@ typedef struct mca_io_romio_data_t mca_io_romio_data_t;
/*
* Module functions
*
*/
int mca_io_romio_progress(void);
int mca_io_romio_request_fini(ompi_request_t **req);
int mca_io_romio_request_free(ompi_request_t **req);
int mca_io_romio_request_cancel(ompi_request_t *req, int flag);
/**
 * Append a request to the component-wide list of pending requests
 * (mca_io_romio_pending_requests) so that it will be progressed.
 *
 * This macro must ONLY be invoked while the ROMIO mutex is already
 * held; it performs no locking of its own.
 *
 * The expansion is a single expression without a trailing semicolon,
 * so the macro can be used like a function call in any statement
 * context (e.g. as the unbraced body of an if/else).  The argument is
 * parenthesized so that the cast applies to the whole expression.
 *
 * @param request Pointer to the request to append; it is cast to
 *                (ompi_list_item_t *).
 */
#define MCA_IO_ROMIO_REQUEST_ADD(request) \
    ompi_list_append(&mca_io_romio_pending_requests, \
                     (ompi_list_item_t *) (request))
/*
* mca->ROMIO module routines:
* ROMIO_PREFIX(file_XXX)
* ROMIO operations names:
* ROMIO_PREFIX(MPI_File_XXX)
*/
/* Section 9.2 */
int mca_io_romio_file_open (struct ompi_communicator_t *comm,
char *filename,
int amode,
struct ompi_info_t *info,
ompi_file_t *fh);
int mca_io_romio_file_close (struct ompi_file_t *fh);
int mca_io_romio_file_delete (char *filename,
struct ompi_info_t *info);
int mca_io_romio_file_set_size (struct ompi_file_t *fh,
MPI_Offset size);
int mca_io_romio_file_preallocate (struct ompi_file_t *fh,
MPI_Offset size);
int mca_io_romio_file_get_size (struct ompi_file_t *fh,
MPI_Offset * size);
int mca_io_romio_file_get_amode (struct ompi_file_t *fh,
int *amode);
int mca_io_romio_file_set_info (struct ompi_file_t *fh,
struct ompi_info_t *info);
int mca_io_romio_file_get_info (struct ompi_file_t *fh,
struct ompi_info_t ** info_used);
int mca_io_romio_file_open (struct ompi_communicator_t *comm,
char *filename,
int amode,
struct ompi_info_t *info,
ompi_file_t *fh);
int mca_io_romio_file_close (struct ompi_file_t *fh);
int mca_io_romio_file_delete (char *filename,
struct ompi_info_t *info);
int mca_io_romio_file_set_size (struct ompi_file_t *fh,
MPI_Offset size);
int mca_io_romio_file_preallocate (struct ompi_file_t *fh,
MPI_Offset size);
int mca_io_romio_file_get_size (struct ompi_file_t *fh,
MPI_Offset * size);
int mca_io_romio_file_get_amode (struct ompi_file_t *fh,
int *amode);
int mca_io_romio_file_set_info (struct ompi_file_t *fh,
struct ompi_info_t *info);
int mca_io_romio_file_get_info (struct ompi_file_t *fh,
struct ompi_info_t ** info_used);
/* Section 9.3 */
int mca_io_romio_file_set_view (struct ompi_file_t *fh,
MPI_Offset disp,
struct ompi_datatype_t *etype,
struct ompi_datatype_t *filetype,
char *datarep,
struct ompi_info_t *info);
int mca_io_romio_file_get_view (struct ompi_file_t *fh,
MPI_Offset * disp,
struct ompi_datatype_t ** etype,
struct ompi_datatype_t ** filetype,
char *datarep);
int mca_io_romio_file_set_view (struct ompi_file_t *fh,
MPI_Offset disp,
struct ompi_datatype_t *etype,
struct ompi_datatype_t *filetype,
char *datarep,
struct ompi_info_t *info);
int mca_io_romio_file_get_view (struct ompi_file_t *fh,
MPI_Offset * disp,
struct ompi_datatype_t ** etype,
struct ompi_datatype_t ** filetype,
char *datarep);
/* Section 9.4.2 */
int mca_io_romio_file_read_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_read_at_all (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_at_all (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_iread_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
/* Section 9.4.3 */
int mca_io_romio_file_read (struct ompi_file_t *fh,
int mca_io_romio_file_read_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_read_at_all (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_at_all (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_read_all (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_all (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_iread (struct ompi_file_t *fh,
int mca_io_romio_file_iread_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite_at (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
/* Section 9.4.3 */
int mca_io_romio_file_read (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_read_all (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_all (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_iread (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_seek (struct ompi_file_t *fh,
MPI_Offset offset,
int whence);
int mca_io_romio_file_get_position (struct ompi_file_t *fh,
MPI_Offset * offset);
int mca_io_romio_file_get_byte_offset (struct ompi_file_t *fh,
MPI_Offset offset,
MPI_Offset * disp);
/* Section 9.4.4 */
int mca_io_romio_file_read_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_iread_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_seek (struct ompi_file_t *fh,
MPI_Offset offset,
int whence);
int mca_io_romio_file_get_position (struct ompi_file_t *fh,
MPI_Offset * offset);
int mca_io_romio_file_get_byte_offset (struct ompi_file_t *fh,
MPI_Offset offset,
MPI_Offset * disp);
/* Section 9.4.4 */
int mca_io_romio_file_read_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_iread_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_iwrite_shared (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
mca_io_base_request_t * request);
int mca_io_romio_file_read_ordered (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_ordered (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_seek_shared (struct ompi_file_t *fh,
MPI_Offset offset,
int whence);
int mca_io_romio_file_get_position_shared (struct ompi_file_t *fh,
MPI_Offset * offset);
int mca_io_romio_file_read_ordered (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_write_ordered (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
int mca_io_romio_file_seek_shared (struct ompi_file_t *fh,
MPI_Offset offset,
int whence);
int mca_io_romio_file_get_position_shared (struct ompi_file_t *fh,
MPI_Offset * offset);
/* Section 9.4.5 */
int mca_io_romio_file_read_at_all_begin (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_at_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_at_all_begin (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_at_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_read_all_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_all_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_read_ordered_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_ordered_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_ordered_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_ordered_end (struct ompi_file_t *fh,
void *buf,
struct ompi_status_public_t * status);
int mca_io_romio_file_read_at_all_begin (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_at_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_at_all_begin (struct ompi_file_t *fh,
MPI_Offset offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_at_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_read_all_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_all_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_all_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_read_ordered_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_read_ordered_end (struct ompi_file_t *fh,
void *buf,
ompi_status_public_t * status);
int mca_io_romio_file_write_ordered_begin (struct ompi_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype);
int mca_io_romio_file_write_ordered_end (struct ompi_file_t *fh,
void *buf,
struct ompi_status_public_t * status);
/* Section 9.5.1 */
int mca_io_romio_file_get_type_extent (struct ompi_file_t *fh,
struct ompi_datatype_t *datatype,
MPI_Aint * extent);
int mca_io_romio_file_get_type_extent (struct ompi_file_t *fh,
struct ompi_datatype_t *datatype,
MPI_Aint * extent);
/* Section 9.6.1 */
int mca_io_romio_file_set_atomicity (struct ompi_file_t *fh,
int flag);
int mca_io_romio_file_get_atomicity (struct ompi_file_t *fh,
int *flag);
int mca_io_romio_file_sync (struct ompi_file_t *fh);
int mca_io_romio_file_set_atomicity (struct ompi_file_t *fh,
int flag);
int mca_io_romio_file_get_atomicity (struct ompi_file_t *fh,
int *flag);
int mca_io_romio_file_sync (struct ompi_file_t *fh);
/* End Prototypes */
/* These functions will not be called by users, but by OMPI's
* MPI_Test/Wait functions when they are called with an I/O request.
*/
int mca_io_romio_test (mca_io_base_request_t * request,
int *flag,
ompi_status_public_t * status);
int mca_io_romio_wait (mca_io_base_request_t * request,
ompi_status_public_t * status);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -14,6 +14,8 @@
#include "ompi_config.h"
#include "mpi.h"
#include "class/ompi_list.h"
#include "mca/base/base.h"
#include "mca/io/io.h"
#include "io_romio.h"
#include "io-romio-version.h"
@ -22,6 +24,8 @@
/*
* Private functions
*/
static int open_component(void);
static int close_component(void);
static int init_query(bool *enable_multi_user_threads,
bool *have_hidden_threads);
static const struct mca_io_base_module_1_0_0_t *
@ -36,6 +40,25 @@ static int delete_query(char *filename, struct ompi_info_t *info,
bool *usable, int *priorty);
static int delete_select(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data);
static int progress(void);
/*
* Private variables
*/
static int priority_param = -1;
static int delete_priority_param = -1;
/*
* Global, component-wide ROMIO mutex because ROMIO is not thread safe
*/
ompi_mutex_t mca_io_romio_mutex;
/*
* Global list of requests for this component
*/
ompi_list_t mca_io_romio_pending_requests;
/*
@ -57,8 +80,8 @@ mca_io_base_component_1_0_0_t mca_io_romio_component = {
MCA_io_romio_MAJOR_VERSION,
MCA_io_romio_MINOR_VERSION,
MCA_io_romio_RELEASE_VERSION,
NULL,
NULL
open_component,
close_component,
},
/* Next the MCA v1.0.0 component meta data */
@ -68,6 +91,11 @@ mca_io_base_component_1_0_0_t mca_io_romio_component = {
false
},
/* Additional number of bytes required for this component's
requests */
sizeof(mca_io_romio_request_t) - sizeof(mca_io_base_request_t),
/* Initial configuration / Open a new file */
init_query,
@ -78,10 +106,44 @@ mca_io_base_component_1_0_0_t mca_io_romio_component = {
delete_query,
NULL,
delete_select
delete_select,
/* Progression of non-blocking requests */
progress
};
/*
 * Component open hook: registers the two MCA priority parameters and
 * constructs the list of pending non-blocking requests.
 *
 * Always returns OMPI_SUCCESS; the indices returned by the parameter
 * registration calls are stored but not checked here.
 */
static int open_component(void)
{
    /* Use a low default priority, but leave room for other components
       to be lower still */
    const int default_priority = 10;

    priority_param = mca_base_param_register_int("io", "romio",
                                                 "priority", NULL,
                                                 default_priority);
    delete_priority_param = mca_base_param_register_int("io", "romio",
                                                        "delete_priority",
                                                        NULL,
                                                        default_priority);

    /* Set up the (initially empty) list of pending requests */
    OBJ_CONSTRUCT(&mca_io_romio_pending_requests, ompi_list_t);

    return OMPI_SUCCESS;
}
/*
 * Component close hook: destructs the list of pending non-blocking
 * requests.  Always returns OMPI_SUCCESS.
 */
static int close_component(void)
{
    int rc = OMPI_SUCCESS;

    /* JMS: Good opportunity here to list out all the IO requests that
       were not destroyed / completed upon MPI_FINALIZE */

    /* Tear down the list of pending requests */
    OBJ_DESTRUCT(&mca_io_romio_pending_requests);

    return rc;
}
static int init_query(bool *allow_multi_user_threads,
bool *have_hidden_threads)
{
@ -99,6 +161,13 @@ file_query(struct ompi_file_t *file,
{
mca_io_romio_data_t *data;
/* Lookup our priority */
if (OMPI_SUCCESS != mca_base_param_lookup_int(priority_param,
priority)) {
return NULL;
}
/* Allocate a space for this module to hang private data (e.g.,
the ROMIO file handle) */
@ -109,10 +178,8 @@ file_query(struct ompi_file_t *file,
data->romio_fh = NULL;
*private_data = (struct mca_io_base_file_t*) data;
/* The priority level of 20 is pretty arbitrary, since this
component is likely to be the only one in io v1.x */
/* All done */
*priority = 20;
return &mca_io_romio_module;
}
@ -135,8 +202,14 @@ static int delete_query(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t **private_data,
bool *usable, int *priority)
{
/* Lookup our priority */
if (OMPI_SUCCESS != mca_base_param_lookup_int(delete_priority_param,
priority)) {
return OMPI_ERROR;
}
*usable = true;
*priority = 10;
*private_data = NULL;
return OMPI_SUCCESS;
@ -154,3 +227,40 @@ static int delete_select(char *filename, struct ompi_info_t *info,
return ret;
}
/*
 * Progress all pending non-blocking ROMIO requests.
 *
 * Walks mca_io_romio_pending_requests under the component-wide ROMIO
 * mutex and calls ROMIO's MPIO_Test on each entry.  Every request that
 * ROMIO reports as finished is completed, finalized, removed from the
 * pending list and handed back via mca_io_base_request_free().
 *
 * Returns the number of requests that completed during this call, or
 * the (negative) value returned by MPIO_Test if it reports an error;
 * in the error case the remaining requests are not examined.
 *
 * NOTE(review): only negative return values are treated as errors.  If
 * MPIO_Test reports failures as positive MPI error codes, such a
 * request stays on the pending list -- confirm against ROMIO.
 */
static int progress(void)
{
    ompi_list_item_t *item, *next;
    int ret, flag, count = 0;
    ROMIO_PREFIX(MPIO_Request) romio_rq;

    /* Troll through all pending requests and try to progress them.
       If a request finishes, remove it from the list. */

    OMPI_THREAD_LOCK (&mca_io_romio_mutex);
    for (item = ompi_list_get_first(&mca_io_romio_pending_requests);
         item != ompi_list_get_end(&mca_io_romio_pending_requests);
         item = next) {
        /* Fetch the successor first: the current item may be removed
           from the list (and released) below */
        next = ompi_list_get_next(item);

        romio_rq = ((mca_io_romio_request_t *) item)->romio_rq;

        /* Never read an indeterminate flag if the test fails without
           writing to it */
        flag = 0;
        ret = ROMIO_PREFIX(MPIO_Test)(&romio_rq, &flag,
                                      &(((ompi_request_t *) item)->req_status));
        if (ret < 0) {
            /* Do not leave the ROMIO mutex locked on the error path */
            OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
            return ret;
        } else if (1 == flag) {
            ++count;
            ompi_request_complete((ompi_request_t*) item);
            OMPI_REQUEST_FINI((ompi_request_t*) item);
            ompi_list_remove_item(&mca_io_romio_pending_requests, item);
            mca_io_base_request_free(((mca_io_base_request_t *) item)->req_file,
                                     (mca_io_base_request_t *) item);
        }
    }
    OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);

    /* Return how many requests completed */
    return count;
}

Просмотреть файл

@ -17,7 +17,6 @@
#include "communicator/communicator.h"
#include "info/info.h"
#include "file/file.h"
#include "request/request.h"
#include "io_romio.h"
@ -29,7 +28,7 @@ mca_io_romio_file_open (ompi_communicator_t *comm,
ompi_info_t *info,
ompi_file_t *fh)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -45,7 +44,7 @@ mca_io_romio_file_open (ompi_communicator_t *comm,
int
mca_io_romio_file_close (ompi_file_t *fh)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -62,7 +61,7 @@ int
mca_io_romio_file_set_size (ompi_file_t *fh,
MPI_Offset size)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -77,7 +76,7 @@ int
mca_io_romio_file_preallocate (ompi_file_t *fh,
MPI_Offset size)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -93,7 +92,7 @@ int
mca_io_romio_file_get_size (ompi_file_t *fh,
MPI_Offset * size)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -109,7 +108,7 @@ int
mca_io_romio_file_get_amode (ompi_file_t *fh,
int *amode)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -125,7 +124,7 @@ int
mca_io_romio_file_set_info (ompi_file_t *fh,
ompi_info_t *info)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -141,7 +140,7 @@ int
mca_io_romio_file_get_info (ompi_file_t *fh,
ompi_info_t ** info_used)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -161,7 +160,7 @@ mca_io_romio_file_set_view (ompi_file_t *fh,
char *datarep,
ompi_info_t *info)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -182,7 +181,7 @@ mca_io_romio_file_get_view (ompi_file_t *fh,
ompi_datatype_t ** filetype,
char *datarep)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -202,7 +201,7 @@ mca_io_romio_file_get_type_extent (ompi_file_t *fh,
ompi_datatype_t *datatype,
MPI_Aint * extent)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -219,7 +218,7 @@ int
mca_io_romio_file_set_atomicity (ompi_file_t *fh,
int flag)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -234,7 +233,7 @@ int
mca_io_romio_file_get_atomicity (ompi_file_t *fh,
int *flag)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -248,7 +247,7 @@ mca_io_romio_file_get_atomicity (ompi_file_t *fh,
int
mca_io_romio_file_sync (ompi_file_t *fh)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -265,7 +264,7 @@ mca_io_romio_file_seek_shared (ompi_file_t *fh,
MPI_Offset offset,
int whence)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -281,7 +280,7 @@ int
mca_io_romio_file_get_position_shared (ompi_file_t *fh,
MPI_Offset * offset)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -298,7 +297,7 @@ mca_io_romio_file_seek (ompi_file_t *fh,
MPI_Offset offset,
int whence)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -314,7 +313,7 @@ int
mca_io_romio_file_get_position (ompi_file_t *fh,
MPI_Offset * offset)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;
@ -331,7 +330,7 @@ mca_io_romio_file_get_byte_offset (ompi_file_t *fh,
MPI_Offset offset,
MPI_Offset * disp)
{
int ret;
int ret;
mca_io_romio_data_t *data;
data = (mca_io_romio_data_t *) fh->f_io_selected_data;

Просмотреть файл

@ -81,6 +81,9 @@ mca_io_romio_file_iread_at (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iread_at) (data->romio_fh, offset, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -146,6 +149,9 @@ mca_io_romio_file_iread (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iread) (data->romio_fh, buf, count, datatype,
&req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -190,6 +196,9 @@ mca_io_romio_file_iread_shared (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iread_shared) (data->romio_fh, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;

Просмотреть файл

@ -1,83 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "file/file.h"
#include "request/request.h"
#include "io_romio.h"
#include "mca/io/base/io_base_request.h"
/*
* JMS This function is pretty incomplete and depends on discussions
* with Tim about asynchronous progress. Keep watching this space...
*/
/*
 * Back end for testing an I/O request.
 *
 * Not implemented: the arguments are ignored and
 * OMPI_ERR_NOT_IMPLEMENTED is returned unconditionally.  (An earlier,
 * compiled-out draft forwarded to ROMIO's MPIO_Test while holding the
 * ROMIO mutex.)
 */
int
mca_io_romio_test (mca_io_base_request_t * request,
                   int *flag,
                   ompi_status_public_t * status)
{
    (void) request;
    (void) flag;
    (void) status;

    return OMPI_ERR_NOT_IMPLEMENTED;
}
/*
* JMS This function is pretty incomplete and depends on discussions
* with Tim about asynchronous progress. Keep watching this space...
*/
/*
 * Back end for waiting on an I/O request.
 *
 * Not implemented: the arguments are ignored and
 * OMPI_ERR_NOT_IMPLEMENTED is returned unconditionally.  (An earlier,
 * compiled-out draft forwarded to ROMIO's MPIO_Wait while holding the
 * ROMIO mutex.)
 */
int
mca_io_romio_wait (mca_io_base_request_t * request,
                   ompi_status_public_t * status)
{
    (void) request;
    (void) status;

    return OMPI_ERR_NOT_IMPLEMENTED;
}

Просмотреть файл

@ -83,6 +83,9 @@ mca_io_romio_file_iwrite_at (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iwrite_at) (data->romio_fh, offset, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
@ -149,7 +152,11 @@ mca_io_romio_file_iwrite (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iwrite) (data->romio_fh, buf, count, datatype,
&req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;
}
@ -191,6 +198,9 @@ mca_io_romio_file_iwrite_shared (ompi_file_t *fh,
ret =
ROMIO_PREFIX(MPI_File_iwrite_shared) (data->romio_fh, buf, count,
datatype, &req->romio_rq);
if (MPI_SUCCESS == ret) {
MCA_IO_ROMIO_REQUEST_ADD(request);
}
OMPI_THREAD_UNLOCK (&mca_io_romio_mutex);
return ret;

Просмотреть файл

@ -19,12 +19,6 @@
#include "io_romio.h"
/*
* Global ROMIO mutex because ROMIO is not thread safe
*/
ompi_mutex_t mca_io_romio_mutex;
/*
* Global function that does not need to be prototyped in a header
* because ROMIO just expects this function to exist
@ -33,31 +27,21 @@ int MPIR_Status_set_bytes(ompi_status_public_t *status,
ompi_datatype_t *datatype, int size);
/*
* Private functions
*/
static int io_romio_request_init(mca_io_base_modules_t *module_union,
mca_io_base_request_t *request);
/*
* The ROMIO module operations
*/
mca_io_base_module_1_0_0_t mca_io_romio_module = {
/* Max number of requests in the request cache */
/* Once-per-process request init / finalize functions */
32,
/* Additional number of bytes required for this component's
requests */
sizeof(mca_io_romio_request_t) - sizeof(mca_io_base_request_t),
/* Request init / finalize functions */
io_romio_request_init,
NULL,
NULL,
/* Finalize, free, cancel */
mca_io_romio_request_fini,
mca_io_romio_request_free,
mca_io_romio_request_cancel,
/* Back end to MPI API calls (pretty much a 1-to-1 mapping) */
@ -121,11 +105,7 @@ mca_io_base_module_1_0_0_t mca_io_romio_module = {
/* Sync/atomic IO operations */
mca_io_romio_file_set_atomicity,
mca_io_romio_file_get_atomicity,
mca_io_romio_file_sync,
/* The following two are not add-on for MPI-IO implementations */
mca_io_romio_test,
mca_io_romio_wait
mca_io_romio_file_sync
};
@ -140,14 +120,3 @@ int MPIR_Status_set_bytes(ompi_status_public_t *status,
MPI_Status_set_elements(status, datatype, nbytes);
return MPI_SUCCESS;
}
/*
* One-time initialization of an MPI_Request for this module
*/
static int io_romio_request_init(mca_io_base_modules_t *module_union,
mca_io_base_request_t *request)
{
request->req_ver = MCA_IO_BASE_V_1_0_0;
return OMPI_SUCCESS;
}

41
src/mca/io/romio/src/io_romio_request.c Обычный файл
Просмотреть файл

@ -0,0 +1,41 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "file/file.h"
#include "request/request.h"
#include "io_romio.h"
#include "mca/io/base/io_base_request.h"
/*
 * Per-usage finalization of a request.  This component has nothing to
 * do here: the request is left untouched and OMPI_SUCCESS is returned.
 */
int mca_io_romio_request_fini(ompi_request_t **req)
{
    (void) req;

    return OMPI_SUCCESS;
}
/*
 * Per-usage free of a request.  This component has nothing to do
 * here: the request is left untouched and OMPI_SUCCESS is returned.
 */
int mca_io_romio_request_free(ompi_request_t **req)
{
    (void) req;

    return OMPI_SUCCESS;
}
/*
 * Cancel a request.
 *
 * ROMIO doesn't allow anything to be cancelled, so both arguments are
 * ignored and OMPI_ERROR is always returned.
 */
int mca_io_romio_request_cancel(ompi_request_t *req, int flag)
{
    (void) req;
    (void) flag;

    return OMPI_ERROR;
}

Просмотреть файл

@ -68,6 +68,10 @@ int MPI_File_iread(MPI_File fh, void *buf, int count,
case MCA_IO_BASE_V_1_0_0:
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iread(fh, buf, count, datatype, io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -69,6 +69,10 @@ int MPI_File_iread_at(MPI_File fh, MPI_Offset offset, void *buf,
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iread_at(fh, offset, buf, count, datatype,
io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -68,6 +68,10 @@ int MPI_File_iread_shared(MPI_File fh, void *buf, int count,
case MCA_IO_BASE_V_1_0_0:
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iread_shared(fh, buf, count, datatype, io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -68,6 +68,10 @@ int MPI_File_iwrite(MPI_File fh, void *buf, int count, MPI_Datatype
case MCA_IO_BASE_V_1_0_0:
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iwrite(fh, buf, count, datatype, io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -70,6 +70,10 @@ int MPI_File_iwrite_at(MPI_File fh, MPI_Offset offset, void *buf,
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iwrite_at(fh, offset, buf, count, datatype,
io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -68,6 +68,10 @@ int MPI_File_iwrite_shared(MPI_File fh, void *buf, int count,
case MCA_IO_BASE_V_1_0_0:
rc = fh->f_io_selected_module.v1_0_0.
io_module_file_iwrite_shared(fh, buf, count, datatype, io_request);
if (MPI_SUCCESS == rc) {
++ompi_progress_pending_io_reqs;
(*request)->req_state = OMPI_REQUEST_ACTIVE;
}
break;
default:

Просмотреть файл

@ -16,29 +16,50 @@
#include <sched.h>
#include "event/event.h"
#include "mca/pml/pml.h"
#include "mca/io/io.h"
#include "runtime/ompi_progress.h"
static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
/* Count of outstanding non-blocking IO requests.  It is a counter
   (compared against 0 and decremented by the number of completed
   requests in ompi_progress()), not a boolean, so it starts at 0. */
int ompi_progress_pending_io_reqs = 0;
void ompi_progress_events(int flag)
{
#if 0
#if 1
if(flag != 0 || ompi_progress_event_flag == OMPI_EVLOOP_ONCE) {
ompi_progress_event_flag = flag;
}
#endif
}
void ompi_progress(void)
{
/* progress any outstanding communications */
int events = 0;
int ret, events = 0;
#if OMPI_HAVE_THREADS == 0
if(ompi_progress_event_flag != 0)
events += ompi_event_loop(ompi_progress_event_flag);
if (ompi_progress_event_flag != 0) {
ret = ompi_event_loop(ompi_progress_event_flag);
if (ret > 0) {
events += ret;
}
}
#endif
events += mca_pml.pml_progress();
ret = mca_pml.pml_progress();
if (ret > 0) {
events += ret;
}
/* Progress IO requests, if there are any */
if (ompi_progress_pending_io_reqs > 0) {
ret = mca_io_base_progress();
if (ret > 0) {
events += ret;
ompi_progress_pending_io_reqs -= ret;
}
}
#if 0
/* TSW - disable this until can validate that it doesn't impact SMP
@ -50,8 +71,9 @@ void ompi_progress(void)
* the processor is oversubscribed - this will result in a best-case
* latency equivalent to the time-slice.
*/
if(events == 0)
if(events == 0) {
sched_yield();
}
#endif
}

Просмотреть файл

@ -18,6 +18,8 @@
extern "C" {
#endif
OMPI_DECLSPEC extern int ompi_progress_pending_io_reqs;
OMPI_DECLSPEC extern void ompi_progress_events(int);
OMPI_DECLSPEC extern void ompi_progress(void);