1
1

Start to implement the send side.

This commit was SVN r2724.
This commit is contained in:
Rich Graham 2004-09-16 15:40:24 +00:00
parent 62f5ad9979
commit 0b4a7aede6
8 changed files with 214 additions and 30 deletions

View file

@ -6,6 +6,7 @@
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include "util/output.h"
#include "util/if.h"
@ -25,6 +26,9 @@
#include "util/proc_info.h"
#include "util/printf.h"
#include "mca/ptl/sm/src/ptl_sm_sendreq.h"
#include "class/ompi_fifo.h"
#include "threads/mutex.h"
#include "datatype/datatype.h"
mca_ptl_sm_t mca_ptl_sm = {
{
@ -44,9 +48,9 @@ mca_ptl_sm_t mca_ptl_sm = {
mca_ptl_sm_add_procs,
mca_ptl_sm_del_procs,
mca_ptl_sm_finalize,
mca_ptl_sm_send,
mca_ptl_sm_send, /* function */
NULL,
mca_ptl_sm_send, /* first fragment send function */
mca_ptl_sm_send, /* second and subsequent send function */
NULL, /* get function */
mca_ptl_sm_matched, /* function called after match is made */
mca_ptl_sm_send_request_init, /* initialization routine */
mca_ptl_sm_request_return
@ -232,11 +236,6 @@ int mca_ptl_sm_add_procs(
return_code=OMPI_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
mca_ptl_sm_component.sm_ctl_header->fifo=
(volatile ompi_fifo_t **)
( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo)-
(char *)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
/* allocate vectors of ompi_fifo_t - one per
* process - offsets will be stored */
size=n_to_allocate*sizeof(ompi_fifo_t);
@ -264,26 +263,34 @@ int mca_ptl_sm_add_procs(
}
}
/* set the fifo address to be a relative address, so that
* it can be used by other procs */
mca_ptl_sm_component.sm_ctl_header->fifo=
(volatile ompi_fifo_t **)
( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo)-
(char *)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
/* allow other procs to use this shared memory map */
mca_ptl_sm_component.mmap_file->map_seg->seg_inited=true;
}
/* Note: Need to make sure that proc 0 initializes control
* structures before any of the other procs can progress */
if( 0 != mca_ptl_sm_component.my_smp_rank ) {
if( 0 != mca_ptl_sm_component.my_smp_rank )
{
/* spin unitl local proc 0 initializes the segment */
while(!mca_ptl_sm_component.mmap_file->map_seg->seg_inited)
{
;
}
{ ; }
}
/* Initizlize queue data structures
* - proc with lowest local rank does this
* - all the rest of the procs block until the queues are
* initialized
* - initial queue size is zero */
/* cache the pointer to the 2d fifo array. This is a virtual
* address, whereas the address in the virtual memory segment
* is a relative address */
mca_ptl_sm_component.fifo=(ompi_fifo_t **)
( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo) +
(size_t)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
}
/* free local memory */
@ -341,6 +348,9 @@ void mca_ptl_sm_request_return(struct mca_ptl_base_module_t* ptl, struct mca_pml
* descriptor allocated with the send requests, otherwise obtain
* one from the free list. Initialize the fragment and foward
* on to the peer.
*
* NOTE: this routine assumes that only one sending thread will be accessing
* the send descriptor at a time.
*/
int mca_ptl_sm_send(
@ -351,7 +361,101 @@ int mca_ptl_sm_send(
size_t size,
int flags)
{
return OMPI_SUCCESS;
mca_ptl_sm_send_request_t *sm_request;
int my_local_smp_rank, peer_local_smp_rank;
int return_status=OMPI_SUCCESS;
ompi_fifo_t *send_fifo;
mca_ptl_base_header_t* hdr;
void *sm_data_ptr, *user_data_ptr;
/* cast to shared memory send descriptor */
sm_request=(mca_ptl_sm_send_request_t *)sendreq;
/* determine if send descriptor is obtained from the cache. If
* so, all the memory resource needed have been obtained */
if( !sm_request->super.req_cached) {
}
/* if needed, pack data in payload buffer */
if( 0 <= size ) {
ompi_convertor_t *convertor;
struct iovec address;
sm_data_ptr=sm_request->req_frag->buff;
user_data_ptr=sendreq->req_base.req_addr;
/* set up the shared memory iovec */
address.iov_base=sm_data_ptr;
address.iov_len=sm_request->req_frag->buff_length;
convertor = &sendreq->req_convertor;
return_status=ompi_convertor_pack(convertor,&address,1);
if( 0 > return_status ) {
return OMPI_ERROR;
}
}
/* fill in the fragment descriptor */
/* get pointer to the fragment header */
hdr = &(sm_request->req_frag->super.frag_base.frag_header);
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY -
REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
/* update the offset within the payload */
sendreq->req_offset += size;
/*
* post the descriptor in the queue - post with the relative
* address
*/
/* see if queues are allocated */
my_local_smp_rank=ptl_peer->my_smp_rank;
peer_local_smp_rank=ptl_peer->peer_smp_rank;
send_fifo=&(mca_ptl_sm_component.fifo
[my_local_smp_rank][peer_local_smp_rank]);
if(OMPI_CB_FREE == send_fifo->head){
/* no queues have been allocated - allocate now */
return_status=ompi_fifo_init(mca_ptl_sm_component.size_of_cb_queue,
mca_ptl_sm_component.cb_lazy_free_freq,
/* at this stage we are not doing anything with memory
* locality */
0,0,0,
send_fifo, mca_ptl_sm_component.sm_mpool);
if( return_status != OMPI_SUCCESS ) {
return return_status;
}
}
/* post descriptor */
/* lock for thread safety - using atomic lock, not mutex, since
* we need shared memory access to these lock, and in some pthread
* implementation, such mutex's don't work correctly */
if( ompi_using_threads() ) {
ompi_atomic_lock(&(send_fifo->head_lock));
}
return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
send_fifo, mca_ptl_sm_component.sm_mpool,
mca_ptl_sm_component.sm_offset);
/* release threa lock */
if( ompi_using_threads() ) {
ompi_atomic_unlock(&(send_fifo->head_lock));
}
/* return */
return return_status;
}

View file

@ -48,11 +48,6 @@ struct mca_ptl_sm_component_t {
char* sm_mpool_name; /**< name of shared memory pool module */
mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */
void* sm_mpool_base; /**< base address of shared memory pool */
ompi_free_list_t sm_send_requests; /**< free list of sm send requests -- sendreq + sendfrag */
ompi_free_list_t sm_first_frags; /**< free list of sm first
fragments */
ompi_free_list_t sm_second_frags; /**< free list of sm second
and above fragments */
size_t first_fragment_size; /**< first fragment size */
size_t max_fragment_size; /**< maximum (second and
beyone) fragment size */
@ -64,6 +59,13 @@ struct mca_ptl_sm_component_t {
file */
mca_ptl_sm_module_resource_t *sm_ctl_header; /* control header in
shared memory */
ompi_fifo_t **fifo; /**< cached copy of the pointer to the 2D
fifo array. The address in the shared
memory segment sm_ctl_header is a relative,
but this one, in process private memory, is
a real virtual address */
size_t size_of_cb_queue; /**< size of each circular buffer queue array */
size_t cb_lazy_free_freq; /**< frequency of lazy free */
size_t sm_offset; /**< offset to be applied to shared memory
addresses */
int num_smp_procs; /**< current number of smp procs on this
@ -119,6 +121,12 @@ extern int mca_ptl_sm_component_progress(
*/
struct mca_ptl_sm_t {
mca_ptl_base_module_t super; /**< base PTL interface */
ompi_free_list_t sm_first_frags; /**< free list of sm first
fragments */
ompi_free_list_t sm_second_frags; /**< free list of sm second
and above fragments */
ompi_free_list_t sm_send_requests; /**< free list of sm send requests -- sendreq + sendfrag */
};
typedef struct mca_ptl_sm_t mca_ptl_sm_t;

20
src/mca/ptl/sm/src/ptl_sm_address.h Normal file
View file

@ -0,0 +1,20 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_ADDRESS_H
#define MCA_PTL_ADDRESS_H
/*
* macro to convert virtual address, to address relative to a base
* offset
*/
#define RELATIVE_ADDRESS(A,B) (void *) ( (char *)(A) - \
(size_t)(B) )
#endif /* !ADDRESS */

View file

@ -126,6 +126,23 @@ int mca_ptl_sm_component_open(void)
mca_ptl_sm_component.fragment_alignment =
mca_ptl_sm_param_register_int("fragment_alignment",
CACHE_LINE_SIZE);
mca_ptl_sm_component.size_of_cb_queue =
mca_ptl_sm_param_register_int("size_of_cb_queue", 128);
mca_ptl_sm_component.cb_lazy_free_freq =
mca_ptl_sm_param_register_int("cb_lazy_free_freq", 128);
/* make sure that queue size and lazy free frequency are consistent -
* want to make sure that slots are freed at a rate they can be
* reused, w/o allocating extra new circular buffer fifo arrays */
if( (float)(mca_ptl_sm_component.cb_lazy_free_freq) >=
0.95*(float)(mca_ptl_sm_component.size_of_cb_queue) ) {
/* upper limit */
mca_ptl_sm_component.cb_lazy_free_freq=
(int)(0.95*(float)(mca_ptl_sm_component.size_of_cb_queue));
/* lower limit */
if( 0>= mca_ptl_sm_component.cb_lazy_free_freq ) {
mca_ptl_sm_component.cb_lazy_free_freq=1;
}
}
/* default number of extra procs to allow for future growth */
mca_ptl_sm_component.sm_extra_procs =
@ -133,8 +150,8 @@ int mca_ptl_sm_component_open(void)
/* initialize objects */
OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_first_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_first_frags, ompi_free_list_t);
return OMPI_SUCCESS;
}
@ -147,8 +164,8 @@ int mca_ptl_sm_component_open(void)
int mca_ptl_sm_component_close(void)
{
OBJ_DESTRUCT(&mca_ptl_sm_component.sm_lock);
OBJ_DESTRUCT(&mca_ptl_sm_component.sm_send_requests);
OBJ_DESTRUCT(&mca_ptl_sm_component.sm_first_frags);
OBJ_DESTRUCT(&mca_ptl_sm.sm_send_requests);
OBJ_DESTRUCT(&mca_ptl_sm.sm_first_frags);
return OMPI_SUCCESS;
}
@ -185,7 +202,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
length=sizeof(mca_ptl_sm_frag_t)+mca_ptl_sm_component.fragment_alignment+
mca_ptl_sm_component.first_fragment_size;
ompi_free_list_init(&mca_ptl_sm_component.sm_first_frags, length,
ompi_free_list_init(&mca_ptl_sm.sm_first_frags, length,
OBJ_CLASS(mca_ptl_sm_frag_t),
mca_ptl_sm_component.sm_first_frag_free_list_num,
mca_ptl_sm_component.sm_first_frag_free_list_max,
@ -201,7 +218,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
length=sizeof(mca_ptl_sm_frag_t)+mca_ptl_sm_component.fragment_alignment+
mca_ptl_sm_component.max_fragment_size;
ompi_free_list_init(&mca_ptl_sm_component.sm_second_frags, length,
ompi_free_list_init(&mca_ptl_sm.sm_second_frags, length,
OBJ_CLASS(mca_ptl_sm_frag_t),
mca_ptl_sm_component.sm_second_frag_free_list_num,
mca_ptl_sm_component.sm_second_frag_free_list_max,

View file

@ -6,6 +6,7 @@
#include <sys/errno.h>
#include "ptl_sm.h"
#include "ptl_sm_frag.h"
#include "ptl_sm_address.h"
static void mca_ptl_sm_first_frag_construct(mca_ptl_sm_frag_t* frag);
@ -44,6 +45,9 @@ static void mca_ptl_sm_first_frag_construct(mca_ptl_sm_frag_t* frag)
mca_ptl_sm_component.fragment_alignment;
/* align */
ptr=ptr-(((size_t)ptr)%(mca_ptl_sm_component.fragment_alignment));
frag->buff=ptr;
frag->buff_offset_from_segment_base=RELATIVE_ADDRESS(
ptr,mca_ptl_sm_component.sm_mpool_base);
}
@ -71,6 +75,9 @@ static void mca_ptl_sm_second_frag_construct(mca_ptl_sm_frag_t* frag)
mca_ptl_sm_component.fragment_alignment;
/* align */
ptr=ptr-(((size_t)ptr)%(mca_ptl_sm_component.fragment_alignment));
frag->buff=ptr;
frag->buff_offset_from_segment_base=RELATIVE_ADDRESS(
ptr,mca_ptl_sm_component.sm_mpool_base);
}

View file

@ -30,6 +30,9 @@ struct mca_ptl_sm_frag_t {
mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */
size_t buff_length; /**< size of buffer */
void *buff; /**< pointer to buffer */
void *buff_offset_from_segment_base; /**< pointer to buffer,
relative to base of the
shared memory segment */
};
typedef struct mca_ptl_sm_frag_t mca_ptl_sm_frag_t;

View file

@ -8,6 +8,7 @@
#include "mca/pml/base/pml_base_sendreq.h"
#include "ptl_sm.h"
#include "ptl_sm_sendreq.h"
#include "ptl_sm_address.h"
static void mca_ptl_sm_send_request_construct(mca_ptl_sm_send_request_t*);
@ -43,8 +44,29 @@ int mca_ptl_sm_send_request_init(struct mca_ptl_base_module_t* ptl,
struct mca_pml_base_send_request_t* request)
{
mca_ptl_sm_send_request_t *sm_request;
mca_ptl_sm_t *ptl_sm;
int return_value=OMPI_SUCCESS;
/* cast to shared memory send descriptor */
sm_request=(mca_ptl_sm_send_request_t *)request;
/* cast to shared memory ptl */
ptl_sm=(mca_ptl_sm_t *)ptl;
/* get first fragment descritor from free list - the pointer
* returned is valid only in this process, since different
* processes may have different base addresses
*/
sm_request->req_frag=(mca_ptl_sm_frag_t *)ompi_list_get_first(
(void *)&(ptl_sm->sm_first_frags));
if(NULL == sm_request->req_frag){
return_value=OMPI_ERR_OUT_OF_RESOURCE;
}
sm_request->req_frag_offset_from_base=RELATIVE_ADDRESS(
sm_request->req_frag,mca_ptl_sm_component.sm_mpool_base);
return return_value;
}

View file

@ -27,6 +27,9 @@ struct mca_ptl_sm_send_request_t {
/* pointer to first fragment descriptor */
mca_ptl_sm_frag_t *req_frag;
/* same pointer, but relative to the base of the shared memory
segment */
mca_ptl_sm_frag_t *req_frag_offset_from_base;
};
typedef struct mca_ptl_sm_send_request_t mca_ptl_sm_send_request_t;