fix numerous bugs in the shared memory implementation. Can pass a
hello world test now. This commit was SVN r3313.
This commit is contained in:
parent 1b7a7bae23
commit 826b5ebdf6
@@ -288,10 +288,6 @@ int mca_ptl_sm_add_procs(
         /* allow other procs to use this shared memory map */
         mca_ptl_sm_component.mmap_file->map_seg->seg_inited=true;
     }
-    /* debug */
-    fprintf(stderr," yeah !!! \n");
-    fflush(stderr);
-    /* end debug */
 
     /* Note: Need to make sure that proc 0 initializes control
      * structures before any of the other procs can progress */
@@ -301,10 +297,6 @@ int mca_ptl_sm_add_procs(
         while(!mca_ptl_sm_component.mmap_file->map_seg->seg_inited)
         { ; }
     }
-    /* debug */
-    fprintf(stderr," yeah !!! II %d \n",n_to_allocate);
-    fflush(stderr);
-    /* end debug */
 
     /* cache the pointer to the 2d fifo array. This is a virtual
      * address, whereas the address in the virtual memory segment
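
The spin on seg_inited above is the PTL's startup handshake: the process that created the mapping fills in the control structures and then publishes the flag, while every other process busy-waits on it before touching the segment. A minimal standalone sketch of the same pattern (illustrative names, fork() plus an anonymous shared mapping instead of the OMPI mmap_file machinery):

    #include <stdbool.h>
    #include <stdio.h>
    #include <sys/mman.h>
    #include <sys/wait.h>
    #include <unistd.h>

    struct seg_header {
        volatile bool seg_inited;   /* set by "rank 0" once setup is complete */
        int           control_word; /* stands in for the real control structures */
    };

    int main(void)
    {
        /* anonymous shared mapping, visible to the child after fork() */
        struct seg_header *seg = mmap(NULL, sizeof(*seg),
                                      PROT_READ | PROT_WRITE,
                                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (seg == MAP_FAILED)
            return 1;
        seg->seg_inited = false;

        if (fork() == 0) {            /* "rank 1": wait for initialization */
            while (!seg->seg_inited)
                ;                     /* same spin as the diff's { ; } loop */
            printf("child sees control_word=%d\n", seg->control_word);
            _exit(0);
        }

        /* "rank 0": initialize, then publish the flag last */
        seg->control_word = 42;
        __sync_synchronize();         /* write barrier so the flag is ordered */
        seg->seg_inited = true;
        wait(NULL);
        return 0;
    }
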
@@ -315,10 +307,6 @@ int mca_ptl_sm_add_procs(
         return_code=OMPI_ERROR;
         goto CLEANUP;
     }
-    /* debug */
-    fprintf(stderr," yeah !!! IV \n");
-    fflush(stderr);
-    /* end debug */
     fifo_tmp=(ompi_fifo_t **)
         ( (char *)(mca_ptl_sm_component.sm_ctl_header->fifo) +
           (size_t)(mca_ptl_sm_component.sm_mpool->mpool_base()) );
@@ -481,11 +469,6 @@ int mca_ptl_sm_send(
     mca_ptl_base_header_t* hdr;
     void *sm_data_ptr, *user_data_ptr;
 
-    /* debug */
-    fprintf(stderr," ZZZZ send called %d \n",
-        ptl_peer->my_smp_rank);
-    fflush(stderr);
-    /* end debug */
     /* cast to shared memory send descriptor */
     sm_request=(mca_ptl_sm_send_request_t *)sendreq;
 
@@ -495,10 +478,6 @@ int mca_ptl_sm_send(
         /* in this ptl, we will only use the cache, or fail */
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
-    /* debug */
-    fprintf(stderr," ZZZZ II send called %d size %d\n",
-        ptl_peer->my_smp_rank,size);
-    fflush(stderr);
 
     /* if needed, pack data in payload buffer */
     if( 0 <= size ) {
@@ -531,10 +510,6 @@ int mca_ptl_sm_send(
             return OMPI_ERROR;
         }
     }
-    /* debug */
-    fprintf(stderr," after pack \n");
-    fflush(stderr);
-    /* end debug */
 
     /* fill in the fragment descriptor */
     /* get pointer to the fragment header */
@@ -546,8 +521,6 @@ int mca_ptl_sm_send(
     hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
     hdr->hdr_frag.hdr_frag_seq = 0;
     hdr->hdr_frag.hdr_src_ptr.pval = sendreq;
-    hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY -
-        REPLACE WITH MACRO */
     hdr->hdr_frag.hdr_dst_ptr.lval = 0;
     hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
     hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
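
The two lines deleted above zeroed hdr_src_ptr.lval immediately after hdr_src_ptr.pval had been assigned. Assuming pval and lval are members of one pointer/integer union (as OMPI's ompi_ptr_t suggests; that reading of the type is an assumption here), the second store silently overwrites the first. A small sketch of that hazard:

    #include <stdint.h>
    #include <stdio.h>

    typedef union {
        void    *pval; /* pointer view */
        uint64_t lval; /* integer view, zeroed to quiet memory checkers */
    } ptr_union_t;

    int main(void)
    {
        int payload = 7;
        ptr_union_t p;

        p.pval = &payload;
        p.lval = 0;             /* clobbers the pointer stored just above */
        printf("after lval=0:   %p\n", p.pval);

        p.lval = 0;             /* safe order: zero first ... */
        p.pval = &payload;      /* ... then store the pointer */
        printf("after reorder: %p\n", p.pval);
        return 0;
    }
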
@@ -574,22 +547,12 @@ int mca_ptl_sm_send(
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
 
-    /* debug */
-    fprintf(stderr," send_fifo %d %d %u \n",
-        my_local_smp_rank,peer_local_smp_rank,getpid());
-    fflush(stderr);
-    /* end debug */
     /* lock for thread safety - using atomic lock, not mutex, since
      * we need shared memory access to these lock, and in some pthread
      * implementation, such mutex's don't work correctly */
     if( ompi_using_threads() ) {
         ompi_atomic_lock(&(send_fifo->head_lock));
     }
-    /* debug */
-    fprintf(stderr," send_fifo after %d %d %u \n",
-        my_local_smp_rank,peer_local_smp_rank,getpid());
-    fflush(stderr);
-    /* end debug */
 
     if(OMPI_CB_FREE == send_fifo->head){
         /* no queues have been allocated - allocate now */
@@ -604,25 +567,19 @@ int mca_ptl_sm_send(
             return return_status;
         }
     }
-    /* debug */
-    fprintf(stderr," sending me %d it %d \n",my_local_smp_rank,
-        peer_local_smp_rank);
-    fflush(stderr);
-    /* end if */
 
     /* post descriptor */
     return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
             send_fifo, mca_ptl_sm_component.sm_mpool,
             mca_ptl_sm_component.sm_offset);
+    if( 0 <= return_status ) {
+        return_status=OMPI_SUCCESS;
+    }
 
     /* release threa lock */
     if( ompi_using_threads() ) {
         ompi_atomic_unlock(&(send_fifo->head_lock));
     }
-    /* debug */
-    fprintf(stderr," done sending \n");
-    fflush(stderr);
-    /* end debug */
     /* return */
     return return_status;
 }
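
The if( 0 <= return_status ) blocks added in this function and in mca_ptl_sm_send_continue suggest that ompi_fifo_write_to_head returns a non-negative value on success and a negative error code on failure; the new code folds the non-negative case into OMPI_SUCCESS before returning. A compact sketch of that convention with hypothetical names:

    #include <stdio.h>

    #define MY_SUCCESS   0
    #define MY_ERR_FULL (-2)

    /* stand-in for the fifo write: non-negative slot on success,
     * negative error code when the queue is full */
    static int fifo_write(int *fifo, int *head, int len, int value)
    {
        if (*head >= len)
            return MY_ERR_FULL;
        fifo[*head] = value;
        return (*head)++;
    }

    int main(void)
    {
        int fifo[2], head = 0;

        for (int i = 0; i < 3; i++) {
            int rc = fifo_write(fifo, &head, 2, i);
            if (0 <= rc)
                rc = MY_SUCCESS; /* collapse "which slot" into plain success */
            printf("write %d -> status %d\n", i, rc);
        }
        return 0;
    }
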
@@ -735,6 +692,9 @@ int mca_ptl_sm_send_continue(
     return_status=ompi_fifo_write_to_head(sm_request->req_frag_offset_from_base,
             send_fifo, mca_ptl_sm_component.sm_mpool,
             mca_ptl_sm_component.sm_offset);
+    if( 0 <= return_status ) {
+        return_status=OMPI_SUCCESS;
+    }
 
     /* release threa lock */
     if( ompi_using_threads() ) {
@@ -818,19 +778,39 @@ void mca_ptl_sm_matched(
      * don't agragate */
     my_local_smp_rank=mca_ptl_sm_component.my_smp_rank;
     peer_local_smp_rank=sm_frag_desc->queue_index;
 
     send_fifo=&(mca_ptl_sm_component.fifo
             [my_local_smp_rank][peer_local_smp_rank]);
+    /* check to see if fifo is allocated */
+    if(OMPI_CB_FREE == send_fifo->head){
+        /* no queues have been allocated - allocate now */
+        return_status=ompi_fifo_init(mca_ptl_sm_component.size_of_cb_queue,
+                mca_ptl_sm_component.cb_lazy_free_freq,
+                /* at this stage we are not doing anything with memory
+                 * locality */
+                0,0,0,
+                send_fifo, mca_ptl_sm_component.sm_mpool);
+        if( return_status != OMPI_SUCCESS ) {
+            ompi_atomic_unlock(&(send_fifo->head_lock));
+            return return_status;
+        }
+    }
+
+    /* change address to be relative to offset from base of shared
+     * memory segment */
+    sm_frag_desc_rel_to_base= (char *) ( (char *)sm_frag_desc -
+            mca_ptl_sm_component.sm_offset );
 
     /* set the fragment type to be an ack */
     sm_frag_desc->super.frag_base.frag_header.hdr_common.hdr_type=
         MCA_PTL_HDR_TYPE_ACK;
     return_status=ompi_fifo_write_to_head(
         sm_frag_desc_rel_to_base,
         send_fifo, mca_ptl_sm_component.sm_mpool,
         mca_ptl_sm_component.sm_offset);
 
     /* if can't ack, put on list for later delivery */
     if( OMPI_SUCCESS != return_status ) {
         if( 0 > return_status ) {
             OMPI_THREAD_LOCK(&(mca_ptl_sm.sm_pending_ack_lock));
             ompi_list_append(&(mca_ptl_sm.sm_pending_ack),
                 (ompi_list_item_t *)sm_frag_desc);
 
@@ -159,11 +159,6 @@ int mca_ptl_sm_component_open(void)
     OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack_lock, ompi_mutex_t);
     OBJ_CONSTRUCT(&mca_ptl_sm.sm_pending_ack, ompi_list_t);
 
-    /* debug */
-    fprintf(stderr," at end of open \n");
-    fflush(stderr);
-    /* end debug */
-
     return OMPI_SUCCESS;
 }
 
@@ -237,10 +232,6 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
 
     /* set flag indicating ptl not inited */
     mca_ptl_sm.ptl_inited=false;
-    /* debug */
-    fprintf(stderr," at end of init \n");
-    fflush(stderr);
-    /* end debug */
 
     return ptls;
 }
@@ -300,7 +291,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
     }
 
     send_fifo=&(mca_ptl_sm_component.fifo
-            [my_local_smp_rank][peer_local_smp_rank]);
+            [peer_local_smp_rank][my_local_smp_rank]);
 
     /* if fifo is not yet setup - continue - not data has been sent*/
     if(OMPI_CB_FREE == send_fifo->tail){
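
The one-line change above matches a fifo array indexed as fifo[sender][receiver]: mca_ptl_sm_send posts to fifo[my_local_smp_rank][peer_local_smp_rank], so the receive side of the same pair is fifo[peer_local_smp_rank][my_local_smp_rank]. A toy illustration of the convention (hypothetical names, one slot per pair instead of the real circular buffers):

    #include <stdio.h>

    #define NPROCS 2

    /* fifo[sender][receiver]: each ordered pair gets its own queue */
    static int fifo[NPROCS][NPROCS];

    static void post(int me, int peer, int v) { fifo[me][peer] = v; }
    static int  poll_from(int me, int peer)   { return fifo[peer][me]; }

    int main(void)
    {
        post(0, 1, 99);                     /* rank 0 sends to rank 1 */
        printf("rank 1 got %d\n", poll_from(1, 0));
        return 0;
    }
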
@@ -332,11 +323,10 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
          * memory address, to a true virtual address */
         header_ptr = (mca_ptl_sm_frag_t *)( (char *)header_ptr+
                 mca_ptl_sm_component.sm_offset);
-        /* debug */
-        fprintf(stderr," recv :: got it %d from %d \n",
-            my_local_smp_rank,peer_local_smp_rank);
-        fflush(stderr);
-        /* end debug */
+
+        /* set the owning ptl */
+        header_ptr->super.frag_base.frag_owner=(mca_ptl_base_module_t *)
+            (&mca_ptl_sm);
 
         /* figure out what type of message this is */
         switch
@@ -350,7 +340,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
             frag_matched=mca_ptl_base_match_in_order_network_delivery(
                     matching_header,
                     (mca_ptl_base_recv_frag_t *)header_ptr);
-            if( NULL != frag_matched ) {
+            if( frag_matched ) {
                 /* deliver data, and ack */
                 mca_ptl_sm_matched((mca_ptl_base_module_t *)&mca_ptl_sm,
                     (mca_ptl_base_recv_frag_t *)header_ptr);
@@ -416,14 +406,19 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
             sm_frag_desc_rel_to_base= (char *) ( (char *)header_ptr -
                 mca_ptl_sm_component.sm_offset );
 
-            /* try and send an ack */
+            /* try and send an ack - no need to check and see if a send
+             * queue has been allocated, since entries are put here only
+             * if the queue was previously full */
+
+            /* fragment already marked as an ack */
+
             return_status=ompi_fifo_write_to_head( sm_frag_desc_rel_to_base,
                 send_fifo,
                 mca_ptl_sm_component.sm_mpool,
                 mca_ptl_sm_component.sm_offset);
 
             /* if ack failed, break */
             if( OMPI_SUCCESS != return_status ) {
                 if( 0 > return_status ) {
                     /* put the descriptor back on the list */
                     ompi_list_prepend(&(mca_ptl_sm.sm_pending_ack),item);
                     break;
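
Both the matched path and the progress loop above fall back on the same discipline: an ack that cannot be posted because the peer's queue is full goes onto sm_pending_ack, and when a retry from the progress loop fails again the descriptor is put back at the front of the list. A sketch of that retry pattern with a plain singly linked list (hypothetical names):

    #include <stdbool.h>
    #include <stdio.h>

    struct frag { struct frag *next; int id; };

    static struct frag *pending_head; /* stands in for sm_pending_ack */

    /* pretend ack post: fails while the destination queue is full */
    static bool try_post_ack(struct frag *f, bool queue_full)
    {
        if (queue_full)
            return false;
        printf("acked frag %d\n", f->id);
        return true;
    }

    int main(void)
    {
        static struct frag f = { NULL, 1 };

        /* first attempt fails: park the descriptor for later delivery */
        if (!try_post_ack(&f, true)) {
            f.next = pending_head;
            pending_head = &f;
        }

        /* progress loop: drain pending acks, re-queueing on failure */
        while (pending_head) {
            struct frag *cur = pending_head;
            pending_head = cur->next;
            if (!try_post_ack(cur, false)) {
                cur->next = pending_head; /* put it back and stop for now */
                pending_head = cur;
                break;
            }
        }
        return 0;
    }
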