First putback of some sm BTL latency optimizations:
* The main thing done here is to convert from multiple FIFOs/queues per receiver (each receiver has one FIFO for each sender) to a single FIFO/queue per receiver (all senders sharing the same FIFO for a given receiver). * This requires rewriting the FIFO support, so that ompi/class/ompi_[circular_buffer_]fifo.h is no longer used and FIFO support is instead in btl_sm.h. * The number of FIFOs per receiver is actually an MCA tunable parameter, but it appears that 1 or possibly 2 FIFOs (even for 112 local processes) per receiver is sufficient. This commit was SVN r20578.
Этот коммит содержится в:
родитель
558fc2836d
Коммит
5bbf5ba7d7
@ -37,7 +37,6 @@
|
||||
#include "opal/mca/maffinity/base/base.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "opal/util/printf.h"
|
||||
#include "ompi/class/ompi_fifo.h"
|
||||
#include "ompi/class/ompi_free_list.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/btl/btl.h"
|
||||
@ -115,31 +114,6 @@ static void *mpool_calloc(size_t nmemb, size_t size)
|
||||
return buf;
|
||||
}
|
||||
|
||||
/*
 * Put each of the n FIFOs in the array f into its initial state.
 *
 * The head and tail of every FIFO are set to the OMPI_CB_FREE sentinel.
 * When opal_using_threads() is true, a buffer of two cache lines is
 * obtained from mpool_calloc() so that the head lock and the tail lock
 * land on different cache lines, and both locks are initialized as
 * unlocked.  Otherwise both lock pointers are set to NULL.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR as soon as a lock buffer cannot be
 * allocated (the lock pointers of that FIFO and all later FIFOs are then
 * left untouched).
 */
static int init_fifos(ompi_fifo_t *f, int n)
{
    int idx;

    for (idx = 0; idx < n; ++idx) {
        ompi_fifo_t *cur = &f[idx];
        char *lock_mem;

        cur->head = (ompi_cb_fifo_wrapper_t *) OMPI_CB_FREE;
        cur->tail = (ompi_cb_fifo_wrapper_t *) OMPI_CB_FREE;

        /* no threads: no locks are needed */
        if (!opal_using_threads()) {
            cur->head_lock = NULL;
            cur->tail_lock = NULL;
            continue;
        }

        /* one cache line per lock */
        lock_mem = (char *) mpool_calloc(2, CACHE_LINE_SIZE);
        if (NULL == lock_mem) {
            return OMPI_ERROR;
        }

        cur->head_lock = (opal_atomic_lock_t *) lock_mem;
        cur->tail_lock = (opal_atomic_lock_t *) (lock_mem + CACHE_LINE_SIZE);
        opal_atomic_init(cur->head_lock, OPAL_ATOMIC_UNLOCKED);
        opal_atomic_init(cur->tail_lock, OPAL_ATOMIC_UNLOCKED);
    }

    return OMPI_SUCCESS;
}
|
||||
|
||||
static void init_maffinity(int *my_mem_node, int *max_mem_node)
|
||||
{
|
||||
static opal_carto_graph_t *topo;
|
||||
@ -199,7 +173,7 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
|
||||
{
|
||||
size_t size, length, length_payload;
|
||||
char *sm_ctl_file;
|
||||
ompi_fifo_t *my_fifos;
|
||||
sm_fifo_t *my_fifos;
|
||||
int my_mem_node=-1, num_mem_nodes=-1, i;
|
||||
|
||||
init_maffinity(&my_mem_node, &num_mem_nodes);
|
||||
@ -253,7 +227,7 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
|
||||
/* Pass in a data segment alignment of 0 to get no data
|
||||
segment (only the shared control structure) */
|
||||
size = sizeof(mca_common_sm_file_header_t) +
|
||||
n * (sizeof(ompi_fifo_t*) + sizeof(char *) + sizeof(uint16_t)) + CACHE_LINE_SIZE;
|
||||
n * (sizeof(sm_fifo_t*) + sizeof(char *) + sizeof(uint16_t)) + CACHE_LINE_SIZE;
|
||||
if(!(mca_btl_sm_component.mmap_file =
|
||||
mca_common_sm_mmap_init(size, sm_ctl_file,
|
||||
sizeof(mca_common_sm_file_header_t),
|
||||
@ -280,7 +254,7 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
mca_btl_sm_component.shm_fifo = (ompi_fifo_t **)mca_btl_sm_component.mmap_file->data_addr;
|
||||
mca_btl_sm_component.shm_fifo = (sm_fifo_t **)mca_btl_sm_component.mmap_file->data_addr;
|
||||
mca_btl_sm_component.shm_bases = (char**)(mca_btl_sm_component.shm_fifo + n);
|
||||
mca_btl_sm_component.shm_mem_nodes = (uint16_t*)(mca_btl_sm_component.shm_bases + n);
|
||||
|
||||
@ -304,17 +278,8 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
|
||||
mca_btl_sm_component.shm_mem_nodes[mca_btl_sm_component.my_smp_rank] =
|
||||
(uint16_t)my_mem_node;
|
||||
|
||||
/*
|
||||
* initialize the array of fifo's "owned" by this process
|
||||
* The virtual addresses are valid only in the sender's
|
||||
* address space - unless the base of the shared memory
|
||||
* segment is mapped at the same location in the reader's
|
||||
* virtual address space.
|
||||
*/
|
||||
if(NULL == (my_fifos = (ompi_fifo_t*)mpool_calloc(n, sizeof(ompi_fifo_t))))
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
||||
if(init_fifos(my_fifos, n) != OMPI_SUCCESS)
|
||||
/* initialize the array of fifo's "owned" by this process */
|
||||
if(NULL == (my_fifos = (sm_fifo_t*)mpool_calloc(FIFO_MAP_NUM(n), sizeof(sm_fifo_t))))
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
||||
mca_btl_sm_component.shm_fifo[mca_btl_sm_component.my_smp_rank] = my_fifos;
|
||||
@ -323,7 +288,7 @@ static int sm_btl_first_time_init(mca_btl_sm_t *sm_btl, int n)
|
||||
|
||||
/* cache the pointer to the 2d fifo array. These addresses
|
||||
* are valid in the current process space */
|
||||
mca_btl_sm_component.fifo = (ompi_fifo_t**)malloc(sizeof(ompi_fifo_t*) * n);
|
||||
mca_btl_sm_component.fifo = (sm_fifo_t**)malloc(sizeof(sm_fifo_t*) * n);
|
||||
|
||||
if(NULL == mca_btl_sm_component.fifo)
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
@ -502,15 +467,30 @@ int mca_btl_sm_add_procs(
|
||||
|
||||
bases = mca_btl_sm_component.shm_bases;
|
||||
|
||||
/* initialize own FIFOs */
|
||||
/*
|
||||
* The receiver initializes all its FIFOs. All components will
|
||||
* be allocated near the receiver. Nothing will be local to
|
||||
* "the sender" since there will be many senders.
|
||||
*/
|
||||
for(j = mca_btl_sm_component.num_smp_procs;
|
||||
j < mca_btl_sm_component.num_smp_procs + FIFO_MAP_NUM(n_local_procs); j++) {
|
||||
|
||||
return_code = sm_fifo_init( mca_btl_sm_component.fifo_size,
|
||||
mca_btl_sm_component.sm_mpool,
|
||||
&mca_btl_sm_component.fifo[my_smp_rank][j],
|
||||
mca_btl_sm_component.fifo_lazy_free);
|
||||
if(return_code != OMPI_SUCCESS)
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* coordinate with other processes */
|
||||
for(j = mca_btl_sm_component.num_smp_procs;
|
||||
j < mca_btl_sm_component.num_smp_procs + n_local_procs; j++) {
|
||||
ptrdiff_t diff;
|
||||
int peer_mem_node;
|
||||
|
||||
if(j == my_smp_rank)
|
||||
continue;
|
||||
|
||||
/* spin until this element is allocated */
|
||||
/* doesn't really wait for that process... FIFO might be allocated, but not initialized */
|
||||
while(NULL == mca_btl_sm_component.shm_fifo[j]) {
|
||||
opal_atomic_rmb();
|
||||
opal_progress();
|
||||
@ -522,33 +502,10 @@ int mca_btl_sm_add_procs(
|
||||
|
||||
/* store local address of remote fifos */
|
||||
mca_btl_sm_component.fifo[j] =
|
||||
(ompi_fifo_t*)OFFSET2ADDR(diff, mca_btl_sm_component.shm_fifo[j]);
|
||||
|
||||
/* don't forget to update the head_lock if allocated because this
|
||||
* address is also in the remote process */
|
||||
if(mca_btl_sm_component.fifo[j][my_smp_rank].head_lock != NULL) {
|
||||
mca_btl_sm_component.fifo[j][my_smp_rank].head_lock =
|
||||
(opal_atomic_lock_t*)OFFSET2ADDR(diff, mca_btl_sm_component.fifo[j][my_smp_rank].head_lock);
|
||||
}
|
||||
(sm_fifo_t*)OFFSET2ADDR(diff, mca_btl_sm_component.shm_fifo[j]);
|
||||
|
||||
/* cache local copy of peer memory node number */
|
||||
peer_mem_node = mca_btl_sm_component.mem_nodes[j] = mca_btl_sm_component.shm_mem_nodes[j];
|
||||
|
||||
/* Initialize fifo for use. Note that sender does initialization */
|
||||
return_code = ompi_fifo_init(mca_btl_sm_component.size_of_cb_queue,
|
||||
mca_btl_sm_component.cb_lazy_free_freq,
|
||||
mca_btl_sm_component.cb_max_num,
|
||||
/* fifo mpool */
|
||||
mca_btl_sm_component.sm_mpools[peer_mem_node],
|
||||
/* head mpool */
|
||||
mca_btl_sm_component.sm_mpool,
|
||||
/* tail mpool */
|
||||
mca_btl_sm_component.sm_mpools[peer_mem_node],
|
||||
&mca_btl_sm_component.fifo[j][my_smp_rank],
|
||||
mca_btl_sm_component.sm_offset[j]);
|
||||
|
||||
if(return_code != OMPI_SUCCESS)
|
||||
goto CLEANUP;
|
||||
mca_btl_sm_component.mem_nodes[j] = mca_btl_sm_component.shm_mem_nodes[j];
|
||||
}
|
||||
|
||||
/* update the local smp process count */
|
||||
@ -781,7 +738,7 @@ int mca_btl_sm_sendi( struct mca_btl_base_module_t* btl,
|
||||
* address
|
||||
*/
|
||||
MCA_BTL_SM_FIFO_WRITE(endpoint, endpoint->my_smp_rank,
|
||||
endpoint->peer_smp_rank, frag->hdr, false, rc);
|
||||
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, rc);
|
||||
return rc;
|
||||
}
|
||||
*descriptor = mca_btl_sm_alloc( btl, endpoint, order,
|
||||
@ -817,7 +774,7 @@ int mca_btl_sm_send( struct mca_btl_base_module_t* btl,
|
||||
* address
|
||||
*/
|
||||
MCA_BTL_SM_FIFO_WRITE(endpoint, endpoint->my_smp_rank,
|
||||
endpoint->peer_smp_rank, frag->hdr, false, rc);
|
||||
endpoint->peer_smp_rank, (void *) VIRTUAL2RELATIVE(frag->hdr), false, rc);
|
||||
if( OPAL_LIKELY(0 == rc) ) {
|
||||
return 1; /* the data is completely gone */
|
||||
}
|
||||
|
@ -39,7 +39,6 @@
|
||||
#include "opal/class/opal_free_list.h"
|
||||
#include "ompi/class/ompi_free_list.h"
|
||||
#include "ompi/class/ompi_bitmap.h"
|
||||
#include "ompi/class/ompi_fifo.h"
|
||||
#include "opal/event/event.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/btl/btl.h"
|
||||
@ -54,6 +53,30 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Shared Memory FIFOs
|
||||
*/
|
||||
|
||||
struct sm_fifo_t {
|
||||
/* This queue pointer is used only by the heads. */
|
||||
volatile void **queue; char pad0[CACHE_LINE_SIZE - sizeof(void **) ];
|
||||
/* This lock is used by the heads. */
|
||||
opal_atomic_lock_t head_lock; char pad1[CACHE_LINE_SIZE - sizeof(opal_atomic_lock_t)];
|
||||
/* This index is used by the head holding the head lock. */
|
||||
volatile int head; char pad2[CACHE_LINE_SIZE - sizeof(int) ];
|
||||
/* This mask is used "read only" by all processes. */
|
||||
unsigned int mask; char pad3[CACHE_LINE_SIZE - sizeof(int) ];
|
||||
/* The following are used only by the tail. */
|
||||
volatile void **queue_recv;
|
||||
opal_atomic_lock_t tail_lock;
|
||||
volatile int tail;
|
||||
int num_to_clear;
|
||||
int lazy_free; char pad4[CACHE_LINE_SIZE - sizeof(void **)
|
||||
- sizeof(opal_atomic_lock_t)
|
||||
- sizeof(int) * 3 ];
|
||||
};
|
||||
typedef struct sm_fifo_t sm_fifo_t;
|
||||
|
||||
/*
|
||||
* Shared Memory resource management
|
||||
*/
|
||||
@ -87,18 +110,18 @@ struct mca_btl_sm_component_t {
|
||||
mca_common_sm_mmap_t *mmap_file; /**< description of mmap'ed file */
|
||||
mca_common_sm_file_header_t *sm_ctl_header; /* control header in
|
||||
shared memory */
|
||||
ompi_fifo_t **shm_fifo; /**< pointer to fifo 2D array in shared memory */
|
||||
sm_fifo_t **shm_fifo; /**< pointer to fifo 2D array in shared memory */
|
||||
char **shm_bases; /**< pointer to base pointers in shared memory */
|
||||
uint16_t *shm_mem_nodes; /**< pointer to mem noded in shared memory */
|
||||
ompi_fifo_t **fifo; /**< cached copy of the pointer to the 2D
|
||||
sm_fifo_t **fifo; /**< cached copy of the pointer to the 2D
|
||||
fifo array. The address in the shared
|
||||
memory segment sm_ctl_header is a relative,
|
||||
but this one, in process private memory, is
|
||||
a real virtual address */
|
||||
uint16_t *mem_nodes; /**< cached copy of mem nodes of each local rank */
|
||||
size_t size_of_cb_queue; /**< size of each circular buffer queue array */
|
||||
size_t cb_lazy_free_freq; /**< frequency of lazy free */
|
||||
int cb_max_num; /**< max number of circular buffers for each peer */
|
||||
uint16_t *mem_nodes; /**< cached copy of mem nodes of each local rank */
|
||||
size_t fifo_size; /**< number of FIFO queue entries */
|
||||
size_t fifo_lazy_free; /**< number of reads before lazy fifo free is triggered */
|
||||
int nfifos; /**< number of FIFOs per receiver */
|
||||
ptrdiff_t *sm_offset; /**< offset to be applied to shared memory
|
||||
addresses, per local process value */
|
||||
int32_t num_smp_procs; /**< current number of smp procs on this host */
|
||||
@ -131,6 +154,116 @@ struct btl_sm_pending_send_item_t
|
||||
};
|
||||
typedef struct btl_sm_pending_send_item_t btl_sm_pending_send_item_t;
|
||||
|
||||
/***
|
||||
* FIFO support for sm BTL.
|
||||
*/
|
||||
|
||||
/***
|
||||
* One or more FIFO components may be a pointer that must be
|
||||
* accessed by multiple processes. Since the shared region may
|
||||
* be mmapped differently into each process's address space,
|
||||
* these pointers will be relative to some base address. Here,
|
||||
* we define macros to translate between relative addresses and
|
||||
* virtual addresses.
|
||||
*/
|
||||
/*
 * Translate between a virtual address in this process and an offset
 * relative to this process's entry in shm_bases (indexed by my_smp_rank).
 *
 * The arithmetic is carried out in intptr_t rather than long: long is
 * narrower than a pointer on LLP64 platforms, so casting a pointer to
 * long could silently truncate the address.
 */
#define VIRTUAL2RELATIVE(VADDR ) ((intptr_t)(VADDR)  - (intptr_t)mca_btl_sm_component.shm_bases[mca_btl_sm_component.my_smp_rank])
#define RELATIVE2VIRTUAL(OFFSET) ((intptr_t)(OFFSET) + (intptr_t)mca_btl_sm_component.shm_bases[mca_btl_sm_component.my_smp_rank])
|
||||
|
||||
/* ================================================== */
|
||||
/* ================================================== */
|
||||
/* ================================================== */
|
||||
|
||||
/* Sentinel stored in a queue slot that holds no entry.  The replacement
 * list is fully parenthesized so the macro behaves as a single expression
 * in every context (e.g. as the operand of sizeof). */
#define SM_FIFO_FREE ((void *) (-2))
|
||||
|
||||
static inline int sm_fifo_init(int fifo_size, mca_mpool_base_module_t *mpool,
|
||||
sm_fifo_t *fifo, int lazy_free)
|
||||
{
|
||||
int i, qsize;
|
||||
|
||||
/* figure out the queue size (a power of two that is at least 1) */
|
||||
qsize = 1;
|
||||
while ( qsize < fifo_size )
|
||||
qsize <<= 1;
|
||||
|
||||
/* allocate the queue in the receiver's address space */
|
||||
fifo->queue_recv = (volatile void **)mpool->mpool_alloc(
|
||||
mpool, sizeof(void *) * qsize, CACHE_LINE_SIZE, 0, NULL);
|
||||
if(NULL == fifo->queue_recv) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the queue */
|
||||
for ( i = 0; i < qsize; i++ )
|
||||
fifo->queue_recv[i] = SM_FIFO_FREE;
|
||||
|
||||
/* shift queue address to be relative */
|
||||
fifo->queue = (volatile void **) VIRTUAL2RELATIVE(fifo->queue_recv);
|
||||
|
||||
/* initialize the locks */
|
||||
opal_atomic_init(&(fifo->head_lock), OPAL_ATOMIC_UNLOCKED);
|
||||
opal_atomic_init(&(fifo->tail_lock), OPAL_ATOMIC_UNLOCKED);
|
||||
opal_atomic_unlock(&(fifo->head_lock)); /* should be unnecessary */
|
||||
opal_atomic_unlock(&(fifo->tail_lock)); /* should be unnecessary */
|
||||
|
||||
/* other initializations */
|
||||
fifo->head = 0;
|
||||
fifo->mask = qsize - 1;
|
||||
fifo->tail = 0;
|
||||
fifo->num_to_clear = 0;
|
||||
fifo->lazy_free = lazy_free;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
 * Post one entry to a FIFO.
 *
 * The queue address stored in fifo->queue is first converted with
 * RELATIVE2VIRTUAL.  If the slot at the head index is not SM_FIFO_FREE,
 * nothing is written and OMPI_ERR_OUT_OF_RESOURCE is returned.
 * Otherwise value is stored in that slot, the head index is advanced
 * (wrapping with fifo->mask) and OMPI_SUCCESS is returned.
 *
 * NOTE(review): this function takes no lock itself; the visible caller
 * macro takes head_lock when threads are in use -- confirm for any
 * other callers.
 */
static inline int sm_fifo_write(void *value, sm_fifo_t *fifo)
{
    volatile void **slots = (volatile void **) RELATIVE2VIRTUAL(fifo->queue);

    if ( SM_FIFO_FREE == slots[fifo->head] ) {
        /* free slot: store the entry, then advance the head index */
        opal_atomic_rmb();
        slots[fifo->head] = value;
        fifo->head = (fifo->head + 1) & fifo->mask;
        opal_atomic_wmb();
        return OMPI_SUCCESS;
    }

    /* no free slot to write to: report exhausted resource */
    return OMPI_ERR_OUT_OF_RESOURCE;
}
|
||||
|
||||
|
||||
/*
 * Take the next entry from a FIFO.
 *
 * Returns the content of the slot at the tail index.  If that content is
 * SM_FIFO_FREE the FIFO state is left unchanged.  Otherwise the tail
 * index is advanced (wrapping with fifo->mask) and the slot is counted in
 * num_to_clear; consumed slots are reset to SM_FIFO_FREE lazily, in one
 * batch, once num_to_clear reaches fifo->lazy_free.
 */
static inline void *sm_fifo_read(sm_fifo_t *fifo)
{
    void *entry;

    /* look at the slot the tail index currently designates */
    entry = (void *) fifo->queue_recv[fifo->tail];

    opal_atomic_rmb();

    /* empty slot: hand the sentinel back without moving the tail */
    if ( SM_FIFO_FREE == entry ) {
        return entry;
    }

    fifo->tail = ( fifo->tail + 1 ) & fifo->mask;
    fifo->num_to_clear += 1;

    /* time to release the consumed slots? */
    if ( fifo->num_to_clear >= fifo->lazy_free ) {
        int slot;

        /* walk forward from the oldest consumed slot */
        for ( slot = (fifo->tail - fifo->num_to_clear ) & fifo->mask;
              fifo->num_to_clear > 0;
              fifo->num_to_clear -= 1 ) {
            fifo->queue_recv[slot] = SM_FIFO_FREE;
            slot = (slot + 1) & fifo->mask;
        }
        opal_atomic_wmb();
    }

    return entry;
}
|
||||
|
||||
/**
|
||||
* Register shared memory module parameters with the MCA framework
|
||||
*/
|
||||
|
@ -135,29 +135,29 @@ int mca_btl_sm_component_open(void)
|
||||
mca_btl_sm_param_register_int("sm_extra_procs", -1);
|
||||
mca_btl_sm_component.sm_mpool_name =
|
||||
mca_btl_sm_param_register_string("mpool", "sm");
|
||||
mca_btl_sm_component.size_of_cb_queue =
|
||||
mca_btl_sm_param_register_int("size_of_cb_queue", 128);
|
||||
mca_btl_sm_component.cb_lazy_free_freq =
|
||||
mca_btl_sm_param_register_int("cb_lazy_free_freq", 120);
|
||||
mca_btl_sm_component.cb_max_num =
|
||||
mca_btl_sm_param_register_int("cb_max_num", -1);
|
||||
/* make sure that queue size and lazy free frequency are consistent -
|
||||
* want to make sure that slots are freed at a rate they can be
|
||||
* reused, w/o allocating extra new circular buffer fifo arrays */
|
||||
if( (float)(mca_btl_sm_component.cb_lazy_free_freq) >=
|
||||
0.95*(float)(mca_btl_sm_component.size_of_cb_queue) ) {
|
||||
/* upper limit */
|
||||
mca_btl_sm_component.cb_lazy_free_freq=
|
||||
(int)(0.95*(float)(mca_btl_sm_component.size_of_cb_queue));
|
||||
/* lower limit */
|
||||
if( 0>= mca_btl_sm_component.cb_lazy_free_freq ) {
|
||||
mca_btl_sm_component.cb_lazy_free_freq=1;
|
||||
}
|
||||
mca_btl_sm_component.fifo_size =
|
||||
mca_btl_sm_param_register_int("fifo_size", 4096);
|
||||
mca_btl_sm_component.nfifos =
|
||||
mca_btl_sm_param_register_int("num_fifos", 1);
|
||||
/* make sure the number of fifos is a power of 2 */
|
||||
{
|
||||
int i = 1;
|
||||
while ( i < mca_btl_sm_component.nfifos )
|
||||
i <<= 1;
|
||||
mca_btl_sm_component.nfifos = i;
|
||||
}
|
||||
mca_btl_sm_component.fifo_lazy_free =
|
||||
mca_btl_sm_param_register_int("fifo_lazy_free", 120);
|
||||
|
||||
/* make sure that queue size and lazy free parameter are compatible */
|
||||
if (mca_btl_sm_component.fifo_lazy_free >= (mca_btl_sm_component.fifo_size >> 1) )
|
||||
mca_btl_sm_component.fifo_lazy_free = (mca_btl_sm_component.fifo_size >> 1);
|
||||
if (mca_btl_sm_component.fifo_lazy_free <= 0)
|
||||
mca_btl_sm_component.fifo_lazy_free = 1;
|
||||
|
||||
/* default number of extra procs to allow for future growth */
|
||||
mca_btl_sm_component.sm_extra_procs =
|
||||
mca_btl_sm_param_register_int("sm_extra_procs", 2);
|
||||
mca_btl_sm_param_register_int("sm_extra_procs", 0);
|
||||
|
||||
mca_btl_sm.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_HIGH-1;
|
||||
mca_btl_sm.super.btl_eager_limit = 4*1024;
|
||||
@ -380,37 +380,28 @@ int mca_btl_sm_component_progress(void)
|
||||
/* local variables */
|
||||
mca_btl_sm_frag_t *frag;
|
||||
mca_btl_sm_frag_t Frag;
|
||||
ompi_fifo_t *fifo = NULL;
|
||||
sm_fifo_t *fifo = NULL;
|
||||
mca_btl_sm_hdr_t *hdr;
|
||||
int my_smp_rank = mca_btl_sm_component.my_smp_rank;
|
||||
int peer_smp_rank, rc = 0;
|
||||
int peer_smp_rank, j, rc = 0;
|
||||
|
||||
/* poll each fifo */
|
||||
for(peer_smp_rank = 0; peer_smp_rank < mca_btl_sm_component.num_smp_procs;
|
||||
peer_smp_rank++) {
|
||||
if(peer_smp_rank == mca_btl_sm_component.my_smp_rank)
|
||||
continue;
|
||||
|
||||
fifo = &(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]);
|
||||
for(j = 0; j < FIFO_MAP_NUM(mca_btl_sm_component.num_smp_procs); j++) {
|
||||
fifo = &(mca_btl_sm_component.fifo[my_smp_rank][j]);
|
||||
recheck_peer:
|
||||
/* if fifo is not yet set up - continue - no data has been sent */
|
||||
if(OMPI_CB_FREE == fifo->tail){
|
||||
continue;
|
||||
}
|
||||
|
||||
/* acquire thread lock */
|
||||
if(opal_using_threads()) {
|
||||
opal_atomic_lock(fifo->tail_lock);
|
||||
opal_atomic_lock(&(fifo->tail_lock));
|
||||
}
|
||||
|
||||
hdr = (mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail(fifo);
|
||||
hdr = (mca_btl_sm_hdr_t *)sm_fifo_read(fifo);
|
||||
|
||||
/* release thread lock */
|
||||
if(opal_using_threads()) {
|
||||
opal_atomic_unlock(fifo->tail_lock);
|
||||
opal_atomic_unlock(&(fifo->tail_lock));
|
||||
}
|
||||
|
||||
if(OMPI_CB_FREE == hdr) {
|
||||
if(SM_FIFO_FREE == hdr) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -422,8 +413,12 @@ int mca_btl_sm_component_progress(void)
|
||||
mca_btl_active_message_callback_t* reg;
|
||||
/* change the address from address relative to the shared
|
||||
* memory address, to a true virtual address */
|
||||
hdr = (mca_btl_sm_hdr_t *)((char *)hdr +
|
||||
mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
hdr = (mca_btl_sm_hdr_t *) RELATIVE2VIRTUAL(hdr);
|
||||
peer_smp_rank = hdr->my_smp_rank;
|
||||
if ( FIFO_MAP(peer_smp_rank) != j )
|
||||
opal_output(0, "mca_btl_sm_component_progress: "
|
||||
"rank %d got %d on FIFO %d, but this sender should send to FIFO %d\n",
|
||||
my_smp_rank, peer_smp_rank, j, FIFO_MAP(peer_smp_rank));
|
||||
/* recv upcall */
|
||||
reg = mca_btl_base_active_message_trigger + hdr->tag;
|
||||
Frag.segment.seg_addr.pval = ((char*)hdr) +
|
||||
@ -467,6 +462,19 @@ int mca_btl_sm_component_progress(void)
|
||||
}
|
||||
default:
|
||||
/* unknown */
|
||||
/*
|
||||
* This code path should presumably never be called.
|
||||
* It's unclear how it should be written.
|
||||
* If we want to return it to the sending process,
|
||||
* we have to figure out who the sender is.
|
||||
* It seems we need to subtract the mask bits.
|
||||
* Then, hopefully this is an sm header that has an smp_rank field.
|
||||
* Presumably that means the received header was relative.
|
||||
* Or, maybe this code should just be removed.
|
||||
*/
|
||||
opal_output(0, "mca_btl_sm_component_progress read an unknown type of header");
|
||||
hdr = (mca_btl_sm_hdr_t *) RELATIVE2VIRTUAL(hdr);
|
||||
peer_smp_rank = hdr->my_smp_rank;
|
||||
hdr = (mca_btl_sm_hdr_t*)((uintptr_t)hdr->frag |
|
||||
MCA_BTL_SM_FRAG_STATUS_MASK);
|
||||
MCA_BTL_SM_FIFO_WRITE(
|
||||
|
@ -4,25 +4,53 @@
|
||||
#include "btl_sm.h"
|
||||
#include "btl_sm_endpoint.h"
|
||||
|
||||
|
||||
/*
|
||||
* FIFO_MAP(x) defines which FIFO on the receiver should be used
|
||||
* by sender rank x. The map is some many-to-one hash.
|
||||
*
|
||||
* FIFO_MAP_NUM(n) defines how many FIFOs the receiver has for
|
||||
* n senders.
|
||||
*
|
||||
* That is,
|
||||
*
|
||||
* for all 0 <= x < n:
|
||||
*
|
||||
* 0 <= FIFO_MAP(x) < FIFO_MAP_NUM(n)
|
||||
*
|
||||
* For example, using some power-of-two nfifos, we could have
|
||||
*
|
||||
* FIFO_MAP(x) = x & (nfifos-1)
|
||||
* FIFO_MAP_NUM(n) = min(nfifos,n)
|
||||
*
|
||||
* Interesting limits include:
|
||||
*
|
||||
* nfifos very large: In this case, each sender has its
|
||||
* own dedicated FIFO on each receiver and the receiver
|
||||
* has one FIFO per sender.
|
||||
*
|
||||
* nfifos == 1: In this case, all senders use the same
|
||||
* FIFO and each receiver has just one FIFO for all senders.
|
||||
*/
|
||||
/* Index of the receiver-side FIFO used by a given sender rank: the rank
 * masked with (nfifos - 1). */
#define FIFO_MAP(sender) ((mca_btl_sm_component.nfifos - 1) & (sender))
/* Number of FIFOs a receiver has for a given number of senders: the
 * smaller of nfifos and the sender count. */
#define FIFO_MAP_NUM(nsenders) ( (nsenders) > (mca_btl_sm_component.nfifos) ? (mca_btl_sm_component.nfifos) : (nsenders) )
|
||||
|
||||
|
||||
#define MCA_BTL_SM_FIFO_WRITE(endpoint_peer, my_smp_rank, \
|
||||
peer_smp_rank, hdr, resend, rc) \
|
||||
do { \
|
||||
ompi_fifo_t* fifo; \
|
||||
fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]); \
|
||||
sm_fifo_t* fifo = &(mca_btl_sm_component.fifo[peer_smp_rank][FIFO_MAP(my_smp_rank)]); \
|
||||
\
|
||||
/* thread lock */ \
|
||||
if(opal_using_threads()) \
|
||||
opal_atomic_lock(fifo->head_lock); \
|
||||
opal_atomic_lock(&(fifo->head_lock)); \
|
||||
/* post fragment */ \
|
||||
if(ompi_fifo_write_to_head(hdr, fifo) != OMPI_SUCCESS) { \
|
||||
if(sm_fifo_write(hdr, fifo) != OMPI_SUCCESS) { \
|
||||
btl_sm_add_pending(endpoint_peer, hdr, resend); \
|
||||
rc = OMPI_ERR_RESOURCE_BUSY; \
|
||||
} else { \
|
||||
MCA_BTL_SM_SIGNAL_PEER(endpoint_peer); \
|
||||
rc = OMPI_SUCCESS; \
|
||||
} \
|
||||
if(opal_using_threads()) \
|
||||
opal_atomic_unlock(fifo->head_lock); \
|
||||
opal_atomic_unlock(&(fifo->head_lock)); \
|
||||
} while(0)
|
||||
|
||||
#endif
|
||||
|
@ -27,6 +27,7 @@ static inline void mca_btl_sm_frag_common_constructor(mca_btl_sm_frag_t* frag)
|
||||
MCA_BTL_SM_FRAG_ACK);
|
||||
frag->segment.seg_addr.pval = ((char*)frag->hdr) +
|
||||
sizeof(mca_btl_sm_hdr_t);
|
||||
frag->hdr->my_smp_rank = mca_btl_sm_component.my_smp_rank;
|
||||
}
|
||||
frag->segment.seg_len = frag->size;
|
||||
frag->base.des_src = &frag->segment;
|
||||
|
@ -41,11 +41,8 @@ struct mca_btl_sm_frag_t;
|
||||
struct mca_btl_sm_hdr_t {
|
||||
struct mca_btl_sm_frag_t *frag;
|
||||
size_t len;
|
||||
int my_smp_rank;
|
||||
mca_btl_base_tag_t tag;
|
||||
/* Add a 4 byte pad to round out structure to 16 bytes for 32-bit
|
||||
* and to 24 bytes for 64-bit. Helps prevent bus errors for strict
|
||||
* alignment cases like SPARC. */
|
||||
char pad[4];
|
||||
};
|
||||
typedef struct mca_btl_sm_hdr_t mca_btl_sm_hdr_t;
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user