# Extracting timing information for individual components of collective algorithm using a generic queue.
# This is triggered based on a mca-paramater and can be used with all collective modules. # Individual queues maintained for read and write. # The additional communication to combine data is done at file-close so that the actual timing of collective-operations will not get affected. # The queues are initialized in file-open This commit was SVN r27439.
Этот коммит содержится в:
родитель
2ab48b997e
Коммит
9eeb3b5d50
@ -51,10 +51,14 @@
|
||||
|
||||
#include "io_ompio.h"
|
||||
|
||||
print_queue *coll_write_time=NULL;
|
||||
print_queue *coll_read_time=NULL;
|
||||
|
||||
|
||||
int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh)
|
||||
{
|
||||
|
||||
if (NULL != fh) {
|
||||
if (NULL != fh) {
|
||||
ompi_datatype_t *types[2], *default_file_view;
|
||||
int blocklen[2] = {1, 1};
|
||||
OPAL_PTRDIFF_TYPE d[2], base;
|
||||
@ -1991,4 +1995,188 @@ int ompi_io_ompio_scatter_data (mca_io_ompio_file_t *fh,
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Print queue related function implementations */
|
||||
|
||||
|
||||
int ompi_io_ompio_initialize_print_queue(print_queue *q){
|
||||
|
||||
q->first = 0;
|
||||
q->last = QUEUESIZE - 1;
|
||||
q->count = 0;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
int ompi_io_ompio_register_print_entry (print_queue *q,
|
||||
print_entry x){
|
||||
|
||||
if (q->count >= QUEUESIZE){
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
else{
|
||||
q->last = (q->last + 1) % QUEUESIZE;
|
||||
q->entry[q->last] = x;
|
||||
q->count = q->count + 1;
|
||||
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_io_ompio_unregister_print_entry (print_queue *q, print_entry *x){
|
||||
|
||||
if (q->count <= 0){
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
else{
|
||||
*x = q->entry[q->first];
|
||||
q->first = (q->first+1) % QUEUESIZE;
|
||||
q->count = q->count - 1;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_io_ompio_empty_print_queue(print_queue *q){
|
||||
|
||||
if (q->count <= 0){
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ompi_io_ompio_full_print_queue(print_queue *q){
|
||||
|
||||
if (q->count < QUEUESIZE)
|
||||
return 0;
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
int ompi_io_ompio_print_time_info(print_queue *q,
|
||||
char *name,
|
||||
mca_io_ompio_file_t *fh){
|
||||
|
||||
int i = 0, j=0, nprocs_for_coll = 0, ret = OMPI_SUCCESS, count = 0;
|
||||
double *time_details = NULL, *final_sum = NULL;
|
||||
double *final_max = NULL, *final_min = NULL;
|
||||
double *final_time_details=NULL;
|
||||
|
||||
|
||||
|
||||
nprocs_for_coll = q->entry[0].nprocs_for_coll;
|
||||
time_details = (double *) malloc (4*sizeof(double));
|
||||
if ( NULL == time_details){
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
|
||||
}
|
||||
|
||||
if (!fh->f_rank){
|
||||
|
||||
final_min = (double *) malloc (3*sizeof(double));
|
||||
if ( NULL == final_min){
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
final_max = (double *) malloc (3*sizeof(double));
|
||||
if ( NULL == final_max){
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
|
||||
}
|
||||
|
||||
final_sum = (double *) malloc (3*sizeof(double));
|
||||
if ( NULL == final_sum){
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
final_time_details =
|
||||
(double *)malloc
|
||||
(fh->f_size * 4 * sizeof(double));
|
||||
if (NULL == final_time_details){
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < 4; i++){
|
||||
time_details[i] = 0.0;
|
||||
}
|
||||
|
||||
if (q->count > 0){
|
||||
for (i=0; i < q->count; i++){
|
||||
for (j=0;j<3;j++){
|
||||
if (!fh->f_rank){
|
||||
final_min[j] = 100000.0;
|
||||
final_max[j] = 0.0;
|
||||
final_sum[j] = 0.0;
|
||||
}
|
||||
time_details[j] += q->entry[i].time[j];
|
||||
}
|
||||
time_details[3] = q->entry[i].aggregator;
|
||||
}
|
||||
}
|
||||
|
||||
fh->f_comm->c_coll.coll_gather(time_details,
|
||||
4,
|
||||
MPI_DOUBLE,
|
||||
final_time_details,
|
||||
4,
|
||||
MPI_DOUBLE,
|
||||
0,
|
||||
fh->f_comm,
|
||||
fh->f_comm->c_coll.coll_gather_module);
|
||||
|
||||
|
||||
|
||||
if (!fh->f_rank){
|
||||
|
||||
count = 4 * fh->f_size;
|
||||
|
||||
for (i=0;i<count;i+=4){
|
||||
if (final_time_details[i+3] == 1){
|
||||
for (j=i; j<i+3;j++){
|
||||
final_sum[j-i] += final_time_details[j];
|
||||
if ( final_time_details[j] < final_min[j-i])
|
||||
final_min[j-i] = final_time_details[j];
|
||||
if ( final_time_details[j] > final_max[j-i])
|
||||
final_max[j-i] = final_time_details[j];
|
||||
}
|
||||
}
|
||||
}
|
||||
printf ("\n# MAX-%s AVG-%s MIN-%s MAX-COMM AVG-COMM MIN-COMM",
|
||||
name, name, name);
|
||||
printf (" MAX-EXCH AVG-EXCH MIN-EXCH\n");
|
||||
printf (" %f %f %f %f %f %f %f %f %f\n\n",
|
||||
final_max[0], final_sum[0]/nprocs_for_coll, final_min[0],
|
||||
final_max[1], final_sum[1]/nprocs_for_coll, final_min[1],
|
||||
final_max[2], final_sum[2]/nprocs_for_coll, final_min[2]);
|
||||
|
||||
}
|
||||
|
||||
exit:
|
||||
if ( NULL != final_max){
|
||||
free(final_max);
|
||||
final_max = NULL;
|
||||
}
|
||||
if (NULL != final_min){
|
||||
free(final_min);
|
||||
final_min = NULL;
|
||||
}
|
||||
if (NULL != final_sum){
|
||||
free(final_sum);
|
||||
final_sum = NULL;
|
||||
}
|
||||
if (NULL != time_details){
|
||||
free(time_details);
|
||||
time_details = NULL;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
@ -42,6 +42,7 @@
|
||||
extern int mca_io_ompio_cycle_buffer_size;
|
||||
extern int mca_io_ompio_bytes_per_agg;
|
||||
extern int mca_io_ompio_record_offset_info;
|
||||
extern int mca_io_ompio_coll_timing_info;
|
||||
|
||||
/*
|
||||
* Flags
|
||||
@ -52,6 +53,7 @@ extern int mca_io_ompio_record_offset_info;
|
||||
#define OMPIO_FILE_VIEW_IS_SET 0x00000008
|
||||
#define OMPIO_CONTIGUOUS_FVIEW 0x00000010
|
||||
#define OMPIO_AGGREGATOR_IS_SET 0x00000020
|
||||
#define QUEUESIZE 2048
|
||||
|
||||
#define OMPIO_MIN(a, b) (((a) < (b)) ? (a) : (b))
|
||||
#define OMPIO_MAX(a, b) (((a) < (b)) ? (b) : (a))
|
||||
@ -121,6 +123,20 @@ typedef struct mca_io_ompio_offlen_array_t{
|
||||
int process_id;
|
||||
}mca_io_ompio_offlen_array_t;
|
||||
|
||||
/*To extract time-information */
|
||||
typedef struct {
|
||||
double time[3];
|
||||
int nprocs_for_coll;
|
||||
int aggregator;
|
||||
}print_entry;
|
||||
|
||||
typedef struct {
|
||||
print_entry entry[QUEUESIZE + 1];
|
||||
int first;
|
||||
int last;
|
||||
int count;
|
||||
} print_queue;
|
||||
|
||||
|
||||
/**
|
||||
* Back-end structure for MPI_File
|
||||
@ -194,6 +210,10 @@ struct mca_io_ompio_data_t {
|
||||
};
|
||||
typedef struct mca_io_ompio_data_t mca_io_ompio_data_t;
|
||||
|
||||
extern print_queue *coll_write_time;
|
||||
extern print_queue *coll_read_time;
|
||||
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_set_file_defaults (mca_io_ompio_file_t *fh);
|
||||
|
||||
/*
|
||||
@ -353,6 +373,21 @@ OMPI_DECLSPEC int ompi_io_ompio_bcast_array (void *buff,
|
||||
int procs_per_group,
|
||||
ompi_communicator_t *comm);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_register_print_entry (print_queue *q,
|
||||
print_entry x);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_unregister_print_entry (print_queue *q, print_entry *x);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_empty_print_queue(print_queue *q);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_full_print_queue(print_queue *q);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_initialize_print_queue(print_queue *q);
|
||||
|
||||
OMPI_DECLSPEC int ompi_io_ompio_print_time_info(print_queue *q,
|
||||
char *name_operation,
|
||||
mca_io_ompio_file_t *fh);
|
||||
|
||||
|
||||
/*
|
||||
* ******************************************************************
|
||||
|
@ -29,7 +29,7 @@
|
||||
int mca_io_ompio_cycle_buffer_size = OMPIO_PREALLOC_MAX_BUF_SIZE;
|
||||
int mca_io_ompio_bytes_per_agg = OMPIO_PREALLOC_MAX_BUF_SIZE;
|
||||
int mca_io_ompio_record_offset_info = 0;
|
||||
|
||||
int mca_io_ompio_coll_timing_info = 0;
|
||||
|
||||
|
||||
/*
|
||||
@ -157,6 +157,12 @@ static int open_component(void)
|
||||
false, false, mca_io_ompio_record_offset_info,
|
||||
&mca_io_ompio_record_offset_info);
|
||||
|
||||
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
|
||||
"coll_timing_info",
|
||||
"Enable collective algorithm timing information",
|
||||
false, false, mca_io_ompio_coll_timing_info,
|
||||
&mca_io_ompio_coll_timing_info);
|
||||
|
||||
|
||||
mca_base_param_reg_int (&mca_io_ompio_component.io_version,
|
||||
"cycle_buffer_size",
|
||||
|
@ -43,6 +43,7 @@ mca_io_ompio_file_open (ompi_communicator_t *comm,
|
||||
mca_io_ompio_data_t *data=NULL;
|
||||
int remote_arch;
|
||||
|
||||
|
||||
if ( ((amode&MPI_MODE_RDONLY)?1:0) + ((amode&MPI_MODE_RDWR)?1:0) +
|
||||
((amode&MPI_MODE_WRONLY)?1:0) != 1 ) {
|
||||
return MPI_ERR_AMODE;
|
||||
@ -82,9 +83,15 @@ mca_io_ompio_file_open (ompi_communicator_t *comm,
|
||||
data->ompio_fh.f_atomicity = 0;
|
||||
|
||||
ompi_io_ompio_set_file_defaults ( &data->ompio_fh);
|
||||
|
||||
data->ompio_fh.f_filename = fh->f_filename;
|
||||
|
||||
/*Initialize the print_queues queues here!*/
|
||||
coll_write_time = (print_queue *) malloc (sizeof(print_queue));
|
||||
coll_read_time = (print_queue *) malloc (sizeof(print_queue));
|
||||
|
||||
ompi_io_ompio_initialize_print_queue(coll_write_time);
|
||||
ompi_io_ompio_initialize_print_queue(coll_read_time);
|
||||
|
||||
/*
|
||||
if (MPI_INFO_NULL != info) {
|
||||
ret = ompi_info_dup (info, &data->ompio_fh.f_info);
|
||||
@ -160,6 +167,7 @@ mca_io_ompio_file_close (ompi_file_t *fh)
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_io_ompio_data_t *data;
|
||||
int delete_flag = 0;
|
||||
char name[256];
|
||||
|
||||
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
|
||||
if ( NULL == data ) {
|
||||
@ -167,6 +175,26 @@ mca_io_ompio_file_close (ompi_file_t *fh)
|
||||
return ret;
|
||||
}
|
||||
|
||||
if(mca_io_ompio_coll_timing_info){
|
||||
strcpy (name, "WRITE");
|
||||
if (!ompi_io_ompio_empty_print_queue(coll_write_time)){
|
||||
ret = ompi_io_ompio_print_time_info(coll_write_time,
|
||||
name,
|
||||
&data->ompio_fh);
|
||||
if (OMPI_SUCCESS != ret){
|
||||
printf("Error in print_time_info ");
|
||||
}
|
||||
}
|
||||
strcpy (name, "READ");
|
||||
if (!ompi_io_ompio_empty_print_queue(coll_read_time)){
|
||||
ret = ompi_io_ompio_print_time_info(coll_read_time,
|
||||
name,
|
||||
&data->ompio_fh);
|
||||
if (OMPI_SUCCESS != ret){
|
||||
printf("Error in print_time_info ");
|
||||
}
|
||||
}
|
||||
}
|
||||
if ( data->ompio_fh.f_amode & MPI_MODE_DELETE_ON_CLOSE ) {
|
||||
delete_flag = 1;
|
||||
}
|
||||
@ -176,6 +204,8 @@ mca_io_ompio_file_close (ompi_file_t *fh)
|
||||
mca_io_ompio_file_delete ( data->ompio_fh.f_filename, MPI_INFO_NULL );
|
||||
}
|
||||
|
||||
|
||||
|
||||
mca_fs_base_file_unselect (&data->ompio_fh);
|
||||
mca_fbtl_base_file_unselect (&data->ompio_fh);
|
||||
mca_fcoll_base_file_unselect (&data->ompio_fh);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user