- cleanup on loss of connection to peer
- generate ack if no one to forward msg to This commit was SVN r7651.
Этот коммит содержится в:
родитель
3280f6e655
Коммит
797922fbab
@ -23,6 +23,8 @@
|
|||||||
#include "mca/rml/rml_types.h"
|
#include "mca/rml/rml_types.h"
|
||||||
#include "iof_svc.h"
|
#include "iof_svc.h"
|
||||||
#include "iof_svc_proxy.h"
|
#include "iof_svc_proxy.h"
|
||||||
|
#include "iof_svc_pub.h"
|
||||||
|
#include "iof_svc_sub.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local functions
|
* Local functions
|
||||||
@ -123,7 +125,22 @@ static int orte_iof_svc_close(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback when peer is disconnected
|
||||||
|
*/
|
||||||
|
|
||||||
|
static void
|
||||||
|
orte_iof_svc_exception_handler(const orte_process_name_t* peer, orte_rml_exception_t reason)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "orte_iof_svc_exception_handler [%lu,%lu,%lu]\n", ORTE_NAME_ARGS(peer));
|
||||||
|
orte_iof_svc_sub_delete_all(peer);
|
||||||
|
orte_iof_svc_pub_delete_all(peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Module Initialization
|
||||||
|
*/
|
||||||
|
|
||||||
static orte_iof_base_module_t*
|
static orte_iof_base_module_t*
|
||||||
orte_iof_svc_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
|
orte_iof_svc_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
|
||||||
@ -158,6 +175,8 @@ orte_iof_svc_init(int* priority, bool *allow_multi_user_threads, bool *have_hidd
|
|||||||
opal_output(0, "orte_iof_svc_init: unable to post non-blocking recv");
|
opal_output(0, "orte_iof_svc_init: unable to post non-blocking recv");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc = orte_rml.add_exception_handler(orte_iof_svc_exception_handler);
|
||||||
initialized = true;
|
initialized = true;
|
||||||
return &orte_iof_svc_module;
|
return &orte_iof_svc_module;
|
||||||
}
|
}
|
||||||
|
@ -100,6 +100,24 @@ done:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release resources when ack completed.
|
||||||
|
*/
|
||||||
|
static void orte_iof_svc_ack_send_cb(
|
||||||
|
int status,
|
||||||
|
orte_process_name_t* peer,
|
||||||
|
struct iovec* msg,
|
||||||
|
int count,
|
||||||
|
orte_rml_tag_t tag,
|
||||||
|
void* cbdata)
|
||||||
|
{
|
||||||
|
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
|
||||||
|
ORTE_IOF_BASE_FRAG_RETURN(frag);
|
||||||
|
if(status < 0) {
|
||||||
|
ORTE_ERROR_LOG(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Receive a data message. Check the subscription list for a match
|
* Receive a data message. Check the subscription list for a match
|
||||||
* on the source - and on matches forward to any published endpoints
|
* on the source - and on matches forward to any published endpoints
|
||||||
@ -112,6 +130,7 @@ static void orte_iof_svc_proxy_msg(
|
|||||||
unsigned char* data)
|
unsigned char* data)
|
||||||
{
|
{
|
||||||
opal_list_item_t* item;
|
opal_list_item_t* item;
|
||||||
|
bool forward = false;
|
||||||
if(mca_iof_svc_component.svc_debug > 1) {
|
if(mca_iof_svc_component.svc_debug > 1) {
|
||||||
opal_output(0, "orte_iof_svc_proxy_msg: tag %d seq %d\n",hdr->msg_tag,hdr->msg_seq);
|
opal_output(0, "orte_iof_svc_proxy_msg: tag %d seq %d\n",hdr->msg_tag,hdr->msg_seq);
|
||||||
}
|
}
|
||||||
@ -133,30 +152,41 @@ static void orte_iof_svc_proxy_msg(
|
|||||||
opal_output(0, "[%lu,%lu,%lu] orte_iof_svc_proxy_msg: tag %d sequence %d\n",
|
opal_output(0, "[%lu,%lu,%lu] orte_iof_svc_proxy_msg: tag %d sequence %d\n",
|
||||||
ORTE_NAME_ARGS(&sub->src_name),hdr->msg_tag,hdr->msg_seq);
|
ORTE_NAME_ARGS(&sub->src_name),hdr->msg_tag,hdr->msg_seq);
|
||||||
}
|
}
|
||||||
orte_iof_svc_sub_forward(sub,src,hdr,data);
|
orte_iof_svc_sub_forward(sub,src,hdr,data,&forward);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/* if there is no one to forward to - go ahead and ack */
|
||||||
*
|
if(forward == false) {
|
||||||
*/
|
orte_iof_base_frag_t* frag;
|
||||||
static void orte_iof_svc_ack_send_cb(
|
int rc;
|
||||||
int status,
|
|
||||||
orte_process_name_t* peer,
|
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
|
||||||
struct iovec* msg,
|
if(NULL == frag) {
|
||||||
int count,
|
ORTE_ERROR_LOG(rc);
|
||||||
orte_rml_tag_t tag,
|
return;
|
||||||
void* cbdata)
|
}
|
||||||
{
|
|
||||||
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
|
frag->frag_hdr.hdr_msg = *hdr;
|
||||||
ORTE_IOF_BASE_FRAG_RETURN(frag);
|
frag->frag_hdr.hdr_common.hdr_type = ORTE_IOF_BASE_HDR_ACK;
|
||||||
if(status < 0) {
|
frag->frag_iov[0].iov_base = (void*)&frag->frag_hdr;
|
||||||
ORTE_ERROR_LOG(status);
|
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
|
||||||
|
ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);
|
||||||
|
|
||||||
|
rc = orte_rml.send_nb(
|
||||||
|
&hdr->msg_proxy,
|
||||||
|
frag->frag_iov,
|
||||||
|
1,
|
||||||
|
ORTE_RML_TAG_IOF_SVC,
|
||||||
|
0,
|
||||||
|
orte_iof_svc_ack_send_cb,
|
||||||
|
frag);
|
||||||
|
if(rc < 0) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Received an acknowledgment from an endpoint - forward on
|
* Received an acknowledgment from an endpoint - forward on
|
||||||
|
@ -142,4 +142,38 @@ int orte_iof_svc_pub_delete(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Remove all publications associated w/ the given process name.
|
||||||
|
*/
|
||||||
|
|
||||||
|
void orte_iof_svc_pub_delete_all(
|
||||||
|
const orte_process_name_t* name)
|
||||||
|
{
|
||||||
|
opal_list_item_t* p_item;
|
||||||
|
|
||||||
|
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
|
||||||
|
p_item = opal_list_get_first(&mca_iof_svc_component.svc_published);
|
||||||
|
while(p_item != opal_list_get_end(&mca_iof_svc_component.svc_published)) {
|
||||||
|
opal_list_item_t* p_next = opal_list_get_next(p_item);
|
||||||
|
orte_iof_svc_pub_t* pub = (orte_iof_svc_pub_t*)p_item;
|
||||||
|
|
||||||
|
if (orte_ns.compare(ORTE_NS_CMP_ALL, &pub->pub_name,name) == 0 ||
|
||||||
|
orte_ns.compare(ORTE_NS_CMP_ALL, &pub->pub_proxy,name) == 0) {
|
||||||
|
|
||||||
|
opal_list_item_t* s_item;
|
||||||
|
for(s_item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
|
||||||
|
s_item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed);
|
||||||
|
s_item = opal_list_get_next(s_item)) {
|
||||||
|
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)s_item;
|
||||||
|
if(orte_iof_svc_fwd_match(sub,pub)) {
|
||||||
|
orte_iof_svc_fwd_delete(sub,pub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
opal_list_remove_item(&mca_iof_svc_component.svc_published, p_item);
|
||||||
|
OBJ_RELEASE(pub);
|
||||||
|
}
|
||||||
|
p_item = p_next;
|
||||||
|
}
|
||||||
|
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -60,6 +60,12 @@ int orte_iof_svc_pub_delete(
|
|||||||
orte_ns_cmp_bitmask_t pub_mask,
|
orte_ns_cmp_bitmask_t pub_mask,
|
||||||
orte_iof_base_tag_t pub_tag);
|
orte_iof_base_tag_t pub_tag);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove all entries matching a specified process name.
|
||||||
|
*/
|
||||||
|
|
||||||
|
void orte_iof_svc_pub_delete_all(
|
||||||
|
const orte_process_name_t* name);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -131,6 +131,30 @@ int orte_iof_svc_sub_delete(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int orte_iof_svc_sub_delete_all(
|
||||||
|
const orte_process_name_t *name)
|
||||||
|
{
|
||||||
|
opal_list_item_t *item;
|
||||||
|
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
|
||||||
|
item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
|
||||||
|
while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) {
|
||||||
|
opal_list_item_t* next = opal_list_get_next(item);
|
||||||
|
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
|
||||||
|
if ((sub->src_mask == ORTE_NS_CMP_ALL &&
|
||||||
|
orte_ns.compare(ORTE_NS_CMP_ALL,&sub->src_name,name) == 0) ||
|
||||||
|
(sub->dst_mask == ORTE_NS_CMP_ALL &&
|
||||||
|
orte_ns.compare(ORTE_NS_CMP_ALL,&sub->dst_name,name) == 0)) {
|
||||||
|
opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
|
||||||
|
OBJ_RELEASE(item);
|
||||||
|
}
|
||||||
|
item = next;
|
||||||
|
}
|
||||||
|
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
|
||||||
|
return ORTE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Callback on send completion. Release send resources (fragment).
|
* Callback on send completion. Release send resources (fragment).
|
||||||
*/
|
*/
|
||||||
@ -159,7 +183,8 @@ int orte_iof_svc_sub_forward(
|
|||||||
orte_iof_svc_sub_t* sub,
|
orte_iof_svc_sub_t* sub,
|
||||||
const orte_process_name_t* src,
|
const orte_process_name_t* src,
|
||||||
orte_iof_base_msg_header_t* hdr,
|
orte_iof_base_msg_header_t* hdr,
|
||||||
const unsigned char* data)
|
const unsigned char* data,
|
||||||
|
bool *forward)
|
||||||
{
|
{
|
||||||
opal_list_item_t* item;
|
opal_list_item_t* item;
|
||||||
for(item = opal_list_get_first(&sub->sub_forward);
|
for(item = opal_list_get_first(&sub->sub_forward);
|
||||||
@ -195,8 +220,10 @@ int orte_iof_svc_sub_forward(
|
|||||||
if(rc != ORTE_SUCCESS) {
|
if(rc != ORTE_SUCCESS) {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
*forward = true;
|
||||||
}
|
}
|
||||||
if(sub->sub_endpoint != NULL) {
|
if(sub->sub_endpoint != NULL) {
|
||||||
|
*forward = true;
|
||||||
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
|
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
|
||||||
}
|
}
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -69,6 +69,9 @@ int orte_iof_svc_sub_delete(
|
|||||||
orte_ns_cmp_bitmask_t dst_mask,
|
orte_ns_cmp_bitmask_t dst_mask,
|
||||||
orte_iof_base_tag_t dst_tag);
|
orte_iof_base_tag_t dst_tag);
|
||||||
|
|
||||||
|
int orte_iof_svc_sub_delete_all(
|
||||||
|
const orte_process_name_t *dst_name);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forward message to any endpoints that
|
* Forward message to any endpoints that
|
||||||
* match the subscription.
|
* match the subscription.
|
||||||
@ -78,7 +81,8 @@ int orte_iof_svc_sub_forward(
|
|||||||
orte_iof_svc_sub_t* sub,
|
orte_iof_svc_sub_t* sub,
|
||||||
const orte_process_name_t* src,
|
const orte_process_name_t* src,
|
||||||
orte_iof_base_msg_header_t* hdr,
|
orte_iof_base_msg_header_t* hdr,
|
||||||
const unsigned char* data);
|
const unsigned char* data,
|
||||||
|
bool *forward);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check to see if the published endpoint matches
|
* Check to see if the published endpoint matches
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user