The HNP changing into an orted brought a bug in the iof svc component
to light: we weren't ack'ing properly for streams that originated (or originated via proxy) and terminated within the HNP. This commit fixes that. It also fixes a few style issues, and added some more opal_outputs for debugging. Also, fixed a bug where the fact that we forwarded (and therefore might need to update the ack) was not correctly reported if there were multiple forwards (which there are not as the system is currently using IOF, but there could be). Refs trac:1098 -- want to get another pair of eyes to look at this before I close the ticket. This commit was SVN r15730. The following Trac tickets were found above: Ticket 1098 --> https://svn.open-mpi.org/trac/ompi/ticket/1098
Этот коммит содержится в:
родитель
8185de6383
Коммит
8d4b6c7b0d
@ -83,6 +83,7 @@ static void orte_iof_base_frag_send_cb(
|
||||
void* cbdata)
|
||||
{
|
||||
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
|
||||
opal_output(orte_iof_base.iof_output, "iof_base_fragment: ACK send done");
|
||||
ORTE_IOF_BASE_FRAG_RETURN(frag);
|
||||
}
|
||||
|
||||
@ -101,6 +102,7 @@ int _orte_iof_base_frag_ack(orte_iof_base_frag_t* frag, bool do_close,
|
||||
ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);
|
||||
|
||||
/* start non-blocking OOB call to forward header */
|
||||
opal_output(orte_iof_base.iof_output, "iof_base_fragment: sending ACK");
|
||||
rc = orte_rml.send_nb(
|
||||
&frag->frag_src,
|
||||
frag->frag_iov,
|
||||
|
@ -154,7 +154,7 @@ static void orte_iof_svc_proxy_msg(
|
||||
unsigned char* data)
|
||||
{
|
||||
opal_list_item_t* item;
|
||||
bool forward = false;
|
||||
bool forwarded_at_all = false, forward = false;
|
||||
opal_output(orte_iof_base.iof_output,
|
||||
"orte_iof_svc_proxy_msg: tag %d seq %d",
|
||||
hdr->msg_tag,hdr->msg_seq);
|
||||
@ -173,8 +173,8 @@ static void orte_iof_svc_proxy_msg(
|
||||
|
||||
/* if the subscription origin doesn't match the message's
|
||||
origin, skip this subscription */
|
||||
if(orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,&hdr->msg_origin) == 0) {
|
||||
opal_output(orte_iof_base.iof_output, "sub origin %s, msg origin %s, msg proxy %s orte_iof_svc_proxy_msg: tag %d sequence %d, len %d\n",
|
||||
if(0 == orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,&hdr->msg_origin)) {
|
||||
opal_output(orte_iof_base.iof_output, "sub MATCH: origin %s, msg origin %s, msg proxy %s orte_iof_svc_proxy_msg: tag %d sequence %d, len %d",
|
||||
ORTE_NAME_PRINT(&sub->origin_name),
|
||||
ORTE_NAME_PRINT(&hdr->msg_origin),
|
||||
ORTE_NAME_PRINT(&hdr->msg_proxy),
|
||||
@ -183,16 +183,21 @@ static void orte_iof_svc_proxy_msg(
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
||||
orte_iof_svc_sub_forward(sub,peer,hdr,data,&forward);
|
||||
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
|
||||
|
||||
if (forward) {
|
||||
forwarded_at_all = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
||||
|
||||
/* If there was no one to forward to, then we effectively drop it.
|
||||
But ACK it so that the sender doesn't block. */
|
||||
if(forward == false) {
|
||||
if (!forwarded_at_all) {
|
||||
orte_iof_base_frag_t* frag;
|
||||
int rc;
|
||||
|
||||
opal_output(orte_iof_base.iof_output, "no sub match found -- dropped");
|
||||
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
|
||||
if(NULL == frag) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
|
@ -212,6 +212,32 @@ void orte_iof_svc_sub_ack(
|
||||
last_ack_forwarded = sub->last_ack_forwarded;
|
||||
}
|
||||
}
|
||||
opal_output(orte_iof_base.iof_output,
|
||||
"ack: has_beed_acked: %d, last forwarded %d",
|
||||
has_been_acked, last_ack_forwarded);
|
||||
|
||||
/* If the subscription has a local endpoint and the ACK is
|
||||
coming from this process, then update the seq_min
|
||||
calculation */
|
||||
if (NULL != sub->sub_endpoint &&
|
||||
0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
|
||||
orte_process_info.my_name, peer)) {
|
||||
if (do_close) {
|
||||
/* JMS what to do here? Need to set sub->sub_endpoint
|
||||
to NULL. Have similar leak for do_close for
|
||||
streams. See ticket #1048. */
|
||||
sub->sub_endpoint = NULL;
|
||||
opal_output(orte_iof_base.iof_output,
|
||||
"ack: CLOSED local ack to %u", value.uval);
|
||||
} else {
|
||||
value.uval = hdr->msg_seq + hdr->msg_len;
|
||||
opal_output(orte_iof_base.iof_output,
|
||||
"ack: local ack to %u", value.uval);
|
||||
if (value.uval < seq_min) {
|
||||
seq_min = value.uval;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Find the minimum amount ack'ed by all the origins (or,
|
||||
technically speaking, ack'ed by their proxies on their
|
||||
@ -282,13 +308,20 @@ void orte_iof_svc_sub_ack(
|
||||
|
||||
/* If everyone has ACK'ed, then push the ACK up to the original
|
||||
message's proxy */
|
||||
if(seq_min == hdr->msg_seq+hdr->msg_len) {
|
||||
if (seq_min == hdr->msg_seq+hdr->msg_len) {
|
||||
/* If the original message was initiated from this process,
|
||||
then the ACK delivery is local. */
|
||||
if(orte_ns.compare_fields(ORTE_NS_CMP_ALL,orte_process_info.my_name,&hdr->msg_origin) == 0) {
|
||||
if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
|
||||
orte_process_info.my_name,
|
||||
&hdr->msg_origin) ||
|
||||
0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
|
||||
orte_process_info.my_name,
|
||||
&hdr->msg_proxy)) {
|
||||
orte_iof_base_endpoint_t* endpoint;
|
||||
endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin, ORTE_NS_CMP_ALL, hdr->msg_tag);
|
||||
if(endpoint != NULL) {
|
||||
endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin,
|
||||
ORTE_NS_CMP_ALL,
|
||||
hdr->msg_tag);
|
||||
if (NULL != endpoint) {
|
||||
opal_output(orte_iof_base.iof_output,
|
||||
"ack: forwarding ack locally: %u", seq_min);
|
||||
orte_iof_base_endpoint_ack(endpoint, seq_min);
|
||||
@ -426,11 +459,13 @@ int orte_iof_svc_sub_forward(
|
||||
orte_iof_svc_pub_t* pub = fwd->fwd_pub;
|
||||
int rc;
|
||||
|
||||
if(pub->pub_endpoint != NULL) {
|
||||
if (NULL != pub->pub_endpoint) {
|
||||
opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to pub local endpoint");
|
||||
rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);
|
||||
} else {
|
||||
/* forward */
|
||||
orte_iof_base_frag_t* frag;
|
||||
opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to pub stream / remote endpoint");
|
||||
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
|
||||
frag->frag_hdr.hdr_msg = *hdr;
|
||||
frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
|
||||
@ -449,12 +484,13 @@ int orte_iof_svc_sub_forward(
|
||||
orte_iof_svc_sub_send_cb,
|
||||
frag);
|
||||
}
|
||||
if(rc != ORTE_SUCCESS) {
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
*forward = true;
|
||||
}
|
||||
if(sub->sub_endpoint != NULL) {
|
||||
if (NULL != sub->sub_endpoint) {
|
||||
opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to sub local endpoint");
|
||||
*forward = true;
|
||||
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user