openmpi/ompi/mca/bcol/basesmuma/bcol_basesmuma_buf_mgmt.c
Nathan Hjelm c13c21d476 basesmuma: clean up the setup code and ensure mapped files are unmapped
We were leaking file descriptors when coll/ml was in use. It turns out
this was because basesmuma was failing to unmap files it had previously
mapped. This commit cleans up the setup code to ensure that we only
attempt to map the control files once per module and then ensures the
files are unmapped when the module is released.

cmr=v1.8.2:reviewer=manjugv

This commit was SVN r31737.
2014-05-13 17:00:31 +00:00
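
The fix the commit message describes boils down to a map-once / unmap-on-release discipline. The sketch below illustrates that pattern in plain POSIX C; the names sm_map_t, map_control_file and release_control_file are hypothetical, and this is a minimal illustration rather than the actual basesmuma code.

/* Minimal sketch of the map-once / unmap-on-release pattern the commit
 * describes (hypothetical names, not the actual basesmuma code). */
#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>

typedef struct {
    void  *addr;   /* base address returned by mmap() */
    size_t size;   /* length of the mapping */
} sm_map_t;

/* Map the control file exactly once per module. */
static int map_control_file(sm_map_t *map, const char *path, size_t size)
{
    int fd = open(path, O_RDWR);
    if (fd < 0) {
        return -1;
    }
    map->addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    /* the descriptor is no longer needed once the mapping exists */
    close(fd);
    if (MAP_FAILED == map->addr) {
        return -1;
    }
    map->size = size;
    return 0;
}

/* Called when the module is released: undo the mapping so that neither the
 * descriptor nor the address space is leaked. */
static void release_control_file(sm_map_t *map)
{
    if (NULL != map->addr) {
        munmap(map->addr, map->size);
        map->addr = NULL;
    }
}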


/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2009-2013 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* Copyright (c) 2013-2014 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
*/
#include "ompi_config.h"
#include "ompi/constants.h"
#include "ompi/mca/bcol/bcol.h"
#include "ompi/mca/bcol/base/base.h"
#include "ompi/patterns/comm/coll_ops.h"
#include "opal/dss/dss.h"
#include "bcol_basesmuma.h"
/*
* With support for nonblocking collectives, we don't have an upper
* limit on the number of outstanding collectives per communicator.
* Also, since we want to avoid communication to figure out which
* buffers other ranks in the group will use, we will rely on the
* fact that collective operations are called in the same order
* in each process, to assign a unique ID to each collective operation.
* We use this to create a static mapping from the index to the buffer
* that will be used. Also, because there is no limit to the number of
* outstanding collective operations, we use a generation index for each
* memory bank, so the collective will use the buffer only when the
* correct generation of the bank is ready for use.
*/
int bcol_basesmuma_get_buff_index( sm_buffer_mgmt *buff_block,
uint64_t buff_id )
{
/* local variables */
int memory_bank;
uint64_t generation;
int index=-1;
/* get the bank index that will be used */
memory_bank=buff_id & buff_block->mask;
memory_bank = memory_bank SHIFT_DOWN buff_block->log2_num_buffs_per_mem_bank;
/* get the generation of the bank this maps to */
generation = buff_id SHIFT_DOWN (buff_block->log2_number_of_buffs);
/* check to see if the bank is available */
if( generation == buff_block->ctl_buffs_mgmt[memory_bank].
bank_gen_counter ) {
/* get the buffer index that will be returned */
index=buff_id & buff_block->mask;
/* no in-use counter increment, as the mapping is static, and
* all we need to know is the number of collectives that complete */
} else {
/* progress communications so that resources can be freed up */
opal_progress();
}
/* return */
return index;
}
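/*
 * Illustrative caller-side use of the static index scheme described in the
 * comment above (a sketch added for this writeup, not part of the original
 * file): keep asking for the buffer that buff_id maps to until the bank's
 * generation counter catches up.  bcol_basesmuma_get_buff_index() already
 * calls opal_progress() when it returns -1, so the loop simply retries.
 */
#if 0
static int basesmuma_wait_for_buff_index(sm_buffer_mgmt *buff_block,
                                         uint64_t buff_id)
{
    int index;

    do {
        index = bcol_basesmuma_get_buff_index(buff_block, buff_id);
    } while (-1 == index);

    return index;
}
#endif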
/* release the shared memory buffers
* buff_id is the unique ID assigned to the particular buffer
*/
int bcol_basesmuma_free_buff( sm_buffer_mgmt * buff_block,
uint64_t buff_id )
{
/* local variables */
int ret=OMPI_SUCCESS;
int memory_bank;
uint64_t generation;
mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;
/* get the bank index that will be used */
memory_bank=buff_id & buff_block->mask;
memory_bank = memory_bank SHIFT_DOWN buff_block->log2_num_buffs_per_mem_bank;
/* get the generation of the bank this maps to */
generation = buff_id SHIFT_DOWN (buff_block->log2_number_of_buffs);
/* the generation counter should not change until all resources
* associated with this bank have been freed.
*/
assert(generation == buff_block->ctl_buffs_mgmt[memory_bank].bank_gen_counter);
/*
* increment counter of completed buffers
*/
OPAL_THREAD_ADD32(&(buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed),
1);
/*
* If I am the last to check in, initiate resource recycling
*/
if( buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed ==
buff_block->ctl_buffs_mgmt[memory_bank].number_of_buffers ) {
/* Lock to ensure atomic recycling of resources */
OPAL_THREAD_LOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));
/* make sure someone else did not already get to this */
if( buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed !=
buff_block->ctl_buffs_mgmt[memory_bank].number_of_buffers ) {
/* release lock and exit */
OPAL_THREAD_UNLOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));
} else {
sm_nbbar_desc_t *p_sm_nb_desc = NULL;
/* initiate the freeing of resources. Need to make sure the other
* ranks in the group are also done with their resources before this
* block is made available for use again.
* No one else will try to allocate from this block or free back to
* this block until the next generation counter has been incremented,
* so we just reset the number of freed buffers to 0, so that no one
* else will also try to initiate the recycling of these resources.
*/
buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed=0;
/* Start the nonblocking barrier */
p_sm_nb_desc = &(buff_block->ctl_buffs_mgmt[memory_bank].nb_barrier_desc);
p_sm_nb_desc->coll_buff = buff_block;
bcol_basesmuma_rd_nb_barrier_init_admin(p_sm_nb_desc);
if( NB_BARRIER_DONE !=
buff_block->ctl_buffs_mgmt[memory_bank].
nb_barrier_desc.collective_phase) {
opal_list_t *list=&(cs->nb_admin_barriers);
opal_list_item_t *append_item;
/* put this onto the progression list */
OPAL_THREAD_LOCK(&(cs->nb_admin_barriers_mutex));
append_item=(opal_list_item_t *)
&(buff_block->ctl_buffs_mgmt[memory_bank].nb_barrier_desc);
opal_list_append(list,append_item);
OPAL_THREAD_UNLOCK(&(cs->nb_admin_barriers_mutex));
/* progress communications so that resources can be freed up */
opal_progress();
} else {
/* mark the block as available */
(buff_block->ctl_buffs_mgmt[memory_bank].bank_gen_counter)++;
}
/* get out of here */
OPAL_THREAD_UNLOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));
}
}
/* return */
return ret;
}
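/*
 * Descriptive note added for this writeup: the release path above works as
 * follows.  Every bcol_basesmuma_free_buff() call atomically increments the
 * bank's n_buffs_freed counter.  The caller that frees the last buffer of
 * the bank takes the bank mutex, resets n_buffs_freed to zero, and starts
 * the nonblocking barrier via bcol_basesmuma_rd_nb_barrier_init_admin().
 * If that barrier completes immediately, the bank generation counter is
 * incremented on the spot; otherwise the barrier descriptor is appended to
 * the component's nb_admin_barriers list so it can be progressed later, and
 * the bank only becomes usable again once its generation counter is
 * eventually incremented, which is exactly what
 * bcol_basesmuma_get_buff_index() checks for.
 */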
/*
* Allocate buffers for storing non-blocking collective descriptors, required
* for making code re-entrant
*
*/
static int init_nb_coll_buff_desc(mca_bcol_basesmuma_nb_coll_buff_desc_t **desc,
void *base_addr, uint32_t num_banks,
uint32_t num_buffers_per_bank,
uint32_t size_buffer,
uint32_t header_size,
int group_size,
int pow_k)
{
uint32_t i, j, ci;
mca_bcol_basesmuma_nb_coll_buff_desc_t *tmp_desc = NULL;
int k_nomial_radix = mca_bcol_basesmuma_component.k_nomial_radix;
int pow_k_val = (0 == pow_k) ? 1 : pow_k;
int num_to_alloc = (k_nomial_radix - 1) * pow_k_val * 2 + 1 ;
*desc = (mca_bcol_basesmuma_nb_coll_buff_desc_t *)calloc(num_banks * num_buffers_per_bank, sizeof(mca_bcol_basesmuma_nb_coll_buff_desc_t));
if (NULL == *desc) {
return OMPI_ERROR;
}
tmp_desc = *desc;
for (i = 0; i < num_banks; i++) {
for (j = 0; j < num_buffers_per_bank; j++) {
ci = i * num_buffers_per_bank + j;
tmp_desc[ci].bank_index = i;
tmp_desc[ci].buffer_index = j;
/* *2 is for gather session +1 for extra peer */
tmp_desc[ci].requests = (ompi_request_t **)
calloc(num_to_alloc, sizeof(ompi_request_t *));
tmp_desc[ci].data_addr = (void *)
((unsigned char*)base_addr + ci * size_buffer + header_size);
BASESMUMA_VERBOSE(10, ("ml memory cache setup %d %d - %p", i, j, tmp_desc[ci].data_addr));
}
}
return OMPI_SUCCESS;
}
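/*
 * Worked example added for this writeup (numbers chosen arbitrarily): with a
 * k-nomial radix of 4 and pow_k of 2, each descriptor gets
 * num_to_alloc = (4 - 1) * 2 * 2 + 1 = 13 request slots -- the factor of 2
 * accounts for the gather session and the trailing +1 for the extra peer,
 * as the comment above notes.
 */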
/*
* Free the buffers used for storing non-blocking collective descriptors.
*
*/
void cleanup_nb_coll_buff_desc(mca_bcol_basesmuma_nb_coll_buff_desc_t **desc,
uint32_t num_banks,
uint32_t num_buffers_per_bank)
{
uint32_t ci;
if (NULL != *desc) {
for (ci=0; ci<num_banks*num_buffers_per_bank; ci++) {
if (NULL != ((*desc)[ci]).requests) {
free(((*desc)[ci]).requests);
((*desc))[ci].requests = NULL;
}
}
free(*desc);
*desc = NULL;
}
}
#if 1
/* New init function used for new control scheme where we put the control
* struct at the top of the payload buffer
*/
int bcol_basesmuma_bank_init_opti(struct mca_bcol_base_memory_block_desc_t *payload_block,
uint32_t data_offset,
mca_bcol_base_module_t *bcol_module,
void *reg_data)
{
/* The assumption here is that the block has been registered with the
* sm bcol and hence has been mapped by each process; we need to be
* sure that the memory is mapped amongst the sm peers.
*/
/* local variables */
int ret = OMPI_SUCCESS, i, j;
sm_buffer_mgmt *pload_mgmt;
mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;
bcol_basesmuma_registration_data_t *sm_reg_data =
(bcol_basesmuma_registration_data_t *) reg_data;
mca_bcol_basesmuma_module_t *sm_bcol =
(mca_bcol_basesmuma_module_t *) bcol_module;
mca_bcol_base_memory_block_desc_t *ml_block = payload_block;
size_t malloc_size;
bcol_basesmuma_smcm_file_t input_file;
int leading_dim,loop_limit,buf_id;
unsigned char *base_ptr;
mca_bcol_basesmuma_module_t *sm_bcol_module=
(mca_bcol_basesmuma_module_t *)bcol_module;
int my_idx, array_id;
mca_bcol_basesmuma_header_t *ctl_ptr;
void **results_array = NULL, *mem_offset;
mca_bcol_basesmuma_local_mlmem_desc_t *ml_mem = &sm_bcol_module->ml_mem;
/* first, we get a pointer to the payload buffer management struct */
pload_mgmt = &(sm_bcol->colls_with_user_data);
/* go ahead and get the header size that is cached on the payload block
*/
sm_bcol->total_header_size = data_offset;
/* allocate memory for pointers to mine and my peers' payload buffers
* difference here is that now we use our new data struct
*/
malloc_size = ml_block->num_banks*ml_block->num_buffers_per_bank*
pload_mgmt->size_of_group *sizeof(mca_bcol_basesmuma_payload_t);
pload_mgmt->data_buffs = (mca_bcol_basesmuma_payload_t *) malloc(malloc_size);
if( !pload_mgmt->data_buffs) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit_ERROR;
}
/* allocate some memory to hold the offsets */
results_array = (void **) malloc(pload_mgmt->size_of_group * sizeof (void *));
if (NULL == results_array) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit_ERROR;
}
/* setup the input file for the shared memory connection manager */
input_file.file_name = sm_reg_data->file_name;
input_file.size = sm_reg_data->size;
input_file.size_ctl_structure = 0;
input_file.data_seg_alignment = BASESMUMA_CACHE_LINE_SIZE;
input_file.mpool_size = sm_reg_data->size;
/* call the connection manager and map my shared memory peers' file
*/
ret = bcol_basesmuma_smcm_allgather_connection(
sm_bcol,
sm_bcol->super.sbgp_partner_module,
&(cs->sm_connections_list),
&(sm_bcol->payload_backing_files_info),
sm_bcol->super.sbgp_partner_module->group_comm,
input_file, cs->payload_base_fname,
false);
if( OMPI_SUCCESS != ret ) {
goto exit_ERROR;
}
/* now we exchange offset info - don't assume symmetric virtual memory
*/
mem_offset = (void *) ((uintptr_t) ml_block->block->base_addr -
(uintptr_t) cs->sm_payload_structs->data_addr);
/* call into the exchange offsets function */
ret=comm_allgather_pml(&mem_offset, results_array, sizeof (void *), MPI_BYTE,
sm_bcol_module->super.sbgp_partner_module->my_index,
sm_bcol_module->super.sbgp_partner_module->group_size,
sm_bcol_module->super.sbgp_partner_module->group_list,
sm_bcol_module->super.sbgp_partner_module->group_comm);
if( OMPI_SUCCESS != ret ) {
goto exit_ERROR;
}
/* convert memory offset to virtual address in current rank */
leading_dim = pload_mgmt->size_of_group;
loop_limit = ml_block->num_banks*ml_block->num_buffers_per_bank;
for (i=0;i< sm_bcol_module->super.sbgp_partner_module->group_size;i++) {
/* get the base pointer */
int array_id=SM_ARRAY_INDEX(leading_dim,0,i);
if( i == sm_bcol_module->super.sbgp_partner_module->my_index) {
/* me */
base_ptr=cs->sm_payload_structs->map_addr;
} else {
base_ptr=sm_bcol_module->payload_backing_files_info[i]->
sm_mmap->map_addr;
}
/* first, set the pointer to the control struct */
pload_mgmt->data_buffs[array_id].ctl_struct=(mca_bcol_basesmuma_header_t *)
(uintptr_t)(((uint64_t)(uintptr_t)results_array[array_id])+(uint64_t)(uintptr_t)base_ptr);
/* second, calculate where to set the data pointer */
pload_mgmt->data_buffs[array_id].payload=(void *)
(uintptr_t)((uint64_t)(uintptr_t) pload_mgmt->data_buffs[array_id].ctl_struct +
(uint64_t)(uintptr_t) data_offset);
for( buf_id = 1 ; buf_id < loop_limit ; buf_id++ ) {
int array_id_m1=SM_ARRAY_INDEX(leading_dim,(buf_id-1),i);
array_id=SM_ARRAY_INDEX(leading_dim,buf_id,i);
/* now, play the same game as above
*
* first, set the control struct's position */
pload_mgmt->data_buffs[array_id].ctl_struct=(mca_bcol_basesmuma_header_t *)
(uintptr_t)(((uint64_t)(uintptr_t)(pload_mgmt->data_buffs[array_id_m1].ctl_struct) +
(uint64_t)(uintptr_t)ml_block->size_buffer));
/* second, set the payload pointer */
pload_mgmt->data_buffs[array_id].payload =(void *)
(uintptr_t)((uint64_t)(uintptr_t) pload_mgmt->data_buffs[array_id].ctl_struct +
(uint64_t)(uintptr_t) data_offset);
}
}
/* done with the index array */
free (results_array);
/* initialize my control structures!! */
my_idx = sm_bcol_module->super.sbgp_partner_module->my_index;
leading_dim = sm_bcol_module->super.sbgp_partner_module->group_size;
for( buf_id = 0; buf_id < loop_limit; buf_id++){
array_id = SM_ARRAY_INDEX(leading_dim,buf_id,my_idx);
ctl_ptr = pload_mgmt->data_buffs[array_id].ctl_struct;
/* initialize the data structures */
for( j = 0; j < SM_BCOLS_MAX; j++){
for( i = 0; i < NUM_SIGNAL_FLAGS; i++){
ctl_ptr->flags[i][j] = -1;
}
}
ctl_ptr->sequence_number = -1;
ctl_ptr->src = -1;
}
/* setup the data structures needed for releasing the payload
* buffers back to the ml level
*/
for( i=0 ; i < (int) ml_block->num_banks ; i++ ) {
sm_bcol->colls_with_user_data.
ctl_buffs_mgmt[i].nb_barrier_desc.ml_memory_block_descriptor=
ml_block;
}
ml_mem->num_banks = ml_block->num_banks;
ml_mem->bank_release_counter = calloc(ml_block->num_banks, sizeof(uint32_t));
ml_mem->num_buffers_per_bank = ml_block->num_buffers_per_bank;
ml_mem->size_buffer = ml_block->size_buffer;
/* pointer to ml level descriptor */
ml_mem->ml_mem_desc = ml_block;
if (OMPI_SUCCESS != init_nb_coll_buff_desc(&ml_mem->nb_coll_desc,
ml_block->block->base_addr,
ml_mem->num_banks,
ml_mem->num_buffers_per_bank,
ml_mem->size_buffer,
data_offset,
sm_bcol_module->super.sbgp_partner_module->group_size,
sm_bcol_module->pow_k)) {
BASESMUMA_VERBOSE(10, ("Failed to allocate memory descriptors for storing state of non-blocking collectives\n"));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
exit_ERROR:
free (results_array);
return ret;
}
#endif
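/*
 * Descriptive note added for this writeup, summarizing the address
 * arithmetic used in bcol_basesmuma_bank_init_opti() above:
 *
 *   ctl_struct[buf 0, peer i] = peer i's mapped base address
 *                               + the offset exchanged via comm_allgather_pml()
 *   ctl_struct[buf n, peer i] = ctl_struct[buf n-1, peer i] + size_buffer
 *   payload  [buf n, peer i]  = ctl_struct[buf n, peer i] + data_offset
 *
 * Every size_buffer-sized slot therefore begins with the control header and
 * the user payload starts data_offset bytes into the slot, which is why only
 * offsets (and not raw pointers) need to be exchanged between peers whose
 * virtual address layouts differ.
 */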
/* Basesmuma interface function used for buffer release */
#if 0
/* gvm
* A collective operation calls this routine to release the payload buffer.
* All processes in the shared memory sub-group of a bcol should call the non-blocking
* barrier on the last payload buffer of a memory bank. On the completion
* of the non-blocking barrier, the ML callback is called which is responsible
* for recycling the memory bank.
*/
int bcol_basesmuma_free_payload_buff(
struct mca_bcol_base_memory_block_desc_t *block,
sm_buffer_mgmt *ctl_mgmt,
uint64_t buff_id)
{
/* local variables */
int ret = OMPI_SUCCESS;
int memory_bank;
/* the buffer index encodes the bank it belongs to */
memory_bank = BANK_FROM_BUFFER_IDX(buff_id);
/* count the freed buffer exactly once, using the atomic update */
OPAL_THREAD_ADD32(&(ctl_mgmt->ctl_buffs_mgmt[memory_bank].n_buffs_freed),1);
if (ctl_mgmt->ctl_buffs_mgmt[memory_bank].n_buffs_freed == block->size_buffers_bank){
/* start non-blocking barrier */
bcol_basesmuma_rd_nb_barrier_init_admin(
&(ctl_mgmt->ctl_buffs_mgmt[memory_bank].nb_barrier_desc));
if (NB_BARRIER_DONE !=
ctl_mgmt->ctl_buffs_mgmt[memory_bank].
nb_barrier_desc.collective_phase){
/* progress the barrier */
opal_progress();
}
else{
/* free the buffer - i.e. initiate callback to ml level */
block->ml_release_cb(block,memory_bank);
}
}
return ret;
}
#endif