diff --git a/src/mca/ptl/sm/src/ptl_sm.c b/src/mca/ptl/sm/src/ptl_sm.c index 67cad7097c..3a96799842 100644 --- a/src/mca/ptl/sm/src/ptl_sm.c +++ b/src/mca/ptl/sm/src/ptl_sm.c @@ -419,7 +419,7 @@ int mca_ptl_sm_add_procs_same_base_addr( */ my_fifos=( ompi_fifo_t *) mca_ptl_sm_component.sm_mpool->mpool_alloc - (n_to_allocate*sizeof(ompi_fifo_t *), CACHE_LINE_SIZE); + (n_to_allocate*sizeof(ompi_fifo_t), CACHE_LINE_SIZE); if ( NULL == my_fifos ) { return_code=OMPI_ERR_OUT_OF_RESOURCE; goto CLEANUP; @@ -826,24 +826,21 @@ int mca_ptl_sm_send( send_fifo=&(mca_ptl_sm_component.fifo [my_local_smp_rank][peer_local_smp_rank]); - /* lock for thread safety - using atomic lock, not mutex, since - * we need shared memory access to these lock, and in some pthread - * implementation, such mutex's don't work correctly */ - if( ompi_using_threads() ) { - ompi_atomic_lock(&(send_fifo->head_lock)); - } - - if(OMPI_CB_FREE == send_fifo->head){ + /* thread lock */ + if(ompi_using_threads()) + ompi_atomic_lock(&send_fifo->head_lock); + if(OMPI_CB_FREE == send_fifo->head) { /* no queues have been allocated - allocate now */ return_status=ompi_fifo_init_same_base_addr( - mca_ptl_sm_component.size_of_cb_queue, - mca_ptl_sm_component.cb_lazy_free_freq, - /* at this stage we are not doing anything with memory - * locality */ - 0,0,0, - send_fifo, mca_ptl_sm_component.sm_mpool); + mca_ptl_sm_component.size_of_cb_queue, + mca_ptl_sm_component.cb_lazy_free_freq, + /* at this stage we are not doing anything with memory + * locality */ + 0,0,0, + send_fifo, mca_ptl_sm_component.sm_mpool); if( return_status != OMPI_SUCCESS ) { - ompi_atomic_unlock(&(send_fifo->head_lock)); + if(ompi_using_threads()) + ompi_atomic_unlock(&(send_fifo->head_lock)); return return_status; } } @@ -855,11 +852,8 @@ int mca_ptl_sm_send( MCA_PTL_SM_SIGNAL_PEER(ptl_peer); return_status=OMPI_SUCCESS; } - - /* release thread lock */ - if( ompi_using_threads() ) { - ompi_atomic_unlock(&(send_fifo->head_lock)); - } + if(ompi_using_threads()) + ompi_atomic_unlock(&send_fifo->head_lock); /* if this is the entire message - signal request is complete */ if(sendreq->req_bytes_packed == size) { @@ -963,17 +957,17 @@ int mca_ptl_sm_send_continue( peer_local_smp_rank=ptl_peer->peer_smp_rank; send_fifo=&(mca_ptl_sm_component.fifo [my_local_smp_rank][peer_local_smp_rank]); + /* since the first fragment has already been posted, * the queue has already been initialized, so no need to check */ - /* post descriptor */ /* lock for thread safety - using atomic lock, not mutex, since * we need shared memory access to these lock, and in some pthread * implementation, such mutex's don't work correctly */ - if( ompi_using_threads() ) { - ompi_atomic_lock(&(send_fifo->head_lock)); - } + if(ompi_using_threads()) + ompi_atomic_lock(&send_fifo->head_lock); + /* post descriptor */ return_status=ompi_fifo_write_to_head_same_base_addr(send_frag, send_fifo, mca_ptl_sm_component.sm_mpool); if( 0 <= return_status ) { @@ -981,10 +975,8 @@ int mca_ptl_sm_send_continue( return_status=OMPI_SUCCESS; } - /* release threa lock */ - if( ompi_using_threads() ) { - ompi_atomic_unlock(&(send_fifo->head_lock)); - } - /* return */ + /* release thread lock */ + if(ompi_using_threads()) + ompi_atomic_unlock(&send_fifo->head_lock); return return_status; } diff --git a/src/mca/ptl/sm/src/ptl_sm_send.c b/src/mca/ptl/sm/src/ptl_sm_send.c index c50f809ef9..d5a4e165f2 100644 --- a/src/mca/ptl/sm/src/ptl_sm_send.c +++ b/src/mca/ptl/sm/src/ptl_sm_send.c @@ -133,8 +133,13 @@ void mca_ptl_sm_matched( send_fifo=&(mca_ptl_sm_component.fifo [my_local_smp_rank][peer_local_smp_rank]); + + /* lock as multiple processes can attempt to init the head */ + if(ompi_using_threads()) + ompi_atomic_lock(&send_fifo->head_lock); + /* check to see if fifo is allocated */ - if(OMPI_CB_FREE == send_fifo->head){ + if(OMPI_CB_FREE == send_fifo->head) { /* no queues have been allocated - allocate now */ return_status=ompi_fifo_init_same_base_addr( mca_ptl_sm_component.size_of_cb_queue, @@ -144,7 +149,8 @@ void mca_ptl_sm_matched( 0,0,0, send_fifo, mca_ptl_sm_component.sm_mpool); if( return_status != OMPI_SUCCESS ) { - ompi_atomic_unlock(&(send_fifo->head_lock)); + if(ompi_using_threads()) + ompi_atomic_unlock(&send_fifo->head_lock); return; } } @@ -159,6 +165,9 @@ void mca_ptl_sm_matched( return_status=ompi_fifo_write_to_head_same_base_addr(sm_frag_desc, send_fifo, mca_ptl_sm_component.sm_mpool); + if(ompi_using_threads()) + ompi_atomic_unlock(&send_fifo->head_lock); + /* if can't ack, put on list for later delivery */ if( 0 > return_status ) { OMPI_THREAD_LOCK(&(mca_ptl_sm_component.sm_pending_ack_lock));