fcoll/dynamic_gen2: Adjustment of displacement index in collective write
Within the shuffle iteration, the aggregators have to set a displacement array needed to receive data from other processes. The array had 1 extra element. We adjust the displacement index to match the number of elements. Signed-off-by: raafatfeki <fekiraafat@gmail.com>
Этот коммит содержится в:
родитель
f45e9cfdbe
Коммит
2c6a5eed29
@ -801,7 +801,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
|
|||||||
}
|
}
|
||||||
|
|
||||||
for(l=0;l<data->procs_per_group;l++){
|
for(l=0;l<data->procs_per_group;l++){
|
||||||
data->disp_index[l] = 1;
|
data->disp_index[l] = 0;
|
||||||
|
|
||||||
if(data->max_disp_index[l] == 0) {
|
if(data->max_disp_index[l] == 0) {
|
||||||
data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
|
data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
|
||||||
@ -880,8 +880,8 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
|
|||||||
if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
|
if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
|
||||||
/* The data fits completely into the block */
|
/* The data fits completely into the block */
|
||||||
if (aggregator == rank) {
|
if (aggregator == rank) {
|
||||||
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
|
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining;
|
||||||
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
|
data->displs_per_process[data->n][data->disp_index[data->n]] =
|
||||||
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
|
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
|
||||||
(data->global_iov_array[data->sorted[data->current_index]].iov_len
|
(data->global_iov_array[data->sorted[data->current_index]].iov_len
|
||||||
- data->bytes_remaining);
|
- data->bytes_remaining);
|
||||||
@ -914,11 +914,12 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
|
|||||||
/* the remaining data from the previous cycle is larger than the
|
/* the remaining data from the previous cycle is larger than the
|
||||||
data->bytes_to_write_in_cycle, so we have to segment again */
|
data->bytes_to_write_in_cycle, so we have to segment again */
|
||||||
if (aggregator == rank) {
|
if (aggregator == rank) {
|
||||||
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
|
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
|
||||||
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
|
data->displs_per_process[data->n][data->disp_index[data->n]] =
|
||||||
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
|
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
|
||||||
(data->global_iov_array[data->sorted[data->current_index]].iov_len
|
(data->global_iov_array[data->sorted[data->current_index]].iov_len
|
||||||
- data->bytes_remaining);
|
- data->bytes_remaining);
|
||||||
|
data->disp_index[data->n] += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data->procs_in_group[data->n] == rank) {
|
if (data->procs_in_group[data->n] == rank) {
|
||||||
@ -935,9 +936,10 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
|
|||||||
(MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
|
(MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
|
||||||
/* This entry has more data than we can sendin one cycle */
|
/* This entry has more data than we can sendin one cycle */
|
||||||
if (aggregator == rank) {
|
if (aggregator == rank) {
|
||||||
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
|
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
|
||||||
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
|
data->displs_per_process[data->n][data->disp_index[data->n]] =
|
||||||
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
|
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
|
||||||
|
data->disp_index[data->n] += 1;
|
||||||
}
|
}
|
||||||
if (data->procs_in_group[data->n] == rank) {
|
if (data->procs_in_group[data->n] == rank) {
|
||||||
bytes_sent += data->bytes_to_write_in_cycle;
|
bytes_sent += data->bytes_to_write_in_cycle;
|
||||||
@ -951,9 +953,9 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
|
|||||||
else {
|
else {
|
||||||
/* Next data entry is less than data->bytes_to_write_in_cycle */
|
/* Next data entry is less than data->bytes_to_write_in_cycle */
|
||||||
if (aggregator == rank) {
|
if (aggregator == rank) {
|
||||||
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
|
data->blocklen_per_process[data->n][data->disp_index[data->n]] =
|
||||||
data->global_iov_array[data->sorted[data->current_index]].iov_len;
|
data->global_iov_array[data->sorted[data->current_index]].iov_len;
|
||||||
data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
|
data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t)
|
||||||
data->global_iov_array[data->sorted[data->current_index]].iov_base;
|
data->global_iov_array[data->sorted[data->current_index]].iov_base;
|
||||||
|
|
||||||
data->disp_index[data->n] += 1;
|
data->disp_index[data->n] += 1;
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user