Add caching of FCA communicators
developed by Dinar, reviewed by miked/yossi. cmr:v1.7.3:reviewer=jsquyres:subject=add caching of FCA communicators. This commit was SVN r29256.
Этот коммит содержится в:
родитель
d67e3077f5
Коммит
7c6ff00da5
@ -7,6 +7,7 @@
|
||||
$HEADER$
|
||||
*/
|
||||
|
||||
|
||||
#ifndef MCA_COLL_FCA_H
|
||||
#define MCA_COLL_FCA_H
|
||||
|
||||
@ -20,7 +21,7 @@
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/op/op.h"
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "coll_fca_api.h"
|
||||
#include "coll_fca_debug.h"
|
||||
|
||||
@ -72,7 +73,7 @@ BEGIN_C_DECLS
|
||||
* Used to load the library dynamically.
|
||||
*/
|
||||
|
||||
|
||||
int modular_pow(uint64_t base, uint64_t exponent, uint64_t modulus);
|
||||
|
||||
/**
|
||||
* FCA data type information
|
||||
@ -167,15 +168,67 @@ struct mca_coll_fca_component_t {
|
||||
/** MCA parameter: FCA NP */
|
||||
int fca_np;
|
||||
|
||||
/** MCA parameter: Enable FCA comm cache */
|
||||
int fca_enable_cache;
|
||||
|
||||
/** MCA parameter: Enable parallel hash calc */
|
||||
int fca_parallel_hash_calc;
|
||||
|
||||
/** Some statistics counters */
|
||||
double fca_total_work_time;
|
||||
double fca_work_time_parallel;
|
||||
double fca_work_time_sequency;
|
||||
int fca_cache_hit;
|
||||
int fca_cache_miss;
|
||||
int fca_hash_hit;
|
||||
int fca_hash_miss;
|
||||
int fca_max_deep_in_cache;
|
||||
|
||||
/** MCA parameter: Enable hash for cache */
|
||||
int fca_enable_hash;
|
||||
|
||||
/* FCA global stuff */
|
||||
fca_t *fca_context; /* FCA context handle */
|
||||
mca_coll_fca_dtype_info_t fca_dtypes[FCA_DT_MAX_PREDEFINED]; /* FCA dtype translation */
|
||||
mca_coll_fca_op_info_t fca_reduce_ops[FCA_MAX_OPS]; /* FCA op translation */
|
||||
|
||||
/* communicator cache */
|
||||
opal_list_t c_cache;
|
||||
|
||||
/* pointer to primes */
|
||||
int *fca_primes;
|
||||
|
||||
/** MCA parameter hash table size*/
|
||||
int fca_hash_size;
|
||||
|
||||
/** MCA parameter hash table size*/
|
||||
int fca_number_of_primes;
|
||||
|
||||
/* Pointer to hash */
|
||||
opal_list_t **fca_hash;
|
||||
|
||||
};
|
||||
typedef struct mca_coll_fca_component_t mca_coll_fca_component_t;
|
||||
|
||||
OMPI_MODULE_DECLSPEC extern mca_coll_fca_component_t mca_coll_fca_component;
|
||||
|
||||
struct mca_coll_fca_comm_wrap_t {
|
||||
opal_object_t super;
|
||||
fca_comm_t *fca_comm;
|
||||
int comm_id, rank;
|
||||
};
|
||||
typedef struct mca_coll_fca_comm_wrap_t mca_coll_fca_comm_wrap_t;
|
||||
OBJ_CLASS_DECLARATION(mca_coll_fca_comm_wrap_t);
|
||||
|
||||
struct mca_coll_fca_c_cache_item_t {
|
||||
opal_list_item_t super;
|
||||
int size;
|
||||
ompi_communicator_t *comm;
|
||||
mca_coll_fca_comm_wrap_t *fca_comm_wrap;
|
||||
};
|
||||
typedef struct mca_coll_fca_c_cache_item_t mca_coll_fca_c_cache_item_t;
|
||||
OBJ_CLASS_DECLARATION(mca_coll_fca_c_cache_item_t);
|
||||
|
||||
|
||||
/**
|
||||
* FCA enabled communicator
|
||||
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -13,6 +13,20 @@
|
||||
* Initial query function that is invoked during MPI_INIT, allowing
|
||||
* this module to indicate what level of thread support it provides.
|
||||
*/
|
||||
|
||||
int modular_pow(uint64_t base, uint64_t exponent, uint64_t modulus)
|
||||
{
|
||||
int result = 1;
|
||||
while(exponent>0)
|
||||
{
|
||||
if((exponent % 2) == 1)
|
||||
result = (result * base) % modulus;
|
||||
exponent = exponent >> 1;
|
||||
base = (base * base) % modulus;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int mca_coll_fca_init_query(bool enable_progress_threads,
|
||||
bool enable_mpi_threads)
|
||||
{
|
||||
@ -254,12 +268,130 @@ static int __fca_comm_new(mca_coll_fca_module_t *fca_module)
|
||||
|
||||
static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
|
||||
{
|
||||
int rc, ret;
|
||||
int rc, ret;
|
||||
int result = MPI_UNEQUAL;
|
||||
mca_coll_fca_c_cache_item_t *c_item = NULL, *c_item_new = NULL;
|
||||
mca_coll_fca_component_t *cm = &mca_coll_fca_component;
|
||||
int comm_size = ompi_comm_size(fca_module->comm);
|
||||
ompi_communicator_t *comm = fca_module->comm;
|
||||
opal_list_t *c_cache;
|
||||
struct timeval start, end, seq_start, seq_end, par_start, par_end;
|
||||
int act_deep;
|
||||
|
||||
|
||||
rc = __fca_comm_new(fca_module);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
if(mca_coll_fca_component.fca_verbose == 10) {
|
||||
gettimeofday(&start, NULL);
|
||||
}
|
||||
|
||||
if(mca_coll_fca_component.fca_enable_cache) {
|
||||
|
||||
c_cache = &cm->c_cache;
|
||||
|
||||
if(mca_coll_fca_component.fca_enable_hash){
|
||||
|
||||
int grank = ORTE_PROC_MY_NAME->vpid;
|
||||
int hash_index, part_of_hash_index;
|
||||
|
||||
if(mca_coll_fca_component.fca_parallel_hash_calc == 1) {
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10){
|
||||
gettimeofday(&par_start, NULL);
|
||||
}
|
||||
|
||||
part_of_hash_index = modular_pow(cm->fca_primes[grank % cm->fca_number_of_primes], grank, cm->fca_hash_size);
|
||||
rc = comm->c_coll.coll_allreduce(&part_of_hash_index, &hash_index, 1, MPI_INT, MPI_SUM, comm, comm->c_coll.coll_allreduce_module);
|
||||
if (rc != OMPI_SUCCESS) {
|
||||
FCA_ERROR("Failed to reduce hash_index: %d", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10){
|
||||
gettimeofday(&par_end, NULL);
|
||||
mca_coll_fca_component.fca_work_time_parallel =+
|
||||
par_end.tv_sec - par_start.tv_sec + 1e-6 * (par_end.tv_usec - par_start.tv_usec);
|
||||
}
|
||||
}else{
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10){
|
||||
gettimeofday(&seq_start, NULL);
|
||||
}
|
||||
|
||||
hash_index = 0;
|
||||
int q_counter = 0;
|
||||
int q_comm_size = ompi_comm_size (comm);
|
||||
|
||||
for(q_counter = 0; q_counter < q_comm_size; q_counter++)
|
||||
{
|
||||
hash_index += modular_pow(cm->fca_primes[q_counter % cm->fca_number_of_primes], q_counter, cm->fca_hash_size);
|
||||
}
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10){
|
||||
gettimeofday(&seq_end, NULL);
|
||||
mca_coll_fca_component.fca_work_time_sequency =+
|
||||
seq_end.tv_sec - seq_start.tv_sec + 1e-6 * (seq_end.tv_usec - seq_start.tv_usec);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if(cm->fca_hash[hash_index % cm->fca_hash_size] != NULL)
|
||||
{
|
||||
c_cache = cm->fca_hash[hash_index % cm->fca_hash_size];
|
||||
if(mca_coll_fca_component.fca_verbose == 10) {
|
||||
gettimeofday(&end, NULL);
|
||||
mca_coll_fca_component.fca_total_work_time =+
|
||||
end.tv_sec - start.tv_sec + 1e-6 * (end.tv_usec - start.tv_usec);
|
||||
mca_coll_fca_component.fca_hash_hit += 1;
|
||||
}
|
||||
}else
|
||||
{
|
||||
if(mca_coll_fca_component.fca_verbose == 10) {
|
||||
mca_coll_fca_component.fca_hash_miss += 1;
|
||||
}
|
||||
c_cache = OBJ_NEW(opal_list_t);
|
||||
cm->fca_hash[hash_index % cm->fca_hash_size] = c_cache;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
act_deep = 0;
|
||||
for( c_item = (mca_coll_fca_c_cache_item_t *)opal_list_get_first(c_cache);
|
||||
c_item != (mca_coll_fca_c_cache_item_t *)opal_list_get_end(c_cache);
|
||||
c_item = (mca_coll_fca_c_cache_item_t *)opal_list_get_next((opal_list_item_t *) c_item)){
|
||||
act_deep++;
|
||||
/* first check the size */
|
||||
if( comm_size == c_item->size){
|
||||
/* then we have a potential cache hit */
|
||||
ompi_comm_compare(comm, c_item->comm, &result);
|
||||
if( MPI_CONGRUENT == result) {
|
||||
/* cache hit! Return the context and be done with it */
|
||||
/* first bump the score */
|
||||
ret = fca_comm_get_caps(c_item->fca_comm_wrap->fca_comm, &fca_module->fca_comm_caps);
|
||||
if (ret < 0) {
|
||||
FCA_ERROR("GET_COMM_CAPS failed: %s", fca_strerror(ret));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
fca_module->fca_comm = c_item->fca_comm_wrap->fca_comm;
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10) {
|
||||
gettimeofday(&end, NULL);
|
||||
|
||||
mca_coll_fca_component.fca_total_work_time =+
|
||||
end.tv_sec - start.tv_sec + 1e-6 * (end.tv_usec - start.tv_usec);
|
||||
|
||||
mca_coll_fca_component.fca_cache_hit += 1;
|
||||
|
||||
if(act_deep>mca_coll_fca_component.fca_max_deep_in_cache)
|
||||
mca_coll_fca_component.fca_max_deep_in_cache = act_deep;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
rc = __fca_comm_new(fca_module);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
#if OMPI_FCA_VERSION < 30
|
||||
/* allocate comm_init_spec */
|
||||
FCA_MODULE_VERBOSE(fca_module, 1, "Starting COMM_INIT comm_id %d proc_idx %d num_procs %d",
|
||||
@ -282,10 +414,35 @@ static int __create_fca_comm(mca_coll_fca_module_t *fca_module)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* by this point every rank in the communicator is set up */
|
||||
FCA_MODULE_VERBOSE(fca_module, 1, "Initialized FCA communicator, comm_id %d",
|
||||
fca_module->fca_comm_desc.comm_id);
|
||||
return OMPI_SUCCESS;
|
||||
/* by this point every rank in the communicator is set up */
|
||||
FCA_MODULE_VERBOSE(fca_module, 1, "Initialized FCA communicator, comm_id %d",
|
||||
fca_module->fca_comm_desc.comm_id);
|
||||
if(mca_coll_fca_component.fca_enable_cache) {
|
||||
|
||||
c_item_new = OBJ_NEW(mca_coll_fca_c_cache_item_t);
|
||||
c_item_new->fca_comm_wrap = OBJ_NEW(mca_coll_fca_comm_wrap_t);
|
||||
|
||||
OBJ_RETAIN(comm);
|
||||
|
||||
c_item_new->size = comm_size;
|
||||
c_item_new->comm = comm;
|
||||
c_item_new->fca_comm_wrap->fca_comm = fca_module->fca_comm;
|
||||
c_item_new->fca_comm_wrap->rank = fca_module->rank;
|
||||
c_item_new->fca_comm_wrap->comm_id = fca_module->fca_comm_desc.comm_id;
|
||||
|
||||
opal_list_append(c_cache,(opal_list_item_t *) c_item_new);
|
||||
}
|
||||
|
||||
if(mca_coll_fca_component.fca_verbose == 10) {
|
||||
|
||||
gettimeofday(&end, NULL);
|
||||
|
||||
mca_coll_fca_component.fca_total_work_time =+
|
||||
end.tv_sec - start.tv_sec + 1e-6 * (end.tv_usec - start.tv_usec);
|
||||
|
||||
mca_coll_fca_component.fca_cache_miss += 1;
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static void __destroy_fca_comm(mca_coll_fca_module_t *fca_module)
|
||||
@ -407,10 +564,12 @@ static void mca_coll_fca_module_construct(mca_coll_fca_module_t *fca_module)
|
||||
static void mca_coll_fca_module_destruct(mca_coll_fca_module_t *fca_module)
|
||||
{
|
||||
FCA_VERBOSE(5, "==>");
|
||||
if(mca_coll_fca_component.fca_enable_cache == 0) {
|
||||
|
||||
if (fca_module->fca_comm) {
|
||||
__destroy_fca_comm(fca_module);
|
||||
}
|
||||
if (fca_module->fca_comm) {
|
||||
__destroy_fca_comm(fca_module);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_RELEASE(fca_module->previous_barrier_module);
|
||||
OBJ_RELEASE(fca_module->previous_bcast_module);
|
||||
@ -495,3 +654,54 @@ OBJ_CLASS_INSTANCE(mca_coll_fca_module_t,
|
||||
mca_coll_base_module_t,
|
||||
mca_coll_fca_module_construct,
|
||||
mca_coll_fca_module_destruct);
|
||||
|
||||
|
||||
static void mca_coll_fca_comm_wrap_constructor(mca_coll_fca_comm_wrap_t *item) {
|
||||
item->fca_comm = NULL;
|
||||
item->comm_id = -1;
|
||||
item->rank = -1;
|
||||
}
|
||||
|
||||
static void mca_coll_fca_comm_wrap_destruct(mca_coll_fca_comm_wrap_t *item) {
|
||||
|
||||
int ret;
|
||||
|
||||
if(item->fca_comm != NULL)
|
||||
{
|
||||
fca_comm_destroy(item->fca_comm);
|
||||
if (item->rank == 0) {
|
||||
ret = fca_comm_end(mca_coll_fca_component.fca_context,
|
||||
item->comm_id);
|
||||
if (ret < 0) {
|
||||
FCA_ERROR("COMM_END failed: %s", fca_strerror(ret));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_coll_fca_comm_wrap_t,
|
||||
opal_object_t,
|
||||
mca_coll_fca_comm_wrap_constructor,
|
||||
mca_coll_fca_comm_wrap_destruct);
|
||||
|
||||
|
||||
static void mca_coll_fca_c_cache_item_construct(mca_coll_fca_c_cache_item_t *item) {
|
||||
item->comm = NULL;
|
||||
item->size = -1;
|
||||
/* item->fca_comm_wrap = OBJ_NEW(mca_coll_fca_comm_wrap_t); */
|
||||
item->fca_comm_wrap = NULL;
|
||||
}
|
||||
|
||||
static void mca_coll_fca_c_cache_item_destruct(mca_coll_fca_c_cache_item_t *item) {
|
||||
if(item->fca_comm_wrap != NULL) {
|
||||
OBJ_RELEASE(item->fca_comm_wrap);
|
||||
/* OBJ_RELEASE(item->comm); */
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_coll_fca_c_cache_item_t,
|
||||
opal_list_item_t,
|
||||
mca_coll_fca_c_cache_item_construct,
|
||||
mca_coll_fca_c_cache_item_destruct);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user