/*
 * Copyright (c) 2009-2012 Oak Ridge National Laboratory.  All rights reserved.
 * Copyright (c) 2009-2012 Mellanox Technologies.  All rights reserved.
 * Copyright (c) 2012      Los Alamos National Security, LLC.
 *                         All rights reserved.
 * Copyright (c) 2014      Cisco Systems, Inc.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

/**
 * @file
 *
 */

#include "ompi_config.h"
#include "ompi/constants.h"
#include "ompi/mca/bcol/bcol.h"
#include "ompi/mca/bcol/base/base.h"
#include "ompi/mca/coll/ml/coll_ml.h"
#include "ompi/patterns/comm/coll_ops.h"
#include "opal/dss/dss.h"

#include "bcol_basesmuma.h"

/*
 * With support for nonblocking collectives, we don't have an upper
 * limit on the number of outstanding collectives per communicator.
 * Also, since we want to avoid communication to figure out which
 * buffers other ranks in the group will use, we rely on the fact
 * that collective operations are called in the same order in each
 * process to assign a unique ID to each collective operation.
 * We use this ID to create a static mapping from the index to the
 * buffer that will be used.  Also, because there is no limit to the
 * number of outstanding collective operations, we use a generation
 * index for each memory bank, so the collective will use the buffer
 * only when the correct generation of the bank is ready for use.
 */
int bcol_basesmuma_get_buff_index(sm_buffer_mgmt *buff_block,
                                  uint64_t buff_id)
{
    /* local variables */
    int memory_bank;
    uint64_t generation;
    int index = -1;

    /* get the bank index that will be used */
    memory_bank = buff_id & buff_block->mask;
    memory_bank = memory_bank SHIFT_DOWN buff_block->log2_num_buffs_per_mem_bank;

    /* get the generation of the bank this maps to */
    generation = buff_id SHIFT_DOWN (buff_block->log2_number_of_buffs);

    /* check to see if the bank is available */
    if (generation == buff_block->ctl_buffs_mgmt[memory_bank].bank_gen_counter) {

        /* get the buffer index that will be returned */
        index = buff_id & buff_block->mask;

        /* no in-use counter increment, as the mapping is static; all we
         * need to know is the number of collectives that complete */
    } else {
        /* progress communications so that resources can be freed up */
        opal_progress();
    }

    /* return */
    return index;
}
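/*
 * Illustrative sketch (not part of the original implementation): a worked
 * example of the buff_id decomposition above, assuming a hypothetical
 * sm_buffer_mgmt layout with two memory banks of four buffers each, i.e.
 * mask = 0x7, log2_num_buffs_per_mem_bank = 2 and log2_number_of_buffs = 3.
 * Collective ID 13 then maps to buffer 5, which lives in bank 1, and may
 * only be used once bank 1 has been recycled up to generation 1.
 */
#if 0
static void example_buff_id_decomposition(void)
{
    uint64_t buff_id = 13;                 /* hypothetical collective ID */
    uint64_t mask = 0x7;                   /* 8 buffers in total */
    int log2_num_buffs_per_mem_bank = 2;   /* 4 buffers per bank */
    int log2_number_of_buffs = 3;

    int index = (int) (buff_id & mask);                               /* 5 */
    int memory_bank = index SHIFT_DOWN log2_num_buffs_per_mem_bank;   /* 1 */
    uint64_t generation = buff_id SHIFT_DOWN log2_number_of_buffs;    /* 1 */

    (void) index; (void) memory_bank; (void) generation;
}
#endif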
/* release the shared memory buffers
 * buff_id is the unique ID assigned to the particular buffer
 */
int bcol_basesmuma_free_buff(sm_buffer_mgmt *buff_block,
                             uint64_t buff_id)
{
    /* local variables */
    int ret = OMPI_SUCCESS;
    int memory_bank;
    uint64_t generation;
    mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;

    /* get the bank index that will be used */
    memory_bank = buff_id & buff_block->mask;
    memory_bank = memory_bank SHIFT_DOWN buff_block->log2_num_buffs_per_mem_bank;

    /* get the generation of the bank this maps to */
    generation = buff_id SHIFT_DOWN (buff_block->log2_number_of_buffs);

    /* the generation counter should not change until all resources
     * associated with this bank have been freed.
     */
    assert(generation == buff_block->ctl_buffs_mgmt[memory_bank].bank_gen_counter);

    /*
     * increment the counter of completed buffers
     */
    OPAL_THREAD_ADD32(&(buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed), 1);

    /*
     * If I am the last to check in - initiate resource recycling
     */
    if (buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed ==
        buff_block->ctl_buffs_mgmt[memory_bank].number_of_buffers) {

        /* Lock to ensure atomic recycling of resources */
        OPAL_THREAD_LOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));

        /* make sure someone else did not already get to this */
        if (buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed !=
            buff_block->ctl_buffs_mgmt[memory_bank].number_of_buffers) {
            /* release lock and exit */
            OPAL_THREAD_UNLOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));
        } else {
            sm_nbbar_desc_t *p_sm_nb_desc = NULL;
            /* initiate the freeing of resources.  Need to make sure the other
             * ranks in the group are also done with their resources before this
             * block is made available for use again.
             * No one else will try to allocate from this block or free back to
             * this block until the next generation counter has been incremented,
             * so we just reset the number of freed buffers to 0, so no one else
             * will also try to initiate the recycling of these resources.
             */
            buff_block->ctl_buffs_mgmt[memory_bank].n_buffs_freed = 0;

            /* Start the nonblocking barrier */
            p_sm_nb_desc = &(buff_block->ctl_buffs_mgmt[memory_bank].nb_barrier_desc);
            p_sm_nb_desc->coll_buff = buff_block;
            bcol_basesmuma_rd_nb_barrier_init_admin(p_sm_nb_desc);

            if (NB_BARRIER_DONE !=
                buff_block->ctl_buffs_mgmt[memory_bank].nb_barrier_desc.collective_phase) {

                opal_list_t *list = &(cs->nb_admin_barriers);
                opal_list_item_t *append_item;

                /* put this onto the progression list */
                OPAL_THREAD_LOCK(&(cs->nb_admin_barriers_mutex));
                append_item = (opal_list_item_t *)
                    &(buff_block->ctl_buffs_mgmt[memory_bank].nb_barrier_desc);
                opal_list_append(list, append_item);
                OPAL_THREAD_UNLOCK(&(cs->nb_admin_barriers_mutex));

                /* progress communications so that resources can be freed up */
                opal_progress();
            } else {
                /* mark the block as available */
                (buff_block->ctl_buffs_mgmt[memory_bank].bank_gen_counter)++;
            }

            /* get out of here */
            OPAL_THREAD_UNLOCK(&(buff_block->ctl_buffs_mgmt[memory_bank].mutex));
        }
    }

    /* return */
    return ret;
}
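/*
 * Illustrative sketch (not part of the original implementation) of how a
 * caller is expected to pair the two routines above: retry
 * bcol_basesmuma_get_buff_index() until the bank generation for the
 * collective's sequence number becomes available, then hand the buffer
 * back with bcol_basesmuma_free_buff() so the last rank to check in can
 * trigger the nonblocking-barrier-based recycling.  The helper name and
 * the "do work" placeholder are hypothetical.
 */
#if 0
static int example_use_collective_buffer(sm_buffer_mgmt *buff_block,
                                         uint64_t sequence_number)
{
    int index;

    /* spin until the bank that this sequence number maps to has been
     * recycled up to the right generation */
    do {
        index = bcol_basesmuma_get_buff_index(buff_block, sequence_number);
    } while (-1 == index);

    /* ... use the buffer associated with "index" for the collective ... */

    /* release it; the last rank to free a buffer in the bank starts the
     * admin nonblocking barrier that eventually bumps bank_gen_counter */
    return bcol_basesmuma_free_buff(buff_block, sequence_number);
}
#endif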
#if 0
/* Basesmuma interface function used for buffer bank resource recycling and
 * bcol-specific registration information */
int bcol_basesmuma_bank_init(struct mca_coll_ml_module_t *ml_module,
                             mca_bcol_base_module_t *bcol_module,
                             void *reg_data)
{
    /* assumption here is that the block has been registered with the
     * sm bcol, hence has been mapped by each process; need to be
     * sure that memory is mapped amongst sm peers */

    /* local variables */
    int ret = OMPI_SUCCESS, i;
    uint32_t j;
    sm_buffer_mgmt *pload_mgmt;
    mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;
    bcol_basesmuma_registration_data_t *sm_reg_data =
        (bcol_basesmuma_registration_data_t *) reg_data;
    mca_bcol_basesmuma_module_t *sm_bcol =
        (mca_bcol_basesmuma_module_t *) bcol_module;
    ml_memory_block_desc_t *ml_block = ml_module->payload_block;
    size_t malloc_size;
    ompi_common_sm_file_t input_file;
    uint64_t mem_offset;
    int leading_dim, loop_limit, buf_id;
    unsigned char *base_ptr;
    mca_bcol_basesmuma_module_t *sm_bcol_module =
        (mca_bcol_basesmuma_module_t *) bcol_module;

    fprintf(stderr, "test opti test\n");

    /* first, we get a pointer to the payload buffer management struct */
    pload_mgmt = &(sm_bcol->colls_with_user_data);

    /* allocate memory for pointers to mine and my peers' payload buffers */
    malloc_size = ml_block->num_banks * ml_block->num_buffers_per_bank *
        pload_mgmt->size_of_group * sizeof(void *);
    pload_mgmt->data_buffs = malloc(malloc_size);
    if (!pload_mgmt->data_buffs) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto ERROR;
    }

    /* setup the input file for the shared memory connection manager */
    input_file.file_name = sm_reg_data->file_name;
    input_file.size = sm_reg_data->size;
    input_file.size_ctl_structure = 0;
    input_file.data_seg_alignment = CACHE_LINE_SIZE;
    input_file.mpool_size = sm_reg_data->size;

    /* call the connection manager and map my shared memory peers' file */
    ret = ompi_common_smcm_allgather_connection(
        sm_bcol,
        sm_bcol->super.sbgp_partner_module,
        &(cs->sm_connections_list),
        &(sm_bcol->payload_backing_files_info),
        sm_bcol->super.sbgp_partner_module->group_comm,
        input_file, false);
    if (OMPI_SUCCESS != ret) {
        goto ERROR;
    }

    /* now we exchange offset info - don't assume symmetric virtual memory */
    mem_offset = (uint64_t)(ml_block->block->base_addr) -
        (uint64_t)(cs->sm_payload_structs->data_addr);

    /* call into the exchange offsets function */
    ret = base_bcol_basesmuma_exchange_offsets(sm_bcol_module,
                                               (void **) pload_mgmt->data_buffs,
                                               mem_offset, 0,
                                               pload_mgmt->size_of_group);
    if (OMPI_SUCCESS != ret) {
        goto ERROR;
    }

    /* convert memory offset to virtual address in current rank */
    leading_dim = pload_mgmt->size_of_group;
    loop_limit = ml_block->num_banks * ml_block->num_buffers_per_bank;
    for (i = 0; i < sm_bcol_module->super.sbgp_partner_module->group_size; i++) {

        /* get the base pointer */
        int array_id = SM_ARRAY_INDEX(leading_dim, 0, i);
        if (i == sm_bcol_module->super.sbgp_partner_module->my_index) {
            /* me */
            base_ptr = cs->sm_payload_structs->map_addr;
        } else {
            base_ptr = sm_bcol_module->payload_backing_files_info[i]->sm_mmap->map_addr;
        }

        pload_mgmt->data_buffs[array_id] = (void *)
            (((uint64_t) pload_mgmt->data_buffs[array_id]) + (uint64_t) base_ptr);
        for (buf_id = 1; buf_id < loop_limit; buf_id++) {
            int array_id_m1 = SM_ARRAY_INDEX(leading_dim, (buf_id - 1), i);
            array_id = SM_ARRAY_INDEX(leading_dim, buf_id, i);
            pload_mgmt->data_buffs[array_id] = (void *)
                ((uint64_t)(pload_mgmt->data_buffs[array_id_m1]) +
                 (uint64_t) ml_block->size_buffer);
        }
    }

    /* setup the data structures needed for releasing the payload
     * buffers back to the ml level */
    for (j = 0; j < ml_block->num_banks; j++) {
        sm_bcol->colls_with_user_data.ctl_buffs_mgmt[j].nb_barrier_desc.ml_memory_block_descriptor =
            ml_block;
    }

    return OMPI_SUCCESS;

 ERROR:
    return ret;
}
#endif
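/*
 * Illustrative sketch (not part of the original implementation) of the
 * address-translation idea used above: since the shared-memory backing
 * file may be mapped at a different virtual address in every process,
 * ranks exchange offsets into the file rather than raw pointers, and each
 * rank rebases a peer's offset onto its own mapping of that peer's file.
 * All names below are hypothetical.
 */
#if 0
static void *example_translate_peer_buffer(unsigned char *peer_map_addr, /* where I mapped the peer's file */
                                           uint64_t peer_offset)         /* offset the peer advertised */
{
    /* the peer computed peer_offset = buffer_addr - its_own_map_addr,
     * so adding my mapping of the same file yields a pointer that is
     * valid in my address space */
    return (void *) (peer_map_addr + peer_offset);
}
#endif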
/*
 * Allocate buffers for storing non-blocking collective descriptions,
 * required for making the code re-entrant.
 */
static int init_nb_coll_buff_desc(mca_bcol_basesmuma_nb_coll_buff_desc_t **desc,
                                  void *base_addr, uint32_t num_banks,
                                  uint32_t num_buffers_per_bank,
                                  uint32_t size_buffer,
                                  uint32_t header_size,
                                  int group_size,
                                  int pow_k)
{
    uint32_t i, j, ci;
    mca_bcol_basesmuma_nb_coll_buff_desc_t *tmp_desc = NULL;
    int k_nomial_radix = mca_bcol_basesmuma_component.k_nomial_radix;
    int pow_k_val = (0 == pow_k) ? 1 : pow_k;
    int num_to_alloc = (k_nomial_radix - 1) * pow_k_val * 2 + 1;

    *desc = (mca_bcol_basesmuma_nb_coll_buff_desc_t *)
        calloc(num_banks * num_buffers_per_bank,
               sizeof(mca_bcol_basesmuma_nb_coll_buff_desc_t));
    if (NULL == *desc) {
        return OMPI_ERROR;
    }

    tmp_desc = *desc;

    for (i = 0; i < num_banks; i++) {
        for (j = 0; j < num_buffers_per_bank; j++) {
            ci = i * num_buffers_per_bank + j;
            tmp_desc[ci].bank_index = i;
            tmp_desc[ci].buffer_index = j;
            /* *2 is for the gather session, +1 for the extra peer */
            tmp_desc[ci].requests = (ompi_request_t **)
                calloc(num_to_alloc, sizeof(ompi_request_t *));
            if (NULL == tmp_desc[ci].requests) {
                return OMPI_ERROR;
            }
            tmp_desc[ci].data_addr = (void *)
                ((unsigned char *) base_addr + ci * size_buffer + header_size);
            BASESMUMA_VERBOSE(10, ("ml memory cache setup %d %d - %p",
                                   i, j, tmp_desc[ci].data_addr));
        }
    }

    return OMPI_SUCCESS;
}
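/*
 * Illustrative sketch (not part of the original implementation): for a
 * hypothetical k-nomial radix of 4 and pow_k of 3, the routine above
 * allocates (4 - 1) * 3 * 2 + 1 = 19 request slots per buffer, and buffer
 * ci in a block whose payload starts at base_addr begins at
 * base_addr + ci * size_buffer, with the first header_size bytes reserved
 * for the control header and the user data starting right after it.
 */
#if 0
static int example_nb_desc_request_count(void)
{
    int k_nomial_radix = 4;                                   /* hypothetical radix */
    int pow_k = 3;                                            /* hypothetical tree depth */
    int num_to_alloc = (k_nomial_radix - 1) * pow_k * 2 + 1;  /* 19 requests */

    return num_to_alloc;
}
#endif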
#if 1
/* New init function used for the new control scheme where we put the
 * control struct at the top of the payload buffer */
int bcol_basesmuma_bank_init_opti(struct mca_coll_ml_module_t *ml_module,
                                  mca_bcol_base_module_t *bcol_module,
                                  void *reg_data)
{
    /* assumption here is that the block has been registered with the
     * sm bcol, hence has been mapped by each process; need to be
     * sure that memory is mapped amongst sm peers */

    /* local variables */
    int ret = OMPI_SUCCESS, i, j;
    sm_buffer_mgmt *pload_mgmt;
    mca_bcol_basesmuma_component_t *cs = &mca_bcol_basesmuma_component;
    bcol_basesmuma_registration_data_t *sm_reg_data =
        (bcol_basesmuma_registration_data_t *) reg_data;
    mca_bcol_basesmuma_module_t *sm_bcol =
        (mca_bcol_basesmuma_module_t *) bcol_module;
    ml_memory_block_desc_t *ml_block = ml_module->payload_block;
    size_t malloc_size;
    bcol_basesmuma_smcm_file_t input_file;
    uint64_t mem_offset;
    int leading_dim, loop_limit, buf_id;
    unsigned char *base_ptr;
    mca_bcol_basesmuma_module_t *sm_bcol_module =
        (mca_bcol_basesmuma_module_t *) bcol_module;
    int my_idx, array_id;
    mca_bcol_basesmuma_header_t *ctl_ptr;
    void **results_array;
    mca_bcol_basesmuma_local_mlmem_desc_t *ml_mem = &sm_bcol_module->ml_mem;

    /* first, we get a pointer to the payload buffer management struct */
    pload_mgmt = &(sm_bcol->colls_with_user_data);

    /* get the header size that is cached on the payload block */
    sm_bcol->total_header_size = ml_module->data_offset;

    /* allocate memory for pointers to mine and my peers' payload buffers;
     * the difference here is that now we use our new data struct */
    malloc_size = ml_block->num_banks * ml_block->num_buffers_per_bank *
        pload_mgmt->size_of_group * sizeof(mca_bcol_basesmuma_payload_t);
    pload_mgmt->data_buffs = (mca_bcol_basesmuma_payload_t *) malloc(malloc_size);
    if (!pload_mgmt->data_buffs) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto ERROR;
    }

    /* allocate some memory to hold the offsets */
    results_array = (void **) malloc(pload_mgmt->size_of_group * sizeof(void *));
    if (!results_array) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto ERROR;
    }

    /* setup the input file for the shared memory connection manager */
    input_file.file_name = sm_reg_data->file_name;
    input_file.size = sm_reg_data->size;
    input_file.size_ctl_structure = 0;
    input_file.data_seg_alignment = BASESMUMA_CACHE_LINE_SIZE;
    input_file.mpool_size = sm_reg_data->size;

    /* call the connection manager and map my shared memory peers' file */
    ret = bcol_basesmuma_smcm_allgather_connection(
        sm_bcol,
        sm_bcol->super.sbgp_partner_module,
        &(cs->sm_connections_list),
        &(sm_bcol->payload_backing_files_info),
        sm_bcol->super.sbgp_partner_module->group_comm,
        input_file, cs->payload_base_fname, false);
    if (OMPI_SUCCESS != ret) {
        goto ERROR;
    }

    /* now we exchange offset info - don't assume symmetric virtual memory */
    mem_offset = (uint64_t)(uintptr_t)(ml_block->block->base_addr) -
        (uint64_t)(uintptr_t)(cs->sm_payload_structs->data_addr);

    /* call into the exchange offsets function */
    ret = comm_allgather_pml(&mem_offset, results_array, 1,
                             MPI_LONG_LONG_INT,
                             sm_bcol_module->super.sbgp_partner_module->my_index,
                             sm_bcol_module->super.sbgp_partner_module->group_size,
                             sm_bcol_module->super.sbgp_partner_module->group_list,
                             sm_bcol_module->super.sbgp_partner_module->group_comm);
    if (OMPI_SUCCESS != ret) {
        goto ERROR;
    }

    /* convert memory offset to virtual address in current rank */
    leading_dim = pload_mgmt->size_of_group;
    loop_limit = ml_block->num_banks * ml_block->num_buffers_per_bank;
    for (i = 0; i < sm_bcol_module->super.sbgp_partner_module->group_size; i++) {

        /* get the base pointer */
        int array_id = SM_ARRAY_INDEX(leading_dim, 0, i);
        if (i == sm_bcol_module->super.sbgp_partner_module->my_index) {
            /* me */
            base_ptr = cs->sm_payload_structs->map_addr;
        } else {
            base_ptr = sm_bcol_module->payload_backing_files_info[i]->sm_mmap->map_addr;
        }

        /* first, set the pointer to the control struct */
        pload_mgmt->data_buffs[array_id].ctl_struct = (mca_bcol_basesmuma_header_t *)
            (uintptr_t)(((uint64_t)(uintptr_t) results_array[array_id]) +
                        (uint64_t)(uintptr_t) base_ptr);

        /* second, calculate where to set the data pointer */
        pload_mgmt->data_buffs[array_id].payload = (void *)
            (uintptr_t)((uint64_t)(uintptr_t) pload_mgmt->data_buffs[array_id].ctl_struct +
                        (uint64_t)(uintptr_t) ml_module->data_offset);

        for (buf_id = 1; buf_id < loop_limit; buf_id++) {
            int array_id_m1 = SM_ARRAY_INDEX(leading_dim, (buf_id - 1), i);
            array_id = SM_ARRAY_INDEX(leading_dim, buf_id, i);

            /* now, play the same game as above:
             * first, set the control struct's position */
            pload_mgmt->data_buffs[array_id].ctl_struct = (mca_bcol_basesmuma_header_t *)
                (uintptr_t)(((uint64_t)(uintptr_t)(pload_mgmt->data_buffs[array_id_m1].ctl_struct) +
                             (uint64_t)(uintptr_t) ml_block->size_buffer));

            /* second, set the payload pointer */
            pload_mgmt->data_buffs[array_id].payload = (void *)
                (uintptr_t)((uint64_t)(uintptr_t) pload_mgmt->data_buffs[array_id].ctl_struct +
                            (uint64_t)(uintptr_t) ml_module->data_offset);
        }
    }

    /* initialize my control structures */
    my_idx = sm_bcol_module->super.sbgp_partner_module->my_index;
    leading_dim = sm_bcol_module->super.sbgp_partner_module->group_size;
    for (buf_id = 0; buf_id < loop_limit; buf_id++) {
        array_id = SM_ARRAY_INDEX(leading_dim, buf_id, my_idx);
        ctl_ptr = pload_mgmt->data_buffs[array_id].ctl_struct;

        /* initialize the data structures */
        for (j = 0; j < SM_BCOLS_MAX; j++) {
            for (i = 0; i < NUM_SIGNAL_FLAGS; i++) {
                ctl_ptr->flags[i][j] = -1;
            }
        }
        ctl_ptr->sequence_number = -1;
        ctl_ptr->src = -1;
    }
    /* setup the data structures needed for releasing the payload
     * buffers back to the ml level */
    for (i = 0; i < (int) ml_block->num_banks; i++) {
        sm_bcol->colls_with_user_data.ctl_buffs_mgmt[i].nb_barrier_desc.ml_memory_block_descriptor =
            ml_block;
    }

    ml_mem->num_banks = ml_block->num_banks;
    ml_mem->bank_release_counter = calloc(ml_block->num_banks, sizeof(uint32_t));
    ml_mem->num_buffers_per_bank = ml_block->num_buffers_per_bank;
    ml_mem->size_buffer = ml_block->size_buffer;
    /* pointer to the ml level descriptor */
    ml_mem->ml_mem_desc = ml_block;

    if (OMPI_SUCCESS != init_nb_coll_buff_desc(&ml_mem->nb_coll_desc,
                                               ml_block->block->base_addr,
                                               ml_mem->num_banks,
                                               ml_mem->num_buffers_per_bank,
                                               ml_mem->size_buffer,
                                               ml_module->data_offset,
                                               sm_bcol_module->super.sbgp_partner_module->group_size,
                                               sm_bcol_module->pow_k)) {
        BASESMUMA_VERBOSE(10, ("Failed to allocate memory descriptors for "
                               "storing the state of non-blocking collectives\n"));
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;

 ERROR:
    return ret;
}
#endif

/* Basesmuma interface function used for buffer release */
#if 0
/* gvm
 * A collective operation calls this routine to release the payload buffer.
 * All processes in the shared memory sub-group of a bcol should call the
 * non-blocking barrier on the last payload buffer of a memory bank.  On
 * completion of the non-blocking barrier, the ML callback is called, which
 * is responsible for recycling the memory bank.
 */
int bcol_basesmuma_free_payload_buff(struct ml_memory_block_desc_t *block,
                                     sm_buffer_mgmt *ctl_mgmt,
                                     uint64_t buff_id)
{
    /* local variables */
    int ret = OMPI_SUCCESS;
    int memory_bank;

    memory_bank = BANK_FROM_BUFFER_IDX(buff_id);
    OPAL_THREAD_ADD32(&(ctl_mgmt->ctl_buffs_mgmt[memory_bank].n_buffs_freed), 1);

    if (ctl_mgmt->ctl_buffs_mgmt[memory_bank].n_buffs_freed == block->size_buffers_bank) {

        /* start the non-blocking barrier */
        bcol_basesmuma_rd_nb_barrier_init_admin(
            &(ctl_mgmt->ctl_buffs_mgmt[memory_bank].nb_barrier_desc));

        if (NB_BARRIER_DONE !=
            ctl_mgmt->ctl_buffs_mgmt[memory_bank].nb_barrier_desc.collective_phase) {
            /* progress the barrier */
            opal_progress();
        } else {
            /* free the buffer - i.e. initiate the callback to the ml level */
            block->ml_release_cb(block, memory_bank);
        }
    }

    return ret;
}
#endif
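/*
 * Illustrative sketch (not part of the original implementation) of the
 * buffer layout set up by bcol_basesmuma_bank_init_opti() above: each
 * shared-memory buffer starts with the control header, and the user
 * payload begins data_offset bytes later, so both pointers for buffer
 * buf_id of peer i can be derived from the peer's mapped base address.
 * The helper name and arguments are hypothetical.
 */
#if 0
static void example_ctl_and_payload_pointers(unsigned char *peer_base, /* peer's first buffer in my address space */
                                             uint32_t size_buffer,     /* full size of one buffer, header included */
                                             uint32_t data_offset,     /* bytes reserved for the control header */
                                             int buf_id,
                                             void **ctl_struct_out,
                                             void **payload_out)
{
    unsigned char *buffer_start = peer_base + (size_t) buf_id * size_buffer;

    *ctl_struct_out = (void *) buffer_start;                 /* control header at the top */
    *payload_out = (void *) (buffer_start + data_offset);    /* user data right after it */
}
#endif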