fix the merge algorithm in the individual sharedfp component, which could
lead to file inconsistency in case of identical timestamps Also fixes a potential buffer size problem.
Этот коммит содержится в:
родитель
423114e168
Коммит
f214ccf499
@ -139,9 +139,9 @@ mca_sharedfp_individual_header_record* mca_sharedfp_individual_insert_headnode(v
|
||||
int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh);
|
||||
int mca_sharedfp_individual_get_timestamps_and_reclengths(double **buff, long **rec_length, MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh);
|
||||
int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes,int size);
|
||||
int mca_sharedfp_individual_sort_timestamps(double **ts,MPI_Offset **off, int totalnodes);
|
||||
int mca_sharedfp_individual_sort_timestamps(double **ts,MPI_Offset **off, int **ranks, int totalnodes);
|
||||
MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,struct mca_sharedfp_base_data_t *sh);
|
||||
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int totalnodes);
|
||||
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes);
|
||||
/*int mca_sharedfp_individual_cleanup(double *ts, int* rnk, MPI_Offset *off);*/
|
||||
|
||||
int mca_sharedfp_individual_insert_metadata(int functype,long recordlength,struct mca_sharedfp_base_data_t *sh );
|
||||
|
@ -38,7 +38,8 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
|
||||
MPI_Comm comm;
|
||||
int rank, size;
|
||||
int nodesoneachprocess = 0;
|
||||
int idx = 0,i = 0;
|
||||
int idx=0,i=0,j=0, l=0;
|
||||
int *ranks = NULL;
|
||||
double *timestampbuff = NULL;
|
||||
OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
|
||||
int *countbuff = NULL;
|
||||
@ -48,6 +49,7 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
|
||||
OMPI_MPI_OFFSET_TYPE *local_off = NULL;
|
||||
int totalnodes = 0;
|
||||
ompi_status_public_t status;
|
||||
int recordlength=0;
|
||||
|
||||
comm = sh->comm;
|
||||
|
||||
@ -99,7 +101,7 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
|
||||
}
|
||||
}
|
||||
|
||||
if ( nodesoneachprocess == 0) {
|
||||
if ( 0 == nodesoneachprocess ) {
|
||||
ind_ts[0] = 0;
|
||||
ind_recordlength[0] = 0;
|
||||
local_off[0] = 0;
|
||||
@ -118,6 +120,17 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ranks = (int *) malloc ( totalnodes * sizeof(int));
|
||||
if ( NULL == ranks ) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
for ( l=0, i=0; i<size; i++ ) {
|
||||
for ( j=0; j<countbuff[i]; j++ ) {
|
||||
ranks[l++]=i;
|
||||
}
|
||||
}
|
||||
|
||||
ret = mca_sharedfp_individual_create_buff ( ×tampbuff, &offsetbuff, totalnodes, size);
|
||||
if ( OMPI_SUCCESS != ret ) {
|
||||
goto exit;
|
||||
@ -137,31 +150,41 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
|
||||
goto exit;
|
||||
}
|
||||
|
||||
ret = mca_sharedfp_individual_sort_timestamps(×tampbuff, &offsetbuff,totalnodes);
|
||||
ret = mca_sharedfp_individual_sort_timestamps(×tampbuff, &offsetbuff, &ranks, totalnodes);
|
||||
if ( OMPI_SUCCESS != ret ) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);
|
||||
|
||||
buff = (char * ) malloc( ind_recordlength[0] * 1.2 );
|
||||
recordlength = ind_recordlength[0] * 1.2;
|
||||
buff = (char * ) malloc( recordlength );
|
||||
if ( NULL == buff ) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
for (i = 0; i < nodesoneachprocess ; i++) {
|
||||
if ( ind_recordlength[i] > recordlength ) {
|
||||
recordlength = ind_recordlength[i] * 1.2;
|
||||
buff = (char *) realloc ( buff, recordlength );
|
||||
if ( NULL == buff ) {
|
||||
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
|
||||
/*Read from the local data file*/
|
||||
ompio_io_ompio_file_read_at ( headnode->datafilehandle,
|
||||
local_off[i], buff, ind_recordlength[i],
|
||||
MPI_BYTE, &status);
|
||||
|
||||
idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff,totalnodes);
|
||||
idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, rank, totalnodes);
|
||||
|
||||
if ( mca_sharedfp_individual_verbose ) {
|
||||
opal_output(ompi_sharedfp_base_framework.framework_output,
|
||||
"sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file \n",
|
||||
rank,ind_recordlength[i]);
|
||||
"sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
|
||||
"%lld (%d)\n", rank, ind_recordlength[i], offsetbuff[idx], idx);
|
||||
}
|
||||
|
||||
/*Write into main data file*/
|
||||
@ -196,6 +219,9 @@ exit:
|
||||
if ( NULL != buff ) {
|
||||
free ( buff );
|
||||
}
|
||||
if ( NULL != ranks ) {
|
||||
free ( ranks );
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -323,7 +349,7 @@ int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totaln
|
||||
}
|
||||
|
||||
/*Sort the timestamp buffer*/
|
||||
int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int totalnodes)
|
||||
int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
|
||||
{
|
||||
|
||||
int i = 0;
|
||||
@ -331,7 +357,7 @@ int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int
|
||||
int flag = 1;
|
||||
double tempts = 0.0;
|
||||
OMPI_MPI_OFFSET_TYPE tempoffset = 0;
|
||||
|
||||
int temprank = 0;
|
||||
|
||||
for (i= 1; (i <= totalnodes)&&(flag) ; i++) {
|
||||
flag = 0;
|
||||
@ -347,6 +373,11 @@ int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int
|
||||
*(*off + j) = *(*off + j + 1);
|
||||
*(*off + j + 1) = tempoffset;
|
||||
|
||||
/*swap ranks*/
|
||||
temprank = *(*ranks + j);
|
||||
*(*ranks + j) = *(*ranks + j + 1);
|
||||
*(*ranks + j + 1) = temprank;
|
||||
|
||||
flag = 1;
|
||||
}
|
||||
}
|
||||
@ -380,13 +411,14 @@ MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,
|
||||
}
|
||||
|
||||
|
||||
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int totalnodes)
|
||||
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
|
||||
{
|
||||
int i = 0;
|
||||
int notfound = 1;
|
||||
|
||||
|
||||
while (notfound) {
|
||||
if (ts[i] == timestamp)
|
||||
if (ts[i] == timestamp && ranks[i] == myrank )
|
||||
break;
|
||||
|
||||
i++;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user