From 826b5ebdf6f96bc9bcfe85a26fedb02ce715d6ae Mon Sep 17 00:00:00 2001
From: Rich Graham
Date: Mon, 25 Oct 2004 17:22:47 +0000
Subject: [PATCH] Fix numerous bugs in the shared memory implementation. A
 hello world test now passes.

This commit was SVN r3313.
---
 src/mca/ptl/sm/src/ptl_sm.c           | 74 ++++++++++-----------------
 src/mca/ptl/sm/src/ptl_sm_component.c | 31 +++++------
 2 files changed, 40 insertions(+), 65 deletions(-)

diff --git a/src/mca/ptl/sm/src/ptl_sm.c b/src/mca/ptl/sm/src/ptl_sm.c
index 8b776fd9d1..e80397279e 100644
--- a/src/mca/ptl/sm/src/ptl_sm.c
+++ b/src/mca/ptl/sm/src/ptl_sm.c
@@ -288,10 +288,6 @@ int mca_ptl_sm_add_procs(
         /* allow other procs to use this shared memory map */
         mca_ptl_sm_component.mmap_file->map_seg->seg_inited=true;
     }
 
-    /* debug */
-    fprintf(stderr," yeah !!! \n");
-    fflush(stderr);
-    /* end debug */
     /* Note: Need to make sure that proc 0 initializes control
      * structures before any of the other procs can progress */
@@ -301,10 +297,6 @@ int mca_ptl_sm_add_procs(
         while(!mca_ptl_sm_component.mmap_file->map_seg->seg_inited) {
             ;
         }
     }
-    /* debug */
-    fprintf(stderr," yeah !!! II %d \n",n_to_allocate);
-    fflush(stderr);
-    /* end debug */
 
     /* cache the pointer to the 2d fifo array.  This is a virtual
      * address, whereas the address in the virtual memory segment
@@ -315,10 +307,6 @@ int mca_ptl_sm_add_procs(
         return_code=OMPI_ERROR;
         goto CLEANUP;
     }
-    /* debug */
-    fprintf(stderr," yeah !!! IV \n");
-    fflush(stderr);
-    /* end debug */
     fifo_tmp=(ompi_fifo_t **)
         ( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo) +
           (size_t)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
@@ -481,11 +469,6 @@ int mca_ptl_sm_send(
     mca_ptl_base_header_t* hdr;
     void *sm_data_ptr, *user_data_ptr;
 
-    /* debug */
-    fprintf(stderr," ZZZZ send called %d \n",
-            ptl_peer->my_smp_rank);
-    fflush(stderr);
-    /* end debug */
     /* cast to shared memory send descriptor */
     sm_request=(mca_ptl_sm_send_request_t *)sendreq;
 
@@ -495,10 +478,6 @@ int mca_ptl_sm_send(
         /* in this ptl, we will only use the cache, or fail */
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
-    /* debug */
-    fprintf(stderr," ZZZZ II send called %d size %d\n",
-            ptl_peer->my_smp_rank,size);
-    fflush(stderr);
 
     /* if needed, pack data in payload buffer */
     if( 0 <= size ) {
@@ -531,10 +510,6 @@ int mca_ptl_sm_send(
             return OMPI_ERROR;
         }
     }
-    /* debug */
-    fprintf(stderr," after pack \n");
-    fflush(stderr);
-    /* end debug */
 
     /* fill in the fragment descriptor */
     /* get pointer to the fragment header */
@@ -546,8 +521,6 @@ int mca_ptl_sm_send(
     hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
     hdr->hdr_frag.hdr_frag_seq = 0;
     hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
-    hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY -
-                                           REPLACE WITH MACRO */
     hdr->hdr_frag.hdr_dst_ptr.lval = 0;
     hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
     hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
@@ -574,22 +547,12 @@ int mca_ptl_sm_send(
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
-    /* debug */
-    fprintf(stderr," send_fifo %d %d %u \n",
-            my_local_smp_rank,peer_local_smp_rank,getpid());
-    fflush(stderr);
-    /* end debug */
 
     /* lock for thread safety - using atomic locks, not mutexes, since
      * we need shared memory access to these locks, and in some pthread
     * implementations, such mutexes don't work correctly */
     if( ompi_using_threads() ) {
         ompi_atomic_lock(&(send_fifo->head_lock));
     }
-    /* debug */
-    fprintf(stderr," send_fifo after %d %d %u \n",
-            my_local_smp_rank,peer_local_smp_rank,getpid());
-    fflush(stderr);
-    /* end debug */
 
     if(OMPI_CB_FREE == send_fifo->head){
         /* no queues have been allocated - allocate now */
@@ -604,25 +567,19 @@ int mca_ptl_sm_send(
             return return_status;
         }
     }
-    /* debug */
-    fprintf(stderr," sending me %d it %d \n",my_local_smp_rank,
-            peer_local_smp_rank);
-    fflush(stderr);
-    /* end if */
 
     /* post descriptor */
     return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
             send_fifo, mca_ptl_sm_component.sm_mpool, mca_ptl_sm_component.sm_offset);
+    if( 0 <= return_status ) {
+        return_status=OMPI_SUCCESS;
+    }
 
     /* release thread lock */
     if( ompi_using_threads() ) {
         ompi_atomic_unlock(&(send_fifo->head_lock));
     }
-    /* debug */
-    fprintf(stderr," done sending \n");
-    fflush(stderr);
-    /* end debug */
 
     /* return */
     return return_status;
 }
@@ -735,6 +692,9 @@ int mca_ptl_sm_send_continue(
 
     return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
             send_fifo, mca_ptl_sm_component.sm_mpool, mca_ptl_sm_component.sm_offset);
+    if( 0 <= return_status ) {
+        return_status=OMPI_SUCCESS;
+    }
 
     /* release thread lock */
     if( ompi_using_threads() ) {
@@ -818,19 +778,39 @@ void mca_ptl_sm_matched(
      * don't aggregate */
     my_local_smp_rank=mca_ptl_sm_component.my_smp_rank;
     peer_local_smp_rank=sm_frag_desc->queue_index;
+
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
+    /* check to see if fifo is allocated */
+    if(OMPI_CB_FREE == send_fifo->head){
+        /* no queues have been allocated - allocate now */
+        return_status=ompi_fifo_init(mca_ptl_sm_component.size_of_cb_queue,
+                mca_ptl_sm_component.cb_lazy_free_freq,
+                /* at this stage we are not doing anything with memory
+                 * locality */
+                0,0,0,
+                send_fifo, mca_ptl_sm_component.sm_mpool);
+        if( return_status != OMPI_SUCCESS ) {
+            ompi_atomic_unlock(&(send_fifo->head_lock));
+            return return_status;
+        }
+    }
+
     /* change address to be relative to offset from base of shared
      * memory segment */
     sm_frag_desc_rel_to_base= (char *)
         ( (char *)sm_frag_desc - mca_ptl_sm_component.sm_offset );
+
+    /* set the fragment type to be an ack */
+    sm_frag_desc->super.frag_base.frag_header.hdr_common.hdr_type=
+        MCA_PTL_HDR_TYPE_ACK;
 
     return_status=ompi_fifo_write_to_head( sm_frag_desc_rel_to_base,
             send_fifo, mca_ptl_sm_component.sm_mpool,
             mca_ptl_sm_component.sm_offset);
 
     /* if can't ack, put on list for later delivery */
-    if( OMPI_SUCCESS != return_status ) {
+    if( 0 > return_status ) {
         OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock));
         ompi_list_append(&(mca_ptl_sm.sm_pending_ack),
                 (ompi_list_item_t *)sm_frag_desc);
diff --git a/src/mca/ptl/sm/src/ptl_sm_component.c b/src/mca/ptl/sm/src/ptl_sm_component.c
index 3b719df77b..9b15557ecb 100644
--- a/src/mca/ptl/sm/src/ptl_sm_component.c
+++ b/src/mca/ptl/sm/src/ptl_sm_component.c
@@ -159,11 +159,6 @@ int mca_ptl_sm_component_open(void)
     OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack_lock, ompi_mutex_t);
     OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack, ompi_list_t);
 
-    /* debug */
-    fprintf(stderr," at end of open \n");
-    fflush(stderr);
-    /* end debug */
-
     return OMPI_SUCCESS;
 }
 
@@ -237,10 +232,6 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
     /* set flag indicating ptl not inited */
     mca_ptl_sm.ptl_inited=false;
 
-    /* debug */
-    fprintf(stderr," at end of init \n");
-    fflush(stderr);
-    /* end debug */
     return ptls;
 }
 
@@ -300,7 +291,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
         }
 
         send_fifo=&(mca_ptl_sm_component.fifo
-                [my_local_smp_rank][peer_local_smp_rank]);
+                [peer_local_smp_rank][my_local_smp_rank]);
 
         /* if fifo is not yet setup - continue - no data has been sent */
         if(OMPI_CB_FREE == send_fifo->tail){
@@ -332,11 +323,10 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
          * memory address, to a true virtual address */
         header_ptr = (mca_ptl_sm_frag_t *)( (char *)header_ptr+
                 mca_ptl_sm_component.sm_offset);
-        /* debug */
-        fprintf(stderr," recv :: got it %d from %d \n",
-                my_local_smp_rank,peer_local_smp_rank);
-        fflush(stderr);
-        /* end debug */
+
+        /* set the owning ptl */
+        header_ptr->super.frag_base.frag_owner=(mca_ptl_base_module_t *)
+            (&mca_ptl_sm);
 
         /* figure out what type of message this is */
         switch
@@ -350,7 +340,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
                 frag_matched=mca_ptl_base_match_in_order_network_delivery(
                         matching_header,
                         (mca_ptl_base_recv_frag_t *)header_ptr);
-                if( NULL != frag_matched ) {
+                if( frag_matched ) {
                     /* deliver data, and ack */
                     mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm,
                             (mca_ptl_base_recv_frag_t *)header_ptr);
@@ -416,14 +406,19 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
             sm_frag_desc_rel_to_base= (char *)
                 ( (char *)header_ptr - mca_ptl_sm_component.sm_offset );
 
-            /* try and send an ack */
+            /* try to send an ack - no need to check whether a send
+             * queue has been allocated, since entries are put here only
+             * if the queue was previously full */
+
+            /* fragment already marked as an ack */
+
             return_status=ompi_fifo_write_to_head( sm_frag_desc_rel_to_base,
                     send_fifo, mca_ptl_sm_component.sm_mpool,
                     mca_ptl_sm_component.sm_offset);
 
             /* if ack failed, break */
-            if( OMPI_SUCCESS != return_status ) {
+            if( 0 > return_status ) {
                 /* put the descriptor back on the list */
                 ompi_list_prepend(&(mca_ptl_sm.sm_pending_ack),item);
                 break;
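
Note on the convention this patch leans on throughout: every value queued
into a shared-memory fifo is first rebased with "- sm_offset", and every
value dequeued is rebased back with "+ sm_offset". Each process may mmap
the shared segment at a different virtual base address, so only
segment-relative values are meaningful once they cross a process boundary.
A minimal sketch of that convention, with invented names (sm_view_t,
sm_to_relative and sm_to_local are illustrative, not the OMPI API):

    /* Illustrative sketch only -- not OMPI code. */
    #include <stddef.h>

    typedef struct {
        /* this process's mapping base for the segment, as an integer */
        size_t sm_offset;
    } sm_view_t;

    /* writer side: strip the local base before publishing a pointer
     * in a shared fifo slot */
    static void *sm_to_relative(const sm_view_t *view, void *local_ptr)
    {
        return (void *)((char *)local_ptr - view->sm_offset);
    }

    /* reader side: rebase the published value into this process's own
     * address space before dereferencing it */
    static void *sm_to_local(const sm_view_t *view, void *relative_ptr)
    {
        return (void *)((char *)relative_ptr + view->sm_offset);
    }

The recurring status fix follows the same pattern in both send paths and
both ack paths: the added "if( 0 <= return_status )" normalization and the
switch from "OMPI_SUCCESS != return_status" to "0 > return_status" indicate
that ompi_fifo_write_to_head() returns a non-negative value on success
rather than exactly OMPI_SUCCESS, so the old equality test misreported
successful writes as failures.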