A more advanced version, but still not working.
This commit was SVN r2109.
Этот коммит содержится в:
родитель
973c731406
Коммит
00cf66a7d9
@ -314,81 +314,79 @@ mca_ptl_gm_get (struct mca_ptl_base_module_t *ptl,
|
||||
*/
|
||||
|
||||
void
|
||||
mca_ptl_gm_matched (mca_ptl_base_module_t * ptl,
|
||||
mca_ptl_base_recv_frag_t * frag)
|
||||
mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
|
||||
mca_ptl_base_recv_frag_t * frag )
|
||||
{
|
||||
mca_pml_base_recv_request_t *request;
|
||||
/*mca_ptl_base_recv_request_t *request;*/
|
||||
mca_ptl_base_header_t *header;
|
||||
int bytes_recv, rc;
|
||||
mca_ptl_gm_module_t *gm_ptl;
|
||||
struct iovec iov[1];
|
||||
|
||||
/* might need to send an ack back */
|
||||
#if 1
|
||||
|
||||
mca_pml_base_recv_request_t *request;
|
||||
/*mca_ptl_base_recv_request_t *request;*/
|
||||
mca_ptl_base_header_t *header;
|
||||
int bytes_recv, rc;
|
||||
mca_ptl_gm_module_t *gm_ptl;
|
||||
struct iovec iov[1];
|
||||
|
||||
|
||||
header = &frag->frag_base.frag_header;
|
||||
request = frag->frag_request;
|
||||
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
|
||||
header = &frag->frag_base.frag_header;
|
||||
request = frag->frag_request;
|
||||
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
|
||||
#if 0
|
||||
int rc;
|
||||
mca_ptl_gm_send_frag_t *ack;
|
||||
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
|
||||
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
|
||||
/* might need to send an ack back */
|
||||
int rc;
|
||||
mca_ptl_gm_send_frag_t *ack;
|
||||
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;
|
||||
ack = mca_ptl_gm_alloc_send_frag(ptl,NULL);
|
||||
|
||||
if (NULL == ack) {
|
||||
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
|
||||
__FILE__,___LINE__);
|
||||
OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
|
||||
recv_frag->frag_ack_pending = true;
|
||||
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
|
||||
(ompi_list_item_t *) frag);
|
||||
OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
|
||||
} else {
|
||||
mca_ptl_gm_send_frag_init_ack (ack, ptl,
|
||||
recv_frag->super.super.
|
||||
frag_peer, recv_frag);
|
||||
/*XXX: check this*/
|
||||
mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 );
|
||||
if (NULL == ack) {
|
||||
ompi_output(0,"[%s:%d] unable to alloc a gm fragment\n",
|
||||
__FILE__,___LINE__);
|
||||
OMPI_THREAD_LOCK (&mca_ptl_gm_module.gm_lock);
|
||||
recv_frag->frag_ack_pending = true;
|
||||
ompi_list_append (&mca_ptl_gm_module.gm_pending_acks,
|
||||
(ompi_list_item_t *) frag);
|
||||
OMPI_THREAD_UNLOCK (&mca_ptl_gm_module.gm_lock);
|
||||
} else {
|
||||
mca_ptl_gm_send_frag_init_ack (ack, ptl,
|
||||
recv_frag->super.super.
|
||||
frag_peer, recv_frag);
|
||||
/*XXX: check this*/
|
||||
mca_ptl_gm_peer_send (ack->super.super.frag_peer, ack,0,0,0 );
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
|
||||
}
|
||||
/* Here we expect that frag_addr is the beging of the buffer header included */
|
||||
iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
|
||||
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
|
||||
iov[0].iov_len = bytes_recv;
|
||||
|
||||
iov[0].iov_base = ((char*)frag->frag_base.frag_addr) + header->hdr_common.hdr_size;
|
||||
bytes_recv = frag->frag_base.frag_size - header->hdr_common.hdr_size;
|
||||
iov[0].iov_len = bytes_recv;
|
||||
/*process fragment if complete */
|
||||
if (header->hdr_frag.hdr_frag_length > 0) {
|
||||
ompi_proc_t *proc;
|
||||
|
||||
/*process fragment if complete */
|
||||
if (header->hdr_frag.hdr_frag_length > 0) {
|
||||
ompi_proc_t *proc;
|
||||
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
|
||||
request->req_base.req_peer);
|
||||
|
||||
proc = ompi_comm_peer_lookup(request->req_base.req_comm,
|
||||
request->req_base.req_peer);
|
||||
ompi_convertor_copy(proc->proc_convertor,
|
||||
&frag->frag_base.frag_convertor);
|
||||
ompi_convertor_init_for_recv(
|
||||
&frag->frag_base.frag_convertor,
|
||||
0,
|
||||
request->req_base.req_datatype,
|
||||
request->req_base.req_count,
|
||||
request->req_base.req_addr,
|
||||
header->hdr_frag.hdr_frag_offset);
|
||||
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
|
||||
assert( rc == 1 );
|
||||
}
|
||||
|
||||
ompi_convertor_copy(proc->proc_convertor,
|
||||
&frag->frag_base.frag_convertor);
|
||||
ompi_convertor_init_for_recv(
|
||||
&frag->frag_base.frag_convertor,
|
||||
0,
|
||||
request->req_base.req_datatype,
|
||||
request->req_base.req_count,
|
||||
request->req_base.req_addr,
|
||||
header->hdr_frag.hdr_frag_offset);
|
||||
rc = ompi_convertor_unpack(&frag->frag_base.frag_convertor, &(iov[0]), 1);
|
||||
assert( rc == 1 );
|
||||
}
|
||||
/*update progress*/ /* XXX : check this */
|
||||
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
|
||||
|
||||
/*update progress*/ /* XXX : check this */
|
||||
ptl->ptl_recv_progress( ptl, request, bytes_recv, iov[0].iov_len );
|
||||
|
||||
|
||||
/*return to free list */
|
||||
gm_ptl = (mca_ptl_gm_module_t *)ptl;
|
||||
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags),(ompi_list_item_t*)frag);
|
||||
|
||||
|
||||
#endif
|
||||
/* Now update the status of the fragment */
|
||||
((mca_ptl_gm_recv_frag_t*)frag)->matched = true;
|
||||
if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true ) {
|
||||
free( frag->frag_base.frag_addr );
|
||||
((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer = false;
|
||||
}
|
||||
/*return to free list */
|
||||
gm_ptl = (mca_ptl_gm_module_t *)ptl;
|
||||
OMPI_FREE_LIST_RETURN(&(gm_ptl->gm_recv_frags_free), (ompi_list_item_t*)frag);
|
||||
}
|
||||
|
@ -68,7 +68,6 @@ struct mca_ptl_gm_module_t {
|
||||
/*struct mca_ptl_gm_addr_t *proc_id_table;*/
|
||||
|
||||
ompi_free_list_t gm_send_frags;
|
||||
ompi_free_list_t gm_recv_frags;
|
||||
ompi_free_list_t gm_recv_frags_free;
|
||||
ompi_list_t gm_send_frags_queue;
|
||||
ompi_list_t gm_pending_acks;
|
||||
|
@ -27,35 +27,34 @@
|
||||
#include "ptl_gm_priv.h"
|
||||
|
||||
mca_ptl_gm_component_t mca_ptl_gm_component = {
|
||||
{
|
||||
/* First, the mca_base_component_t struct containing meta information
|
||||
about the component itself */
|
||||
{
|
||||
/* Indicate that we are a pml v1.0.0 component (which also implies a
|
||||
specific MCA version) */
|
||||
MCA_PTL_BASE_VERSION_1_0_0,
|
||||
"gm", /* MCA component name */
|
||||
1, /* MCA component major version */
|
||||
0, /* MCA component minor version */
|
||||
0, /* MCA component release version */
|
||||
mca_ptl_gm_component_open, /* component open */
|
||||
mca_ptl_gm_component_close /* component close */
|
||||
}
|
||||
,
|
||||
/* Next the MCA v1.0.0 component meta data */
|
||||
{
|
||||
/* Whether the component is checkpointable or not */
|
||||
false
|
||||
},
|
||||
mca_ptl_gm_component_init,
|
||||
mca_ptl_gm_component_control,
|
||||
mca_ptl_gm_component_progress}
|
||||
/* First, the mca_base_component_t struct containing meta information
|
||||
about the component itself */
|
||||
{
|
||||
/* Indicate that we are a pml v1.0.0 component (which also implies a
|
||||
specific MCA version) */
|
||||
MCA_PTL_BASE_VERSION_1_0_0,
|
||||
"gm", /* MCA component name */
|
||||
1, /* MCA component major version */
|
||||
0, /* MCA component minor version */
|
||||
0, /* MCA component release version */
|
||||
mca_ptl_gm_component_open, /* component open */
|
||||
mca_ptl_gm_component_close /* component close */
|
||||
}
|
||||
,
|
||||
/* Next the MCA v1.0.0 component meta data */
|
||||
{
|
||||
/* Whether the component is checkpointable or not */
|
||||
false
|
||||
},
|
||||
mca_ptl_gm_component_init,
|
||||
mca_ptl_gm_component_control,
|
||||
mca_ptl_gm_component_progress
|
||||
}
|
||||
};
|
||||
|
||||
static bool mca_ptl_gm_component_initialized = false;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* utility routines for parameter registration
|
||||
*/
|
||||
@ -83,7 +82,6 @@ mca_ptl_gm_param_register_int (const char *param_name, int default_value)
|
||||
return param_value;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
@ -94,9 +92,6 @@ ompi_mca_ptl_gm_finalize (mca_ptl_gm_module_t * gm)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Called by MCA framework to open the module, registers
|
||||
* module parameters.
|
||||
@ -127,15 +122,11 @@ mca_ptl_gm_component_open (void)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* component close
|
||||
*/
|
||||
|
||||
int
|
||||
mca_ptl_gm_component_close (void)
|
||||
int mca_ptl_gm_component_close (void)
|
||||
{
|
||||
#ifdef GOPAL_TODO
|
||||
if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) {
|
||||
@ -179,23 +170,6 @@ mca_ptl_gm_create (int i)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Create a GM PTL instance
|
||||
*/
|
||||
|
||||
static int
|
||||
mca_ptl_gm_module_create_instances (void)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Register GM component addressing information. The MCA framework
|
||||
* will make this available to all peers.
|
||||
@ -239,7 +213,7 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
|
||||
|
||||
mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
|
||||
mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls *
|
||||
sizeof (mca_ptl_gm_module_t *));
|
||||
sizeof (mca_ptl_gm_module_t *));
|
||||
if (NULL == mca_ptl_gm_component.gm_ptl_modules)
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
||||
@ -258,10 +232,11 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
|
||||
if (port_no == 3) continue;
|
||||
/* port 0,1,3 reserved */
|
||||
status = gm_open (&(ptl->my_port), board_no,
|
||||
port_no, "OMPI-GM", GM_API_VERSION_2_0);
|
||||
port_no, "OMPI-GM", GM_API_VERSION_2_0);
|
||||
|
||||
if (GM_SUCCESS == status) {
|
||||
ptl->my_port_id = port_no;
|
||||
mca_ptl_gm_component.gm_num_ptl_modules++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -293,14 +268,13 @@ ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
|
||||
static int
|
||||
ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
|
||||
{
|
||||
int i, rc;
|
||||
int i;
|
||||
mca_ptl_gm_module_t *ptl;
|
||||
gm_status_t status;
|
||||
void *gm_send_reg_memory , *gm_recv_reg_memory;
|
||||
ompi_free_list_t *fslist, *frlist, *free_rlist;
|
||||
ompi_list_item_t *item;
|
||||
ompi_free_list_t *fslist, *free_rlist;
|
||||
mca_ptl_gm_send_frag_t *sfragment;
|
||||
mca_ptl_gm_recv_frag_t *rfragment, *frag, *free_rfragment;
|
||||
mca_ptl_gm_recv_frag_t *free_rfragment;
|
||||
|
||||
for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
|
||||
ptl = mca_ptl_gm_component.gm_ptl_modules[i];
|
||||
@ -329,15 +303,17 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
|
||||
/* allocate the registered memory */
|
||||
gm_send_reg_memory = gm_dma_malloc ( ptl->my_port,
|
||||
(GM_SEND_BUF_SIZE * ptl->num_send_tokens) );
|
||||
|
||||
if( NULL == gm_send_reg_memory ) {
|
||||
ompi_output( 0, "unable to allocate registered memory\n" );
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
for (i = 0; i < ptl->num_send_tokens; i++) {
|
||||
ompi_list_item_t *item;
|
||||
sfragment->send_buf = gm_send_reg_memory;
|
||||
item = (ompi_list_item_t *) sfragment;
|
||||
ompi_list_append (&(fslist->super), item);
|
||||
OMPI_FREE_LIST_RETURN( fslist, item );
|
||||
|
||||
gm_send_reg_memory = ((char *) gm_send_reg_memory +
|
||||
GM_SEND_BUF_SIZE);
|
||||
gm_send_reg_memory = ((char *)gm_send_reg_memory) + GM_SEND_BUF_SIZE;
|
||||
sfragment++;
|
||||
|
||||
}
|
||||
@ -363,50 +339,26 @@ ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
|
||||
for (i = 0; i < NUM_RECV_FRAGS; i++) {
|
||||
ompi_list_item_t *item;
|
||||
item = (ompi_list_item_t *) free_rfragment;
|
||||
ompi_list_append (&(free_rlist->super), item); /* XXX: check this */
|
||||
OMPI_FREE_LIST_RETURN( free_rlist, item );
|
||||
free_rfragment++;
|
||||
}
|
||||
|
||||
|
||||
/*construct the list of recv fragments*/
|
||||
OBJ_CONSTRUCT (&(ptl->gm_recv_frags), ompi_free_list_t);
|
||||
frlist = &(ptl->gm_recv_frags);
|
||||
|
||||
/*allocate the elements */
|
||||
rfragment = (mca_ptl_gm_recv_frag_t *)
|
||||
malloc (sizeof (mca_ptl_gm_recv_frag_t) *
|
||||
(ptl->num_recv_tokens - 1));
|
||||
|
||||
/*allocate the registered memory */
|
||||
gm_recv_reg_memory =
|
||||
gm_dma_malloc (ptl->my_port,
|
||||
(GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
|
||||
|
||||
if( NULL == gm_recv_reg_memory ) {
|
||||
ompi_output( 0, "unable to allocate registered memory for receive\n" );
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
for (i = 0; i < ptl->num_recv_tokens ; i++) {
|
||||
ompi_list_item_t *item;
|
||||
rfragment->alloc_recv_buffer = gm_recv_reg_memory;
|
||||
item = (ompi_list_item_t *) rfragment;
|
||||
ompi_list_append (&(frlist->super), item); /* XXX: check this */
|
||||
gm_recv_reg_memory = ((char *)
|
||||
gm_recv_reg_memory + GM_RECV_BUF_SIZE);
|
||||
rfragment++;
|
||||
gm_provide_receive_buffer( ptl->my_port, gm_recv_reg_memory,
|
||||
GM_SIZE, GM_LOW_PRIORITY );
|
||||
gm_recv_reg_memory = ((char *)gm_recv_reg_memory) + GM_RECV_BUF_SIZE;
|
||||
}
|
||||
|
||||
/*TODO : need to provide buffers with two different sizes
|
||||
* to distinguish between header and data
|
||||
* post receive buffers */
|
||||
for (i = 0; i < (ptl->num_recv_tokens-1) ; i++) {
|
||||
OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc);
|
||||
assert( rc == OMPI_SUCCESS );
|
||||
frag = (mca_ptl_gm_recv_frag_t*)item;
|
||||
gm_provide_receive_buffer (ptl->my_port,frag->alloc_recv_buffer,
|
||||
GM_SIZE, GM_LOW_PRIORITY);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -448,14 +400,13 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
|
||||
|
||||
/* return array of PTLs */
|
||||
ptls = (mca_ptl_base_module_t**) malloc (
|
||||
mca_ptl_gm_component.gm_num_ptl_modules *
|
||||
sizeof (mca_ptl_base_module_t *));
|
||||
mca_ptl_gm_component.gm_num_ptl_modules * sizeof(mca_ptl_base_module_t *));
|
||||
if (NULL == ptls) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memcpy (ptls, mca_ptl_gm_component.gm_ptl_modules,
|
||||
mca_ptl_gm_component.gm_num_ptl_modules * sizeof (mca_ptl_gm_module_t *));
|
||||
mca_ptl_gm_component.gm_num_ptl_modules * sizeof(mca_ptl_gm_module_t *));
|
||||
*num_ptl_modules = mca_ptl_gm_component.gm_num_ptl_modules;
|
||||
return ptls;
|
||||
}
|
||||
|
@ -3,7 +3,6 @@
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#if 0
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
@ -23,6 +22,7 @@
|
||||
#include "ptl_gm_proc.h"
|
||||
#include "ptl_gm_sendfrag.h"
|
||||
|
||||
#if 0
|
||||
int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
|
||||
mca_ptl_gm_send_frag_t *fragment,
|
||||
struct mca_pml_base_send_request_t *sendreq,
|
||||
|
@ -169,134 +169,119 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
|
||||
|
||||
|
||||
|
||||
void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event )
|
||||
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event )
|
||||
{
|
||||
#if 1
|
||||
mca_ptl_gm_recv_frag_t * recv_frag;
|
||||
bool matched;
|
||||
mca_ptl_base_header_t *header;
|
||||
mca_ptl_gm_recv_frag_t * recv_frag;
|
||||
bool matched;
|
||||
mca_ptl_base_header_t *header;
|
||||
|
||||
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
|
||||
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
|
||||
|
||||
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
|
||||
/* allocate a receive fragment */
|
||||
recv_frag = mca_ptl_gm_alloc_recv_frag( (struct mca_ptl_base_module_t*)ptl );
|
||||
/* allocate a receive fragment */
|
||||
|
||||
recv_frag->frag_recv.frag_base.frag_peer = NULL;
|
||||
recv_frag->frag_recv.frag_request = NULL;
|
||||
recv_frag->frag_recv.frag_is_buffered = false;
|
||||
recv_frag->frag_hdr_cnt = 0;
|
||||
recv_frag->frag_msg_cnt = 0;
|
||||
recv_frag->frag_ack_pending = false;
|
||||
recv_frag->frag_progressed = 0;
|
||||
recv_frag->frag_recv.frag_base.frag_peer = NULL;
|
||||
recv_frag->frag_recv.frag_request = NULL;
|
||||
recv_frag->frag_recv.frag_is_buffered = false;
|
||||
recv_frag->frag_hdr_cnt = 0;
|
||||
recv_frag->frag_msg_cnt = 0;
|
||||
recv_frag->frag_ack_pending = false;
|
||||
recv_frag->frag_progressed = 0;
|
||||
|
||||
recv_frag->frag_recv.frag_base.frag_header = *header;
|
||||
recv_frag->frag_recv.frag_base.frag_header = *header;
|
||||
|
||||
recv_frag->frag_recv.frag_base.frag_addr =
|
||||
(char *)header + sizeof(mca_ptl_base_header_t);
|
||||
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
|
||||
/* header->hdr_frag.hdr_frag_length; */
|
||||
recv_frag->frag_recv.frag_base.frag_addr = header;
|
||||
recv_frag->frag_recv.frag_base.frag_size = gm_ntohl(event->recv.length);
|
||||
|
||||
matched = mca_ptl_base_match(
|
||||
&recv_frag->frag_recv.frag_base.frag_header.hdr_match,
|
||||
&(recv_frag->frag_recv),
|
||||
NULL );
|
||||
recv_frag->matched = false;
|
||||
recv_frag->have_allocated_buffer = false;
|
||||
|
||||
matched = mca_ptl_base_match(
|
||||
&recv_frag->frag_recv.frag_base.frag_header.hdr_match,
|
||||
&(recv_frag->frag_recv),
|
||||
NULL );
|
||||
|
||||
|
||||
if (!matched)
|
||||
{
|
||||
|
||||
ompi_output(0,"matching receive not yet posted\n");
|
||||
}
|
||||
/**/
|
||||
#endif
|
||||
|
||||
if (!matched) {
|
||||
ompi_output(0,"matching receive not yet posted\n");
|
||||
return recv_frag;
|
||||
}
|
||||
/* this one was matched => nothing more to do */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int ptl_gm_handle_recv(mca_ptl_gm_module_t *ptl, gm_recv_event_t* event )
|
||||
mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_event_t* event )
|
||||
{
|
||||
mca_ptl_gm_recv_frag_t* frag = NULL;
|
||||
mca_ptl_base_header_t *header;
|
||||
|
||||
#if 1
|
||||
/*int matched;*/
|
||||
mca_ptl_base_header_t *header;
|
||||
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
|
||||
|
||||
header = (mca_ptl_base_header_t *)gm_ntohp(event->recv.message);
|
||||
|
||||
switch(header->hdr_common.hdr_type)
|
||||
{
|
||||
switch(header->hdr_common.hdr_type) {
|
||||
case MCA_PTL_HDR_TYPE_MATCH:
|
||||
case MCA_PTL_HDR_TYPE_FRAG:
|
||||
ptl_gm_data_frag( ptl, event );
|
||||
break;
|
||||
frag = ptl_gm_data_frag( ptl, event );
|
||||
break;
|
||||
|
||||
case MCA_PTL_HDR_TYPE_ACK:
|
||||
case MCA_PTL_HDR_TYPE_NACK:
|
||||
ptl_gm_ctrl_frag(ptl,header);
|
||||
break;
|
||||
default:
|
||||
ptl_gm_ctrl_frag(ptl,header);
|
||||
break;
|
||||
default:
|
||||
ompi_output(0,"[%s:%d] unexpected frag type %d\n",
|
||||
__FILE__,__LINE__,header->hdr_common.hdr_type);
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
return frag;
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
|
||||
{
|
||||
/*#if 0*/
|
||||
int i,rc;
|
||||
int num_ptls;
|
||||
int i;
|
||||
gm_recv_event_t *event;
|
||||
void * mesg;
|
||||
mca_ptl_gm_module_t *ptl;
|
||||
mca_ptl_gm_recv_frag_t * frag;
|
||||
ompi_list_item_t* item;
|
||||
num_ptls = gm_comp->gm_num_ptl_modules;
|
||||
|
||||
for (i=0; i< num_ptls; i++)
|
||||
{
|
||||
for( i = 0; i< gm_comp->gm_num_ptl_modules; i++) {
|
||||
ptl = gm_comp->gm_ptl_modules[i];
|
||||
|
||||
{
|
||||
event = gm_receive(ptl->my_port);
|
||||
event = gm_receive(ptl->my_port);
|
||||
|
||||
switch (gm_ntohc(event->recv.type))
|
||||
{
|
||||
case GM_RECV_EVENT:
|
||||
case GM_HIGH_RECV_EVENT:
|
||||
case GM_PEER_RECV_EVENT:
|
||||
case GM_HIGH_PEER_RECV_EVENT:
|
||||
switch (gm_ntohc(event->recv.type)) {
|
||||
case GM_RECV_EVENT:
|
||||
case GM_HIGH_RECV_EVENT:
|
||||
case GM_PEER_RECV_EVENT:
|
||||
case GM_HIGH_PEER_RECV_EVENT:
|
||||
mesg = gm_ntohp(event->recv.message);
|
||||
ptl_gm_handle_recv( ptl, event );
|
||||
frag = ptl_gm_handle_recv( ptl, event );
|
||||
if( (frag != NULL) && !(frag->matched) ) {
|
||||
/* allocate temporary buffer: temporary until the fragment will be finally matched */
|
||||
char* buffer = malloc( GM_SEND_BUF_SIZE );
|
||||
/* copy the data from the registered buffer to the newly allocated one */
|
||||
memcpy( buffer, mesg, gm_ntohl(event->recv.length) );
|
||||
/* associate the buffer with the unexpected fragment */
|
||||
frag->frag_recv.frag_base.frag_addr = buffer;
|
||||
/* mark the fragment as having pending buffers */
|
||||
frag->have_allocated_buffer = true;
|
||||
}
|
||||
break;
|
||||
case GM_NO_RECV_EVENT:
|
||||
break;
|
||||
|
||||
/* post a replacement buffer */ /*XXX: do this after frag done*/
|
||||
OMPI_FREE_LIST_GET( &(ptl->gm_recv_frags), item, rc );
|
||||
if(rc != OMPI_SUCCESS)
|
||||
ompi_output(0,"unable to allocate a buffer\n");
|
||||
frag = (mca_ptl_gm_recv_frag_t*)item;
|
||||
/*frag =(mca_ptl_gm_recv_frag_t *) st_remove_first (*/
|
||||
/*&ptl->gm_recv_frags);*/
|
||||
gm_provide_receive_buffer(ptl->my_port,
|
||||
frag->alloc_recv_buffer,
|
||||
GM_SIZE,
|
||||
GM_LOW_PRIORITY
|
||||
);
|
||||
case GM_NO_RECV_EVENT:
|
||||
break;
|
||||
default:
|
||||
gm_unknown(ptl->my_port, event);
|
||||
|
||||
default:
|
||||
gm_unknown(ptl->my_port, event);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
/* #endif*/
|
||||
}
|
||||
/* Alway repost the message */
|
||||
gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.message),
|
||||
GM_SIZE, GM_LOW_PRIORITY );
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -20,10 +20,11 @@
|
||||
void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
|
||||
mca_ptl_base_header_t * header);
|
||||
|
||||
void ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event );
|
||||
mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event );
|
||||
|
||||
int ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl, gm_recv_event_t* event );
|
||||
mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl,
|
||||
gm_recv_event_t* event );
|
||||
|
||||
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp);
|
||||
|
||||
|
@ -80,7 +80,6 @@ mca_ptl_gm_proc_create (mca_ptl_gm_module_t * ptl, ompi_proc_t * ompi_proc)
|
||||
{
|
||||
int rc;
|
||||
size_t size;
|
||||
int i;
|
||||
mca_ptl_gm_proc_t *ptl_proc;
|
||||
|
||||
ptl_proc = mca_ptl_gm_proc_lookup_ompi (ompi_proc);
|
||||
|
@ -52,6 +52,8 @@ struct mca_ptl_gm_recv_frag_t {
|
||||
bool frag_ack_pending;
|
||||
void *alloc_recv_buffer;
|
||||
void *unex_recv_buffer;
|
||||
bool matched;
|
||||
bool have_allocated_buffer;
|
||||
};
|
||||
|
||||
typedef struct mca_ptl_gm_recv_frag_t mca_ptl_gm_recv_frag_t;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user