1
1

First cut at shared memory implementation finished — not debugged, and not working.

This commit was SVN r3203.
Этот коммит содержится в:
Rich Graham 2004-10-18 23:24:53 +00:00
родитель 09458c7dc2
Коммит ba596cc476
6 изменённых файлов: 179 добавлений и 10 удалений

Просмотреть файл

@ -39,11 +39,11 @@ struct ompi_cb_fifo_ctl_t {
ompi_lock_t lock;
/* current queue index */
volatile unsigned int fifo_index;
volatile int fifo_index;
/* number of entries that have been used, but not invalidated. used
* for lazy resource reclamation */
volatile unsigned int num_to_clear;
volatile int num_to_clear;
};
typedef struct ompi_cb_fifo_ctl_t ompi_cb_fifo_ctl_t;

Просмотреть файл

@ -69,8 +69,8 @@ int mca_ptl_sm_add_procs(
struct mca_ptl_base_peer_t **peers,
ompi_bitmap_t* reachability)
{
int i,j,proc,return_code=OMPI_SUCCESS;
size_t size,len,my_len,n_local_procs,n_to_allocate;
int return_code=OMPI_SUCCESS;
size_t i,j,proc,size,len,my_len,n_local_procs,n_to_allocate;
mca_ptl_sm_exchange_t **sm_proc_info;
ompi_proc_t* my_proc; /* pointer to caller's proc structure */
mca_ptl_sm_t *ptl_sm;
@ -607,13 +607,97 @@ int mca_ptl_sm_send_continue(
}
/*
 * A posted receive has been matched:
 *  - deliver data to the user buffer
 *  - update receive request data
 *  - ack the sender
 *
 * Fragment lists are NOT manipulated.
 *
 * @param ptl   the shared-memory PTL module
 * @param frag  matched receive fragment descriptor (actually a
 *              mca_ptl_sm_frag_t living in the shared segment)
 */
void mca_ptl_sm_matched(
    mca_ptl_base_module_t* ptl,
    mca_ptl_base_recv_frag_t* frag)
{
    mca_pml_base_recv_request_t* recv_desc;
    mca_ptl_sm_frag_t *sm_frag_desc;
    mca_ptl_base_match_header_t* header;
    struct iovec iov;
    ompi_convertor_t frag_convertor;
    ompi_proc_t *proc;
    int free_after, my_local_smp_rank, peer_local_smp_rank, return_status;
    unsigned int iov_count;
    /* BUG FIX: max_data is reported to ptl_recv_progress below even when
     * the zero-length branch skips the unpack and never assigns it.
     * Initialize it so zero-length fragments report 0 bytes delivered
     * instead of reading an uninitialized variable (UB). */
    unsigned int max_data = 0;
    ompi_fifo_t *send_fifo;
    char *sm_frag_desc_rel_to_base;

    /* get pointer to the matched receive descriptor */
    recv_desc = frag->frag_request;
    sm_frag_desc = (mca_ptl_sm_frag_t *)frag;

    /* copy data from the shared memory buffer to the user buffer,
     * but only if there is data to copy */
    if( 0 < sm_frag_desc->super.frag_base.frag_size ) {
        header = &((frag)->frag_base.frag_header.hdr_match);

        /*
         * Initialize convertor and use it to unpack data
         */
        proc = ompi_comm_peer_lookup(recv_desc->req_base.req_comm,
                recv_desc->req_base.req_peer);
        /* write over converter set on the send side */
        ompi_convertor_copy(proc->proc_convertor,
                &frag_convertor);
        ompi_convertor_init_for_recv(
                &frag_convertor,                   /* convertor */
                0,                                 /* flags */
                recv_desc->req_base.req_datatype,  /* datatype */
                recv_desc->req_base.req_count,     /* count elements */
                recv_desc->req_base.req_addr,      /* users buffer */
                header->hdr_frag.hdr_frag_offset,  /* offset in bytes into packed buffer */
                NULL );                            /* dont allocate memory */

        /* convert address relative to segment base to virtual address */
        iov.iov_base = (void *)( (char *)sm_frag_desc->
                buff_offset_from_segment_base +
                mca_ptl_sm_component.sm_offset);
        iov.iov_len = sm_frag_desc->super.frag_base.frag_size;
        iov_count = 1;
        max_data = iov.iov_len;
        ompi_convertor_unpack( &frag_convertor,
                &iov, &iov_count, &max_data, &free_after );
    }

    /* update receive request information */
    frag->frag_base.frag_owner->ptl_recv_progress(
            ptl,
            recv_desc,
            sm_frag_desc->super.frag_base.frag_size,
            max_data);

    /* ack - the ack recycles shared memory fragment resources, so
     * don't aggregate */
    my_local_smp_rank = mca_ptl_sm_component.my_smp_rank;
    peer_local_smp_rank = sm_frag_desc->queue_index;
    send_fifo = &(mca_ptl_sm_component.fifo
            [my_local_smp_rank][peer_local_smp_rank]);

    /* change address to be relative to offset from base of shared
     * memory segment */
    sm_frag_desc_rel_to_base = (char *)( (char *)sm_frag_desc -
            mca_ptl_sm_component.sm_offset );
    return_status = ompi_fifo_write_to_head(
            sm_frag_desc_rel_to_base,
            send_fifo, mca_ptl_sm_component.sm_mpool,
            mca_ptl_sm_component.sm_offset);

    /* if the ack could not be sent, queue the fragment so the
     * progress engine can retry the ack later */
    if( OMPI_SUCCESS != return_status ) {
        OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock));
        ompi_list_append(&(mca_ptl_sm.sm_pending_ack),
                (ompi_list_item_t *)sm_frag_desc);
        OMPI_THREAD_UNLOCK(&(mca_ptl_sm.sm_pending_ack_lock));
    }

    return;
}

Просмотреть файл

@ -43,8 +43,8 @@ struct mca_ptl_sm_component_t {
int sm_second_frag_free_list_num; /**< initial size of free lists */
int sm_second_frag_free_list_max; /**< maximum size of free lists */
int sm_second_frag_free_list_inc; /**< number of elements to alloc when growing free lists */
int sm_max_procs; /**< upper limit on the number of processes using the shared memory pool */
int sm_extra_procs; /**< number of extra procs to allow */
size_t sm_max_procs; /**< upper limit on the number of processes using the shared memory pool */
size_t sm_extra_procs; /**< number of extra procs to allow */
char* sm_mpool_name; /**< name of shared memory pool module */
mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */
void* sm_mpool_base; /**< base address of shared memory pool */
@ -68,7 +68,7 @@ struct mca_ptl_sm_component_t {
size_t cb_lazy_free_freq; /**< frequency of lazy free */
size_t sm_offset; /**< offset to be applied to shared memory
addresses */
int num_smp_procs; /**< current number of smp procs on this
size_t num_smp_procs; /**< current number of smp procs on this
host */
int my_smp_rank; /**< My SMP process rank. Used for accessing
* SMP specfic data structures. */
@ -130,6 +130,9 @@ struct mca_ptl_sm_t {
ompi_free_list_t sm_first_frags_to_progress; /**< list of first
fragments that are
awaiting resources */
ompi_mutex_t sm_pending_ack_lock;
ompi_list_t sm_pending_ack; /* list of fragmnent that need to be
acked */
};
typedef struct mca_ptl_sm_t mca_ptl_sm_t;

Просмотреть файл

@ -153,6 +153,8 @@ int mca_ptl_sm_component_open(void)
OBJ_CONSTRUCT(&mca_ptl_sm.sm_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_first_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_second_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack, ompi_list_t);
return OMPI_SUCCESS;
}
@ -168,6 +170,7 @@ int mca_ptl_sm_component_close(void)
OBJ_DESTRUCT(&mca_ptl_sm.sm_send_requests);
OBJ_DESTRUCT(&mca_ptl_sm.sm_first_frags);
OBJ_DESTRUCT(&mca_ptl_sm.sm_second_frags);
OBJ_DESTRUCT(&mca_ptl_sm.sm_pending_ack);
return OMPI_SUCCESS;
}
@ -284,11 +287,15 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size)
int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
{
/* local variables */
int peer_local_smp_rank, my_local_smp_rank;
int my_local_smp_rank, return_status;
unsigned int peer_local_smp_rank ;
mca_ptl_sm_frag_t *header_ptr;
ompi_fifo_t *send_fifo;
bool frag_matched;
mca_ptl_base_match_header_t *matching_header;
mca_pml_base_send_request_t *base_send_req;
ompi_list_item_t *item;
char *sm_frag_desc_rel_to_base;
my_local_smp_rank=mca_ptl_sm_component.my_smp_rank;
@ -355,16 +362,42 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
(mca_ptl_base_recv_frag_t *)header_ptr);
if( NULL != frag_matched ) {
/* deliver data, and ack */
mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm,
(mca_ptl_base_recv_frag_t *)header_ptr);
}
break;
case MCA_PTL_HDR_TYPE_FRAG:
/* second and beyond fragment - just need to deliver
* the data, and ack */
mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm,
(mca_ptl_base_recv_frag_t *)header_ptr);
break;
case MCA_PTL_HDR_TYPE_ACK:
/* ack */
/* update the send statistics */
/* NOTE !!! : need to change the update stats,
* so that MPI_Wait/Test on the send can complete
* as soon as the data is copied intially into
* the shared memory buffers */
base_send_req=header_ptr->super.frag_base.frag_header.
hdr_frag.hdr_src_ptr.pval;
((mca_ptl_base_recv_frag_t *)header_ptr)->
frag_base.frag_owner->ptl_send_progress(
(mca_ptl_base_module_t *)&mca_ptl_sm,
base_send_req,
header_ptr->super.frag_base.frag_size);
/* if this is not the first fragment, recycle
* resources. The first fragment is handled by
* the PML */
if( 0 < header_ptr->super.frag_base.frag_header.
hdr_frag.hdr_frag_offset ) {
OMPI_FREE_LIST_RETURN(&mca_ptl_sm.sm_second_frags,
(ompi_list_item_t *)header_ptr);
}
break;
default:
@ -373,6 +406,47 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
} /* end peer_local_smp_rank loop */
/* progress acks */
if( !ompi_list_is_empty(&(mca_ptl_sm.sm_pending_ack)) ) {
OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock));
/* remove ack from list - need to remove from list before
* sending the ack, so that when the ack is recieved,
* manipulated, and put on a new list, it is not also
* on a different list */
item = ompi_list_get_first(&(mca_ptl_sm.sm_pending_ack));
while ( item != ompi_list_get_end(&(mca_ptl_sm.sm_pending_ack)) ) {
/* get fragment pointer */
header_ptr = (mca_ptl_sm_frag_t *)item;
/* change address to address relative to the shared memory
* segment base */
sm_frag_desc_rel_to_base= (char *) ( (char *)header_ptr -
mca_ptl_sm_component.sm_offset );
/* try and send an ack */
return_status=ompi_fifo_write_to_head( sm_frag_desc_rel_to_base,
send_fifo,
mca_ptl_sm_component.sm_mpool,
mca_ptl_sm_component.sm_offset);
/* if ack failed, break */
if( OMPI_SUCCESS != return_status ) {
/* put the descriptor back on the list */
ompi_list_prepend(&(mca_ptl_sm.sm_pending_ack),item);
break;
}
/* get next fragment to ack */
item = ompi_list_get_first(&(mca_ptl_sm.sm_pending_ack));
}
OMPI_THREAD_UNLOCK(&(mca_ptl_sm.sm_pending_ack_lock));
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -39,6 +39,9 @@ static void mca_ptl_sm_first_frag_construct(mca_ptl_sm_frag_t* frag)
/* set the buffer length */
frag->buff_length=(size_t)mca_ptl_sm_component.first_fragment_size;
/* set local rank */
frag->queue_index=mca_ptl_sm_component.my_smp_rank;
/* set buffer pointer */
ptr=((char *)frag)+sizeof(mca_ptl_sm_frag_t)+
@ -70,6 +73,9 @@ static void mca_ptl_sm_second_frag_construct(mca_ptl_sm_frag_t* frag)
/* set the buffer length */
frag->buff_length=(size_t)mca_ptl_sm_component.max_fragment_size;
/* set local rank */
frag->queue_index=mca_ptl_sm_component.my_smp_rank;
/* set buffer pointer */
ptr=((char *)frag)+sizeof(mca_ptl_sm_frag_t)+
mca_ptl_sm_component.fragment_alignment;

Просмотреть файл

@ -29,6 +29,8 @@ OBJ_CLASS_DECLARATION(mca_ptl_sm_second_frag_t);
struct mca_ptl_sm_frag_t {
mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */
size_t buff_length; /**< size of buffer */
int queue_index; /**< local process index, cached for fast
acking */
void *buff; /**< pointer to buffer */
void *buff_offset_from_segment_base; /**< pointer to buffer,
relative to base of the