1
1
This commit was SVN r3049.
Этот коммит содержится в:
Rich Graham 2004-10-11 22:56:46 +00:00
родитель 12c5be481d
Коммит e83db5af32
6 изменённых файлов: 284 добавлений и 18 удалений

Просмотреть файл

@ -366,7 +366,7 @@ static inline void *ompi_cb_fifo_read_from_tail(ompi_cb_fifo_t *fifo,
if ((q_ptr[t_ptr->fifo_index] == OMPI_CB_FREE) ||
(q_ptr[t_ptr->fifo_index] == OMPI_CB_RESERVED))
{
return (void *)OMPI_CB_ERROR;
return (void *)OMPI_CB_FREE;
}
/* set return data */

Просмотреть файл

@ -190,6 +190,8 @@ static inline int ompi_fifo_free( ompi_fifo_t *fifo,
*
* @param data Pointer value to write in specified slot (IN)
*
* @param offset Offset relative to base of the memory segment (IN)
*
* @returncode Slot index data written to
*
*/
@ -206,6 +208,8 @@ static inline int ompi_fifo_write_to_slot(cb_slot_t *slot, void* data,
*
* @param fifo Pointer to data structure defining this fifo (IN)
*
* @param offset Offset relative to base of the memory segment (IN)
*
* @returncode Slot index to which data is written
*
*/
@ -288,6 +292,8 @@ static inline int ompi_fifo_write_to_head(void *data, ompi_fifo_t
*
* @returncode Slot index to which data is written
*
* @param offset Offset relative to base of the memory segment (IN)
*
* @returncode OMPI_CB_ERROR failed to allocate index
*
*/
@ -366,11 +372,11 @@ static inline cb_slot_t ompi_fifo_get_slot(ompi_fifo_t *fifo,
/**
* Try to read pointer from the tail of the queue
*
* @param data Pointer to where data will be written (OUT)
*
* @param fifo Pointer to data structure defining this fifo (IN)
*
* @returncode Slot index to which data is written
* @param offset Offset relative to base of the memory segment (IN)
*
* @returncode Pointer - OMPI_CB_FREE indicates no data to read
*
*/
static inline void *ompi_fifo_read_from_tail(ompi_fifo_t *fifo, size_t

Просмотреть файл

@ -120,7 +120,7 @@ mca_mpool_sm_init(bool *allow_multi_user_threads)
if(NULL ==
(mca_common_sm_mmap =
mca_common_sm_mmap_init(mca_mpool_sm_component.sm_size,
&(file_name[0]),sizeof(mca_common_sm_segment_t), 8 )
&(file_name[0]),sizeof(mca_common_sm_mmap_t), 8 )
))
{
ompi_output(0, "mca_mpool_sm_init: unable to create shared memory mapping");

Просмотреть файл

@ -27,6 +27,7 @@
#include "util/printf.h"
#include "mca/ptl/sm/src/ptl_sm_sendreq.h"
#include "class/ompi_fifo.h"
#include "class/ompi_free_list.h"
#include "threads/mutex.h"
#include "datatype/datatype.h"
@ -49,7 +50,7 @@ mca_ptl_sm_t mca_ptl_sm = {
mca_ptl_sm_del_procs,
mca_ptl_sm_finalize,
mca_ptl_sm_send, /* first fragment send function */
mca_ptl_sm_send, /* second and subsequent send function */
mca_ptl_sm_send_continue, /* second and subsequent send function */
NULL, /* get function */
mca_ptl_sm_matched, /* function called after match is made */
mca_ptl_sm_send_request_init, /* initialization routine */
@ -344,10 +345,17 @@ void mca_ptl_sm_request_return(struct mca_ptl_base_module_t* ptl, struct mca_pml
/*
* Initiate a send. If this is the first fragment, use the fragment
* descriptor allocated with the send requests, otherwise obtain
* one from the free list. Initialize the fragment and foward
* on to the peer.
* Initiate a send. The fragment descriptor is allocated with the
* send request. If the send descriptor is NOT obtained from
* the cache, this implementation will ONLY return an error code.
* If we don't do this, then, because we rely on memory ordering
* to provide the required MPI message ordering, we would need to
* add logic to check and see if there are any other sends waiting
* on resources to progress and complete all of them, before the
* current one can continue. To reduce latency, and because the
* actual amount of shared memory resources can be set at run time,
* this ptl implementation does not do this. Initialize the
* fragment and forward on to the peer.
*
* NOTE: this routine assumes that only one sending thread will be accessing
* the send descriptor at a time.
@ -374,13 +382,25 @@ int mca_ptl_sm_send(
/* determine if send descriptor is obtained from the cache. If
* so, all the memory resource needed have been obtained */
if( !sm_request->super.req_cached) {
/* in this ptl, we will only use the cache, or fail */
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* if needed, pack data in payload buffer */
if( 0 <= size ) {
ompi_convertor_t *convertor;
unsigned int iov_count, max_data;
int free_after=0;
struct iovec address;
convertor = &sendreq->req_convertor;
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send( convertor, 0,
sendreq->req_base.req_datatype,
sendreq->req_base.req_count,
sendreq->req_base.req_addr,
offset, NULL);
sm_data_ptr=sm_request->req_frag->buff;
user_data_ptr=sendreq->req_base.req_addr;
@ -389,7 +409,10 @@ int mca_ptl_sm_send(
address.iov_len=sm_request->req_frag->buff_length;
convertor = &sendreq->req_convertor;
return_status=ompi_convertor_pack(convertor,&address,1);
iov_count=1;
max_data=address.iov_len;
return_status=ompi_convertor_pack(convertor,&address,&iov_count,
&max_data, &free_after);
if( 0 > return_status ) {
return OMPI_ERROR;
}
@ -401,11 +424,12 @@ int mca_ptl_sm_send(
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY -
REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
@ -416,6 +440,11 @@ int mca_ptl_sm_send(
/* update the offset within the payload */
sendreq->req_offset += size;
/*
* update the fragment descriptor
*/
sm_request->req_frag->super.frag_base.frag_size=size;
/*
* post the descriptor in the queue - post with the relative
* address
@ -423,8 +452,17 @@ int mca_ptl_sm_send(
/* see if queues are allocated */
my_local_smp_rank=ptl_peer->my_smp_rank;
peer_local_smp_rank=ptl_peer->peer_smp_rank;
send_fifo=&(mca_ptl_sm_component.fifo
[my_local_smp_rank][peer_local_smp_rank]);
/* lock for thread safety - using atomic lock, not mutex, since
* we need shared memory access to these locks, and in some pthread
* implementations, such mutexes don't work correctly */
if( ompi_using_threads() ) {
ompi_atomic_lock(&(send_fifo->head_lock));
}
if(OMPI_CB_FREE == send_fifo->head){
/* no queues have been allocated - allocate now */
return_status=ompi_fifo_init(mca_ptl_sm_component.size_of_cb_queue,
@ -438,6 +476,116 @@ int mca_ptl_sm_send(
}
}
/* post descriptor */
return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
send_fifo, mca_ptl_sm_component.sm_mpool,
mca_ptl_sm_component.sm_offset);
/* release thread lock */
if( ompi_using_threads() ) {
ompi_atomic_unlock(&(send_fifo->head_lock));
}
/* return */
return return_status;
}
/*
* Continue a send. Second fragment and beyond.
*
* NOTE: this routine assumes that only one sending thread will be accessing
* the send descriptor at a time.
*/
int mca_ptl_sm_send_continue(
struct mca_ptl_base_module_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t size,
int flags)
{
mca_ptl_sm_send_request_t *sm_request;
int my_local_smp_rank, peer_local_smp_rank, return_code;
int return_status=OMPI_SUCCESS, free_after=0;
ompi_fifo_t *send_fifo;
mca_ptl_base_header_t* hdr;
void *sm_data_ptr, *user_data_ptr;
ompi_list_item_t* item;
mca_ptl_sm_second_frag_t *send_frag;
ompi_convertor_t *convertor;
struct iovec address;
unsigned int max_data,iov_count;
/* cast to shared memory send descriptor */
sm_request=(mca_ptl_sm_send_request_t *)sendreq;
/* obtain fragment descriptor and payload from free list */
OMPI_FREE_LIST_GET(&mca_ptl_sm.sm_second_frags, item, return_code);
/* if we don't get a fragment descriptor, return w/o
* updating any counters. The PML will re-issue the
* request */
if(NULL == (send_frag = (mca_ptl_sm_second_frag_t *)item)){
return return_code;
}
/* pack data in payload buffer */
convertor = &sendreq->req_convertor;
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send( convertor, 0,
sendreq->req_base.req_datatype,
sendreq->req_base.req_count,
sendreq->req_base.req_addr,
offset, NULL);
sm_data_ptr=sm_request->req_frag->buff;
user_data_ptr=sendreq->req_base.req_addr;
/* set up the shared memory iovec */
address.iov_base=sm_data_ptr;
address.iov_len=sm_request->req_frag->buff_length;
convertor = &sendreq->req_convertor;
iov_count=1;
max_data=address.iov_len;
return_status=ompi_convertor_pack(convertor,&address,&iov_count,
&max_data, &free_after);
if( 0 > return_status ) {
return OMPI_ERROR;
}
/* fill in the fragment descriptor */
/* get pointer to the fragment header */
hdr = &(send_frag->super.frag_base.frag_header);
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
/* set the pointer to the recv descriptor - this is valid only
* at the peer. Get this value from the first fragment */
hdr->hdr_frag.hdr_dst_ptr.pval =
sm_request->req_frag->super.frag_request;
/* update the offset within the payload */
sendreq->req_offset += size;
/*
* update the fragment descriptor
*/
sm_request->req_frag->super.frag_base.frag_size=size;
/*
* post the descriptor in the queue - post with the relative
* address
*/
/* see if queues are allocated */
my_local_smp_rank=ptl_peer->my_smp_rank;
peer_local_smp_rank=ptl_peer->peer_smp_rank;
send_fifo=&(mca_ptl_sm_component.fifo
[my_local_smp_rank][peer_local_smp_rank]);
/* since the first fragment has already been posted,
* the queue has already been initialized, so no need to check */
/* post descriptor */
/* lock for thread safety - using atomic lock, not mutex, since
* we need shared memory access to these locks, and in some pthread
@ -458,16 +606,14 @@ int mca_ptl_sm_send(
return return_status;
}
/*
* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment.
* A posted receive has been matched - process the fragment
* and then ack.
*/
/*
 * NOTE(review): currently an empty stub - the matched fragment is not yet
 * delivered or acknowledged here, and both parameters are unused.
 */
void mca_ptl_sm_matched(
mca_ptl_base_module_t* ptl,
mca_ptl_base_recv_frag_t* frag)
{
/* TODO: deliver the matched fragment's data to the posted receive and
 * send the ack back to the peer */
}

Просмотреть файл

@ -127,6 +127,9 @@ struct mca_ptl_sm_t {
ompi_free_list_t sm_second_frags; /**< free list of sm second
and above fragments */
ompi_free_list_t sm_send_requests; /**< free list of sm send requests -- sendreq + sendfrag */
ompi_free_list_t sm_first_frags_to_progress; /**< list of first
fragments that are
awaiting resources */
};
typedef struct mca_ptl_sm_t mca_ptl_sm_t;
@ -236,6 +239,25 @@ extern int mca_ptl_sm_send(
int flags
);
/**
* PML->PTL send second and subsequent fragments
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
* @param size (IN) Number of bytes PML is requesting PTL to deliver
* @param flags (IN) Flags that should be passed to the peer via the message header.
* @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments
*/
extern int mca_ptl_sm_send_continue(
struct mca_ptl_base_module_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_pml_base_send_request_t*,
size_t offset,
size_t size,
int flags
);
/**
* Data structure used to hold information that will be exchanged with
* all other procs at startup. !!!!! This is only temporary, until the

Просмотреть файл

@ -152,6 +152,7 @@ int mca_ptl_sm_component_open(void)
OBJ_CONSTRUCT(&mca_ptl_sm_component.sm_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_first_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm.sm_second_frags, ompi_free_list_t);
return OMPI_SUCCESS;
}
@ -166,6 +167,7 @@ int mca_ptl_sm_component_close(void)
OBJ_DESTRUCT(&mca_ptl_sm_component.sm_lock);
OBJ_DESTRUCT(&mca_ptl_sm.sm_send_requests);
OBJ_DESTRUCT(&mca_ptl_sm.sm_first_frags);
OBJ_DESTRUCT(&mca_ptl_sm.sm_second_frags);
return OMPI_SUCCESS;
}
@ -219,7 +221,7 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
mca_ptl_sm_component.max_fragment_size;
ompi_free_list_init(&mca_ptl_sm.sm_second_frags, length,
OBJ_CLASS(mca_ptl_sm_frag_t),
OBJ_CLASS(mca_ptl_sm_second_frag_t),
mca_ptl_sm_component.sm_second_frag_free_list_num,
mca_ptl_sm_component.sm_second_frag_free_list_max,
mca_ptl_sm_component.sm_second_frag_free_list_inc,
@ -281,6 +283,96 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size)
/*
 * Component-level progress function: poll the incoming shared-memory
 * fifo from every on-node peer and dispatch newly posted fragments by
 * their header type.  Send-side progress is driven by the PML, so only
 * receive progress is made here.
 *
 * @param tstamp Current time stamp (unused by this implementation)
 *
 * @returncode OMPI_SUCCESS
 */
int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
{
    /* local variables */
    int peer_local_smp_rank, my_local_smp_rank;
    mca_ptl_sm_frag_t *header_ptr;
    ompi_fifo_t *send_fifo;
    bool frag_matched;
    mca_ptl_base_match_header_t *matching_header;

    my_local_smp_rank = mca_ptl_sm_component.my_smp_rank;

    /* send progress is made by the PML */

    /*
     * receive progress - poll the fifo from each on-node peer
     */
    for (peer_local_smp_rank = 0;
         peer_local_smp_rank < mca_ptl_sm_component.num_smp_procs;
         peer_local_smp_rank++) {

        /* we don't use the shared memory ptl to send to ourselves */
        if (peer_local_smp_rank == my_local_smp_rank) {
            continue;
        }

        send_fifo = &(mca_ptl_sm_component.fifo
                      [my_local_smp_rank][peer_local_smp_rank]);

        /* if fifo is not yet setup - continue - no data has been sent */
        if (OMPI_CB_FREE == send_fifo->tail) {
            continue;
        }

        /* acquire thread lock */
        if (ompi_using_threads()) {
            ompi_atomic_lock(&(send_fifo->tail_lock));
        }

        /* get pointer */
        header_ptr = (mca_ptl_sm_frag_t *)ompi_fifo_read_from_tail(
            send_fifo, mca_ptl_sm_component.sm_offset);

        /* release thread lock - BUG FIX: the lock must be released on
         * the empty-fifo path as well; the original code continued while
         * still holding tail_lock, deadlocking the next poll of this
         * fifo */
        if (ompi_using_threads()) {
            ompi_atomic_unlock(&(send_fifo->tail_lock));
        }

        /* nothing to read from this fifo */
        if (OMPI_CB_FREE == header_ptr) {
            continue;
        }

        /* change the address from an address relative to the shared
         * memory base to a true virtual address */
        header_ptr = (mca_ptl_sm_frag_t *)((char *)header_ptr +
                                           mca_ptl_sm_component.sm_offset);

        /* figure out what type of message this is */
        switch (header_ptr->super.frag_base.frag_header.hdr_common.hdr_type) {

        case MCA_PTL_HDR_TYPE_MATCH:
            /* attempt match */
            matching_header = &(header_ptr->super.frag_base.
                                frag_header.hdr_match);
            /* BUG FIX: frag_matched is a bool - test it directly rather
             * than comparing it against NULL */
            frag_matched = mca_ptl_base_match_in_order_network_delivery(
                matching_header,
                (mca_ptl_base_recv_frag_t *)header_ptr);
            if (frag_matched) {
                /* deliver data, and ack */
            }
            break;

        case MCA_PTL_HDR_TYPE_FRAG:
            /* second and beyond fragment - just need to deliver
             * the data, and ack */
            break;

        case MCA_PTL_HDR_TYPE_ACK:
            /* ack */
            break;

        default:
            /* unknown header type - ignore it.  BUG FIX: a label must be
             * followed by a statement in C, so the bare "default:" of the
             * original did not compile */
            break;
        }
    } /* end peer_local_smp_rank loop */

    return OMPI_SUCCESS;
}