1
1
openmpi/ompi/mca/io/romio321/romio/test/aggregation1.c
Gilles Gouaillardet 2f391a99a7 ROMIO 3.2.1 refresh: import romio from mpich 3.2.1 tarball
Signed-off-by: Gilles Gouaillardet <gilles@rist.or.jp>
2018-06-20 14:28:14 +09:00

267 строки
7.1 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2007 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
/* Test case from John Bent (ROMIO req #835)
* Aggregation code was not handling certain access patterns when collective
* buffering forced */
#define _XOPEN_SOURCE 500 /* strdup not in string.h otherwsie */
#include <unistd.h>
#include <stdlib.h>
#include <mpi.h>
#include <stdio.h>
#include <string.h>
#define NUM_OBJS 4
#define OBJ_SIZE 1048576
extern char *optarg;
extern int optind, opterr, optopt;
char *prog = NULL;
int debug = 0;
static void
Usage( int line ) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if ( rank == 0 ) {
fprintf( stderr,
"Usage (line %d): %s [-d] [-h] -f filename\n"
"\t-d for debugging\n"
"\t-h to turn on the hints to force collective aggregation\n",
line, prog );
}
exit( 0 );
}
static void
fatal_error( int mpi_ret, MPI_Status *mpi_stat, const char *msg ) {
fprintf( stderr, "Fatal error %s: %d\n", msg, mpi_ret );
MPI_Abort( MPI_COMM_WORLD, -1 );
}
static void
print_hints( int rank, MPI_File *mfh ) {
MPI_Info info;
int nkeys;
int i, dummy_int;
char key[1024];
char value[1024];
MPI_Barrier( MPI_COMM_WORLD );
if ( rank == 0 ) {
MPI_File_get_info( *mfh, &info );
MPI_Info_get_nkeys( info, &nkeys );
printf( "HINTS:\n" );
for( i = 0; i < nkeys; i++ ) {
MPI_Info_get_nthkey( info, i, key );
printf( "%35s -> ", key );
MPI_Info_get( info, key, 1024, value, &dummy_int );
printf( "%s\n", value );
}
MPI_Info_free(&info);
}
MPI_Barrier( MPI_COMM_WORLD );
}
static void
fill_buffer( char *buffer, int bufsize, int rank, MPI_Offset offset ) {
memset( (void*)buffer, 0, bufsize );
snprintf( buffer, bufsize, "Hello from %d at %lld\n", rank, offset );
}
static MPI_Offset
get_offset( int rank, int num_objs, int obj_size, int which_obj ) {
MPI_Offset offset;
offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size;
return offset;
}
static void
write_file( char *target, int rank, MPI_Info *info ) {
MPI_File wfh;
MPI_Status mpi_stat;
int mpi_ret;
int i;
char *buffer;
buffer = malloc(OBJ_SIZE);
if ( debug ) printf( "%d writing file %s\n", rank, target );
if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh ) )
!= MPI_SUCCESS )
{
fatal_error( mpi_ret, NULL, "open for write" );
}
for( i = 0; i < NUM_OBJS; i++ ) {
MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
fill_buffer( buffer, OBJ_SIZE, rank, offset );
if ( debug ) printf( "%s", buffer );
if ( (mpi_ret = MPI_File_write_at_all( wfh, offset, buffer, OBJ_SIZE,
MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
{
fatal_error( mpi_ret, &mpi_stat, "write" );
}
}
if ( debug ) print_hints( rank, &wfh );
if( (mpi_ret = MPI_File_close( &wfh ) ) != MPI_SUCCESS ) {
fatal_error( mpi_ret, NULL, "close for write" );
}
if ( debug ) printf( "%d wrote file %s\n", rank, target );
free(buffer);
}
static int
reduce_corruptions( int corrupt_blocks ) {
int mpi_ret;
int sum;
if ( ( mpi_ret = MPI_Reduce( &corrupt_blocks, &sum, 1,
MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD ) ) != MPI_SUCCESS )
{
fatal_error( mpi_ret, NULL, "MPI_Reduce" );
}
return sum;
}
static void
read_file( char *target, int rank, MPI_Info *info, int *corrupt_blocks ) {
MPI_File rfh;
MPI_Status mpi_stat;
int mpi_ret;
int i;
char *buffer;
char *verify_buf = NULL;
buffer = malloc(OBJ_SIZE);
verify_buf = (char *)malloc(OBJ_SIZE);
if ( debug ) printf( "%d reading file %s\n", rank, target );
if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
MPI_MODE_RDONLY, *info, &rfh ) ) != MPI_SUCCESS )
{
fatal_error( mpi_ret, NULL, "open for read" );
}
for( i = 0; i < NUM_OBJS; i++ ) {
MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
fill_buffer( verify_buf, OBJ_SIZE, rank, offset );
if ( debug ) printf( "Expecting %s", buffer );
if ( (mpi_ret = MPI_File_read_at_all( rfh, offset, buffer, OBJ_SIZE,
MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
{
fatal_error( mpi_ret, &mpi_stat, "read" );
}
if ( memcmp( verify_buf, buffer, OBJ_SIZE ) != 0 ) {
(*corrupt_blocks)++;
printf( "Corruption at %lld\n", offset );
if ( debug ) {
printf( "\tExpecting %s\n"
"\tRecieved %s\n",
verify_buf, buffer );
}
}
}
if( (mpi_ret = MPI_File_close( &rfh ) ) != MPI_SUCCESS ) {
fatal_error( mpi_ret, NULL, "close for read" );
}
free (buffer);
free(verify_buf);
}
static void
set_hints( MPI_Info *info ) {
MPI_Info_set( *info, "romio_cb_write", "enable" );
MPI_Info_set( *info, "romio_no_indep_rw", "1" );
MPI_Info_set( *info, "cb_nodes", "1" );
MPI_Info_set( *info, "cb_buffer_size", "4194304" );
}
/*
void
set_hints( MPI_Info *info, char *hints ) {
char *delimiter = " ";
char *hints_cp = strdup( hints );
char *key = strtok( hints_cp, delimiter );
char *val;
while( key ) {
val = strtok( NULL, delimiter );
if ( debug ) printf( "HINT: %s = %s\n", key, val );
if ( ! val ) {
Usage( __LINE__ );
}
MPI_Info_set( *info, key, val );
key = strtok( NULL, delimiter );
}
free( hints_cp );
}
*/
int
main( int argc, char *argv[] ) {
int nproc = 1, rank = 0;
char *target = NULL;
int c;
MPI_Info info;
int mpi_ret;
int corrupt_blocks = 0;
MPI_Init( &argc, &argv );
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if( (mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
if(rank == 0) fatal_error( mpi_ret, NULL, "MPI_info_create.\n");
}
prog = strdup( argv[0] );
while( ( c = getopt( argc, argv, "df:h" ) ) != EOF ) {
switch( c ) {
case 'd':
debug = 1;
break;
case 'f':
target = strdup( optarg );
break;
case 'h':
set_hints( &info );
break;
default:
Usage( __LINE__ );
}
}
if ( ! target ) {
Usage( __LINE__ );
}
write_file( target, rank, &info );
read_file( target, rank, &info, &corrupt_blocks );
corrupt_blocks = reduce_corruptions( corrupt_blocks );
if ( rank == 0 ) {
if (corrupt_blocks == 0) {
fprintf(stdout, " No Errors\n");
} else {
fprintf(stdout, "%d/%d blocks corrupt\n",
corrupt_blocks, nproc * NUM_OBJS );
}
}
MPI_Info_free(&info);
MPI_Finalize();
free(prog);
exit( 0 );
}