/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "ompi_config.h" #ifdef HAVE_SYS_TIME_H #include #endif #ifdef HAVE_UNISTD_H #include #endif #include "opal/class/opal_object.h" #include "ompi/file/file.h" #include "opal/mca/base/base.h" #include "opal/mca/base/mca_base_param.h" #include "ompi/mca/io/base/base.h" #include "ompi/mca/io/base/io_base_request.h" /* * Public variables */ bool mca_io_base_requests_valid = false; ompi_free_list_t mca_io_base_requests; volatile int32_t mca_io_base_request_num_pending = 0; /* * Private functions */ static void io_base_request_constructor(mca_io_base_request_t *req); OBJ_CLASS_INSTANCE(mca_io_base_request_t, ompi_request_t, io_base_request_constructor, NULL); static void io_base_request_constructor(mca_io_base_request_t *req) { req->super.req_type = OMPI_REQUEST_IO; req->free_called = false; } /* * Setup the freelist of IO requests. This does not need to be * protected with a lock because it's called during MPI_INIT. */ int mca_io_base_request_create_freelist(void) { opal_list_item_t *p; const mca_base_component_t *component; const mca_io_base_component_2_0_0_t *v200; size_t size = 0; int i, init, incr; /* Find the maximum additional number of bytes required by all io components for requests and make that the request size */ for (p = opal_list_get_first(&mca_io_base_components_available); p != opal_list_get_end(&mca_io_base_components_available); p = opal_list_get_next(p)) { component = ((mca_base_component_priority_list_item_t *) p)->super.cli_component; /* Only know how to handle v2.0.0 components for now */ if (component->mca_type_major_version == 2 && component->mca_type_minor_version == 0 && component->mca_type_release_version == 0) { v200 = (mca_io_base_component_2_0_0_t *) component; if (v200->io_request_bytes > size) { size = v200->io_request_bytes; } } } /* Construct and initialized the freelist of IO requests. */ OBJ_CONSTRUCT(&mca_io_base_requests, ompi_free_list_t); mca_io_base_requests_valid = true; i = mca_base_param_find("io", NULL, "base_freelist_initial_size"); mca_base_param_lookup_int(i, &init); i = mca_base_param_find("io", NULL, "base_freelist_increment"); mca_base_param_lookup_int(i, &incr); ompi_free_list_init_new(&mca_io_base_requests, sizeof(mca_io_base_request_t) + size, CACHE_LINE_SIZE, OBJ_CLASS(mca_io_base_request_t), 0,CACHE_LINE_SIZE, init, -1, incr, NULL); /* All done */ return OMPI_SUCCESS; } /* * Return a module-specific IO MPI_Request */ int mca_io_base_request_alloc(ompi_file_t *file, mca_io_base_request_t **req) { int err; mca_io_base_module_request_once_init_fn_t func; ompi_free_list_item_t *item; /* See if we've got a request on the module's freelist (which is cached on the file, since there's only one module per MPI_File). Use a quick-but-not-entirely-accurate (but good enough) check as a slight optimization to potentially having to avoid locking and unlocking. */ if (opal_list_get_size(&file->f_io_requests) > 0) { OPAL_THREAD_LOCK(&file->f_io_requests_lock); if (opal_list_get_size(&file->f_io_requests) > 0) { *req = (mca_io_base_request_t*) opal_list_remove_first(&file->f_io_requests); (*req)->free_called = false; } else { *req = NULL; } OPAL_THREAD_UNLOCK(&file->f_io_requests_lock); } else { *req = NULL; } /* Nope, we didn't have one on the file freelist, so let's get one off the global freelist */ if (NULL == *req) { OMPI_FREE_LIST_GET(&mca_io_base_requests, item, err); *req = (mca_io_base_request_t*) item; /* Call the per-use init function, if it exists */ switch (file->f_io_version) { case MCA_IO_BASE_V_2_0_0: /* These can be set once for this request since this request will always be used with the same module (and therefore, the same MPI_File). Note that (*req)->req_ompi.rq_type is already set by the constructor. */ (*req)->req_file = file; (*req)->req_ver = file->f_io_version; (*req)->free_called = false; (*req)->super.req_free = file->f_io_selected_module.v2_0_0.io_module_request_free; (*req)->super.req_cancel = file->f_io_selected_module.v2_0_0.io_module_request_cancel; /* Call the module's once-per process init, if it exists */ func = file->f_io_selected_module.v2_0_0.io_module_request_once_init; if (NULL != func) { if (OMPI_SUCCESS != (err = func(&file->f_io_selected_module, *req))) { OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item); return err; } } break; default: OMPI_FREE_LIST_RETURN(&mca_io_base_requests, item); return OMPI_ERR_NOT_IMPLEMENTED; break; } } /* Initialize the request */ OMPI_REQUEST_INIT(&((*req)->super), false); (*req)->super.req_mpi_object.file = file; /* * Copied from ompi/mca/pml/base/pml_base_recvreq.h: * always set the req_status.MPI_TAG to ANY_TAG before starting the * request. This field is used if cancelled to find out if the request * has been matched or not. */ (*req)->super.req_status.MPI_TAG = MPI_ANY_TAG; (*req)->super.req_status.MPI_ERROR = OMPI_SUCCESS; (*req)->super.req_status._count = 0; (*req)->super.req_status._cancelled = 0; /* All done */ return OMPI_SUCCESS; } /* * Free a module-specific IO MPI_Request */ OMPI_DECLSPEC void mca_io_base_request_free(ompi_file_t *file, mca_io_base_request_t *req) { /* Put the request back on the per-module freelist, since it's been initialized for that module */ OPAL_THREAD_LOCK(&file->f_io_requests_lock); opal_list_prepend(&file->f_io_requests, (opal_list_item_t*) req); OPAL_THREAD_UNLOCK(&file->f_io_requests_lock); } /* * Return all the requests in the per-file freelist to the global list */ void mca_io_base_request_return(ompi_file_t *file) { ompi_free_list_item_t *next; OPAL_THREAD_LOCK(&file->f_io_requests_lock); while (NULL != (next = (ompi_free_list_item_t*) opal_list_remove_first(&file->f_io_requests))) { OMPI_FREE_LIST_RETURN(&mca_io_base_requests, next); } OPAL_THREAD_UNLOCK(&file->f_io_requests_lock); } #if OMPI_ENABLE_PROGRESS_THREADS static volatile bool thread_running = false; static volatile bool thread_done = false; static opal_thread_t progress_thread; static opal_mutex_t progress_mutex; static opal_condition_t progress_cond; static void* request_progress_thread(opal_object_t *arg) { struct timespec abstime; struct timeval tv; while (! thread_done) { gettimeofday(&tv, NULL); abstime.tv_sec = tv.tv_sec + 1; abstime.tv_nsec = tv.tv_usec * 1000; while (mca_io_base_request_num_pending > 0) { /* do some progress, sleep, repeat */ mca_io_base_component_run_progress(); sleep(2); } opal_condition_timedwait(&progress_cond, &progress_mutex, &abstime); } return NULL; } #endif /* OMPI_ENABLE_PROGRESS_THREADS */ void mca_io_base_request_progress_init(void) { mca_io_base_request_num_pending = 0; #if OMPI_ENABLE_PROGRESS_THREADS thread_running = false; thread_done = false; OBJ_CONSTRUCT(&progress_mutex, opal_mutex_t); OBJ_CONSTRUCT(&progress_cond, opal_condition_t); OBJ_CONSTRUCT(&progress_thread, opal_thread_t); progress_thread.t_run = request_progress_thread; progress_thread.t_arg = NULL; #endif /* OMPI_ENABLE_PROGRESS_THREADS */ } OMPI_DECLSPEC void mca_io_base_request_progress_add(void) { #if OMPI_ENABLE_PROGRESS_THREADS /* if we don't have a progress thread, make us have a progress thread */ if (! thread_running) { OPAL_THREAD_LOCK(&progress_mutex); if (! thread_running) { thread_running = true; opal_thread_start(&progress_thread); } OPAL_THREAD_UNLOCK(&progress_mutex); } #endif /* OMPI_ENABLE_PROGRESS_THREADS */ OPAL_THREAD_ADD32(&mca_io_base_request_num_pending, 1); #if OMPI_ENABLE_PROGRESS_THREADS opal_condition_signal(&progress_cond); #endif /* OMPI_ENABLE_PROGRESS_THREADS */ } OMPI_DECLSPEC void mca_io_base_request_progress_del(void) { OPAL_THREAD_ADD32(&mca_io_base_request_num_pending, -1); } void mca_io_base_request_progress_fini(void) { #if OMPI_ENABLE_PROGRESS_THREADS void *ret; /* make the helper thread die */ thread_done = true; if (thread_running) { opal_condition_signal(&progress_cond); opal_thread_join(&progress_thread, &ret); } /* clean up */ OBJ_DESTRUCT(&progress_thread); OBJ_DESTRUCT(&progress_cond); OBJ_DESTRUCT(&progress_mutex); #endif /* OMPI_ENABLE_PROGRESS_THREADS */ }