diff --git a/src/mca/ptl/sm/src/ptl_sm.c b/src/mca/ptl/sm/src/ptl_sm.c
index 10a1d745b1..90deb37277 100644
--- a/src/mca/ptl/sm/src/ptl_sm.c
+++ b/src/mca/ptl/sm/src/ptl_sm.c
@@ -61,13 +61,13 @@ int mca_ptl_sm_add_procs(
     struct mca_ptl_base_peer_t **peers,
     ompi_bitmap_t* reachability)
 {
-    int i,proc,my_smp_rank,return_code=OMPI_SUCCESS;
-    size_t size,len,my_len,n_local_procs;
+    int i,j,proc,return_code=OMPI_SUCCESS;
+    size_t size,len,my_len,n_local_procs,n_to_allocate;
     mca_ptl_sm_exchange_t **sm_proc_info;
     ompi_proc_t* my_proc; /* pointer to caller's proc structure */
     mca_ptl_sm_t *ptl_sm;
     bool threads;
-    char file_name[PATH_MAX];
+    ompi_fifo_t *fifo_addr;

     /* initializion */
     for(i=0 ; i < nprocs ; i++ ) {
@@ -100,7 +100,7 @@ int mca_ptl_sm_add_procs(
     for( proc=0 ; proc < nprocs; proc++ ) {
         /* don't compare with self */
         if( my_proc == procs[proc] ) {
-            ptl_sm->my_smp_rank=n_local_procs;
+            mca_ptl_sm_component.my_smp_rank=n_local_procs;
             n_local_procs++;
             continue;
         }
@@ -130,7 +130,7 @@ int mca_ptl_sm_add_procs(
             goto CLEANUP;
         }
         peers[proc]->peer_smp_rank=n_local_procs+
-            ptl_sm->num_smp_procs;
+            mca_ptl_sm_component.num_smp_procs;
         n_local_procs++;

         /* add this proc to shared memory accessability list */
@@ -144,7 +144,7 @@ int mca_ptl_sm_add_procs(
     }

     /* make sure that my_smp_rank has been defined */
-    if(-1 == ptl_sm->my_smp_rank){
+    if(-1 == mca_ptl_sm_component.my_smp_rank){
         return_code=OMPI_ERROR;
         goto CLEANUP;
     }
@@ -153,53 +153,129 @@ int mca_ptl_sm_add_procs(
      * rapid access */
     for( proc=0 ; proc < nprocs; proc++ ) {
         if(NULL != peers[proc] ) {
-            peers[proc]->my_smp_rank=ptl_sm->my_smp_rank;
+            peers[proc]->my_smp_rank=mca_ptl_sm_component.my_smp_rank;
         }
     }

+    /* set the shared memory offset */
+    mca_ptl_sm_component.sm_offset=(size_t)
+        (mca_ptl_sm_component.sm_mpool->mpool_base());
+
     /* Allocate Shared Memory PTL process coordination
      * data structure.  This will reside in shared memory */

-    /* Create backing file */
-    /* set file name */
-    len=asprintf(&(mca_ptl_sm_component.sm_resouce_ctl_file),
-        "%s/shared_mem_ptl_module.%s",ompi_process_info.job_session_dir,
-        ompi_system_info.nodename);
-    if( 0 > len ) {
-        goto CLEANUP;
-    }
-    size=sizeof(mca_ptl_sm_module_resource_t);
-    if(NULL ==
-            (mca_ptl_sm_component.mmap_file =
-             mca_common_sm_mmap_init(size,
-                mca_ptl_sm_component.sm_resouce_ctl_file,
-                sizeof(mca_ptl_sm_module_resource_t), 8 )))
-    {
-        ompi_output(0, "mca_ptl_sm_add_procs: unable to create shared memory PTL coordinating strucure :: size %ld \n",
-                size);
-        return_code=OMPI_ERROR;
-        goto CLEANUP;
-    }
+    /* Create backing file - only first time through */
+    if ( 0 == mca_ptl_sm_component.num_smp_procs ) {
+        /* set file name */
+        len=asprintf(&(mca_ptl_sm_component.sm_resouce_ctl_file),
+            "%s/shared_mem_ptl_module.%s",ompi_process_info.job_session_dir,
+            ompi_system_info.nodename);
+        if( 0 > len ) {
+            goto CLEANUP;
+        }

-    /* Allocate a fixed size pointer array for the 2-D Shared memory queues.
-     * Excess slots will be allocated for future growth.  One could
-     * make this array growable, but then one would need to uses mutexes
-     * for any access to these queues to ensure data consistancy when
-     * the array is grown */
-    if(0 == ptl_sm->my_smp_rank ) {
-        mca_ptl_sm_component.mmap_file->map_seg->seg_inited=true;
-    }
+        size=sizeof(mca_ptl_sm_module_resource_t);
+        if(NULL == (mca_ptl_sm_component.mmap_file = mca_common_sm_mmap_init(size,
+                        mca_ptl_sm_component.sm_resouce_ctl_file,
+                        sizeof(mca_ptl_sm_module_resource_t), 8 )))
+        {
+            ompi_output(0, "mca_ptl_sm_add_procs: unable to create shared memory PTL coordinating structure :: size %ld \n",
+                    size);
+            return_code=OMPI_ERROR;
+            goto CLEANUP;
+        }

-    /* Note: Need to make sure that proc 0 initializes control
-     * structures before any of the other procs can progress */
-    if( 0 != ptl_sm->my_smp_rank ) {
-    }
+        /* set the pointer to the shared memory control structure */
+        mca_ptl_sm_component.sm_ctl_header=(mca_ptl_sm_module_resource_t *)
+            mca_ptl_sm_component.mmap_file->map_seg;

-    /* Initizlize queue data structures
-     * - proc with lowest local rank does this
-     * - all the rest of the procs block until the queues are
-     * initialized
-     * - initial queue size is zero */
+        /* Allocate a fixed size pointer array for the 2-D Shared memory queues.
+         * Excess slots will be allocated for future growth.  One could
+         * make this array growable, but then one would need to use mutexes
+         * for any access to these queues to ensure data consistency when
+         * the array is grown */
+
+        if(0 == mca_ptl_sm_component.my_smp_rank ) {
+            /* allocate ompi_fifo_t structures for each fifo of the queue
+             * pairs - one per pair of local processes */
+            /* check to make sure number of local procs is within the
+             * specified limits */
+            if( ( 0 < mca_ptl_sm_component.sm_max_procs ) &&
+                    ( n_local_procs > mca_ptl_sm_component.sm_max_procs
+                    ) ) {
+                return_code=OMPI_ERROR;
+                goto CLEANUP;
+            }
+
+            /* see if need to allocate space for extra procs */
+            if( 0 > mca_ptl_sm_component.sm_max_procs ) {
+                if( 0 < mca_ptl_sm_component.sm_extra_procs ) {
+                    mca_ptl_sm_component.sm_max_procs=
+                        n_local_procs+mca_ptl_sm_component.sm_extra_procs;
+                } else {
+                    mca_ptl_sm_component.sm_max_procs=n_local_procs;
+                }
+            }
+            n_to_allocate=mca_ptl_sm_component.sm_max_procs;
+
+            /* allocate array of ompi_fifo_t* elements -
+             * offset relative to base segment is stored, so that
+             * this can be used by other procs */
+            mca_ptl_sm_component.sm_ctl_header->fifo=
+                mca_ptl_sm_component.sm_mpool->mpool_alloc
+                (n_to_allocate*sizeof(ompi_fifo_t *),
+                 CACHE_LINE_SIZE);
+            if ( NULL == mca_ptl_sm_component.sm_ctl_header->fifo ) {
+                return_code=OMPI_ERR_OUT_OF_RESOURCE;
+                goto CLEANUP;
+            }
+            mca_ptl_sm_component.sm_ctl_header->fifo=
+                (volatile ompi_fifo_t **)
+                ( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo)-
+                  (char *)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
+
+            /* allocate vectors of ompi_fifo_t - one per
+             * process - offsets will be stored */
+            size=n_to_allocate*sizeof(ompi_fifo_t);
+            for( i=0 ; i < n_to_allocate ; i++ ) {
+                mca_ptl_sm_component.sm_ctl_header->fifo[i]=
+                    mca_ptl_sm_component.sm_mpool->mpool_alloc
+                    (size, CACHE_LINE_SIZE);
+                if ( NULL == mca_ptl_sm_component.sm_ctl_header->fifo[i] ) {
+                    return_code=OMPI_ERR_OUT_OF_RESOURCE;
+                    goto CLEANUP;
+                }
+                mca_ptl_sm_component.sm_ctl_header->fifo[i]=
+                    (volatile ompi_fifo_t *)
+                    ( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo[i]) -
+                      (char *)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
+                /* initialize the ompi_fifo_t structures */
+                for( j=0 ; j < n_to_allocate ; j++ ) {
+                    fifo_addr=(ompi_fifo_t *) (
+                        ((char *)(mca_ptl_sm_component.sm_ctl_header->fifo[i])) +
+                        mca_ptl_sm_component.sm_offset);
+                    fifo_addr[j].head=OMPI_CB_FREE;
+                    fifo_addr[j].tail=OMPI_CB_FREE;
+                    ompi_atomic_unlock(&(fifo_addr[j].head_lock));
+                    ompi_atomic_unlock(&(fifo_addr[j].tail_lock));
+                }
+            }
+
+            /* allow other procs to use this shared memory map */
+            mca_ptl_sm_component.mmap_file->map_seg->seg_inited=true;
+        }
+
+        /* Note: Need to make sure that proc 0 initializes control
+         * structures before any of the other procs can progress */
+        if( 0 != mca_ptl_sm_component.my_smp_rank ) {
+        }
+
+        /* Initialize queue data structures
+         * - proc with lowest local rank does this
+         * - all the rest of the procs block until the queues are
+         * initialized
+         * - initial queue size is zero */
+    }

     /* free local memory */
     if(sm_proc_info){
@@ -213,7 +289,7 @@ int mca_ptl_sm_add_procs(
     }

     /* update the local smp process count */
-    ptl_sm->num_smp_procs+=n_local_procs;
+    mca_ptl_sm_component.num_smp_procs+=n_local_procs;

 CLEANUP:
     if(sm_proc_info){
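The allocation code above never keeps raw pointers in the shared segment: every value written there is converted to a byte offset from the local mpool base (mpool_base()), and sm_offset records that base so a process can turn a stored offset back into an address valid in its own mapping. A minimal sketch of the idiom, using hypothetical helper names (sm_ptr_to_offset, sm_offset_to_ptr) that are not part of the PTL:

    #include <stddef.h>

    /* Store a pointer that lies inside the shared segment as a byte
     * offset from this process's mapping of the segment. */
    static inline size_t sm_ptr_to_offset(void *ptr, void *segment_base)
    {
        return (size_t)((char *)ptr - (char *)segment_base);
    }

    /* Recover a usable pointer from a stored offset; segment_base may
     * differ from process to process, the stored offset does not. */
    static inline void *sm_offset_to_ptr(size_t offset, void *segment_base)
    {
        return (void *)((char *)segment_base + offset);
    }

In the hunk above, mca_ptl_sm_component.sm_offset is the segment base cast to size_t, which is why the fifo initialization adds sm_offset to the stored value before touching head, tail, and the locks.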
diff --git a/src/mca/ptl/sm/src/ptl_sm.h b/src/mca/ptl/sm/src/ptl_sm.h
index 19c91132bf..9ba769da01 100644
--- a/src/mca/ptl/sm/src/ptl_sm.h
+++ b/src/mca/ptl/sm/src/ptl_sm.h
@@ -16,12 +16,18 @@
 #include "mca/ptl/ptl.h"
 #include "mca/mpool/mpool.h"
 #include "mca/common/sm/common_sm_mmap.h"
+#include "class/ompi_fifo.h"

 /*
  * Shared Memory resource managment
  */
 struct mca_ptl_sm_module_resource_t {
+    /* base control structures */
     mca_common_sm_file_header_t segment_header;
+
+    /* fifo queues - offsets relative to the base of the shared memory
+     * segment will be stored here */
+    volatile ompi_fifo_t **fifo;
 };
 typedef struct mca_ptl_sm_module_resource_t mca_ptl_sm_module_resource_t;
 extern mca_ptl_sm_module_resource_t mca_ptl_sm_module_resource;
@@ -35,6 +41,7 @@ struct mca_ptl_sm_component_t {
     int sm_free_list_max;      /**< maximum size of free lists */
     int sm_free_list_inc;      /**< number of elements to alloc when growing free lists */
     int sm_max_procs;          /**< upper limit on the number of processes using the shared memory pool */
+    int sm_extra_procs;        /**< number of extra procs to allow */
     char* sm_mpool_name;       /**< name of shared memory pool module */
     mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */
     void* sm_mpool_base;       /**< base address of shared memory pool */
@@ -46,6 +53,14 @@ struct mca_ptl_sm_component_t {
                                      to coordinate resource usage */
     mca_common_sm_mmap_t *mmap_file;   /**< description of mmap'ed file */
+    mca_ptl_sm_module_resource_t *sm_ctl_header;  /* control header in
+                                                     shared memory */
+    size_t sm_offset;          /**< offset to be applied to shared memory
+                                    addresses */
+    int num_smp_procs;         /**< current number of smp procs on this
+                                    host */
+    int my_smp_rank;           /**< My SMP process rank.  Used for accessing
+                                *   SMP specific data structures. */
 };
 typedef struct mca_ptl_sm_component_t mca_ptl_sm_component_t;
 extern mca_ptl_sm_component_t mca_ptl_sm_component;
@@ -95,10 +110,6 @@ extern int mca_ptl_sm_component_progress(
  */
 struct mca_ptl_sm_t {
     mca_ptl_base_module_t super;  /**< base PTL interface */
-    int num_smp_procs;   /**< current number of smp procs on this
-                              host */
-    int my_smp_rank;     /**< My SMP process rank.  Used for accessing
-                          *   SMP specfic data structures. */
 };
 typedef struct mca_ptl_sm_t mca_ptl_sm_t;

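The fields moved into mca_ptl_sm_component_t are what a send path needs to locate a queue at run time: sm_ctl_header->fifo is the table of per-process fifo vectors (stored as offsets), sm_offset rebases them, and my_smp_rank/num_smp_procs index into the table. A hedged sketch of such a lookup, assuming every stored value (including the table itself) is an offset and assuming a row-per-receiver, column-per-sender layout; the real indexing convention is defined elsewhere in the sm PTL, not by this diff, and the helper name is hypothetical:

    #include "ptl_sm.h"   /* mca_ptl_sm_component, ompi_fifo_t */

    /* Hypothetical helper: locate the fifo carrying traffic from
     * sender_rank to receiver_rank on this node. */
    static ompi_fifo_t *sm_lookup_fifo(int sender_rank, int receiver_rank)
    {
        size_t base = mca_ptl_sm_component.sm_offset;

        /* the fifo table itself is stored as an offset into the segment */
        volatile ompi_fifo_t **table = (volatile ompi_fifo_t **)
            ((char *)mca_ptl_sm_component.sm_ctl_header->fifo + base);

        /* each entry is one process's vector of ompi_fifo_t, also an offset */
        ompi_fifo_t *row = (ompi_fifo_t *)((char *)table[receiver_rank] + base);

        return &row[sender_rank];
    }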
diff --git a/src/mca/ptl/sm/src/ptl_sm_component.c b/src/mca/ptl/sm/src/ptl_sm_component.c
index 5217c9587c..c9908467de 100644
--- a/src/mca/ptl/sm/src/ptl_sm_component.c
+++ b/src/mca/ptl/sm/src/ptl_sm_component.c
@@ -41,7 +41,7 @@ static int mca_ptl_sm_component_exchange(void);
  */

 mca_ptl_sm_component_t mca_ptl_sm_component = {
-    {
+    {  /* super is being filled in */
         /* First, the mca_base_component_t struct containing meta
            information about the component itself */
         {
@@ -65,7 +65,7 @@ mca_ptl_sm_component_t mca_ptl_sm_component = {
         mca_ptl_sm_component_init,
         mca_ptl_sm_component_control,
         mca_ptl_sm_component_progress,
-    }
+    }  /* end super */
 };


@@ -113,6 +113,10 @@ int mca_ptl_sm_component_open(void)
     mca_ptl_sm_component.sm_mpool_name =
         mca_ptl_sm_param_register_string("mpool", "sm");

+    /* default number of extra procs to allow for future growth */
+    mca_ptl_sm_component.sm_extra_procs =
+        mca_ptl_sm_param_register_int("sm_extra_procs", 2);
+
     /* initialize objects */
     OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_lock, ompi_mutex_t);
     OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_send_requests, ompi_free_list_t);
@@ -189,8 +193,8 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(

     /* initialize some PTL data */
     /* start with no SM procs */
-    mca_ptl_sm.num_smp_procs=0;
-    mca_ptl_sm.my_smp_rank=-1;
+    mca_ptl_sm_component.num_smp_procs=0;
+    mca_ptl_sm_component.my_smp_rank=-1;

     return ptls;
 }
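The sm_extra_procs parameter registered above (default 2) only matters when sm_max_procs is left unset: the first process on the node then sizes the fifo table for the local procs present now plus that headroom, so the fixed-size array does not need to be reallocated if more local procs attach later. A condensed sketch of the sizing rule from the ptl_sm.c hunk, written as a hypothetical standalone function rather than against the component fields:

    #include <stddef.h>

    /* sm_max_procs < 0 means "no explicit user limit";
     * sm_extra_procs <= 0 means "no headroom requested". */
    static size_t sm_fifo_slots(size_t n_local_procs,
                                int sm_max_procs, int sm_extra_procs)
    {
        if (sm_max_procs >= 0) {
            /* an explicit limit wins (add_procs errors out earlier if
             * n_local_procs already exceeds it) */
            return (size_t)sm_max_procs;
        }
        if (sm_extra_procs > 0) {
            return n_local_procs + (size_t)sm_extra_procs;
        }
        return n_local_procs;
    }

With the default headroom of 2 and, say, four local processes, the table is sized for six, i.e. a 6 x 6 set of queues, one fifo per ordered pair of local ranks.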