diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
index 46df8865ed..8f6e244aef 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
@@ -670,7 +670,6 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
                     data->bytes_to_write_in_cycle -= data->bytes_remaining;
                     data->bytes_remaining = 0;
 //                    continue;
-//                    break;
                 }
                 else {
                     /* the remaining data from the previous cycle is larger than the
@@ -803,122 +802,118 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
                     }
                 }
             }
-        }
-        else{
-//            continue;
-            return OMPI_SUCCESS;
-        }
-        /* Sort the displacements for each aggregator*/
-        local_heap_sort (file_offsets_for_agg,
-                         entries_per_aggregator,
-                         sorted_file_offsets);
-
-        /*create contiguous memory displacements
-          based on blocklens on the same displs array
-          and map it to this aggregator's actual
-          file-displacements (this is in the io-array created above)*/
-        memory_displacements = (MPI_Aint *) malloc
-            (entries_per_aggregator * sizeof(MPI_Aint));
-
-        memory_displacements[sorted_file_offsets[0]] = 0;
-        for (i=1; i<entries_per_aggregator; i++){
-            memory_displacements[sorted_file_offsets[i]] =
-                memory_displacements[sorted_file_offsets[i-1]] +
-                file_offsets_for_agg[sorted_file_offsets[i-1]].length;
-        }
-
-        temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
-        if (NULL == temp_disp_index) {
-            opal_output (1, "OUT OF MEMORY\n");
-            ret = OMPI_ERR_OUT_OF_RESOURCE;
-            goto exit;
-        }
-
-        /*Now update the displacements array with memory offsets*/
-        global_count = 0;
-        for (i=0;i<entries_per_aggregator;i++){
-            temp_pindex =
-                file_offsets_for_agg[sorted_file_offsets[i]].process_id;
-            data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
-                memory_displacements[sorted_file_offsets[i]];
-            if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
-                temp_disp_index[temp_pindex] += 1;
-            else{
-                printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
-                       temp_pindex, temp_disp_index[temp_pindex],
-                       temp_pindex, data->disp_index[temp_pindex]);
-            }
-            global_count +=
-                file_offsets_for_agg[sorted_file_offsets[i]].length;
-        }
-
-        if (NULL != temp_disp_index){
-            free(temp_disp_index);
-            temp_disp_index = NULL;
-        }
-
-#if DEBUG_ON
-
-        printf("************Cycle: %d, Aggregator: %d ***************\n",
-               index+1,rank);
-        for (i=0;i<data->procs_per_group; i++){
-            for(j=0;j<data->disp_index[i];j++){
-                if (data->blocklen_per_process[i][j] > 0){
-                    printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
-                           data->procs_in_group[i],j,
-                           data->blocklen_per_process[i][j],j,
-                           data->displs_per_process[i][j],
-                           rank);
-
-                }
-            }
-        }
-        printf("************Cycle: %d, Aggregator: %d ***************\n",
-               index+1,rank);
-#endif
-
-        for (i=0; i<data->procs_per_group; i++) {
-            size_t datatype_size;
-            data->recv_req[i] = MPI_REQUEST_NULL;
-            if ( 0 < data->disp_index[i] ) {
-                ompi_datatype_create_hindexed(data->disp_index[i],
-                                              data->blocklen_per_process[i],
-                                              data->displs_per_process[i],
-                                              MPI_BYTE,
-                                              &data->recvtype[i]);
-                ompi_datatype_commit(&data->recvtype[i]);
-                opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
-                if (datatype_size){
-                    ret = MCA_PML_CALL(irecv(data->global_buf,
-                                             1,
-                                             data->recvtype[i],
-                                             data->procs_in_group[i],
-                                             123,
-                                             data->comm,
-                                             &data->recv_req[i]));
-                    if (OMPI_SUCCESS != ret){
-                        goto exit;
-                    }
-                }
-            }
-        }
-    }
-
+            /* Sort the displacements for each aggregator*/
+            local_heap_sort (file_offsets_for_agg,
+                             entries_per_aggregator,
+                             sorted_file_offsets);
+
+            /*create contiguous memory displacements
+              based on blocklens on the same displs array
+              and map it to this aggregator's actual
+              file-displacements (this is in the io-array created above)*/
+            memory_displacements = (MPI_Aint *) malloc
+                (entries_per_aggregator * sizeof(MPI_Aint));
+
+            memory_displacements[sorted_file_offsets[0]] = 0;
+            for (i=1; i<entries_per_aggregator; i++){
+                memory_displacements[sorted_file_offsets[i]] =
+                    memory_displacements[sorted_file_offsets[i-1]] +
+                    file_offsets_for_agg[sorted_file_offsets[i-1]].length;
+            }
+
+            temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
+            if (NULL == temp_disp_index) {
+                opal_output (1, "OUT OF MEMORY\n");
+                ret = OMPI_ERR_OUT_OF_RESOURCE;
+                goto exit;
+            }
+
+            /*Now update the displacements array with memory offsets*/
+            global_count = 0;
+            for (i=0;i<entries_per_aggregator;i++){
+                temp_pindex =
+                    file_offsets_for_agg[sorted_file_offsets[i]].process_id;
+                data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
+                    memory_displacements[sorted_file_offsets[i]];
+                if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
+                    temp_disp_index[temp_pindex] += 1;
+                else{
+                    printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
+                           temp_pindex, temp_disp_index[temp_pindex],
+                           temp_pindex, data->disp_index[temp_pindex]);
+                }
+                global_count +=
+                    file_offsets_for_agg[sorted_file_offsets[i]].length;
+            }
+
+            if (NULL != temp_disp_index){
+                free(temp_disp_index);
+                temp_disp_index = NULL;
+            }
+
+#if DEBUG_ON
+
+            printf("************Cycle: %d, Aggregator: %d ***************\n",
+                   index+1,rank);
+            for (i=0;i<data->procs_per_group; i++){
+                for(j=0;j<data->disp_index[i];j++){
+                    if (data->blocklen_per_process[i][j] > 0){
+                        printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
+                               data->procs_in_group[i],j,
+                               data->blocklen_per_process[i][j],j,
+                               data->displs_per_process[i][j],
+                               rank);
+                    }
+                }
+            }
+            printf("************Cycle: %d, Aggregator: %d ***************\n",
+                   index+1,rank);
+#endif
+
+            for (i=0; i<data->procs_per_group; i++) {
+                size_t datatype_size;
+                data->recv_req[i] = MPI_REQUEST_NULL;
+                if ( 0 < data->disp_index[i] ) {
+                    ompi_datatype_create_hindexed(data->disp_index[i],
+                                                  data->blocklen_per_process[i],
+                                                  data->displs_per_process[i],
+                                                  MPI_BYTE,
+                                                  &data->recvtype[i]);
+                    ompi_datatype_commit(&data->recvtype[i]);
+                    opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
+
+                    if (datatype_size){
+                        ret = MCA_PML_CALL(irecv(data->global_buf,
+                                                 1,
+                                                 data->recvtype[i],
+                                                 data->procs_in_group[i],
+                                                 123,
+                                                 data->comm,
+                                                 &data->recv_req[i]));
+                        if (OMPI_SUCCESS != ret){
+                            goto exit;
+                        }
+                    }
+                }
+            }
+        } /* end if (entries_per_aggr > 0 ) */
+    }/* end if (aggregator == rank ) */

     if ( data->sendbuf_is_contiguous ) {
         send_buf = &((char*)data->buf)[data->total_bytes_written];
@@ -991,7 +986,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
         }
     }

-    if (aggregator == rank) {
+    if (aggregator == rank && entries_per_aggregator > 0 ) {
         ret = ompi_request_wait_all (data->procs_per_group,
                                      data->recv_req,
                                      MPI_STATUS_IGNORE);
@@ -1025,7 +1020,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
     *** 7f. Create the io array, and pass it to fbtl
     *********************************************************/

-    if (aggregator == rank) {
+    if (aggregator == rank && entries_per_aggregator>0) {

 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
         start_write_time = MPI_Wtime();
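
Below is a minimal, standalone MPI sketch of the control-flow change in this patch, for illustration only; it is not dynamic_gen2 code. It shows the same idea: instead of returning OMPI_SUCCESS from the aggregator branch when entries_per_aggregator is 0, the aggregator-only work (posting receives, waiting) is guarded by entries_per_aggregator > 0, so the send path that every rank executes later in the routine still runs in that cycle. Only the names "aggregator" and "entries_per_aggregator" and the tag 123 mirror the diff; the cycle loop, the MPI_Gather of per-rank counts and the one-int payloads are invented for the sketch.

/*
 * Minimal standalone sketch of the guard-instead-of-early-return pattern.
 * NOT dynamic_gen2 code.  Build with: mpicc sketch.c -o sketch
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main (int argc, char **argv)
{
    int rank, size, aggregator = 0;

    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);
    MPI_Comm_size (MPI_COMM_WORLD, &size);

    for (int cycle = 0; cycle < 2; cycle++) {
        /* Pretend nobody contributes data in cycle 0 and everybody
           contributes one int in cycle 1 (invented for illustration). */
        int my_count = (cycle == 0) ? 0 : 1;
        int entries_per_aggregator = 0;
        int *counts = NULL, *recvbuf = NULL;
        MPI_Request *reqs = NULL;
        MPI_Request sreq = MPI_REQUEST_NULL;
        int payload = rank;

        if (aggregator == rank) {
            counts = malloc (size * sizeof(int));
        }
        /* The aggregator learns how much each rank will send this cycle. */
        MPI_Gather (&my_count, 1, MPI_INT, counts, 1, MPI_INT,
                    aggregator, MPI_COMM_WORLD);

        if (aggregator == rank) {
            for (int i = 0; i < size; i++) {
                entries_per_aggregator += counts[i];
            }
            if (entries_per_aggregator > 0) {   /* guard instead of early return */
                recvbuf = malloc (size * sizeof(int));
                reqs    = malloc (size * sizeof(MPI_Request));
                for (int i = 0; i < size; i++) {
                    reqs[i] = MPI_REQUEST_NULL;
                    if (counts[i] > 0) {
                        MPI_Irecv (&recvbuf[i], 1, MPI_INT, i, 123,
                                   MPI_COMM_WORLD, &reqs[i]);
                    }
                }
            } /* end if (entries_per_aggregator > 0) */
        } /* end if (aggregator == rank) */

        /* Shared send path: every rank reaches this point in every cycle.
           An early return in the aggregator branch would skip it. */
        if (my_count > 0) {
            MPI_Isend (&payload, 1, MPI_INT, aggregator, 123,
                       MPI_COMM_WORLD, &sreq);
        }

        if (aggregator == rank && entries_per_aggregator > 0) {
            MPI_Waitall (size, reqs, MPI_STATUSES_IGNORE);
            printf ("cycle %d: aggregator received %d entries\n",
                    cycle, entries_per_aggregator);
        }
        MPI_Wait (&sreq, MPI_STATUS_IGNORE);

        free (counts);
        free (recvbuf);
        free (reqs);
    }

    MPI_Finalize ();
    return 0;
}

The same guard appears in the last two hunks of the diff ("aggregator == rank && entries_per_aggregator > 0"), so the wait on the receive requests and the fbtl write are only attempted in cycles where the aggregator actually has entries.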