- correct thread locking
- allocate enough memory to hold fifo array

This commit was SVN r5423.
This commit is contained in:
parent cd76153a74
commit f4c25492f3
@@ -419,7 +419,7 @@ int mca_ptl_sm_add_procs_same_base_addr(
      */
     my_fifos=( ompi_fifo_t *)
         mca_ptl_sm_component.sm_mpool->mpool_alloc
-        (n_to_allocate*sizeof(ompi_fifo_t *), CACHE_LINE_SIZE);
+        (n_to_allocate*sizeof(ompi_fifo_t), CACHE_LINE_SIZE);
     if ( NULL == my_fifos ) {
         return_code=OMPI_ERR_OUT_OF_RESOURCE;
         goto CLEANUP;
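The one-character change above fixes an under-allocation: my_fifos is indexed as an array of ompi_fifo_t structures, so the buffer has to be sized by the structure, not by a pointer to it. A minimal sketch of the difference, using a hypothetical fifo_t in place of the real ompi_fifo_t and plain malloc in place of mpool_alloc:

#include <stdio.h>
#include <stdlib.h>

/* stand-in for ompi_fifo_t; the real structure is larger */
typedef struct { void *head; void *tail; int lock; } fifo_t;

int main(void) {
    size_t n_to_allocate = 16;

    /* buggy sizing: room for 16 pointers, usually far less than
     * 16 structures, so fifos[15] would be written past the end */
    size_t too_small = n_to_allocate * sizeof(fifo_t *);

    /* corrected sizing: one full structure per peer */
    fifo_t *fifos = malloc(n_to_allocate * sizeof(fifo_t));
    if (NULL == fifos) {
        return 1;   /* mirrors the OMPI_ERR_OUT_OF_RESOURCE path */
    }

    printf("pointer-sized: %zu bytes, struct-sized: %zu bytes\n",
           too_small, n_to_allocate * sizeof(fifo_t));
    free(fifos);
    return 0;
}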
@@ -826,24 +826,21 @@ int mca_ptl_sm_send(
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
 
-    /* lock for thread safety - using atomic lock, not mutex, since
-     * we need shared memory access to these lock, and in some pthread
-     * implementation, such mutex's don't work correctly */
-    if( ompi_using_threads() ) {
-        ompi_atomic_lock(&(send_fifo->head_lock));
-    }
-
-    if(OMPI_CB_FREE == send_fifo->head){
+    /* thread lock */
+    if(ompi_using_threads())
+        ompi_atomic_lock(&send_fifo->head_lock);
+    if(OMPI_CB_FREE == send_fifo->head) {
         /* no queues have been allocated - allocate now */
         return_status=ompi_fifo_init_same_base_addr(
                 mca_ptl_sm_component.size_of_cb_queue,
                 mca_ptl_sm_component.cb_lazy_free_freq,
                 /* at this stage we are not doing anything with memory
                  * locality */
                 0,0,0,
                 send_fifo, mca_ptl_sm_component.sm_mpool);
         if( return_status != OMPI_SUCCESS ) {
-            ompi_atomic_unlock(&(send_fifo->head_lock));
+            if(ompi_using_threads())
+                ompi_atomic_unlock(&(send_fifo->head_lock));
             return return_status;
         }
     }
@@ -855,11 +852,8 @@ int mca_ptl_sm_send(
         MCA_PTL_SM_SIGNAL_PEER(ptl_peer);
         return_status=OMPI_SUCCESS;
     }
-
-    /* release thread lock */
-    if( ompi_using_threads() ) {
-        ompi_atomic_unlock(&(send_fifo->head_lock));
-    }
+    if(ompi_using_threads())
+        ompi_atomic_unlock(&send_fifo->head_lock);
 
     /* if this is the entire message - signal request is complete */
     if(sendreq->req_bytes_packed == size) {
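The two hunks above are the thread-locking correction in mca_ptl_sm_send: the head_lock is only acquired when ompi_using_threads() is true, so every unlock, including the one on the error path, has to sit behind the same guard; the old error path unlocked unconditionally. A small sketch of this symmetric-guard pattern, with using_threads, lock_t, and the lock helpers as simplified stand-ins for the OMPI equivalents:

#include <stdatomic.h>

typedef atomic_flag lock_t;          /* stand-in for the fifo head_lock */

static int using_threads = 1;        /* stand-in for ompi_using_threads() */

static void lock_acquire(lock_t *l) { while (atomic_flag_test_and_set(l)) {} }
static void lock_release(lock_t *l) { atomic_flag_clear(l); }

static int post_fragment(lock_t *head_lock, int queue_ready)
{
    if (using_threads)               /* lock only in the threaded case */
        lock_acquire(head_lock);

    if (!queue_ready) {
        /* error path: release only if the lock was actually taken,
         * mirroring the corrected code above */
        if (using_threads)
            lock_release(head_lock);
        return -1;
    }

    /* ... write the descriptor to the fifo here ... */

    if (using_threads)
        lock_release(head_lock);
    return 0;
}

int main(void)
{
    lock_t head_lock = ATOMIC_FLAG_INIT;
    return post_fragment(&head_lock, 1);
}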
@@ -963,17 +957,17 @@ int mca_ptl_sm_send_continue(
     peer_local_smp_rank=ptl_peer->peer_smp_rank;
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
 
     /* since the first fragment has already been posted,
      * the queue has already been initialized, so no need to check */
 
-    /* post descriptor */
     /* lock for thread safety - using atomic lock, not mutex, since
      * we need shared memory access to these lock, and in some pthread
      * implementation, such mutex's don't work correctly */
-    if( ompi_using_threads() ) {
-        ompi_atomic_lock(&(send_fifo->head_lock));
-    }
+    if(ompi_using_threads())
+        ompi_atomic_lock(&send_fifo->head_lock);
+
+    /* post descriptor */
     return_status=ompi_fifo_write_to_head_same_base_addr(send_frag,
             send_fifo, mca_ptl_sm_component.sm_mpool);
     if( 0 <= return_status ) {
@@ -981,10 +975,8 @@ int mca_ptl_sm_send_continue(
         return_status=OMPI_SUCCESS;
     }
 
-    /* release threa lock */
-    if( ompi_using_threads() ) {
-        ompi_atomic_unlock(&(send_fifo->head_lock));
-    }
-    /* return */
+    /* release thread lock */
+    if(ompi_using_threads())
+        ompi_atomic_unlock(&send_fifo->head_lock);
     return return_status;
 }

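The comment kept in mca_ptl_sm_send_continue explains why these are atomic locks rather than pthread mutexes: the head_lock lives in the shared-memory segment and is touched from more than one process, which a default pthread mutex is not guaranteed to handle. A rough illustration of a process-shared spin lock, assuming a POSIX system with MAP_ANONYMOUS; this is only a sketch of the idea, not the OMPI implementation:

#include <stdatomic.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/* small shared region, analogous to the sm mpool backing the fifo locks */
typedef struct {
    atomic_flag lock;
    int counter;
} shared_t;

int main(void)
{
    shared_t *shm = mmap(NULL, sizeof(*shm), PROT_READ | PROT_WRITE,
                         MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (MAP_FAILED == shm)
        return 1;
    atomic_flag_clear(&shm->lock);
    shm->counter = 0;

    pid_t pid = fork();              /* parent and child share the region */
    if (pid < 0)
        return 1;
    for (int i = 0; i < 100000; i++) {
        while (atomic_flag_test_and_set(&shm->lock)) {}   /* spin */
        shm->counter++;              /* protected across both processes */
        atomic_flag_clear(&shm->lock);
    }
    if (pid > 0) {
        wait(NULL);
        printf("counter = %d\n", shm->counter);   /* 200000 when the lock works */
    }
    return 0;
}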
@@ -133,8 +133,13 @@ void mca_ptl_sm_matched(
 
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
+
+    /* lock as multiple processes can attempt to init the head */
+    if(ompi_using_threads())
+        ompi_atomic_lock(&send_fifo->head_lock);
+
     /* check to see if fifo is allocated */
-    if(OMPI_CB_FREE == send_fifo->head){
+    if(OMPI_CB_FREE == send_fifo->head) {
         /* no queues have been allocated - allocate now */
         return_status=ompi_fifo_init_same_base_addr(
                 mca_ptl_sm_component.size_of_cb_queue,
@@ -144,7 +149,8 @@ void mca_ptl_sm_matched(
                 0,0,0,
                 send_fifo, mca_ptl_sm_component.sm_mpool);
         if( return_status != OMPI_SUCCESS ) {
-            ompi_atomic_unlock(&(send_fifo->head_lock));
+            if(ompi_using_threads())
+                ompi_atomic_unlock(&send_fifo->head_lock);
             return;
         }
     }
@@ -159,6 +165,9 @@ void mca_ptl_sm_matched(
     return_status=ompi_fifo_write_to_head_same_base_addr(sm_frag_desc,
             send_fifo, mca_ptl_sm_component.sm_mpool);
 
+    if(ompi_using_threads())
+        ompi_atomic_unlock(&send_fifo->head_lock);
+
     /* if can't ack, put on list for later delivery */
     if( 0 > return_status ) {
         OMPI_THREAD_LOCK(&(mca_ptl_sm_component.sm_pending_ack_lock));
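The context at the end of this hunk shows the fallback when the fifo write itself fails: a negative return means the ack could not be posted, so the fragment is parked on a pending-ack list (guarded by OMPI_THREAD_LOCK) and retried later. A compact sketch of that deferral pattern, with a hypothetical singly linked pending list standing in for the component's sm_pending_ack list:

#include <pthread.h>
#include <stddef.h>

typedef struct frag { struct frag *next; } frag_t;

/* process-private pending-ack list, analogous to the component's
 * sm_pending_ack list and sm_pending_ack_lock */
static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;
static frag_t *pending = NULL;

/* hypothetical stand-in for ompi_fifo_write_to_head_same_base_addr();
 * a negative return means the descriptor could not be queued */
static int fifo_write(frag_t *frag) { (void)frag; return -1; }

void post_ack(frag_t *frag)
{
    if (fifo_write(frag) < 0) {
        /* can't ack now - queue the fragment and retry on a later pass */
        pthread_mutex_lock(&pending_lock);
        frag->next = pending;
        pending = frag;
        pthread_mutex_unlock(&pending_lock);
    }
}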