1
1
openmpi/ompi/mca/io/ompio/io_ompio_component.c
Vishwanath Venkatesan 9eeb3b5d50 # Extracting timing information for individual components of collective algorithm using a generic queue.
# This is triggered based on a mca-paramater and can be used with all collective modules.
# Individual queues maintained for read and write.
# The additional communication to combine data is done at file-close so that the 
  actual timing of collective-operations will not get affected. 
# The queues are initialized in file-open

This commit was SVN r27439.
2012-10-11 21:14:07 +00:00

301 строка
9.0 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "opal/mca/base/base.h"
#include "ompi/mca/io/io.h"
#include "io_ompio.h"
int mca_io_ompio_cycle_buffer_size = OMPIO_PREALLOC_MAX_BUF_SIZE;
int mca_io_ompio_bytes_per_agg = OMPIO_PREALLOC_MAX_BUF_SIZE;
int mca_io_ompio_record_offset_info = 0;
int mca_io_ompio_coll_timing_info = 0;
/*
* Private functions
*/
static int open_component(void);
static int close_component(void);
static int init_query(bool enable_progress_threads,
bool enable_mpi_threads);
static const struct mca_io_base_module_2_0_0_t *
file_query (struct ompi_file_t *file,
struct mca_io_base_file_t **private_data,
int *priority);
static int file_unquery(struct ompi_file_t *file,
struct mca_io_base_file_t *private_data);
static int delete_query(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t **private_data,
bool *usable, int *priorty);
static int delete_select(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data);
/*
static int io_progress(void);
static int register_datarep(char *,
MPI_Datarep_conversion_function*,
MPI_Datarep_conversion_function*,
MPI_Datarep_extent_function*,
void*);
*/
/*
* Private variables
*/
static int priority_param = 10;
static int delete_priority_param = 10;
/*
* Global, component-wide OMPIO mutex because OMPIO is not thread safe
*/
opal_mutex_t mca_io_ompio_mutex;
/*
* Global list of requests for this component
*/
opal_list_t mca_io_ompio_pending_requests;
/*
* Public string showing this component's version number
*/
const char *mca_io_ompio_component_version_string =
"OMPI/MPI OMPIO io MCA component version " OMPI_VERSION;
mca_io_base_component_2_0_0_t mca_io_ompio_component = {
/* First, the mca_base_component_t struct containing meta information
about the component itself */
{
MCA_IO_BASE_VERSION_2_0_0,
"ompio",
OMPI_MAJOR_VERSION,
OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION,
open_component,
close_component,
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
/* Initial configuration / Open a new file */
init_query,
file_query,
file_unquery,
/* Delete a file */
delete_query,
NULL, /* delete_unquery */
delete_select, /* delete_select */
NULL /* io_register_datarep */
};
static int open_component(void)
{
int param;
param = mca_base_param_find ("io", NULL, "ompio_cycle_buffer_size");
if (param >= 0) {
mca_base_param_lookup_int (param, &mca_io_ompio_cycle_buffer_size);
}
param = mca_base_param_find ("io", NULL, "ompio_bytes_per_agg");
if (param >= 0) {
mca_base_param_lookup_int (param, &mca_io_ompio_bytes_per_agg);
}
priority_param =
mca_base_param_reg_int(&mca_io_ompio_component.io_version,
"priority",
"Priority of the io ompio component",
false, false, priority_param, NULL);
delete_priority_param =
mca_base_param_reg_int(&mca_io_ompio_component.io_version,
"delete_priority",
"Delete priority of the io ompio component",
false, false, delete_priority_param, NULL);
mca_base_param_reg_string(&mca_io_ompio_component.io_version,
"version",
"Version of OMPIO",
false, true, NULL, NULL);
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
"record_file_offset_info",
"The information of the file offset/length",
false, false, mca_io_ompio_record_offset_info,
&mca_io_ompio_record_offset_info);
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
"coll_timing_info",
"Enable collective algorithm timing information",
false, false, mca_io_ompio_coll_timing_info,
&mca_io_ompio_coll_timing_info);
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
"cycle_buffer_size",
"Cycle Buffer Size of individual reads/writes",
false, false, mca_io_ompio_cycle_buffer_size,
&mca_io_ompio_cycle_buffer_size);
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
"bytes_per_agg",
"Bytes per aggregator process for automatic selection",
false, false, mca_io_ompio_bytes_per_agg,
&mca_io_ompio_bytes_per_agg);
/*
mca_base_param_reg_string(&mca_io_ompio_component.io_version,
"user_configure_params",
"User-specified command line parameters passed to OMPIO's configure script",
false, true,
MCA_io_ompio_USER_CONFIGURE_FLAGS, NULL);
mca_base_param_reg_string(&mca_io_ompio_component.io_version,
"complete_configure_params",
"Complete set of command line parameters passed to OMPIO's configure script",
false, true,
MCA_io_ompio_COMPLETE_CONFIGURE_FLAGS, NULL);
*/
/* Create the mutex */
OBJ_CONSTRUCT(&mca_io_ompio_mutex, opal_mutex_t);
/* Create the list of pending requests */
OBJ_CONSTRUCT(&mca_io_ompio_pending_requests, opal_list_t);
return OMPI_SUCCESS;
}
static int close_component(void)
{
/* Destroy the list of pending requests */
/* JMS: Good opprotunity here to list out all the IO requests that
were not destroyed / completed upon MPI_FINALIZE */
OBJ_DESTRUCT(&mca_io_ompio_pending_requests);
OBJ_DESTRUCT(&mca_io_ompio_mutex);
return OMPI_SUCCESS;
}
static int init_query(bool enable_progress_threads,
bool enable_mpi_threads)
{
return OMPI_SUCCESS;
}
static const struct mca_io_base_module_2_0_0_t *
file_query(struct ompi_file_t *file,
struct mca_io_base_file_t **private_data,
int *priority)
{
mca_io_ompio_data_t *data;
/* Lookup our priority */
if (OMPI_SUCCESS != mca_base_param_lookup_int(priority_param,
priority)) {
return NULL;
}
/* Allocate a space for this module to hang private data (e.g.,
the OMPIO file handle) */
data = malloc(sizeof(mca_io_ompio_data_t));
if (NULL == data) {
return NULL;
}
*private_data = (struct mca_io_base_file_t*) data;
/* All done */
return &mca_io_ompio_module;
}
static int file_unquery(struct ompi_file_t *file,
struct mca_io_base_file_t *private_data)
{
/* Free the ompio module-specific data that was allocated in
_file_query(), above */
if (NULL != private_data) {
free(private_data);
}
return OMPI_SUCCESS;
}
static int delete_query(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t **private_data,
bool *usable, int *priority)
{
/* Lookup our priority */
if (OMPI_SUCCESS != mca_base_param_lookup_int(delete_priority_param,
priority)) {
return OMPI_ERROR;
}
*usable = true;
*private_data = NULL;
return OMPI_SUCCESS;
}
static int delete_select(char *filename, struct ompi_info_t *info,
struct mca_io_base_delete_t *private_data)
{
int ret;
OPAL_THREAD_LOCK (&mca_io_ompio_mutex);
ret = mca_io_ompio_file_delete (filename, info);
OPAL_THREAD_UNLOCK (&mca_io_ompio_mutex);
return ret;
}
/*
static int io_progress (void)
{
return OMPI_SUCCESS;
}
*/