1
1
Этот коммит содержится в:
Edgar Gabriel 2016-07-06 12:31:49 -05:00
родитель 06930a0423
Коммит 1f1504ebbb
2 изменённых файлов: 0 добавлений и 780 удалений

Просмотреть файл

@ -1165,751 +1165,6 @@ int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
return 1;
}
int ompi_io_ompio_distribute_file_view (mca_io_ompio_file_t *fh,
struct iovec *broken_iov,
int broken_count,
int num_aggregators,
size_t stripe_size,
int **fview_count,
struct iovec **iov,
int *count)
{
int *num_entries;
int *broken_index;
int temp = 0;
int *fview_cnt = NULL;
int global_fview_count = 0;
int i = 0;
int *displs = NULL;
int rc = OMPI_SUCCESS;
struct iovec *global_fview = NULL;
struct iovec **broken = NULL;
MPI_Request *req=NULL, *sendreq=NULL;
num_entries = (int *) malloc (sizeof (int) * num_aggregators);
if (NULL == num_entries) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
broken_index = (int *) malloc (sizeof (int) * num_aggregators);
if (NULL == broken_index) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (num_entries, 0x0, num_aggregators * sizeof (int));
memset (broken_index, 0x0, num_aggregators * sizeof (int));
/* calculate how many entries in the broken iovec belong to each aggregator */
for (i=0 ; i<broken_count ; i++) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iov[i].iov_base/stripe_size) %
num_aggregators;
num_entries [temp] ++;
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
fview_cnt = (int *) malloc (sizeof (int) * fh->f_size);
if (NULL == fview_cnt) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
return OMPI_ERR_OUT_OF_RESOURCE;
}
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(num_entries);
free(broken_index);
free(fview_cnt);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == sendreq) {
free(num_entries);
free(broken_index);
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(fview_cnt);
free(req);
}
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* gather at each aggregator how many entires from the broken file view it
expects from each process */
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
rc = MCA_PML_CALL(irecv(&fview_cnt[i],
1,
MPI_INT,
i,
OMPIO_TAG_GATHER,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
rc = MCA_PML_CALL(isend(&num_entries[i],
1,
MPI_INT,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHER,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
rc = ompi_request_wait_all (fh->f_size, req, MPI_STATUSES_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
rc = ompi_request_wait_all (num_aggregators, sendreq, MPI_STATUSES_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gather (&num_entries[i],
1,
MPI_INT,
fview_cnt,
1,
MPI_INT,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gather_module);
}
*/
if (0 == fh->f_rank%fh->f_aggregator_index) {
displs = (int*) malloc (fh->f_size * sizeof (int));
if (NULL == displs) {
opal_output (1, "OUT OF MEMORY\n");
free(fview_cnt);
free(num_entries);
free(broken_index);
return OMPI_ERR_OUT_OF_RESOURCE;
}
displs[0] = 0;
global_fview_count = fview_cnt[0];
for (i=1 ; i<fh->f_size ; i++) {
global_fview_count += fview_cnt[i];
displs[i] = displs[i-1] + fview_cnt[i-1];
}
if (global_fview_count) {
global_fview = (struct iovec*)malloc (global_fview_count *
sizeof(struct iovec));
if (NULL == global_fview) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
free(fview_cnt);
free(displs);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
broken = (struct iovec**)malloc (num_aggregators * sizeof(struct iovec *));
if (NULL == broken) {
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(global_fview);
free(displs);
free(fview_cnt);
}
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0 ; i<num_aggregators ; i++) {
broken[i] = NULL;
if (0 != num_entries[i]) {
broken[i] = (struct iovec*) malloc (num_entries[i] *
sizeof (struct iovec));
if (NULL == broken[i]) {
int j;
opal_output (1, "OUT OF MEMORY\n");
free(num_entries);
free(broken_index);
for (j=0; j<i; j++) {
free(broken[j]);
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
free(global_fview);
free(displs);
free(fview_cnt);
}
free(broken);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
for (i=0 ; i<broken_count ; i++) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iov[i].iov_base/stripe_size) %
num_aggregators;
broken[temp][broken_index[temp]].iov_base = broken_iov[i].iov_base;
broken[temp][broken_index[temp]].iov_len = broken_iov[i].iov_len;
broken_index[temp] ++;
}
/*
for (i=0 ; i<num_aggregators ; i++) {
int j;
for (j=0 ; j<num_entries[i] ; j++) {
printf("%d->%d: OFFSET: %d LENGTH: %d\n",
fh->f_rank,
i,
broken[i][j].iov_base,
broken[i][j].iov_len);
}
}
sleep(1);
*/
if (0 == fh->f_rank%fh->f_aggregator_index) {
ptrdiff_t lb, extent;
rc = ompi_datatype_get_extent(fh->f_iov_type, &lb, &extent);
if (OMPI_SUCCESS != rc) {
goto exit;
}
for (i=0; i<fh->f_size ; i++) {
if (fview_cnt[i]) {
char *ptmp;
ptmp = ((char *) global_fview) + (extent * displs[i]);
rc = MCA_PML_CALL(irecv(ptmp,
fview_cnt[i],
fh->f_iov_type,
i,
OMPIO_TAG_GATHERV,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
if (num_entries[i]) {
rc = MCA_PML_CALL(isend(broken[i],
num_entries[i],
fh->f_iov_type,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (fview_cnt[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (num_entries[i]) {
rc = ompi_request_wait (&sendreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gatherv (broken[i],
num_entries[i],
fh->f_iov_type,
global_fview,
fview_cnt,
displs,
fh->f_iov_type,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gatherv_module);
}
*/
/*
for (i=0 ; i<global_fview_count ; i++) {
printf("%d: OFFSET: %d LENGTH: %d\n",
fh->f_rank,
global_fview[i].iov_base,
global_fview[i].iov_len);
}
*/
exit:
if (NULL != broken) {
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != broken[i]) {
free (broken[i]);
}
}
free (broken);
}
if (NULL != req) {
free (req);
}
if (NULL != sendreq) {
free (sendreq);
}
free (num_entries);
free (broken_index);
if (NULL != displs) {
free (displs);
}
*fview_count = fview_cnt;
*iov = global_fview;
*count = global_fview_count;
return rc;
}
int ompi_io_ompio_gather_data (mca_io_ompio_file_t *fh,
void *send_buf,
size_t total_bytes_sent,
int *bytes_sent,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size)
{
void **sbuf = NULL;
size_t bytes_remaining;
size_t *temp_position;
size_t part;
int current;
int temp = 0;
int i = 0;
int rc = OMPI_SUCCESS;
MPI_Request *req=NULL, *sendreq=NULL;
current = broken_index;
part = partial;
sbuf = (void**) malloc (num_aggregators * sizeof(void *));
if (NULL == sbuf) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
temp_position = (size_t *) malloc (num_aggregators * sizeof(size_t));
if (NULL == temp_position) {
opal_output (1, "OUT OF MEMORY\n");
free(sbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (temp_position, 0x0, num_aggregators * sizeof (size_t));
for (i=0 ; i<num_aggregators ; i++) {
sbuf[i] = NULL;
if (0 != bytes_sent[i]) {
sbuf[i] = (void *) malloc (bytes_sent[i]);
if (NULL == sbuf[i]) {
opal_output (1, "OUT OF MEMORY\n");
free(sbuf);
free(temp_position);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
bytes_remaining = total_bytes_sent;
while (bytes_remaining) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iovec[current].iov_base/stripe_size)
% num_aggregators;
if (part) {
if (bytes_remaining > part) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
part);
bytes_remaining -= part;
temp_position[temp] += part;
part = 0;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
bytes_remaining);
break;
}
}
else {
if (bytes_remaining > broken_iovec[current].iov_len) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
broken_iovec[current].iov_len);
bytes_remaining -= broken_iovec[current].iov_len;
temp_position[temp] += broken_iovec[current].iov_len;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)sbuf[temp]+
temp_position[temp]),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)send_buf +
(total_bytes_sent-bytes_remaining)),
bytes_remaining);
break;
}
}
}
sendreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == sendreq) {
free(sbuf);
free(temp_position);
return OMPI_ERR_OUT_OF_RESOURCE;
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(sbuf);
free(temp_position);
free(sendreq);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = MCA_PML_CALL(irecv((char *)global_buf + displs[i],
bytes_per_process[i],
MPI_BYTE,
i,
OMPIO_TAG_GATHERV,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0 ; i<num_aggregators ; i++) {
if (bytes_sent[i]) {
rc = MCA_PML_CALL(isend(sbuf[i],
bytes_sent[i],
MPI_BYTE,
i*fh->f_aggregator_index,
OMPIO_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&sendreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (bytes_sent[i]) {
rc = ompi_request_wait (&sendreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_gatherv (sbuf[i],
bytes_sent[i],
MPI_BYTE,
global_buf,
bytes_per_process,
displs,
MPI_BYTE,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_gatherv_module);
}
*/
exit:
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != sbuf[i]) {
free (sbuf[i]);
}
}
free (sbuf);
if (NULL != req) {
free (req);
}
if (NULL != sendreq) {
free (sendreq);
}
free (temp_position);
return rc;
}
int ompi_io_ompio_scatter_data (mca_io_ompio_file_t *fh,
void *receive_buf,
size_t total_bytes_recv,
int *bytes_received,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size)
{
void **rbuf = NULL;
size_t bytes_remaining;
size_t *temp_position = NULL;
size_t part;
int current;
int temp = 0;
int i = 0;
int rc = OMPI_SUCCESS;
MPI_Request *req=NULL, *recvreq=NULL;
current = broken_index;
part = partial;
rbuf = (void**) malloc (num_aggregators * sizeof(void *));
if (NULL == rbuf) {
opal_output (1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
temp_position = (size_t *) malloc (num_aggregators * sizeof(size_t));
if (NULL == temp_position) {
opal_output (1, "OUT OF MEMORY\n");
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memset (temp_position, 0x0, num_aggregators * sizeof (size_t));
for (i=0 ; i<num_aggregators ; i++) {
rbuf[i] = NULL;
if (0 != bytes_received[i]) {
rbuf[i] = (void *) malloc (bytes_received[i]);
if (NULL == rbuf[i]) {
int j;
opal_output (1, "OUT OF MEMORY\n");
free(temp_position);
for (j=0; j<i; j++) {
free(rbuf[j]);
}
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
}
recvreq = (MPI_Request *)malloc (num_aggregators * sizeof(MPI_Request));
if (NULL == recvreq) {
free(temp_position);
for (i=0; i<num_aggregators; i++) {
free(rbuf[i]);
}
free(rbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0 ; i<num_aggregators ; i++) {
if (bytes_received[i]) {
rc = MCA_PML_CALL(irecv(rbuf[i],
bytes_received[i],
MPI_BYTE,
i*fh->f_aggregator_index,
OMPIO_TAG_SCATTERV,
fh->f_comm,
&recvreq[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
req = (MPI_Request *)malloc (fh->f_size * sizeof(MPI_Request));
if (NULL == req) {
free(temp_position);
for (i=0; i<num_aggregators; i++) {
free(rbuf[i]);
}
free(rbuf);
free(recvreq);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = MCA_PML_CALL(isend((char *)global_buf + displs[i],
bytes_per_process[i],
MPI_BYTE,
i,
OMPIO_TAG_SCATTERV,
MCA_PML_BASE_SEND_STANDARD,
fh->f_comm,
&req[i]));
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
for (i=0; i<num_aggregators ; i++) {
if (bytes_received[i]) {
rc = ompi_request_wait (&recvreq[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
if (0 == fh->f_rank%fh->f_aggregator_index) {
for (i=0; i<fh->f_size ; i++) {
if (bytes_per_process[i]) {
rc = ompi_request_wait (&req[i], MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != rc) {
goto exit;
}
}
}
}
/*
for (i=0 ; i<num_aggregators ; i++) {
fh->f_comm->c_coll.coll_scatterv (global_buf,
bytes_per_process,
displs,
MPI_BYTE,
rbuf[i],
bytes_received[i],
MPI_BYTE,
i*fh->f_aggregator_index,
fh->f_comm,
fh->f_comm->c_coll.coll_scatterv_module);
}
*/
bytes_remaining = total_bytes_recv;
while (bytes_remaining) {
temp = (int)((OPAL_PTRDIFF_TYPE)broken_iovec[current].iov_base/stripe_size)
% num_aggregators;
if (part) {
if (bytes_remaining > part) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
part);
bytes_remaining -= part;
temp_position[temp] += part;
part = 0;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
bytes_remaining);
break;
}
}
else {
if (bytes_remaining > broken_iovec[current].iov_len) {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
broken_iovec[current].iov_len);
bytes_remaining -= broken_iovec[current].iov_len;
temp_position[temp] += broken_iovec[current].iov_len;
current ++;
}
else {
memcpy ((IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)receive_buf +
(total_bytes_recv-bytes_remaining)),
(IOVBASE_TYPE *)((OPAL_PTRDIFF_TYPE)rbuf[temp]+
temp_position[temp]),
bytes_remaining);
break;
}
}
}
exit:
for (i=0 ; i<num_aggregators ; i++) {
if (NULL != rbuf[i]) {
free (rbuf[i]);
rbuf[i] = NULL;
}
}
if (NULL != req) {
free (req);
}
if (NULL != recvreq) {
free (recvreq);
}
if (NULL != rbuf) {
free (rbuf);
rbuf = NULL;
}
if (NULL != temp_position) {
free (temp_position);
temp_position = NULL;
}
return rc;
}
void mca_io_ompio_get_num_aggregators ( int *num_aggregators)
{

Просмотреть файл

@ -568,41 +568,6 @@ OMPI_DECLSPEC int ompi_io_ompio_break_file_view (mca_io_ompio_file_t *fh,
struct iovec **broken_iov,
int *broken_count);
OMPI_DECLSPEC int ompi_io_ompio_distribute_file_view (mca_io_ompio_file_t *fh,
struct iovec *broken_iov,
int broken_count,
int num_aggregators,
size_t stripe_size,
int **fview_count,
struct iovec **iov,
int *count);
OMPI_DECLSPEC int ompi_io_ompio_gather_data (mca_io_ompio_file_t *fh,
void *send_buf,
size_t total_bytes_sent,
int *bytes_sent,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size);
OMPI_DECLSPEC int ompi_io_ompio_scatter_data (mca_io_ompio_file_t *fh,
void *receive_buf,
size_t total_bytes_recv,
int *bytes_received,
struct iovec *broken_iovec,
int broken_index,
size_t partial,
void *global_buf,
int *bytes_per_process,
int *displs,
int num_aggregators,
size_t stripe_size);
/*
* Modified versions of Collective operations