diff --git a/orte/mca/iof/base/iof_base_fragment.c b/orte/mca/iof/base/iof_base_fragment.c index 832d943e2f..8dc5abe2c0 100644 --- a/orte/mca/iof/base/iof_base_fragment.c +++ b/orte/mca/iof/base/iof_base_fragment.c @@ -83,6 +83,7 @@ static void orte_iof_base_frag_send_cb( void* cbdata) { orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata; + opal_output(orte_iof_base.iof_output, "iof_base_fragment: ACK send done"); ORTE_IOF_BASE_FRAG_RETURN(frag); } @@ -101,6 +102,7 @@ int _orte_iof_base_frag_ack(orte_iof_base_frag_t* frag, bool do_close, ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg); /* start non-blocking OOB call to forward header */ + opal_output(orte_iof_base.iof_output, "iof_base_fragment: sending ACK"); rc = orte_rml.send_nb( &frag->frag_src, frag->frag_iov, diff --git a/orte/mca/iof/svc/iof_svc_proxy.c b/orte/mca/iof/svc/iof_svc_proxy.c index eaaa8ef60c..e9186c2782 100644 --- a/orte/mca/iof/svc/iof_svc_proxy.c +++ b/orte/mca/iof/svc/iof_svc_proxy.c @@ -154,7 +154,7 @@ static void orte_iof_svc_proxy_msg( unsigned char* data) { opal_list_item_t* item; - bool forward = false; + bool forwarded_at_all = false, forward = false; opal_output(orte_iof_base.iof_output, "orte_iof_svc_proxy_msg: tag %d seq %d", hdr->msg_tag,hdr->msg_seq); @@ -173,8 +173,8 @@ static void orte_iof_svc_proxy_msg( /* if the subscription origin doesn't match the message's origin, skip this subscription */ - if(orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,&hdr->msg_origin) == 0) { - opal_output(orte_iof_base.iof_output, "sub origin %s, msg origin %s, msg proxy %s orte_iof_svc_proxy_msg: tag %d sequence %d, len %d\n", + if(0 == orte_ns.compare_fields(sub->origin_mask,&sub->origin_name,&hdr->msg_origin)) { + opal_output(orte_iof_base.iof_output, "sub MATCH: origin %s, msg origin %s, msg proxy %s orte_iof_svc_proxy_msg: tag %d sequence %d, len %d", ORTE_NAME_PRINT(&sub->origin_name), ORTE_NAME_PRINT(&hdr->msg_origin), ORTE_NAME_PRINT(&hdr->msg_proxy), @@ -183,16 +183,21 @@ static void orte_iof_svc_proxy_msg( OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock); orte_iof_svc_sub_forward(sub,peer,hdr,data,&forward); OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock); + + if (forward) { + forwarded_at_all = true; + } } } OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock); /* If there was no one to forward to, then we effectively drop it. But ACK it so that the sender doesn't block. */ - if(forward == false) { + if (!forwarded_at_all) { orte_iof_base_frag_t* frag; int rc; + opal_output(orte_iof_base.iof_output, "no sub match found -- dropped"); ORTE_IOF_BASE_FRAG_ALLOC(frag,rc); if(NULL == frag) { ORTE_ERROR_LOG(rc); diff --git a/orte/mca/iof/svc/iof_svc_sub.c b/orte/mca/iof/svc/iof_svc_sub.c index 498a754dd7..803015aee7 100644 --- a/orte/mca/iof/svc/iof_svc_sub.c +++ b/orte/mca/iof/svc/iof_svc_sub.c @@ -212,6 +212,32 @@ void orte_iof_svc_sub_ack( last_ack_forwarded = sub->last_ack_forwarded; } } + opal_output(orte_iof_base.iof_output, + "ack: has_beed_acked: %d, last forwarded %d", + has_been_acked, last_ack_forwarded); + + /* If the subscription has a local endpoint and the ACK is + coming from this process, then update the seq_min + calculation */ + if (NULL != sub->sub_endpoint && + 0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, + orte_process_info.my_name, peer)) { + if (do_close) { + /* JMS what to do here? Need to set sub->sub_endpoint + to NULL. Have similar leak for do_close for + streams. See ticket #1048. */ + sub->sub_endpoint = NULL; + opal_output(orte_iof_base.iof_output, + "ack: CLOSED local ack to %u", value.uval); + } else { + value.uval = hdr->msg_seq + hdr->msg_len; + opal_output(orte_iof_base.iof_output, + "ack: local ack to %u", value.uval); + if (value.uval < seq_min) { + seq_min = value.uval; + } + } + } /* Find the minimum amount ack'ed by all the origins (or, technically speaking, ack'ed by their proxies on their @@ -282,13 +308,20 @@ void orte_iof_svc_sub_ack( /* If everyone has ACK'ed, then push the ACK up to the original message's proxy */ - if(seq_min == hdr->msg_seq+hdr->msg_len) { + if (seq_min == hdr->msg_seq+hdr->msg_len) { /* If the original message was initiated from this process, then the ACK delivery is local. */ - if(orte_ns.compare_fields(ORTE_NS_CMP_ALL,orte_process_info.my_name,&hdr->msg_origin) == 0) { + if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, + orte_process_info.my_name, + &hdr->msg_origin) || + 0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, + orte_process_info.my_name, + &hdr->msg_proxy)) { orte_iof_base_endpoint_t* endpoint; - endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin, ORTE_NS_CMP_ALL, hdr->msg_tag); - if(endpoint != NULL) { + endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin, + ORTE_NS_CMP_ALL, + hdr->msg_tag); + if (NULL != endpoint) { opal_output(orte_iof_base.iof_output, "ack: forwarding ack locally: %u", seq_min); orte_iof_base_endpoint_ack(endpoint, seq_min); @@ -426,11 +459,13 @@ int orte_iof_svc_sub_forward( orte_iof_svc_pub_t* pub = fwd->fwd_pub; int rc; - if(pub->pub_endpoint != NULL) { + if (NULL != pub->pub_endpoint) { + opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to pub local endpoint"); rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data); } else { /* forward */ orte_iof_base_frag_t* frag; + opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to pub stream / remote endpoint"); ORTE_IOF_BASE_FRAG_ALLOC(frag,rc); frag->frag_hdr.hdr_msg = *hdr; frag->frag_len = frag->frag_hdr.hdr_msg.msg_len; @@ -449,12 +484,13 @@ int orte_iof_svc_sub_forward( orte_iof_svc_sub_send_cb, frag); } - if(rc != ORTE_SUCCESS) { + if (ORTE_SUCCESS != rc) { return rc; } *forward = true; } - if(sub->sub_endpoint != NULL) { + if (NULL != sub->sub_endpoint) { + opal_output(orte_iof_base.iof_output, "sub_forward: forwarding to sub local endpoint"); *forward = true; return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data); }