This commit is contained in:
Edgar Gabriel 2016-01-14 15:09:50 -06:00
parent 2bdd6ba17a
commit 2bcae84e11


@@ -670,7 +670,6 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
data->bytes_to_write_in_cycle -= data->bytes_remaining;
data->bytes_remaining = 0;
// continue;
// break;
}
else {
/* the remaining data from the previous cycle is larger than the
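
Aside (not part of the diff): a minimal sketch of the per-cycle byte accounting this hunk touches. The names to_write and remaining are hypothetical stand-ins for data->bytes_to_write_in_cycle and data->bytes_remaining, and the else branch below is a plausible reading of the code that is elided by the hunk, not the verbatim source.

    /* Illustrative sketch only: carry-over accounting between write cycles.
     * If the leftover from the previous cycle fits into this cycle's budget,
     * consume it completely; otherwise spend the whole budget on it and carry
     * the rest into the next cycle. */
    static void account_cycle_bytes (size_t *to_write, size_t *remaining)
    {
        if (*remaining <= *to_write) {
            *to_write  -= *remaining;   /* leftover fully handled this cycle */
            *remaining  = 0;
        }
        else {
            *remaining -= *to_write;    /* budget exhausted; carry the rest */
            *to_write   = 0;
        }
    }
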
@@ -803,122 +802,118 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
}
}
}
}
else{
// continue;
return OMPI_SUCCESS;
}
/* Sort the displacements for each aggregator*/
local_heap_sort (file_offsets_for_agg,
entries_per_aggregator,
sorted_file_offsets);
/*create contiguous memory displacements
based on blocklens on the same displs array
and map it to this aggregator's actual
file-displacements (this is in the io-array created above)*/
memory_displacements = (MPI_Aint *) malloc
(entries_per_aggregator * sizeof(MPI_Aint));
memory_displacements[sorted_file_offsets[0]] = 0;
for (i=1; i<entries_per_aggregator; i++){
memory_displacements[sorted_file_offsets[i]] =
memory_displacements[sorted_file_offsets[i-1]] +
file_offsets_for_agg[sorted_file_offsets[i-1]].length;
}
temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
if (NULL == temp_disp_index) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
/*Now update the displacements array with memory offsets*/
global_count = 0;
for (i=0;i<entries_per_aggregator;i++){
temp_pindex =
file_offsets_for_agg[sorted_file_offsets[i]].process_id;
data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
memory_displacements[sorted_file_offsets[i]];
if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
temp_disp_index[temp_pindex] += 1;
else{
printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
temp_pindex, temp_disp_index[temp_pindex],
temp_pindex, data->disp_index[temp_pindex]);
}
global_count +=
file_offsets_for_agg[sorted_file_offsets[i]].length;
}
if (NULL != temp_disp_index){
free(temp_disp_index);
temp_disp_index = NULL;
}
#if DEBUG_ON
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,rank);
for (i=0;i<data->procs_per_group; i++){
for(j=0;j<data->disp_index[i];j++){
if (data->blocklen_per_process[i][j] > 0){
printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
data->procs_in_group[i],j,
data->blocklen_per_process[i][j],j,
data->displs_per_process[i][j],
rank);
}
}
}
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,rank);
for (i=0; i<entries_per_aggregator;i++){
printf("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n",
file_offsets_for_agg[sorted_file_offsets[i]].process_id,
file_offsets_for_agg[sorted_file_offsets[i]].offset,
file_offsets_for_agg[sorted_file_offsets[i]].length,
memory_displacements[sorted_file_offsets[i]]);
}
printf("%d : global_count : %ld, bytes_sent : %d\n",
rank,global_count, bytes_sent);
#endif
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
/*************************************************************************
*** 7e. Perform the actual communication
*************************************************************************/
for (i=0;i<data->procs_per_group; i++) {
size_t datatype_size;
data->recv_req[i] = MPI_REQUEST_NULL;
if ( 0 < data->disp_index[i] ) {
ompi_datatype_create_hindexed(data->disp_index[i],
data->blocklen_per_process[i],
data->displs_per_process[i],
MPI_BYTE,
&data->recvtype[i]);
ompi_datatype_commit(&data->recvtype[i]);
opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
if (datatype_size){
ret = MCA_PML_CALL(irecv(data->global_buf,
1,
data->recvtype[i],
data->procs_in_group[i],
123,
data->comm,
&data->recv_req[i]));
if (OMPI_SUCCESS != ret){
goto exit;
/* Sort the displacements for each aggregator*/
local_heap_sort (file_offsets_for_agg,
entries_per_aggregator,
sorted_file_offsets);
/*create contiguous memory displacements
based on blocklens on the same displs array
and map it to this aggregator's actual
file-displacements (this is in the io-array created above)*/
memory_displacements = (MPI_Aint *) malloc
(entries_per_aggregator * sizeof(MPI_Aint));
memory_displacements[sorted_file_offsets[0]] = 0;
for (i=1; i<entries_per_aggregator; i++){
memory_displacements[sorted_file_offsets[i]] =
memory_displacements[sorted_file_offsets[i-1]] +
file_offsets_for_agg[sorted_file_offsets[i-1]].length;
}
temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
if (NULL == temp_disp_index) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
/*Now update the displacements array with memory offsets*/
global_count = 0;
for (i=0;i<entries_per_aggregator;i++){
temp_pindex =
file_offsets_for_agg[sorted_file_offsets[i]].process_id;
data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
memory_displacements[sorted_file_offsets[i]];
if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
temp_disp_index[temp_pindex] += 1;
else{
printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
temp_pindex, temp_disp_index[temp_pindex],
temp_pindex, data->disp_index[temp_pindex]);
}
global_count +=
file_offsets_for_agg[sorted_file_offsets[i]].length;
}
if (NULL != temp_disp_index){
free(temp_disp_index);
temp_disp_index = NULL;
}
#if DEBUG_ON
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,rank);
for (i=0;i<data->procs_per_group; i++){
for(j=0;j<data->disp_index[i];j++){
if (data->blocklen_per_process[i][j] > 0){
printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
data->procs_in_group[i],j,
data->blocklen_per_process[i][j],j,
data->displs_per_process[i][j],
rank);
}
}
}
}
} /* end if (aggregator == rank ) */
printf("************Cycle: %d, Aggregator: %d ***************\n",
index+1,rank);
for (i=0; i<entries_per_aggregator;i++){
printf("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n",
file_offsets_for_agg[sorted_file_offsets[i]].process_id,
file_offsets_for_agg[sorted_file_offsets[i]].offset,
file_offsets_for_agg[sorted_file_offsets[i]].length,
memory_displacements[sorted_file_offsets[i]]);
}
printf("%d : global_count : %ld, bytes_sent : %d\n",
rank,global_count, bytes_sent);
#endif
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_comm_time = MPI_Wtime();
#endif
/*************************************************************************
*** 7e. Perform the actual communication
*************************************************************************/
for (i=0;i<data->procs_per_group; i++) {
size_t datatype_size;
data->recv_req[i] = MPI_REQUEST_NULL;
if ( 0 < data->disp_index[i] ) {
ompi_datatype_create_hindexed(data->disp_index[i],
data->blocklen_per_process[i],
data->displs_per_process[i],
MPI_BYTE,
&data->recvtype[i]);
ompi_datatype_commit(&data->recvtype[i]);
opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
if (datatype_size){
ret = MCA_PML_CALL(irecv(data->global_buf,
1,
data->recvtype[i],
data->procs_in_group[i],
123,
data->comm,
&data->recv_req[i]));
if (OMPI_SUCCESS != ret){
goto exit;
}
}
}
}
} /* end if (entries_per_aggr > 0 ) */
}/* end if (aggregator == rank ) */
if ( data->sendbuf_is_contiguous ) {
send_buf = &((char*)data->buf)[data->total_bytes_written];
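
Aside (not part of the diff): the receive path in the hunk above describes each peer's scattered regions inside the aggregator's global_buf with an hindexed datatype and posts one non-blocking receive per peer. The sketch below expresses the same pattern with public MPI calls instead of the internal ompi_datatype_* / MCA_PML_CALL interfaces; the function and parameter names (post_aggregator_recv, nblocks, blocklens, displs, peer) are illustrative placeholders, not symbols from the commit.

    #include <mpi.h>

    /* Illustrative sketch only: gather one peer's scattered contributions into
     * global_buf with a single non-blocking receive, mirroring the
     * ompi_datatype_create_hindexed + irecv pattern used above. */
    static int post_aggregator_recv (int nblocks, int *blocklens, MPI_Aint *displs,
                                     char *global_buf, int peer, MPI_Comm comm,
                                     MPI_Request *req)
    {
        MPI_Datatype recvtype;
        int rc;

        *req = MPI_REQUEST_NULL;
        if (0 >= nblocks) {
            return MPI_SUCCESS;            /* this peer contributes nothing */
        }

        /* Describe the peer's target regions inside global_buf as one datatype,
         * so a single irecv can fill them all. */
        rc = MPI_Type_create_hindexed (nblocks, blocklens, displs, MPI_BYTE, &recvtype);
        if (MPI_SUCCESS != rc) return rc;
        rc = MPI_Type_commit (&recvtype);
        if (MPI_SUCCESS != rc) return rc;

        rc = MPI_Irecv (global_buf, 1, recvtype, peer, 123, comm, req);
        MPI_Type_free (&recvtype);         /* safe: the pending request keeps it alive */
        return rc;
    }
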
@@ -991,7 +986,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
}
}
if (aggregator == rank) {
if (aggregator == rank && entries_per_aggregator > 0 ) {
ret = ompi_request_wait_all (data->procs_per_group,
data->recv_req,
MPI_STATUS_IGNORE);
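
Aside (not part of the diff): the one-line change above narrows the wait on the receive requests to aggregators that actually collected entries in this cycle. A minimal sketch of that guard with public MPI calls; is_aggregator, entries_per_aggregator, nprocs and recv_reqs are hypothetical stand-ins for the fields used in the hunk.

    #include <mpi.h>

    /* Illustrative sketch only: complete the posted receives on an aggregator
     * that has entries in this cycle; ranks with nothing to collect skip the
     * wait (their request slots were never filled). */
    static int complete_aggregator_recvs (int is_aggregator, int entries_per_aggregator,
                                          int nprocs, MPI_Request *recv_reqs)
    {
        if (is_aggregator && entries_per_aggregator > 0) {
            return MPI_Waitall (nprocs, recv_reqs, MPI_STATUSES_IGNORE);
        }
        return MPI_SUCCESS;                /* nothing was posted this cycle */
    }
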
@@ -1025,7 +1020,7 @@ static int subroutine ( int index, int cycles, int aggregator, int rank, mca_io_
*** 7f. Create the io array, and pass it to fbtl
*********************************************************/
if (aggregator == rank) {
if (aggregator == rank && entries_per_aggregator>0) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();