/* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2005 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /** * @file * * Most of the description of the data layout is in the * coll_sm_module.c file. */ #include "ompi_config.h" #include #include #include #include #include #include "ompi/constants.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/coll/coll.h" #include "opal/util/show_help.h" #include "coll_sm2.h" #include "ompi/mca/coll/base/base.h" #include "ompi/mca/dpm/dpm.h" #include "orte/mca/rml/rml.h" #include "orte/util/proc_info.h" /* * Local functions */ static int sm2_module_enable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm); #if 0 /* debug */ extern int debug_print; extern int my_debug_rank; extern int my_debug_comm_size; extern void debug_module(void); extern int last_root; extern int node_type; long long free_buff_free_index=-1; static mca_coll_sm2_module_t *module_dbg; static int blocking_cnt=0; void debug_module(void) { int i,j,k; char *ptr; int barrier_index,index; long long br_tag; mca_coll_sm2_nb_request_process_shared_mem_t * ctl_ptr; /* control regions */ if ( 0 == my_debug_rank ) { for( i=0 ; i < 2 ; i++ ) { for( j=0 ; j < 2 ; j++ ) { fprintf(stderr," bank %d index %d \n", i,j); for( k=0 ; k < my_debug_comm_size ; k++ ) { ctl_ptr=module_dbg->barrier_request[i].barrier_base_address[j]; ctl_ptr=(mca_coll_sm2_nb_request_process_shared_mem_t *) ( (char *)ctl_ptr+k*module_dbg->sm2_size_management_region_per_proc ); fprintf(stderr," bank %d index %d flag %lld \n", i,j,ctl_ptr->flag); } } } } /* data regions */ fprintf(stderr," my_debug_rank %d current index %d freed index %d coll_tag %lld debug stat %d blocking_cnt %d last_root %d free_buff_free_index %lld node_type %d \n", my_debug_rank, module_dbg->sm2_allocated_buffer_index,module_dbg->sm2_freed_buffer_index, module_dbg->collective_tag, module_dbg->blocked_on_barrier,blocking_cnt,last_root, free_buff_free_index,node_type); barrier_index=(module_dbg->num_nb_barriers_completed% module_dbg->sm2_module_num_memory_banks); index=module_dbg->barrier_request[barrier_index].sm_index; fprintf(stderr," my_debug_rank %d started %lld completed %lld bank %d index %d br_tag %lld \n", my_debug_rank, module_dbg->num_nb_barriers_started, module_dbg->num_nb_barriers_completed, barrier_index,index, module_dbg->barrier_request[barrier_index].tag); fprintf(stderr," my_debug_rank %d barrier_bank_cntr %lld ", my_debug_rank,module_dbg->barrier_bank_cntr); for( i=0 ; i < BARRIER_BANK_LIST_SIZE ; i++ ) fprintf(stderr,"%2d",module_dbg->barrier_bank_list[i]); fprintf(stderr," \n"); if( 0 == my_debug_rank ) { for( i=0 ; i < module_dbg->sm2_module_num_buffers ; i++ ) { for( j=0 ; j < my_debug_comm_size ; j++ ) { fprintf(stderr," buffer index %d tag %lld ptr %p \n", i, module_dbg->sm_buffer_descriptor[i].proc_memory[j].control_region->flag, module_dbg->sm_buffer_descriptor[i].proc_memory[j].control_region); } } } fflush(stderr); return; } /* end debug */ #endif /* * Local functions */ static void mca_coll_sm2_module_construct(mca_coll_sm2_module_t *module) { } static void mca_coll_sm2_module_destruct(mca_coll_sm2_module_t *module) { int i,ret; /* free the mmaped shared file */ if( module->shared_memory_region) { ret=munmap(module->shared_memory_region, module->size_sm2_backing_file); /* this is cleanup, no recovery will be done */ } /* free list of children in the barrier-tree */ if( NULL != module->sm_buffer_mgmt_barrier_tree.children_ranks ) { free(module->sm_buffer_mgmt_barrier_tree.children_ranks); } /* free non-blocking barrier request objects */ if( NULL != module->barrier_request ) { free(module->barrier_request); } /* free reduction tree */ if( NULL != module->reduction_tree ) { for( i=0 ; i < module->comm_size ; i++ ) { if( NULL != module->reduction_tree[i].children_ranks) { free(module->reduction_tree[i].children_ranks); } } free(module->reduction_tree); } /* free fan-out read tree */ if( NULL != module->fanout_read_tree ) { for( i=0 ; i < module->comm_size ; i++ ) { if( NULL != module->fanout_read_tree[i].children_ranks) { free(module->fanout_read_tree[i].children_ranks); } } free(module->fanout_read_tree); } /* done */ } static bool have_local_peers(ompi_group_t *group, size_t size) { size_t i; ompi_proc_t *proc; for (i = 0; i < size; ++i) { proc = ompi_group_peer_lookup(group,i); if (0 == (proc->proc_flags & OMPI_PROC_FLAG_LOCAL)) { return false; } } return true; } /* * Create mmaped shared file */ static int allocate_shared_file(size_t size, char **file_name, struct ompi_communicator_t *comm, char **sm_backing_file) { int fd = -1; int group_size,my_rank; int unique_comm_id; size_t len; char *f_name; bool i_create_shared_file=false; ssize_t p; int rc=0, sm_file_inited=0; struct iovec iov[3]; int sm_file_created; ompi_proc_t **comm_proc_list; /* get the list of procs */ comm_proc_list=comm->c_local_group->grp_proc_pointers; group_size=ompi_comm_size(comm); my_rank=ompi_comm_rank(comm); /* determine who will actually create the file */ if( my_rank == 0 ) { i_create_shared_file=true; } /* open the backing file. */ if( i_create_shared_file ) { /* * set file name */ /* generate id that will be different for non-overlapping * communicators. */ unique_comm_id=(int)getpid(); len=asprintf(&f_name, "%s"OPAL_PATH_SEP"sm_coll_v2_%0d_%0d",orte_process_info.job_session_dir, ompi_comm_get_cid(comm),unique_comm_id); if( 0 > len ) { return OMPI_ERROR; } *file_name=f_name; /* process initializing the file */ fd = open(*file_name, O_CREAT|O_RDWR, 0600); if (fd < 0) { opal_output(0,"mca_common_sm_mmap_init: open %s len %ld failed with errno=%d\n", *file_name, len, errno); goto file_opened; } /* map the file and initialize segment state */ *sm_backing_file = (char *) mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); if( (void*)-1 == sm_backing_file ) { opal_output(0, "mca_common_sm_mmap_init: mmap failed with errno=%d\n", errno); goto file_opened; } /* truncate the file to the requested size */ if(ftruncate(fd, size) != 0) { opal_output(0, "mca_common_sm_mmap_init: ftruncate failed with errno=%d\n", errno); goto file_opened; } /* if we got this far, the file has been initialized correctly */ sm_file_inited=1; file_opened: /* signal the rest of the local procs that the backing file * has been created - not very scalable, but for small shared * memory nodes is adequate for now */ for(p=1 ; p < group_size ; p++ ) { sm_file_created=OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED; iov[0].iov_base=&sm_file_created; iov[0].iov_len=sizeof(sm_file_created); iov[1].iov_base=&sm_file_inited; iov[1].iov_len=sizeof(sm_file_inited); iov[2].iov_base=&unique_comm_id; iov[2].iov_len=sizeof(unique_comm_id); rc=orte_rml.send(&(comm_proc_list[p]->proc_name),iov,3, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "allocate_shared_file: orte_rml.send failed to %lu with errno=%d\n", (unsigned long)p, errno); goto return_error; } } if ( 0 == sm_file_inited ) { /* error - the sm backing file did not get opened correctly */ goto return_error; } } else { /* all other procs wait for the file to be initialized before using the backing file */ iov[0].iov_base=&sm_file_created; iov[0].iov_len=sizeof(sm_file_created); iov[1].iov_base=&sm_file_inited; iov[1].iov_len=sizeof(sm_file_inited); iov[2].iov_base=&unique_comm_id; iov[2].iov_len=sizeof(unique_comm_id); rc=orte_rml.recv(&(comm_proc_list[0]->proc_name),iov,3, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "allocate_shared_file: orte_rml.recv failed from %ld with errno=%d\n", 0L, errno); goto return_error; } /* check to see if file inited correctly */ if( 0 == sm_file_inited ) { goto return_error; } /* set file name - we need the unique id for non-overlapping * communicators, that could have the same communicator id */ len=asprintf(&f_name, "%s"OPAL_PATH_SEP"sm_coll_v2_%0d_%0d",orte_process_info.job_session_dir, ompi_comm_get_cid(comm),unique_comm_id); if( 0 > len ) { return OMPI_ERROR; } *file_name=f_name; /* open backing file */ fd = open(*file_name, O_RDWR, 0600); if (fd < 0) { opal_output(0,"mca_common_sm_mmap_init: open %s len %ld failed with errno=%d\n", *file_name, len, errno); goto return_error; } /* map the file and initialize segment state */ *sm_backing_file = (char *) mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); if( (void*)-1 == sm_backing_file ) { opal_output(0, "mca_common_sm_mmap_init: mmap failed with errno=%d\n", errno); goto return_error; } } /* enable access by other processes on this host */ close(fd); return OMPI_SUCCESS; return_error: if( -1 != fd ) { close(fd); } if( NULL != sm_backing_file ) munmap((void*) sm_backing_file,size); return OMPI_ERROR; } static int barrier( struct ompi_communicator_t *comm , tree_node_t *multinomial_tree) { int group_size,my_rank,n_children,child,n_parents,my_fanout_parent; int child_rank, dummy; tree_node_t *my_node; int rc=0; struct iovec iov; ompi_proc_t **comm_proc_list; /* get the list of procs */ comm_proc_list=comm->c_local_group->grp_proc_pointers; group_size=ompi_comm_size(comm); my_rank=ompi_comm_rank(comm); my_node=&(multinomial_tree[my_rank]); n_children=my_node->n_children; n_parents=my_node->n_parents; my_fanout_parent=my_node->parent_rank; /* * fan in */ /* receive from the children */ for( child=0 ; child < n_children ; child++ ) { child_rank=my_node->children_ranks[child]; iov.iov_base=&dummy; iov.iov_len=sizeof(dummy); rc=orte_rml.recv(&(comm_proc_list[child_rank]->proc_name),&iov,1, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "sm barrier fan-in: orte_rml.recv failed to %lu with errno=%d\n", (unsigned long)child_rank, errno); goto return_error; } } /* send to parent */ if( 0 < n_parents ) { iov.iov_base=&dummy; iov.iov_len=sizeof(dummy); rc=orte_rml.send(&(comm_proc_list[my_fanout_parent]->proc_name),&iov,1, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "sm barrier fan-in: orte_rml.send failed to %lu with errno=%d\n", (unsigned long)my_fanout_parent, errno); goto return_error; } } /* * Fan out */ /* receive from parent */ if( 0 < n_parents ) { iov.iov_base=&dummy; iov.iov_len=sizeof(dummy); rc=orte_rml.recv(&(comm_proc_list[my_fanout_parent]->proc_name),&iov,1, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "sm barrier fan-out: orte_rml.recv failed to %lu with errno=%d\n", (unsigned long)my_fanout_parent, errno); goto return_error; } } /* send to children */ for( child=0 ; child < n_children ; child++ ) { child_rank=my_node->children_ranks[child]; iov.iov_base=&dummy; iov.iov_len=sizeof(dummy); rc=orte_rml.send(&(comm_proc_list[child_rank]->proc_name),&iov,1, OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED,0); if( rc < 0 ) { opal_output(0, "sm barrier fan-out: orte_rml.send failed to %lu with errno=%d\n", (unsigned long)child_rank, errno); goto return_error; } } return OMPI_SUCCESS; return_error: return OMPI_ERROR; } /* setup an n-array tree */ static int setup_nary_tree(int tree_order, int my_rank, int num_nodes, tree_node_t *my_node) { /* local variables */ int n_levels, result; int my_level_in_tree, cnt, parent_cnt; int lvl,cum_cnt, my_rank_in_my_level,n_lvls_in_tree; int start_index,end_index; /* sanity check */ if( 1 >= tree_order ) { goto Error; } my_node->my_rank=my_rank; my_node->tree_size=num_nodes; /* figure out number of levels in tree */ n_levels=0; result=num_nodes-1; while (0 < result ) { result/=tree_order; n_levels++; }; /* figure out who my children and parents are */ my_level_in_tree=-1; result=my_rank; /* cnt - number of ranks in given level */ cnt=1; /* parent_cnt - cummulative count of ranks */ parent_cnt=0; while( 0 <= result ) { result-=cnt; cnt*=tree_order; my_level_in_tree++; }; /* int my_level_in_tree, n_children, n_parents; */ if( 0 == my_rank ) { my_node->n_parents=0; my_node->parent_rank=-1; my_rank_in_my_level=0; } else { my_node->n_parents=1; cnt=1; cum_cnt=0; for (lvl = 0 ; lvl < my_level_in_tree ; lvl ++ ) { /* cummulative count up to this level */ cum_cnt+=cnt; /* number of ranks in this level */ cnt*=tree_order; } my_rank_in_my_level=my_rank-cum_cnt; /* tree_order consecutive ranks have the same parent */ my_node->parent_rank=cum_cnt-cnt/tree_order+my_rank_in_my_level/tree_order; } /* figure out number of levels in the tree */ n_lvls_in_tree=0; result=num_nodes; /* cnt - number of ranks in given level */ cnt=1; /* parent_cnt - cummulative count of ranks */ parent_cnt=0; while( 0 < result ) { result-=cnt; cnt*=tree_order; n_lvls_in_tree++; }; my_node->children_ranks=(int *)NULL; /* get list of children */ if( my_level_in_tree == (n_lvls_in_tree -1 ) ) { /* last level has no children */ my_node->n_children=0; } else { cum_cnt=0; cnt=1; for( lvl=0 ; lvl <= my_level_in_tree ; lvl++ ) { cum_cnt+=cnt; cnt*=tree_order; } start_index=cum_cnt+my_rank_in_my_level*tree_order; end_index=start_index+tree_order-1; /* don't go out of bounds at the end of the list */ if( end_index >= num_nodes ) { end_index = num_nodes-1; } if( start_index <= (num_nodes-1) ) { my_node->n_children=end_index-start_index+1; } else { my_node->n_children=0; } my_node->children_ranks=NULL; if( 0 < my_node->n_children ) { my_node->children_ranks= (int *)malloc( sizeof(int)*my_node->n_children); if( NULL == my_node->children_ranks) { goto Error; } for (lvl= start_index ; lvl <= end_index ; lvl++ ) { my_node->children_ranks[lvl-start_index]=lvl; } } } /* set node type */ if( 0 == my_node->n_parents ) { my_node->my_node_type=ROOT_NODE; } else if ( 0 == my_node->n_children ) { my_node->my_node_type=LEAF_NODE; } else { my_node->my_node_type=INTERIOR_NODE; } /* successful return */ return OMPI_SUCCESS; Error: /* error return */ return OMPI_ERROR; } /* initialize barrier structures */ static int init_sm2_barrier(struct ompi_communicator_t *comm, mca_coll_sm2_component_t *component, mca_coll_sm2_module_t *module) { /*local variables */ int i,j,k,comm_size, my_rank, tree_order, rc; mca_coll_sm2_nb_request_process_shared_mem_t *sm_address; /* get order of fan-in and fan-out tree */ tree_order=component->order_barrier_tree; /* get communicator size */ comm_size=ompi_comm_size(comm); /* get rank within communictor */ my_rank=ompi_comm_rank(comm); /* initialize fan-in/fan-out tree */ rc=setup_nary_tree(tree_order, my_rank, comm_size, &(module->sm_buffer_mgmt_barrier_tree)); if( OMPI_SUCCESS != rc ) { goto Error; } /* Allocate barrier control structures - allocating one barrier structure * per memory bank. Allocating two shared memory regions per bank. */ module->barrier_request=(mca_coll_sm2_nb_request_process_private_mem_t *) malloc(sizeof(mca_coll_sm2_nb_request_process_private_mem_t) * component->sm2_num_mem_banks); if( NULL == module->barrier_request ){ rc=OMPI_ERROR; goto Error; } module->nb_barrier_tag=0; /* initialize barrier control structures */ for(i=0 ; i < component->sm2_num_mem_banks ; i++ ) { module->barrier_request[i].tag=0; module->barrier_request[i].sm_index=0; module->barrier_request[i].sm2_barrier_phase=NB_BARRIER_INACTIVE; /* set the base address of each barrier's shared memory regions */ for( j =0 ; j < 2 ; j++ ) { module->barrier_request[i].barrier_base_address[j]= (mca_coll_sm2_nb_request_process_shared_mem_t *) (module->shared_memory_region + /* there are 2 barrier structs per bank */ (2*i+j)*CACHE_LINE_SIZE); /* initialize per-process flags */ for(k=0 ; k < comm_size ; k++ ) { sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *) ((char *) (module->barrier_request[i].barrier_base_address[j])+ k*module->sm2_size_management_region_per_proc); sm_address->flag=0; } } } module->num_nb_barriers_started=0; module->num_nb_barriers_completed=0; /* set pointer to the collective operation buffers */ module->collective_buffer_region=module->shared_memory_region+ module->sm2_size_management_region_per_proc* module->sm_buffer_mgmt_barrier_tree.tree_size; /* set the pointer to the request that needs to be completed first */ module->current_request_index=0; /* set starting collective tag */ module->collective_tag=1; /* return - successful */ return OMPI_SUCCESS; Error: return rc; } /* query to see if the module is available for use on the given * communicator, and if so, what it's priority is. This is where * the backing shared-memory file is created. */ mca_coll_base_module_t * mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) { /* local variables */ mca_coll_sm2_module_t *sm_module; int i,j,group_size,ret; size_t alignment,size; size_t tot_size_mem_banks; size_t ctl_memory_per_proc_per_segment; size_t mem_management_per_proc_per_block; size_t mem_management_per_proc; size_t mem_management_total; size_t size_sm2_backing_file; size_t size_buff_ctl_per_proc,size_data_buff_per_proc; /* * This is activated only for intra-communicators */ if (OMPI_COMM_IS_INTER(comm) ) { return NULL; } /* * Use only if more than on proc in the communicator */ if (1 == ompi_comm_size(comm) ) { return NULL; } /* check to see if all procs are on the same node, and therefore * can communicate using shared memory */ if ( !have_local_peers(comm->c_local_group, ompi_comm_size(comm))) { return NULL; } /* Get our priority */ *priority = mca_coll_sm2_component.sm2_priority; /* allocate and initialize an sm-v2 module */ sm_module = OBJ_NEW(mca_coll_sm2_module_t); sm_module->super.coll_module_enable = sm2_module_enable; sm_module->super.ft_event = NULL; sm_module->super.coll_allgather = NULL; sm_module->super.coll_allgatherv = NULL; sm_module->super.coll_allreduce = mca_coll_sm2_allreduce_intra; sm_module->super.coll_alltoall = NULL; sm_module->super.coll_alltoallv = NULL; sm_module->super.coll_alltoallw = NULL; sm_module->super.coll_barrier = mca_coll_sm2_barrier_intra; sm_module->super.coll_bcast = mca_coll_sm2_bcast_intra; sm_module->super.coll_exscan = NULL; sm_module->super.coll_gather = NULL; sm_module->super.coll_gatherv = NULL; sm_module->super.coll_reduce = mca_coll_sm2_reduce_intra; sm_module->super.coll_reduce_scatter = NULL; sm_module->super.coll_scan = NULL; sm_module->super.coll_scatter = NULL; sm_module->super.coll_scatterv = NULL; /* * set up specific function to be used */ /* barrier */ sm_module->barrier_functions[FANIN_FAN_OUT_BARRIER_FN]= mca_coll_sm2_barrier_intra_fanin_fanout; sm_module->barrier_functions[RECURSIVE_DOUBLING_BARRIER_FN]= mca_coll_sm2_barrier_intra_fanin_fanout; if( ( 0 <= mca_coll_sm2_component.force_barrier ) && ( N_BARRIER_FNS > mca_coll_sm2_component.force_barrier ) ) { /* set user specifed function */ mca_coll_base_module_barrier_fn_t tmp_fn= sm_module->barrier_functions[mca_coll_sm2_component.force_barrier]; sm_module->barrier_functions[FANIN_FAN_OUT_BARRIER_FN]=tmp_fn; sm_module->barrier_functions[RECURSIVE_DOUBLING_BARRIER_FN]=tmp_fn; } /* reduce */ sm_module->list_reduce_functions[FANIN_REDUCE_FN]= mca_coll_sm2_reduce_intra_fanin; sm_module->list_reduce_functions[REDUCE_SCATTER_GATHER_FN]= mca_coll_sm2_reduce_intra_reducescatter_gather; sm_module->reduce_functions[SHORT_DATA_FN_REDUCE]= sm_module->list_reduce_functions[FANIN_REDUCE_FN]; sm_module->reduce_functions[LONG_DATA_FN_REDUCE]= sm_module->list_reduce_functions[REDUCE_SCATTER_GATHER_FN]; if( ( 0 <= mca_coll_sm2_component.force_reduce ) && ( N_REDUCE_FNS > mca_coll_sm2_component.force_reduce ) ) { /* set user specifed function */ mca_coll_base_module_reduce_fn_t tmp_fn=sm_module-> list_reduce_functions[mca_coll_sm2_component.force_reduce]; sm_module->reduce_functions[SHORT_DATA_FN_REDUCE]=tmp_fn; sm_module->reduce_functions[LONG_DATA_FN_REDUCE]=tmp_fn; } /* allreduce */ sm_module->list_allreduce_functions[FANIN_FANOUT_ALLREDUCE_FN]= mca_coll_sm2_allreduce_intra_fanin_fanout; sm_module->list_allreduce_functions[REDUCE_SCATTER_ALLGATHER_FN]= mca_coll_sm2_allreduce_intra_reducescatter_allgather; sm_module->allreduce_functions[SHORT_DATA_FN_ALLREDUCE]= sm_module->list_allreduce_functions[FANIN_FANOUT_ALLREDUCE_FN]; sm_module->allreduce_functions[LONG_DATA_FN_ALLREDUCE]= sm_module->list_allreduce_functions[REDUCE_SCATTER_ALLGATHER_FN]; if( ( 0 <= mca_coll_sm2_component.force_allreduce ) && ( N_ALLREDUCE_FNS > mca_coll_sm2_component.force_allreduce ) ) { /* set user specifed function */ mca_coll_base_module_allreduce_fn_t tmp_fn=sm_module-> list_allreduce_functions[mca_coll_sm2_component.force_allreduce]; sm_module->allreduce_functions[SHORT_DATA_FN_ALLREDUCE]=tmp_fn; sm_module->allreduce_functions[LONG_DATA_FN_ALLREDUCE]=tmp_fn; } /* * Some initialization */ sm_module->reduction_tree=NULL; sm_module->fanout_read_tree=NULL; /* * create backing file */ /* * set group size */ group_size=ompi_comm_size(comm); sm_module->module_comm=comm; sm_module->comm_size=group_size; sm_module->n_poll_loops=mca_coll_sm2_component.n_poll_loops; /* * set memory region parameters */ sm_module->sm2_module_num_memory_banks= mca_coll_sm2_component.sm2_num_mem_banks; sm_module->sm2_module_num_regions_per_bank= mca_coll_sm2_component.sm2_num_regions_per_bank; sm_module->sm2_module_num_buffers= mca_coll_sm2_component.sm2_num_regions_per_bank * mca_coll_sm2_component.sm2_num_mem_banks; /* allocate the array of memory descriptors used to describe the * shared memory buffers. This structure resides in process * private memory, but describes the shared memory. */ sm_module->sm_buffer_descriptor=(sm_work_buffer_t *)malloc( sizeof(sm_work_buffer_t)*sm_module->sm2_module_num_buffers); if( NULL == sm_module->sm_buffer_descriptor ) { goto CLEANUP; } #if 0 /* data buffers and management buffers are allocated in a single * contigous region */ /* * Now figure out how much memory to allocate for use as * working memory for the shared memory collectives. */ /* * get control region size */ /* just enough place for two flags per process */ ctl_memory_per_proc_per_segment=2*sizeof(long long); if( mca_coll_sm2_component.sm2_ctl_size_per_proc > ctl_memory_per_proc_per_segment ) ctl_memory_per_proc_per_segment=mca_coll_sm2_component.sm2_ctl_size_per_proc; /* pad this up to the alignment needed by the data segment, as the * that data segment will directly follow the control segment in * memory. */ alignment=mca_coll_sm2_component.sm2_data_alignment; ctl_memory_per_proc_per_segment= (alignment + ctl_memory_per_proc_per_segment -1) / alignment; ctl_memory_per_proc_per_segment*=alignment; mca_coll_sm2_component.sm2_ctl_size_allocated=ctl_memory_per_proc_per_segment; sm_module->ctl_memory_per_proc_per_segment=ctl_memory_per_proc_per_segment; /* get data region size - allocation happens on a page granularity, with * a minimum of a page allocated per proc, so adjust to this */ size=mca_coll_sm2_component.sm2_data_seg_size; if( size > mca_coll_sm2_component.sm2_max_data_seg_size ) size=mca_coll_sm2_component.sm2_max_data_seg_size; size_tot_per_proc_per_seg=size+ mca_coll_sm2_component.sm2_ctl_size_per_proc; if( size_tot_per_proc_per_seg < getpagesize()) size_tot_per_proc_per_seg=getpagesize(); /* round this up to the nearest integer page-size multiple */ size_tot_per_proc_per_seg= ( size_tot_per_proc_per_seg + getpagesize() - 1)/ getpagesize(); size_tot_per_proc_per_seg*=getpagesize(); /* compute segment memory needed */ size_tot_per_segment=group_size * size_tot_per_proc_per_seg ; sm_module->segement_size_per_process=size_tot_per_proc_per_seg; sm_module->segment_size=size_tot_per_segment; sm_module->data_memory_per_proc_per_segment=size_tot_per_proc_per_seg- ctl_memory_per_proc_per_segment; /* compute memory per bank */ tot_size_per_bank=size_tot_per_segment*mca_coll_sm2_component.sm2_num_regions_per_bank; /* compute total memory in the memory banks */ tot_size_mem_banks=tot_size_per_bank*mca_coll_sm2_component.sm2_num_mem_banks; sm_module->data_memory_per_proc_per_segment=size_tot_per_proc_per_seg- ctl_memory_per_proc_per_segment; #endif /* management structures are allocated is a one segment, and data buffers * in a separate segment */ /* * Now figure out how much memory to allocate for use as * working memory for the shared memory collectives. */ /* * get control region size */ /* just enough place for two flags per process */ ctl_memory_per_proc_per_segment=2*sizeof(long long); if( mca_coll_sm2_component.sm2_ctl_size_per_proc > ctl_memory_per_proc_per_segment ) ctl_memory_per_proc_per_segment=mca_coll_sm2_component.sm2_ctl_size_per_proc; /* pad this up to the alignment needed by the data segment, as the * that data segment will directly follow the control segment in * memory. */ alignment=mca_coll_sm2_component.sm2_data_alignment; ctl_memory_per_proc_per_segment= (alignment + ctl_memory_per_proc_per_segment -1) / alignment; ctl_memory_per_proc_per_segment*=alignment; mca_coll_sm2_component.sm2_ctl_size_allocated=ctl_memory_per_proc_per_segment; sm_module->ctl_memory_per_proc_per_segment=ctl_memory_per_proc_per_segment; /* get data region size - allocation happens on a page granularity, with * a minimum of a page allocated per proc, so adjust to this */ size=mca_coll_sm2_component.sm2_data_seg_size; if( size < getpagesize() ) size=getpagesize(); if( size > mca_coll_sm2_component.sm2_max_data_seg_size ) size=mca_coll_sm2_component.sm2_max_data_seg_size; size= ( size + getpagesize() - 1)/getpagesize(); size*=getpagesize(); sm_module->segment_size=size*group_size; size_data_buff_per_proc=size; /* compute size of management region - per proc */ size_buff_ctl_per_proc= ctl_memory_per_proc_per_segment*sm_module->sm2_module_num_buffers; size_buff_ctl_per_proc= ( size_buff_ctl_per_proc + getpagesize() - 1)/ getpagesize(); size_buff_ctl_per_proc*=getpagesize(); tot_size_mem_banks= /* size of buffer conrol region */ size_buff_ctl_per_proc*group_size+ /* size of data buffers */ size*sm_module->sm2_module_num_buffers*group_size; sm_module->size_of_collective_buffer_region=tot_size_mem_banks; sm_module->data_memory_per_proc_per_segment=size; /* * compute the amount of memory needed for the anynchromous barriers used to * manage the memory resources. */ /* for each bank, 2 sets of barrier buffers */ mem_management_per_proc_per_block= 2 * CACHE_LINE_SIZE ; /* add in number of banks */ mem_management_per_proc= mem_management_per_proc_per_block * mca_coll_sm2_component.sm2_num_mem_banks; /* round up to page multiples */ mem_management_per_proc=(mem_management_per_proc + getpagesize() -1 ) / getpagesize(); mem_management_per_proc*=getpagesize(); /* size of memory region, per process, for memory bank management */ sm_module->sm2_size_management_region_per_proc= mem_management_per_proc; /* total memory management required */ mem_management_total=mem_management_per_proc * group_size; sm_module->size_mem_banks_ctl_region=mem_management_total; /* * Memory for blocking collectives - need two sets of memory * regions for this. */ /* size per proc */ size=2*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t); /* page align */ size=(size + getpagesize() -1 ) / getpagesize(); size*=getpagesize(); sm_module->per_proc_size_of_blocking_barrier_region=size; sm_module->size_of_blocking_barrier_region=size*group_size; /* total size of backing file - this assumes the mmap allocation * occurs on page boundaries, and that all segments are paged * aligned */ size_sm2_backing_file=sm_module->size_mem_banks_ctl_region+ sm_module->size_of_collective_buffer_region+ sm_module->size_of_blocking_barrier_region; sm_module->size_sm2_backing_file=size_sm2_backing_file; /* set file name */ /* len=asprintf(&(sm_module->coll_sm2_file_name), "%s"OPAL_PATH_SEP"sm_coll_v2%s_%0d\0",orte_process_info.job_session_dir, orte_process_info.nodename,ompi_comm_get_cid(comm)); if( 0 > len ) { goto CLEANUP; } */ /* allocate backing file */ ret=allocate_shared_file(size_sm2_backing_file, &(sm_module->coll_sm2_file_name), comm, &(sm_module->shared_memory_region)); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* intialize barrier structures */ ret=init_sm2_barrier(comm, &mca_coll_sm2_component, sm_module); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* initialize reduction tree */ sm_module->reduction_tree=(tree_node_t *) malloc( sizeof(tree_node_t )*group_size); if( NULL == sm_module->reduction_tree ) { goto CLEANUP; } ret=setup_multinomial_tree(mca_coll_sm2_component.order_reduction_tree, group_size,sm_module->reduction_tree); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* initialize fan-out read tree */ sm_module->fanout_read_tree=(tree_node_t *) malloc( sizeof(tree_node_t )*group_size); if( NULL == sm_module->fanout_read_tree ) { goto CLEANUP; } ret=setup_multinomial_tree(mca_coll_sm2_component.order_fanout_read_tree, group_size,sm_module->fanout_read_tree); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* initialize recursive doubling tree */ ret=setup_recursive_doubling_tree_node(group_size, ompi_comm_rank(comm), &(sm_module->recursive_doubling_tree)); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* initialize local counters */ sm_module->sm2_allocated_buffer_index=-1; sm_module->sm2_freed_buffer_index=-1; /* setup shared memory memory descriptors */ for( i=0 ; i < sm_module->sm2_module_num_buffers ; i++ ) { char *base_buffer; volatile mca_coll_sm2_nb_request_process_shared_mem_t *ctl_ptr; /* set the base address for this working buffer */ base_buffer= sm_module->collective_buffer_region+ /* offset past control data structures */ size_buff_ctl_per_proc*group_size + i*sm_module->segment_size; sm_module->sm_buffer_descriptor[i].base_segment_address=base_buffer; /* allocate array to keep data on each segment in the buffer. * One segment per process in the group. */ sm_module->sm_buffer_descriptor[i].proc_memory= (sm_memory_region_desc_t *)malloc(sizeof(sm_memory_region_desc_t)* group_size); if( NULL == sm_module->sm_buffer_descriptor[i].proc_memory ) { goto CLEANUP; } /* set bank index */ sm_module->sm_buffer_descriptor[i].bank_index= i/sm_module->sm2_module_num_regions_per_bank; sm_module->sm_buffer_descriptor[i].index_first_buffer_in_bank= sm_module->sm_buffer_descriptor[i].bank_index * sm_module->sm2_module_num_regions_per_bank; sm_module->sm_buffer_descriptor[i].index_last_buffer_in_bank= ((sm_module->sm_buffer_descriptor[i].bank_index+1) * sm_module->sm2_module_num_regions_per_bank)-1; for(j=0 ; j < group_size ; j++ ) { ctl_ptr=(volatile mca_coll_sm2_nb_request_process_shared_mem_t *) (base_buffer+j* sm_module->segement_size_per_process); sm_module->sm_buffer_descriptor[i].proc_memory[j].control_region= (volatile mca_coll_sm2_nb_request_process_shared_mem_t *) /* offset to temp space */ (sm_module->collective_buffer_region+ /* offset to the per-proc control region */ size_buff_ctl_per_proc*j+ /* offset to control structure for the i'th buffer */ ctl_memory_per_proc_per_segment*i); sm_module->sm_buffer_descriptor[i].proc_memory[j].data_segment= (char *)base_buffer+ /* offset to data segment for the j'th proc */ j*size_data_buff_per_proc; /* initialize the control region */ sm_module->sm_buffer_descriptor[i].proc_memory[j].control_region-> flag=0; sm_module->sm_buffer_descriptor[i].proc_memory[j].control_region->flag=0; } } /* allocate process private scratch space */ sm_module->scratch_space=(int *)malloc(sizeof(int)*group_size); if( NULL == sm_module->scratch_space) { goto CLEANUP; } /* * setup blocking barrier data structures */ sm_module->sm_blocking_barrier_region= sm_module->shared_memory_region+ sm_module->size_mem_banks_ctl_region+ sm_module->size_of_collective_buffer_region; sm_module->index_blocking_barrier_memory_bank=0; sm_module->ctl_blocking_barrier= (volatile mca_coll_sm2_nb_request_process_shared_mem_t ***) malloc(2*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t **)); if( NULL == sm_module->ctl_blocking_barrier ) { goto CLEANUP; } sm_module->ctl_blocking_barrier[0]= (mca_coll_sm2_nb_request_process_shared_mem_t **) malloc(group_size*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t *)); if( NULL == sm_module->ctl_blocking_barrier[0]) { goto CLEANUP; } sm_module->ctl_blocking_barrier[1]= (mca_coll_sm2_nb_request_process_shared_mem_t **) malloc(group_size*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t *)); if( NULL == sm_module->ctl_blocking_barrier[1]) { goto CLEANUP; } for( j= 0 ; j < 2 ; j++ ) { for( i=0 ; i < group_size ; i++ ) { sm_module->ctl_blocking_barrier[j][i]= (mca_coll_sm2_nb_request_process_shared_mem_t * ) ( sm_module->sm_blocking_barrier_region+ j*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t)+ i*sm_module->per_proc_size_of_blocking_barrier_region ) ; sm_module->ctl_blocking_barrier[j][i]->flag=0; } } /* set the switch-over parameter */ sm_module->short_message_size=mca_coll_sm2_component.short_message_size; /* touch pages to apply memory affinity - Note: do we really need this or will * the algorithms do this */ /* make sure all procs are done with setup - need to avoid initializing * shared memory regions already in use */ ret=barrier(comm,sm_module->reduction_tree); if( MPI_SUCCESS != ret ) { goto CLEANUP; } /* return */ return &(sm_module->super); CLEANUP: if( NULL != sm_module->coll_sm2_file_name ) { free(sm_module->coll_sm2_file_name); sm_module->coll_sm2_file_name=NULL; } if( NULL != sm_module->reduction_tree ) { free(sm_module->coll_sm2_file_name); sm_module->coll_sm2_file_name=NULL; } if( NULL != sm_module->sm_buffer_descriptor ) { for(i=0 ; i < group_size ; i++ ) { if(NULL != sm_module->sm_buffer_descriptor[i].proc_memory) { free(sm_module->sm_buffer_descriptor[i].proc_memory); sm_module->sm_buffer_descriptor[i].proc_memory=NULL; } } free(sm_module->sm_buffer_descriptor); sm_module->sm_buffer_descriptor=NULL; } if(sm_module->scratch_space) { free(sm_module->scratch_space); sm_module->scratch_space=NULL; } for( i= 0 ; i < group_size ; i++ ) { if( NULL != sm_module->ctl_blocking_barrier[0][i] ) { free( sm_module->ctl_blocking_barrier[0][i]); sm_module->ctl_blocking_barrier[0][i]=NULL; } if( NULL != sm_module->ctl_blocking_barrier[1][i] ) { free( sm_module->ctl_blocking_barrier[1][i]); sm_module->ctl_blocking_barrier[1][i]=NULL; } } if( NULL != sm_module->ctl_blocking_barrier ) { free(sm_module->ctl_blocking_barrier); sm_module->ctl_blocking_barrier=NULL; } OBJ_RELEASE(sm_module); return NULL; } /* * Init module on the communicator */ static int sm2_module_enable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm) { /* local variables */ char output_buffer[2*MPI_MAX_OBJECT_NAME]; memset(&output_buffer[0],0,sizeof(output_buffer)); snprintf(output_buffer,sizeof(output_buffer),"%s (cid %d)", comm->c_name, comm->c_contextid); opal_output_verbose(10, mca_coll_base_output, "coll:sm2:enable: new communicator: %s", output_buffer); /* All done */ return OMPI_SUCCESS; } /* progress barrier */ static int progress_nb_barrier(mca_coll_sm2_module_t *module) { int rc,barrier_index; if( module->num_nb_barriers_started != module->num_nb_barriers_completed ) { /* is there anything to progress ? */ /* get index of barrier structure to progress. The one to progress * is the one right after the last competed nb barrier. No need * to subtract 1 for the index, as the number completed is the index * of the next one to complete. */ barrier_index=(module->num_nb_barriers_completed% module->sm2_module_num_memory_banks); rc=mca_coll_sm2_nbbarrier_intra_progress(module->module_comm, &(module->barrier_request[barrier_index]), (mca_coll_base_module_t *)module); if( OMPI_SUCCESS != rc ) { return rc; } /* if barrier is completed, transition it to inactive, and point to * the request object for then next bank */ if ( NB_BARRIER_DONE == module->barrier_request[barrier_index].sm2_barrier_phase ) { /* set request to inactive */ module->barrier_request[barrier_index].sm2_barrier_phase= NB_BARRIER_INACTIVE; module->num_nb_barriers_completed++; /* change pointer to the shared data structure to use next time */ module->barrier_request[barrier_index].sm_index^=1; } } return OMPI_SUCCESS; } /* allocate working buffer */ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module) { /* local variables */ int rc,buffer_index; /* progress active barrier */ rc=progress_nb_barrier(module); if( OMPI_SUCCESS != rc ) { return NULL; } /* get next buffer index */ module->sm2_allocated_buffer_index++; /* check for wrap-around */ if( module->sm2_allocated_buffer_index == module->sm2_module_num_buffers ) { module->sm2_allocated_buffer_index=0; } /* If this is the first buffer in the bank, see if the barrier * needs to be completed */ buffer_index=module->sm2_allocated_buffer_index; if( buffer_index == module->sm_buffer_descriptor[buffer_index]. index_first_buffer_in_bank ) { /* are there incomplete barriers ? */ int num_incomlete_barriers=module->num_nb_barriers_started - module->num_nb_barriers_completed; /* only complete the one we want to use. If there are less than * module->sm2_module_num_memory_banks active banks, not need to * worry about completion, as completion is ordered. */ while( num_incomlete_barriers == module->sm2_module_num_memory_banks ) { rc=progress_nb_barrier(module); if( OMPI_SUCCESS != rc ) { return NULL; } num_incomlete_barriers=module->num_nb_barriers_started - module->num_nb_barriers_completed; } } /* end pooling waiting to be able to use the memory bank */ return &(module->sm_buffer_descriptor[buffer_index]); } /* free working buffer - it is assumed that buffers are released in * the order they are allocated. We can assume this because each * communiator will have only one outstanding collective at a given * time, and we ensure that operations are completed in order. */ int free_sm2_shared_buffer(mca_coll_sm2_module_t *module) { /* local variables */ int rc,buffer_index; mca_coll_sm2_nb_request_process_private_mem_t *request; /* progress active barrier */ rc=progress_nb_barrier(module); if( OMPI_SUCCESS != rc ) { return rc; } /* get next buffer index */ module->sm2_freed_buffer_index++; /* check for wrap-around */ if( module->sm2_freed_buffer_index == module->sm2_module_num_buffers ) { module->sm2_freed_buffer_index=0; } buffer_index=module->sm2_freed_buffer_index; if( buffer_index == module->sm_buffer_descriptor[buffer_index]. index_last_buffer_in_bank ) { int barrier_index=module-> sm_buffer_descriptor[buffer_index].bank_index; /* start non-blocking barrier */ request=&(module->barrier_request[barrier_index]); rc=mca_coll_sm2_nbbarrier_intra(module->module_comm, request,(mca_coll_base_module_t *)module); if( OMPI_SUCCESS !=rc ) { return rc; } module->num_nb_barriers_started++; /* the mca_coll_sm2_nbbarrier_intra never completes the barrier, * so no need to check. This is needed for order completion. */ } /* return */ return OMPI_SUCCESS; } OBJ_CLASS_INSTANCE(mca_coll_sm2_module_t, mca_coll_base_module_t, mca_coll_sm2_module_construct, mca_coll_sm2_module_destruct);