
fcoll/vulcan: Support of asynchronous write in collective writeAll

We introduced a new vulcan MCA parameter that specifies the I/O synchronization
type (async/sync I/O) applied within the collective write operation.
The user can explicitly choose asynchronous or synchronous write operations,
or let the choice be made automatically.

Signed-off-by: raafatfeki <fekiraafat@gmail.com>
This commit is contained in:
raafatfeki 2018-06-06 12:30:47 -05:00 committed by Edgar Gabriel
parent 4f7172ddf6
commit 5ecb4a56e3
3 changed files with 140 additions and 82 deletions
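Usage note: the parameter is typically set on the mpirun command line. Below is an illustrative sketch (not part of the commit) of a collective write that exercises the fcoll/vulcan path; the file name and data layout are arbitrary.

/* Illustrative only (not part of the commit): a collective write that the
 * new parameter affects.  Run with, e.g.:
 *   mpirun -np 4 --mca io ompio --mca fcoll vulcan \
 *          --mca fcoll_vulcan_async_io 1 ./a.out
 * 0 = automatic choice (default), 1 = asynchronous I/O only,
 * 2 = synchronous I/O only. */
#include <mpi.h>
#include <string.h>

int main(int argc, char **argv)
{
    MPI_File fh;
    char buf[1024];
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    memset(buf, 'a' + (rank % 26), sizeof(buf));

    MPI_File_open(MPI_COMM_WORLD, "testfile.out",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
    /* Each rank writes a contiguous 1 KiB block at its own offset. */
    MPI_File_write_at_all(fh, (MPI_Offset)rank * sizeof(buf), buf,
                          sizeof(buf), MPI_CHAR, MPI_STATUS_IGNORE);
    MPI_File_close(&fh);

    MPI_Finalize();
    return 0;
}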

View file

@@ -37,6 +37,7 @@ BEGIN_C_DECLS
extern int mca_fcoll_vulcan_priority;
extern int mca_fcoll_vulcan_num_groups;
extern int mca_fcoll_vulcan_write_chunksize;
+extern int mca_fcoll_vulcan_async_io;
OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_vulcan_component;

View file

@@ -43,6 +43,7 @@ const char *mca_fcoll_vulcan_component_version_string =
int mca_fcoll_vulcan_priority = 10;
int mca_fcoll_vulcan_num_groups = 1;
int mca_fcoll_vulcan_write_chunksize = -1;
+int mca_fcoll_vulcan_async_io = 0;
/*
* Local function
@@ -102,5 +103,13 @@ vulcan_register(void)
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_write_chunksize);
+mca_fcoll_vulcan_async_io = 0;
+(void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version,
+"async_io", "Asynchronous I/O support options. 0: Automatic choice (default) "
+"1: Asynchronous I/O only. 2: Synchronous I/O only.",
+MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+OPAL_INFO_LVL_9,
+MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_async_io);
return OMPI_SUCCESS;
}

View file

@@ -28,11 +28,11 @@
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/io/ompio/io_ompio_request.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
#define DEBUG_ON 0
#define FCOLL_VULCAN_SHUFFLE_TAG 123
#define INIT_LEN 10
@@ -91,8 +91,8 @@ typedef struct mca_io_ompio_aggregator_data {
static int shuffle_init ( int index, int cycles, int aggregator, int rank,
mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs );
-static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
+static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data,
+int write_chunksize, int write_synchType, ompi_request_t **request);
int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
@@ -131,8 +131,8 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
struct iovec *local_iov_array=NULL;
uint32_t total_fview_count = 0;
int local_count = 0;
-ompi_request_t **reqs1=NULL,**reqs2=NULL;
-ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL;
+ompi_request_t **reqs = NULL;
+ompi_request_t *req_iwrite = MPI_REQUEST_NULL;
mca_io_ompio_aggregator_data **aggr_data=NULL;
int *displs = NULL;
@@ -147,6 +147,8 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
MPI_Aint *broken_total_lengths=NULL;
int *aggregators=NULL;
int aggr_index = NOT_AGGR_INDEX;
+int write_synch_type = 2;
int write_chunksize, *result_counts=NULL;
int stripe_size_org;
@@ -171,6 +173,13 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
ret = OMPI_ERROR;
goto exit;
}
+if( (1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipwritev) ) {
+opal_output (1, "vulcan_write_all: fbtl does NOT support ipwritev() (asynchronous write) \n");
+ret = MPI_ERR_UNSUPPORTED_OPERATION;
+goto exit;
+}
/* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
the user requested */
bytes_per_cycle =bytes_per_cycle/2;
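The halving above is what makes the overlap possible: two half-sized cycles are in flight at once, so the write of cycle i-1 can proceed while the shuffle of cycle i communicates. The following compilable sketch (hypothetical stub helpers standing in for shuffle_init, write_init and the request waits) shows the order of operations that the rest of this diff implements.

#include <stdio.h>

/* Hypothetical stubs standing in for shuffle_init / write_init and the
 * corresponding ompi_request_wait(_all) calls. */
static void post_shuffle(int c) { printf("post shuffle %d\n", c); }
static void wait_shuffle(int c) { printf("wait shuffle %d\n", c); }
static void post_write(int c)   { printf("post write   %d\n", c); }
static void wait_write(int c)   { printf("wait write   %d\n", c); }

/* Order of operations in the pipelined write_all: the nonblocking write
 * of cycle i-1 stays in flight while cycle i's shuffle communicates.
 * With synchronous writes (write_synch_type == 2), post_write blocks and
 * the overlap disappears. */
static void pipelined_write_all(int cycles)
{
    if (cycles <= 0) return;
    post_shuffle(0);
    wait_shuffle(0);
    for (int i = 1; i < cycles; i++) {
        post_write(i - 1);   /* async write of the data shuffled last cycle */
        post_shuffle(i);     /* communication overlaps the pending write    */
        wait_shuffle(i);
        wait_write(i - 1);
    }
    post_write(cycles - 1);  /* the last cycle has nothing left to overlap  */
    wait_write(cycles - 1);
}

int main(void) { pipelined_write_all(3); return 0; }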
@@ -209,6 +218,11 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
aggr_data[i]->comm = fh->f_comm;
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
+// Identify if the process is an aggregator.
+// If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays.
+if(aggregators[i] == fh->f_rank) {
+aggr_index = i;
+}
}
/*********************************************************************
@@ -538,98 +552,95 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
#endif
}
-int aggr_index = NOT_AGGR_INDEX;
-reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
-reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
-if ( NULL == reqs1 || NULL == reqs2 ) {
+reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
+if ( NULL == reqs ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
for (l=0,i=0; i < vulcan_num_io_procs; i++ ) {
for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
-reqs1[l] = MPI_REQUEST_NULL;
-reqs2[l] = MPI_REQUEST_NULL;
+reqs[l] = MPI_REQUEST_NULL;
l++;
}
}
-curr_reqs = reqs1;
-prev_reqs = reqs2;
+// In fact it should be: if ((1 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev))
+// But we've already tested that.
+if( (1 == mca_fcoll_vulcan_async_io) ||
+( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev) && (2 < cycles) ) ) {
+write_synch_type = 1;
+}
/* Initialize communication for iteration 0 */
if ( cycles > 0 ) {
for ( i=0; i<vulcan_num_io_procs; i++ ) {
ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
-&curr_reqs[i*(fh->f_procs_per_group + 1)] );
-if(aggregators[i] == fh->f_rank) {
-aggr_index = i;
-}
+&reqs[i*(fh->f_procs_per_group + 1)] );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
}
+// Register progress function that should be used by ompi_request_wait
+if ((NOT_AGGR_INDEX != aggr_index) && (false == mca_io_ompio_progress_is_registered)) {
+opal_progress_register (mca_io_ompio_component_progress);
+mca_io_ompio_progress_is_registered=true;
+}
}
+ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs,
+reqs, MPI_STATUS_IGNORE);
for (index = 1; index < cycles; index++) {
-SWAP_REQUESTS(curr_reqs,prev_reqs);
-SWAP_AGGR_POINTERS(aggr_data,vulcan_num_io_procs);
+SWAP_AGGR_POINTERS(aggr_data, vulcan_num_io_procs);
+if(NOT_AGGR_INDEX != aggr_index) {
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+start_write_time = MPI_Wtime();
+#endif
+ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index],
+write_chunksize, write_synch_type, &req_iwrite);
+if (OMPI_SUCCESS != ret){
+goto exit;
+}
+#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
+end_write_time = MPI_Wtime();
+write_time += end_write_time - start_write_time;
+#endif
+}
/* Initialize communication for iteration i */
for ( i=0; i<vulcan_num_io_procs; i++ ) {
ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
-&curr_reqs[i*(fh->f_procs_per_group + 1)] );
+&reqs[i*(fh->f_procs_per_group + 1)] );
if ( OMPI_SUCCESS != ret ) {
goto exit;
}
}
/* Finish communication for iteration i-1 */
ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs,
-prev_reqs, MPI_STATUS_IGNORE);
+reqs, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
/* Write data for iteration i-1 only by an aggregator*/
if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
-ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
+ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
if (OMPI_SUCCESS != ret){
goto exit;
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif
}
} /* end for (index = 1; index < cycles; index++) */
/* Finish communication for iteration i = cycles-1 */
if ( cycles > 0 ) {
-SWAP_REQUESTS(curr_reqs,prev_reqs);
SWAP_AGGR_POINTERS(aggr_data,vulcan_num_io_procs);
-ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*vulcan_num_io_procs,
-prev_reqs, MPI_STATUS_IGNORE);
-if (OMPI_SUCCESS != ret){
-goto exit;
-}
/* Write data for iteration i=cycles-1 */
if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
-ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
+ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index],
+write_chunksize, write_synch_type, &req_iwrite);
if (OMPI_SUCCESS != ret){
goto exit;
}
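The write_synch_type selection earlier in this hunk reduces to a small rule: use asynchronous writes when the user forces them (ipwritev support was already verified near the top of the function), or in automatic mode when the fbtl provides ipwritev and there are more than two cycles to overlap; otherwise fall back to synchronous writes. A standalone sketch of that rule, with hypothetical parameters standing in for the fh->f_fbtl fields:

#include <stdio.h>

/* Hypothetical standalone form of the rule; the real code tests
 * fh->f_fbtl->fbtl_ipwritev directly.  Returns 1 (async) or 2 (sync). */
static int choose_write_synch_type(int async_io_param, int has_ipwritev,
                                   int cycles)
{
    if (1 == async_io_param)    /* forced async; ipwritev support was
                                   already verified near the function top */
        return 1;
    if (0 == async_io_param && has_ipwritev && cycles > 2)
        return 1;               /* automatic choice */
    return 2;                   /* forced sync, or too few cycles to pay
                                   for the overlap */
}

int main(void)
{
    printf("%d\n", choose_write_synch_type(0, 1, 4)); /* 1: async */
    printf("%d\n", choose_write_synch_type(0, 1, 2)); /* 2: sync  */
    return 0;
}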
@@ -638,8 +649,14 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
write_time += end_write_time - start_write_time;
#endif
}
}
+if(NOT_AGGR_INDEX != aggr_index) {
+ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
+if (OMPI_SUCCESS != ret){
+goto exit;
+}
+}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_exch = MPI_Wtime();
@@ -717,44 +734,75 @@ exit :
free(fh->f_procs_in_group);
fh->f_procs_in_group=NULL;
fh->f_procs_per_group=0;
-free(reqs1);
-free(reqs2);
free(result_counts);
+free(reqs);
fh->f_stripe_size=stripe_size_org;
return OMPI_SUCCESS;
}
-static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
+static int write_init (mca_io_ompio_file_t *fh,
+int aggregator,
+mca_io_ompio_aggregator_data *aggr_data,
+int write_chunksize,
+int write_synchType,
+ompi_request_t **request )
{
-int ret=OMPI_SUCCESS;
-int last_array_pos=0;
-int last_pos=0;
+int ret = OMPI_SUCCESS;
+ssize_t ret_temp = 0;
+int last_array_pos = 0;
+int last_pos = 0;
+mca_ompio_request_t *ompio_req = NULL;
+ompio_req = OBJ_NEW(mca_ompio_request_t);
+ompio_req->req_type = MCA_OMPIO_REQUEST_WRITE;
+ompio_req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
if (aggr_data->prev_num_io_entries) {
-while ( aggr_data->prev_bytes_to_write > 0 ) {
-aggr_data->prev_bytes_to_write -= mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
-aggr_data->prev_num_io_entries,
-&last_array_pos, &last_pos,
-write_chunksize );
-if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
-free ( aggr_data->prev_io_array);
-opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
-ret = OMPI_ERROR;
-goto exit;
+/* In this case, aggr_data->prev_num_io_entries is always == 1.
+Therefore we can write the data of size aggr_data->prev_bytes_to_write in one iteration.
+In fact, aggr_data->prev_bytes_to_write <= write_chunksize.
+*/
+mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
+aggr_data->prev_num_io_entries,
+&last_array_pos, &last_pos,
+write_chunksize);
+if (1 == write_synchType) {
+ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req);
+if(0 > ret) {
+opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n");
+ompio_req->req_ompi.req_status.MPI_ERROR = ret;
+ompio_req->req_ompi.req_status._ucount = 0;
+}
+}
-free ( fh->f_io_array );
-free ( aggr_data->prev_io_array);
+else {
+ret_temp = fh->f_fbtl->fbtl_pwritev(fh);
+if(0 > ret_temp) {
+opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
+ret = ret_temp;
+ret_temp = 0;
+}
+ompio_req->req_ompi.req_status.MPI_ERROR = ret;
+ompio_req->req_ompi.req_status._ucount = ret_temp;
+ompi_request_complete (&ompio_req->req_ompi, false);
+}
+free(fh->f_io_array);
+free(aggr_data->prev_io_array);
}
+else {
+ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
+ompio_req->req_ompi.req_status._ucount = 0;
+ompi_request_complete (&ompio_req->req_ompi, false);
+}
exit:
+*request = (ompi_request_t *) ompio_req;
fh->f_io_array=NULL;
fh->f_num_of_io_entries=0;
return ret;
}
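A design note on the new write_init: it always hands back a request, and on the synchronous path the request is completed on the spot (ompi_request_complete), so the caller in the main loop can issue ompi_request_wait unconditionally regardless of the synch type. A generic, compilable sketch of this uniform-request pattern, with hypothetical types in place of mca_ompio_request_t and ompi_request_t:

#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for mca_ompio_request_t / ompi_request_t. */
typedef struct request {
    bool   complete;
    int    error;
    size_t count;
} request_t;

/* Synchronous path: do the blocking write now, then hand back a request
 * that is already complete, mirroring write_init's pwritev branch. */
static request_t *sync_write(size_t nbytes)
{
    request_t *req = malloc(sizeof(*req));
    if (NULL == req) abort();
    /* ... a blocking pwritev-style write of nbytes would happen here ... */
    req->error    = 0;
    req->count    = nbytes;
    req->complete = true;      /* completed on the spot */
    return req;
}

/* The caller waits identically for async and sync writes; for a request
 * completed synchronously the wait returns immediately. */
static void wait_request(request_t *req)
{
    while (!req->complete) {
        /* the progress engine would run here in the async case */
    }
    free(req);
}

int main(void)
{
    wait_request(sync_write(4096));
    return 0;
}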