* Add ability to respond to RNDV packets with ACKs. Short MPI_Ssends now
work properly. Still need to implement second fragment support. This commit was SVN r5674.
Этот коммит содержится в:
родитель
956782670a
Коммит
2ec27c0927
@ -20,6 +20,8 @@
|
||||
#include "ptl_portals.h"
|
||||
#include "ptl_portals_compat.h"
|
||||
#include "ptl_portals_recv.h"
|
||||
#include "ptl_portals_send.h"
|
||||
#include "mca/ptl/base/ptl_base_sendreq.h"
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_ptl_portals_recv_frag_t,
|
||||
@ -103,6 +105,11 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
|
||||
"message %ld received, start: %p, mlength: %lld, offset: %lld",
|
||||
ev->link, ev->md.start, ev->mlength, ev->offset);
|
||||
|
||||
/* buffer is going to be header followed by data */
|
||||
hdr = (mca_ptl_base_header_t*) (((char*) ev->md.start) + ev->offset);
|
||||
switch (hdr->hdr_common.hdr_type) {
|
||||
|
||||
case MCA_PTL_HDR_TYPE_MATCH:
|
||||
/* get a fragment header */
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret);
|
||||
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
|
||||
@ -112,11 +119,9 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* buffer is going to be header followed by data */
|
||||
hdr = (mca_ptl_base_header_t*) (((char*) ev->md.start) + ev->offset);
|
||||
switch (hdr->hdr_common.hdr_type) {
|
||||
/* save the sender */
|
||||
recvfrag->frag_source = ev->initiator;
|
||||
|
||||
case MCA_PTL_HDR_TYPE_MATCH:
|
||||
recvfrag->frag_data = ((mca_ptl_base_match_header_t*) hdr) + 1;
|
||||
recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_match_header_t);
|
||||
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
|
||||
@ -134,6 +139,18 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
|
||||
break;
|
||||
|
||||
case MCA_PTL_HDR_TYPE_RNDV:
|
||||
/* get a fragment header */
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_recv_frags, item, ret);
|
||||
recvfrag = (mca_ptl_portals_recv_frag_t*) item;
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"unable to allocate resources");
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
/* save the sender */
|
||||
recvfrag->frag_source = ev->initiator;
|
||||
|
||||
recvfrag->frag_data = ((mca_ptl_base_rendezvous_header_t*) hdr) + 1;
|
||||
recvfrag->frag_size = ev->mlength - sizeof(mca_ptl_base_rendezvous_header_t);
|
||||
memcpy(&(recvfrag->frag_recv.frag_base.frag_header),
|
||||
@ -151,6 +168,28 @@ mca_ptl_portals_process_recv_event(struct mca_ptl_portals_module_t *ptl,
|
||||
|
||||
break;
|
||||
|
||||
case MCA_PTL_HDR_TYPE_ACK:
|
||||
{
|
||||
mca_ptl_portals_send_frag_t *sendfrag;
|
||||
sendfrag = hdr->hdr_ack.hdr_src_ptr.pval;
|
||||
|
||||
sendfrag->frag_send.frag_base.frag_owner->
|
||||
ptl_send_progress(sendfrag->frag_send.frag_base.frag_owner,
|
||||
sendfrag->frag_send.frag_request,
|
||||
sendfrag->frag_send.frag_base.frag_size);
|
||||
|
||||
/* return frag to freelist if not part of request */
|
||||
if (sendfrag->frag_send.frag_request->req_cached == false) {
|
||||
if (sendfrag->frag_send.frag_base.frag_addr == NULL) {
|
||||
free(sendfrag->frag_send.frag_base.frag_addr);
|
||||
}
|
||||
OMPI_FREE_LIST_RETURN(&mca_ptl_portals_component.portals_send_frags,
|
||||
(ompi_list_item_t*) sendfrag);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"unable to deal with header of type %d",
|
||||
@ -194,7 +233,7 @@ mca_ptl_portals_matched(struct mca_ptl_base_module_t *ptl_base,
|
||||
|
||||
/* generate an acknowledgment if required */
|
||||
if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
|
||||
;
|
||||
mca_ptl_portals_send_ack(ptl, recvfrag);
|
||||
}
|
||||
|
||||
/* copy data into users buffer */
|
||||
|
@ -29,6 +29,7 @@ struct mca_ptl_portals_recv_frag_t {
|
||||
mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */
|
||||
void *frag_data;
|
||||
size_t frag_size;
|
||||
ptl_process_id_t frag_source;
|
||||
};
|
||||
typedef struct mca_ptl_portals_recv_frag_t mca_ptl_portals_recv_frag_t;
|
||||
|
||||
|
@ -63,8 +63,6 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base,
|
||||
mca_ptl_portals_send_frag_t* sendfrag;
|
||||
mca_ptl_base_header_t* hdr;
|
||||
int ret;
|
||||
ptl_md_t md;
|
||||
ptl_handle_md_t md_handle;
|
||||
|
||||
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
|
||||
"mca_ptl_portals_send to %lu, %lu",
|
||||
@ -173,41 +171,7 @@ mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base,
|
||||
mca_ptl_base_send_request_offset(sendreq,
|
||||
sendfrag->frag_send.frag_base.frag_size);
|
||||
|
||||
/* setup the send and go */
|
||||
md.start = sendfrag->frag_vector;
|
||||
md.length = 2; /* header + data */
|
||||
md.threshold = PTL_MD_THRESH_INF; /* unlink based on protocol */
|
||||
md.max_size = 0;
|
||||
md.options = PTL_MD_IOVEC; /* BWB - can we optimize? */
|
||||
md.user_ptr = sendfrag;
|
||||
md.eq_handle = ptl->frag_eq_handle;
|
||||
|
||||
/* make a free-floater */
|
||||
ret = PtlMDBind(ptl->ni_handle,
|
||||
md,
|
||||
PTL_UNLINK,
|
||||
&md_handle);
|
||||
if (ret != PTL_OK) {
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"PtlMDBind failed with error %d", ret);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
ret = PtlPut(md_handle,
|
||||
PTL_ACK_REQ,
|
||||
*((ptl_process_id_t*) ptl_peer),
|
||||
PTL_PORTALS_FRAG_TABLE_ID,
|
||||
0, /* ac_index */
|
||||
0, /* match bits */
|
||||
0, /* remote offset - not used */
|
||||
0); /* hdr_data - not used */
|
||||
if (ret != PTL_OK) {
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"PtlPut failed with error %d", ret);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return mca_ptl_portals_send_frag(ptl, sendfrag);
|
||||
}
|
||||
|
||||
|
||||
@ -233,13 +197,20 @@ mca_ptl_portals_process_send_event(ptl_event_t *ev)
|
||||
ompi_output_verbose(100, mca_ptl_portals_component.portals_output,
|
||||
"ACK event for msg %d",
|
||||
(int) hdr->hdr_match.hdr_msg_seq);
|
||||
|
||||
/* discard ACKs for acks */
|
||||
if (frag->frag_send.frag_request == NULL) {
|
||||
if (frag->frag_send.frag_base.frag_addr != NULL) {
|
||||
free(frag->frag_send.frag_base.frag_addr);
|
||||
}
|
||||
OMPI_FREE_LIST_RETURN(&mca_ptl_portals_component.portals_send_frags,
|
||||
(ompi_list_item_t*) frag);
|
||||
} else {
|
||||
|
||||
frag_ack = (hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) ? true : false;
|
||||
if (frag_ack == false) {
|
||||
/* this frag is done! */
|
||||
|
||||
/* unlink memory descriptor */
|
||||
PtlMDUnlink(ev->md_handle);
|
||||
|
||||
/* let the PML know */
|
||||
frag->frag_send.frag_base.frag_owner->
|
||||
ptl_send_progress(frag->frag_send.frag_base.frag_owner,
|
||||
@ -248,7 +219,7 @@ mca_ptl_portals_process_send_event(ptl_event_t *ev)
|
||||
|
||||
/* return frag to freelist if not part of request */
|
||||
if (frag->frag_send.frag_request->req_cached == false) {
|
||||
if (frag->frag_send.frag_base.frag_addr == NULL) {
|
||||
if (frag->frag_send.frag_base.frag_addr != NULL) {
|
||||
free(frag->frag_send.frag_base.frag_addr);
|
||||
}
|
||||
OMPI_FREE_LIST_RETURN(&mca_ptl_portals_component.portals_send_frags,
|
||||
@ -256,9 +227,12 @@ mca_ptl_portals_process_send_event(ptl_event_t *ev)
|
||||
}
|
||||
} else {
|
||||
/* need to wait for the ack... */
|
||||
ompi_list_append(&mca_ptl_portals_component.portals_pending_acks,
|
||||
(ompi_list_item_t*) frag);
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
/* unlink memory descriptor */
|
||||
PtlMDUnlink(ev->md_handle);
|
||||
|
||||
} else {
|
||||
ompi_output_verbose(10, mca_ptl_portals_component.portals_output,
|
||||
|
@ -19,7 +19,8 @@
|
||||
#define MCA_PTL_PORTALS_SENDFRAG_H_
|
||||
|
||||
#include "mca/ptl/base/ptl_base_sendfrag.h"
|
||||
|
||||
#include "mca/ptl/base/ptl_base_recvfrag.h"
|
||||
#include "ptl_portals_recv.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
@ -30,7 +31,6 @@ extern "C" {
|
||||
ptl_md_iovec_t frag_vector[2];
|
||||
};
|
||||
typedef struct mca_ptl_portals_send_frag_t mca_ptl_portals_send_frag_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION (mca_ptl_portals_send_frag_t);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
@ -39,4 +39,99 @@ extern "C" {
|
||||
|
||||
extern int mca_ptl_portals_process_send_event(ptl_event_t *ev);
|
||||
|
||||
|
||||
static inline int
|
||||
mca_ptl_portals_send_frag(struct mca_ptl_portals_module_t *ptl,
|
||||
mca_ptl_portals_send_frag_t* sendfrag)
|
||||
{
|
||||
ptl_md_t md;
|
||||
ptl_handle_md_t md_handle;
|
||||
int ret;
|
||||
|
||||
/* setup the send and go */
|
||||
md.start = sendfrag->frag_vector;
|
||||
md.length = 2; /* header + data */
|
||||
md.threshold = PTL_MD_THRESH_INF; /* unlink based on protocol */
|
||||
md.max_size = 0;
|
||||
md.options = PTL_MD_IOVEC; /* BWB - can we optimize? */
|
||||
md.user_ptr = sendfrag;
|
||||
md.eq_handle = ptl->frag_eq_handle;
|
||||
|
||||
/* make a free-floater */
|
||||
ret = PtlMDBind(ptl->ni_handle,
|
||||
md,
|
||||
PTL_UNLINK,
|
||||
&md_handle);
|
||||
if (ret != PTL_OK) {
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"PtlMDBind failed with error %d", ret);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
ret = PtlPut(md_handle,
|
||||
PTL_ACK_REQ,
|
||||
*((ptl_process_id_t*) sendfrag->frag_send.frag_base.frag_peer),
|
||||
PTL_PORTALS_FRAG_TABLE_ID,
|
||||
0, /* ac_index */
|
||||
0, /* match bits */
|
||||
0, /* remote offset - not used */
|
||||
0); /* hdr_data - not used */
|
||||
if (ret != PTL_OK) {
|
||||
ompi_output(mca_ptl_portals_component.portals_output,
|
||||
"PtlPut failed with error %d", ret);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
|
||||
static inline int
|
||||
mca_ptl_portals_send_ack(struct mca_ptl_portals_module_t *ptl,
|
||||
mca_ptl_portals_recv_frag_t* recvfrag)
|
||||
{
|
||||
mca_ptl_base_header_t* hdr;
|
||||
mca_ptl_portals_send_frag_t* sendfrag;
|
||||
ompi_list_item_t *item;
|
||||
mca_ptl_base_recv_request_t* request = recvfrag->frag_recv.frag_request;
|
||||
int ret;
|
||||
|
||||
/* get a fragment */
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_portals_component.portals_send_frags,
|
||||
item, ret);
|
||||
if (NULL == item) {
|
||||
/* BWB - fix me */
|
||||
return ret;
|
||||
}
|
||||
sendfrag = (mca_ptl_portals_send_frag_t *) item;
|
||||
|
||||
sendfrag->frag_vector[1].iov_base = NULL;
|
||||
sendfrag->frag_vector[1].iov_len = 0;
|
||||
|
||||
/* setup message header */
|
||||
hdr = &sendfrag->frag_send.frag_base.frag_header;
|
||||
|
||||
hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||
hdr->hdr_ack.hdr_common.hdr_flags = 0;
|
||||
|
||||
hdr->hdr_ack.hdr_src_ptr = recvfrag->frag_recv.frag_base.frag_header.hdr_rndv.hdr_src_ptr;
|
||||
hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||
hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_ack.hdr_dst_addr.pval = request->req_recv.req_base.req_addr;
|
||||
hdr->hdr_ack.hdr_dst_size = request->req_recv.req_bytes_packed;
|
||||
|
||||
sendfrag->frag_send.frag_request = NULL;
|
||||
sendfrag->frag_send.frag_base.frag_peer = (struct mca_ptl_base_peer_t*) &(recvfrag->frag_source);
|
||||
sendfrag->frag_send.frag_base.frag_owner = &ptl->super;
|
||||
sendfrag->frag_send.frag_base.frag_addr = NULL;
|
||||
sendfrag->frag_send.frag_base.frag_size = 0;
|
||||
|
||||
sendfrag->frag_vector[0].iov_len = sizeof(mca_ptl_base_ack_header_t);
|
||||
|
||||
return mca_ptl_portals_send_frag(ptl, sendfrag);
|
||||
}
|
||||
|
||||
|
||||
#endif /* MCA_PTL_PORTALS_SENDFRAG_H_ */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user