1
1

Clean up circular buffer implementation. Get rid of _same_base_address() functions by pre-calculating everything in advance.

This commit was SVN r13923.
Этот коммит содержится в:
Gleb Natapov 2007-03-05 14:27:26 +00:00
родитель 8078ae5977
Коммит be018944d2
5 изменённых файлов: 199 добавлений и 355 удалений

Просмотреть файл

@ -57,116 +57,35 @@ struct ompi_cb_fifo_ctl_t {
/* number of entries that have been used, but not invalidated. used /* number of entries that have been used, but not invalidated. used
* for lazy resource reclamation */ * for lazy resource reclamation */
volatile int num_to_clear; int num_to_clear;
}; };
typedef struct ompi_cb_fifo_ctl_t ompi_cb_fifo_ctl_t; typedef struct ompi_cb_fifo_ctl_t ompi_cb_fifo_ctl_t;
/* data structure used to describe the fifo */ /* data structure used to describe the fifo */
struct ompi_cb_fifo_t { struct ompi_cb_fifo_t {
/* head of queue - where next entry will be written (sender address)*/
ompi_cb_fifo_ctl_t *head;
/* size of fifo */ /* tail of queue - next element to read (receiver address) */
int size; ompi_cb_fifo_ctl_t *tail;
/* head of queue - where next entry will be written (receiver address) */
ompi_cb_fifo_ctl_t *recv_head;
/* circular buffer array (sender address) */
volatile void **queue;
/* circular buffer array (receiver address) */
volatile void **recv_queue;
/* frequency of lazy free */ /* frequency of lazy free */
int lazy_free_frequency; int lazy_free_frequency;
/* fifo memory locality index */
int fifo_memory_locality_index;
/* head memory locality index */
int head_memory_locality_index;
/* tail memory locality index */
int tail_memory_locality_index;
/* head of queue - where next entry will be written */
ompi_cb_fifo_ctl_t *head;
/* tail of queue - next element to read */
ompi_cb_fifo_ctl_t *tail;
/* mask - to handle wrap around */ /* mask - to handle wrap around */
unsigned int mask; unsigned int mask;
/* circular buffer array */
volatile void **queue;
}; };
typedef struct ompi_cb_fifo_t ompi_cb_fifo_t; typedef struct ompi_cb_fifo_t ompi_cb_fifo_t;
/**
* Try to read pointer from the tail of the queue
*
* @param data Pointer to where data was be written (OUT)
*
* @param fifo Pointer to data structure defining this fifo (IN)
*
* @param flush_entries_read force the lazy free to happen (IN)
*
* @param queue_empty checks to see if the fifo is empty, but only if
* flush_entries_read is set (OUT)
*
* @returncode Slot index to which data is written
*
*/
static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
bool flush_entries_read, bool *queue_empty, ptrdiff_t offset)
{
int old_fifo_index, clearIndex, i;
void **q_ptr;
ompi_cb_fifo_ctl_t *h_ptr, *t_ptr;
void *read_from_tail = (void *)OMPI_CB_ERROR;
*queue_empty=false;
h_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) + offset);
t_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->tail) + offset);
q_ptr=(void **)( (char *)(fifo->queue) + offset);
old_fifo_index = t_ptr->fifo_index;
/* check to see that the data is valid */
if ((q_ptr[old_fifo_index] == OMPI_CB_FREE) ||
(q_ptr[old_fifo_index] == OMPI_CB_RESERVED))
{
return (void *)OMPI_CB_FREE;
}
/* set return data */
read_from_tail = (void *)q_ptr[old_fifo_index];
opal_atomic_rmb();
t_ptr->num_to_clear++;
/* increment counter for later lazy free */
(t_ptr->fifo_index)++;
(t_ptr->fifo_index) &= fifo->mask;
/* check to see if time to do a lazy free of queue slots */
if ( (t_ptr->num_to_clear == fifo->lazy_free_frequency) ||
flush_entries_read ) {
clearIndex = old_fifo_index - t_ptr->num_to_clear + 1;
clearIndex &= fifo->mask;
for (i = 0; i < t_ptr->num_to_clear; i++) {
q_ptr[clearIndex] = OMPI_CB_FREE;
clearIndex++;
clearIndex &= fifo->mask;
}
t_ptr->num_to_clear = 0;
/* check to see if queue is empty */
if( flush_entries_read &&
(t_ptr->fifo_index == h_ptr->fifo_index) ) {
*queue_empty=true;
}
}
/* return */
return read_from_tail;
}
/** /**
* Return the fifo size * Return the fifo size
* *
@ -177,7 +96,7 @@ static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
*/ */
static inline int ompi_cb_fifo_size(ompi_cb_fifo_t *fifo) { static inline int ompi_cb_fifo_size(ompi_cb_fifo_t *fifo) {
return fifo->size; return fifo->mask + 1;
} }
/** /**
@ -205,75 +124,67 @@ static inline int ompi_cb_fifo_size(ompi_cb_fifo_t *fifo) {
* @returncode Error code * @returncode Error code
* *
*/ */
static inline int ompi_cb_fifo_init_same_base_addr(int size_of_fifo, static inline int ompi_cb_fifo_init(int size_of_fifo,
int lazy_free_freq, int fifo_memory_locality_index, int lazy_free_freq, int fifo_memory_locality_index,
int head_memory_locality_index, int tail_memory_locality_index, int head_memory_locality_index, int tail_memory_locality_index,
ompi_cb_fifo_t *fifo, mca_mpool_base_module_t *memory_allocator) ompi_cb_fifo_t *fifo, ptrdiff_t offset,
mca_mpool_base_module_t *memory_allocator)
{ {
int i, size;
char *buf;
int i; /* verify that size is power of 2, and greater that 0 - if not,
size_t len_to_allocate;
/* verify that size is power of 2, and greater than 0 - if not,
* round up */ * round up */
if ( 0 >= size_of_fifo) { if(size_of_fifo <= 0) {
return OMPI_ERROR; return OMPI_ERROR;
} }
/* set fifo size */ /* set fifo size */
fifo->size = opal_round_up_to_nearest_pow2(size_of_fifo); size = opal_round_up_to_nearest_pow2(size_of_fifo);
/* set lazy free frequency */ /* set lazy free frequency */
if( ( 0 >= lazy_free_freq ) || if((lazy_free_freq <= 0) || (lazy_free_freq > size)) {
( lazy_free_freq > fifo->size) ) {
return OMPI_ERROR; return OMPI_ERROR;
} }
fifo->lazy_free_frequency = lazy_free_freq; fifo->lazy_free_frequency = lazy_free_freq;
/* this will be used to mask off the higher order bits, /* this will be used to mask off the higher order bits,
* and use the & operator for the wrap-around */ * and use the & operator for the wrap-around */
fifo->mask = (fifo->size - 1); fifo->mask = (size - 1);
/* allocate fifo array */ /* allocate fifo array */
len_to_allocate = sizeof(void *) * fifo->size; buf = memory_allocator->mpool_alloc(memory_allocator,
fifo->queue = (volatile void**)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL); sizeof(void *) * size + 2*CACHE_LINE_SIZE, CACHE_LINE_SIZE, 0,
if ( NULL == fifo->queue) { NULL);
if (NULL == buf) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
fifo->queue = (volatile void**)(buf + 2*CACHE_LINE_SIZE);
/* buffer address in a receiver address space */
fifo->recv_queue = (volatile void**)((char*)fifo->queue - offset);
/* initialize the queue entries */ /* initialize the queue entries */
for (i = 0; i < fifo->size; i++) { for (i = 0; i < size; i++) {
fifo->queue[i] = OMPI_CB_FREE; fifo->queue[i] = OMPI_CB_FREE;
} }
/* allocate head control structure */ fifo->head = (ompi_cb_fifo_ctl_t*)buf;
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t); /* head address in a receiver address space */
fifo->head = (ompi_cb_fifo_ctl_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL); fifo->recv_head = (ompi_cb_fifo_ctl_t*)((char*)fifo->head - offset);
if ( NULL == fifo->head) { fifo->tail = (ompi_cb_fifo_ctl_t*)(buf + CACHE_LINE_SIZE);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* initialize the head structure */ /* initialize the head structure */
opal_atomic_unlock(&(fifo->head->lock)); opal_atomic_unlock(&(fifo->head->lock));
fifo->head->fifo_index=0; fifo->head->fifo_index=0;
fifo->head->num_to_clear=0; fifo->head->num_to_clear=0;
/* allocate tail control structure */
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
fifo->tail = (ompi_cb_fifo_ctl_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
if ( NULL == fifo->tail) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* initialize the tail structure */ /* initialize the tail structure */
opal_atomic_unlock(&(fifo->tail->lock)); opal_atomic_unlock(&(fifo->tail->lock));
fifo->tail->fifo_index=0; fifo->tail->fifo_index=0;
fifo->tail->num_to_clear=0; fifo->tail->num_to_clear=0;
/* set memory locality indecies */ /* recalculate tail address in a receiver address space */
fifo->fifo_memory_locality_index=fifo_memory_locality_index; fifo->tail = (ompi_cb_fifo_ctl_t*)((char*)fifo->tail - offset);
fifo->head_memory_locality_index=head_memory_locality_index;
fifo->tail_memory_locality_index=tail_memory_locality_index;
/* return */ /* return */
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -288,7 +199,7 @@ static inline int ompi_cb_fifo_init_same_base_addr(int size_of_fifo,
* to allocate memory for this fifo. (IN) * to allocate memory for this fifo. (IN)
* *
*/ */
static inline int ompi_cb_fifo_free_same_base_addr( ompi_cb_fifo_t *fifo, static inline int ompi_cb_fifo_free(ompi_cb_fifo_t *fifo,
mca_mpool_base_module_t *memory_allocator) mca_mpool_base_module_t *memory_allocator)
{ {
char *ptr; char *ptr;
@ -300,28 +211,14 @@ static inline int ompi_cb_fifo_free_same_base_addr( ompi_cb_fifo_t *fifo,
/* free fifo array */ /* free fifo array */
if(OMPI_CB_NULL != fifo->head){ if(OMPI_CB_NULL != fifo->head){
ptr=(char *)(fifo->queue); ptr=(char *)(fifo->head);
memory_allocator->mpool_free(memory_allocator, ptr, NULL); memory_allocator->mpool_free(memory_allocator, ptr, NULL);
fifo->queue = (volatile void**)OMPI_CB_NULL; fifo->queue = (volatile void**)OMPI_CB_NULL;
} }
/* free head control structure */
if( OMPI_CB_NULL != fifo->head) {
ptr=(char *)(fifo->head);
memory_allocator->mpool_free(memory_allocator, ptr, NULL);
fifo->head = (ompi_cb_fifo_ctl_t*)OMPI_CB_NULL;
}
/* free tail control structure */
if( OMPI_CB_NULL != fifo->tail) {
ptr=(char *)(fifo->tail);
memory_allocator->mpool_free(memory_allocator, ptr, NULL);
fifo->tail = (ompi_cb_fifo_ctl_t*)OMPI_CB_NULL;
}
/* return */
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/** /**
* Write pointer to the specified slot * Write pointer to the specified slot
* *
@ -334,16 +231,16 @@ static inline int ompi_cb_fifo_free_same_base_addr( ompi_cb_fifo_t *fifo,
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
static inline int ompi_cb_fifo_write_to_slot_same_base_addr(int slot, void* data, static inline int ompi_cb_fifo_write_to_slot(int slot, void* data,
ompi_cb_fifo_t *fifo) ompi_cb_fifo_t *fifo)
{ {
volatile void **ptr; volatile void **ptr;
/* make sure that this slot is already reserved */ /* make sure that this slot is already reserved */
ptr=fifo->queue; ptr=fifo->queue;
if (ptr[slot] == OMPI_CB_RESERVED ) { if (ptr[slot] == OMPI_CB_RESERVED ) {
opal_atomic_wmb(); opal_atomic_rmb();
ptr[slot] = data; ptr[slot] = data;
opal_atomic_wmb();
return slot; return slot;
} }
@ -360,29 +257,27 @@ static inline int ompi_cb_fifo_write_to_slot_same_base_addr(int slot, void* data
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
static inline int ompi_cb_fifo_write_to_head_same_base_addr(void *data, ompi_cb_fifo_t *fifo) static inline int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_t *fifo)
{ {
volatile void **ptr; volatile void **ptr;
ompi_cb_fifo_ctl_t *h_ptr; ompi_cb_fifo_ctl_t *h_ptr;
int slot = OMPI_CB_ERROR; int index;
int old_fifo_index;
h_ptr=fifo->head; h_ptr=fifo->head;
ptr=fifo->queue; ptr=fifo->queue;
old_fifo_index = h_ptr->fifo_index; index = h_ptr->fifo_index;
/* make sure the head is pointing at a free element */ /* make sure the head is pointing at a free element */
if (ptr[old_fifo_index] == OMPI_CB_FREE) { if (ptr[index] == OMPI_CB_FREE) {
slot = old_fifo_index; opal_atomic_rmb();
ptr[index] = data;
opal_atomic_wmb(); opal_atomic_wmb();
ptr[slot] = data; h_ptr->fifo_index = (index + 1) & fifo->mask;
(h_ptr->fifo_index)++; return index;
/* wrap around */
(h_ptr->fifo_index) &= fifo->mask;
} }
/* return */ /* return */
return slot; return OMPI_CB_ERROR;
} }
@ -396,27 +291,9 @@ static inline int ompi_cb_fifo_write_to_head_same_base_addr(void *data, ompi_cb_
* @returncode OMPI_CB_ERROR failed to allocate index * @returncode OMPI_CB_ERROR failed to allocate index
* *
*/ */
static inline int ompi_cb_fifo_get_slot_same_base_addr(ompi_cb_fifo_t *fifo) static inline int ompi_cb_fifo_get_slot(ompi_cb_fifo_t *fifo)
{ {
volatile void **ptr; return ompi_cb_fifo_write_to_head(OMPI_CB_RESERVED, fifo);
ompi_cb_fifo_ctl_t *h_ptr;
int slot = OMPI_CB_ERROR;
int old_fifo_index;
h_ptr=fifo->head;
ptr=fifo->queue;
old_fifo_index = h_ptr->fifo_index;
/* try and reserve slot */
if ( OMPI_CB_FREE == ptr[old_fifo_index] ) {
slot = old_fifo_index;
ptr[old_fifo_index] = OMPI_CB_RESERVED;
(h_ptr->fifo_index)++;
(h_ptr->fifo_index) &= fifo->mask;
}
/* return */
return slot;
} }
/** /**
@ -434,49 +311,45 @@ static inline int ompi_cb_fifo_get_slot_same_base_addr(ompi_cb_fifo_t *fifo)
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
static inline void *ompi_cb_fifo_read_from_tail_same_base_addr( static inline void *ompi_cb_fifo_read_from_tail(
ompi_cb_fifo_t *fifo, ompi_cb_fifo_t *fifo,
bool flush_entries_read, bool *queue_empty) bool flush_entries_read, bool *queue_empty)
{ {
int old_fifo_index, clearIndex, i; int index, i;
volatile void **q_ptr; volatile void **q_ptr;
ompi_cb_fifo_ctl_t *h_ptr, *t_ptr; ompi_cb_fifo_ctl_t *t_ptr;
void *read_from_tail = (void *)OMPI_CB_ERROR; void *read_from_tail;
*queue_empty=false; *queue_empty=false;
h_ptr=fifo->head;
t_ptr=fifo->tail; t_ptr=fifo->tail;
q_ptr=fifo->queue; q_ptr=fifo->recv_queue;
old_fifo_index = t_ptr->fifo_index; index = t_ptr->fifo_index;
read_from_tail = (void *)q_ptr[index];
opal_atomic_rmb();
/* check to see that the data is valid */ /* check to see that the data is valid */
if ((q_ptr[old_fifo_index] == OMPI_CB_FREE) || if ((read_from_tail == OMPI_CB_FREE) ||
(q_ptr[old_fifo_index] == OMPI_CB_RESERVED)) { (read_from_tail == OMPI_CB_RESERVED)) {
read_from_tail=(void *)OMPI_CB_FREE; return (void*)OMPI_CB_FREE;
goto CLEANUP;
} }
/* set return data */ /* increment counter for later lazy free */
read_from_tail = (void *)q_ptr[old_fifo_index];
opal_atomic_rmb();
t_ptr->num_to_clear++; t_ptr->num_to_clear++;
/* increment counter for later lazy free */ t_ptr->fifo_index = (index + 1) & fifo->mask;
(t_ptr->fifo_index)++;
(t_ptr->fifo_index) &= fifo->mask;
/* check to see if time to do a lazy free of queue slots */ /* check to see if time to do a lazy free of queue slots */
if ( (t_ptr->num_to_clear == fifo->lazy_free_frequency) || if ( (t_ptr->num_to_clear == fifo->lazy_free_frequency) ||
flush_entries_read ) { flush_entries_read ) {
clearIndex = old_fifo_index - t_ptr->num_to_clear + 1; ompi_cb_fifo_ctl_t *h_ptr = fifo->recv_head;
clearIndex &= fifo->mask; index = (index - t_ptr->num_to_clear + 1) & fifo->mask;
for (i = 0; i < t_ptr->num_to_clear; i++) { for (i = 0; i < t_ptr->num_to_clear; i++) {
q_ptr[clearIndex] = OMPI_CB_FREE; q_ptr[index] = OMPI_CB_FREE;
clearIndex++; index = (index + 1) & fifo->mask;
clearIndex &= fifo->mask;
} }
opal_atomic_wmb();
t_ptr->num_to_clear = 0; t_ptr->num_to_clear = 0;
/* check to see if queue is empty */ /* check to see if queue is empty */
@ -486,7 +359,6 @@ static inline void *ompi_cb_fifo_read_from_tail_same_base_addr(
} }
} }
CLEANUP:
return read_from_tail; return read_from_tail;
} }

Просмотреть файл

@ -179,56 +179,57 @@
* extra queue information not needed by the ompi_cb_fifo routines. * extra queue information not needed by the ompi_cb_fifo routines.
*/ */
struct ompi_cb_fifo_wrapper_t { struct ompi_cb_fifo_wrapper_t {
/* pointer to ompi_cb_fifo_ctl_t structure in use */ /* pointer to ompi_cb_fifo_ctl_t structure in use */
ompi_cb_fifo_t cb_fifo; ompi_cb_fifo_t cb_fifo;
/* pointer to next ompi_cb_fifo_ctl_t structure. This is always /* pointer to next ompi_cb_fifo_ctl_t structure. This is always
stored as an absolute address. */ stored as an absolute address. */
volatile struct ompi_cb_fifo_wrapper_t *next_fifo_wrapper; struct ompi_cb_fifo_wrapper_t *next_fifo_wrapper;
/* flag indicating if cb_fifo has over flown - need this to force /* flag indicating if cb_fifo has over flown - need this to force
* release of entries already read */ * release of entries already read */
volatile bool cb_overflow; volatile bool cb_overflow;
}; };
typedef struct ompi_cb_fifo_wrapper_t ompi_cb_fifo_wrapper_t; typedef struct ompi_cb_fifo_wrapper_t ompi_cb_fifo_wrapper_t;
/* data structure used to describe the fifo */ /* data structure used to describe the fifo */
struct ompi_fifo_t { struct ompi_fifo_t {
/* pointer to head (write) ompi_cb_fifo_t structure. This is
always stored as an absolute address. */
volatile ompi_cb_fifo_wrapper_t *head;
/* pointer to tail (read) ompi_cb_fifo_t structure. This is
always stored as an absolute address. */
volatile ompi_cb_fifo_wrapper_t *tail;
/* locks for thread synchronization */
opal_atomic_lock_t head_lock;
opal_atomic_lock_t tail_lock;
/* locks for multi-process synchronization */ /* locks for multi-process synchronization */
opal_atomic_lock_t fifo_lock; opal_atomic_lock_t fifo_lock;
/* locks for thread synchronization */
opal_atomic_lock_t *head_lock;
/* locks for thread synchronization */
opal_atomic_lock_t *tail_lock;
/* size of fifo */
int size;
/* fifo memory locality index */
int fifo_memory_locality_index;
/* head memory locality index */
int head_memory_locality_index;
/* tail memory locality index */
int tail_memory_locality_index;
/* offset between sender and receiver shared mapping */
ptrdiff_t offset;
/* pointer to head (write) ompi_cb_fifo_t structure. This is
always stored as a sender-side address. */
ompi_cb_fifo_wrapper_t *head;
/* pointer to tail (read) ompi_cb_fifo_t structure. This is
always stored as a receiver-side address. */
ompi_cb_fifo_wrapper_t *tail;
}; };
typedef struct ompi_fifo_t ompi_fifo_t; typedef struct ompi_fifo_t ompi_fifo_t;
/*
* structure used to track which circular buffer slot to write to
*/
struct cb_slot_t {
/* pointer to circular buffer fifo structures */
ompi_cb_fifo_t *cb;
/* index in circular buffer */
int index;
};
typedef struct cb_slot_t cb_slot_t;
/** /**
* Initialize a fifo * Initialize a fifo
* *
@ -254,27 +255,34 @@ typedef struct cb_slot_t cb_slot_t;
* @returncode Error code * @returncode Error code
* *
*/ */
static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo, static inline int ompi_fifo_init(int size_of_cb_fifo,
int lazy_free_freq, int fifo_memory_locality_index, int lazy_free_freq, int fifo_memory_locality_index,
int head_memory_locality_index, int tail_memory_locality_index, int head_memory_locality_index, int tail_memory_locality_index,
ompi_fifo_t *fifo, mca_mpool_base_module_t *memory_allocator) ompi_fifo_t *fifo, ptrdiff_t offset,
mca_mpool_base_module_t *memory_allocator)
{ {
int error_code=OMPI_SUCCESS; int error_code;
size_t len_to_allocate;
/* allocate head ompi_cb_fifo_t structure */ fifo->offset = offset;
len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t); fifo->size = size_of_cb_fifo;
fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL); fifo->fifo_memory_locality_index = fifo_memory_locality_index;
fifo->head_memory_locality_index = head_memory_locality_index;
fifo->tail_memory_locality_index = tail_memory_locality_index;
/* allocate head ompi_cb_fifo_t structure and place for head and tail locks
* on different cache lines */
fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc(
memory_allocator, sizeof(ompi_cb_fifo_wrapper_t), CACHE_LINE_SIZE,
0, NULL);
if(NULL == fifo->head) { if(NULL == fifo->head) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* initialize the circular buffer fifo head structure */ /* initialize the circular buffer fifo head structure */
error_code=ompi_cb_fifo_init_same_base_addr(size_of_cb_fifo, error_code=ompi_cb_fifo_init(size_of_cb_fifo,
lazy_free_freq, fifo_memory_locality_index, lazy_free_freq, fifo_memory_locality_index,
head_memory_locality_index, tail_memory_locality_index, head_memory_locality_index, tail_memory_locality_index,
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo), &(fifo->head->cb_fifo), offset, memory_allocator);
memory_allocator);
if ( OMPI_SUCCESS != error_code ) { if ( OMPI_SUCCESS != error_code ) {
return error_code; return error_code;
} }
@ -285,7 +293,7 @@ static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
fifo->head->cb_overflow=false; /* no attempt to overflow the queue */ fifo->head->cb_overflow=false; /* no attempt to overflow the queue */
/* set the tail */ /* set the tail */
fifo->tail=fifo->head; fifo->tail = (ompi_cb_fifo_wrapper_t*)((char*)fifo->head - offset);
/* return */ /* return */
return error_code; return error_code;
@ -301,16 +309,14 @@ static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
static inline int ompi_fifo_write_to_head_same_base_addr(void *data, static inline int ompi_fifo_write_to_head(void *data,
ompi_fifo_t *fifo, mca_mpool_base_module_t *fifo_allocator) ompi_fifo_t *fifo, mca_mpool_base_module_t *fifo_allocator)
{ {
int error_code; int error_code;
size_t len_to_allocate;
ompi_cb_fifo_wrapper_t *next_ff; ompi_cb_fifo_wrapper_t *next_ff;
/* attempt to write data to head ompi_fifo_cb_fifo_t */ /* attempt to write data to head ompi_fifo_cb_fifo_t */
error_code=ompi_cb_fifo_write_to_head_same_base_addr(data, error_code = ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
/* If the queue is full, create a new circular buffer and put the /* If the queue is full, create a new circular buffer and put the
data in it. */ data in it. */
@ -324,7 +330,7 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
likely that we can get rid of this lock altogether, but it likely that we can get rid of this lock altogether, but it
will take some refactoring to make the data updates will take some refactoring to make the data updates
safe. */ safe. */
opal_atomic_lock(&(fifo->fifo_lock)); opal_atomic_lock(&fifo->fifo_lock);
/* mark queue as overflown */ /* mark queue as overflown */
fifo->head->cb_overflow = true; fifo->head->cb_overflow = true;
@ -332,8 +338,7 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
/* We retry to write to the old head before creating new one just in /* We retry to write to the old head before creating new one just in
* case consumer read all entries after first attempt failed, but * case consumer read all entries after first attempt failed, but
* before we set cb_overflow to true */ * before we set cb_overflow to true */
error_code=ompi_cb_fifo_write_to_head_same_base_addr(data, error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
if(error_code != OMPI_CB_ERROR) { if(error_code != OMPI_CB_ERROR) {
fifo->head->cb_overflow = false; fifo->head->cb_overflow = false;
@ -343,46 +348,44 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
/* see if next queue is available - while the next queue /* see if next queue is available - while the next queue
* has not been emptied, it will be marked as overflown */ * has not been emptied, it will be marked as overflown */
next_ff=(ompi_cb_fifo_wrapper_t *)fifo->head->next_fifo_wrapper; next_ff = fifo->head->next_fifo_wrapper;
/* if next queue not available, allocate new queue */ /* if next queue not available, allocate new queue */
if (next_ff->cb_overflow) { if (next_ff->cb_overflow) {
/* allocate head ompi_cb_fifo_t structure */ /* allocate head ompi_cb_fifo_t structure */
len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t); next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc(
next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc fifo_allocator, sizeof(ompi_cb_fifo_wrapper_t),
(fifo_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL); CACHE_LINE_SIZE, 0, NULL);
if (NULL == next_ff) { if (NULL == next_ff) {
opal_atomic_unlock(&(fifo->fifo_lock)); opal_atomic_unlock(&fifo->fifo_lock);
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* initialize the circular buffer fifo head structure */ /* initialize the circular buffer fifo head structure */
error_code=ompi_cb_fifo_init_same_base_addr( error_code = ompi_cb_fifo_init(fifo->size,
fifo->head->cb_fifo.size,
fifo->head->cb_fifo.lazy_free_frequency, fifo->head->cb_fifo.lazy_free_frequency,
fifo->head->cb_fifo.fifo_memory_locality_index, fifo->fifo_memory_locality_index,
fifo->head->cb_fifo.head_memory_locality_index, fifo->head_memory_locality_index,
fifo->head->cb_fifo.tail_memory_locality_index, fifo->tail_memory_locality_index,
&(next_ff->cb_fifo), &(next_ff->cb_fifo), fifo->offset, fifo_allocator);
fifo_allocator);
if (OMPI_SUCCESS != error_code) { if (OMPI_SUCCESS != error_code) {
opal_atomic_unlock(&(fifo->fifo_lock)); opal_atomic_unlock(&fifo->fifo_lock);
return error_code; return error_code;
} }
/* finish new element initialization */ /* finish new element initialization */
next_ff->next_fifo_wrapper=fifo->head->next_fifo_wrapper; /* only one element in the link list */ /* only one element in the link list */
next_ff->next_fifo_wrapper = fifo->head->next_fifo_wrapper;
next_ff->cb_overflow = false; /* no attempt to overflow the queue */ next_ff->cb_overflow = false; /* no attempt to overflow the queue */
fifo->head->next_fifo_wrapper = next_ff; fifo->head->next_fifo_wrapper = next_ff;
} }
/* reset head pointer */ /* reset head pointer */
fifo->head = next_ff; fifo->head = next_ff;
opal_atomic_unlock(&(fifo->fifo_lock)); opal_atomic_unlock(&fifo->fifo_lock);
/* write data to new head structure */ /* write data to new head structure */
error_code=ompi_cb_fifo_write_to_head_same_base_addr(data, error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
if( OMPI_CB_ERROR == error_code ) { if( OMPI_CB_ERROR == error_code ) {
return OMPI_ERROR; return OMPI_ERROR;
} }
@ -401,19 +404,15 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
* *
*/ */
static inline static inline
void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo) void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo)
{ {
/* local parameters */ /* local parameters */
void *return_value; void *return_value;
bool queue_empty, flush_entries_read; bool queue_empty;
ompi_cb_fifo_t *cb_fifo;
/* get next element */ /* get next element */
cb_fifo=(ompi_cb_fifo_t *)&(fifo->tail->cb_fifo); return_value = ompi_cb_fifo_read_from_tail(&fifo->tail->cb_fifo,
flush_entries_read=fifo->tail->cb_overflow; fifo->tail->cb_overflow, &queue_empty);
return_value = ompi_cb_fifo_read_from_tail_same_base_addr( cb_fifo,
flush_entries_read,
&queue_empty);
/* check to see if need to move on to next cb_fifo in the link list */ /* check to see if need to move on to next cb_fifo in the link list */
if(queue_empty) { if(queue_empty) {
@ -423,7 +422,8 @@ void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo)
opal_atomic_lock(&(fifo->fifo_lock)); opal_atomic_lock(&(fifo->fifo_lock));
if(fifo->tail->cb_overflow == true) { if(fifo->tail->cb_overflow == true) {
fifo->tail->cb_overflow = false; fifo->tail->cb_overflow = false;
fifo->tail=fifo->tail->next_fifo_wrapper; fifo->tail = (ompi_cb_fifo_wrapper_t*)
((char*)fifo->tail->next_fifo_wrapper - fifo->offset);
} }
opal_atomic_unlock(&(fifo->fifo_lock)); opal_atomic_unlock(&(fifo->fifo_lock));
} }
@ -431,48 +431,4 @@ void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo)
return return_value; return return_value;
} }
/**
* Try to read pointer from the tail of the queue, and the base
* pointer is different so we must convert.
*
* @param fifo Pointer to data structure defining this fifo (IN)
*
* @param offset Offset relative to base of the memory segement (IN)
*
* @returncode Pointer - OMPI_CB_FREE indicates no data to read
*
*/
static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo,
ptrdiff_t offset)
{
/* local parameters */
void *return_value;
bool queue_empty;
volatile ompi_cb_fifo_wrapper_t *t_ptr;
t_ptr = (volatile ompi_cb_fifo_wrapper_t *)
(((char*) fifo->tail) + offset);
/* get next element */
return_value=ompi_cb_fifo_read_from_tail(
(ompi_cb_fifo_t *)&(t_ptr->cb_fifo),
t_ptr->cb_overflow, &queue_empty, offset);
/* check to see if need to move on to next cb_fifo in the link list */
if( queue_empty ) {
/* queue_emptied - move on to next element in fifo */
/* See the big comment at the top of this file about this
lock. */
opal_atomic_lock(&(fifo->fifo_lock));
if(t_ptr->cb_overflow == true) {
t_ptr->cb_overflow = false;
fifo->tail = t_ptr->next_fifo_wrapper;
}
opal_atomic_unlock(&(fifo->fifo_lock));
}
/* return */
return return_value;
}
#endif /* !_OMPI_FIFO */ #endif /* !_OMPI_FIFO */

Просмотреть файл

@ -488,10 +488,27 @@ int mca_btl_sm_add_procs_same_base_addr(
} }
for( j=0 ; j < n_to_allocate ; j++ ) { for( j=0 ; j < n_to_allocate ; j++ ) {
char *buf;
my_fifos[j].head = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE; my_fifos[j].head = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE;
my_fifos[j].tail = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE; my_fifos[j].tail = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE;
opal_atomic_unlock(&(my_fifos[j].head_lock)); if(opal_using_threads()) {
opal_atomic_unlock(&(my_fifos[j].tail_lock)); /* allocate head and tail locks on different cache lines */
buf = (char*)mca_btl_sm_component.sm_mpool->mpool_alloc(
mca_btl_sm_component.sm_mpool,
CACHE_LINE_SIZE*2, CACHE_LINE_SIZE, 0, NULL);
if(NULL == buf) {
return_code = OMPI_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
my_fifos[j].head_lock = (opal_atomic_lock_t*)buf;
my_fifos[j].tail_lock = (opal_atomic_lock_t*)(buf +
CACHE_LINE_SIZE);
opal_atomic_init(my_fifos[j].head_lock, OPAL_ATOMIC_UNLOCKED);
opal_atomic_init(my_fifos[j].tail_lock, OPAL_ATOMIC_UNLOCKED);
} else {
my_fifos[j].head_lock = NULL;
my_fifos[j].tail_lock = NULL;
}
} }
fifo_tmp=(ompi_fifo_t * volatile *) fifo_tmp=(ompi_fifo_t * volatile *)
( (char *)(mca_btl_sm_component.sm_ctl_header->fifo) + ( (char *)(mca_btl_sm_component.sm_ctl_header->fifo) +

Просмотреть файл

@ -335,7 +335,6 @@ void mca_btl_sm_component_event_thread(opal_object_t* thread)
} }
#endif #endif
int mca_btl_sm_component_progress(void) int mca_btl_sm_component_progress(void)
{ {
/* local variables */ /* local variables */
@ -361,7 +360,7 @@ int mca_btl_sm_component_progress(void)
; proc++ ) ; proc++ )
{ {
peer_smp_rank= mca_btl_sm_component.list_smp_procs_same_base_addr[proc]; peer_smp_rank= mca_btl_sm_component.list_smp_procs_same_base_addr[proc];
fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]); fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]);
/* if fifo is not yet set up - continue - no data has been sent */ /* if fifo is not yet set up - continue - no data has been sent */
if(OMPI_CB_FREE == fifo->tail){ if(OMPI_CB_FREE == fifo->tail){
@ -370,7 +369,7 @@ int mca_btl_sm_component_progress(void)
/* acquire thread lock */ /* acquire thread lock */
if( opal_using_threads() ) { if( opal_using_threads() ) {
opal_atomic_lock( &(fifo->tail_lock) ); opal_atomic_lock(fifo->tail_lock);
} }
/* get pointer - pass in offset to change queue pointer /* get pointer - pass in offset to change queue pointer
@ -378,12 +377,11 @@ int mca_btl_sm_component_progress(void)
* that we have the same base address as the sender, so no * that we have the same base address as the sender, so no
* translation is necessary when accessing the fifo. Hence, * translation is necessary when accessing the fifo. Hence,
* we use the _same_base_addr varient. */ * we use the _same_base_addr varient. */
hdr = (mca_btl_sm_hdr_t *) hdr = (mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail(fifo);
ompi_fifo_read_from_tail_same_base_addr( fifo );
/* release thread lock */ /* release thread lock */
if( opal_using_threads() ) { if( opal_using_threads() ) {
opal_atomic_unlock(&(fifo->tail_lock)); opal_atomic_unlock(fifo->tail_lock);
} }
if( OMPI_CB_FREE == hdr ) { if( OMPI_CB_FREE == hdr ) {
@ -445,7 +443,7 @@ int mca_btl_sm_component_progress(void)
; proc++ ) ; proc++ )
{ {
peer_smp_rank= mca_btl_sm_component.list_smp_procs_different_base_addr[proc]; peer_smp_rank= mca_btl_sm_component.list_smp_procs_different_base_addr[proc];
fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]); fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]);
/* if fifo is not yet set up - continue - no data has been sent */ /* if fifo is not yet set up - continue - no data has been sent */
if(OMPI_CB_FREE == fifo->tail){ if(OMPI_CB_FREE == fifo->tail){
@ -454,7 +452,7 @@ int mca_btl_sm_component_progress(void)
/* acquire thread lock */ /* acquire thread lock */
if( opal_using_threads() ) { if( opal_using_threads() ) {
opal_atomic_lock(&(fifo->tail_lock)); opal_atomic_lock(fifo->tail_lock);
} }
/* get pointer - pass in offset to change queue pointer /* get pointer - pass in offset to change queue pointer
@ -463,19 +461,19 @@ int mca_btl_sm_component_progress(void)
* translate every access into the fifo to be relevant to our * translate every access into the fifo to be relevant to our
* memory space. Hence, we do *not* use the _same_base_addr * memory space. Hence, we do *not* use the _same_base_addr
* variant. */ * variant. */
hdr=(mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail( fifo, hdr = (mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail( fifo );
mca_btl_sm_component.sm_offset[peer_smp_rank]);
if( OMPI_CB_FREE == hdr ) { if( OMPI_CB_FREE == hdr ) {
/* release thread lock */ /* release thread lock */
if( opal_using_threads() ) { if( opal_using_threads() ) {
opal_atomic_unlock(&(fifo->tail_lock)); opal_atomic_unlock(fifo->tail_lock);
} }
continue; continue;
} }
/* release thread lock */ /* release thread lock */
if( opal_using_threads() ) { if( opal_using_threads() ) {
opal_atomic_unlock(&(fifo->tail_lock)); opal_atomic_unlock(fifo->tail_lock);
} }
/* dispatch fragment by type */ /* dispatch fragment by type */

Просмотреть файл

@ -7,35 +7,36 @@
#define MCA_BTL_SM_FIFO_WRITE(endpoint_peer, my_smp_rank,peer_smp_rank,hdr,rc) \ #define MCA_BTL_SM_FIFO_WRITE(endpoint_peer, my_smp_rank,peer_smp_rank,hdr,rc) \
do { \ do { \
ompi_fifo_t* fifo; \ ompi_fifo_t* fifo; \
fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]); \ fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]); \
\ \
/* thread lock */ \ /* thread lock */ \
if(opal_using_threads()) \ if(opal_using_threads()) \
opal_atomic_lock(&fifo->head_lock); \ opal_atomic_lock(fifo->head_lock); \
if(OMPI_CB_FREE == fifo->head) { \ if(OMPI_CB_FREE == fifo->head) { \
/* no queues have been allocated - allocate now */ \ /* no queues have been allocated - allocate now */ \
rc=ompi_fifo_init_same_base_addr( \ rc=ompi_fifo_init( \
(int)mca_btl_sm_component.size_of_cb_queue, \ (int)mca_btl_sm_component.size_of_cb_queue, \
(int)mca_btl_sm_component.cb_lazy_free_freq, \ (int)mca_btl_sm_component.cb_lazy_free_freq, \
/* at this stage we are not doing anything with memory \ /* at this stage we are not doing anything with memory \
* locality */ \ * locality */ \
0,0,0, \ 0,0,0, \
fifo, mca_btl_sm_component.sm_mpool); \ fifo, mca_btl_sm_component.sm_offset[peer_smp_rank], \
mca_btl_sm_component.sm_mpool); \
if( rc != OMPI_SUCCESS ) { \ if( rc != OMPI_SUCCESS ) { \
if(opal_using_threads()) \ if(opal_using_threads()) \
opal_atomic_unlock(&(fifo->head_lock)); \ opal_atomic_unlock(fifo->head_lock); \
break; \ break; \
} \ } \
} \ } \
\ \
/* post fragment */ \ /* post fragment */ \
while(ompi_fifo_write_to_head_same_base_addr(hdr, fifo, \ while(ompi_fifo_write_to_head(hdr, fifo, \
mca_btl_sm_component.sm_mpool) != OMPI_SUCCESS) \ mca_btl_sm_component.sm_mpool) != OMPI_SUCCESS) \
opal_progress(); \ opal_progress(); \
MCA_BTL_SM_SIGNAL_PEER(endpoint_peer); \ MCA_BTL_SM_SIGNAL_PEER(endpoint_peer); \
rc=OMPI_SUCCESS; \ rc=OMPI_SUCCESS; \
if(opal_using_threads()) \ if(opal_using_threads()) \
opal_atomic_unlock(&fifo->head_lock); \ opal_atomic_unlock(fifo->head_lock); \
} while(0) } while(0)