1
1
openmpi/ompi/mca/sharedfp/addproc/sharedfp_addproc_file_open.c

176 строки
6.3 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013-2015 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "sharedfp_addproc.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/sharedfp/sharedfp.h"
#include "ompi/mca/pml/pml.h"
#include <semaphore.h>
#include <sys/mman.h>
#include "ompi/mca/sharedfp/base/base.h"
int mca_sharedfp_addproc_file_open (struct ompi_communicator_t *comm,
char* filename,
int amode,
struct ompi_info_t *info,
mca_io_ompio_file_t *fh)
{
int ret = OMPI_SUCCESS, err;
int rank;
struct mca_sharedfp_base_data_t* sh;
mca_io_ompio_file_t * shfileHandle, *ompio_fh;
MPI_Comm newInterComm;
struct mca_sharedfp_addproc_data * addproc_data = NULL;
mca_io_ompio_data_t *data;
/*-------------------------------------------------*/
/*Open the same file again without shared file pointer*/
/*-------------------------------------------------*/
shfileHandle = (mca_io_ompio_file_t *)malloc(sizeof(mca_io_ompio_file_t));
ret = ompio_io_ompio_file_open(comm,filename,amode,info,shfileHandle,false);
if ( OMPI_SUCCESS != ret) {
opal_output(0, "mca_sharedfp_addproc_file_open: Error during file open\n");
return ret;
}
shfileHandle->f_fh = fh->f_fh;
data = (mca_io_ompio_data_t *) fh->f_fh->f_io_selected_data;
ompio_fh = &data->ompio_fh;
err = mca_io_ompio_set_view_internal (shfileHandle,
ompio_fh->f_disp,
ompio_fh->f_etype,
ompio_fh->f_orig_filetype,
ompio_fh->f_datarep,
MPI_INFO_NULL);
/*Memory is allocated here for the sh structure*/
if ( mca_sharedfp_addproc_verbose ) {
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_addproc_file_open: malloc f_sharedfp_ptr struct\n");
}
sh = (struct mca_sharedfp_base_data_t*)malloc(sizeof(struct mca_sharedfp_base_data_t));
if ( NULL == sh ){
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_addproc_file_open: Error, unable to malloc f_sharedfp_ptr struct\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
/*Populate the sh file structure based on the implementation*/
sh->sharedfh = shfileHandle; /* Shared file pointer*/
sh->global_offset = 0; /* Global Offset*/
sh->comm = comm; /* Communicator*/
sh->selected_module_data = NULL;
rank = ompi_comm_rank ( sh->comm );
if ( mca_sharedfp_addproc_verbose ) {
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_addproc_file_open: START spawn by rank=%d\n",rank);
}
/*Spawn a new process which will maintain the offsets for this file open*/
ret = MPI_Comm_spawn("mca_sharedfp_addproc_control", MPI_ARGV_NULL, 1, MPI_INFO_NULL,
0, sh->comm, &newInterComm, &err);
if ( OMPI_SUCCESS != ret ) {
opal_output(0, "mca_sharedfp_addproc_file_open: error spawning control process ret=%d\n",
ret);
}
/*If spawning successful*/
if (newInterComm) {
addproc_data = (struct mca_sharedfp_addproc_data*)malloc(sizeof(struct mca_sharedfp_addproc_data));
if ( NULL == addproc_data ){
opal_output (0,"mca_sharedfp_addproc_file_open: Error, unable to malloc addproc_data struct\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
/*Store the new Intercommunicator*/
addproc_data->intercom = newInterComm;
/*save the addproc data*/
sh->selected_module_data = addproc_data;
/*remember the shared file handle*/
fh->f_sharedfp_data = sh;
}
else{
opal_output(ompi_sharedfp_base_framework.framework_output,
"mca_sharedfp_addproc_file_open: DONE spawn by rank=%d, errcode[success=%d, err=%d]=%d\n",
rank, MPI_SUCCESS, MPI_ERR_SPAWN, ret);
ret = OMPI_ERROR;
}
return ret;
}
int mca_sharedfp_addproc_file_close (mca_io_ompio_file_t *fh)
{
struct mca_sharedfp_base_data_t *sh=NULL;
int err = OMPI_SUCCESS;
long sendBuff = 0;
int count = 1;
int rank;
struct mca_sharedfp_addproc_data * addproc_data = NULL;
if ( NULL == fh->f_sharedfp_data){
/* Can happen with lazy initialization of the sharedfp structures */
if ( mca_sharedfp_addproc_verbose ) {
opal_output(0, "sharedfp_addproc_file_close - shared file pointer structure not initialized\n");
}
return OMPI_SUCCESS;
}
sh = fh->f_sharedfp_data;
rank = ompi_comm_rank ( sh->comm );
/* Make sure that all processes are ready to release the
** shared file pointer resources
*/
sh->comm->c_coll.coll_barrier(sh->comm, sh->comm->c_coll.coll_barrier_module );
addproc_data = (struct mca_sharedfp_addproc_data*)(sh->selected_module_data);
if (addproc_data) {
/*tell additional proc to stop listening*/
if(0 == rank){
MCA_PML_CALL(send( &sendBuff, count, OMPI_OFFSET_DATATYPE, 0, END_TAG,
MCA_PML_BASE_SEND_STANDARD, addproc_data->intercom));
}
/* Free intercommunicator */
if(addproc_data->intercom){
ompi_comm_free(&(addproc_data->intercom));
}
free(addproc_data);
}
/* Close the main file opened by this component*/
err = ompio_io_ompio_file_close(sh->sharedfh);
/*free shared file pointer data struct*/
free(sh);
return err;
}