handle receive of user data
This commit was SVN r5942.
Этот коммит содержится в:
родитель
9e9a93e2ab
Коммит
bd59bb4a16
@ -499,9 +499,7 @@ int mca_pml_ob1_recv_frag_match(
|
|||||||
|
|
||||||
/* release matching lock before processing fragment */
|
/* release matching lock before processing fragment */
|
||||||
if(match != NULL) {
|
if(match != NULL) {
|
||||||
match->req_recv.req_bytes_packed = hdr->hdr_msg_length;
|
MCA_PML_OB1_RECV_REQUEST_MATCHED(match, hdr);
|
||||||
match->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
|
|
||||||
match->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
|
|
||||||
mca_pml_ob1_recv_request_progress(match,bmi,segments,num_segments);
|
mca_pml_ob1_recv_request_progress(match,bmi,segments,num_segments);
|
||||||
} else {
|
} else {
|
||||||
ompi_output(0, "match not found\n");
|
ompi_output(0, "match not found\n");
|
||||||
@ -510,9 +508,7 @@ int mca_pml_ob1_recv_frag_match(
|
|||||||
ompi_list_item_t* item;
|
ompi_list_item_t* item;
|
||||||
while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
|
while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
|
||||||
mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
|
mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
|
||||||
frag->request->req_recv.req_bytes_packed = hdr->hdr_msg_length;
|
MCA_PML_OB1_RECV_REQUEST_MATCHED(frag->request, hdr);
|
||||||
frag->request->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
|
|
||||||
frag->request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
|
|
||||||
mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
|
mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
|
||||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||||
}
|
}
|
||||||
|
@ -375,10 +375,7 @@ static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
|||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
find_fragment:
|
find_fragment:
|
||||||
request->req_recv.req_bytes_packed = hdr->hdr_msg_length;
|
MCA_PML_OB1_RECV_REQUEST_MATCHED(request, hdr);
|
||||||
request->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
|
|
||||||
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
|
|
||||||
|
|
||||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||||
ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag);
|
ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag);
|
||||||
|
@ -38,6 +38,7 @@ struct mca_pml_ob1_recv_request_t {
|
|||||||
* the last element of this struct.
|
* the last element of this struct.
|
||||||
*/
|
*/
|
||||||
mca_bmi_base_descriptor_t *req_pipeline[1];
|
mca_bmi_base_descriptor_t *req_pipeline[1];
|
||||||
|
ompi_convertor_t req_convertor;
|
||||||
};
|
};
|
||||||
typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
|
typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
|
||||||
|
|
||||||
@ -152,6 +153,37 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Record the result of a match on a receive request.
 *
 * Copies the message length, tag and source rank from the matched header
 * into the request (req_bytes_packed and the MPI_TAG / MPI_SOURCE fields of
 * the request status). If the message is not empty, the sending peer is
 * looked up on the request's communicator, that peer's convertor is copied
 * into the request's own convertor, and the copy is initialized for receive
 * using the request's datatype, element count and user buffer, starting at
 * offset 0 of the packed data and without allocating memory.
 *
 * @param request  Pointer expression designating the receive request.
 * @param hdr      Pointer expression designating the matched header.
 *
 * Both arguments are evaluated several times, so they must be free of side
 * effects. Every use is parenthesized, so arbitrary pointer expressions
 * (e.g. "frag->request" or "reqs + i") are accepted.
 *
 * NOTE: the expansion is a bare brace block that declares a local named
 * "proc"; an argument expression that itself refers to an identifier called
 * "proc" would be captured by that local. Because it is a bare block rather
 * than do { } while (0), do not use it as the unbraced body of an "if" that
 * is followed by "else".
 */
#define MCA_PML_OB1_RECV_REQUEST_MATCHED(                                      \
    request,                                                                   \
    hdr)                                                                       \
{                                                                              \
    (request)->req_recv.req_bytes_packed = (hdr)->hdr_msg_length;              \
    (request)->req_recv.req_base.req_ompi.req_status.MPI_TAG =                 \
        (hdr)->hdr_tag;                                                        \
    (request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE =              \
        (hdr)->hdr_src;                                                        \
                                                                               \
    /* zero-byte messages carry no user data: no convertor is needed */        \
    if((request)->req_recv.req_bytes_packed != 0) {                            \
        ompi_proc_t *proc =                                                    \
            ompi_comm_peer_lookup(                                             \
                (request)->req_recv.req_base.req_comm, (hdr)->hdr_src);        \
                                                                               \
        ompi_convertor_copy(proc->proc_convertor,                              \
                            &(request)->req_convertor);                        \
        ompi_convertor_init_for_recv(                                          \
            &((request)->req_convertor),              /* convertor */          \
            0,                                        /* flags */              \
            (request)->req_recv.req_base.req_datatype, /* datatype */          \
            (request)->req_recv.req_base.req_count,   /* count elements */     \
            (request)->req_recv.req_base.req_addr,    /* users buffer */       \
            0,                         /* offset in bytes into packed buffer */ \
            NULL );                    /* not allocating memory */             \
    }                                                                          \
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@ -165,8 +197,25 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
|||||||
bytes_received, \
|
bytes_received, \
|
||||||
bytes_delivered) \
|
bytes_delivered) \
|
||||||
{ \
|
{ \
|
||||||
/* FIX */ \
|
if(request->req_recv.req_base.req_count > 0) { \
|
||||||
bytes_delivered = bytes_received; \
|
struct iovec iov[MCA_BMI_DES_MAX_SEGMENTS]; \
|
||||||
|
uint32_t iov_count = num_segments; \
|
||||||
|
uint32_t max_data = 0; \
|
||||||
|
int32_t free_after = 0; \
|
||||||
|
size_t i; \
|
||||||
|
for(i=0; i<num_segments; i++) { \
|
||||||
|
iov[i].iov_base = segments[i].seg_addr.pval; \
|
||||||
|
iov[i].iov_len = segments[i].seg_len; \
|
||||||
|
bytes_received += segments[i].seg_len; \
|
||||||
|
} \
|
||||||
|
ompi_convertor_unpack( \
|
||||||
|
&(request)->req_convertor, \
|
||||||
|
iov, \
|
||||||
|
&iov_count, \
|
||||||
|
&max_data, \
|
||||||
|
&free_after); \
|
||||||
|
bytes_delivered = max_data; \
|
||||||
|
} \
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -78,9 +78,14 @@ static void mca_pml_ob1_send_completion(
|
|||||||
int status)
|
int status)
|
||||||
{
|
{
|
||||||
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
|
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
|
||||||
|
mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint;
|
||||||
|
|
||||||
/* for now - return the descriptor - may cache these at some point */
|
/* for now - return the descriptor - may cache these at some point */
|
||||||
|
if(NULL == bmi_ep->bmi_cache) {
|
||||||
|
bmi_ep->bmi_cache = descriptor;
|
||||||
|
} else {
|
||||||
bmi->bmi_free(bmi,descriptor);
|
bmi->bmi_free(bmi,descriptor);
|
||||||
|
}
|
||||||
|
|
||||||
/* check for request completion */
|
/* check for request completion */
|
||||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||||
@ -152,20 +157,21 @@ int mca_pml_ob1_send_request_start_copy(
|
|||||||
/* shortcut for zero byte */
|
/* shortcut for zero byte */
|
||||||
if(size == 0) {
|
if(size == 0) {
|
||||||
|
|
||||||
descriptor = endpoint->bmi_cache;
|
/* allocate a descriptor */
|
||||||
if(NULL != descriptor) {
|
if(NULL != (descriptor = endpoint->bmi_cache)) {
|
||||||
endpoint->bmi_cache = NULL;
|
endpoint->bmi_cache = NULL;
|
||||||
} else {
|
} else {
|
||||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t));
|
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t));
|
||||||
if(NULL == descriptor) {
|
if(NULL == descriptor) {
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||||
}
|
}
|
||||||
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
|
||||||
}
|
}
|
||||||
|
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
||||||
segment = descriptor->des_src;
|
segment = descriptor->des_src;
|
||||||
|
|
||||||
/* build hdr */
|
/* build hdr */
|
||||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||||
|
hdr->hdr_common.hdr_flags = 0;
|
||||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||||
@ -175,12 +181,10 @@ int mca_pml_ob1_send_request_start_copy(
|
|||||||
|
|
||||||
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
||||||
if (sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
if (sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||||
hdr->hdr_common.hdr_flags = 0;
|
|
||||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
|
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
|
||||||
ompi_request_complete((ompi_request_t*)sendreq);
|
ompi_request_complete((ompi_request_t*)sendreq);
|
||||||
} else {
|
} else {
|
||||||
hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK;
|
|
||||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
|
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
|
||||||
hdr->hdr_rndv.hdr_frag_length = 0;
|
hdr->hdr_rndv.hdr_frag_length = 0;
|
||||||
hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||||
@ -193,14 +197,14 @@ int mca_pml_ob1_send_request_start_copy(
|
|||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
unsigned int iov_count;
|
unsigned int iov_count;
|
||||||
unsigned int max_data;
|
unsigned int max_data;
|
||||||
int flags = 0;
|
bool ack = false;
|
||||||
|
|
||||||
/* determine first fragment size */
|
/* determine first fragment size */
|
||||||
if(size > endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t)) {
|
if(size > endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t)) {
|
||||||
size = endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t);
|
size = endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t);
|
||||||
flags = MCA_PML_OB1_HDR_FLAGS_ACK;
|
ack = true;
|
||||||
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||||
flags = MCA_PML_OB1_HDR_FLAGS_ACK;
|
ack = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* allocate space for hdr + first fragment */
|
/* allocate space for hdr + first fragment */
|
||||||
@ -213,7 +217,7 @@ int mca_pml_ob1_send_request_start_copy(
|
|||||||
|
|
||||||
/* build hdr */
|
/* build hdr */
|
||||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||||
hdr->hdr_common.hdr_flags = flags;
|
hdr->hdr_common.hdr_flags = 0;
|
||||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||||
@ -222,7 +226,7 @@ int mca_pml_ob1_send_request_start_copy(
|
|||||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
||||||
|
|
||||||
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
||||||
if (flags == 0) {
|
if (ack == false) {
|
||||||
int32_t free_after;
|
int32_t free_after;
|
||||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||||
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user