
Now we can have several point-to-point exchanges !!!

This commit was SVN r2112.
This commit is contained in:
George Bosilca 2004-08-13 08:25:37 +00:00
parent 11bca73ad4
commit ecb2efdced
2 changed files with 60 additions and 70 deletions

View file

@@ -49,8 +49,8 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
/**header = fragment->send_frag.frag_base.frag_header;*/
if(size_in > 0) {
-ompi_convertor_t *convertor;
-int rc;
+ompi_convertor_t *convertor;
+int rc;
/* first fragment (eager send) and first fragment of long protocol
* can use the convertor initialized on the request, remaining
@@ -63,13 +63,12 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
} else {
convertor = &(fragment->send_frag.frag_base.frag_convertor);
ompi_convertor_copy(&sendreq->req_convertor, convertor);
-ompi_convertor_init_for_send(
-convertor,
-0,
-sendreq->req_base.req_datatype,
-sendreq->req_base.req_count,
-sendreq->req_base.req_addr,
-offset);
+ompi_convertor_init_for_send( convertor,
+0,
+sendreq->req_base.req_datatype,
+sendreq->req_base.req_count,
+sendreq->req_base.req_addr,
+offset);
}
/* if data is contiguous convertor will return an offset
@@ -79,9 +78,8 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
/*XXX: need to add the header */
-/*copy the data to the registered buffer*/
/*copy the data to the registered buffer*/
outvec[0].iov_base = ((char*)fragment->send_buf) + header_length;
outvec[0].iov_len -= header_length; /*XXXcheck this */
if((rc = ompi_convertor_pack(convertor, &(outvec[0]), 1)) < 0)
return OMPI_ERROR;
@@ -93,78 +91,72 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
* packed by convertor */
size_out = outvec[0].iov_len;
-/* initiate the gm send */
-gm_send_with_callback(ptl_peer->peer_ptl->my_port, fragment->send_buf,
-GM_SIZE,size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
-ptl_peer->port_number,send_callback, (void *)fragment );
+/* initiate the gm send */
+gm_send_with_callback( ptl_peer->peer_ptl->my_port, fragment->send_buf,
+GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
+ptl_peer->port_number, send_callback, (void *)fragment );
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer = (struct mca_ptl_base_peer_t*)ptl_peer;
fragment->send_frag.frag_base.frag_addr = outvec[0].iov_base;
fragment->send_frag.frag_base.frag_size = size_out; /*XXX: should this be size_out */
-return OMPI_SUCCESS;
+return OMPI_SUCCESS;
}
void send_callback(struct gm_port *port,void * context, gm_status_t status)
{
-#if 1
-mca_ptl_gm_module_t *ptl;
-mca_ptl_gm_send_frag_t *frag;
-ompi_list_t *list;
-int bytes;
-mca_pml_base_send_request_t *gm_send_req;
-frag = (mca_ptl_gm_send_frag_t *)context;
-ptl = (mca_ptl_gm_module_t *)frag->ptl;
-gm_send_req = frag->req;
-switch (status)
-{
+mca_ptl_gm_module_t *ptl;
+mca_ptl_gm_send_frag_t *frag;
+ompi_list_t *list;
+int bytes, header_length;
+mca_pml_base_send_request_t *gm_send_req;
+mca_ptl_base_frag_header_t* header;
+frag = (mca_ptl_gm_send_frag_t *)context;
+ptl = (mca_ptl_gm_module_t *)frag->ptl;
+gm_send_req = frag->req;
+header = (mca_ptl_base_frag_header_t*)frag->send_buf;
+header_length = ((mca_ptl_base_header_t*)header)->hdr_common.hdr_size;
+bytes = frag->send_frag.frag_base.frag_size - header_length;
+switch (status) {
case GM_SUCCESS:
-/* send completed, can reuse the user buffer */
-ptl->num_send_tokens++;
-ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl,
-gm_send_req,bytes);
-/*ompi_list_append(&(ptl->gm_send_frags_queue),(ompi_list_item_t
- * *)frag); */
-list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
-ompi_list_remove_first(list);
-break;
+/* send completed, can reuse the user buffer */
+ptl->num_send_tokens++;
+ptl->super.ptl_send_progress( (mca_ptl_base_module_t*)ptl, gm_send_req, bytes );
+list = (ompi_list_t *)(&(ptl->gm_send_frags_queue));
+ompi_list_remove_first(list);
+break;
case GM_SEND_TIMED_OUT:
-/* need to take care of retransmission */
-break;
+/* need to take care of retransmission */
+break;
case GM_SEND_DROPPED:
-/* need to handle this case */
-break;
+/* need to handle this case */
+break;
default:
ompi_output(0,
"[%s:%d] error in message completion\n",__FILE__,__LINE__);
break;
"[%s:%d] error in message completion\n",__FILE__,__LINE__);
break;
}
-#endif
}
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
mca_ptl_base_header_t * header)
{
+mca_ptl_gm_send_frag_t * frag;
+mca_pml_base_send_request_t *req;
-#if 1
-mca_ptl_gm_send_frag_t * frag;
-mca_pml_base_send_request_t *req;
-frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
-req = (mca_pml_base_send_request_t *) frag->req;
-req->req_peer_match = header->hdr_ack.hdr_dst_match;
-/* return the send fragment to the free list */
-#endif
+frag = (mca_ptl_gm_send_frag_t *)header->hdr_ack.hdr_src_ptr.pval;
+req = (mca_pml_base_send_request_t *) frag->req;
+req->req_peer_match = header->hdr_ack.hdr_dst_match;
+/* return the send fragment to the free list */
}
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
@@ -174,11 +166,12 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
bool matched;
mca_ptl_base_header_t *header;
-header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
+header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer);
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
/* allocate a receive fragment */
recv_frag->frag_recv.frag_base.frag_owner = (struct mca_ptl_base_module_t*)ptl;
recv_frag->frag_recv.frag_base.frag_peer = NULL;
recv_frag->frag_recv.frag_request = NULL;
recv_frag->frag_recv.frag_is_buffered = false;
@@ -195,12 +188,10 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
recv_frag->matched = false;
recv_frag->have_allocated_buffer = false;
-matched = mca_ptl_base_match(
-&recv_frag->frag_recv.frag_base.frag_header.hdr_match,
-&(recv_frag->frag_recv),
-NULL );
+matched = ptl->super.ptl_match( &(ptl->super),
+&(recv_frag->frag_recv),
+&(recv_frag->frag_recv.frag_base.frag_header.hdr_match) );
if( matched ) {
mca_ptl_gm_matched( &(ptl->super), &(recv_frag->frag_recv) );
return NULL;
}
ompi_output(0,"matching receive not yet posted\n");
@@ -214,7 +205,7 @@ mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_ev
mca_ptl_gm_recv_frag_t* frag = NULL;
mca_ptl_base_header_t *header;
-header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
+header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.buffer);
switch(header->hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
@@ -252,7 +243,7 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
case GM_HIGH_RECV_EVENT:
case GM_PEER_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
-mesg = gm_ntohp(event->recv.message);
+mesg = gm_ntohp(event->recv.buffer);
frag = ptl_gm_handle_recv( ptl, event );
if( (frag != NULL) && !(frag->matched) ) {
/* allocate temporary buffer: temporary until the fragment will be finally matched */
@@ -264,7 +255,7 @@ int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
/* mark the fragment as having pending buffers */
frag->have_allocated_buffer = true;
}
-gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.message),
+gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer),
GM_SIZE, GM_LOW_PRIORITY );
break;
case GM_NO_RECV_EVENT:
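
The send side of this file is driven by GM's token flow control: mca_ptl_gm_peer_send() hands a registered buffer to gm_send_with_callback(), and when the wire transfer completes the library invokes send_callback(), which returns the token (ptl->num_send_tokens++) and reports the delivered payload to the PML through ptl_send_progress(). The sketch below models that post/callback/token cycle in isolation; it is a minimal stand-alone C program, not the GM API — post_send(), progress(), send_done() and the token counter are hypothetical stand-ins for gm_send_with_callback(), the GM event loop, send_callback() and num_send_tokens.

#include <stdio.h>

#define NUM_TOKENS 4                 /* depth of the toy send queue */

typedef void (*send_cb_t)(void *context, int status);

/* one outstanding send: buffer plus completion callback, like a GM send slot */
struct pending { const char *buf; send_cb_t cb; void *ctx; };

static struct pending queue[NUM_TOKENS];
static int queued = 0;
static int tokens = NUM_TOKENS;      /* plays the role of ptl->num_send_tokens */

/* post a send if a token is available; the token comes back in the callback */
static int post_send(const char *buf, send_cb_t cb, void *ctx)
{
    if (tokens == 0) return -1;      /* caller must drive progress and retry */
    tokens--;
    queue[queued++] = (struct pending){ buf, cb, ctx };
    return 0;
}

/* drain the queue and fire the callbacks: the analogue of GM's event loop */
static void progress(void)
{
    for (int i = 0; i < queued; i++)
        queue[i].cb(queue[i].ctx, 0 /* success, like GM_SUCCESS */);
    queued = 0;
}

static void send_done(void *ctx, int status)
{
    tokens++;                        /* give the token back, as send_callback does */
    printf("sent \"%s\" (status %d), %d tokens left\n",
           (const char *)ctx, status, tokens);
}

int main(void)
{
    const char *msgs[] = { "frag0", "frag1", "frag2", "frag3", "frag4", "frag5" };
    for (size_t i = 0; i < sizeof msgs / sizeof *msgs; i++) {
        while (post_send(msgs[i], send_done, (void *)msgs[i]) != 0)
            progress();              /* out of tokens: complete sends, then retry */
    }
    progress();                      /* complete the tail of the queue */
    return 0;
}

Posting only blocks (by driving progress) when the token count hits zero, which is the same back-pressure the num_send_tokens counter provides in the PTL; the empty GM_SEND_TIMED_OUT and GM_SEND_DROPPED arms above would correspond to non-zero status values in this model.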

View file

@@ -167,7 +167,7 @@ mca_ptl_gm_recv_frag_destruct (mca_ptl_gm_recv_frag_t *frag)
}
mca_ptl_gm_recv_frag_t *
-mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl)
+mca_ptl_gm_alloc_recv_frag( struct mca_ptl_base_module_t *ptl )
{
ompi_free_list_t *flist;
@@ -178,10 +178,9 @@ mca_ptl_gm_alloc_recv_frag(struct mca_ptl_base_module_t *ptl)
flist =&( ((mca_ptl_gm_module_t *)ptl)->gm_recv_frags_free);
item = ompi_list_remove_first(&((flist)->super));
-while(NULL == item)
-{
-ptl->ptl_component->ptlm_progress(tstamp);
-item = ompi_list_remove_first (&((flist)->super));
+while(NULL == item) {
+ptl->ptl_component->ptlm_progress(tstamp);
+item = ompi_list_remove_first (&((flist)->super));
}
frag = (mca_ptl_gm_recv_frag_t *)item;
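
The loop reformatted in this last hunk is the PTL's blocking free-list idiom: try to pop a fragment, and while the list is empty, call the component's progress function so in-flight fragments can complete and be returned, then retry. Below is a self-contained sketch of that idiom with a toy LIFO standing in for ompi_free_list_t; free_list_pop(), free_list_push() and component_progress() are hypothetical names, not Open MPI functions.

#include <stdio.h>

/* toy LIFO free list standing in for ompi_free_list_t */
struct item { struct item *next; };

static struct item slot;             /* the single recyclable item */
static struct item *head = NULL;     /* list starts out empty */
static int progress_calls = 0;

static struct item *free_list_pop(void)
{
    struct item *it = head;
    if (it) head = it->next;
    return it;
}

static void free_list_push(struct item *it) { it->next = head; head = it; }

/* stands in for ptl->ptl_component->ptlm_progress(tstamp): completing
 * outstanding work is what eventually returns items to the free list */
static void component_progress(void)
{
    if (++progress_calls == 3)       /* pretend a fragment just completed */
        free_list_push(&slot);
}

/* the pattern used by mca_ptl_gm_alloc_recv_frag: pop, and while the
 * list is empty, drive progress and try again */
static struct item *alloc_blocking(void)
{
    struct item *it = free_list_pop();
    while (NULL == it) {
        component_progress();
        it = free_list_pop();
    }
    return it;
}

int main(void)
{
    struct item *it = alloc_blocking();
    printf("got item after %d progress calls\n", progress_calls);
    free_list_push(it);              /* recycle, as the PTL does when done */
    return 0;
}

The loop terminates only because progress eventually recycles an item; in the real PTL that happens when a previously posted receive fragment completes and is pushed back onto gm_recv_frags_free.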