From d282e94b676b52847cc666cd41cb02b67ff7fc4e Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Tue, 12 Jan 2016 16:03:25 -0600 Subject: [PATCH] add the new dynamic_gen2 component, designed to coexist for now with the original dynamic component --- ompi/mca/fcoll/dynamic_gen2/Makefile.am | 47 + .../fcoll/dynamic_gen2/fcoll_dynamic_gen2.h | 69 + .../fcoll_dynamic_gen2_component.c | 90 + .../fcoll_dynamic_gen2_file_read_all.c | 1074 ++++++++++++ .../fcoll_dynamic_gen2_file_write_all.c | 1455 +++++++++++++++++ .../dynamic_gen2/fcoll_dynamic_gen2_module.c | 90 + ompi/mca/fcoll/dynamic_gen2/owner.txt | 7 + 7 files changed, 2832 insertions(+) create mode 100644 ompi/mca/fcoll/dynamic_gen2/Makefile.am create mode 100644 ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h create mode 100644 ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c create mode 100644 ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c create mode 100644 ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c create mode 100644 ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_module.c create mode 100644 ompi/mca/fcoll/dynamic_gen2/owner.txt diff --git a/ompi/mca/fcoll/dynamic_gen2/Makefile.am b/ompi/mca/fcoll/dynamic_gen2/Makefile.am new file mode 100644 index 0000000000..f4910ac5e9 --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/Makefile.am @@ -0,0 +1,47 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2008-2015 University of Houston. All rights reserved. +# Copyright (c) 2012 Cisco Systems, Inc. 
All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + fcoll_dynamic_gen2.h \ + fcoll_dynamic_gen2_module.c \ + fcoll_dynamic_gen2_component.c \ + fcoll_dynamic_gen2_file_read_all.c \ + fcoll_dynamic_gen2_file_write_all.c + +# Make the output library in this directory, and name it either +# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la +# (for static builds). + +if MCA_BUILD_ompi_fcoll_dynamic_gen2_DSO +component_noinst = +component_install = mca_fcoll_dynamic_gen2.la +else +component_noinst = libmca_fcoll_dynamic_gen2.la +component_install = +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_fcoll_dynamic_gen2_la_SOURCES = $(sources) +mca_fcoll_dynamic_gen2_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_fcoll_dynamic_gen2_la_SOURCES =$(sources) +libmca_fcoll_dynamic_gen2_la_LDFLAGS = -module -avoid-version diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h new file mode 100644 index 0000000000..55fe211a10 --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_FCOLL_DYNAMIC_EXPORT_H +#define MCA_FCOLL_DYNAMIC_EXPORT_H + +#include "ompi_config.h" + +#include "mpi.h" +#include "ompi/mca/mca.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/fcoll/base/base.h" +#include "ompi/mca/io/ompio/io_ompio.h" + +BEGIN_C_DECLS + +/* Globally exported variables */ + +extern int mca_fcoll_dynamic_gen2_priority; + +OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_dynamic_gen2_component; + +/* API functions */ + +int mca_fcoll_dynamic_gen2_component_init_query(bool enable_progress_threads, + bool enable_mpi_threads); +struct mca_fcoll_base_module_1_0_0_t * +mca_fcoll_dynamic_gen2_component_file_query (mca_io_ompio_file_t *fh, int *priority); + +int mca_fcoll_dynamic_gen2_component_file_unquery (mca_io_ompio_file_t *file); + +int mca_fcoll_dynamic_gen2_module_init (mca_io_ompio_file_t *file); +int mca_fcoll_dynamic_gen2_module_finalize (mca_io_ompio_file_t *file); + +int mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh, + void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t * status); + + +int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh, + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t * status); + + +END_C_DECLS + +#endif /* MCA_FCOLL_DYNAMIC_EXPORT_H */ diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c new file mode 100644 index 0000000000..118c1c294e --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_component.c @@ -0,0 +1,90 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. 
+ * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * These symbols are in a file by themselves to provide nice linker + * semantics. Since linkers generally pull in symbols by object + * files, keeping these symbols as the only symbols in this file + * prevents utility programs such as "ompi_info" from having to import + * entire components just to query their version and parameters. + */ + +#include "ompi_config.h" +#include "fcoll_dynamic_gen2.h" +#include "mpi.h" + +/* + * Public string showing the version number of the fcoll dynamic_gen2 component + */ +const char *mca_fcoll_dynamic_gen2_component_version_string = + "Open MPI dynamic_gen2 collective MCA component version " OMPI_VERSION; + +/* + * Global variables: the priority of this component. It starts at 10 and is + * exposed as an MCA variable by dynamic_gen2_register() below. + */ +int mca_fcoll_dynamic_gen2_priority = 10; + +/* + * Local function: registers the MCA variables of this component + */ +static int dynamic_gen2_register(void); + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +mca_fcoll_base_component_2_0_0_t mca_fcoll_dynamic_gen2_component = { + + /* First, the mca_component_t struct containing meta information + * about the component itself */ + + .fcollm_version = { + MCA_FCOLL_BASE_VERSION_2_0_0, + + /* Component name and version */ + .mca_component_name = "dynamic_gen2", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), 
.mca_register_component_params = dynamic_gen2_register, + }, + .fcollm_data = { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, + /* Component query entry points, declared in fcoll_dynamic_gen2.h */ + .fcollm_init_query = mca_fcoll_dynamic_gen2_component_init_query, + .fcollm_file_query = mca_fcoll_dynamic_gen2_component_file_query, + .fcollm_file_unquery = mca_fcoll_dynamic_gen2_component_file_unquery, +}; + +/* Reset the priority to 10 and register it as the integer MCA variable named priority of this component. The result of the registration call is deliberately discarded; the function always returns OMPI_SUCCESS. */ +static int +dynamic_gen2_register(void) +{ + mca_fcoll_dynamic_gen2_priority = 10; + (void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version, + "priority", "Priority of the dynamic_gen2 fcoll component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_priority); + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c new file mode 100644 index 0000000000..f34858ed34 --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c @@ -0,0 +1,1074 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "fcoll_dynamic_gen2.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/io/ompio/io_ompio.h" +#include "ompi/mca/io/io.h" +#include "math.h" +#include "ompi/mca/pml/pml.h" +#include + +#define DEBUG_ON 0 + +/*Used for loading file-offsets per aggregator*/ +typedef struct mca_io_ompio_local_io_array{ + OMPI_MPI_OFFSET_TYPE offset; + MPI_Aint length; + int process_id; +}mca_io_ompio_local_io_array; + + +static int read_heap_sort (mca_io_ompio_local_io_array *io_array, + int num_entries, + int *sorted); + + + +int +mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh, + void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + MPI_Aint position = 0; + MPI_Aint total_bytes = 0; /* total bytes to be read */ + MPI_Aint bytes_to_read_in_cycle = 0; /* left to be read in a cycle*/ + MPI_Aint bytes_per_cycle = 0; /* total read in each cycle by each process*/ + int index = 0, ret=OMPI_SUCCESS; + int cycles = 0; + int i=0, j=0, l=0; + int n=0; /* current position in total_bytes_per_process array */ + MPI_Aint bytes_remaining = 0; /* how many bytes have been read from the current + value from total_bytes_per_process */ + int *sorted_file_offsets=NULL, entries_per_aggregator=0; + int bytes_received = 0; + int blocks = 0; + /* iovec structure and count of the buffer passed in */ + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + int iov_index = 0; + size_t current_position = 0; + struct iovec *local_iov_array=NULL, *global_iov_array=NULL; + char *receive_buf = NULL; + MPI_Aint *memory_displacements=NULL; + /* global iovec at the readers that contain the iovecs created from + file_set_view */ + uint32_t total_fview_count = 0; + int local_count = 0; + int *fview_count = NULL, *disp_index=NULL, *temp_disp_index=NULL; + int current_index=0, temp_index=0; 
+ int **blocklen_per_process=NULL; + MPI_Aint **displs_per_process=NULL; + char *global_buf = NULL; + MPI_Aint global_count = 0; + mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; + + /* array that contains the sorted indices of the global_iov */ + int *sorted = NULL; + int *displs = NULL; + int dynamic_gen2_num_io_procs; + size_t max_data = 0; + MPI_Aint *total_bytes_per_process = NULL; + ompi_datatype_t **sendtype = NULL; + MPI_Request *send_req=NULL, recv_req=NULL; + int my_aggregator =-1; + bool recvbuf_is_contiguous=false; + size_t ftype_size; + OPAL_PTRDIFF_TYPE ftype_extent, lb; + + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0; + double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0; + double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0; + mca_io_ompio_print_entry nentry; +#endif + + /************************************************************************** + ** 1. In case the data is not contigous in memory, decode it into an iovec + **************************************************************************/ + + opal_datatype_type_size ( &datatype->super, &ftype_size ); + opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent ); + + if ( (ftype_extent == (OPAL_PTRDIFF_TYPE) ftype_size) && + opal_datatype_is_contiguous_memory_layout(&datatype->super,1) && + 0 == lb ) { + recvbuf_is_contiguous = true; + } + + + if (! 
recvbuf_is_contiguous ) { + ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *)fh, + datatype, + count, + buf, + &max_data, + &decoded_iov, + &iov_count); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + else { + max_data = count * datatype->super.size; + } + + if ( MPI_STATUS_IGNORE != status ) { + status->_ucount = max_data; + } + + fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs); + ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh, + dynamic_gen2_num_io_procs, + max_data); + if (OMPI_SUCCESS != ret){ + goto exit; + } + my_aggregator = fh->f_procs_in_group[fh->f_aggregator_index]; + + /************************************************************************** + ** 2. Determine the total amount of data to be written + **************************************************************************/ + total_bytes_per_process = (MPI_Aint*)malloc(fh->f_procs_per_group*sizeof(MPI_Aint)); + if (NULL == total_bytes_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_rcomm_time = MPI_Wtime(); +#endif + ret = fh->f_allgather_array (&max_data, + 1, + MPI_LONG, + total_bytes_per_process, + 1, + MPI_LONG, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + if (OMPI_SUCCESS != ret){ + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rcomm_time = MPI_Wtime(); + rcomm_time += end_rcomm_time - start_rcomm_time; +#endif + + for (i=0 ; if_procs_per_group ; i++) { + total_bytes += total_bytes_per_process[i]; + } + + if (NULL != total_bytes_per_process) { + free (total_bytes_per_process); + total_bytes_per_process = NULL; + } + + /********************************************************************* + *** 3. 
Generate the File offsets/lengths corresponding to this write + ********************************************************************/ + ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *) fh, + max_data, + &local_iov_array, + &local_count); + + if (ret != OMPI_SUCCESS){ + goto exit; + } + + /************************************************************* + *** 4. Allgather the File View information at all processes + *************************************************************/ + + fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int)); + if (NULL == fview_count) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_rcomm_time = MPI_Wtime(); +#endif + ret = fh->f_allgather_array (&local_count, + 1, + MPI_INT, + fview_count, + 1, + MPI_INT, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + + if (OMPI_SUCCESS != ret){ + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rcomm_time = MPI_Wtime(); + rcomm_time += end_rcomm_time - start_rcomm_time; +#endif + + displs = (int*)malloc (fh->f_procs_per_group*sizeof(int)); + if (NULL == displs) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + displs[0] = 0; + total_fview_count = fview_count[0]; + for (i=1 ; if_procs_per_group ; i++) { + total_fview_count += fview_count[i]; + displs[i] = displs[i-1] + fview_count[i-1]; + } + +#if DEBUG_ON + if (my_aggregator == fh->f_rank) { + for (i=0 ; if_procs_per_group ; i++) { + printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n", + fh->f_rank, + i, + fview_count[i], + displs[i]); +} +} +#endif + + /* allocate the global iovec */ + if (0 != total_fview_count) { + global_iov_array = (struct iovec*)malloc (total_fview_count * + sizeof(struct iovec)); + if (NULL == global_iov_array) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } +#if 
OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_rcomm_time = MPI_Wtime(); +#endif + ret = fh->f_allgatherv_array (local_iov_array, + local_count, + fh->f_iov_type, + global_iov_array, + fview_count, + displs, + fh->f_iov_type, + fh->f_aggregator_index, + fh->f_procs_in_group, + fh->f_procs_per_group, + fh->f_comm); + + if (OMPI_SUCCESS != ret){ + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rcomm_time = MPI_Wtime(); + rcomm_time += end_rcomm_time - start_rcomm_time; +#endif + + /**************************************************************************************** + *** 5. Sort the global offset/lengths list based on the offsets. + *** The result of the sort operation is the 'sorted', an integer array, + *** which contains the indexes of the global_iov_array based on the offset. + *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset + *** in the file, and that one is followed by global_iov_array[z].offset, than + *** sorted[0] = x, sorted[1]=y and sorted[2]=z; + ******************************************************************************************/ + if (0 != total_fview_count) { + sorted = (int *)malloc (total_fview_count * sizeof(int)); + if (NULL == sorted) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + fh->f_sort_iovec (global_iov_array, total_fview_count, sorted); + } + + if (NULL != local_iov_array) { + free (local_iov_array); + local_iov_array = NULL; + } + +#if DEBUG_ON + if (my_aggregator == fh->f_rank) { + for (i=0 ; if_rank, + global_iov_array[sorted[i]].iov_base, + global_iov_array[sorted[i]].iov_len); + } + } +#endif + + /************************************************************* + *** 6. 
Determine the number of cycles required to execute this + *** operation + *************************************************************/ + fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle); + cycles = ceil((double)total_bytes/bytes_per_cycle); + + if ( my_aggregator == fh->f_rank) { + disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int)); + if (NULL == disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + blocklen_per_process = (int **)malloc (fh->f_procs_per_group * sizeof (int*)); + if (NULL == blocklen_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + displs_per_process = (MPI_Aint **)malloc (fh->f_procs_per_group * sizeof (MPI_Aint*)); + if (NULL == displs_per_process){ + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (i=0;if_procs_per_group;i++){ + blocklen_per_process[i] = NULL; + displs_per_process[i] = NULL; + } + + send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request)); + if (NULL == send_req){ + opal_output ( 1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + global_buf = (char *) malloc (bytes_per_cycle); + if (NULL == global_buf){ + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *)); + if (NULL == sendtype) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for(l=0;lf_procs_per_group;l++){ + sendtype[l] = MPI_DATATYPE_NULL; + } + } + + + + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_rexch = MPI_Wtime(); +#endif + n = 0; + bytes_remaining = 0; + current_index = 0; + + for (index = 0; index < cycles; index++) { + /********************************************************************** + *** 7a. 
Getting ready for next cycle: initializing and freeing buffers + **********************************************************************/ + if (my_aggregator == fh->f_rank) { + if (NULL != fh->f_io_array) { + free (fh->f_io_array); + fh->f_io_array = NULL; + } + fh->f_num_of_io_entries = 0; + + if (NULL != sendtype){ + for (i =0; i< fh->f_procs_per_group; i++) { + if ( MPI_DATATYPE_NULL != sendtype[i] ) { + ompi_datatype_destroy(&sendtype[i]); + sendtype[i] = MPI_DATATYPE_NULL; + } + } + } + + for(l=0;lf_procs_per_group;l++){ + disp_index[l] = 1; + + if (NULL != blocklen_per_process[l]){ + free(blocklen_per_process[l]); + blocklen_per_process[l] = NULL; + } + if (NULL != displs_per_process[l]){ + free(displs_per_process[l]); + displs_per_process[l] = NULL; + } + blocklen_per_process[l] = (int *) calloc (1, sizeof(int)); + if (NULL == blocklen_per_process[l]) { + opal_output (1, "OUT OF MEMORY for blocklen\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint)); + if (NULL == displs_per_process[l]){ + opal_output (1, "OUT OF MEMORY for displs\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + + if (NULL != sorted_file_offsets){ + free(sorted_file_offsets); + sorted_file_offsets = NULL; + } + + if(NULL != file_offsets_for_agg){ + free(file_offsets_for_agg); + file_offsets_for_agg = NULL; + } + if (NULL != memory_displacements){ + free(memory_displacements); + memory_displacements = NULL; + } + } /* (my_aggregator == fh->f_rank */ + + /************************************************************************** + *** 7b. 
Determine the number of bytes to be actually read in this cycle + **************************************************************************/ + if (cycles-1 == index) { + bytes_to_read_in_cycle = total_bytes - bytes_per_cycle*index; + } + else { + bytes_to_read_in_cycle = bytes_per_cycle; + } + +#if DEBUG_ON + if (my_aggregator == fh->f_rank) { + printf ("****%d: CYCLE %d Bytes %d**********\n", + fh->f_rank, + index, + bytes_to_write_in_cycle); + } +#endif + + /***************************************************************** + *** 7c. Calculate how much data will be contributed in this cycle + *** by each process + *****************************************************************/ + bytes_received = 0; + + while (bytes_to_read_in_cycle) { + /* This next block identifies which process is the holder + ** of the sorted[current_index] element; + */ + blocks = fview_count[0]; + for (j=0 ; jf_procs_per_group ; j++) { + if (sorted[current_index] < blocks) { + n = j; + break; + } + else { + blocks += fview_count[j+1]; + } + } + + if (bytes_remaining) { + /* Finish up a partially used buffer from the previous cycle */ + if (bytes_remaining <= bytes_to_read_in_cycle) { + /* Data fits completely into the block */ + if (my_aggregator == fh->f_rank) { + blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining; + displs_per_process[n][disp_index[n] - 1] = + (OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base + + (global_iov_array[sorted[current_index]].iov_len - bytes_remaining); + + blocklen_per_process[n] = (int *) realloc + ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int)); + displs_per_process[n] = (MPI_Aint *) realloc + ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint)); + blocklen_per_process[n][disp_index[n]] = 0; + displs_per_process[n][disp_index[n]] = 0; + disp_index[n] += 1; + } + if (fh->f_procs_in_group[n] == fh->f_rank) { + bytes_received += bytes_remaining; + } + current_index ++; + bytes_to_read_in_cycle -= 
bytes_remaining; + bytes_remaining = 0; + continue; + } + else { + /* the remaining data from the previous cycle is larger than the + bytes_to_write_in_cycle, so we have to segment again */ + if (my_aggregator == fh->f_rank) { + blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle; + displs_per_process[n][disp_index[n] - 1] = + (OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base + + (global_iov_array[sorted[current_index]].iov_len + - bytes_remaining); + } + if (fh->f_procs_in_group[n] == fh->f_rank) { + bytes_received += bytes_to_read_in_cycle; + } + bytes_remaining -= bytes_to_read_in_cycle; + bytes_to_read_in_cycle = 0; + break; + } + } + else { + /* No partially used entry available, have to start a new one */ + if (bytes_to_read_in_cycle < + (MPI_Aint) global_iov_array[sorted[current_index]].iov_len) { + /* This entry has more data than we can sendin one cycle */ + if (my_aggregator == fh->f_rank) { + blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle; + displs_per_process[n][disp_index[n] - 1] = + (OPAL_PTRDIFF_TYPE)global_iov_array[sorted[current_index]].iov_base ; + } + + if (fh->f_procs_in_group[n] == fh->f_rank) { + bytes_received += bytes_to_read_in_cycle; + } + bytes_remaining = global_iov_array[sorted[current_index]].iov_len - + bytes_to_read_in_cycle; + bytes_to_read_in_cycle = 0; + break; + } + else { + /* Next data entry is less than bytes_to_write_in_cycle */ + if (my_aggregator == fh->f_rank) { + blocklen_per_process[n][disp_index[n] - 1] = + global_iov_array[sorted[current_index]].iov_len; + displs_per_process[n][disp_index[n] - 1] = (OPAL_PTRDIFF_TYPE) + global_iov_array[sorted[current_index]].iov_base; + blocklen_per_process[n] = + (int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int)); + displs_per_process[n] = (MPI_Aint *)realloc + ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint)); + blocklen_per_process[n][disp_index[n]] = 0; + 
displs_per_process[n][disp_index[n]] = 0; + disp_index[n] += 1; + } + if (fh->f_procs_in_group[n] == fh->f_rank) { + bytes_received += + global_iov_array[sorted[current_index]].iov_len; + } + bytes_to_read_in_cycle -= + global_iov_array[sorted[current_index]].iov_len; + current_index ++; + continue; + } + } + } /* end while (bytes_to_read_in_cycle) */ + + /************************************************************************* + *** 7d. Calculate the displacement on where to put the data and allocate + *** the recieve buffer (global_buf) + *************************************************************************/ + if (my_aggregator == fh->f_rank) { + entries_per_aggregator=0; + for (i=0;if_procs_per_group; i++){ + for (j=0;j 0) + entries_per_aggregator++ ; + } + } + if (entries_per_aggregator > 0){ + file_offsets_for_agg = (mca_io_ompio_local_io_array *) + malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array)); + if (NULL == file_offsets_for_agg) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + sorted_file_offsets = (int *) + malloc (entries_per_aggregator*sizeof(int)); + if (NULL == sorted_file_offsets){ + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + /*Moving file offsets to an IO array!*/ + temp_index = 0; + global_count = 0; + for (i=0;if_procs_per_group; i++){ + for(j=0;j 0){ + file_offsets_for_agg[temp_index].length = + blocklen_per_process[i][j]; + global_count += blocklen_per_process[i][j]; + file_offsets_for_agg[temp_index].process_id = i; + file_offsets_for_agg[temp_index].offset = + displs_per_process[i][j]; + temp_index++; + } + } + } + } + else{ + continue; + } + + /* Sort the displacements for each aggregator */ + read_heap_sort (file_offsets_for_agg, + entries_per_aggregator, + sorted_file_offsets); + + memory_displacements = (MPI_Aint *) malloc + (entries_per_aggregator * sizeof(MPI_Aint)); + memory_displacements[sorted_file_offsets[0]] = 0; + for 
(i=1; if_io_array = (mca_io_ompio_io_array_t *) malloc + (entries_per_aggregator * sizeof (mca_io_ompio_io_array_t)); + if (NULL == fh->f_io_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + fh->f_num_of_io_entries = 0; + fh->f_io_array[0].offset = + (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset; + fh->f_io_array[0].length = + file_offsets_for_agg[sorted_file_offsets[0]].length; + fh->f_io_array[0].memory_address = + global_buf+memory_displacements[sorted_file_offsets[0]]; + fh->f_num_of_io_entries++; + for (i=1;if_io_array[fh->f_num_of_io_entries - 1].length += + file_offsets_for_agg[sorted_file_offsets[i]].length; + } + else{ + fh->f_io_array[fh->f_num_of_io_entries].offset = + (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset; + fh->f_io_array[fh->f_num_of_io_entries].length = + file_offsets_for_agg[sorted_file_offsets[i]].length; + fh->f_io_array[fh->f_num_of_io_entries].memory_address = + global_buf+memory_displacements[sorted_file_offsets[i]]; + fh->f_num_of_io_entries++; + } + } + + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_read_time = MPI_Wtime(); +#endif + + if (fh->f_num_of_io_entries) { + if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) { + opal_output (1, "READ FAILED\n"); + ret = OMPI_ERROR; + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_read_time = MPI_Wtime(); + read_time += end_read_time - start_read_time; +#endif + /********************************************************** + ******************** DONE READING ************************ + *********************************************************/ + + temp_disp_index = (int *)calloc (1, fh->f_procs_per_group * sizeof (int)); + if (NULL == temp_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + for (i=0; if_procs_per_group;i++){ + send_req[i] = MPI_REQUEST_NULL; + if ( 0 < disp_index[i] ) { + 
ompi_datatype_create_hindexed(disp_index[i], + blocklen_per_process[i], + displs_per_process[i], + MPI_BYTE, + &sendtype[i]); + ompi_datatype_commit(&sendtype[i]); + ret = MCA_PML_CALL (isend(global_buf, + 1, + sendtype[i], + fh->f_procs_in_group[i], + 123, + MCA_PML_BASE_SEND_STANDARD, + fh->f_comm, + &send_req[i])); + if(OMPI_SUCCESS != ret){ + goto exit; + } + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rcomm_time = MPI_Wtime(); + rcomm_time += end_rcomm_time - start_rcomm_time; +#endif + } + + /********************************************************** + *** 7f. Scatter the Data from the readers + *********************************************************/ + if ( recvbuf_is_contiguous ) { + receive_buf = &((char*)buf)[position]; + } + else if (bytes_received) { + /* allocate a receive buffer and copy the data that needs + to be received into it in case the data is non-contigous + in memory */ + receive_buf = malloc (bytes_received); + if (NULL == receive_buf) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_rcomm_time = MPI_Wtime(); +#endif + ret = MCA_PML_CALL(irecv(receive_buf, + bytes_received, + MPI_BYTE, + my_aggregator, + 123, + fh->f_comm, + &recv_req)); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + + if (my_aggregator == fh->f_rank){ + ret = ompi_request_wait_all (fh->f_procs_per_group, + send_req, + MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + + ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + position += bytes_received; + + /* If data is not contigous in memory, copy the data from the + receive buffer into the buffer passed in */ + if (!recvbuf_is_contiguous ) { + OPAL_PTRDIFF_TYPE mem_address; + size_t remaining = 0; + size_t temp_position = 0; + + remaining = bytes_received; + + while (remaining) { + mem_address = (OPAL_PTRDIFF_TYPE) + (decoded_iov[iov_index].iov_base) + 
current_position; + + if (remaining >= + (decoded_iov[iov_index].iov_len - current_position)) { + memcpy ((IOVBASE_TYPE *) mem_address, + receive_buf+temp_position, + decoded_iov[iov_index].iov_len - current_position); + remaining = remaining - + (decoded_iov[iov_index].iov_len - current_position); + temp_position = temp_position + + (decoded_iov[iov_index].iov_len - current_position); + iov_index = iov_index + 1; + current_position = 0; + } + else { + memcpy ((IOVBASE_TYPE *) mem_address, + receive_buf+temp_position, + remaining); + current_position = current_position + remaining; + remaining = 0; + } + } + + if (NULL != receive_buf) { + free (receive_buf); + receive_buf = NULL; + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rcomm_time = MPI_Wtime(); + rcomm_time += end_rcomm_time - start_rcomm_time; +#endif + } /* end for (index=0; index < cycles; index ++) */ + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_rexch = MPI_Wtime(); + read_exch += end_rexch - start_rexch; + nentry.time[0] = read_time; + nentry.time[1] = rcomm_time; + nentry.time[2] = read_exch; + if (my_aggregator == fh->f_rank) + nentry.aggregator = 1; + else + nentry.aggregator = 0; + nentry.nprocs_for_coll = dynamic_gen2_num_io_procs; + if (!fh->f_full_print_queue(READ_PRINT_QUEUE)){ + fh->f_register_print_entry(READ_PRINT_QUEUE, + nentry); + } +#endif + +exit: + if (!recvbuf_is_contiguous) { + if (NULL != receive_buf) { + free (receive_buf); + receive_buf = NULL; + } + } + if (NULL != global_buf) { + free (global_buf); + global_buf = NULL; + } + if (NULL != sorted) { + free (sorted); + sorted = NULL; + } + if (NULL != global_iov_array) { + free (global_iov_array); + global_iov_array = NULL; + } + if (NULL != fview_count) { + free (fview_count); + fview_count = NULL; + } + if (NULL != decoded_iov) { + free (decoded_iov); + decoded_iov = NULL; + } + if (NULL != local_iov_array){ + free(local_iov_array); + local_iov_array=NULL; + } + + if (NULL != displs) { + free (displs); + displs = NULL; + } + if 
(my_aggregator == fh->f_rank) { + + if (NULL != sorted_file_offsets){ + free(sorted_file_offsets); + sorted_file_offsets = NULL; + } + if (NULL != file_offsets_for_agg){ + free(file_offsets_for_agg); + file_offsets_for_agg = NULL; + } + if (NULL != memory_displacements){ + free(memory_displacements); + memory_displacements= NULL; + } + if (NULL != sendtype){ + for (i = 0; i < fh->f_procs_per_group; i++) { + if ( MPI_DATATYPE_NULL != sendtype[i] ) { + ompi_datatype_destroy(&sendtype[i]); + } + } + free(sendtype); + sendtype=NULL; + } + + if (NULL != disp_index){ + free(disp_index); + disp_index = NULL; + } + + if ( NULL != blocklen_per_process){ + for(l=0;lf_procs_per_group;l++){ + if (NULL != blocklen_per_process[l]){ + free(blocklen_per_process[l]); + blocklen_per_process[l] = NULL; + } + } + + free(blocklen_per_process); + blocklen_per_process = NULL; + } + + if (NULL != displs_per_process){ + for (l=0; if_procs_per_group; l++){ + if (NULL != displs_per_process[l]){ + free(displs_per_process[l]); + displs_per_process[l] = NULL; + } + } + free(displs_per_process); + displs_per_process = NULL; + } + if ( NULL != send_req ) { + free ( send_req ); + send_req = NULL; + } + } + return ret; +} + +

/*
 * Restore the max-heap property (keyed on io_array[].offset) for the
 * subtree rooted at heap position `root`.  Only positions 0..last belong
 * to the heap.  Iterative on purpose: the number of entries can be large.
 */
static void read_heap_sift_down (const mca_io_ompio_local_io_array *io_array,
                                 int *heap,
                                 int root,
                                 int last)
{
    /* root has a left child only if 2*root+1 <= last; testing it in this
     * form avoids computing an index that could overflow an int. */
    while (root <= (last - 1) / 2) {
        int left    = 2 * root + 1;
        int right   = left + 1;
        int largest = root;
        int tmp;

        if (left <= last &&
            io_array[heap[left]].offset > io_array[heap[largest]].offset) {
            largest = left;
        }
        if (right <= last &&
            io_array[heap[right]].offset > io_array[heap[largest]].offset) {
            largest = right;
        }
        if (largest == root) {
            break;                      /* heap property holds */
        }

        tmp           = heap[largest];
        heap[largest] = heap[root];
        heap[root]    = tmp;
        root          = largest;
    }
}

/*
 * Heap-sort the entries of io_array by ascending offset without moving them.
 *
 * io_array    : entries to order (not modified)
 * num_entries : number of entries in io_array and of slots in sorted
 * sorted      : output; sorted[k] is the index into io_array of the entry
 *               with the k-th smallest offset
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE if the scratch index
 * array cannot be allocated.  With num_entries <= 0 there is nothing to
 * sort: the function returns OMPI_SUCCESS and does not touch sorted.
 */
static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
                           int num_entries,
                           int *sorted)
{
    int i;
    int tmp;
    int *heap = NULL;

    /* An empty input must neither allocate (malloc(0) may legally return
     * NULL) nor write sorted[0]. */
    if (num_entries <= 0) {
        return OMPI_SUCCESS;
    }

    heap = (int *) malloc ((size_t) num_entries * sizeof(int));
    if (NULL == heap) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    for (i = 0; i < num_entries; ++i) {
        heap[i] = i;
    }

    /* Build the max-heap bottom-up, starting at the last parent. */
    for (i = num_entries / 2 - 1; i >= 0; i--) {
        read_heap_sift_down (io_array, heap, i, num_entries - 1);
    }

    /* Repeatedly move the current maximum behind the shrinking heap. */
    for (i = num_entries - 1; i >= 1; --i) {
        tmp     = heap[0];
        heap[0] = heap[i];
        heap[i] = tmp;
        read_heap_sift_down (io_array, heap, 0, i - 1);
        sorted[i] = heap[i];
    }
    sorted[0] = heap[0];

    free (heap);
    return OMPI_SUCCESS;
}

+ + + diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c new file mode 100644 index 0000000000..85a911416e --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -0,0 +1,1455 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2016 University of Houston. All rights reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "fcoll_dynamic_gen2.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/io/ompio/io_ompio.h" +#include "ompi/mca/io/io.h" +#include "math.h" +#include "ompi/mca/pml/pml.h" +#include + + +#define DEBUG_ON 0 + +

/* One file-offset entry collected on an aggregator; these are what
 * local_heap_sort() orders by offset. */
typedef struct mca_io_ompio_local_io_array {
    OMPI_MPI_OFFSET_TYPE offset;    /* key used for sorting */
    MPI_Aint             length;    /* number of bytes in this entry */
    int                  process_id;/* index of the contributing process in the group */
} mca_io_ompio_local_io_array;

/* Per-aggregator state of one collective write.  The write_all entry point
 * allocates one instance per aggregator; subroutine() advances it by one
 * cycle per call. */
typedef struct mca_io_ompio_aggregator_data {
    int               *disp_index;          /* per process: number of blocklen/displs slots in use */
    int               *sorted;              /* indices into global_iov_array, ordered by offset */
    int               *fview_count;         /* per process: number of gathered file-view entries */
    int                n;                   /* process owning the entry sorted[current_index] */
    int              **blocklen_per_process;/* per process: block lengths for this cycle */
    MPI_Aint         **displs_per_process;  /* per process: displacements for this cycle */
    MPI_Aint           total_bytes;         /* bytes handled by this aggregator overall */
    MPI_Aint           bytes_per_cycle;     /* upper bound on bytes handled per cycle */
    MPI_Aint           total_bytes_written; /* running sum of the bytes this rank has sent */
    MPI_Request       *recv_req;            /* one receive request per process in the group */
    MPI_Comm           comm;                /* communicator used for the data exchange */
    char              *global_buf;          /* aggregator receive buffer (bytes_per_cycle bytes) */
    char              *buf;                 /* user buffer; read only if sendbuf_is_contiguous */
    ompi_datatype_t  **recvtype;            /* per process: hindexed receive datatype */
    struct iovec      *global_iov_array;    /* file-view entries gathered from all processes */
    int                current_index;       /* next position in sorted[] to consume */
    int                current_position;    /* bytes already consumed of decoded_iov[iov_index] */
    int                bytes_to_write_in_cycle; /* bytes still to assign in the current cycle */
    int                bytes_remaining;     /* unconsumed bytes of a partially used entry */
    int                procs_per_group;     /* number of processes in the group */
    int               *procs_in_group;      /* ranks of the processes in the group */
    int                iov_index;           /* current element of decoded_iov */
    bool               sendbuf_is_contiguous;/* true: send directly from buf without packing */
    struct iovec      *decoded_iov;         /* this rank's memory iovec for this aggregator */
} mca_io_ompio_aggregator_data;

+static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, + mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries ); + +int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count, + struct iovec *local_iov_array, int local_count, + struct iovec ***broken_decoded_iovs, int **broken_iov_counts, + struct iovec ***broken_iov_arrays, int **broken_counts, +
MPI_Aint **broken_total_lengths, + int stripe_count, int stripe_size); + + +int mca_fcoll_dynamic_gen2_get_configuration (mca_io_ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators); + + +static int local_heap_sort (mca_io_ompio_local_io_array *io_array, + int num_entries, + int *sorted); + + +

/**
 * Collective write of the dynamic_gen2 fcoll component.
 *
 * Steps (numbered as in the body):
 *   1.   decode the user datatype into a memory iovec and obtain the
 *        aggregator configuration;
 *   2.   generate the local file view and split both the file view and the
 *        memory iovec per aggregator according to the stripe size;
 *   3.   sum up the bytes destined for every aggregator and derive the
 *        number of cycles;
 *   4-6. for every aggregator: gather and sort the file-view entries and,
 *        on the aggregator rank itself, allocate the per-cycle bookkeeping;
 *   7.   for every cycle and aggregator, subroutine() exchanges the data and
 *        returns an io array, which the aggregator hands to the fbtl.
 *
 * @param fh        file handle the collective operates on
 * @param buf       user buffer holding the data to write
 * @param count     number of datatype elements in buf
 * @param datatype  datatype describing the layout of buf
 * @param status    receives the number of bytes described by the request,
 *                  unless it is MPI_STATUS_IGNORE
 *
 * @return OMPI_SUCCESS, or the error code of the first step that failed.
 */
int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
                                           const void *buf,
                                           int count,
                                           struct ompi_datatype_t *datatype,
                                           ompi_status_public_t *status)
{
    int index = 0;
    int cycles = 0;
    int ret = OMPI_SUCCESS, l, i, j, bytes_per_cycle = 0;
    uint32_t iov_count = 0;
    struct iovec *decoded_iov = NULL;
    struct iovec *local_iov_array = NULL;
    uint32_t total_fview_count = 0;
    int local_count = 0;

    mca_io_ompio_aggregator_data **aggr_data = NULL;

    int *displs = NULL;
    int dynamic_gen2_num_io_procs = 0;
    size_t max_data = 0;

    MPI_Aint *total_bytes_per_process = NULL;
    mca_io_ompio_io_array_t *io_array = NULL;
    int num_io_entries = 0;

    struct iovec **broken_iov_arrays = NULL;
    struct iovec **broken_decoded_iovs = NULL;
    int *broken_counts = NULL;
    int *broken_iov_counts = NULL;
    MPI_Aint *broken_total_lengths = NULL;

    int *aggregators = NULL;

    /* Fallback stripe size (1 MiB) used when fh->f_stripe_size is not set. */
    int stripe_size = 1048576;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
    double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
    double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
    mca_io_ompio_print_entry nentry;
#endif

    /**************************************************************************
     ** 1. In case the data is not contiguous in memory, decode it into an iovec
     **************************************************************************/
    fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
    fh->f_get_bytes_per_agg ( &bytes_per_cycle );

    ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *) fh,
                                 datatype,
                                 count,
                                 buf,
                                 &max_data,
                                 &decoded_iov,
                                 &iov_count);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    if (MPI_STATUS_IGNORE != status) {
        status->_ucount = max_data;
    }

    /* Difference to the first generation of this function:
    ** dynamic_gen2_num_io_procs should be the number of io_procs per group.
    ** Initially, we will have only 1 group.
    */
    ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    if (fh->f_stripe_size > 0) {
        stripe_size = fh->f_stripe_size;
    }

    /* calloc, so that the exit path can tell which entries were set up. */
    aggr_data = (mca_io_ompio_aggregator_data **) calloc ((size_t) dynamic_gen2_num_io_procs,
                                                          sizeof(mca_io_ompio_aggregator_data *));
    if (NULL == aggr_data) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
        /* At this point we know the number of aggregators. If there is a
         * correlation between the number of aggregators and the number of
         * IO nodes, we know how many aggr_data arrays we need to allocate. */
        aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc (1, sizeof(mca_io_ompio_aggregator_data));
        if (NULL == aggr_data[i]) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
        aggr_data[i]->procs_per_group = fh->f_procs_per_group;
        aggr_data[i]->procs_in_group  = fh->f_procs_in_group;
        aggr_data[i]->comm            = fh->f_comm;
        aggr_data[i]->buf             = (char *) buf;   /* should not be used in the new version */
        aggr_data[i]->sendbuf_is_contiguous = false;    /* safe assumption for right now */
    }

    /*********************************************************************
     *** 2. Generate the local offsets/lengths array corresponding to
     ***    this write operation
     ********************************************************************/
    ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *) fh,
                                            max_data,
                                            &local_iov_array,
                                            &local_count);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    /*************************************************************************
     ** 2b. Separate the local_iov_array entries based on the number of
     **     aggregators
     *************************************************************************/
    /* broken_iov_arrays[0] contains broken_counts[0] entries to aggregator 0,
     * broken_iov_arrays[1] contains broken_counts[1] entries to aggregator 1, etc. */
    ret = mca_fcoll_dynamic_gen2_break_file_view (decoded_iov, iov_count,
                                                  local_iov_array, local_count,
                                                  &broken_decoded_iovs, &broken_iov_counts,
                                                  &broken_iov_arrays, &broken_counts,
                                                  &broken_total_lengths,
                                                  dynamic_gen2_num_io_procs, stripe_size);
    if (OMPI_SUCCESS != ret) {
        /* The broken_* arrays are dereferenced below; do not continue. */
        goto exit;
    }

    /**************************************************************************
     ** 3. Determine the total amount of data to be written and no. of cycles
     **************************************************************************/
    total_bytes_per_process = (MPI_Aint *) malloc
        (dynamic_gen2_num_io_procs * fh->f_procs_per_group * sizeof(MPI_Aint));
    if (NULL == total_bytes_per_process) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_comm_time = MPI_Wtime();
#endif
    /* NOTE(review): MPI_Aint values are exchanged as MPI_LONG; this assumes
     * both have the same size on the target platform -- confirm. */
    ret = fh->f_allgather_array (broken_total_lengths,
                                 dynamic_gen2_num_io_procs,
                                 MPI_LONG,
                                 total_bytes_per_process,
                                 dynamic_gen2_num_io_procs,
                                 MPI_LONG,
                                 0,
                                 fh->f_procs_in_group,
                                 fh->f_procs_per_group,
                                 fh->f_comm);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_comm_time = MPI_Wtime();
    comm_time += (end_comm_time - start_comm_time);
#endif

    cycles = 0;
    for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
        double cycles_needed;

        /* Replace the local value by the sum over all processes. */
        broken_total_lengths[i] = 0;
        for (j = 0; j < fh->f_procs_per_group; j++) {
            broken_total_lengths[i] += total_bytes_per_process[j*dynamic_gen2_num_io_procs + i];
        }
#if DEBUG_ON
        printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]);
#endif
        /* All aggregators run the same number of cycles: the maximum. */
        cycles_needed = ceil ((double) broken_total_lengths[i] / bytes_per_cycle);
        if (cycles_needed > cycles) {
            cycles = cycles_needed;
        }
    }

    free (total_bytes_per_process);
    total_bytes_per_process = NULL;

    /*************************************************************
     *** 4. Allgather the offset/lengths array from all processes
     *************************************************************/
    for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
        aggr_data[i]->total_bytes = broken_total_lengths[i];
        aggr_data[i]->decoded_iov = broken_decoded_iovs[i];
        aggr_data[i]->fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
        if (NULL == aggr_data[i]->fview_count) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        start_comm_time = MPI_Wtime();
#endif
        ret = fh->f_allgather_array (&broken_counts[i],
                                     1,
                                     MPI_INT,
                                     aggr_data[i]->fview_count,
                                     1,
                                     MPI_INT,
                                     i,
                                     fh->f_procs_in_group,
                                     fh->f_procs_per_group,
                                     fh->f_comm);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        end_comm_time = MPI_Wtime();
        comm_time += (end_comm_time - start_comm_time);
#endif

        displs = (int *) malloc (fh->f_procs_per_group * sizeof (int));
        if (NULL == displs) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        displs[0] = 0;
        total_fview_count = aggr_data[i]->fview_count[0];
        for (j = 1; j < fh->f_procs_per_group; j++) {
            total_fview_count += aggr_data[i]->fview_count[j];
            displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1];
        }

#if DEBUG_ON
        printf("total_fview_count : %d\n", total_fview_count);
        if (aggregators[i] == fh->f_rank) {
            for (j = 0; j < fh->f_procs_per_group; j++) {
                printf ("%d: PROCESS: %d  ELEMENTS: %d  DISPLS: %d\n",
                        fh->f_rank,
                        j,
                        aggr_data[i]->fview_count[j],
                        displs[j]);
            }
        }
#endif

        /* allocate the global iovec */
        if (0 != total_fview_count) {
            aggr_data[i]->global_iov_array = (struct iovec *) malloc (total_fview_count *
                                                                      sizeof(struct iovec));
            if (NULL == aggr_data[i]->global_iov_array) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        start_comm_time = MPI_Wtime();
#endif
        ret = fh->f_allgatherv_array (broken_iov_arrays[i],
                                      broken_counts[i],
                                      fh->f_iov_type,
                                      aggr_data[i]->global_iov_array,
                                      aggr_data[i]->fview_count,
                                      displs,
                                      fh->f_iov_type,
                                      i,
                                      fh->f_procs_in_group,
                                      fh->f_procs_per_group,
                                      fh->f_comm);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        end_comm_time = MPI_Wtime();
        comm_time += (end_comm_time - start_comm_time);
#endif

        /****************************************************************************************
         *** 5. Sort the global offset/lengths list based on the offsets.
         *** The result of the sort operation is 'sorted', an integer array which
         *** contains the indexes of the global_iov_array ordered by offset.
         *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
         *** in the file, and that one is followed by global_iov_array[z].offset, then
         *** sorted[0] = x, sorted[1] = y and sorted[2] = z;
         ******************************************************************************************/
        if (0 != total_fview_count) {
            aggr_data[i]->sorted = (int *) malloc (total_fview_count * sizeof(int));
            if (NULL == aggr_data[i]->sorted) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            fh->f_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, aggr_data[i]->sorted);
        }

        /* The unsplit file view is no longer needed once it has been broken up. */
        free (local_iov_array);
        local_iov_array = NULL;

        free (displs);
        displs = NULL;

#if DEBUG_ON
        if (aggregators[i] == fh->f_rank) {
            uint32_t tv = 0;
            for (tv = 0; tv < total_fview_count; tv++) {
                /* NOTE(review): the original format string of this debug
                 * message was lost; this one is a reconstruction. */
                printf("%d: OFFSET: %p   LENGTH: %zu\n",
                       fh->f_rank,
                       (void *) aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base,
                       (size_t) aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len);
            }
        }
#endif
        /*************************************************************
         *** 6. Set up the per-cycle bookkeeping on the aggregator
         *************************************************************/
        aggr_data[i]->bytes_per_cycle = bytes_per_cycle;

        if (aggregators[i] == fh->f_rank) {
            aggr_data[i]->disp_index = (int *) malloc (fh->f_procs_per_group * sizeof (int));
            if (NULL == aggr_data[i]->disp_index) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            aggr_data[i]->blocklen_per_process = (int **) calloc (fh->f_procs_per_group, sizeof (int *));
            if (NULL == aggr_data[i]->blocklen_per_process) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            aggr_data[i]->displs_per_process = (MPI_Aint **) calloc (fh->f_procs_per_group, sizeof (MPI_Aint *));
            if (NULL == aggr_data[i]->displs_per_process) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            aggr_data[i]->recv_req = (MPI_Request *) malloc ((fh->f_procs_per_group) * sizeof(MPI_Request));
            if (NULL == aggr_data[i]->recv_req) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
            if (NULL == aggr_data[i]->global_buf) {
                opal_output (1, "OUT OF MEMORY");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
                                                                  sizeof(ompi_datatype_t *));
            if (NULL == aggr_data[i]->recvtype) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            for (l = 0; l < fh->f_procs_per_group; l++) {
                aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL;
            }
        }
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_exch = MPI_Wtime();
#endif

    /*************************************************************
     *** 7. Exchange and write the data, one cycle at a time
     *************************************************************/
    for (index = 0; index < cycles; index++) {
        for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
            ret = subroutine (index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
                              &io_array, &num_io_entries);
            if (OMPI_SUCCESS != ret) {
                goto exit;
            }
            if (aggregators[i] == fh->f_rank) {
                int write_failed = 0;

                fh->f_num_of_io_entries = num_io_entries;
                fh->f_io_array = io_array;
                /* io_array is only valid when subroutine() reported entries. */
                if (fh->f_num_of_io_entries) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
                    start_write_time = MPI_Wtime();
#endif
                    if (0 > fh->f_fbtl->fbtl_pwritev (fh)) {
                        write_failed = 1;
                    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
                    end_write_time = MPI_Wtime();
                    write_time += end_write_time - start_write_time;
#endif
                    free (fh->f_io_array);
                }
                /* Never leave a dangling io array on the file handle. */
                fh->f_io_array = NULL;
                fh->f_num_of_io_entries = 0;

                if (write_failed) {
                    opal_output (1, "WRITE FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            } /* end if (aggregators[i] == fh->f_rank) */
        }
    } /* end for (index = 0; index < cycles; index++) */

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_exch = MPI_Wtime();
    exch_write += end_exch - start_exch;
    nentry.time[0] = write_time;
    nentry.time[1] = comm_time;
    nentry.time[2] = exch_write;
    nentry.aggregator = 0;
    for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
        if (aggregators[i] == fh->f_rank) {
            nentry.aggregator = 1;
        }
    }
    nentry.nprocs_for_coll = dynamic_gen2_num_io_procs;
    if (!fh->f_full_print_queue(WRITE_PRINT_QUEUE)) {
        fh->f_register_print_entry(WRITE_PRINT_QUEUE,
                                   nentry);
    }
#endif

exit :
    if (NULL != aggr_data) {
        for (i = 0; i < dynamic_gen2_num_io_procs; i++) {
            if (NULL == aggr_data[i]) {
                continue;       /* set-up stopped before this entry */
            }
            if (NULL != aggregators && aggregators[i] == fh->f_rank) {
                if (NULL != aggr_data[i]->recvtype) {
                    for (j = 0; j < aggr_data[i]->procs_per_group; j++) {
                        if (MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j]) {
                            ompi_datatype_destroy (&aggr_data[i]->recvtype[j]);
                        }
                    }
                    free (aggr_data[i]->recvtype);
                }

                free (aggr_data[i]->disp_index);
                free (aggr_data[i]->recv_req);
                free (aggr_data[i]->global_buf);

                /* Either table may still be NULL after an early failure. */
                if (NULL != aggr_data[i]->blocklen_per_process) {
                    for (l = 0; l < aggr_data[i]->procs_per_group; l++) {
                        free (aggr_data[i]->blocklen_per_process[l]);
                    }
                    free (aggr_data[i]->blocklen_per_process);
                }
                if (NULL != aggr_data[i]->displs_per_process) {
                    for (l = 0; l < aggr_data[i]->procs_per_group; l++) {
                        free (aggr_data[i]->displs_per_process[l]);
                    }
                    free (aggr_data[i]->displs_per_process);
                }
            }
            free (aggr_data[i]->sorted);
            free (aggr_data[i]->global_iov_array);
            free (aggr_data[i]->fview_count);
            free (aggr_data[i]->decoded_iov);

            free (aggr_data[i]);
        }
        free (aggr_data);
    }
    /* NOTE(review): aggregators and the broken_* arrays returned by the
     * helper functions are not released here; who owns them is not visible
     * from this function -- confirm and free them if this function does. */
    free (total_bytes_per_process);
    free (local_iov_array);
    free (displs);
    free (decoded_iov);

    return ret;
}

+ + +static int subroutine ( int index, int
cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, + mca_io_ompio_io_array_t **ret_io_array, int *ret_num_io_entries ) +{ + int bytes_sent = 0; + int blocks=0, temp_pindex; + char *send_buf = NULL; + int i, j, l, ret; + MPI_Request send_req; + int entries_per_aggregator=0; + mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; + int *sorted_file_offsets=NULL; + int temp_index=0; + MPI_Aint *memory_displacements=NULL; + mca_io_ompio_io_array_t *io_array; + int num_of_io_entries; + int *temp_disp_index=NULL; + MPI_Aint global_count = 0; + + *ret_num_io_entries = 0; + /********************************************************************** + *** 7a. Getting ready for next cycle: initializing and freeing buffers + **********************************************************************/ + if (aggregator == rank) { + num_of_io_entries = 0; + + if (NULL != data->recvtype){ + for (i =0; i< data->procs_per_group; i++) { + if ( MPI_DATATYPE_NULL != data->recvtype[i] ) { + ompi_datatype_destroy(&data->recvtype[i]); + data->recvtype[i] = MPI_DATATYPE_NULL; + } + } + } + + for(l=0;lprocs_per_group;l++){ + data->disp_index[l] = 1; + + free(data->blocklen_per_process[l]); + free(data->displs_per_process[l]); + + data->blocklen_per_process[l] = (int *) calloc (1, sizeof(int)); + data->displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint)); + if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ + opal_output (1, "OUT OF MEMORY for displs\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + } /* (aggregator == rank */ + + /************************************************************************** + *** 7b. 
Determine the number of bytes to be actually written in this cycle + **************************************************************************/ + if (cycles-1 == index) { + data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index; + } + else { + data->bytes_to_write_in_cycle = data->bytes_per_cycle; + } + +#if DEBUG_ON + if (aggregator == rank) { + printf ("****%d: CYCLE %d Bytes %lld**********\n", + rank, + index, + data->bytes_to_write_in_cycle); + } +#endif + /********************************************************** + **Gather the Data from all the processes at the writers ** + *********************************************************/ + +#if DEBUG_ON + printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", data->bytes_to_write_in_cycle, + index); +#endif + + /***************************************************************** + *** 7c. Calculate how much data will be contributed in this cycle + *** by each process + *****************************************************************/ + + /* The blocklen and displs calculation only done at aggregators!*/ + while (data->bytes_to_write_in_cycle) { + + /* This next block identifies which process is the holder + ** of the sorted[current_index] element; + */ + blocks = data->fview_count[0]; + for (j=0 ; jprocs_per_group ; j++) { + if (data->sorted[data->current_index] < blocks) { + data->n = j; + break; + } + else { + blocks += data->fview_count[j+1]; + } + } + + if (data->bytes_remaining) { + /* Finish up a partially used buffer from the previous cycle */ + + if (data->bytes_remaining <= data->bytes_to_write_in_cycle) { + /* The data fits completely into the block */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining; + data->displs_per_process[data->n][data->disp_index[data->n] - 1] = + (OPAL_PTRDIFF_TYPE)data->global_iov_array[data->sorted[data->current_index]].iov_base + + 
(data->global_iov_array[data->sorted[data->current_index]].iov_len + - data->bytes_remaining); + + /* In this cases the length is consumed so allocating for + next displacement and blocklength*/ + data->blocklen_per_process[data->n] = (int *) realloc + ((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int)); + data->displs_per_process[data->n] = (MPI_Aint *) realloc + ((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint)); + data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; + data->displs_per_process[data->n][data->disp_index[data->n]] = 0; + data->disp_index[data->n] += 1; + } + if (data->procs_in_group[data->n] == rank) { + bytes_sent += data->bytes_remaining; + } + data->current_index ++; + data->bytes_to_write_in_cycle -= data->bytes_remaining; + data->bytes_remaining = 0; +// continue; +// break; + } + else { + /* the remaining data from the previous cycle is larger than the + data->bytes_to_write_in_cycle, so we have to segment again */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle; + data->displs_per_process[data->n][data->disp_index[data->n] - 1] = + (OPAL_PTRDIFF_TYPE)data->global_iov_array[data->sorted[data->current_index]].iov_base + + (data->global_iov_array[data->sorted[data->current_index]].iov_len + - data->bytes_remaining); + } + + if (data->procs_in_group[data->n] == rank) { + bytes_sent += data->bytes_to_write_in_cycle; + } + data->bytes_remaining -= data->bytes_to_write_in_cycle; + data->bytes_to_write_in_cycle = 0; + break; + } + } + else { + /* No partially used entry available, have to start a new one */ + if (data->bytes_to_write_in_cycle < + (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) { + /* This entry has more data than we can sendin one cycle */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n] - 
1] = data->bytes_to_write_in_cycle; + data->displs_per_process[data->n][data->disp_index[data->n] - 1] = + (OPAL_PTRDIFF_TYPE)data->global_iov_array[data->sorted[data->current_index]].iov_base ; + } + if (data->procs_in_group[data->n] == rank) { + bytes_sent += data->bytes_to_write_in_cycle; + + } + data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len - + data->bytes_to_write_in_cycle; + data->bytes_to_write_in_cycle = 0; + break; + } + else { + /* Next data entry is less than data->bytes_to_write_in_cycle */ + if (aggregator == rank) { + data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = + data->global_iov_array[data->sorted[data->current_index]].iov_len; + data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (OPAL_PTRDIFF_TYPE) + data->global_iov_array[data->sorted[data->current_index]].iov_base; + + /*realloc for next blocklength + and assign this displacement and check for next displs as + the total length of this entry has been consumed!*/ + data->blocklen_per_process[data->n] = + (int *) realloc ((void *)data->blocklen_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(int)); + data->displs_per_process[data->n] = (MPI_Aint *)realloc + ((void *)data->displs_per_process[data->n], (data->disp_index[data->n]+1)*sizeof(MPI_Aint)); + data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; + data->displs_per_process[data->n][data->disp_index[data->n]] = 0; + data->disp_index[data->n] += 1; + } + if (data->procs_in_group[data->n] == rank) { + bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len; + } + data->bytes_to_write_in_cycle -= + data->global_iov_array[data->sorted[data->current_index]].iov_len; + data->current_index ++; +// continue; + } + } + } + + + /************************************************************************* + *** 7d. 
Calculate the displacement on where to put the data and allocate + *** the recieve buffer (global_buf) + *************************************************************************/ + if (aggregator == rank) { + entries_per_aggregator=0; + for (i=0;iprocs_per_group; i++){ + for (j=0;jdisp_index[i];j++){ + if (data->blocklen_per_process[i][j] > 0) + entries_per_aggregator++ ; + } + } + +#if DEBUG_ON + printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index, + bytes_sent); + printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator); +#endif + + if (entries_per_aggregator > 0){ + file_offsets_for_agg = (mca_io_ompio_local_io_array *) + malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array)); + if (NULL == file_offsets_for_agg) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + sorted_file_offsets = (int *) + malloc (entries_per_aggregator*sizeof(int)); + if (NULL == sorted_file_offsets){ + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + /*Moving file offsets to an IO array!*/ + temp_index = 0; + + for (i=0;iprocs_per_group; i++){ + for(j=0;jdisp_index[i];j++){ + if (data->blocklen_per_process[i][j] > 0){ + file_offsets_for_agg[temp_index].length = + data->blocklen_per_process[i][j]; + file_offsets_for_agg[temp_index].process_id = i; + file_offsets_for_agg[temp_index].offset = + data->displs_per_process[i][j]; + temp_index++; + +#if DEBUG_ON + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1,rank); + + printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", + data->procs_in_group[i],j, + data->blocklen_per_process[i][j],j, + data->displs_per_process[i][j], + rank); +#endif + } + } + } + } + else{ +// continue; + return OMPI_SUCCESS; + } + /* Sort the displacements for each aggregator*/ + local_heap_sort (file_offsets_for_agg, + entries_per_aggregator, + sorted_file_offsets); + + /*create contiguous memory displacements + based on 
blocklens on the same displs array + and map it to this aggregator's actual + file-displacements (this is in the io-array created above)*/ + memory_displacements = (MPI_Aint *) malloc + (entries_per_aggregator * sizeof(MPI_Aint)); + + memory_displacements[sorted_file_offsets[0]] = 0; + for (i=1; iprocs_per_group * sizeof (int)); + if (NULL == temp_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + /*Now update the displacements array with memory offsets*/ + global_count = 0; + for (i=0;idispls_per_process[temp_pindex][temp_disp_index[temp_pindex]] = + memory_displacements[sorted_file_offsets[i]]; + if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex]) + temp_disp_index[temp_pindex] += 1; + else{ + printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n", + temp_pindex, temp_disp_index[temp_pindex], + temp_pindex, data->disp_index[temp_pindex]); + } + global_count += + file_offsets_for_agg[sorted_file_offsets[i]].length; + } + + if (NULL != temp_disp_index){ + free(temp_disp_index); + temp_disp_index = NULL; + } + +#if DEBUG_ON + + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1,rank); + for (i=0;iprocs_per_group; i++){ + for(j=0;jdisp_index[i];j++){ + if (data->blocklen_per_process[i][j] > 0){ + printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", + data->procs_in_group[i],j, + data->blocklen_per_process[i][j],j, + data->displs_per_process[i][j], + rank); + + } + } + } + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1,rank); + for (i=0; iprocs_per_group; i++) { + size_t datatype_size; + data->recv_req[i] = MPI_REQUEST_NULL; + if ( 0 < data->disp_index[i] ) { + ompi_datatype_create_hindexed(data->disp_index[i], + data->blocklen_per_process[i], + data->displs_per_process[i], + MPI_BYTE, + &data->recvtype[i]); + ompi_datatype_commit(&data->recvtype[i]); + opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size); + + 
if (datatype_size){ + ret = MCA_PML_CALL(irecv(data->global_buf, + 1, + data->recvtype[i], + data->procs_in_group[i], + 123, + data->comm, + &data->recv_req[i])); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } + } + } /* end if (aggregator == rank ) */ + + + if ( data->sendbuf_is_contiguous ) { + send_buf = &((char*)data->buf)[data->total_bytes_written]; + } + else if (bytes_sent) { + /* allocate a send buffer and copy the data that needs + to be sent into it in case the data is non-contigous + in memory */ + OPAL_PTRDIFF_TYPE mem_address; + size_t remaining = 0; + size_t temp_position = 0; + + send_buf = malloc (bytes_sent); + if (NULL == send_buf) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + remaining = bytes_sent; + + while (remaining) { + mem_address = (OPAL_PTRDIFF_TYPE) + (data->decoded_iov[data->iov_index].iov_base) + data->current_position; + + if (remaining >= + (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { + memcpy (send_buf+temp_position, + (IOVBASE_TYPE *)mem_address, + data->decoded_iov[data->iov_index].iov_len - data->current_position); + remaining = remaining - + (data->decoded_iov[data->iov_index].iov_len - data->current_position); + temp_position = temp_position + + (data->decoded_iov[data->iov_index].iov_len - data->current_position); + data->iov_index = data->iov_index + 1; + data->current_position = 0; + } + else { + memcpy (send_buf+temp_position, + (IOVBASE_TYPE *) mem_address, + remaining); + data->current_position += remaining; + remaining = 0; + } + } + } + data->total_bytes_written += bytes_sent; + + /* Gather the sendbuf from each process in appropritate locations in + aggregators*/ + + if (bytes_sent){ + ret = MCA_PML_CALL(isend(send_buf, + bytes_sent, + MPI_BYTE, + aggregator, + 123, + MCA_PML_BASE_SEND_STANDARD, + data->comm, + &send_req)); + + + if ( OMPI_SUCCESS != ret ){ + goto exit; + } + + ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE); + if 
(OMPI_SUCCESS != ret){ + goto exit; + } + } + + if (aggregator == rank) { + ret = ompi_request_wait_all (data->procs_per_group, + data->recv_req, + MPI_STATUS_IGNORE); + + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + +#if DEBUG_ON + if (aggregator == rank){ + printf("************Cycle: %d, Aggregator: %d ***************\n", + index+1,rank); + for (i=0 ; iglobal_buf)[i]); + } +#endif + + if (! data->sendbuf_is_contiguous) { + if (NULL != send_buf) { + free (send_buf); + send_buf = NULL; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + /********************************************************** + *** 7f. Create the io array, and pass it to fbtl + *********************************************************/ + + if (aggregator == rank) { + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_write_time = MPI_Wtime(); +#endif + + io_array = (mca_io_ompio_io_array_t *) malloc + (entries_per_aggregator * sizeof (mca_io_ompio_io_array_t)); + if (NULL == io_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + num_of_io_entries = 0; + /*First entry for every aggregator*/ + io_array[0].offset = + (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset; + io_array[0].length = + file_offsets_for_agg[sorted_file_offsets[0]].length; + io_array[0].memory_address = + data->global_buf+memory_displacements[sorted_file_offsets[0]]; + num_of_io_entries++; + + for (i=1;iglobal_buf+memory_displacements[sorted_file_offsets[i]]; + num_of_io_entries++; + } + + } + +#if DEBUG_ON + printf("*************************** %d\n", num_of_io_entries); + for (i=0 ; i= rest ) { + blocklen = rest; + temp_offset = offset+rest; + temp_len = len - rest; + } + else { + blocklen = len; + temp_offset = 0; + temp_len = 0; + } + + broken_file_iovs[owner][broken_file_counts[owner]].iov_base = (void *)offset; + 
broken_file_iovs[owner][broken_file_counts[owner]].iov_len = blocklen; +#if DEBUG_ON + printf("%d: owner=%d b_file_iovs[%d].base=%ld .len=%d \n", rank, owner, + broken_file_counts[owner], + broken_file_iovs[owner][broken_file_counts[owner]].iov_base, + broken_file_iovs[owner][broken_file_counts[owner]].iov_len ); +#endif + do { + if ( memlen >= blocklen ) { + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset; + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = blocklen; + memoffset += blocklen; + memlen -= blocklen; + blocklen = 0; + + if ( 0 == memlen ) { + j++; + memoffset = (off_t ) mem_iov[j].iov_base; + memlen = mem_iov[j].iov_len; + } + } + else { + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset; + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = memlen; + blocklen -= memlen; + + j++; + memoffset = (off_t ) mem_iov[j].iov_base; + memlen = mem_iov[j].iov_len; + } +#if DEBUG_ON + printf("%d: owner=%d b_mem_iovs[%d].base=%ld .len=%d\n", rank, owner, + broken_mem_counts[owner], + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base, + broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len); +#endif + + broken_mem_counts[owner]++; + if ( broken_mem_counts[owner] >= max_lengths[owner][0] ) { + broken_mem_iovs[owner] = (struct iovec*) realloc ( broken_mem_iovs[owner], + mem_count * block[owner][0] * + sizeof(struct iovec )); + max_lengths[owner][0] = mem_count * block[owner][0]; + block[owner][0]++; + } + + } while ( blocklen > 0 ); + + broken_file_counts[owner]++; + if ( broken_file_counts[owner] >= max_lengths[owner][1] ) { + broken_file_iovs[owner] = (struct iovec*) realloc ( broken_file_iovs[owner], + file_count * block[owner][1] * + sizeof(struct iovec )); + max_lengths[owner][1] = file_count * block[owner][1]; + block[owner][1]++; + } + + offset = temp_offset; + len = temp_len; + } while( temp_len > 0 ); + + i++; + } + + + /* Step 2: recalculating the total lengths per 
aggregator */ + for ( i=0; i< stripe_count; i++ ) { + for ( j=0; jf_stripe_count; + if ( num_io_procs < 1 ) { + num_io_procs = 1; + } + if ( num_io_procs > fh->f_size ) { + num_io_procs = fh->f_size; + } + } + + fh->f_procs_per_group = fh->f_size; + fh->f_procs_in_group = (int *) malloc ( sizeof(int) * fh->f_size ); + if ( NULL == fh->f_procs_in_group) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + for (i=0; if_size; i++ ) { + fh->f_procs_in_group[i]=i; + } + + + aggregators = (int *) malloc ( num_io_procs * sizeof(int)); + if ( NULL == aggregators ) { + // fh->procs_in_group will be freed with the fh structure. No need to do it here. + return OMPI_ERR_OUT_OF_RESOURCE; + } + for ( i=0; if_size / num_io_procs; + } + + *dynamic_gen2_num_io_procs = num_io_procs; + *ret_aggregators = aggregators; + + return OMPI_SUCCESS; +} + + +static int local_heap_sort (mca_io_ompio_local_io_array *io_array, + int num_entries, + int *sorted) +{ + int i = 0; + int j = 0; + int left = 0; + int right = 0; + int largest = 0; + int heap_size = num_entries - 1; + int temp = 0; + unsigned char done = 0; + int* temp_arr = NULL; + + temp_arr = (int*)malloc(num_entries*sizeof(int)); + if (NULL == temp_arr) { + opal_output (1, "OUT OF MEMORY\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + temp_arr[0] = 0; + for (i = 1; i < num_entries; ++i) { + temp_arr[i] = i; + } + /* num_entries can be a large no. 
so NO RECURSION */ + for (i = num_entries/2-1 ; i>=0 ; i--) { + done = 0; + j = i; + largest = j; + + while (!done) { + left = j*2+1; + right = j*2+2; + if ((left <= heap_size) && + (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) { + largest = left; + } + else { + largest = j; + } + if ((right <= heap_size) && + (io_array[temp_arr[right]].offset > + io_array[temp_arr[largest]].offset)) { + largest = right; + } + if (largest != j) { + temp = temp_arr[largest]; + temp_arr[largest] = temp_arr[j]; + temp_arr[j] = temp; + j = largest; + } + else { + done = 1; + } + } + } + + for (i = num_entries-1; i >=1; --i) { + temp = temp_arr[0]; + temp_arr[0] = temp_arr[i]; + temp_arr[i] = temp; + heap_size--; + done = 0; + j = 0; + largest = j; + + while (!done) { + left = j*2+1; + right = j*2+2; + + if ((left <= heap_size) && + (io_array[temp_arr[left]].offset > + io_array[temp_arr[j]].offset)) { + largest = left; + } + else { + largest = j; + } + if ((right <= heap_size) && + (io_array[temp_arr[right]].offset > + io_array[temp_arr[largest]].offset)) { + largest = right; + } + if (largest != j) { + temp = temp_arr[largest]; + temp_arr[largest] = temp_arr[j]; + temp_arr[j] = temp; + j = largest; + } + else { + done = 1; + } + } + sorted[i] = temp_arr[i]; + } + sorted[0] = temp_arr[0]; + + if (NULL != temp_arr) { + free(temp_arr); + temp_arr = NULL; + } + return OMPI_SUCCESS; +} + diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_module.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_module.c new file mode 100644 index 0000000000..16070a9cbf --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_module.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "fcoll_dynamic_gen2.h" + +#include + +#include "mpi.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/fcoll/base/base.h" + + +/* + * ******************************************************************* + * ************************ actions structure ************************ + * ******************************************************************* + */ +static mca_fcoll_base_module_1_0_0_t dynamic_gen2 = { + mca_fcoll_dynamic_gen2_module_init, + mca_fcoll_dynamic_gen2_module_finalize, + mca_fcoll_dynamic_gen2_file_read_all, + NULL, /* iread_all */ + mca_fcoll_dynamic_gen2_file_write_all, + NULL, /*iwrite_all */ + NULL, /* progress */ + NULL /* request_free */ +}; + +int +mca_fcoll_dynamic_gen2_component_init_query(bool enable_progress_threads, + bool enable_mpi_threads) +{ + /* Nothing to do */ + + return OMPI_SUCCESS; +} + +mca_fcoll_base_module_1_0_0_t * +mca_fcoll_dynamic_gen2_component_file_query (mca_io_ompio_file_t *fh, int *priority) +{ + *priority = mca_fcoll_dynamic_gen2_priority; + if (0 >= mca_fcoll_dynamic_gen2_priority) { + return NULL; + } + + if (mca_fcoll_base_query_table (fh, "dynamic_gen2")) { + if (*priority < 50) { + *priority = 50; + } + } + + return &dynamic_gen2; +} + +int mca_fcoll_dynamic_gen2_component_file_unquery (mca_io_ompio_file_t *file) +{ + /* This function might be needed for some purposes later. 
for now it + * does not have anything to do since there are no steps which need + * to be undone if this module is not selected */ + + return OMPI_SUCCESS; +} + +int mca_fcoll_dynamic_gen2_module_init (mca_io_ompio_file_t *file) +{ + return OMPI_SUCCESS; +} + + +int mca_fcoll_dynamic_gen2_module_finalize (mca_io_ompio_file_t *file) +{ + return OMPI_SUCCESS; +} diff --git a/ompi/mca/fcoll/dynamic_gen2/owner.txt b/ompi/mca/fcoll/dynamic_gen2/owner.txt new file mode 100644 index 0000000000..2e9726c28a --- /dev/null +++ b/ompi/mca/fcoll/dynamic_gen2/owner.txt @@ -0,0 +1,7 @@ +# +# owner/status file +# owner: institution that is responsible for this package +# status: e.g. active, maintenance, unmaintained +# +owner: UH +status: active