1
1

fix numerous bugs - code now compiles.

This commit was SVN r2129.
Этот коммит содержится в:
Rich Graham 2004-08-13 19:49:13 +00:00
родитель 0fbf919658
Коммит 0ac3e819ac

Просмотреть файл

@ -5,7 +5,10 @@
#ifndef _OMPI_CIRCULAR_BUFFER_FIFO #ifndef _OMPI_CIRCULAR_BUFFER_FIFO
#define _OMPI_CIRCULAR_BUFFER_FIFO #define _OMPI_CIRCULAR_BUFFER_FIFO
#include "include/constants.h"
#include "include/sys/cache.h" #include "include/sys/cache.h"
#include "os/atomic.h"
#include "mca/mpool/mpool.h"
/** @file /** @file
@ -49,6 +52,9 @@ struct ompi_cb_fifo_t {
/* size of fifo */ /* size of fifo */
int size; int size;
/* frequency of lazy free */
int lazy_free_frequency;
/* head of queue - where next entry will be written */ /* head of queue - where next entry will be written */
ompi_cb_fifo_ctl_t *head; ompi_cb_fifo_ctl_t *head;
@ -61,7 +67,9 @@ struct ompi_cb_fifo_t {
/* circular buffer array */ /* circular buffer array */
void **queue; void **queue;
} };
typedef struct ompi_cb_fifo_t ompi_cb_fifo_t;
/** /**
* Initialize a fifo * Initialize a fifo
@ -88,10 +96,10 @@ struct ompi_cb_fifo_t {
* @returncode Error code * @returncode Error code
* *
*/ */
int ompi_cb_fifo_init(int size_of_fifo, int fifo_memory_locality_index, static inline int ompi_cb_fifo_init(int size_of_fifo, int lazy_free_freq,
int head_memory_locality_index, int tail_memory_locality_index, int fifo_memory_locality_index, int head_memory_locality_index,
ompi_cb_fifo_ctl_t *fifo, mca_mpool_base_module_t int tail_memory_locality_index, ompi_cb_fifo_t *fifo,
*memory_allocator) mca_mpool_base_module_t *memory_allocator)
{ {
int tmp_size, errorCode = OMPI_SUCCESS; int tmp_size, errorCode = OMPI_SUCCESS;
@ -111,45 +119,52 @@ int ompi_cb_fifo_init(int size_of_fifo, int fifo_memory_locality_index,
/* set fifo size */ /* set fifo size */
fifo->size = size_of_fifo; fifo->size = size_of_fifo;
/* set lazy free frequence */
if( ( 0 >= lazy_free_freq ) ||
( lazy_free_freq > fifo->size) ) {
return OMPI_ERROR;
}
fifo->lazy_free_frequency=lazy_free_freq;
/* this will be used to mask off the higher order bits, /* this will be used to mask off the higher order bits,
* and use the & operator for the wrap-around */ * and use the & operator for the wrap-around */
mask = (size_of_fifo - 1); fifo->mask = (size_of_fifo - 1);
/* allocate fifo array */ /* allocate fifo array */
len_to_allocate = sizeof(void *) * size_of_fifo; len_to_allocate = sizeof(void *) * size_of_fifo;
fifo->queue=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE); fifo->queue=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE);
if ( NULL == fifo->queue) { if ( NULL == fifo->queue) {
return ULM_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* initialize the queue entries */ /* initialize the queue entries */
for (i = 0; i < size_of_fifo; i++) { for (i = 0; i < size_of_fifo; i++) {
queue[i] = OMPI_CB_FREE; fifo->queue[i] = OMPI_CB_FREE;
} }
/* allocate head control structure */ /* allocate head control structure */
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t); len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
fifo->head=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE); fifo->head=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE);
if ( NULL == fifo->head) { if ( NULL == fifo->head) {
return ULM_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* initialize the head structure */ /* initialize the head structure */
spinunlock(&(fifo->head.lock)); spinunlock(&(fifo->head->lock));
fifo->head.fifo_index=0; fifo->head->fifo_index=0;
fifo->head.num_to_clear=0; fifo->head->num_to_clear=0;
/* allocate tail control structure */ /* allocate tail control structure */
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t); len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
fifo->tail=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE); fifo->tail=memory_allocator->mpool_alloc(len_to_allocate,CACHE_LINE_SIZE);
if ( NULL == fifo->tail) { if ( NULL == fifo->tail) {
return ULM_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
/* initialize the head structure */ /* initialize the head structure */
spinunlock(&(fifo->tail.lock)); spinunlock(&(fifo->tail->lock));
fifo->tail.fifo_index=0; fifo->tail->fifo_index=0;
fifo->tail.num_to_clear=0; fifo->tail->num_to_clear=0;
/* return */ /* return */
return errorCode; return errorCode;
@ -168,7 +183,7 @@ int ompi_cb_fifo_init(int size_of_fifo, int fifo_memory_locality_index,
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
int ompi_cb_fifo_write_to_slot(int slot, void* data, ompi_cb_fifo_ctl_t *fifo) { static inline int ompi_cb_fifo_write_to_slot(int slot, void* data, ompi_cb_fifo_t *fifo) {
int wrote_to_slot = OMPI_CB_ERROR; int wrote_to_slot = OMPI_CB_ERROR;
/* make sure that this slot is already reserved */ /* make sure that this slot is already reserved */
if (fifo->queue[slot] == OMPI_CB_RESERVED ) { if (fifo->queue[slot] == OMPI_CB_RESERVED ) {
@ -189,19 +204,19 @@ int ompi_cb_fifo_write_to_slot(int slot, void* data, ompi_cb_fifo_ctl_t *fifo) {
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_ctl_t *fifo) { static inline int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_t *fifo) {
int slot = OMPI_CB_ERROR, index; int slot = OMPI_CB_ERROR, index;
spinlock(&(fifo->head.lock)); spinlock(&(fifo->head->lock));
index = fifo->head->fifo_index; index = fifo->head->fifo_index;
/* make sure the head is pointing at a free element - avoid wrap /* make sure the head is pointing at a free element - avoid wrap
* around */ * around */
if (queue[index] == OMPI_CB_FREE) { if (fifo->queue[index] == OMPI_CB_FREE) {
slot = index; slot = index;
fifo->queue[slot] = data; fifo->queue[slot] = data;
(fifo->head->fifo_index)++; (fifo->head->fifo_index)++;
(fifo->head->fifo_index) &= fifo->mask; (fifo->head->fifo_index) &= fifo->mask;
} }
spinunlock(&(fifo->head.lock)); spinunlock(&(fifo->head->lock));
/* return */ /* return */
return slot; return slot;
@ -220,13 +235,13 @@ int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_ctl_t *fifo) {
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
int ompi_cb_fifo_write_to_head_no_lock(void *data, ompi_cb_fifo_ctl_t *fifo) static inline int ompi_cb_fifo_write_to_head_no_lock(void *data, ompi_cb_fifo_t *fifo)
{ {
int slot = OMPI_CB_ERROR, index; int slot = OMPI_CB_ERROR, index;
index = fifo->head->fifo_index; index = fifo->head->fifo_index;
/* make sure the head is pointing at a free element - avoid wrap /* make sure the head is pointing at a free element - avoid wrap
* around */ * around */
if (queue[index] == OMPI_CB_FREE) { if (fifo->queue[index] == OMPI_CB_FREE) {
slot = index; slot = index;
fifo->queue[slot] = data; fifo->queue[slot] = data;
(fifo->head->fifo_index)++; (fifo->head->fifo_index)++;
@ -247,10 +262,10 @@ int ompi_cb_fifo_write_to_head_no_lock(void *data, ompi_cb_fifo_ctl_t *fifo)
* @returncode OMPI_CB_ERROR failed to allocate index * @returncode OMPI_CB_ERROR failed to allocate index
* *
*/ */
int ompi_cb_fifo_get_slot(ompi_cb_fifo_ctl_t *fifo) { static inline int ompi_cb_fifo_get_slot(ompi_cb_fifo_t *fifo) {
int return_value = OMPI_CB_ERROR,index; int return_value = OMPI_CB_ERROR,index;
spinlock(&(fifo->head.lock)); spinlock(&(fifo->head->lock));
index = fifo->head->fifo_index; index = fifo->head->fifo_index;
/* try and reserve slot */ /* try and reserve slot */
if (fifo->queue[index] == OMPI_CB_FREE) { if (fifo->queue[index] == OMPI_CB_FREE) {
@ -259,7 +274,7 @@ int ompi_cb_fifo_get_slot(ompi_cb_fifo_ctl_t *fifo) {
(fifo->head->fifo_index)++; (fifo->head->fifo_index)++;
(fifo->head->fifo_index) &= fifo->mask; (fifo->head->fifo_index) &= fifo->mask;
} }
spinunlock(&(fifo->head.lock)); spinunlock(&(fifo->head->lock));
/* return */ /* return */
return return_value; return return_value;
@ -275,7 +290,7 @@ int ompi_cb_fifo_get_slot(ompi_cb_fifo_ctl_t *fifo) {
* @returncode OMPI_CB_ERROR failed to allocate index * @returncode OMPI_CB_ERROR failed to allocate index
* *
*/ */
int ompi_cb_fifo_get_slot_no_lock(ompi_cb_fifo_ctl_t *fifo) static inline int ompi_cb_fifo_get_slot_no_lock(ompi_cb_fifo_t *fifo)
{ {
int return_value = OMPI_CB_ERROR,index; int return_value = OMPI_CB_ERROR,index;
index = fifo->head->fifo_index; index = fifo->head->fifo_index;
@ -301,7 +316,7 @@ int ompi_cb_fifo_get_slot_no_lock(ompi_cb_fifo_ctl_t *fifo)
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
int ompi_cb_fifo_read_from_tail(void** data, ompi_cb_fifo_ctl_t *fifo) static inline int ompi_cb_fifo_read_from_tail(void** data, ompi_cb_fifo_t *fifo)
{ {
int read_from_tail = OMPI_CB_ERROR,index = 0,clearIndex, i; int read_from_tail = OMPI_CB_ERROR,index = 0,clearIndex, i;
@ -313,32 +328,32 @@ int ompi_cb_fifo_read_from_tail(void** data, ompi_cb_fifo_ctl_t *fifo)
} }
/* lock tail - for thread safety */ /* lock tail - for thread safety */
spinlock(&(fifo->tail.lock)); spinlock(&(fifo->tail->lock));
/* make sure that there is still data to read - other thread /* make sure that there is still data to read - other thread
* may have gotten here first */ * may have gotten here first */
if ( (fifo->queue[fifo->tail->fifo_index] == OMPI_CB_FREE ) || if ( (fifo->queue[fifo->tail->fifo_index] == OMPI_CB_FREE ) ||
(fifo->queue[fifo->tail->fifo_index] == OMPI_CB_RESERVED ) ) { (fifo->queue[fifo->tail->fifo_index] == OMPI_CB_RESERVED ) ) {
spinunlock(&(fifo->tail.lock); spinunlock(&(fifo->tail->lock));
return OMPI_CB_ERROR; return OMPI_CB_ERROR;
} }
/* set return data */ /* set return data */
index = fifo->tail->fifo_index; index = fifo->tail->fifo_index;
*data = fifo->queue[index]; *data = fifo->queue[index];
fifo->tail->numToClear++; fifo->tail->num_to_clear++;
/* check to see if time to do a lazy free of queue slots */ /* check to see if time to do a lazy free of queue slots */
if (tail->numToClear == ompi_lazy_free_frequency) { if (fifo->tail->num_to_clear == ompi_lazy_free_frequency) {
clearIndex = index - ompi_lazy_free_frequency + 1; clearIndex = index - ompi_lazy_free_frequency + 1;
clearIndex &= fifo->mask; clearIndex &= fifo->mask;
for (i = 0; i < ompi_lazy_free_frequency; i++) { for (i = 0; i < ompi_lazy_free_frequency; i++) {
fifo->queue[clearIndex] = fifo->slot_markded_free; fifo->queue[clearIndex] = OMPI_CB_FREE;
clearIndex++; clearIndex++;
clearIndex &= mask; clearIndex &= fifo->mask;
} }
fifo->tail->numToClear = 0; fifo->tail->num_to_clear = 0;
} }
/* increment counter for later lazy free */ /* increment counter for later lazy free */
@ -347,7 +362,7 @@ int ompi_cb_fifo_read_from_tail(void** data, ompi_cb_fifo_ctl_t *fifo)
(fifo->tail->fifo_index) &= fifo->mask; (fifo->tail->fifo_index) &= fifo->mask;
/* unlock tail */ /* unlock tail */
spinunlock(&(fifo->tail.lock)); spinunlock(&(fifo->tail->lock));
/* return */ /* return */
return read_from_tail; return read_from_tail;
@ -365,7 +380,7 @@ int ompi_cb_fifo_read_from_tail(void** data, ompi_cb_fifo_ctl_t *fifo)
* @returncode Slot index to which data is written * @returncode Slot index to which data is written
* *
*/ */
int ompi_cb_fifo_read_from_tail_no_lock(void** data, ompi_cb_fifo_ctl_t *fifo) static inline int ompi_cb_fifo_read_from_tail_no_lock(void** data, ompi_cb_fifo_t *fifo)
{ {
int read_from_tail = OMPI_CB_ERROR,index = 0,clearIndex, i; int read_from_tail = OMPI_CB_ERROR,index = 0,clearIndex, i;
@ -388,19 +403,19 @@ int ompi_cb_fifo_read_from_tail_no_lock(void** data, ompi_cb_fifo_ctl_t *fifo)
/* set return data */ /* set return data */
index = fifo->tail->fifo_index; index = fifo->tail->fifo_index;
*data = fifo->queue[index]; *data = fifo->queue[index];
fifo->tail->numToClear++; fifo->tail->num_to_clear++;
/* check to see if time to do a lazy free of queue slots */ /* check to see if time to do a lazy free of queue slots */
if (tail->numToClear == ompi_lazy_free_frequency) { if (fifo->tail->num_to_clear == ompi_lazy_free_frequency) {
clearIndex = index - ompi_lazy_free_frequency + 1; clearIndex = index - ompi_lazy_free_frequency + 1;
clearIndex &= fifo->mask; clearIndex &= fifo->mask;
for (i = 0; i < ompi_lazy_free_frequency; i++) { for (i = 0; i < ompi_lazy_free_frequency; i++) {
fifo->queue[clearIndex] = fifo->slot_markded_free; fifo->queue[clearIndex] = OMPI_CB_FREE;
clearIndex++; clearIndex++;
clearIndex &= mask; clearIndex &= fifo->mask;
} }
fifo->tail->numToClear = 0; fifo->tail->num_to_clear = 0;
} }
/* increment counter for later lazy free */ /* increment counter for later lazy free */