1
1

* more refactoring to reduce duplicate code

This commit was SVN r5708.
Этот коммит содержится в:
Brian Barrett 2005-05-13 04:04:08 +00:00
родитель a242d5ad4f
Коммит f64e52a28d
3 изменённых файлов: 135 добавлений и 141 удалений

Просмотреть файл

@ -80,8 +80,8 @@ ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl, void *data_ptr)
return OMPI_ERROR;
}
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
"new receive buffer posted");
OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output,
"new receive buffer posted"));
return OMPI_SUCCESS;
}
@ -94,160 +94,51 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
int ret;
if (ev->type == PTL_EVENT_PUT_START) {
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
"PUT_START event received (%ld)", ev->link);
OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output,
"PUT_START event received (%ld)", ev->link));
} else if (ev->type == PTL_EVENT_PUT_END) {
ompi_list_item_t *item;
mca_ptl_portals_recv_frag_t *recvfrag;
mca_ptl_base_header_t *hdr;
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
"message %ld received, start: %p, mlength: %lld, offset: %lld",
ev->link, ev->md.start, ev->mlength, ev->offset);
OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output,
"message %ld received, start: %p, mlength: %lld,"
" offset: %lld",
ev->link, ev->md.start, ev->mlength, ev->offset));
/* buffer is going to be header followed by data */
hdr = (mca_ptl_base_header_t*) (((char*) ev->md.start) + ev->offset);
switch (hdr->hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
/* get a fragment header */
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret);
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
if (OMPI_SUCCESS != ret) {
ompi_output(mca_ptl_portals_component.portals_output,
"unable to allocate resources");
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
/* save the sender */
recvfrag->frag_source = ev->initiator;
recvfrag->frag_data = ((mca_ptl_base_match_header_t*) hdr) + 1;
recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_match_header_t);
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
hdr, sizeof(mca_ptl_base_match_header_t));
recvfrag->frag_recv.frag_base.frag_owner =
(struct mca_ptl_base_module_t*) ptl;
recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */
recvfrag->frag_recv.frag_base.frag_size = 0;
recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data;
recvfrag->frag_recv.frag_is_buffered = true;
recvfrag->frag_recv.frag_request = NULL;
ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv,
&hdr->hdr_match);
ret = mca_ptl_portals_process_first_frag(ptl, hdr, ev,
sizeof(mca_ptl_base_match_header_t));
if (OMPI_SUCCESS != ret) return ret;
break;
case MCA_PTL_HDR_TYPE_RNDV:
/* get a fragment header */
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret);
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
if (OMPI_SUCCESS != ret) {
ompi_output(mca_ptl_portals_component.portals_output,
"unable to allocate resources");
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
/* save the sender */
recvfrag->frag_source = ev->initiator;
recvfrag->frag_data = ((mca_ptl_base_rendezvous_header_t*) hdr) + 1;
recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_rendezvous_header_t);
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
hdr, sizeof(mca_ptl_base_rendezvous_header_t));
recvfrag->frag_recv.frag_base.frag_owner =
(struct mca_ptl_base_module_t*) ptl;
recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */
recvfrag->frag_recv.frag_base.frag_size = 0;
recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data;
recvfrag->frag_recv.frag_is_buffered = true;
recvfrag->frag_recv.frag_request = NULL;
ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv,
&hdr->hdr_match);
ret = mca_ptl_portals_process_first_frag(ptl, hdr, ev,
sizeof(mca_ptl_base_rendezvous_header_t));
if (OMPI_SUCCESS != ret) return ret;
break;
case MCA_PTL_HDR_TYPE_FRAG:
{
unsigned int bytes_delivered;
mca_ptl_base_recv_request_t* request;
/* get a fragment header */
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret);
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
if (OMPI_SUCCESS != ret) {
ompi_output(mca_ptl_portals_component.portals_output,
"unable to allocate resources");
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
/* save the sender */
recvfrag->frag_source = ev->initiator;
recvfrag->frag_data = ((mca_ptl_base_frag_header_t*) hdr) + 1;
recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_frag_header_t);
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
hdr, sizeof(mca_ptl_base_frag_header_t));
recvfrag->frag_recv.frag_base.frag_owner =
(struct mca_ptl_base_module_t*) ptl;
recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */
recvfrag->frag_recv.frag_base.frag_size = 0;
recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data;
recvfrag->frag_recv.frag_is_buffered = true;
recvfrag->frag_recv.frag_request = hdr->hdr_frag.hdr_dst_ptr.pval;
bytes_delivered = recvfrag->frag_size;
request = recvfrag->frag_recv.frag_request;
if(recvfrag->frag_size > 0) {
struct iovec iov;
unsigned int iov_count = 1;
int free_after = 0;
ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_recv.req_base.req_comm,
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE);
ompi_convertor_t* convertor = &(recvfrag->frag_recv.frag_base.frag_convertor);
/* initialize receive convertor */
ompi_convertor_copy(proc->proc_convertor, convertor);
ompi_convertor_init_for_recv(
convertor, /* convertor */
0, /* flags */
request->req_recv.req_base.req_datatype, /* datatype */
request->req_recv.req_base.req_count, /* count elements */
request->req_recv.req_base.req_addr, /* users buffer */
hdr->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */
NULL ); /* not allocating memory */
/*ompi_convertor_get_packed_size(convertor, &request->req_bytes_packed); */
iov.iov_base = recvfrag->frag_data;
iov.iov_len = recvfrag->frag_size;
ompi_convertor_unpack(convertor, &iov, &iov_count, &bytes_delivered, &free_after );
}
/* update request status */
ptl->super.ptl_recv_progress(&ptl->super,
request,
recvfrag->frag_size,
bytes_delivered);
}
ret = mca_ptl_portals_process_frag_frag(ptl, hdr, ev);
if (OMPI_SUCCESS != ret) return ret;
break;
case MCA_PTL_HDR_TYPE_ACK:
{
mca_ptl_portals_send_frag_t *sendfrag;
mca_ptl_base_send_request_t *sendreq;
sendfrag = hdr->hdr_ack.hdr_src_ptr.pval;
sendreq = sendfrag->frag_send.frag_request;
sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match;
OMPI_OUTPUT_VERBOSE((100, mca_ptl_portals_component.portals_output,
OMPI_OUTPUT_VERBOSE((100,
mca_ptl_portals_component.portals_output,
"received ack for request %p",
hdr->hdr_ack.hdr_dst_match));
sendfrag = hdr->hdr_ack.hdr_src_ptr.pval;
sendreq = sendfrag->frag_send.frag_request;
sendreq->req_peer_match = hdr->hdr_ack.hdr_dst_match;
mca_ptl_portals_complete_send_event(sendfrag);
}
break;
default:
@ -257,11 +148,7 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
break;
}
/* see if we need to repost an md */
if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) {
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
"must repost event: %lld, %lld, %lld",
ev->offset, ev->mlength, ev->md.max_size);
/* use the same memory as the old md - it's not using it anymore */
ret = ptl_portals_post_recv_md(ptl, ev->md.start);
if (OMPI_SUCCESS != ret) {

Просмотреть файл

@ -36,10 +36,118 @@ typedef struct mca_ptl_portals_recv_frag_t mca_ptl_portals_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_ptl_portals_recv_frag_t);
extern int ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl,
void *data_ptr);
extern int mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
ptl_event_t *ev);
int ptl_portals_post_recv_md(struct mca_ptl_portals_module_t *ptl,
void *data_ptr);
int mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
ptl_event_t *ev);
static inline mca_ptl_portals_recv_frag_t *
mca_ptl_portals_recv_get_frag(struct mca_ptl_portals_module_t *ptl,
mca_ptl_base_header_t *hdr,
ptl_event_t *ev,
size_t header_size)
{
mca_ptl_portals_recv_frag_t * recvfrag;
ompi_list_item_t *item;
int ret;
/* get a fragment header */
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags,
item, ret);
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
if (OMPI_SUCCESS != ret) {
ompi_output(mca_ptl_portals_component.portals_output,
"unable to allocate resources");
return NULL;
}
/* save the sender */
recvfrag->frag_source = ev->initiator;
recvfrag->frag_data = ((char*) hdr) + header_size;
recvfrag->frag_size = ev->mlength - header_size;
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
hdr, header_size);
recvfrag->frag_recv.frag_base.frag_owner = &(ptl->super);
recvfrag->frag_recv.frag_base.frag_peer = NULL; /* BWB - fix me */
recvfrag->frag_recv.frag_base.frag_size = 0;
recvfrag->frag_recv.frag_base.frag_addr = recvfrag->frag_data;
recvfrag->frag_recv.frag_is_buffered = true;
return recvfrag;
}
static inline int
mca_ptl_portals_process_first_frag(struct mca_ptl_portals_module_t *ptl,
mca_ptl_base_header_t *hdr,
ptl_event_t *ev,
size_t header_size)
{
mca_ptl_portals_recv_frag_t *recvfrag;
recvfrag = mca_ptl_portals_recv_get_frag(ptl, hdr, ev, header_size);
if (NULL == recvfrag) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
recvfrag->frag_recv.frag_request = NULL;
ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv,
&hdr->hdr_match);
return OMPI_SUCCESS;
}
static inline int
mca_ptl_portals_process_frag_frag(struct mca_ptl_portals_module_t *ptl,
mca_ptl_base_header_t *hdr,
ptl_event_t *ev)
{
unsigned int bytes_delivered;
mca_ptl_base_recv_request_t* request;
mca_ptl_portals_recv_frag_t *recvfrag;
/* get a frag and fill it in */
recvfrag = mca_ptl_portals_recv_get_frag(ptl, hdr, ev,
sizeof(mca_ptl_base_frag_header_t));
if (NULL == recvfrag) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
recvfrag->frag_recv.frag_request = hdr->hdr_frag.hdr_dst_ptr.pval;
bytes_delivered = recvfrag->frag_size;
request = recvfrag->frag_recv.frag_request;
if (recvfrag->frag_size > 0) {
struct iovec iov;
unsigned int iov_count = 1;
int free_after = 0;
ompi_proc_t *proc = ompi_comm_peer_lookup(
request->req_recv.req_base.req_comm,
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE);
ompi_convertor_t* convertor =
&(recvfrag->frag_recv.frag_base.frag_convertor);
/* initialize receive convertor */
ompi_convertor_copy(proc->proc_convertor, convertor);
ompi_convertor_init_for_recv(
convertor, /* convertor */
0, /* flags */
request->req_recv.req_base.req_datatype, /* datatype */
request->req_recv.req_base.req_count, /* count elements */
request->req_recv.req_base.req_addr, /* users buffer */
hdr->hdr_frag.hdr_frag_offset, /* offset in bytes into packed buffer */
NULL ); /* not allocating memory */
iov.iov_base = recvfrag->frag_data;
iov.iov_len = recvfrag->frag_size;
ompi_convertor_unpack(convertor, &iov, &iov_count,
&bytes_delivered, &free_after );
}
/* update request status */
ptl->super.ptl_recv_progress(&ptl->super,
request,
recvfrag->frag_size,
bytes_delivered);
return OMPI_SUCCESS;
}
#endif

Просмотреть файл

@ -96,7 +96,7 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base,
sendreq->req_send.req_addr,
offset,
mca_ptl_portals_alloc );
/* if data is contigous convertor will return an offset
* into users buffer - otherwise will return an allocated buffer
* that holds the packed data
@ -167,7 +167,6 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base,
sendfrag->frag_send.frag_base.frag_owner = ptl_base;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
/* must update the offset after actual fragment size is determined
* before attempting to send the fragment