diff --git a/src/class/ompi_circular_buffer_fifo.h b/src/class/ompi_circular_buffer_fifo.h index 89e25c46ef..7da8df5c2e 100644 --- a/src/class/ompi_circular_buffer_fifo.h +++ b/src/class/ompi_circular_buffer_fifo.h @@ -39,11 +39,11 @@ struct ompi_cb_fifo_ctl_t { ompi_lock_t lock; /* current queue index */ - volatile unsigned int fifo_index; + volatile int fifo_index; /* number of entries that have been used, but not invalidated. used * for lazy resource reclamation */ - volatile unsigned int num_to_clear; + volatile int num_to_clear; }; typedef struct ompi_cb_fifo_ctl_t ompi_cb_fifo_ctl_t; diff --git a/src/mca/ptl/sm/src/ptl_sm.c b/src/mca/ptl/sm/src/ptl_sm.c index de3a8aa6c3..d4c6980501 100644 --- a/src/mca/ptl/sm/src/ptl_sm.c +++ b/src/mca/ptl/sm/src/ptl_sm.c @@ -69,8 +69,8 @@ int mca_ptl_sm_add_procs( struct mca_ptl_base_peer_t **peers, ompi_bitmap_t* reachability) { - int i,j,proc,return_code=OMPI_SUCCESS; - size_t size,len,my_len,n_local_procs,n_to_allocate; + int return_code=OMPI_SUCCESS; + size_t i,j,proc,size,len,my_len,n_local_procs,n_to_allocate; mca_ptl_sm_exchange_t **sm_proc_info; ompi_proc_t* my_proc; /* pointer to caller's proc structure */ mca_ptl_sm_t *ptl_sm; @@ -607,13 +607,97 @@ int mca_ptl_sm_send_continue( } /* - * A posted receive has been matched - process the fragment - * and then ack. + * A posted receive has been matched: + * - deliver data to user buffers + * - update receive request data + * - ack + * + * fragment lists are NOT manipulated. 
*/ void mca_ptl_sm_matched( mca_ptl_base_module_t* ptl, mca_ptl_base_recv_frag_t* frag) { + mca_pml_base_recv_request_t* recv_desc; + mca_ptl_sm_frag_t *sm_frag_desc; + mca_ptl_base_match_header_t* header; + struct iovec iov; + ompi_convertor_t frag_convertor; + ompi_proc_t *proc; + int free_after,my_local_smp_rank,peer_local_smp_rank, return_status; + unsigned int iov_count, max_data; + ompi_fifo_t *send_fifo; + char *sm_frag_desc_rel_to_base; + + /* copy data from shared memory buffer to user buffer */ + /* get pointer to the matched receive descriptor */ + recv_desc = frag->frag_request; + sm_frag_desc = (mca_ptl_sm_frag_t *)frag; + + /* copy, only if there is data to copy */ + if( 0 < sm_frag_desc->super.frag_base.frag_size ) { + header = &((frag)->frag_base.frag_header.hdr_match); + + /* + * Initialize convertor and use it to unpack data + */ + proc = ompi_comm_peer_lookup(recv_desc->req_base.req_comm, + recv_desc->req_base.req_peer); + /* write over converter set on the send side */ + ompi_convertor_copy(proc->proc_convertor, + &frag_convertor); + ompi_convertor_init_for_recv( + &frag_convertor, /* convertor */ + 0, /* flags */ + recv_desc->req_base.req_datatype, /* datatype */ + recv_desc->req_base.req_count, /* count elements */ + recv_desc->req_base.req_addr, /* users buffer */ + header->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */ + NULL ); /* dont allocate memory */ + + /* convert address relative to segment base to virtual address */ + iov.iov_base = (void *)( (char *)sm_frag_desc-> + buff_offset_from_segment_base+ + mca_ptl_sm_component.sm_offset); + iov.iov_len = sm_frag_desc->super.frag_base.frag_size; + iov_count = 1; + max_data = iov.iov_len; + ompi_convertor_unpack( &frag_convertor, + &iov, &iov_count, &max_data, &free_after ); + } + + /* update receive request information */ + frag->frag_base.frag_owner->ptl_recv_progress( + ptl, + recv_desc, + sm_frag_desc->super.frag_base.frag_size, + max_data); + + /* ack - ack recycles 
shared memory fragment resources, so + * don't aggregate */ + my_local_smp_rank=mca_ptl_sm_component.my_smp_rank; + peer_local_smp_rank=sm_frag_desc->queue_index; + send_fifo=&(mca_ptl_sm_component.fifo + [my_local_smp_rank][peer_local_smp_rank]); + /* change address to be relative to offset from base of shared + * memory segment */ + sm_frag_desc_rel_to_base= (char *) ( (char *)sm_frag_desc - + mca_ptl_sm_component.sm_offset ); + return_status=ompi_fifo_write_to_head( + sm_frag_desc_rel_to_base, + send_fifo, mca_ptl_sm_component.sm_mpool, + mca_ptl_sm_component.sm_offset); + + /* if can't ack, put on list for later delivery */ + if( OMPI_SUCCESS != return_status ) { + OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock)); + ompi_list_append(&(mca_ptl_sm.sm_pending_ack), + (ompi_list_item_t *)sm_frag_desc); + OMPI_THREAD_UNLOCK(&(mca_ptl_sm.sm_pending_ack_lock)); + } + + /* return */ + return; } diff --git a/src/mca/ptl/sm/src/ptl_sm.h b/src/mca/ptl/sm/src/ptl_sm.h index dee41b6a89..3e939c1303 100644 --- a/src/mca/ptl/sm/src/ptl_sm.h +++ b/src/mca/ptl/sm/src/ptl_sm.h @@ -43,8 +43,8 @@ struct mca_ptl_sm_component_t { int sm_second_frag_free_list_num; /**< initial size of free lists */ int sm_second_frag_free_list_max; /**< maximum size of free lists */ int sm_second_frag_free_list_inc; /**< number of elements to alloc when growing free lists */ - int sm_max_procs; /**< upper limit on the number of processes using the shared memory pool */ - int sm_extra_procs; /**< number of extra procs to allow */ + size_t sm_max_procs; /**< upper limit on the number of processes using the shared memory pool */ + size_t sm_extra_procs; /**< number of extra procs to allow */ char* sm_mpool_name; /**< name of shared memory pool module */ mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */ void* sm_mpool_base; /**< base address of shared memory pool */ @@ -68,7 +68,7 @@ struct mca_ptl_sm_component_t { size_t cb_lazy_free_freq; /**< frequency of lazy free */ size_t sm_offset; 
/**< offset to be applied to shared memory addresses */ - int num_smp_procs; /**< current number of smp procs on this + size_t num_smp_procs; /**< current number of smp procs on this host */ int my_smp_rank; /**< My SMP process rank. Used for accessing * SMP specfic data structures. */ @@ -130,6 +130,9 @@ struct mca_ptl_sm_t { ompi_free_list_t sm_first_frags_to_progress; /**< list of first fragments that are awaiting resources */ + ompi_mutex_t sm_pending_ack_lock; + ompi_list_t sm_pending_ack; /* list of fragments that need to be + acked */ }; typedef struct mca_ptl_sm_t mca_ptl_sm_t; diff --git a/src/mca/ptl/sm/src/ptl_sm_component.c b/src/mca/ptl/sm/src/ptl_sm_component.c index 5e08a3fcf4..164e6da678 100644 --- a/src/mca/ptl/sm/src/ptl_sm_component.c +++ b/src/mca/ptl/sm/src/ptl_sm_component.c @@ -153,6 +153,8 @@ int mca_ptl_sm_component_open(void) OBJ_CONSTRUCT(&mca_ptl_sm.sm_send_requests, ompi_free_list_t); OBJ_CONSTRUCT(&mca_ptl_sm.sm_first_frags, ompi_free_list_t); OBJ_CONSTRUCT(&mca_ptl_sm.sm_second_frags, ompi_free_list_t); + OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack_lock, ompi_mutex_t); + OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack, ompi_list_t); return OMPI_SUCCESS; } @@ -168,6 +170,7 @@ int mca_ptl_sm_component_close(void) OBJ_DESTRUCT(&mca_ptl_sm.sm_send_requests); OBJ_DESTRUCT(&mca_ptl_sm.sm_first_frags); OBJ_DESTRUCT(&mca_ptl_sm.sm_second_frags); + OBJ_DESTRUCT(&mca_ptl_sm.sm_pending_ack); return OMPI_SUCCESS; } @@ -284,11 +287,15 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) { /* local variables */ - int peer_local_smp_rank, my_local_smp_rank; + int my_local_smp_rank, return_status; + unsigned int peer_local_smp_rank ; mca_ptl_sm_frag_t *header_ptr; ompi_fifo_t *send_fifo; bool frag_matched; mca_ptl_base_match_header_t *matching_header; + mca_pml_base_send_request_t *base_send_req; + ompi_list_item_t *item; + char *sm_frag_desc_rel_to_base; 
my_local_smp_rank=mca_ptl_sm_component.my_smp_rank; @@ -355,16 +362,42 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) (mca_ptl_base_recv_frag_t *)header_ptr); if( NULL != frag_matched ) { /* deliver data, and ack */ + mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm, + (mca_ptl_base_recv_frag_t *)header_ptr); + } break; case MCA_PTL_HDR_TYPE_FRAG: /* second and beyond fragment - just need to deliver * the data, and ack */ + mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm, + (mca_ptl_base_recv_frag_t *)header_ptr); break; case MCA_PTL_HDR_TYPE_ACK: /* ack */ + /* update the send statistics */ + /* NOTE !!! : need to change the update stats, + * so that MPI_Wait/Test on the send can complete + * as soon as the data is copied initially into + * the shared memory buffers */ + base_send_req=header_ptr->super.frag_base.frag_header. + hdr_frag.hdr_src_ptr.pval; + ((mca_ptl_base_recv_frag_t *)header_ptr)-> + frag_base.frag_owner->ptl_send_progress( + (mca_ptl_base_module_t *)&mca_ptl_sm, + base_send_req, + header_ptr->super.frag_base.frag_size); + + /* if this is not the first fragment, recycle + * resources. The first fragment is handled by + * the PML */ + if( 0 < header_ptr->super.frag_base.frag_header. 
+ hdr_frag.hdr_frag_offset ) { + OMPI_FREE_LIST_RETURN(&mca_ptl_sm.sm_second_frags, + (ompi_list_item_t *)header_ptr); + } break; default: @@ -373,6 +406,47 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp) } /* end peer_local_smp_rank loop */ + /* progress acks */ + if( !ompi_list_is_empty(&(mca_ptl_sm.sm_pending_ack)) ) { + + OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock)); + + /* remove ack from list - need to remove from list before + * sending the ack, so that when the ack is received, + * manipulated, and put on a new list, it is not also + * on a different list */ + item = ompi_list_get_first(&(mca_ptl_sm.sm_pending_ack)); + while ( item != ompi_list_get_end(&(mca_ptl_sm.sm_pending_ack)) ) { + + /* get fragment pointer */ + header_ptr = (mca_ptl_sm_frag_t *)item; + + /* change address to address relative to the shared memory + * segment base */ + sm_frag_desc_rel_to_base= (char *) ( (char *)header_ptr - + mca_ptl_sm_component.sm_offset ); + + /* try and send an ack */ + return_status=ompi_fifo_write_to_head( sm_frag_desc_rel_to_base, + send_fifo, + mca_ptl_sm_component.sm_mpool, + mca_ptl_sm_component.sm_offset); + + /* if ack failed, break */ + if( OMPI_SUCCESS != return_status ) { + /* put the descriptor back on the list */ + ompi_list_prepend(&(mca_ptl_sm.sm_pending_ack),item); + break; + } + + /* get next fragment to ack */ + item = ompi_list_get_first(&(mca_ptl_sm.sm_pending_ack)); + + } + + OMPI_THREAD_UNLOCK(&(mca_ptl_sm.sm_pending_ack_lock)); + } + return OMPI_SUCCESS; } diff --git a/src/mca/ptl/sm/src/ptl_sm_frag.c b/src/mca/ptl/sm/src/ptl_sm_frag.c index f45e016f8d..b759883379 100644 --- a/src/mca/ptl/sm/src/ptl_sm_frag.c +++ b/src/mca/ptl/sm/src/ptl_sm_frag.c @@ -39,6 +39,9 @@ static void mca_ptl_sm_first_frag_construct(mca_ptl_sm_frag_t* frag) /* set the buffer length */ frag->buff_length=(size_t)mca_ptl_sm_component.first_fragment_size; + + /* set local rank */ + frag->queue_index=mca_ptl_sm_component.my_smp_rank; /* set buffer 
pointer */ ptr=((char *)frag)+sizeof(mca_ptl_sm_frag_t)+ @@ -70,6 +73,9 @@ static void mca_ptl_sm_second_frag_construct(mca_ptl_sm_frag_t* frag) /* set the buffer length */ frag->buff_length=(size_t)mca_ptl_sm_component.max_fragment_size; + /* set local rank */ + frag->queue_index=mca_ptl_sm_component.my_smp_rank; + /* set buffer pointer */ ptr=((char *)frag)+sizeof(mca_ptl_sm_frag_t)+ mca_ptl_sm_component.fragment_alignment; diff --git a/src/mca/ptl/sm/src/ptl_sm_frag.h b/src/mca/ptl/sm/src/ptl_sm_frag.h index f7aa270017..aa7ff2e0b8 100644 --- a/src/mca/ptl/sm/src/ptl_sm_frag.h +++ b/src/mca/ptl/sm/src/ptl_sm_frag.h @@ -29,6 +29,8 @@ OBJ_CLASS_DECLARATION(mca_ptl_sm_second_frag_t); struct mca_ptl_sm_frag_t { mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */ size_t buff_length; /**< size of buffer */ + int queue_index; /**< local process index, cached for fast + acking */ void *buff; /**< pointer to buffer */ void *buff_offset_from_segment_base; /**< pointer to buffer, relative to base of the