diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h index 0b5f3b742d..cf9cecaeae 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h @@ -37,6 +37,7 @@ BEGIN_C_DECLS extern int mca_fcoll_vulcan_priority; extern int mca_fcoll_vulcan_num_groups; extern int mca_fcoll_vulcan_write_chunksize; +extern int mca_fcoll_vulcan_async_io; OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_vulcan_component; diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c index 449cf00028..09a80810d0 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c @@ -43,6 +43,7 @@ const char *mca_fcoll_vulcan_component_version_string = int mca_fcoll_vulcan_priority = 10; int mca_fcoll_vulcan_num_groups = 1; int mca_fcoll_vulcan_write_chunksize = -1; +int mca_fcoll_vulcan_async_io = 0; /* * Local function @@ -102,5 +103,13 @@ vulcan_register(void) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_write_chunksize); + mca_fcoll_vulcan_async_io = 0; + (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, + "async_io", "Asynchronous I/O support options. 0: Automatic choice (default) " + "1: Asynchronous I/O only. 
2: Synchronous I/O only.", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_async_io); + return OMPI_SUCCESS; } diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index d6867eeb14..bde2110482 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -28,11 +28,11 @@ #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h" #include "ompi/mca/io/ompio/io_ompio.h" #include "ompi/mca/io/io.h" +#include "ompi/mca/io/ompio/io_ompio_request.h" #include "math.h" #include "ompi/mca/pml/pml.h" #include <unistd.h> - #define DEBUG_ON 0 #define FCOLL_VULCAN_SHUFFLE_TAG 123 #define INIT_LEN 10 @@ -91,8 +91,8 @@ typedef struct mca_io_ompio_aggregator_data { static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ); -static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize ); - +static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, + int write_chunksize, int write_synchType, ompi_request_t **request); int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, struct iovec *local_iov_array, int local_count, struct iovec ***broken_decoded_iovs, int **broken_iov_counts, @@ -131,8 +131,8 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, struct iovec *local_iov_array=NULL; uint32_t total_fview_count = 0; int local_count = 0; - ompi_request_t **reqs1=NULL,**reqs2=NULL; - ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL; + ompi_request_t **reqs = NULL; + ompi_request_t *req_iwrite = MPI_REQUEST_NULL; mca_io_ompio_aggregator_data **aggr_data=NULL; int *displs = NULL; @@ -147,6 +147,8 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, MPI_Aint *broken_total_lengths=NULL; 
int *aggregators=NULL; + int aggr_index = NOT_AGGR_INDEX; + int write_synch_type = 2; int write_chunksize, *result_counts=NULL; int stripe_size_org; @@ -171,6 +173,13 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, ret = OMPI_ERROR; goto exit; } + + if( (1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipwritev) ) { + opal_output (1, "vulcan_write_all: fbtl Does NOT support ipwritev() (asynchronous write) \n"); + ret = MPI_ERR_UNSUPPORTED_OPERATION; + goto exit; + } + /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ bytes_per_cycle =bytes_per_cycle/2; @@ -209,6 +218,11 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, aggr_data[i]->procs_in_group = fh->f_procs_in_group; aggr_data[i]->comm = fh->f_comm; aggr_data[i]->buf = (char *)buf; // should not be used in the new version. + // Identify if the process is an aggregator. + // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays. 
+ if(aggregators[i] == fh->f_rank) { + aggr_index = i; + } } /********************************************************************* @@ -538,98 +552,95 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, #endif } - int aggr_index = NOT_AGGR_INDEX; - reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *)); - reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *)); - if ( NULL == reqs1 || NULL == reqs2 ) { + reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *)); + + if ( NULL == reqs ) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; goto exit; } + for (l=0,i=0; i < vulcan_num_io_procs; i++ ) { for ( j=0; j< (fh->f_procs_per_group+1); j++ ) { - reqs1[l] = MPI_REQUEST_NULL; - reqs2[l] = MPI_REQUEST_NULL; + reqs[l] = MPI_REQUEST_NULL; l++; } } - curr_reqs = reqs1; - prev_reqs = reqs2; - - /* Initialize communication for iteration 0 */ + // In fact it should be: if ((1 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev)) + // But we've already tested that. 
+ if( (1 == mca_fcoll_vulcan_async_io) || + ( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev) && (2 < cycles) ) ) { + write_synch_type = 1; + } + if ( cycles > 0 ) { for ( i=0; i<vulcan_num_io_procs; i++ ) { - ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i], - &curr_reqs[i*(fh->f_procs_per_group + 1)] ); - - if(aggregators[i] == fh->f_rank) { - aggr_index = i; - } - + ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); if ( OMPI_SUCCESS != ret ) { goto exit; } } + // Register progress function that should be used by ompi_request_wait + if ((NOT_AGGR_INDEX != aggr_index) && (false == mca_io_ompio_progress_is_registered)) { + opal_progress_register (mca_io_ompio_component_progress); + mca_io_ompio_progress_is_registered=true; + } } + ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs, + reqs, MPI_STATUS_IGNORE); for (index = 1; index < cycles; index++) { - SWAP_REQUESTS(curr_reqs,prev_reqs); - SWAP_AGGR_POINTERS(aggr_data,vulcan_num_io_procs); - - /* Initialize communication for iteration i */ + SWAP_AGGR_POINTERS(aggr_data, vulcan_num_io_procs); + + if(NOT_AGGR_INDEX != aggr_index) { +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_write_time = MPI_Wtime(); +#endif + ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], + write_chunksize, write_synch_type, &req_iwrite); + if (OMPI_SUCCESS != ret){ + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_write_time = MPI_Wtime(); + write_time += end_write_time - start_write_time; +#endif + } + for ( i=0; i<vulcan_num_io_procs; i++ ) { - ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i], - &curr_reqs[i*(fh->f_procs_per_group + 1)] ); + ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); if ( OMPI_SUCCESS != ret ) { goto exit; } } - - /* Finish communication for iteration i-1 */ - ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs, - prev_reqs, MPI_STATUS_IGNORE); + + ret = ompi_request_wait_all ( 
(fh->f_procs_per_group + 1 )*vulcan_num_io_procs, + reqs, MPI_STATUS_IGNORE); if (OMPI_SUCCESS != ret){ goto exit; } - - - /* Write data for iteration i-1 only by an aggregator*/ + if(NOT_AGGR_INDEX != aggr_index) { -#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN - start_write_time = MPI_Wtime(); -#endif - ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize ); + ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE); if (OMPI_SUCCESS != ret){ goto exit; } -#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN - end_write_time = MPI_Wtime(); - write_time += end_write_time - start_write_time; -#endif } - } /* end for (index = 0; index < cycles; index++) */ - - - /* Finish communication for iteration i = cycles-1 */ + if ( cycles > 0 ) { - SWAP_REQUESTS(curr_reqs,prev_reqs); - SWAP_AGGR_POINTERS(aggr_data,vulcan_num_io_procs); - - ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs, - prev_reqs, MPI_STATUS_IGNORE); - if (OMPI_SUCCESS != ret){ - goto exit; - } - - /* Write data for iteration i=cycles-1 */ + SWAP_AGGR_POINTERS(aggr_data,vulcan_num_io_procs); + if(NOT_AGGR_INDEX != aggr_index) { #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_write_time = MPI_Wtime(); #endif - ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize ); + ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], + write_chunksize, write_synch_type, &req_iwrite); if (OMPI_SUCCESS != ret){ goto exit; } @@ -638,8 +649,14 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh, write_time += end_write_time - start_write_time; #endif } + + if(NOT_AGGR_INDEX != aggr_index) { + ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } } - #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN end_exch = MPI_Wtime(); @@ -717,44 +734,75 @@ exit : free(fh->f_procs_in_group); fh->f_procs_in_group=NULL; fh->f_procs_per_group=0; - free(reqs1); - free(reqs2); free(result_counts); - + free(reqs); 
fh->f_stripe_size=stripe_size_org; return OMPI_SUCCESS; } - -static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize ) +static int write_init (mca_io_ompio_file_t *fh, + int aggregator, + mca_io_ompio_aggregator_data *aggr_data, + int write_chunksize, + int write_synchType, + ompi_request_t **request ) { - int ret=OMPI_SUCCESS; - int last_array_pos=0; - int last_pos=0; - + int ret = OMPI_SUCCESS; + ssize_t ret_temp = 0; + int last_array_pos = 0; + int last_pos = 0; + mca_ompio_request_t *ompio_req = NULL; + + ompio_req = OBJ_NEW(mca_ompio_request_t); + ompio_req->req_type = MCA_OMPIO_REQUEST_WRITE; + ompio_req->req_ompi.req_state = OMPI_REQUEST_ACTIVE; if (aggr_data->prev_num_io_entries) { - while ( aggr_data->prev_bytes_to_write > 0 ) { - aggr_data->prev_bytes_to_write -= mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array, - aggr_data->prev_num_io_entries, - &last_array_pos, &last_pos, - write_chunksize ); - if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) { - free ( aggr_data->prev_io_array); - opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n"); - ret = OMPI_ERROR; - goto exit; + /* In this case, aggr_data->prev_num_io_entries is always == 1. + Therefore we can write the data of size aggr_data->prev_bytes_to_write in one iteration. + In fact, aggr_data->prev_bytes_to_write <= write_chunksize. 
+ */ + mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array, + aggr_data->prev_num_io_entries, + &last_array_pos, &last_pos, + write_chunksize); + + if (1 == write_synchType) { + ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req); + if(0 > ret) { + opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; } } - free ( fh->f_io_array ); - free ( aggr_data->prev_io_array); - } + else { + ret_temp = fh->f_fbtl->fbtl_pwritev(fh); + if(0 > ret_temp) { + opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n"); + ret = ret_temp; + ret_temp = 0; + } -exit: + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = ret_temp; + ompi_request_complete (&ompio_req->req_ompi, false); + } + + free(fh->f_io_array); + free(aggr_data->prev_io_array); + } + else { + ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + ompio_req->req_ompi.req_status._ucount = 0; + ompi_request_complete (&ompio_req->req_ompi, false); + } + + *request = (ompi_request_t *) ompio_req; fh->f_io_array=NULL; fh->f_num_of_io_entries=0; + return ret; }