diff --git a/ompi/class/ompi_circular_buffer_fifo.h b/ompi/class/ompi_circular_buffer_fifo.h
index 89ece4a413..358bbb16b9 100644
--- a/ompi/class/ompi_circular_buffer_fifo.h
+++ b/ompi/class/ompi_circular_buffer_fifo.h
@@ -57,116 +57,35 @@ struct ompi_cb_fifo_ctl_t {
     /* number of entries that have been used, but not invalidated. used
      * for lazy resource reclamation */
-    volatile int num_to_clear;
-
+    int num_to_clear;
 };
 
 typedef struct ompi_cb_fifo_ctl_t ompi_cb_fifo_ctl_t;
 
 /* data structure used to describe the fifo */
 struct ompi_cb_fifo_t {
+    /* head of queue - where next entry will be written (sender address)*/
+    ompi_cb_fifo_ctl_t *head;
 
-    /* size of fifo */
-    int size;
+    /* tail of queue - next element to read (receiver address) */
+    ompi_cb_fifo_ctl_t *tail;
+
+    /* head of queue - where next entry will be written (receiver address) */
+    ompi_cb_fifo_ctl_t *recv_head;
+
+    /* circular buffer array (sender address) */
+    volatile void **queue;
+
+    /* circular buffer array (receiver address) */
+    volatile void **recv_queue;
 
     /* frequency of lazy free */
    int lazy_free_frequency;
 
-    /* fifo memory locality index */
-    int fifo_memory_locality_index;
-
-    /* head memory locality index */
-    int head_memory_locality_index;
-
-    /* tail memory locality index */
-    int tail_memory_locality_index;
-
-    /* head of queue - where next entry will be written */
-    ompi_cb_fifo_ctl_t *head;
-
-    /* tail of queue - next element to read */
-    ompi_cb_fifo_ctl_t *tail;
-
     /* mask - to handle wrap around */
     unsigned int mask;
-
-    /* circular buffer array */
-    volatile void **queue;
-
 };
-
 typedef struct ompi_cb_fifo_t ompi_cb_fifo_t;
 
-/**
- * Try to read pointer from the tail of the queue
- *
- * @param data Pointer to where data was be written (OUT)
- *
- * @param fifo Pointer to data structure defining this fifo (IN)
- *
- * @param flush_entries_read force the lazy free to happen (IN)
- *
- * @param queue_empty checks to see if the fifo is empty, but only if
- *   flush_entries_read is set (OUT)
- *
- * @returncode Slot index to which data is written
- *
- */
-static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
-        bool flush_entries_read, bool *queue_empty, ptrdiff_t offset)
-{
-    int old_fifo_index, clearIndex, i;
-    void **q_ptr;
-    ompi_cb_fifo_ctl_t *h_ptr, *t_ptr;
-    void *read_from_tail = (void *)OMPI_CB_ERROR;
-
-    *queue_empty=false;
-
-    h_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->head) + offset);
-    t_ptr=(ompi_cb_fifo_ctl_t *)( (char *)(fifo->tail) + offset);
-    q_ptr=(void **)( (char *)(fifo->queue) + offset);
-
-    old_fifo_index = t_ptr->fifo_index;
-
-    /* check to see that the data is valid */
-    if ((q_ptr[old_fifo_index] == OMPI_CB_FREE) ||
-        (q_ptr[old_fifo_index] == OMPI_CB_RESERVED))
-    {
-        return (void *)OMPI_CB_FREE;
-    }
-
-    /* set return data */
-    read_from_tail = (void *)q_ptr[old_fifo_index];
-    opal_atomic_rmb();
-    t_ptr->num_to_clear++;
-
-    /* increment counter for later lazy free */
-    (t_ptr->fifo_index)++;
-    (t_ptr->fifo_index) &= fifo->mask;
-
-    /* check to see if time to do a lazy free of queue slots */
-    if ( (t_ptr->num_to_clear == fifo->lazy_free_frequency) ||
-         flush_entries_read ) {
-        clearIndex = old_fifo_index - t_ptr->num_to_clear + 1;
-        clearIndex &= fifo->mask;
-
-        for (i = 0; i < t_ptr->num_to_clear; i++) {
-            q_ptr[clearIndex] = OMPI_CB_FREE;
-            clearIndex++;
-            clearIndex &= fifo->mask;
-        }
-        t_ptr->num_to_clear = 0;
-
-        /* check to see if queue is empty */
-        if( flush_entries_read &&
-            (t_ptr->fifo_index == h_ptr->fifo_index) ) {
-            *queue_empty=true;
-        }
-    }
-
-    /* return */
-    return read_from_tail;
-}
-
 /**
  * Return the fifo size
  *
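Commentary (not part of the patch): the reworked ompi_cb_fifo_t above keeps two addresses for every shared object, the sender's (head, queue) and the receiver's (recv_head, recv_queue), which differ by the constant distance between the two processes' mappings of the same shared-memory segment. The standalone sketch below only illustrates that pointer arithmetic; the array names and the single-process setup are invented for the example and are not taken from the patch.

#include <stddef.h>
#include <assert.h>

int main(void)
{
    /* stand-ins for the sender's and the receiver's mapping of one segment;
     * with real shared memory both views would name the same physical pages */
    static char sender_view[256];
    static char receiver_view[256];

    /* the per-peer offset carried around by the fifo code: sender - receiver */
    ptrdiff_t offset = sender_view - receiver_view;

    /* an object the sender placed 64 bytes into the segment */
    char *queue_sender_addr = sender_view + 64;

    /* how the receiver names the same object: subtract the offset */
    char *queue_receiver_addr = queue_sender_addr - offset;

    assert(queue_receiver_addr == receiver_view + 64);
    return 0;
}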
@@ -177,7 +96,7 @@ static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
  */
 static inline int ompi_cb_fifo_size(ompi_cb_fifo_t *fifo) {
 
-    return fifo->size;
+    return fifo->mask + 1;
 }
 
 /**
@@ -205,75 +124,67 @@ static inline int ompi_cb_fifo_size(ompi_cb_fifo_t *fifo) {
  * @returncode Error code
  *
  */
-static inline int ompi_cb_fifo_init_same_base_addr(int size_of_fifo,
+static inline int ompi_cb_fifo_init(int size_of_fifo,
         int lazy_free_freq, int fifo_memory_locality_index,
         int head_memory_locality_index, int tail_memory_locality_index,
-        ompi_cb_fifo_t *fifo, mca_mpool_base_module_t *memory_allocator)
+        ompi_cb_fifo_t *fifo, ptrdiff_t offset,
+        mca_mpool_base_module_t *memory_allocator)
 {
+    int i, size;
+    char *buf;
-    int i;
-    size_t len_to_allocate;
-
-    /* verify that size is power of 2, and greater than 0 - if not,
+    /* verify that size is power of 2, and greater than 0 - if not,
      * round up */
-    if ( 0 >= size_of_fifo) {
+    if(size_of_fifo <= 0) {
         return OMPI_ERROR;
     }
 
     /* set fifo size */
-    fifo->size = opal_round_up_to_nearest_pow2(size_of_fifo);
+    size = opal_round_up_to_nearest_pow2(size_of_fifo);
 
     /* set lazy free frequence */
-    if( ( 0 >= lazy_free_freq ) ||
-        ( lazy_free_freq > fifo->size) ) {
+    if((lazy_free_freq <= 0) || (lazy_free_freq > size)) {
         return OMPI_ERROR;
     }
-    fifo->lazy_free_frequency=lazy_free_freq;
+
+    fifo->lazy_free_frequency = lazy_free_freq;
 
     /* this will be used to mask off the higher order bits,
      * and use the & operator for the wrap-around */
-    fifo->mask = (fifo->size - 1);
+    fifo->mask = (size - 1);
 
     /* allocate fifo array */
-    len_to_allocate = sizeof(void *) * fifo->size;
-    fifo->queue = (volatile void**)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
-    if ( NULL == fifo->queue) {
+    buf = memory_allocator->mpool_alloc(memory_allocator,
+            sizeof(void *) * size + 2*CACHE_LINE_SIZE, CACHE_LINE_SIZE, 0,
+            NULL);
+    if (NULL == buf) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
-
+    fifo->queue = (volatile void**)(buf + 2*CACHE_LINE_SIZE);
+    /* buffer address in a receiver address space */
+    fifo->recv_queue = (volatile void**)((char*)fifo->queue - offset);
 
     /* initialize the queue entries */
-    for (i = 0; i < fifo->size; i++) {
+    for (i = 0; i < size; i++) {
         fifo->queue[i] = OMPI_CB_FREE;
     }
 
-    /* allocate head control structure */
-    len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
-    fifo->head = (ompi_cb_fifo_ctl_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
-    if ( NULL == fifo->head) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
+    fifo->head = (ompi_cb_fifo_ctl_t*)buf;
+    /* head address in a receiver address space */
+    fifo->recv_head = (ompi_cb_fifo_ctl_t*)((char*)fifo->head - offset);
+    fifo->tail = (ompi_cb_fifo_ctl_t*)(buf + CACHE_LINE_SIZE);
 
     /* initialize the head structure */
     opal_atomic_unlock(&(fifo->head->lock));
     fifo->head->fifo_index=0;
     fifo->head->num_to_clear=0;
 
-    /* allocate tail control structure */
-    len_to_allocate = sizeof(ompi_cb_fifo_ctl_t);
-    fifo->tail = (ompi_cb_fifo_ctl_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
-    if ( NULL == fifo->tail) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-
     /* initialize the head structure */
     opal_atomic_unlock(&(fifo->tail->lock));
     fifo->tail->fifo_index=0;
     fifo->tail->num_to_clear=0;
 
-    /* set memory locality indecies */
-    fifo->fifo_memory_locality_index=fifo_memory_locality_index;
-    fifo->head_memory_locality_index=head_memory_locality_index;
-    fifo->tail_memory_locality_index=tail_memory_locality_index;
+    /* recalculate tail address in a receiver address space */
+    fifo->tail = (ompi_cb_fifo_ctl_t*)((char*)fifo->tail - offset);
 
     /* return */
     return OMPI_SUCCESS;
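Commentary: ompi_cb_fifo_init() above now carves everything out of one mpool allocation, with the head control block at the start, the tail control block one cache line in, and the slot array after two cache lines, so that sender-written and receiver-written state sit on different cache lines. A minimal standalone sketch of that layout, assuming a 128-byte cache line and plain aligned_alloc() in place of the mpool:

#include <stdlib.h>
#include <stdio.h>

#define CACHE_LINE 128          /* assumption; OMPI uses CACHE_LINE_SIZE */

struct ctl {                    /* stand-in for ompi_cb_fifo_ctl_t */
    int fifo_index;
    int num_to_clear;
};

int main(void)
{
    int slots = 128;            /* power of two, as the real code enforces */
    size_t len = sizeof(void *) * slots + 2 * CACHE_LINE;
    char *buf = aligned_alloc(CACHE_LINE, len);
    if (buf == NULL)
        return 1;

    struct ctl *head = (struct ctl *)buf;                /* written by the sender */
    struct ctl *tail = (struct ctl *)(buf + CACHE_LINE); /* written by the receiver */
    void **queue = (void **)(buf + 2 * CACHE_LINE);      /* the circular buffer */

    printf("head=%p tail=%p queue=%p\n", (void *)head, (void *)tail, (void *)queue);
    free(buf);
    return 0;
}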
@@ -288,40 +199,26 @@ static inline int ompi_cb_fifo_init_same_base_addr(int size_of_fifo,
  * to allocate memory for this fifo. (IN)
  *
  */
-static inline int ompi_cb_fifo_free_same_base_addr( ompi_cb_fifo_t *fifo,
-        mca_mpool_base_module_t *memory_allocator)
+static inline int ompi_cb_fifo_free(ompi_cb_fifo_t *fifo,
+        mca_mpool_base_module_t *memory_allocator)
 {
     char *ptr;
 
     /* make sure null fifo is not passed in */
-    if ( NULL == fifo) {
+    if(NULL == fifo) {
         return OMPI_ERROR;
     }
 
     /* free fifo array */
-    if( OMPI_CB_NULL != fifo->head ){
-        ptr=(char *)(fifo->queue);
+    if(OMPI_CB_NULL != fifo->head){
+        ptr=(char *)(fifo->head);
         memory_allocator->mpool_free(memory_allocator, ptr, NULL);
         fifo->queue = (volatile void**)OMPI_CB_NULL;
     }
 
-    /* free head control structure */
-    if( OMPI_CB_NULL != fifo->head) {
-        ptr=(char *)(fifo->head);
-        memory_allocator->mpool_free(memory_allocator, ptr, NULL);
-        fifo->head = (ompi_cb_fifo_ctl_t*)OMPI_CB_NULL;
-    }
-
-    /* free tail control structure */
-    if( OMPI_CB_NULL != fifo->tail) {
-        ptr=(char *)(fifo->tail);
-        memory_allocator->mpool_free(memory_allocator, ptr, NULL);
-        fifo->tail = (ompi_cb_fifo_ctl_t*)OMPI_CB_NULL;
-    }
-
-    /* return */
     return OMPI_SUCCESS;
 }
 
+
 /**
  * Write pointer to the specified slot
  *
@@ -334,16 +231,16 @@ static inline int ompi_cb_fifo_free_same_base_addr( ompi_cb_fifo_t *fifo,
  * @returncode Slot index to which data is written
  *
  */
-static inline int ompi_cb_fifo_write_to_slot_same_base_addr(int slot, void* data,
+static inline int ompi_cb_fifo_write_to_slot(int slot, void* data,
         ompi_cb_fifo_t *fifo)
 {
     volatile void **ptr;
-
     /* make sure that this slot is already reserved */
     ptr=fifo->queue;
     if (ptr[slot] == OMPI_CB_RESERVED ) {
-        opal_atomic_wmb();
+        opal_atomic_rmb();
         ptr[slot] = data;
+        opal_atomic_wmb();
         return slot;
     }
 
@@ -360,29 +257,27 @@ static inline int ompi_cb_fifo_write_to_slot_same_base_addr(int slot, void* data
  * @returncode Slot index to which data is written
  *
  */
-static inline int ompi_cb_fifo_write_to_head_same_base_addr(void *data, ompi_cb_fifo_t *fifo)
+static inline int ompi_cb_fifo_write_to_head(void *data, ompi_cb_fifo_t *fifo)
 {
     volatile void **ptr;
     ompi_cb_fifo_ctl_t *h_ptr;
-    int slot = OMPI_CB_ERROR;
-    int old_fifo_index;
+    int index;
 
     h_ptr=fifo->head;
     ptr=fifo->queue;
-    old_fifo_index = h_ptr->fifo_index;
+    index = h_ptr->fifo_index;
 
     /* make sure the head is pointing at a free element */
-    if (ptr[old_fifo_index] == OMPI_CB_FREE) {
-        slot = old_fifo_index;
+    if (ptr[index] == OMPI_CB_FREE) {
+        opal_atomic_rmb();
+        ptr[index] = data;
         opal_atomic_wmb();
-        ptr[slot] = data;
-        (h_ptr->fifo_index)++;
-        /* wrap around */
-        (h_ptr->fifo_index) &= fifo->mask;
+        h_ptr->fifo_index = (index + 1) & fifo->mask;
+        return index;
     }
 
     /* return */
-    return slot;
+    return OMPI_CB_ERROR;
 }
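Commentary: the barrier changes above follow the usual single-producer/single-consumer publish pattern: the payload has to become visible before the slot pointer that announces it, and the reader has to order its loads the same way. A self-contained C11 sketch of that pattern; the fence placement only loosely mirrors the opal_atomic_rmb()/opal_atomic_wmb() calls in the patch, and the header type is invented.

#include <stdatomic.h>
#include <stddef.h>

typedef struct { int tag; int len; } header_t;  /* hypothetical payload */

static header_t storage;
static void *volatile slot;                     /* one circular-buffer entry */

void producer_publish(int tag, int len)
{
    storage.tag = tag;                          /* 1. fill in the payload */
    storage.len = len;
    atomic_thread_fence(memory_order_release);  /* ~ opal_atomic_wmb() */
    slot = &storage;                            /* 2. announce it to the consumer */
}

header_t *consumer_poll(void)
{
    void *p = slot;                             /* 1. read the announcement */
    if (p == NULL)
        return NULL;
    atomic_thread_fence(memory_order_acquire);  /* ~ opal_atomic_rmb() */
    return (header_t *)p;                       /* 2. now safe to read the payload */
}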
@@ -396,27 +291,9 @@ static inline int ompi_cb_fifo_write_to_head_same_base_addr(void *data, ompi_cb_
  * @returncode OMPI_CB_ERROR failed to allocate index
  *
  */
-static inline int ompi_cb_fifo_get_slot_same_base_addr(ompi_cb_fifo_t *fifo)
+static inline int ompi_cb_fifo_get_slot(ompi_cb_fifo_t *fifo)
 {
-    volatile void **ptr;
-    ompi_cb_fifo_ctl_t *h_ptr;
-    int slot = OMPI_CB_ERROR;
-    int old_fifo_index;
-
-    h_ptr=fifo->head;
-    ptr=fifo->queue;
-    old_fifo_index = h_ptr->fifo_index;
-
-    /* try and reserve slot */
-    if ( OMPI_CB_FREE == ptr[old_fifo_index] ) {
-        slot = old_fifo_index;
-        ptr[old_fifo_index] = OMPI_CB_RESERVED;
-        (h_ptr->fifo_index)++;
-        (h_ptr->fifo_index) &= fifo->mask;
-    }
-
-    /* return */
-    return slot;
+    return ompi_cb_fifo_write_to_head(OMPI_CB_RESERVED, fifo);
 }
 
 /**
@@ -434,49 +311,45 @@ static inline int ompi_cb_fifo_get_slot_same_base_addr(ompi_cb_fifo_t *fifo)
  * @returncode Slot index to which data is written
  *
  */
-static inline void *ompi_cb_fifo_read_from_tail_same_base_addr(
+static inline void *ompi_cb_fifo_read_from_tail(
         ompi_cb_fifo_t *fifo, bool flush_entries_read, bool *queue_empty)
 {
-    int old_fifo_index, clearIndex, i;
+    int index, i;
     volatile void **q_ptr;
-    ompi_cb_fifo_ctl_t *h_ptr, *t_ptr;
-    void *read_from_tail = (void *)OMPI_CB_ERROR;
+    ompi_cb_fifo_ctl_t *t_ptr;
+    void *read_from_tail;
 
     *queue_empty=false;
 
-    h_ptr=fifo->head;
     t_ptr=fifo->tail;
-    q_ptr=fifo->queue;
-    old_fifo_index = t_ptr->fifo_index;
+    q_ptr=fifo->recv_queue;
+    index = t_ptr->fifo_index;
+    read_from_tail = (void *)q_ptr[index];
+    opal_atomic_rmb();
 
     /* check to see that the data is valid */
-    if ((q_ptr[old_fifo_index] == OMPI_CB_FREE) ||
-        (q_ptr[old_fifo_index] == OMPI_CB_RESERVED)) {
-        read_from_tail=(void *)OMPI_CB_FREE;
-        goto CLEANUP;
+    if ((read_from_tail == OMPI_CB_FREE) ||
        (read_from_tail == OMPI_CB_RESERVED)) {
+        return (void*)OMPI_CB_FREE;
     }
 
-    /* set return data */
-    read_from_tail = (void *)q_ptr[old_fifo_index];
-    opal_atomic_rmb();
+    /* increment counter for later lazy free */
     t_ptr->num_to_clear++;
-    /* increment counter for later lazy free */
-    (t_ptr->fifo_index)++;
-    (t_ptr->fifo_index) &= fifo->mask;
+    t_ptr->fifo_index = (index + 1) & fifo->mask;
 
     /* check to see if time to do a lazy free of queue slots */
     if ( (t_ptr->num_to_clear == fifo->lazy_free_frequency) ||
         flush_entries_read ) {
-        clearIndex = old_fifo_index - t_ptr->num_to_clear + 1;
-        clearIndex &= fifo->mask;
+        ompi_cb_fifo_ctl_t *h_ptr = fifo->recv_head;
+        index = (index - t_ptr->num_to_clear + 1) & fifo->mask;
 
         for (i = 0; i < t_ptr->num_to_clear; i++) {
-            q_ptr[clearIndex] = OMPI_CB_FREE;
-            clearIndex++;
-            clearIndex &= fifo->mask;
+            q_ptr[index] = OMPI_CB_FREE;
+            index = (index + 1) & fifo->mask;
         }
+        opal_atomic_wmb();
         t_ptr->num_to_clear = 0;
 
         /* check to see if queue is empty */
@@ -486,7 +359,6 @@ static inline void *ompi_cb_fifo_read_from_tail_same_base_addr(
         }
     }
 
-CLEANUP:
     return read_from_tail;
 }
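Commentary: ompi_cb_fifo_read_from_tail() above reclaims consumed slots lazily: it counts them in num_to_clear and only writes OMPI_CB_FREE back in batches of lazy_free_frequency (or when a flush is forced), so the reader touches the shared slot array less often. A single-threaded sketch of the same policy with invented constants:

#include <stddef.h>

#define SLOTS           8
#define MASK            (SLOTS - 1)
#define LAZY_FREE_FREQ  4
#define SLOT_FREE       ((void *)0)

static void *queue[SLOTS];
static int tail_index;
static int num_to_clear;

void *pop(void)
{
    int index = tail_index;
    void *data = queue[index];
    if (data == SLOT_FREE)
        return NULL;                       /* nothing to read */

    num_to_clear++;
    tail_index = (index + 1) & MASK;

    if (num_to_clear == LAZY_FREE_FREQ) {  /* batched reclamation */
        int clear = (index - num_to_clear + 1) & MASK;
        for (int i = 0; i < num_to_clear; i++) {
            queue[clear] = SLOT_FREE;
            clear = (clear + 1) & MASK;
        }
        num_to_clear = 0;
    }
    return data;
}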
diff --git a/ompi/class/ompi_fifo.h b/ompi/class/ompi_fifo.h
index f0a35bacdc..b864bf18e0 100644
--- a/ompi/class/ompi_fifo.h
+++ b/ompi/class/ompi_fifo.h
@@ -179,56 +179,57 @@
  * extra queue information not needed by the ompi_cb_fifo routines.
  */
 struct ompi_cb_fifo_wrapper_t {
-
     /* pointer to ompi_cb_fifo_ctl_t structure in use */
     ompi_cb_fifo_t cb_fifo;
 
     /* pointer to next ompi_cb_fifo_ctl_t structure.  This is
        always stored as an absolute address. */
-    volatile struct ompi_cb_fifo_wrapper_t *next_fifo_wrapper;
+    struct ompi_cb_fifo_wrapper_t *next_fifo_wrapper;
 
     /* flag indicating if cb_fifo has over flown - need this to force
      * release of entries already read */
     volatile bool cb_overflow;
-
 };
 
 typedef struct ompi_cb_fifo_wrapper_t ompi_cb_fifo_wrapper_t;
 
 /* data structure used to describe the fifo */
 struct ompi_fifo_t {
-
-    /* pointer to head (write) ompi_cb_fifo_t structure.  This is
-       always stored as an absolute address. */
-    volatile ompi_cb_fifo_wrapper_t *head;
-
-    /* pointer to tail (read) ompi_cb_fifo_t structure.  This is
-       always stored as an absolute address. */
-    volatile ompi_cb_fifo_wrapper_t *tail;
-
-    /* locks for thread synchronization */
-    opal_atomic_lock_t head_lock;
-    opal_atomic_lock_t tail_lock;
-
     /* locks for multi-process synchronization */
     opal_atomic_lock_t fifo_lock;
+
+    /* locks for thread synchronization */
+    opal_atomic_lock_t *head_lock;
+
+    /* locks for thread synchronization */
+    opal_atomic_lock_t *tail_lock;
+
+    /* size of fifo */
+    int size;
+
+    /* fifo memory locality index */
+    int fifo_memory_locality_index;
+
+    /* head memory locality index */
+    int head_memory_locality_index;
+
+    /* tail memory locality index */
+    int tail_memory_locality_index;
+
+    /* offset between sender and receiver shared mapping */
+    ptrdiff_t offset;
+
+    /* pointer to head (write) ompi_cb_fifo_t structure.  This is
+       always stored as a sender-side address. */
+    ompi_cb_fifo_wrapper_t *head;
+
+    /* pointer to tail (read) ompi_cb_fifo_t structure.  This is
+       always stored as a receiver-side address. */
+    ompi_cb_fifo_wrapper_t *tail;
 };
 
 typedef struct ompi_fifo_t ompi_fifo_t;
 
-/*
- * structure used to track which circular buffer slot to write to
- */
-struct cb_slot_t {
-    /* pointer to circular buffer fifo structures */
-    ompi_cb_fifo_t *cb;
-
-    /* index in circular buffer */
-    int index;
-};
-
-typedef struct cb_slot_t cb_slot_t;
-
 /**
  * Initialize a fifo
  *
@@ -254,27 +255,34 @@ typedef struct cb_slot_t cb_slot_t;
  * @returncode Error code
  *
 */
-static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
+static inline int ompi_fifo_init(int size_of_cb_fifo,
         int lazy_free_freq, int fifo_memory_locality_index,
         int head_memory_locality_index, int tail_memory_locality_index,
-        ompi_fifo_t *fifo, mca_mpool_base_module_t *memory_allocator)
+        ompi_fifo_t *fifo, ptrdiff_t offset,
+        mca_mpool_base_module_t *memory_allocator)
 {
-    int error_code=OMPI_SUCCESS;
-    size_t len_to_allocate;
+    int error_code;
 
-    /* allocate head ompi_cb_fifo_t structure */
-    len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t);
-    fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc(memory_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
-    if ( NULL == fifo->head) {
+    fifo->offset = offset;
+    fifo->size = size_of_cb_fifo;
+    fifo->fifo_memory_locality_index = fifo_memory_locality_index;
+    fifo->head_memory_locality_index = head_memory_locality_index;
+    fifo->tail_memory_locality_index = tail_memory_locality_index;
+
+    /* allocate head ompi_cb_fifo_t structure and space for head and tail locks
+     * on different cache lines */
+    fifo->head = (ompi_cb_fifo_wrapper_t*)memory_allocator->mpool_alloc(
+            memory_allocator, sizeof(ompi_cb_fifo_wrapper_t), CACHE_LINE_SIZE,
+            0, NULL);
+    if(NULL == fifo->head) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
     /* initialize the circular buffer fifo head structure */
-    error_code=ompi_cb_fifo_init_same_base_addr(size_of_cb_fifo,
+    error_code=ompi_cb_fifo_init(size_of_cb_fifo,
             lazy_free_freq, fifo_memory_locality_index,
             head_memory_locality_index, tail_memory_locality_index,
-            (ompi_cb_fifo_t *)&(fifo->head->cb_fifo),
-            memory_allocator);
+            &(fifo->head->cb_fifo), offset, memory_allocator);
     if ( OMPI_SUCCESS != error_code ) {
         return error_code;
     }
@@ -285,7 +293,7 @@ static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
     fifo->head->cb_overflow=false;  /* no attempt to overflow the queue */
 
     /* set the tail */
-    fifo->tail=fifo->head;
+    fifo->tail = (ompi_cb_fifo_wrapper_t*)((char*)fifo->head - offset);
 
     /* return */
     return error_code;
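Commentary: a compact model of the data structure declared above, with invented names. An ompi_fifo_t is an unbounded queue built from a circular list of fixed-size circular buffers; each wrapper carries one buffer plus an overflow flag, and the writer's and reader's current-buffer pointers live in different address spaces.

struct cb_fifo_sketch {                       /* stands in for ompi_cb_fifo_t */
    void **queue;
    unsigned int mask;
};

struct wrapper_sketch {                       /* stands in for ompi_cb_fifo_wrapper_t */
    struct cb_fifo_sketch cb_fifo;
    struct wrapper_sketch *next_fifo_wrapper; /* circular list of buffers */
    volatile int cb_overflow;                 /* set when this buffer filled up */
};

struct fifo_sketch {                          /* stands in for ompi_fifo_t */
    struct wrapper_sketch *head;              /* writer's current buffer (sender address) */
    struct wrapper_sketch *tail;              /* reader's current buffer (receiver address) */
};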
@@ -301,20 +309,18 @@ static inline int ompi_fifo_init_same_base_addr(int size_of_cb_fifo,
  * @returncode Slot index to which data is written
  *
  */
-static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
+static inline int ompi_fifo_write_to_head(void *data,
        ompi_fifo_t *fifo, mca_mpool_base_module_t *fifo_allocator)
 {
     int error_code;
-    size_t len_to_allocate;
     ompi_cb_fifo_wrapper_t *next_ff;
 
     /* attempt to write data to head ompi_fifo_cb_fifo_t */
-    error_code=ompi_cb_fifo_write_to_head_same_base_addr(data,
-            (ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
+    error_code = ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
 
     /* If the queue is full, create a new circular buffer and put the
        data in it. */
-    if( OMPI_CB_ERROR == error_code ) {
+    if(OMPI_CB_ERROR == error_code) {
         /* NOTE: This is the lock described in the top-level comment
            in this file.  There are corresponding uses of this lock in
            both of the read routines.  We need to protect this whole
@@ -324,16 +330,15 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
           likely that we can get rid of this lock altogther, but it
           will take some refactoring to make the data updates safe. */
-        opal_atomic_lock(&(fifo->fifo_lock));
+        opal_atomic_lock(&fifo->fifo_lock);
 
         /* mark queue as overflown */
-        fifo->head->cb_overflow=true;
+        fifo->head->cb_overflow = true;
 
         /* We retry to write to the old head before creating new one just in
          * case consumer read all entries after first attempt failed, but
          * before we set cb_overflow to true */
-        error_code=ompi_cb_fifo_write_to_head_same_base_addr(data,
-                (ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
+        error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
 
         if(error_code != OMPI_CB_ERROR) {
             fifo->head->cb_overflow = false;
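Commentary: this hunk and the next one implement the grow-on-overflow path: mark the current buffer overflowed, retry once in case the reader drained it in the meantime, then either reuse the next wrapper in the ring or splice in a freshly allocated one. A single-threaded toy of that control flow follows; the types, try_push() and the one-slot buffers are invented for the sketch, and the real code holds fifo_lock around all of this.

#include <stdlib.h>
#include <stdbool.h>

struct buf {
    void *slot;                 /* one-entry stand-in for a circular buffer */
    bool overflow;
    struct buf *next;           /* circular list */
};

static bool try_push(struct buf *b, void *data)
{
    if (b->slot != NULL)
        return false;           /* full */
    b->slot = data;
    return true;
}

/* returns the (possibly new) head buffer, or NULL on failure */
struct buf *push_grow(struct buf *head, void *data)
{
    if (try_push(head, data))
        return head;

    head->overflow = true;       /* tell the reader to flush this buffer */
    if (try_push(head, data)) {  /* reader may have drained it meanwhile */
        head->overflow = false;
        return head;
    }

    struct buf *next = head->next;
    if (next->overflow) {        /* next buffer not reclaimed yet: grow the ring */
        next = calloc(1, sizeof(*next));
        if (next == NULL)
            return NULL;
        next->next = head->next;
        head->next = next;
    }
    /* in this toy a non-overflowed or fresh buffer is assumed drained */
    if (!try_push(next, data))
        return NULL;
    return next;
}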
@@ -343,46 +348,44 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
 
         /* see if next queue is available - while the next queue
          * has not been emptied, it will be marked as overflowen*/
-        next_ff=(ompi_cb_fifo_wrapper_t *)fifo->head->next_fifo_wrapper;
+        next_ff = fifo->head->next_fifo_wrapper;
 
         /* if next queue not available, allocate new queue */
         if (next_ff->cb_overflow) {
             /* allocate head ompi_cb_fifo_t structure */
-            len_to_allocate=sizeof(ompi_cb_fifo_wrapper_t);
-            next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc
-                (fifo_allocator, len_to_allocate,CACHE_LINE_SIZE, 0, NULL);
-            if ( NULL == next_ff) {
-                opal_atomic_unlock(&(fifo->fifo_lock));
+            next_ff = (ompi_cb_fifo_wrapper_t*)fifo_allocator->mpool_alloc(
+                    fifo_allocator, sizeof(ompi_cb_fifo_wrapper_t),
+                    CACHE_LINE_SIZE, 0, NULL);
+            if (NULL == next_ff) {
+                opal_atomic_unlock(&fifo->fifo_lock);
                 return OMPI_ERR_OUT_OF_RESOURCE;
             }
 
             /* initialize the circular buffer fifo head structure */
-            error_code=ompi_cb_fifo_init_same_base_addr(
-                    fifo->head->cb_fifo.size,
+            error_code = ompi_cb_fifo_init(fifo->size,
                     fifo->head->cb_fifo.lazy_free_frequency,
-                    fifo->head->cb_fifo.fifo_memory_locality_index,
-                    fifo->head->cb_fifo.head_memory_locality_index,
-                    fifo->head->cb_fifo.tail_memory_locality_index,
-                    &(next_ff->cb_fifo),
-                    fifo_allocator);
-            if ( OMPI_SUCCESS != error_code ) {
-                opal_atomic_unlock(&(fifo->fifo_lock));
+                    fifo->fifo_memory_locality_index,
+                    fifo->head_memory_locality_index,
+                    fifo->tail_memory_locality_index,
+                    &(next_ff->cb_fifo), fifo->offset, fifo_allocator);
+            if (OMPI_SUCCESS != error_code) {
+                opal_atomic_unlock(&fifo->fifo_lock);
                 return error_code;
             }
 
             /* finish new element initialization */
-            next_ff->next_fifo_wrapper=fifo->head->next_fifo_wrapper; /* only one element in the link list */
-            next_ff->cb_overflow=false;  /* no attempt to overflow the queue */
-            fifo->head->next_fifo_wrapper=next_ff;
+            /* only one element in the link list */
+            next_ff->next_fifo_wrapper = fifo->head->next_fifo_wrapper;
+            next_ff->cb_overflow = false;  /* no attempt to overflow the queue */
+            fifo->head->next_fifo_wrapper = next_ff;
         }
 
         /* reset head pointer */
-        fifo->head=next_ff;
-        opal_atomic_unlock(&(fifo->fifo_lock));
+        fifo->head = next_ff;
+        opal_atomic_unlock(&fifo->fifo_lock);
 
         /* write data to new head structure */
-        error_code=ompi_cb_fifo_write_to_head_same_base_addr(data,
-                (ompi_cb_fifo_t *)&(fifo->head->cb_fifo));
+        error_code=ompi_cb_fifo_write_to_head(data, &fifo->head->cb_fifo);
         if( OMPI_CB_ERROR == error_code ) {
             return OMPI_ERROR;
         }
@@ -401,29 +404,26 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data,
  *
  */
 static inline
-void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo)
+void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo)
 {
     /* local parameters */
     void *return_value;
-    bool queue_empty, flush_entries_read;
-    ompi_cb_fifo_t *cb_fifo;
+    bool queue_empty;
 
     /* get next element */
-    cb_fifo=(ompi_cb_fifo_t *)&(fifo->tail->cb_fifo);
-    flush_entries_read=fifo->tail->cb_overflow;
-    return_value = ompi_cb_fifo_read_from_tail_same_base_addr( cb_fifo,
-            flush_entries_read,
-            &queue_empty);
+    return_value = ompi_cb_fifo_read_from_tail(&fifo->tail->cb_fifo,
+            fifo->tail->cb_overflow, &queue_empty);
 
     /* check to see if need to move on to next cb_fifo in the link list */
-    if( queue_empty ) {
+    if(queue_empty) {
         /* queue_emptied - move on to next element in fifo */
         /* See the big comment at the top of this file about this
            lock. */
         opal_atomic_lock(&(fifo->fifo_lock));
         if(fifo->tail->cb_overflow == true) {
-            fifo->tail->cb_overflow=false;
-            fifo->tail=fifo->tail->next_fifo_wrapper;
+            fifo->tail->cb_overflow = false;
+            fifo->tail = (ompi_cb_fifo_wrapper_t*)
+                ((char*)fifo->tail->next_fifo_wrapper - fifo->offset);
         }
         opal_atomic_unlock(&(fifo->fifo_lock));
     }
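Commentary: the reader-side counterpart to the grow sketch earlier, matching ompi_fifo_read_from_tail() above: when the current buffer reports empty and was marked overflowed, clear the flag and follow next_fifo_wrapper. Toy sketch only; the real code additionally translates the next pointer into the receiver's address space and holds fifo_lock around the switch.

#include <stdbool.h>
#include <stddef.h>

struct buf {
    void *slot;
    bool overflow;
    struct buf *next;
};

/* returns the popped entry (or NULL) and may advance *tailp to the next buffer */
void *pop_advance(struct buf **tailp)
{
    struct buf *tail = *tailp;
    void *data = tail->slot;

    if (data != NULL) {
        tail->slot = NULL;      /* consume the entry */
        return data;
    }
    if (tail->overflow) {       /* writer already moved on; follow it */
        tail->overflow = false;
        *tailp = tail->next;
    }
    return NULL;
}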
@@ -431,48 +431,4 @@ void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo)
     return return_value;
 }
 
-/**
- * Try to read pointer from the tail of the queue, and the base
- * pointer is different so we must convert.
- *
- * @param fifo Pointer to data structure defining this fifo (IN)
- *
- * @param offset Offset relative to base of the memory segement (IN)
- *
- * @returncode Pointer - OMPI_CB_FREE indicates no data to read
- *
- */
-static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo,
-        ptrdiff_t offset)
-{
-    /* local parameters */
-    void *return_value;
-    bool queue_empty;
-    volatile ompi_cb_fifo_wrapper_t *t_ptr;
-
-    t_ptr = (volatile ompi_cb_fifo_wrapper_t *)
-        (((char*) fifo->tail) + offset);
-
-    /* get next element */
-    return_value=ompi_cb_fifo_read_from_tail(
-            (ompi_cb_fifo_t *)&(t_ptr->cb_fifo),
-            t_ptr->cb_overflow, &queue_empty, offset);
-
-    /* check to see if need to move on to next cb_fifo in the link list */
-    if( queue_empty ) {
-        /* queue_emptied - move on to next element in fifo */
-        /* See the big comment at the top of this file about this
-           lock. */
-        opal_atomic_lock(&(fifo->fifo_lock));
-        if(t_ptr->cb_overflow == true) {
-            t_ptr->cb_overflow = false;
-            fifo->tail = t_ptr->next_fifo_wrapper;
-        }
-        opal_atomic_unlock(&(fifo->fifo_lock));
-    }
-
-    /* return */
-    return return_value;
-}
-
 #endif  /* !_OMPI_FIFO */
diff --git a/ompi/mca/btl/sm/btl_sm.c b/ompi/mca/btl/sm/btl_sm.c
index b83082efda..f80991c5e6 100644
--- a/ompi/mca/btl/sm/btl_sm.c
+++ b/ompi/mca/btl/sm/btl_sm.c
@@ -488,10 +488,27 @@ int mca_btl_sm_add_procs_same_base_addr(
     }
 
     for( j=0 ; j < n_to_allocate ; j++ ) {
+        char *buf;
         my_fifos[j].head = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE;
         my_fifos[j].tail = (ompi_cb_fifo_wrapper_t*)OMPI_CB_FREE;
-        opal_atomic_unlock(&(my_fifos[j].head_lock));
-        opal_atomic_unlock(&(my_fifos[j].tail_lock));
+        if(opal_using_threads()) {
+            /* allocate head and tail locks on different cache lines */
+            buf = (char*)mca_btl_sm_component.sm_mpool->mpool_alloc(
+                    mca_btl_sm_component.sm_mpool,
+                    CACHE_LINE_SIZE*2, CACHE_LINE_SIZE, 0, NULL);
+            if(NULL == buf) {
+                return_code = OMPI_ERR_OUT_OF_RESOURCE;
+                goto CLEANUP;
+            }
+            my_fifos[j].head_lock = (opal_atomic_lock_t*)buf;
+            my_fifos[j].tail_lock = (opal_atomic_lock_t*)(buf +
+                    CACHE_LINE_SIZE);
+            opal_atomic_init(my_fifos[j].head_lock, OPAL_ATOMIC_UNLOCKED);
+            opal_atomic_init(my_fifos[j].tail_lock, OPAL_ATOMIC_UNLOCKED);
+        } else {
+            my_fifos[j].head_lock = NULL;
+            my_fifos[j].tail_lock = NULL;
+        }
     }
     fifo_tmp=(ompi_fifo_t * volatile *)
         ( (char *)(mca_btl_sm_component.sm_ctl_header->fifo) +
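Commentary: the btl_sm.c hunk above allocates head_lock and tail_lock one cache line apart, and only when threads are in use, so the sending side and the receiving side do not false-share a line. A standalone sketch of the same placement, with an assumed 128-byte line and a C11 atomic_flag standing in for opal_atomic_lock_t:

#include <stdatomic.h>
#include <stdlib.h>

#define CACHE_LINE 128          /* assumption; OMPI uses CACHE_LINE_SIZE */

int main(void)
{
    char *buf = aligned_alloc(CACHE_LINE, 2 * CACHE_LINE);
    if (buf == NULL)
        return 1;

    atomic_flag *head_lock = (atomic_flag *)buf;                /* cache line 0 */
    atomic_flag *tail_lock = (atomic_flag *)(buf + CACHE_LINE); /* cache line 1 */
    atomic_flag_clear(head_lock);   /* ~ opal_atomic_init(..., OPAL_ATOMIC_UNLOCKED) */
    atomic_flag_clear(tail_lock);

    /* senders would spin on head_lock, the receiver on tail_lock */
    while (atomic_flag_test_and_set(head_lock)) { /* spin */ }
    atomic_flag_clear(head_lock);

    free(buf);
    return 0;
}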
diff --git a/ompi/mca/btl/sm/btl_sm_component.c b/ompi/mca/btl/sm/btl_sm_component.c
index 3fcc5a11c6..19d0df073e 100644
--- a/ompi/mca/btl/sm/btl_sm_component.c
+++ b/ompi/mca/btl/sm/btl_sm_component.c
@@ -335,7 +335,6 @@ void mca_btl_sm_component_event_thread(opal_object_t* thread)
 }
 #endif
 
-
 int mca_btl_sm_component_progress(void)
 {
     /* local variables */
@@ -361,7 +360,7 @@ int mca_btl_sm_component_progress(void)
         ; proc++ ) {
         peer_smp_rank=
             mca_btl_sm_component.list_smp_procs_same_base_addr[proc];
-        fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]);
+        fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]);
 
         /* if fifo is not yet setup - continue - not data has been sent*/
         if(OMPI_CB_FREE == fifo->tail){
@@ -370,7 +369,7 @@ int mca_btl_sm_component_progress(void)
 
         /* aquire thread lock */
         if( opal_using_threads() ) {
-            opal_atomic_lock( &(fifo->tail_lock) );
+            opal_atomic_lock(fifo->tail_lock);
         }
 
         /* get pointer - pass in offset to change queue pointer
@@ -378,12 +377,11 @@
          * that we have the same base address as the sender, so no
          * translation is necessary when accessing the fifo.  Hence,
          * we use the _same_base_addr varient. */
-        hdr = (mca_btl_sm_hdr_t *)
-            ompi_fifo_read_from_tail_same_base_addr( fifo );
+        hdr = (mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail(fifo);
 
         /* release thread lock */
         if( opal_using_threads() ) {
-            opal_atomic_unlock(&(fifo->tail_lock));
+            opal_atomic_unlock(fifo->tail_lock);
         }
 
         if( OMPI_CB_FREE == hdr ) {
@@ -445,7 +443,7 @@ int mca_btl_sm_component_progress(void)
         ; proc++ ) {
         peer_smp_rank=
             mca_btl_sm_component.list_smp_procs_different_base_addr[proc];
-        fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]);
+        fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]);
 
         /* if fifo is not yet setup - continue - not data has been sent*/
         if(OMPI_CB_FREE == fifo->tail){
@@ -454,7 +452,7 @@ int mca_btl_sm_component_progress(void)
 
         /* aquire thread lock */
         if( opal_using_threads() ) {
-            opal_atomic_lock(&(fifo->tail_lock));
+            opal_atomic_lock(fifo->tail_lock);
         }
 
         /* get pointer - pass in offset to change queue pointer
@@ -463,19 +461,19 @@ int mca_btl_sm_component_progress(void)
          * translate every access into the fifo to be relevant to our
          * memory space.  Hence, we do *not* use the _same_base_addr
          * variant. */
-        hdr=(mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail( fifo,
-                mca_btl_sm_component.sm_offset[peer_smp_rank]);
+        hdr = (mca_btl_sm_hdr_t *)ompi_fifo_read_from_tail( fifo );
+
         if( OMPI_CB_FREE == hdr ) {
             /* release thread lock */
             if( opal_using_threads() ) {
-                opal_atomic_unlock(&(fifo->tail_lock));
+                opal_atomic_unlock(fifo->tail_lock);
             }
             continue;
         }
 
         /* release thread lock */
         if( opal_using_threads() ) {
-            opal_atomic_unlock(&(fifo->tail_lock));
+            opal_atomic_unlock(fifo->tail_lock);
         }
 
         /* dispatch fragment by type */
diff --git a/ompi/mca/btl/sm/btl_sm_fifo.h b/ompi/mca/btl/sm/btl_sm_fifo.h
index c202759e2c..b3915efa57 100644
--- a/ompi/mca/btl/sm/btl_sm_fifo.h
+++ b/ompi/mca/btl/sm/btl_sm_fifo.h
@@ -7,35 +7,36 @@
 #define MCA_BTL_SM_FIFO_WRITE(endpoint_peer, my_smp_rank,peer_smp_rank,hdr,rc) \
 do { \
     ompi_fifo_t* fifo; \
-    fifo=&(mca_btl_sm_component.fifo[my_smp_rank][peer_smp_rank]); \
+    fifo=&(mca_btl_sm_component.fifo[peer_smp_rank][my_smp_rank]); \
 \
     /* thread lock */ \
     if(opal_using_threads()) \
-        opal_atomic_lock(&fifo->head_lock); \
+        opal_atomic_lock(fifo->head_lock); \
     if(OMPI_CB_FREE == fifo->head) { \
         /* no queues have been allocated - allocate now */ \
-        rc=ompi_fifo_init_same_base_addr( \
+        rc=ompi_fifo_init( \
            (int)mca_btl_sm_component.size_of_cb_queue, \
            (int)mca_btl_sm_component.cb_lazy_free_freq, \
            /* at this stage we are not doing anything with memory \
             * locality */ \
            0,0,0, \
-           fifo, mca_btl_sm_component.sm_mpool); \
+           fifo, mca_btl_sm_component.sm_offset[peer_smp_rank], \
+           mca_btl_sm_component.sm_mpool); \
        if( rc != OMPI_SUCCESS ) { \
            if(opal_using_threads()) \
-               opal_atomic_unlock(&(fifo->head_lock)); \
+               opal_atomic_unlock(fifo->head_lock); \
            break; \
        } \
     } \
 \
     /* post fragment */ \
-    while(ompi_fifo_write_to_head_same_base_addr(hdr, fifo, \
+    while(ompi_fifo_write_to_head(hdr, fifo, \
                 mca_btl_sm_component.sm_mpool) != OMPI_SUCCESS) \
         opal_progress(); \
     MCA_BTL_SM_SIGNAL_PEER(endpoint_peer); \
     rc=OMPI_SUCCESS; \
     if(opal_using_threads()) \
-        opal_atomic_unlock(&fifo->head_lock); \
+        opal_atomic_unlock(fifo->head_lock); \
 } while(0)
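Commentary: as read from the hunks above, the per-pair FIFO array is now indexed as fifo[receiver][sender]: MCA_BTL_SM_FIFO_WRITE posts to fifo[peer_smp_rank][my_smp_rank], while the progress loop polls fifo[my_smp_rank][peer_smp_rank], so every queue a process reads lives in its own row. A toy illustration of that convention, with all names invented:

#include <stdio.h>
#include <stddef.h>

#define NPROCS 4

static const char *fifo[NPROCS][NPROCS];   /* stand-in for the ompi_fifo_t array */

static void send_to(int me, int peer, const char *msg)
{
    fifo[peer][me] = msg;                  /* write side: fifo[receiver][sender] */
}

static void progress(int me)
{
    for (int peer = 0; peer < NPROCS; peer++) {
        const char *msg = fifo[me][peer];  /* read side: poll my own row */
        if (msg != NULL) {
            printf("rank %d got \"%s\" from rank %d\n", me, msg, peer);
            fifo[me][peer] = NULL;
        }
    }
}

int main(void)
{
    send_to(1, 0, "hello");   /* rank 1 sends to rank 0 */
    progress(0);              /* rank 0 polls its row and finds the message */
    return 0;
}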