
1. Freeing the displs array after the allgatherv to avoid segmentation faults in dynamic segmentation.

2. Checking for 0-byte datatypes and sending only when data is available, to avoid 0-byte messages being sent and received.

3. Changing the timing extraction to support calculating min, max, and avg communication costs plus min and avg write costs.

This commit was SVN r26450.
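
The first two fixes follow a common MPI pattern. Below is a minimal standalone sketch in plain MPI; the names (gather_then_recv, types, nprocs) are illustrative only, and the calls are the public MPI API rather than the OMPIO-internal ompi_io_ompio_* / MCA_PML_CALL interfaces the diff itself uses.

/* Hypothetical sketch of fixes 1 and 2 in plain MPI. */
#include <mpi.h>
#include <stdlib.h>

void gather_then_recv (MPI_Comm comm, int nprocs, int my_count,
                       int *all_counts, int *recvcounts,
                       MPI_Datatype *types, char *global_buf)
{
    int i, nreqs = 0;
    int *displs = (int *) malloc (nprocs * sizeof(int));
    MPI_Request *reqs = (MPI_Request *) malloc (nprocs * sizeof(MPI_Request));

    displs[0] = 0;
    for (i = 1; i < nprocs; i++) {
        displs[i] = displs[i-1] + recvcounts[i-1];
    }
    MPI_Allgatherv (&my_count, 1, MPI_INT,
                    all_counts, recvcounts, displs, MPI_INT, comm);
    /* Fix 1: free displs only after the allgatherv that reads it has
     * returned; freeing it earlier can segfault. */
    free (displs);
    displs = NULL;

    /* Fix 2: post a receive only when the datatype carries data, so no
     * 0-byte messages are sent or received. */
    for (i = 0; i < nprocs; i++) {
        int datatype_size;
        MPI_Type_size (types[i], &datatype_size);
        if (datatype_size) {
            MPI_Irecv (global_buf, 1, types[i], i, 123, comm,
                       &reqs[nreqs++]);
        }
    }
    /* Wait on the requests actually posted, not one per process. */
    MPI_Waitall (nreqs, reqs, MPI_STATUSES_IGNORE);
    free (reqs);
}

In the diff below, the same guard wraps the MCA_PML_CALL(irecv) posts, and the matching wait uses recv_req_count rather than one request per process.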
This commit is contained in:
Vishwanath Venkatesan 2012-05-18 21:39:58 +00:00
parent b16e43f489
commit cbad31cc88


@@ -94,13 +94,17 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
ompi_datatype_t **recvtype = NULL;
MPI_Aint *total_bytes_per_process = NULL;
MPI_Request *send_req=NULL, *recv_req=NULL;
int datatype_size, recv_req_count=0;
#if TIME_BREAKDOWN
double start_time=0.0, end_time=0.0, start_time2=0.0, end_time2=0.0;
double total=0.0 , total_io=0.0, max_pp=0.0, max_io=0.0;
double start_ptime=0.0, end_ptime=0.0, tpw=0.0, max_tpw=0.0;
double start_cio_array=0.0, end_cio_array=0.0, tcio_array=0.0, max_cio=0.0;
double start_sr=0.0, end_sr=0.0, tsr=0.0, max_sr=0.0;
double total=0.0 , total_io=0.0, max_io=0.0; /* max_pp=0.0;*/
double start_ptime=0.0, end_ptime=0.0, tpw=0.0; /* max_tpw=0.0;*/
double start_cio_array=0.0, end_cio_array=0.0, tcio_array=0.0;/* max_cio=0.0;*/
double start_sr=0.0, end_sr=0.0, tsr=0.0;/* max_sr=0.0;*/
double comm_time = 0.0, max_comm_time=0.0;
double write_time = 0.0, max_write_time=0.0;
#endif
#if TIME_BREAKDOWN
@@ -132,14 +136,18 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
if (! (fh->f_flags & OMPIO_AGGREGATOR_IS_SET)) {
ret = ompi_io_ompio_set_aggregator_props (fh,
mca_fcoll_dynamic_num_io_procs,
max_data);
if (OMPI_SUCCESS != ret){
goto exit;
}
}
if (-1 == mca_fcoll_dynamic_num_io_procs) {
mca_fcoll_dynamic_num_io_procs = 1;
}
#if TIME_BREAKDOWN
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
@@ -256,10 +264,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#endif
/* allocate the global iovec */
if (NULL != displs){
free(displs);
displs=NULL;
}
if (0 != total_fview_count) {
global_iov_array = (struct iovec*) malloc (total_fview_count *
@@ -319,6 +323,12 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
free(local_iov_array);
local_iov_array = NULL;
}
if (NULL != displs){
free(displs);
displs=NULL;
}
#if DEBUG_ON
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
@@ -364,6 +374,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
bytes_per_cycle = mca_fcoll_dynamic_cycle_buffer_size;
cycles = ceil((double)total_bytes/bytes_per_cycle);
n = 0;
bytes_remaining = 0;
current_index = 0;
@@ -463,7 +474,7 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
/* The blocklen and displs calculation only done at aggregators!*/
#if TIME_BREAKDOWN
start_cio_array = MPI_Wtime();
#endif
while (bytes_to_write_in_cycle) {
@@ -582,6 +593,9 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
}
#if TIME_BREAKDOWN
start_sr = MPI_Wtime();
#endif
/* Calculate the displacement on where to put the data and allocate
the receive buffer (global_buf) */
@@ -730,25 +744,25 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
recv_req = (MPI_Request *)
malloc (fh->f_procs_per_group * sizeof(MPI_Request));
if (NULL == recv_req){
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
recv_req_count = 0;
for (i=0;i<fh->f_procs_per_group; i++){
ompi_datatype_create_hindexed(disp_index[i],
blocklen_per_process[i],
displs_per_process[i],
MPI_BYTE,
&recvtype[i]);
ompi_datatype_commit(&recvtype[i]);
MPI_Type_size (recvtype[i],
&datatype_size);
if (datatype_size){
recv_req = (MPI_Request *)realloc
((void *)recv_req, (recv_req_count + 1)*sizeof(MPI_Request));
ret = MCA_PML_CALL(irecv(global_buf,
1,
@@ -756,12 +770,14 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
fh->f_procs_in_group[i],
123,
fh->f_comm,
&recv_req[i]));
if (OMPI_SUCCESS != ret){
goto exit;
}
&recv_req[recv_req_count]));
recv_req_count++;
}
if (OMPI_SUCCESS != ret){
goto exit;
}
}
}
}
@@ -828,37 +844,37 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
goto exit;
}
ret = MCA_PML_CALL(isend(send_buf,
bytes_sent,
MPI_BYTE,
fh->f_procs_in_group[fh->f_aggregator_index],
123,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
send_req));
if ( OMPI_SUCCESS != ret ){
goto exit;
}
if (bytes_sent){
#if TIME_BREAKDOWN
start_sr = MPI_Wtime();
#endif
ret = MCA_PML_CALL(isend(send_buf,
bytes_sent,
MPI_BYTE,
fh->f_procs_in_group[fh->f_aggregator_index],
123,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
send_req));
ret = ompi_request_wait (send_req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
ret = ompi_request_wait_all (fh->f_procs_per_group,
recv_req,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
ret = ompi_request_wait(send_req, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
}
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank) {
ret = ompi_request_wait_all (recv_req_count,
recv_req,
MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
}
}
#if DEBUG_ON
if (fh->f_procs_in_group[fh->f_aggregator_index] == fh->f_rank){
printf("************Cycle: %d, Aggregator: %d ***************\n",
@@ -871,13 +887,23 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#if TIME_BREAKDOWN
end_sr = MPI_Wtime();
tsr = end_sr - start_sr;
comm_time += tsr;
#endif
/**********************************************************
**************** DONE GATHERING OF DATA ******************
*********************************************************/
if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
if (NULL != send_buf) {
free (send_buf);
send_buf = NULL;
}
}
/**********************************************************
**************** DONE GATHERING OF DATA ******************
*********************************************************/
/**********************************************************
******* Create the io array, and pass it to fbtl *********
*********************************************************/
@@ -946,13 +972,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
#if TIME_BREAKDOWN
end_ptime = MPI_Wtime();
tpw = end_ptime - start_ptime;
write_time += tpw;
#endif
}
if (NULL != send_req){
free(send_req);
send_req = NULL;
@@ -986,14 +1010,6 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
total_io = end_time2-start_time2;
fh->f_comm->c_coll.coll_allreduce (&total,
&max_pp,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&total_io,
&max_io,
1,
@@ -1001,25 +1017,47 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_SUM,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&tcio_array,
&max_cio,
fh->f_comm->c_coll.coll_allreduce (&write_time,
&max_write_time,
1,
MPI_DOUBLE,
MPI_SUM,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (0 == fh->f_rank){
printf ("Max Exchange and write ---- %f\n", max_io);
printf ("AVG pwrite time : %f \n", max_write_time/mca_fcoll_dynamic_num_io_procs);
printf ("AVG communication time : %f\n", max_comm_time/fh->f_size);
}
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&tpw,
&max_tpw,
1,
MPI_DOUBLE,
MPI_MAX,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
fh->f_comm->c_coll.coll_allreduce (&tsr,
&max_sr,
fh->f_comm->c_coll.coll_allreduce (&write_time,
&max_write_time,
1,
MPI_DOUBLE,
MPI_MAX,
@@ -1028,11 +1066,23 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
if (0 == fh->f_rank){
printf ("Preprocessing --- %f Max Exchange and write ---- %f\n", max_pp, max_io);
printf ("Calculate Offsets and create receive buf : %f\n", max_cio);
printf ("sendbuf + send recv : %f\n", max_sr) ;
printf ("max pwrite time : %f \n", max_tpw);
printf ("MAX pwrite time : %f \n", max_write_time);
printf ("MAX communication time : %f\n", max_comm_time);
}
fh->f_comm->c_coll.coll_allreduce (&comm_time,
&max_comm_time,
1,
MPI_DOUBLE,
MPI_MIN,
fh->f_comm,
fh->f_comm->c_coll.coll_allreduce_module);
if (0 == fh->f_rank){
printf ("MIN communication time : %f\n", max_comm_time);
}
#endif
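
For fix 3, the reporting pattern reduces each local cost three ways: MPI_MIN, MPI_MAX, and MPI_SUM, with the sum divided by the number of contributing ranks to get the average. A hypothetical helper sketching this in plain MPI (report_cost is not a function in the tree):

/* Hypothetical helper sketching fix 3: min/max/avg of a per-rank cost. */
#include <mpi.h>
#include <stdio.h>

static void report_cost (const char *label, double local_cost,
                         int contributors, MPI_Comm comm)
{
    int rank;
    double vmin = 0.0, vmax = 0.0, vsum = 0.0;

    MPI_Comm_rank (comm, &rank);
    MPI_Allreduce (&local_cost, &vmin, 1, MPI_DOUBLE, MPI_MIN, comm);
    MPI_Allreduce (&local_cost, &vmax, 1, MPI_DOUBLE, MPI_MAX, comm);
    MPI_Allreduce (&local_cost, &vsum, 1, MPI_DOUBLE, MPI_SUM, comm);
    if (0 == rank) {
        printf ("MIN %s : %f\n", label, vmin);
        printf ("MAX %s : %f\n", label, vmax);
        printf ("AVG %s : %f\n", label, vsum / contributors);
    }
}

/* e.g. report_cost ("communication time", comm_time, comm_size, comm);
 *      report_cost ("pwrite time", write_time, num_io_procs, comm);   */

In the diff itself the write-time average divides by mca_fcoll_dynamic_num_io_procs, since only the aggregators call pwrite, while the communication-time average divides by fh->f_size.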