1
1

Merge pull request #4 from jsquyres/edgargabriel-pr/sharedfp-sm-logic3

Commits on top of open-mpi/ompi#745
Этот коммит содержится в:
Edgar Gabriel 2015-07-27 10:45:07 -05:00
родитель 4f85e0d833 3e6694f7ea
Коммит 8932387cf2
10 изменённых файлов: 186 добавлений и 200 удалений

Просмотреть файл

@ -83,14 +83,14 @@ struct mca_sharedfp_base_module_1_0_0_t * mca_sharedfp_sm_component_file_query(m
ompi_group_t *group = comm->c_local_group;
for (i = 0; i < size; ++i) {
proc = ompi_group_peer_lookup(group,i);
if (!OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)){
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_sm_component_file_query: Disqualifying myself: (%d/%s) "
"not all processes are on the same node.",
comm->c_contextid, comm->c_name);
return NULL;
}
proc = ompi_group_peer_lookup(group,i);
if (!OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)){
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_sm_component_file_query: Disqualifying myself: (%d/%s) "
"not all processes are on the same node.",
comm->c_contextid, comm->c_name);
return NULL;
}
}
/* This module can run */
*priority = mca_sharedfp_sm_priority;

Просмотреть файл

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2015 University of Houston. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -101,7 +102,7 @@ int mca_sharedfp_sm_iwrite (mca_io_ompio_file_t *fh,
*Structures and definitions only for this component
*--------------------------------------------------------------*/
struct mca_sharedfp_sm_offset{
sem_t mutex; /* the mutex: a Posix memory-based unnamed semaphore */
sem_t mutex; /* the mutex: a POSIX memory-based unnamed semaphore */
long long offset; /* and the shared file pointer offset */
};
@ -113,7 +114,10 @@ struct mca_sharedfp_sm_data
struct mca_sharedfp_sm_offset * sm_offset_ptr;
/*save filename so that we can remove the file on close*/
char * sm_filename;
sem_t *mutex; /* the mutex: a Posix memory-based named semaphore */
/* The mutex: it will either point to a POSIX memory-based named
semaphore, or it will point to the a POSIX memory-based unnamed
semaphore located in sm_offset_ptr->mutex. */
sem_t *mutex;
char *sem_name; /* Name of the semaphore */
};

Просмотреть файл

@ -13,6 +13,7 @@
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -71,27 +72,27 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
/*Memory is allocated here for the sh structure*/
if ( mca_sharedfp_sm_verbose ) {
printf( "mca_sharedfp_sm_file_open: malloc f_sharedfp_ptr struct\n");
printf( "mca_sharedfp_sm_file_open: malloc f_sharedfp_ptr struct\n");
}
sh = (struct mca_sharedfp_base_data_t*)malloc(sizeof(struct mca_sharedfp_base_data_t));
if ( NULL == sh ) {
opal_output(0, "mca_sharedfp_sm_file_open: Error, unable to malloc f_sharedfp_ptr struct\n");
free(shfileHandle);
return OMPI_ERR_OUT_OF_RESOURCE;
opal_output(0, "mca_sharedfp_sm_file_open: Error, unable to malloc f_sharedfp_ptr struct\n");
free(shfileHandle);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/*Populate the sh file structure based on the implementation*/
sh->sharedfh = shfileHandle; /* Shared file pointer*/
sh->global_offset = 0; /* Global Offset*/
sh->comm = comm; /* Communicator*/
sh->sharedfh = shfileHandle; /* Shared file pointer*/
sh->global_offset = 0; /* Global Offset*/
sh->comm = comm; /* Communicator*/
sh->selected_module_data = NULL;
rank = ompi_comm_rank ( sh->comm );
/*Open a shared memory segment which will hold the shared file pointer*/
if ( mca_sharedfp_sm_verbose ) {
printf( "mca_sharedfp_sm_file_open: allocatge shared memory segment.\n");
printf( "mca_sharedfp_sm_file_open: allocatge shared memory segment.\n");
}
@ -139,21 +140,21 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
/*TODO: is it necessary to write to the file first?*/
if( 0 == rank ){
memset ( &sm_offset, 0, sizeof (struct mca_sharedfp_sm_offset ));
write ( sm_fd, &sm_offset, sizeof(struct mca_sharedfp_sm_offset));
memset ( &sm_offset, 0, sizeof (struct mca_sharedfp_sm_offset ));
write ( sm_fd, &sm_offset, sizeof(struct mca_sharedfp_sm_offset));
}
comm->c_coll.coll_barrier (comm, comm->c_coll.coll_barrier_module );
/*the file has been written to, now we can map*/
sm_offset_ptr = mmap(NULL, sizeof(struct mca_sharedfp_sm_offset), PROT_READ | PROT_WRITE,
MAP_SHARED, sm_fd, 0);
MAP_SHARED, sm_fd, 0);
close(sm_fd);
if ( sm_offset_ptr==MAP_FAILED){
err = OMPI_ERROR;
printf("mca_sharedfp_sm_file_open: Error, unable to mmap file: %s\n",sm_filename);
printf("%s\n", strerror(errno));
err = OMPI_ERROR;
printf("mca_sharedfp_sm_file_open: Error, unable to mmap file: %s\n",sm_filename);
printf("%s\n", strerror(errno));
free(sm_filename);
free(sm_data);
free(sh);
@ -170,37 +171,32 @@ int mca_sharedfp_sm_file_open (struct ompi_communicator_t *comm,
if( (sm_data->mutex = sem_open(sm_data->sem_name, O_CREAT, 0644, 1)) != SEM_FAILED ) {
#elif defined(HAVE_SEM_INIT)
sm_data->mutex = &sm_offset_ptr->mutex;
if(sem_init(&sm_offset_ptr->mutex, 1, 1) != -1){
#endif
/*If opening was successful*/
/*Store the new file handle*/
sm_data->sm_offset_ptr = sm_offset_ptr;
/* Assign the sm_data to sh->selected_module_data*/
sh->selected_module_data = sm_data;
/*remember the shared file handle*/
fh->f_sharedfp_data = sh;
/*If opening was successful*/
/*Store the new file handle*/
sm_data->sm_offset_ptr = sm_offset_ptr;
/* Assign the sm_data to sh->selected_module_data*/
sh->selected_module_data = sm_data;
/*remember the shared file handle*/
fh->f_sharedfp_data = sh;
/*write initial zero*/
if(rank==0){
MPI_Offset position=0;
/*write initial zero*/
if(rank==0){
MPI_Offset position=0;
#if defined(HAVE_SEM_OPEN)
sem_wait(sm_data->mutex);
sm_offset_ptr->offset=position;
sem_post(sm_data->mutex);
#elif defined(HAVE_SEM_INIT)
sem_wait(&sm_offset_ptr->mutex);
sm_offset_ptr->offset=position;
sem_post(&sm_offset_ptr->mutex);
#endif
}
sem_wait(sm_data->mutex);
sm_offset_ptr->offset=position;
sem_post(sm_data->mutex);
}
}else{
free(sm_filename);
free(sm_data);
free(sh);
free(shfileHandle);
free(sm_data);
free(sh);
free(shfileHandle);
munmap(sm_offset_ptr, sizeof(struct mca_sharedfp_sm_offset));
err = OMPI_ERROR;
err = OMPI_ERROR;
}
comm->c_coll.coll_barrier (comm, comm->c_coll.coll_barrier_module );
@ -217,9 +213,9 @@ int mca_sharedfp_sm_file_close (mca_io_ompio_file_t *fh)
struct mca_sharedfp_sm_data * file_data=NULL;
if( NULL == fh->f_sharedfp_data ){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_file_close: shared file pointer structure not initialized\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_file_close: shared file pointer structure not initialized\n");
}
return OMPI_SUCCESS;
}
sh = fh->f_sharedfp_data;
@ -236,10 +232,10 @@ int mca_sharedfp_sm_file_close (mca_io_ompio_file_t *fh)
if (file_data->sm_offset_ptr) {
/* destroy semaphore */
#if defined(HAVE_SEM_OPEN)
sem_unlink (file_data->sem_name);
free (file_data->sem_name);
sem_unlink (file_data->sem_name);
free (file_data->sem_name);
#elif defined(HAVE_SEM_INIT)
sem_destroy(&file_data->sm_offset_ptr->mutex);
sem_destroy(&file_data->sm_offset_ptr->mutex);
#endif
/*Release the shared memory segment.*/
munmap(file_data->sm_offset_ptr,sizeof(struct mca_sharedfp_sm_offset));

Просмотреть файл

@ -37,7 +37,7 @@ mca_sharedfp_sm_get_position(mca_io_ompio_file_t *fh,
if(fh->f_sharedfp_data==NULL){
opal_output(ompi_sharedfp_base_framework.framework_output,
"sharedfp_sm_write - opening the shared file pointer\n");
"sharedfp_sm_write - opening the shared file pointer\n");
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,

Просмотреть файл

@ -26,10 +26,10 @@
#include "ompi/mca/sharedfp/sharedfp.h"
int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
void *buf,
int count,
ompi_datatype_t *datatype,
MPI_Request * request)
void *buf,
int count,
ompi_datatype_t *datatype,
MPI_Request * request)
{
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE offset = 0;
@ -39,9 +39,9 @@ int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -63,15 +63,15 @@ int mca_sharedfp_sm_iread(mca_io_ompio_file_t *fh,
sh = fh->f_sharedfp_data;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Bytes Requested is %ld\n",bytesRequested);
printf("sharedfp_sm_iread: Bytes Requested is %ld\n",bytesRequested);
}
/*Request the offset to write bytesRequested bytes*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offset);
if ( -1 != ret ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Offset received is %lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iread: Offset received is %lld\n",offset);
}
/* Read the file */
ret = ompio_io_ompio_file_iread_at(sh->sharedfh,offset,buf,count,datatype,request);
}
@ -90,8 +90,8 @@ int mca_sharedfp_sm_read_ordered_begin(mca_io_ompio_file_t *fh,
int mca_sharedfp_sm_read_ordered_end(mca_io_ompio_file_t *fh,
void *buf,
ompi_status_public_t *status)
void *buf,
ompi_status_public_t *status)
{
opal_output(0,"mca_sharedfp_sm_read_ordered_end: NOT IMPLEMENTED\n");
return OMPI_ERROR;

Просмотреть файл

@ -39,20 +39,20 @@ int mca_sharedfp_sm_iwrite(mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iwrite - opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iwrite - opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_sm_iwrite - error opening the shared file pointer\n");
return ret;
}
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
fh->f_filename,
fh->f_amode,
fh->f_info,
fh);
if ( OMPI_SUCCESS != ret ) {
opal_output(0,"sharedfp_sm_iwrite - error opening the shared file pointer\n");
return ret;
}
}
/* Calculate the number of bytes to write */
@ -63,15 +63,15 @@ int mca_sharedfp_sm_iwrite(mca_io_ompio_file_t *fh,
sh = fh->f_sharedfp_data;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iwrite: Bytes Requested is %ld\n",bytesRequested);
printf("sharedfp_sm_iwrite: Bytes Requested is %ld\n",bytesRequested);
}
/* Request the offset to write bytesRequested bytes */
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offset);
if ( -1 != ret ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iwrite: Offset received is %lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_iwrite: Offset received is %lld\n",offset);
}
/* Write to the file */
ret = ompio_io_ompio_file_iwrite_at(sh->sharedfh,offset,buf,count,datatype,request);
}

Просмотреть файл

@ -36,9 +36,9 @@ int mca_sharedfp_sm_read ( mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read - opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read - opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -60,16 +60,16 @@ int mca_sharedfp_sm_read ( mca_io_ompio_file_t *fh,
sh = fh->f_sharedfp_data;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read: Bytes Requested is %ld\n",bytesRequested);
printf("sharedfp_sm_read: Bytes Requested is %ld\n",bytesRequested);
}
/*Request the offset to write bytesRequested bytes*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offset);
if ( -1 != ret ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read: Offset received is %lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read: Offset received is %lld\n",offset);
}
/* Read the file */
ret = ompio_io_ompio_file_read_at(sh->sharedfh,offset,buf,count,datatype,status);
@ -98,9 +98,9 @@ int mca_sharedfp_sm_read_ordered (mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if ( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read_ordered: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_read_ordered: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -132,10 +132,10 @@ int mca_sharedfp_sm_read_ordered (mca_io_ompio_file_t *fh,
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
if( OMPI_SUCCESS != ret){
goto exit;
goto exit;
}
/* All the counts are present now in the recvBuff.
@ -143,25 +143,25 @@ int mca_sharedfp_sm_read_ordered (mca_io_ompio_file_t *fh,
*/
if ( 0 == rank ) {
for (i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered: Bytes requested are %ld\n",bytesRequested);
}
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered: Bytes requested are %ld\n",bytesRequested);
}
}
/* Request the offset to read bytesRequested bytes
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should read their
** share of the data.
*/
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should read their
** share of the data.
*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offsetReceived);
if( OMPI_SUCCESS != ret){
goto exit;
goto exit;
}
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered: Offset received is %lld\n",offsetReceived);
}
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
@ -171,16 +171,16 @@ int mca_sharedfp_sm_read_ordered (mca_io_ompio_file_t *fh,
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if( OMPI_SUCCESS != ret){
goto exit;
goto exit;
}
/*Each process now has its own individual offset in recvBUFF*/
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_sm_verbose ) {
printf("mca_sharedfp_sm_read_ordered: Offset returned is %lld\n",offset);
printf("mca_sharedfp_sm_read_ordered: Offset returned is %lld\n",offset);
}
/* read to the file */
@ -188,7 +188,7 @@ int mca_sharedfp_sm_read_ordered (mca_io_ompio_file_t *fh,
exit:
if ( NULL != buff ) {
free ( buff );
free ( buff );
}
return ret;

Просмотреть файл

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -41,46 +42,38 @@ int mca_sharedfp_sm_request_position(struct mca_sharedfp_base_data_t * sh,
*offset = 0;
if ( mca_sharedfp_sm_verbose ) {
printf("Aquiring lock, rank=%d...",rank);
printf("Aquiring lock, rank=%d...",rank);
}
sm_offset_ptr = sm_data->sm_offset_ptr;
/* Aquire an exclusive lock */
#if defined(HAVE_SEM_OPEN)
sem_wait(sm_data->mutex);
#elif defined(HAVE_SEM_INIT)
sem_wait(&sm_offset_ptr->mutex);
#endif
if ( mca_sharedfp_sm_verbose ) {
printf("Succeeded! Acquired sm lock.for rank=%d\n",rank);
printf("Succeeded! Acquired sm lock.for rank=%d\n",rank);
}
old_offset=sm_offset_ptr->offset;
if ( mca_sharedfp_sm_verbose ) {
printf("Read last_offset=%lld!\n",old_offset);
printf("Read last_offset=%lld!\n",old_offset);
}
position = old_offset + bytes_requested;
if ( mca_sharedfp_sm_verbose ) {
printf("old_offset=%lld, bytes_requested=%d, new offset=%lld!\n",old_offset,bytes_requested,position);
printf("old_offset=%lld, bytes_requested=%d, new offset=%lld!\n",old_offset,bytes_requested,position);
}
sm_offset_ptr->offset=position;
if ( mca_sharedfp_sm_verbose ) {
printf("Releasing sm lock...rank=%d",rank);
printf("Releasing sm lock...rank=%d",rank);
}
#if defined(HAVE_SEM_OPEN)
sem_post(sm_data->mutex);
#elif defined(HAVE_SEM_INIT)
sem_post(&sm_offset_ptr->mutex);
#endif
if ( mca_sharedfp_sm_verbose ) {
printf("Released lock! released lock.for rank=%d\n",rank);
printf("Released lock! released lock.for rank=%d\n",rank);
}
*offset = old_offset;

Просмотреть файл

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -41,9 +42,9 @@ mca_sharedfp_sm_seek (mca_io_ompio_file_t *fh,
struct mca_sharedfp_sm_offset * sm_offset_ptr = NULL;
if( NULL == fh->f_sharedfp_data ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -67,40 +68,40 @@ mca_sharedfp_sm_seek (mca_io_ompio_file_t *fh,
opal_output(0,"sharedfp_sm_seek - MPI_SEEK_SET, offset must be > 0, got offset=%lld.\n",offset);
ret = -1;
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_SET new_offset=%lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_SET new_offset=%lld\n",offset);
}
}
else if( MPI_SEEK_CUR == whence){
else if( MPI_SEEK_CUR == whence){
OMPI_MPI_OFFSET_TYPE current_position;
ret = mca_sharedfp_sm_get_position ( fh, &current_position);
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_CUR: curr=%lld, offset=%lld, call status=%d\n",
current_position,offset,status);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_CUR: curr=%lld, offset=%lld, call status=%d\n",
current_position,offset,status);
}
offset = current_position + offset;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_CUR: new_offset=%lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_CUR: new_offset=%lld\n",offset);
}
if(offset < 0){
opal_output(0,"sharedfp_sm_seek - MPI_SEEK_CURE, offset must be > 0, got offset=%lld.\n",offset);
ret = -1;
}
}
else if( MPI_SEEK_END == whence){
else if( MPI_SEEK_END == whence){
end_position=0;
ompio_io_ompio_file_get_size(sh->sharedfh,&end_position);
offset = end_position + offset;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_END: file_get_size=%lld\n",end_position);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: MPI_SEEK_END: file_get_size=%lld\n",end_position);
}
if(offset < 0){
opal_output(0,"sharedfp_sm_seek - MPI_SEEK_CUR, offset must be > 0, got offset=%lld.\n",offset);
ret = -1;
}
}
else {
else {
opal_output(0,"sharedfp_sm_seek - whence=%i is not supported\n",whence);
ret = -1;
}
@ -114,31 +115,23 @@ mca_sharedfp_sm_seek (mca_io_ompio_file_t *fh,
/*-------------------*/
/*lock the file */
/*--------------------*/
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Aquiring lock, rank=%d...",rank); fflush(stdout);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Aquiring lock, rank=%d...",rank); fflush(stdout);
}
/* Aquire an exclusive lock */
sm_offset_ptr = sm_data->sm_offset_ptr;
#if defined(HAVE_SEM_OPEN)
sem_wait(sm_data->mutex);
#elif defined(HAVE_SEM_INIT)
sem_wait(&sm_offset_ptr->mutex);
#endif
sem_wait(sm_data->mutex);
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Success! Acquired sm lock.for rank=%d\n",rank);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Success! Acquired sm lock.for rank=%d\n",rank);
}
sm_offset_ptr->offset=offset;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Releasing sm lock...rank=%d",rank); fflush(stdout);
}
#if defined(HAVE_SEM_OPEN)
sem_post(sm_data->mutex);
#elif defined(HAVE_SEM_INIT)
sem_post(&sm_offset_ptr->mutex);
#endif
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_seek: Releasing sm lock...rank=%d",rank); fflush(stdout);
}
sem_post(sm_data->mutex);
}
/* since we are only letting process 0, update the current pointer

Просмотреть файл

@ -39,9 +39,9 @@ int mca_sharedfp_sm_write (mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if( NULL == fh->f_sharedfp_data ){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -63,15 +63,15 @@ int mca_sharedfp_sm_write (mca_io_ompio_file_t *fh,
sh = fh->f_sharedfp_data;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write: Requested is %ld\n",bytesRequested);
printf("sharedfp_sm_write: Requested is %ld\n",bytesRequested);
}
/*Request the offset to write bytesRequested bytes*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offset);
if ( -1 != ret ) {
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write: fset received is %lld\n",offset);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write: fset received is %lld\n",offset);
}
/* Write to the file*/
ret = ompio_io_ompio_file_write_at(sh->sharedfh,offset,buf,count,datatype,status);
@ -102,9 +102,9 @@ int mca_sharedfp_sm_write_ordered (mca_io_ompio_file_t *fh,
mca_sharedfp_base_module_t * shared_fp_base_module = NULL;
if( NULL == fh->f_sharedfp_data){
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: opening the shared file pointer\n");
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: opening the shared file pointer\n");
}
shared_fp_base_module = fh->f_sharedfp;
ret = shared_fp_base_module->sharedfp_file_open(fh->f_comm,
@ -137,10 +137,10 @@ int mca_sharedfp_sm_write_ordered (mca_io_ompio_file_t *fh,
}
ret = sh->comm->c_coll.coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_gather_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
goto exit;
}
/* All the counts are present now in the recvBuff.
@ -148,25 +148,25 @@ int mca_sharedfp_sm_write_ordered (mca_io_ompio_file_t *fh,
*/
if ( 0 == rank ) {
for (i = 0; i < size ; i ++) {
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: Bytes requested are %ld\n",bytesRequested);
}
}
bytesRequested += buff[i];
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: Bytes requested are %ld\n",bytesRequested);
}
}
/* Request the offset to write bytesRequested bytes
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should write their
** share of the data.
*/
** only the root process needs to do the request,
** since the root process will then tell the other
** processes at what offset they should write their
** share of the data.
*/
ret = mca_sharedfp_sm_request_position(sh,bytesRequested,&offsetReceived);
if( OMPI_SUCCESS != ret){
goto exit;
goto exit;
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: Offset received is %lld\n",offsetReceived);
}
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: Offset received is %lld\n",offsetReceived);
}
buff[0] += offsetReceived;
for (i = 1 ; i < size; i++) {
@ -176,25 +176,25 @@ int mca_sharedfp_sm_write_ordered (mca_io_ompio_file_t *fh,
/* Scatter the results to the other processes*/
ret = sh->comm->c_coll.coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
&offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
sh->comm, sh->comm->c_coll.coll_scatter_module );
if ( OMPI_SUCCESS != ret ) {
goto exit;
goto exit;
}
/* Each process now has its own individual offset */
offset = offsetBuff - sendBuff;
if ( mca_sharedfp_sm_verbose ) {
printf("sharedfp_sm_write_ordered: Offset returned is %lld\n",offset);
printf("sharedfp_sm_write_ordered: Offset returned is %lld\n",offset);
}
/* write to the file */
ret = ompio_io_ompio_file_write_at_all(sh->sharedfh,offset,buf,count,datatype,status);
exit:
if ( NULL != buff ) {
free ( buff );
free ( buff );
}
return ret;