diff --git a/ompi/class/ompi_fifo.h b/ompi/class/ompi_fifo.h index bf60b8e455..f9be54406c 100644 --- a/ompi/class/ompi_fifo.h +++ b/ompi/class/ompi_fifo.h @@ -329,6 +329,18 @@ static inline int ompi_fifo_write_to_head_same_base_addr(void *data, /* mark queue as overflown */ fifo->head->cb_overflow=true; + /* We retry to write to the old head before creating new one just in + * case consumer read all entries after first attempt failed, but + * before we set cb_overflow to true */ + error_code=ompi_cb_fifo_write_to_head_same_base_addr(data, + (ompi_cb_fifo_t *)&(fifo->head->cb_fifo)); + + if(error_code != OMPI_CB_ERROR) { + fifo->head->cb_overflow = false; + opal_atomic_unlock(&(fifo->fifo_lock)); + return error_code; + } + /* see if next queue is available - while the next queue * has not been emptied, it will be marked as overflowen*/ next_ff=(ompi_cb_fifo_wrapper_t *)fifo->head->next_fifo_wrapper; @@ -409,8 +421,10 @@ void *ompi_fifo_read_from_tail_same_base_addr( ompi_fifo_t *fifo) /* See the big comment at the top of this file about this lock. */ opal_atomic_lock(&(fifo->fifo_lock)); - fifo->tail->cb_overflow=false; - fifo->tail=fifo->tail->next_fifo_wrapper; + if(fifo->tail->cb_overflow == true) { + fifo->tail->cb_overflow=false; + fifo->tail=fifo->tail->next_fifo_wrapper; + } opal_atomic_unlock(&(fifo->fifo_lock)); } @@ -450,8 +464,10 @@ static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo, /* See the big comment at the top of this file about this lock. */ opal_atomic_lock(&(fifo->fifo_lock)); - t_ptr->cb_overflow = false; - fifo->tail = t_ptr->next_fifo_wrapper; + if(fifo->tail->cb_overflow == true) { + t_ptr->cb_overflow = false; + fifo->tail = t_ptr->next_fifo_wrapper; + } opal_atomic_unlock(&(fifo->fifo_lock)); }