Next checkpoint in the sm btl fixes:
- Add big comment about a general overview of what the sm btl is doing - random small code cleanups - fix instances of mca_btl_sm[0] to mca_btl_sm[1] where relevant - remove a lot of unused, confusing, and incorrect interface functions from ompi_fifo.h and ompi_circular_buffer.h. These functions, if they were used, would not work properly with the scheme that the sm btl uses with the fifos (i.e., receiver makes right -- if necessary) - add some missing offset computations in the fifo and circular buffers - change the types of offsets to be ssize_t, not size_t - remove an offset parameter from a function that didn't need it This commit was SVN r8135.
Этот коммит содержится в:
родитель
6444887373
Коммит
97b97f84b8
@ -96,259 +96,6 @@ struct ompi_cb_fifo_t {
|
||||
|
||||
typedef struct ompi_cb_fifo_t ompi_cb_fifo_t;
|
||||
|
||||
/**
|
||||
* Initialize a fifo
|
||||
*
|
||||
* @param size_of_fifo Length of fifo array (IN)
|
||||
*
|
||||
* @param fifo_memory_locality_index Locality index to apply to
|
||||
* the fifo array. Not currently
|
||||
* in use (IN)
|
||||
*
|
||||
* @param tail_memory_locality_index Locality index to apply to the
|
||||
* head control structure. Not
|
||||
* currently in use (IN)
|
||||
*
|
||||
* @param tail_memory_locality_index Locality index to apply to the
|
||||
* tail control structure. Not
|
||||
* currently in use (IN)
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param memory_allocator Pointer to the memory allocator to use
|
||||
* to allocate memory for this fifo. (IN)
|
||||
*
|
||||
* @returncode Error code
|
||||
*
|
||||
*/
|
||||
static inline int ompi_cb_fifo_init(int size_of_fifo, int lazy_free_freq,
|
||||
int fifo_memory_locality_index, int head_memory_locality_index,
|
||||
int tail_memory_locality_index, ompi_cb_fifo_t *fifo,
|
||||
mca_mpool_base_module_t *memory_allocator)
|
||||
{
|
||||
|
||||
int errorCode = OMPI_SUCCESS,i;
|
||||
size_t len_to_allocate;
|
||||
|
||||
/* verify that size is power of 2, and greatter that 0 - if not,
|
||||
* round up */
|
||||
if ( 0 >= size_of_fifo) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* set fifo size */
|
||||
fifo->size = opal_round_up_to_nearest_pow2(size_of_fifo);
|
||||
|
||||
/* set lazy free frequence */
|
||||
if( ( 0 >= lazy_free_freq ) ||
|
||||
( lazy_free_freq > fifo->size) ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
fifo->lazy_free_frequency=lazy_free_freq;
|
||||
|
||||
/* this will be used to mask off the higher order bits,
|
||||
* and use the & operator for the wrap-around */
|
||||
fifo->mask = (fifo->size - 1);
|
||||
|
||||
/* allocate fifo array */
|
||||
len_to_allocate = sizeof(void *) * fifo->size;
|
||||
fifo->queue=memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == fifo->queue) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the queue entries */
|
||||
for (i = 0; i < fifo->size; i++) {
|
||||
fifo->queue[i] = OMPI_CB_FREE;
|
||||
}
|
||||
|
||||
/* change address be relative to the base of the memory segment */
|
||||
fifo->queue=(volatile void **)( (char *)(fifo->queue) -
|
||||
(size_t)(memory_allocator->mpool_base(memory_allocator)));
|
||||
|
||||
/* allocate head control structure */
|
||||
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
|
||||
fifo->head=memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == fifo->head) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the head structure */
|
||||
opal_atomic_unlock(&(fifo->head->lock));
|
||||
fifo->head->fifo_index=0;
|
||||
fifo->head->num_to_clear=0;
|
||||
|
||||
/* allocate tail control structure */
|
||||
len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
|
||||
fifo->tail=memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == fifo->tail) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the head structure */
|
||||
opal_atomic_unlock(&(fifo->tail->lock));
|
||||
fifo->tail->fifo_index=0;
|
||||
fifo->tail->num_to_clear=0;
|
||||
|
||||
/* set memory locality indecies */
|
||||
fifo->fifo_memory_locality_index=fifo_memory_locality_index;
|
||||
fifo->head_memory_locality_index=head_memory_locality_index;
|
||||
fifo->tail_memory_locality_index=tail_memory_locality_index;
|
||||
|
||||
/* change addresses be relative to the base of the memory segment */
|
||||
fifo->head=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) -
|
||||
(size_t)(memory_allocator->mpool_base(memory_allocator)));
|
||||
fifo->tail=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->tail) -
|
||||
(size_t)(memory_allocator->mpool_base(memory_allocator)));
|
||||
|
||||
/* return */
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
/**
|
||||
* function to cleanup the fifo
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param memory_allocator Pointer to the memory allocator to use
|
||||
* to allocate memory for this fifo. (IN)
|
||||
*
|
||||
*/
|
||||
static inline int ompi_cb_fifo_free( ompi_cb_fifo_t *fifo,
|
||||
mca_mpool_base_module_t *memory_allocator)
|
||||
{
|
||||
|
||||
int errorCode = OMPI_SUCCESS;
|
||||
char *ptr;
|
||||
|
||||
/* make sure null fifo is not passed in */
|
||||
if ( NULL == fifo) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* free fifo array */
|
||||
if( OMPI_CB_NULL != fifo->queue ){
|
||||
ptr=(char *)(fifo->queue)+(size_t)(memory_allocator->mpool_base(memory_allocator));
|
||||
memory_allocator->mpool_free(memory_allocator, ptr, NULL);
|
||||
fifo->queue=OMPI_CB_NULL;
|
||||
}
|
||||
|
||||
/* free head control structure */
|
||||
if( OMPI_CB_NULL != fifo->head) {
|
||||
ptr=(char *)(fifo->head)+(size_t)(memory_allocator->mpool_base(memory_allocator));
|
||||
memory_allocator->mpool_free(memory_allocator, ptr, NULL);
|
||||
fifo->head=OMPI_CB_NULL;
|
||||
|
||||
}
|
||||
|
||||
/* free tail control structure */
|
||||
if( OMPI_CB_NULL != fifo->tail) {
|
||||
ptr=(char *)(fifo->tail)+(size_t)(memory_allocator->mpool_base(memory_allocator));
|
||||
memory_allocator->mpool_free(memory_allocator, ptr, NULL);
|
||||
fifo->tail=OMPI_CB_NULL;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Write pointer to the specified slot
|
||||
*
|
||||
* @param slot Slot index (IN)
|
||||
*
|
||||
* @param data Pointer value to write in specified slot (IN)
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @returncode Slot index to which data is written
|
||||
*
|
||||
*/
|
||||
static inline int ompi_cb_fifo_write_to_slot(int slot, void* data,
|
||||
ompi_cb_fifo_t *fifo, size_t offset)
|
||||
{
|
||||
void **ptr;
|
||||
int wrote_to_slot = OMPI_CB_ERROR;
|
||||
/* make sure that this slot is already reserved */
|
||||
ptr=(void **)( (char *)(fifo->queue) + (size_t)offset);
|
||||
if (ptr[slot] == OMPI_CB_RESERVED ) {
|
||||
ptr[slot] = data;
|
||||
return slot;
|
||||
} else {
|
||||
return wrote_to_slot;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to write pointer to the head of the queue
|
||||
*
|
||||
* @param data Pointer value to write in specified slot (IN)
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @returncode Slot index to which data is written
|
||||
*
|
||||
*/
|
||||
static inline int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_t
|
||||
*fifo, size_t offset)
|
||||
{
|
||||
void **ptr;
|
||||
ompi_cb_fifo_ctl_t *h_ptr;
|
||||
int slot = OMPI_CB_ERROR, index;
|
||||
|
||||
h_ptr=(ompi_cb_fifo_ctl_t *) ((char *)(fifo->head) +
|
||||
(size_t)offset);
|
||||
index = h_ptr->fifo_index;
|
||||
/* make sure the head is pointing at a free element - avoid wrap
|
||||
* around */
|
||||
ptr=(void **)( (char *)(fifo->queue) + (size_t)offset);
|
||||
if (ptr[index] == OMPI_CB_FREE) {
|
||||
slot = index;
|
||||
ptr[slot] = data;
|
||||
opal_atomic_wmb();
|
||||
(h_ptr->fifo_index)++;
|
||||
(h_ptr->fifo_index) &= fifo->mask;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return slot;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reserve slot in the fifo array
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @returncode Slot index to which data is written
|
||||
*
|
||||
* @returncode OMPI_CB_ERROR failed to allocate index
|
||||
*
|
||||
*/
|
||||
static inline int ompi_cb_fifo_get_slot(ompi_cb_fifo_t *fifo,
|
||||
size_t offset)
|
||||
{
|
||||
void **ptr;
|
||||
ompi_cb_fifo_ctl_t *h_ptr;
|
||||
int return_value = OMPI_CB_ERROR,index;
|
||||
|
||||
h_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) + (size_t)offset);
|
||||
ptr=(void **)( (char *)(fifo->queue) + (size_t)offset);
|
||||
index = h_ptr->fifo_index;
|
||||
/* try and reserve slot */
|
||||
if ( OMPI_CB_FREE == ptr[index] ) {
|
||||
ptr[index] = OMPI_CB_RESERVED;
|
||||
return_value = index;
|
||||
opal_atomic_wmb();
|
||||
(h_ptr->fifo_index)++;
|
||||
(h_ptr->fifo_index) &= fifo->mask;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return return_value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to read pointer from the tail of the queue
|
||||
*
|
||||
@ -365,7 +112,7 @@ static inline int ompi_cb_fifo_get_slot(ompi_cb_fifo_t *fifo,
|
||||
*
|
||||
*/
|
||||
static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
|
||||
bool flush_entries_read, bool *queue_empty, size_t offset)
|
||||
bool flush_entries_read, bool *queue_empty, ssize_t offset)
|
||||
{
|
||||
int index = 0,clearIndex, i;
|
||||
void **q_ptr;
|
||||
@ -374,9 +121,9 @@ static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
|
||||
|
||||
*queue_empty=false;
|
||||
|
||||
h_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) + (size_t)offset);
|
||||
t_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->tail) + (size_t)offset);
|
||||
q_ptr=(void **)( (char *)(fifo->queue) + (size_t)offset);
|
||||
h_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) + offset);
|
||||
t_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->tail) + offset);
|
||||
q_ptr=(void **)( (char *)(fifo->queue) + offset);
|
||||
|
||||
/* check to see that the data is valid */
|
||||
if ((q_ptr[t_ptr->fifo_index] == OMPI_CB_FREE) ||
|
||||
|
@ -28,15 +28,15 @@
|
||||
|
||||
/** @file
|
||||
*
|
||||
* This defines a set of functions to create, and manipulate a FIFO
|
||||
* implemented as a link list of circular buffer FIFO's. FIFO
|
||||
* elements are assumed to be pointers. Pointers are written to
|
||||
* the head, and read from the tail. For thread safety, a spin
|
||||
* lock is provided in the !!!!!ompi_cb_fifo_ctl_t!!!! structure, but it's use must be managed by
|
||||
* the calling routines - this is not by these set of routines.
|
||||
* When a write to a circular buffer queue will overflow that queue,
|
||||
* the next cirular buffer queue if the link list is used, if it is
|
||||
* empty, or a new one is inserted into the list.
|
||||
* This defines a set of functions to create, and manipulate a FIFO
|
||||
* implemented as a link list of circular buffer FIFO's. FIFO
|
||||
* elements are assumed to be pointers. Pointers are written to the
|
||||
* head, and read from the tail. For thread safety, a spin lock is
|
||||
* provided in the !!!!!ompi_cb_fifo_ctl_t!!!! structure, but it's use
|
||||
* must be managed by the calling routines - this is not by these set
|
||||
* of routines. When a write to a circular buffer queue will overflow
|
||||
* that queue, the next cirular buffer queue if the link list is used,
|
||||
* if it is empty, or a new one is inserted into the list.
|
||||
*/
|
||||
|
||||
/*
|
||||
@ -48,7 +48,8 @@ struct ompi_cb_fifo_wrapper_t {
|
||||
/* pointer to ompi_cb_fifo_ctl_t structure in use */
|
||||
ompi_cb_fifo_t cb_fifo;
|
||||
|
||||
/* pointer to next ompi_cb_fifo_ctl_t structure */
|
||||
/* pointer to next ompi_cb_fifo_ctl_t structure. This is always
|
||||
stored as an absolute address. */
|
||||
volatile struct ompi_cb_fifo_wrapper_t *next_fifo_wrapper;
|
||||
|
||||
/* flag indicating if cb_fifo has over flown - need this to force
|
||||
@ -62,10 +63,12 @@ typedef struct ompi_cb_fifo_wrapper_t ompi_cb_fifo_wrapper_t;
|
||||
/* data structure used to describe the fifo */
|
||||
struct ompi_fifo_t {
|
||||
|
||||
/* pointer to head (write) ompi_cb_fifo_t structure */
|
||||
/* pointer to head (write) ompi_cb_fifo_t structure. This is
|
||||
always stored as an absolute address. */
|
||||
volatile ompi_cb_fifo_wrapper_t *head;
|
||||
|
||||
/* pointer to tail (read) ompi_cb_fifo_t structure */
|
||||
/* pointer to tail (read) ompi_cb_fifo_t structure. This is
|
||||
always stored as an absolute address. */
|
||||
volatile ompi_cb_fifo_wrapper_t *tail;
|
||||
|
||||
/* locks for thread synchronization */
|
||||
@ -89,298 +92,6 @@ struct cb_slot_t {
|
||||
|
||||
typedef struct cb_slot_t cb_slot_t;
|
||||
|
||||
/**
|
||||
* Initialize a fifo
|
||||
*
|
||||
* @param size_of_cb_fifo Length of fifo array (IN)
|
||||
*
|
||||
* @param fifo_memory_locality_index Locality index to apply to
|
||||
* the fifo array. Not currently
|
||||
* in use (IN)
|
||||
*
|
||||
* @param tail_memory_locality_index Locality index to apply to the
|
||||
* head control structure. Not
|
||||
* currently in use (IN)
|
||||
*
|
||||
* @param tail_memory_locality_index Locality index to apply to the
|
||||
* tail control structure. Not
|
||||
* currently in use (IN)
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param memory_allocator Pointer to the memory allocator to use
|
||||
* to allocate memory for this fifo. (IN)
|
||||
*
|
||||
* @returncode Error code
|
||||
*
|
||||
*/
|
||||
static inline int ompi_fifo_init(int size_of_cb_fifo, int lazy_free_freq,
|
||||
int fifo_memory_locality_index, int head_memory_locality_index,
|
||||
int tail_memory_locality_index, ompi_fifo_t *fifo,
|
||||
mca_mpool_base_module_t *memory_allocator)
|
||||
{
|
||||
int error_code=OMPI_SUCCESS;
|
||||
size_t len_to_allocate;
|
||||
|
||||
/* allocate head ompi_cb_fifo_t structure */
|
||||
len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t);
|
||||
fifo->head=memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == fifo->head) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the circular buffer fifo head structure */
|
||||
error_code=ompi_cb_fifo_init(size_of_cb_fifo, lazy_free_freq,
|
||||
fifo_memory_locality_index, head_memory_locality_index,
|
||||
tail_memory_locality_index,
|
||||
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo),
|
||||
memory_allocator);
|
||||
if ( OMPI_SUCCESS != error_code ) {
|
||||
return error_code;
|
||||
}
|
||||
|
||||
/* finish head initialization */
|
||||
fifo->head->next_fifo_wrapper=
|
||||
(volatile struct ompi_cb_fifo_wrapper_t *)fifo->head; /* only one element
|
||||
in the link list */
|
||||
fifo->head->cb_overflow=false; /* no attempt to overflow the queue */
|
||||
|
||||
/* set the tail */
|
||||
fifo->tail=fifo->head;
|
||||
|
||||
/* return */
|
||||
return error_code;
|
||||
}
|
||||
|
||||
/**
|
||||
* function to cleanup the fifo
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param memory_allocator Pointer to the memory allocator to use
|
||||
* to allocate memory for this fifo. (IN)
|
||||
*
|
||||
*/
|
||||
static inline int ompi_fifo_free( ompi_fifo_t *fifo,
|
||||
mca_mpool_base_module_t *memory_allocator)
|
||||
{
|
||||
|
||||
int error_code=OMPI_SUCCESS;
|
||||
ompi_cb_fifo_wrapper_t *starting_ff,*ff,*ff_tmp;
|
||||
|
||||
/* loop over the link list of ompi_cb_fifo_wrapper_t structs */
|
||||
starting_ff=(ompi_cb_fifo_wrapper_t *)fifo->head;
|
||||
ff=starting_ff;
|
||||
do {
|
||||
|
||||
/* free the resources associated with the ompi_cb_fifo_t structure */
|
||||
error_code=ompi_cb_fifo_free((ompi_cb_fifo_t *)&(ff->cb_fifo),
|
||||
memory_allocator);
|
||||
if ( OMPI_SUCCESS != error_code ) {
|
||||
return error_code;
|
||||
}
|
||||
|
||||
/* next structure */
|
||||
ff_tmp=(ompi_cb_fifo_wrapper_t *)ff->next_fifo_wrapper;
|
||||
|
||||
/* free the element */
|
||||
memory_allocator->mpool_free(memory_allocator, ff, NULL);
|
||||
|
||||
ff=ff_tmp;
|
||||
|
||||
} while (ff != starting_ff);
|
||||
|
||||
/* return */
|
||||
return error_code;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Write pointer to the specified slot
|
||||
*
|
||||
* @param slot Slot addressing (IN)
|
||||
*
|
||||
* @param data Pointer value to write in specified slot (IN)
|
||||
*
|
||||
* @param offset Offset relative to base of the memory segement (IN)
|
||||
*
|
||||
* @returncode Slot index data written to
|
||||
*
|
||||
*/
|
||||
static inline int ompi_fifo_write_to_slot(cb_slot_t *slot, void* data,
|
||||
size_t offset)
|
||||
{
|
||||
return ompi_cb_fifo_write_to_slot(slot->index,data,slot->cb,offset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to write pointer to the head of the queue
|
||||
*
|
||||
* @param data Pointer value to write in specified slot (IN)
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param offset Offset relative to base of the memory segement (IN)
|
||||
*
|
||||
* @returncode Slot index to which data is written
|
||||
*
|
||||
*/
|
||||
static inline int ompi_fifo_write_to_head(void *data, ompi_fifo_t
|
||||
*fifo, mca_mpool_base_module_t *fifo_allocator, size_t offset)
|
||||
{
|
||||
int error_code=OMPI_SUCCESS;
|
||||
size_t len_to_allocate;
|
||||
ompi_cb_fifo_wrapper_t *next_ff;
|
||||
bool available;
|
||||
|
||||
/* attempt to write data to head ompi_fifo_cb_fifo_t */
|
||||
error_code=ompi_cb_fifo_write_to_head(data,
|
||||
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo), offset);
|
||||
if( OMPI_CB_ERROR == error_code ) {
|
||||
/*
|
||||
* queue is full
|
||||
*/
|
||||
|
||||
/* mark queue as overflown */
|
||||
fifo->head->cb_overflow=true;
|
||||
|
||||
/* see if next queue is available - while the next queue
|
||||
* has not been emptied, it will be marked as overflowen*/
|
||||
next_ff=(ompi_cb_fifo_wrapper_t *)fifo->head->next_fifo_wrapper;
|
||||
available=!(next_ff->cb_overflow);
|
||||
|
||||
/* if next queue not available, allocate new queue */
|
||||
if( !available ) {
|
||||
|
||||
/* allocate head ompi_cb_fifo_t structure */
|
||||
len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t);
|
||||
next_ff=fifo_allocator->mpool_alloc
|
||||
(fifo_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == next_ff) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* initialize the circular buffer fifo head structure */
|
||||
error_code=ompi_cb_fifo_init(fifo->head->cb_fifo.size,
|
||||
fifo->head->cb_fifo.lazy_free_frequency,
|
||||
fifo->head->cb_fifo.fifo_memory_locality_index,
|
||||
fifo->head->cb_fifo.head_memory_locality_index,
|
||||
fifo->head->cb_fifo.tail_memory_locality_index,
|
||||
&(next_ff->cb_fifo),
|
||||
fifo_allocator);
|
||||
if ( OMPI_SUCCESS != error_code ) {
|
||||
return error_code;
|
||||
}
|
||||
|
||||
|
||||
/* finish new element initialization */
|
||||
next_ff->next_fifo_wrapper=fifo->head->next_fifo_wrapper; /* only one
|
||||
element in the
|
||||
link list */
|
||||
next_ff->cb_overflow=false; /* no attempt to overflow the queue */
|
||||
}
|
||||
|
||||
/* reset head pointer */
|
||||
fifo->head->next_fifo_wrapper=next_ff;
|
||||
fifo->head=next_ff;
|
||||
|
||||
/* write data to new head structure */
|
||||
error_code=ompi_cb_fifo_write_to_head(data,
|
||||
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo), offset);
|
||||
if( OMPI_CB_ERROR == error_code ) {
|
||||
return error_code;
|
||||
}
|
||||
}
|
||||
|
||||
/* return */
|
||||
return error_code;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reserve slot in the fifo array
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @returncode Slot index to which data is written
|
||||
*
|
||||
* @param offset Offset relative to base of the memory segement (IN)
|
||||
*
|
||||
* @returncode OMPI_CB_ERROR failed to allocate index
|
||||
*
|
||||
*/
|
||||
static inline cb_slot_t ompi_fifo_get_slot(ompi_fifo_t *fifo,
|
||||
mca_mpool_base_module_t *fifo_allocator, size_t offset)
|
||||
{
|
||||
size_t len_to_allocate;
|
||||
volatile ompi_cb_fifo_wrapper_t *next_ff;
|
||||
bool available;
|
||||
cb_slot_t return_params;
|
||||
|
||||
/* attempt to write data to head ompi_fifo_cb_fifo_t */
|
||||
return_params.index=ompi_cb_fifo_get_slot(
|
||||
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo), offset);
|
||||
if( OMPI_CB_ERROR == return_params.index ) {
|
||||
/*
|
||||
* queue is full
|
||||
*/
|
||||
|
||||
/* mark queue as overflown */
|
||||
fifo->head->cb_overflow=true;
|
||||
|
||||
/* see if next queue is available - while the next queue
|
||||
* has not been emptied, it will be marked as overflowen*/
|
||||
next_ff=fifo->head->next_fifo_wrapper;
|
||||
available=!(next_ff->cb_overflow);
|
||||
|
||||
/* if next queue not available, allocate new queue */
|
||||
if( !available ) {
|
||||
|
||||
/* allocate head ompi_cb_fifo_t structure */
|
||||
len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t);
|
||||
next_ff=fifo_allocator->mpool_alloc
|
||||
(fifo_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
|
||||
if ( NULL == next_ff) {
|
||||
return_params.index=OMPI_ERR_OUT_OF_RESOURCE;
|
||||
return return_params;
|
||||
}
|
||||
|
||||
/* initialize the circular buffer fifo head structure */
|
||||
return_params.index=ompi_cb_fifo_init(fifo->head->cb_fifo.size,
|
||||
fifo->head->cb_fifo.lazy_free_frequency,
|
||||
fifo->head->cb_fifo.fifo_memory_locality_index,
|
||||
fifo->head->cb_fifo.head_memory_locality_index,
|
||||
fifo->head->cb_fifo.tail_memory_locality_index,
|
||||
(ompi_cb_fifo_t *)&(next_ff->cb_fifo),
|
||||
fifo_allocator);
|
||||
if ( OMPI_SUCCESS != return_params.index ) {
|
||||
return return_params;
|
||||
}
|
||||
|
||||
|
||||
/* finish new element initialization */
|
||||
next_ff->next_fifo_wrapper=fifo->head->next_fifo_wrapper; /* only one element in
|
||||
the link list */
|
||||
next_ff->cb_overflow=false; /* no attempt to overflow the queue */
|
||||
}
|
||||
|
||||
/* reset head pointer */
|
||||
fifo->head->next_fifo_wrapper=next_ff;
|
||||
fifo->head=next_ff;
|
||||
|
||||
/* write data to new head structure */
|
||||
return_params.index=ompi_cb_fifo_get_slot(
|
||||
(ompi_cb_fifo_t *)&(fifo->head->cb_fifo), offset);
|
||||
if( OMPI_CB_ERROR == return_params.index ) {
|
||||
return return_params;
|
||||
}
|
||||
}
|
||||
|
||||
/* return */
|
||||
return_params.cb=(ompi_cb_fifo_t *)&(fifo->head->cb_fifo);
|
||||
return return_params;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to read pointer from the tail of the queue
|
||||
*
|
||||
@ -391,28 +102,33 @@ static inline cb_slot_t ompi_fifo_get_slot(ompi_fifo_t *fifo,
|
||||
* @returncode Pointer - OMPI_CB_FREE indicates no data to read
|
||||
*
|
||||
*/
|
||||
static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo, size_t
|
||||
offset)
|
||||
static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo,
|
||||
ssize_t offset)
|
||||
{
|
||||
/* local parameters */
|
||||
void *return_value;
|
||||
bool queue_empty;
|
||||
volatile ompi_cb_fifo_wrapper_t *t_ptr;
|
||||
|
||||
t_ptr = (volatile ompi_cb_fifo_wrapper_t *)
|
||||
(((char*) fifo->tail) + offset);
|
||||
|
||||
/* get next element */
|
||||
return_value=ompi_cb_fifo_read_from_tail(
|
||||
(ompi_cb_fifo_t *)&(fifo->tail->cb_fifo),
|
||||
fifo->tail->cb_overflow,&queue_empty, offset);
|
||||
(ompi_cb_fifo_t *)&(t_ptr->cb_fifo),
|
||||
t_ptr->cb_overflow, &queue_empty, offset);
|
||||
|
||||
/* check to see if need to move on to next cb_fifo in the link list */
|
||||
if( queue_empty ) {
|
||||
/* queue_emptied - move on to next element in fifo */
|
||||
fifo->tail->cb_overflow=false;
|
||||
fifo->tail=fifo->tail->next_fifo_wrapper;
|
||||
t_ptr->cb_overflow = false;
|
||||
fifo->tail = t_ptr->next_fifo_wrapper;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return return_value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a fifo
|
||||
*
|
||||
@ -483,13 +199,11 @@ static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
|
||||
*
|
||||
* @param data Pointer value to write in specified slot (IN)
|
||||
*
|
||||
* @param offset Offset relative to base of the memory segement (IN)
|
||||
*
|
||||
* @returncode Slot index data written to
|
||||
*
|
||||
*/
|
||||
static inline int ompi_fifo_write_to_slot_same_base_addr(cb_slot_t *slot,
|
||||
void* data, size_t offset)
|
||||
void* data)
|
||||
{
|
||||
return ompi_cb_fifo_write_to_slot_same_base_addr(slot->index,data,
|
||||
slot->cb);
|
||||
@ -662,8 +376,6 @@ static inline cb_slot_t ompi_fifo_get_slot_same_base_addr(ompi_fifo_t *fifo,
|
||||
*
|
||||
* @param fifo Pointer to data structure defining this fifo (IN)
|
||||
*
|
||||
* @param offset Offset relative to base of the memory segement (IN)
|
||||
*
|
||||
* @returncode Pointer - OMPI_CB_FREE indicates no data to read
|
||||
*
|
||||
*/
|
||||
|
@ -45,6 +45,43 @@
|
||||
#include "btl_sm_fifo.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
* Note that there are effectively two versions of the btl sm module
|
||||
* -- one that assumes that the base address of the shared memory
|
||||
* segment is the same between pairs of processes (i.e., mmap()
|
||||
* returned the same virtual address for the same segment in both
|
||||
* processes), and one that assumes that the base addresses are
|
||||
* different.
|
||||
*
|
||||
* In the former, no translation is necessary -- all pointers can be
|
||||
* stored directly as-is and used in both processes.
|
||||
*
|
||||
* In the latter, we calculate the difference between the base virtual
|
||||
* address of the two process' shared memory segments and cache it.
|
||||
* This difference is used to access memory and pointers written by
|
||||
* the other process.
|
||||
*
|
||||
* Specifically, a good portion of this btl is implemented in the
|
||||
* ompi_fifo_t and ompi_circular_buffer_t classes. These classes
|
||||
* *always* store absolute virtual addresses in their data structures.
|
||||
* The virtual addresses are always meaningful in the *sender's*
|
||||
* process space. If the base address is the same in both processes,
|
||||
* then we get the happy side effect that the virtual addresses are
|
||||
* also valid in the receiver's process space, and therefore no
|
||||
* address translation needs to be done when the reader accesses the
|
||||
* data.
|
||||
*
|
||||
* However, in the case where the base addresses are different, the
|
||||
* receiver must translate every pointer address in the ompi_fifo_t
|
||||
* and ompi_circular_buffer_t data structures (even when writing back
|
||||
* to those data structures, such as updating a head or tail pointer).
|
||||
*
|
||||
* In short, we use a "receiver makes right" scheme, and in some
|
||||
* cases, the receiver doesn't have to do anything.
|
||||
*/
|
||||
|
||||
mca_btl_sm_t mca_btl_sm[2] = {
|
||||
{
|
||||
{
|
||||
@ -465,6 +502,10 @@ int mca_btl_sm_add_procs_same_base_addr(
|
||||
fifo_tmp=(ompi_fifo_t * volatile *)
|
||||
( (char *)(mca_btl_sm_component.sm_ctl_header->fifo) +
|
||||
(long)(mca_btl_sm_component.sm_mpool->mpool_base(mca_btl_sm_component.sm_mpool)) );
|
||||
tmp_ptr=(volatile char **)
|
||||
( (char *)mca_btl_sm_component.sm_ctl_header->
|
||||
segment_header.base_shared_mem_segment +
|
||||
(long)mca_btl_sm_component.sm_mpool->mpool_base(mca_btl_sm_component.sm_mpool));
|
||||
for( j=mca_btl_sm_component.num_smp_procs ; j <
|
||||
mca_btl_sm_component.num_smp_procs+n_local_procs ; j++ ) {
|
||||
|
||||
@ -474,16 +515,11 @@ int mca_btl_sm_add_procs_same_base_addr(
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
tmp_ptr=(volatile char **)
|
||||
( (char *)mca_btl_sm_component.sm_ctl_header->
|
||||
segment_header.base_shared_mem_segment +
|
||||
(long)mca_btl_sm_component.sm_mpool->mpool_base(mca_btl_sm_component.sm_mpool));
|
||||
diff= tmp_ptr[mca_btl_sm_component.my_smp_rank]-tmp_ptr[j];
|
||||
/* Calculate the difference as (my_base - their_base) */
|
||||
diff = tmp_ptr[mca_btl_sm_component.my_smp_rank] - tmp_ptr[j];
|
||||
mca_btl_sm_component.fifo[j]=
|
||||
( ompi_fifo_t *)( (char *)fifo_tmp[j]+diff);
|
||||
mca_btl_sm_component.sm_offset[j]=tmp_ptr[j]-
|
||||
tmp_ptr[mca_btl_sm_component.my_smp_rank];
|
||||
|
||||
mca_btl_sm_component.sm_offset[j] = diff;
|
||||
}
|
||||
|
||||
/* initialize some of the free-lists */
|
||||
@ -626,6 +662,7 @@ int mca_btl_sm_add_procs(
|
||||
/* Different base address case */
|
||||
else if (SM_CONNECTED_DIFFERENT_BASE_ADDR ==
|
||||
mca_btl_sm_component.sm_proc_connect[proc]) {
|
||||
printf("=================== different base ===============\n");
|
||||
|
||||
/* add this proc to shared memory accessability list */
|
||||
return_code=ompi_bitmap_set_bit(reachability,proc);
|
||||
|
@ -343,7 +343,10 @@ int mca_btl_sm_component_progress(void)
|
||||
}
|
||||
|
||||
/* get pointer - pass in offset to change queue pointer
|
||||
* addressing from that of the sender */
|
||||
* addressing from that of the sender. In this case, we know
|
||||
* that we have the same base address as the sender, so no
|
||||
* translation is necessary when accessing the fifo. Hence,
|
||||
* we use the _same_base_addr varient. */
|
||||
frag = (mca_btl_sm_frag_t *)
|
||||
ompi_fifo_read_from_tail_same_base_addr( fifo );
|
||||
if( OMPI_CB_FREE == frag ) {
|
||||
@ -422,7 +425,11 @@ int mca_btl_sm_component_progress(void)
|
||||
}
|
||||
|
||||
/* get pointer - pass in offset to change queue pointer
|
||||
* addressing from that of the sender */
|
||||
* addressing from that of the sender. In this case, we do
|
||||
* *not* have the same base address as the sender, so we must
|
||||
* translate every access into the fifo to be relevant to our
|
||||
* memory space. Hence, we do *not* use the _same_base_addr
|
||||
* variant. */
|
||||
frag=(mca_btl_sm_frag_t *)ompi_fifo_read_from_tail( fifo,
|
||||
mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
if( OMPI_CB_FREE == frag ) {
|
||||
@ -440,7 +447,7 @@ int mca_btl_sm_component_progress(void)
|
||||
|
||||
/* change the address from address relative to the shared
|
||||
* memory address, to a true virtual address */
|
||||
frag = (mca_btl_sm_frag_t *)( (char *)frag+
|
||||
frag = (mca_btl_sm_frag_t *)( (char *)frag +
|
||||
mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
|
||||
/* dispatch fragment by type */
|
||||
@ -449,29 +456,29 @@ int mca_btl_sm_component_progress(void)
|
||||
{
|
||||
/* completion callback */
|
||||
frag->base.des_src =
|
||||
( mca_btl_base_segment_t* )((unsigned char*)frag->base.des_dst - mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
( mca_btl_base_segment_t* )((unsigned char*)frag->base.des_dst + mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
frag->base.des_src->seg_addr.pval =
|
||||
((unsigned char*)frag->base.des_src->seg_addr.pval -
|
||||
((unsigned char*)frag->base.des_src->seg_addr.pval +
|
||||
mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
frag->base.des_src_cnt = frag->base.des_dst_cnt;
|
||||
frag->base.des_dst = NULL;
|
||||
frag->base.des_dst_cnt = 0;
|
||||
frag->base.des_cbfunc(&mca_btl_sm[0].super, frag->endpoint, &frag->base, frag->rc);
|
||||
frag->base.des_cbfunc(&mca_btl_sm[1].super, frag->endpoint, &frag->base, frag->rc);
|
||||
break;
|
||||
}
|
||||
case MCA_BTL_SM_FRAG_SEND:
|
||||
{
|
||||
/* recv upcall */
|
||||
mca_btl_sm_recv_reg_t* reg = mca_btl_sm[0].sm_reg + frag->tag;
|
||||
mca_btl_sm_recv_reg_t* reg = mca_btl_sm[1].sm_reg + frag->tag;
|
||||
frag->base.des_dst = (mca_btl_base_segment_t*)
|
||||
((unsigned char*)frag->base.des_src + mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
frag->base.des_dst->seg_addr.pval =
|
||||
((unsigned char*)frag->base.des_dst->seg_addr.pval +
|
||||
((unsigned char*)frag->base.des_dst->seg_addr.pval +
|
||||
mca_btl_sm_component.sm_offset[peer_smp_rank]);
|
||||
frag->base.des_dst_cnt = frag->base.des_src_cnt;
|
||||
frag->base.des_src = NULL;
|
||||
frag->base.des_src_cnt = 0;
|
||||
reg->cbfunc(&mca_btl_sm[0].super,frag->tag,&frag->base,reg->cbdata);
|
||||
reg->cbfunc(&mca_btl_sm[1].super,frag->tag,&frag->base,reg->cbdata);
|
||||
frag->type = MCA_BTL_SM_FRAG_ACK;
|
||||
MCA_BTL_SM_FIFO_WRITE( mca_btl_sm_component.sm_peers[peer_smp_rank],
|
||||
my_smp_rank, peer_smp_rank, frag, rc );
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user