1
1

- make the split collective shared file pointer operations work

- minor code restructering in io/ompio required for that.
Этот коммит содержится в:
Edgar Gabriel 2015-07-15 16:05:52 -05:00
родитель 429bdf1af7
Коммит 824d488709
9 изменённых файлов: 686 добавлений и 108 удалений

Просмотреть файл

@ -422,16 +422,6 @@ OMPI_DECLSPEC int ompio_io_ompio_file_write_at_all (mca_io_ompio_file_t *fh,
struct ompi_datatype_t *datatype,
ompi_status_public_t *status);
OMPI_DECLSPEC int ompio_io_ompio_file_write_at_all_begin (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
OMPI_DECLSPEC int ompio_io_ompio_file_write_at_all_end (mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t * status);
OMPI_DECLSPEC int ompio_io_ompio_file_iwrite_at (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
@ -445,6 +435,12 @@ OMPI_DECLSPEC int ompio_io_ompio_file_iwrite (mca_io_ompio_file_t *fh,
struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int ompio_io_ompio_file_iwrite_at_all (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int ompio_io_ompio_file_iread (mca_io_ompio_file_t *fh,
void *buf,
int count,
@ -462,6 +458,12 @@ OMPI_DECLSPEC int ompio_io_ompio_file_iread_at (mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int ompio_io_ompio_file_iread_at_all (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int ompio_io_ompio_file_read_at (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
@ -474,14 +476,6 @@ OMPI_DECLSPEC int ompio_io_ompio_file_read_at_all (mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
OMPI_DECLSPEC int ompio_io_ompio_file_read_at_all_begin (mca_io_ompio_file_t *ompio_fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype);
OMPI_DECLSPEC int ompio_io_ompio_file_read_at_all_end (mca_io_ompio_file_t *ompio_fh,
void *buf,
ompi_status_public_t * status);
OMPI_DECLSPEC int ompio_io_ompio_file_get_size (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *size);

Просмотреть файл

@ -458,16 +458,27 @@ int mca_io_ompio_file_iread_at_all (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_file_t *fp=NULL;
OMPI_MPI_OFFSET_TYPE prev_offset;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
fp = &data->ompio_fh;
ret = ompio_io_ompio_file_iread_at_all ( &data->ompio_fh, offset, buf, count, datatype, request );
return ret;
}
int ompio_io_ompio_file_iread_at_all (mca_io_ompio_file_t *fp,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request)
{
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE prev_offset;
ompio_io_ompio_file_get_position (fp, &prev_offset );
ompi_io_ompio_set_explicit_offset (fp, offset);
if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
ret = fp->f_fcoll->fcoll_file_iread_all (&data->ompio_fh,
ret = fp->f_fcoll->fcoll_file_iread_all (fp,
buf,
count,
datatype,
@ -659,26 +670,16 @@ int mca_io_ompio_file_read_at_all_begin (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_file_t *fp=NULL;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
ret = ompio_io_ompio_file_read_at_all_begin ( &data->ompio_fh, offset, buf, count, datatype );
return ret;
}
fp = &data->ompio_fh;
int ompio_io_ompio_file_read_at_all_begin (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype)
{
int ret = OMPI_SUCCESS;
if ( true == fh->f_split_coll_in_use ) {
if ( true == fp->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
ret = mca_io_ompio_file_iread_at_all ( fh->f_fh, offset, buf, count, datatype, &fh->f_split_coll_req );
fh->f_split_coll_in_use = true;
ret = ompio_io_ompio_file_iread_at_all ( fp, offset, buf, count, datatype, &fp->f_split_coll_req );
fp->f_split_coll_in_use = true;
return ret;
}
@ -688,20 +689,13 @@ int mca_io_ompio_file_read_at_all_end (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_file_t *fp=NULL;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
ret = ompio_io_ompio_file_read_at_all_end ( &data->ompio_fh, buf, status );
return ret;
}
int ompio_io_ompio_file_read_at_all_end (mca_io_ompio_file_t *ompio_fh,
void *buf,
ompi_status_public_t * status)
{
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &ompio_fh->f_split_coll_req, status );
fp = &data->ompio_fh;
ret = ompi_request_wait ( &fp->f_split_coll_req, status );
/* remove the flag again */
ompio_fh->f_split_coll_in_use = false;
fp->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -217,7 +217,7 @@ int ompio_io_ompio_file_iwrite (mca_io_ompio_file_t *fh,
ompio_req->req_type = MCA_OMPIO_REQUEST_WRITE;
ompio_req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
if ( 0 == count ) {
if ( 0 == count ) {
ompi_request_complete (&ompio_req->req_ompi, 0);
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
ompio_req->req_ompi.req_status._ucount = 0;
@ -569,17 +569,29 @@ int mca_io_ompio_file_iwrite_at_all (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_file_t *fp=NULL;
OMPI_MPI_OFFSET_TYPE prev_offset;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
fp = &data->ompio_fh;
ret = ompio_io_ompio_file_iwrite_at_all ( &data->ompio_fh, offset, buf, count, datatype, request );
return ret;
}
int ompio_io_ompio_file_iwrite_at_all (mca_io_ompio_file_t *fp,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request)
{
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE prev_offset;
ompio_io_ompio_file_get_position (fp, &prev_offset );
ompi_io_ompio_set_explicit_offset (fp, offset);
if ( NULL != fp->f_fcoll->fcoll_file_iwrite_all ) {
ret = fp->f_fcoll->fcoll_file_iwrite_all (&data->ompio_fh,
ret = fp->f_fcoll->fcoll_file_iwrite_all (fp,
buf,
count,
datatype,
@ -592,12 +604,14 @@ int mca_io_ompio_file_iwrite_at_all (ompi_file_t *fh,
ret = ompio_io_ompio_file_iwrite ( fp, buf, count, datatype, request );
}
ompi_io_ompio_set_explicit_offset (fp, prev_offset);
return ret;
}
/* Infrastructure for shared file pointer operations */
/* (Individual and collective */
/******************************************************/
@ -773,51 +787,37 @@ int mca_io_ompio_file_write_at_all_begin (ompi_file_t *fh,
struct ompi_datatype_t *datatype)
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_data_t *data=NULL;
mca_io_ompio_file_t *fp=NULL;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
ret = ompio_io_ompio_file_write_at_all_begin ( &data->ompio_fh, offset, buf, count, datatype );
return ret;
}
fp = &data->ompio_fh;
int ompio_io_ompio_file_write_at_all_begin (mca_io_ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
int count,
struct ompi_datatype_t *datatype)
{
int ret = OMPI_SUCCESS;
if ( true == fh->f_split_coll_in_use ) {
if ( true == fp->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
ret = mca_io_ompio_file_iwrite_at_all ( fh->f_fh, offset, buf, count, datatype, &fh->f_split_coll_req );
fh->f_split_coll_in_use = true;
ret = ompio_io_ompio_file_iwrite_at_all ( fp, offset, buf, count, datatype, &fp->f_split_coll_req );
fp->f_split_coll_in_use = true;
return ret;
}
int mca_io_ompio_file_write_at_all_end (ompi_file_t *fh,
void *buf,
ompi_status_public_t * status)
{
int ret = OMPI_SUCCESS;
mca_io_ompio_data_t *data;
mca_io_ompio_file_t *fp=NULL;
data = (mca_io_ompio_data_t *) fh->f_io_selected_data;
ret = ompio_io_ompio_file_read_at_all_end ( &data->ompio_fh, buf, status );
return ret;
}
int ompio_io_ompio_file_write_at_all_end (mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t * status)
{
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
fp = &data->ompio_fh;
ret = ompi_request_wait ( &fp->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
fp->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 University of Houston. All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -90,14 +90,132 @@ int mca_sharedfp_individual_write_ordered_begin(mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype)
{
opal_output(0,"mca_sharedfp_individual_write_ordered_begin: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
int size = 0, rank = 0;
int i = 0;
size_t numofbytes = 0;
size_t totalbytes = 0;
OMPI_MPI_OFFSET_TYPE *offbuff=NULL;
OMPI_MPI_OFFSET_TYPE global_offset = 0;
OMPI_MPI_OFFSET_TYPE prev_offset = 0;
OMPI_MPI_OFFSET_TYPE temp = 0, offset = 0;
mca_sharedfp_individual_header_record *headnode = NULL;
struct mca_sharedfp_base_data_t *sh = NULL;
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if(fh->f_sharedfp_data==NULL){
if ( mca_sharedfp_individual_verbose ) {
printf("sharedfp_individual_write_ordered_begin - opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_individual_write_ordered_begin - error opening the shared file pointer\n");
return ret;
}
}
if ( true == fh->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
/*Retrieve the sharedfp data structures*/
sh = fh->f_sharedfp_data;
rank = ompi_comm_rank ( sh->comm );
size = ompi_comm_size ( sh->comm );
/* Calculate the number of bytes of data that needs to be written*/
opal_datatype_type_size ( &datatype->super, &numofbytes);
totalbytes = count * numofbytes;
headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
if ( NULL == headnode) {
opal_output (0, "sharedfp_individual_write_ordered_begin: headnode is NULL but file is open\n");
return OMPI_ERROR;
}
/* Data from all the metadata is combined and written to the main file */
ret = mca_sharedfp_individual_collaborate_data ( sh );
if ( OMPI_SUCCESS != ret) {
return ret;
}
if ( 0 == rank ) {
offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * size);
if (NULL == offbuff ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
/*collect the total bytes to be written*/
sh->comm->c_coll.coll_gather ( &totalbytes, 1, OMPI_OFFSET_DATATYPE,
offbuff, 1, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
if ( 0 == rank ) {
prev_offset = offbuff[0];
offbuff[0] = sh->global_offset;
for (i = 1; i < size ; i++){
temp = offbuff[i];
offbuff[i] = offbuff[i - 1] + prev_offset;
prev_offset = temp;
}
for (i = 0; i < size; i++){
global_offset = offbuff[size - 1] + prev_offset;
}
}
/* Scatter the results to the other processes */
ret = sh->comm->c_coll.coll_scatter ( offbuff, 1, OMPI_OFFSET_DATATYPE,
&offset, 1, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_individual_write_ordered_begin: Error in scattering offsets \n");
goto exit;
}
ret = sh->comm->c_coll.coll_bcast ( &global_offset, 1, OMPI_OFFSET_DATATYPE,
0, sh->comm, sh->comm->c_coll.coll_bcast_module );
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_individual_write_ordered_begin: Error while bcasting global offset \n");
goto exit;
}
sh->global_offset = global_offset;
/*use file_write_at_all to ensure the order*/
ret = ompio_io_ompio_file_iwrite_at_all(sh->sharedfh,offset, buf,count,datatype,
&fh->f_split_coll_req);
fh->f_split_coll_in_use = true;
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_individual_write_ordered_begin: Error while writing the datafile \n");
}
exit:
if ( NULL != offbuff ) {
free ( offbuff);
}
return ret;
}
int mca_sharedfp_individual_write_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_individual_write_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -104,7 +104,7 @@ int mca_sharedfp_individual_write_ordered (mca_io_ompio_file_t *fh,
if(fh->f_sharedfp_data==NULL){
if ( mca_sharedfp_individual_verbose ) {
printf("sharedfp_individual_write - opening the shared file pointer\n");
printf("sharedfp_individual_write_ordered - opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
@ -114,7 +114,7 @@ int mca_sharedfp_individual_write_ordered (mca_io_ompio_file_t *fh,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_individual_write - error opening the shared file pointer\n");
opal_output(0,"sharedfp_individual_write_ordered - error opening the shared file pointer\n");
return ret;
}
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 University of Houston. All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -87,8 +87,121 @@ int mca_sharedfp_lockedfile_read_ordered_begin(mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype)
{
opal_output(0,"mca_sharedfp_lockedfile_write_ordered_begin: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
mca_sharedfp_base_module_t * shared_fp_base_module=NULL;
OMPI_MPI_OFFSET_TYPE offset = 0;
long sendBuff = 0;
long *buff=NULL;
long offsetBuff;
OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
long bytesRequested = 0;
int recvcnt = 1, sendcnt = 1;
size_t numofBytes;
int rank, size, i;
struct mca_sharedfp_base_data_t *sh = NULL;
if(fh->f_sharedfp_data==NULL){
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_read_ordered_begin: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_lockedfile_read_ordered_begin - error opening the shared file pointer\n");
return ret;
}
}
if ( true == fh->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
/*Retrieve the new communicator*/
sh = fh->f_sharedfp_data;
/* Calculate the number of bytes to write*/
opal_datatype_type_size ( &datatype->super, &numofBytes);
sendBuff = count * numofBytes;
/* Get the ranks in the communicator */
rank = ompi_comm_rank ( sh->comm );
size = ompi_comm_size ( sh->comm );
if ( 0 == rank ) {
buff = (long*) malloc (sizeof(long) * size);
if ( NULL == buff ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE, buff, recvcnt,
OMPI_OFFSET_DATATYPE, 0, sh->comm,
sh->comm->c_coll.coll_gather_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
/* All the counts are present now in the recvBuff.
The size of recvBuff is sizeof_newComm
*/
if (rank == 0) {
for ( i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_read_ordered_begin: Bytes requested are %ld\n",bytesRequested);
}
}
/*Request the offset to write bytesRequested bytes
only the root process needs to do the request,
since the root process will then tell the other
processes at what offset they should write their
share of the data.
*/
ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
if ( OMPI_SUCCESS != ret ){
goto exit;
}
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_read_ordered_begin: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
buff[i] += buff[i-1];
}
}
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
/*Each process now has its own individual offset*/
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_read_ordered_begin: Offset returned is %lld\n",offset);
}
ret = ompio_io_ompio_file_iread_at_all ( sh->sharedfh, offset, buf, count, datatype, &fh->f_split_coll_req );
fh->f_split_coll_in_use = true;
exit:
if ( NULL != buff ) {
free ( buff);
}
return ret;
}
@ -96,6 +209,10 @@ int mca_sharedfp_lockedfile_read_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_lockedfile_write_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 University of Houston. All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -85,8 +85,121 @@ int mca_sharedfp_lockedfile_write_ordered_begin(mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype)
{
opal_output(0,"mca_sharedfp_lockedfile_write_ordered_begin: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
mca_sharedfp_base_module_t * shared_fp_base_module=NULL;
OMPI_MPI_OFFSET_TYPE offset = 0;
long sendBuff = 0;
long *buff=NULL;
long offsetBuff;
OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
long bytesRequested = 0;
int recvcnt = 1, sendcnt = 1;
size_t numofBytes;
int rank, size, i;
struct mca_sharedfp_base_data_t *sh = NULL;
if(fh->f_sharedfp_data==NULL){
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_write_ordered_begin: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_lockedfile_write_ordered_begin - error opening the shared file pointer\n");
return ret;
}
}
if ( true == fh->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
/*Retrieve the new communicator*/
sh = fh->f_sharedfp_data;
/* Calculate the number of bytes to write*/
opal_datatype_type_size ( &datatype->super, &numofBytes);
sendBuff = count * numofBytes;
/* Get the ranks in the communicator */
rank = ompi_comm_rank ( sh->comm );
size = ompi_comm_size ( sh->comm );
if ( 0 == rank ) {
buff = (long*) malloc (sizeof(long) * size);
if ( NULL == buff ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE, buff, recvcnt,
OMPI_OFFSET_DATATYPE, 0, sh->comm,
sh->comm->c_coll.coll_gather_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
/* All the counts are present now in the recvBuff.
The size of recvBuff is sizeof_newComm
*/
if (rank == 0) {
for ( i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_write_ordered_begin: Bytes requested are %ld\n",bytesRequested);
}
}
/*Request the offset to write bytesRequested bytes
only the root process needs to do the request,
since the root process will then tell the other
processes at what offset they should write their
share of the data.
*/
ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
if ( OMPI_SUCCESS != ret ){
goto exit;
}
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_write_ordered_begin: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
buff[i] += buff[i-1];
}
}
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
/*Each process now has its own individual offset*/
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_lockedfile_verbose ) {
printf("sharedfp_lockedfile_write_ordered_begin: Offset returned is %lld\n",offset);
}
ret = ompio_io_ompio_file_iwrite_at_all ( sh->sharedfh, offset, buf, count, datatype, &fh->f_split_coll_req );
fh->f_split_coll_in_use = true;
exit:
if ( NULL != buff ) {
free ( buff);
}
return ret;
}
@ -95,6 +208,10 @@ int mca_sharedfp_lockedfile_write_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_lockedfile_write_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 University of Houston. All rights reserved.
* Copyright (c) 201302915 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -84,8 +84,123 @@ int mca_sharedfp_sm_read_ordered_begin(mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype)
{
opal_output(0,"mca_sharedfp_sm_read_ordered_begin: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE offset = 0;
long sendBuff = 0;
long *buff=NULL;
long offsetBuff;
OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
long bytesRequested = 0;
int recvcnt = 1, sendcnt = 1;
size_t numofBytes;
int rank, size, i;
struct mca_sharedfp_base_data_t *sh = NULL;
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if ( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read_ordered_begin: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_sm_read_ordered_begin - error opening the shared file pointer\n");
return ret;
}
}
if ( true == fh->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
/*Retrieve the new communicator*/
sh = fh->f_sharedfp_data;
/* Calculate the number of bytes to read*/
opal_datatype_type_size ( &datatype->super, &numofBytes);
sendBuff = count * numofBytes;
/* Get the ranks in the communicator */
rank = ompi_comm_rank ( sh->comm );
size = ompi_comm_size ( sh->comm );
if ( 0 == rank ) {
buff = (long*)malloc(sizeof(long) * size);
if ( NULL == buff )
return OMPI_ERR_OUT_OF_RESOURCE;
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
if( OMPI_SUCCESS != ret){
goto exit;
}
/* All the counts are present now in the recvBuff.
** The size of recvBuff is sizeof_newComm
*/
if ( 0 == rank ) {
for (i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered_begin: Bytes requested are %ld\n",
bytesRequested);
}
}
/* Request the offset to read bytesRequested bytes
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should read their
** share of the data.
*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offsetReceived);
if( OMPI_SUCCESS != ret){
goto exit;
}
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered_begin: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
buff[i] += buff[i-1];
}
}
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if( OMPI_SUCCESS != ret){
goto exit;
}
/*Each process now has its own individual offset in recvBUFF*/
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered_begin: Offset returned is %lld\n",offset);
}
/* read to the file */
ret = ompio_io_ompio_file_iread_at_all(sh->sharedfh,offset,buf,count,datatype,
&fh->f_split_coll_req);
fh->f_split_coll_in_use = true;
exit:
if ( NULL != buff ) {
free ( buff );
}
return ret;
}
@ -93,6 +208,10 @@ int mca_sharedfp_sm_read_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_sm_read_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
return ret;
}

Просмотреть файл

@ -85,8 +85,123 @@ int mca_sharedfp_sm_write_ordered_begin(mca_io_ompio_file_t *fh,
int count,
struct ompi_datatype_t *datatype)
{
opal_output(0,"mca_sharedfp_sm_write_ordered_begin: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE offset = 0;
long sendBuff = 0;
long *buff=NULL;
long offsetBuff;
OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
long bytesRequested = 0;
int recvcnt = 1, sendcnt = 1;
size_t numofBytes;
int rank, size, i;
struct mca_sharedfp_base_data_t *sh = NULL;
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if ( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered_begin: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_sm_write_ordered_begin - error opening the shared file pointer\n");
return ret;
}
}
if ( true == fh->f_split_coll_in_use ) {
printf("Only one split collective I/O operation allowed per file handle at any given point in time!\n");
return MPI_ERR_REQUEST;
}
/*Retrieve the new communicator*/
sh = fh->f_sharedfp_data;
/* Calculate the number of bytes to read*/
opal_datatype_type_size ( &datatype->super, &numofBytes);
sendBuff = count * numofBytes;
/* Get the ranks in the communicator */
rank = ompi_comm_rank ( sh->comm );
size = ompi_comm_size ( sh->comm );
if ( 0 == rank ) {
buff = (long*)malloc(sizeof(long) * size);
if ( NULL == buff )
return OMPI_ERR_OUT_OF_RESOURCE;
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
if( OMPI_SUCCESS != ret){
goto exit;
}
/* All the counts are present now in the recvBuff.
** The size of recvBuff is sizeof_newComm
*/
if ( 0 == rank ) {
for (i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_write_ordered_begin: Bytes requested are %ld\n",
bytesRequested);
}
}
/* Request the offset to read bytesRequested bytes
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should read their
** share of the data.
*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offsetReceived);
if( OMPI_SUCCESS != ret){
goto exit;
}
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_write_ordered_begin: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
buff[i] += buff[i-1];
}
}
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if( OMPI_SUCCESS != ret){
goto exit;
}
/*Each process now has its own individual offset in recvBUFF*/
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_write_ordered_begin: Offset returned is %lld\n",offset);
}
/* read to the file */
ret = ompio_io_ompio_file_iwrite_at_all(sh->sharedfh,offset,buf,count,datatype,
&fh->f_split_coll_req);
fh->f_split_coll_in_use = true;
exit:
if ( NULL != buff ) {
free ( buff );
}
return ret;
}
@ -94,6 +209,10 @@ int mca_sharedfp_sm_write_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_sm_write_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;
int ret = OMPI_SUCCESS;
ret = ompi_request_wait ( &fh->f_split_coll_req, status );
/* remove the flag again */
fh->f_split_coll_in_use = false;
return ret;
}