1
1

cleanup, correct acks when forwarding through an intermediary

This commit was SVN r5094.
Этот коммит содержится в:
Tim Woodall 2005-03-29 19:40:38 +00:00
родитель 5f2051caad
Коммит f4f24cba9f
16 изменённых файлов: 778 добавлений и 409 удалений

Просмотреть файл

@ -107,6 +107,7 @@ static void orte_iof_base_endpoint_read_handler(int fd, short flags, void *cbdat
hdr = &frag->frag_hdr;
hdr->hdr_common.hdr_type = ORTE_IOF_BASE_HDR_MSG;
hdr->hdr_msg.msg_src = endpoint->ep_name;
hdr->hdr_msg.msg_proxy = *ORTE_RML_NAME_SELF;
hdr->hdr_msg.msg_tag = endpoint->ep_tag;
hdr->hdr_msg.msg_seq = endpoint->ep_seq;
hdr->hdr_msg.msg_len = frag->frag_len;
@ -183,6 +184,20 @@ static orte_iof_base_endpoint_t* orte_iof_base_endpoint_lookup(
orte_iof_base_mode_t mode,
int tag)
{
ompi_list_item_t* item;
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
for(item = ompi_list_get_first(&orte_iof_base.iof_endpoints);
item != ompi_list_get_end(&orte_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;
if(orte_ns.compare(ORTE_NS_CMP_ALL,proc,&endpoint->ep_name) == 0 &&
endpoint->ep_tag == tag && endpoint->ep_mode == mode) {
OBJ_RETAIN(endpoint);
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return endpoint;
}
}
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return NULL;
}
@ -249,7 +264,7 @@ int orte_iof_base_endpoint_create(
ompi_list_append(&orte_iof_base.iof_endpoints, &endpoint->super);
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
@ -262,7 +277,21 @@ int orte_iof_base_endpoint_delete(
orte_ns_cmp_bitmask_t mask,
int tag)
{
return OMPI_ERROR;
ompi_list_item_t* item;
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
item = ompi_list_get_first(&orte_iof_base.iof_endpoints);
while(item != ompi_list_get_end(&orte_iof_base.iof_endpoints)) {
ompi_list_item_t* next = ompi_list_get_next(item);
orte_iof_base_endpoint_t* endpoint = (orte_iof_base_endpoint_t*)item;
if(orte_ns.compare(mask,proc,&endpoint->ep_name) == 0 &&
endpoint->ep_tag == tag) {
OBJ_RELEASE(endpoint);
ompi_list_remove_item(&orte_iof_base.iof_endpoints,&endpoint->super);
}
item = next;
}
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_ERR_NOT_FOUND;
}
/*
@ -285,7 +314,7 @@ int orte_iof_base_endpoint_close(orte_iof_base_endpoint_t* endpoint)
}
break;
}
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
/*
@ -362,7 +391,7 @@ int orte_iof_base_endpoint_forward(
if(rc < 0) {
orte_iof_base_endpoint_closed(endpoint);
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
}
@ -377,11 +406,11 @@ int orte_iof_base_endpoint_forward(
}
} else {
/* acknowledge fragment */
endpoint->ep_ack = frag->frag_hdr.hdr_msg.msg_seq + frag->frag_hdr.hdr_msg.msg_len;
orte_iof_base_endpoint_ack(endpoint, frag->frag_hdr.hdr_msg.msg_seq + frag->frag_hdr.hdr_msg.msg_len);
orte_iof_base_frag_ack(frag);
}
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
@ -414,6 +443,6 @@ int orte_iof_base_endpoint_ack(
ompi_event_add(&endpoint->ep_event, 0);
}
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -72,6 +72,7 @@ typedef struct orte_iof_base_common_header_t orte_iof_base_common_header_t;
struct orte_iof_base_msg_header_t {
orte_iof_base_common_header_t hdr_common;
orte_process_name_t msg_src;
orte_process_name_t msg_proxy;
int32_t msg_tag;
uint32_t msg_seq;
uint32_t msg_len;
@ -81,6 +82,7 @@ typedef struct orte_iof_base_msg_header_t orte_iof_base_msg_header_t;
#define ORTE_IOF_BASE_HDR_MSG_NTOH(h) \
ORTE_IOF_BASE_HDR_CMN_NTOH((h).hdr_common); \
ORTE_PROCESS_NAME_NTOH((h).msg_src); \
ORTE_PROCESS_NAME_NTOH((h).msg_proxy); \
(h).msg_tag = ntohl((h).msg_tag); \
(h).msg_seq = ntohl((h).msg_seq); \
(h).msg_len = ntohl((h).msg_len);
@ -88,6 +90,7 @@ typedef struct orte_iof_base_msg_header_t orte_iof_base_msg_header_t;
#define ORTE_IOF_BASE_HDR_MSG_HTON(h) \
ORTE_IOF_BASE_HDR_CMN_HTON((h).hdr_common); \
ORTE_PROCESS_NAME_HTON((h).msg_src); \
ORTE_PROCESS_NAME_HTON((h).msg_proxy); \
(h).msg_tag = htonl((h).msg_tag); \
(h).msg_seq = htonl((h).msg_seq); \
(h).msg_len = htonl((h).msg_len);

Просмотреть файл

@ -26,6 +26,7 @@
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "mca/errmgr/errmgr.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
@ -71,7 +72,7 @@ int orte_iof_proxy_publish(
/* publish to server */
if(mode == ORTE_IOF_SINK) {
rc = orte_iof_proxy_svc_publish(name,tag);
if(rc != OMPI_SUCCESS)
if(rc != ORTE_SUCCESS)
return rc;
}
@ -144,7 +145,7 @@ int orte_iof_proxy_push(
dst_mask,
dst_tag
);
if(rc != OMPI_SUCCESS)
if(rc != ORTE_SUCCESS)
return rc;
/* setup a local endpoint to reflect registration */
@ -178,13 +179,24 @@ int orte_iof_proxy_pull(
int rc;
rc = orte_iof_base_endpoint_create(
ORTE_RML_NAME_SELF,
ORTE_IOF_SOURCE,
ORTE_IOF_SINK,
src_tag,
fd);
if(rc != OMPI_SUCCESS)
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* send a subscription message to the server */
/* publish this endpoint */
rc = orte_iof_proxy_svc_publish(
ORTE_RML_NAME_SELF,
src_tag);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* subscribe to peer */
rc = orte_iof_proxy_svc_subscribe(
src_name,
src_mask,
@ -192,6 +204,11 @@ int orte_iof_proxy_pull(
ORTE_RML_NAME_SELF,
ORTE_NS_CMP_ALL,
src_tag);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}
@ -205,7 +222,7 @@ int orte_iof_proxy_buffer(
orte_iof_base_tag_t src_tag,
size_t buffer_size)
{
return OMPI_ERROR;
return ORTE_ERROR;
}
@ -253,6 +270,6 @@ int orte_iof_proxy_unsubscribe(
src_tag);
/* remove local callback */
return OMPI_ERROR;
return ORTE_ERROR;
}

Просмотреть файл

@ -131,7 +131,7 @@ orte_iof_proxy_init(int* priority, bool *allow_multi_user_threads, bool *have_hi
orte_iof_proxy_svc_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
if(rc < 0) {
ompi_output(0, "orte_iof_proxy_init: unable to post non-blocking recv");
return NULL;
}

Просмотреть файл

@ -5,6 +5,7 @@
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "mca/errmgr/errmgr.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
@ -54,7 +55,7 @@ int orte_iof_proxy_svc_publish(
ORTE_RML_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "orte_iof_proxy_svc_publish: orte_rml.send() failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
return rc;
}
return OMPI_SUCCESS;
@ -92,7 +93,7 @@ int orte_iof_proxy_svc_unpublish(
ORTE_RML_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "orte_iof_proxy_svc_unpublish: orte_rml.send() failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
return rc;
}
return OMPI_SUCCESS;
@ -137,7 +138,7 @@ int orte_iof_proxy_svc_subscribe(
ORTE_RML_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "orte_iof_proxy_svc_subscribe: orte_rml.send() failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
return rc;
}
return OMPI_SUCCESS;
@ -180,7 +181,7 @@ int orte_iof_proxy_svc_unsubscribe(
ORTE_RML_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "orte_iof_proxy_svc_unsubscribe: orte_rml.send() failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
return rc;
}
return OMPI_SUCCESS;
@ -202,6 +203,10 @@ void orte_iof_proxy_svc_recv(
{
orte_iof_base_header_t* hdr = (orte_iof_base_header_t*)msg->iov_base;
int rc;
if(NULL == msg->iov_base) {
ompi_output(0, "orte_iof_proxy_svc_recv: invalid message\n");
return;
}
switch(hdr->hdr_common.hdr_type) {
case ORTE_IOF_BASE_HDR_MSG:
@ -231,7 +236,7 @@ void orte_iof_proxy_svc_recv(
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_proxy_svc_recv: unable to post non-blocking recv");
ORTE_ERROR_LOG(rc);
return;
}
}

Просмотреть файл

@ -36,10 +36,10 @@ svc_SOURCES = \
iof_svc_component.c \
iof_svc_proxy.h \
iof_svc_proxy.c \
iof_svc_publish.h \
iof_svc_publish.c \
iof_svc_subscript.h \
iof_svc_subscript.c
iof_svc_pub.h \
iof_svc_pub.c \
iof_svc_sub.h \
iof_svc_sub.c
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)

Просмотреть файл

@ -24,8 +24,8 @@
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_svc.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
#include "iof_svc_pub.h"
#include "iof_svc_sub.h"
orte_iof_base_module_t orte_iof_svc_module = {
@ -71,7 +71,7 @@ int orte_iof_svc_publish(
/* publish endpoint */
if(mode == ORTE_IOF_SINK) {
rc = orte_iof_svc_publish_create(
rc = orte_iof_svc_pub_create(
name,
ORTE_RML_NAME_SELF,
ORTE_NS_CMP_ALL,
@ -97,7 +97,7 @@ int orte_iof_svc_unpublish(
orte_iof_base_tag_t tag)
{
int rc;
rc = orte_iof_svc_publish_delete(
rc = orte_iof_svc_pub_delete(
name,
ORTE_RML_NAME_SELF,
mask,
@ -134,7 +134,7 @@ int orte_iof_svc_push(
int rc;
/* setup a subscription */
rc = orte_iof_svc_subscript_create(
rc = orte_iof_svc_sub_create(
ORTE_RML_NAME_SELF,
ORTE_NS_CMP_ALL,
dst_tag,
@ -182,7 +182,7 @@ int orte_iof_svc_pull(
return rc;
/* create a subscription */
rc = orte_iof_svc_subscript_create(
rc = orte_iof_svc_sub_create(
src_name,
src_mask,
src_tag,

Просмотреть файл

@ -5,10 +5,11 @@
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "mca/errmgr/errmgr.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
#include "iof_svc_pub.h"
#include "iof_svc_sub.h"
static void orte_iof_svc_proxy_msg(const orte_process_name_t*, orte_iof_base_msg_header_t*, unsigned char*);
@ -43,7 +44,7 @@ void orte_iof_svc_proxy_recv(
orte_iof_base_header_t* hdr = (orte_iof_base_header_t*)iov[0].iov_base;
if(status < 0) {
ompi_output(0, "orte_iof_svc_recv: receive failed with status: %d", status);
ORTE_ERROR_LOG(status);
goto done;
}
@ -93,15 +94,16 @@ done:
orte_iof_svc_proxy_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_svc_proxy_recv: unable to post non-blocking recv");
return;
if(rc < 0) {
ORTE_ERROR_LOG(rc);
}
}
/**
*
* Receive a data message. Check the subscription list for a match
* on the source - and on matches forward to any published endpoints
* that match the subscriptions destination.
*/
static void orte_iof_svc_proxy_msg(
@ -119,7 +121,7 @@ static void orte_iof_svc_proxy_msg(
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed);
item = ompi_list_get_next(item)) {
orte_iof_svc_subscript_t* sub = (orte_iof_svc_subscript_t*)item;
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
/* tags match */
if(sub->src_tag != hdr->msg_tag && hdr->msg_tag != ORTE_IOF_ANY)
@ -131,28 +133,123 @@ static void orte_iof_svc_proxy_msg(
ompi_output(0, "[%d,%d,%d] orte_iof_svc_proxy_msg: tag %d sequence %d\n",
ORTE_NAME_ARGS(&sub->src_name),hdr->msg_tag,hdr->msg_seq);
}
orte_iof_svc_subscript_forward(sub,src,hdr,data);
orte_iof_svc_sub_forward(sub,src,hdr,data);
}
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
}
/**
*
*
*/
static void orte_iof_svc_ack_send_cb(
int status,
orte_process_name_t* peer,
struct iovec* msg,
int count,
orte_rml_tag_t tag,
void* cbdata)
{
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
ORTE_IOF_BASE_FRAG_RETURN(frag);
if(status < 0) {
ORTE_ERROR_LOG(status);
}
}
/**
* Received an acknowledgment from an endpoint - forward on
* towards the source if all other endpoints have also
* acknowledged the data.
*/
static void orte_iof_svc_proxy_ack(
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr)
{
ompi_list_item_t *s_item;
uint32_t seq_min = hdr->msg_seq + hdr->msg_len;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "orte_iof_svc_proxy_ack");
}
/* for each of the subscriptions that match the source of the data:
* (1) find all forwarding entries that match the source of the ack
* (2) update their sequence number
* (3) find the minimum sequence number across all endpoints
*/
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
for(s_item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
s_item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed);
s_item = ompi_list_get_next(s_item)) {
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)s_item;
ompi_list_item_t *f_item;
if (orte_ns.compare(sub->src_mask,&sub->src_name,&hdr->msg_src) != 0 ||
sub->src_tag != hdr->msg_tag) {
continue;
}
/* look for this endpoint in the forwarding table */
for(f_item = ompi_list_get_first(&sub->sub_forward);
f_item != ompi_list_get_end(&sub->sub_forward);
f_item = ompi_list_get_next(f_item)) {
orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)f_item;
orte_iof_svc_pub_t* pub = fwd->fwd_pub;
if (orte_ns.compare(pub->pub_mask,&pub->pub_name,src) == 0) {
ompi_hash_table_set_proc(&fwd->fwd_seq,
&hdr->msg_src,(void*)(hdr->msg_seq+hdr->msg_len));
} else {
uint32_t seq = (uint32_t)ompi_hash_table_get_proc(
&fwd->fwd_seq,&hdr->msg_src);
if(seq < seq_min) {
seq_min = seq;
}
}
}
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
/* if all endpoints have acknowledged up to this sequence number
* forward ack on to the source
*/
if(seq_min == hdr->msg_seq+hdr->msg_len) {
orte_iof_base_frag_t* frag;
int rc;
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
if(NULL == frag) {
ORTE_ERROR_LOG(rc);
return;
}
frag->frag_hdr.hdr_msg = *hdr;
frag->frag_iov[0].iov_base = &frag->frag_hdr;
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);
rc = orte_rml.send_nb(
&hdr->msg_proxy,
frag->frag_iov,
1,
ORTE_RML_TAG_IOF_SVC,
0,
orte_iof_svc_ack_send_cb,
frag);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
}
}
}
/**
*
* Create an entry to represent the published endpoint. This
* also checks to see if the endpoint matches any pending
* subscriptions.
*/
static void orte_iof_svc_proxy_pub(
@ -164,13 +261,13 @@ static void orte_iof_svc_proxy_pub(
ompi_output(0, "orte_iof_svc_proxy_pub");
}
rc = orte_iof_svc_publish_create(
rc = orte_iof_svc_pub_create(
&hdr->pub_name,
&hdr->pub_proxy,
hdr->pub_mask,
hdr->pub_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_svc_pub: orte_iof_svc_publish_create failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
}
}
@ -187,18 +284,20 @@ static void orte_iof_svc_proxy_unpub(
ompi_output(0, "orte_iof_svc_proxy_unpub");
}
rc = orte_iof_svc_publish_delete(
rc = orte_iof_svc_pub_delete(
&hdr->pub_name,
&hdr->pub_proxy,
hdr->pub_mask,
hdr->pub_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_svc_proxy_unpub: orte_iof_svc_publish_delete failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
}
}
/**
*
* Create a subscription entry. A subscription entry
* determines the set of source(s) that will forward
* to any matching published endpoints.
*/
static void orte_iof_svc_proxy_sub(
@ -210,7 +309,7 @@ static void orte_iof_svc_proxy_sub(
ompi_output(0, "orte_iof_svc_proxy_sub");
}
rc = orte_iof_svc_subscript_create(
rc = orte_iof_svc_sub_create(
&hdr->src_name,
hdr->src_mask,
hdr->src_tag,
@ -218,12 +317,12 @@ static void orte_iof_svc_proxy_sub(
hdr->dst_mask,
hdr->dst_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_svc_proxy_sub: orte_iof_svc_subcript_create failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
}
}
/**
*
* Remove a subscription.
*/
static void orte_iof_svc_proxy_unsub(
@ -235,7 +334,7 @@ static void orte_iof_svc_proxy_unsub(
ompi_output(0, "orte_iof_svc_proxy_unsub");
}
rc = orte_iof_svc_subscript_delete(
rc = orte_iof_svc_sub_delete(
&hdr->src_name,
hdr->src_mask,
hdr->src_tag,
@ -243,7 +342,7 @@ static void orte_iof_svc_proxy_unsub(
hdr->dst_mask,
hdr->dst_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "orte_iof_svc_proxy_unsub: orte_iof_svc_subcript_delete failed with status=%d\n", rc);
ORTE_ERROR_LOG(rc);
}
}

129
src/mca/iof/svc/iof_svc_pub.c Обычный файл
Просмотреть файл

@ -0,0 +1,129 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_pub.h"
#include "iof_svc_sub.h"
static void orte_iof_svc_pub_construct(orte_iof_svc_pub_t* publish)
{
}
static void orte_iof_svc_pub_destruct(orte_iof_svc_pub_t* publish)
{
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_pub_t,
ompi_list_item_t,
orte_iof_svc_pub_construct,
orte_iof_svc_pub_destruct);
/**
* (1) Create an entry to represent the published endpoint
* (2) Lookup any subscriptions that match and install on the
* subscription as a destination endpoint.
*/
int orte_iof_svc_pub_create(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag)
{
orte_iof_svc_pub_t* pub = OBJ_NEW(orte_iof_svc_pub_t);
ompi_list_item_t* item;
pub->pub_name = *pub_name;
pub->pub_proxy = *pub_proxy;
pub->pub_mask = pub_mask;
pub->pub_tag = pub_tag;
pub->pub_endpoint = orte_iof_base_endpoint_match(pub_name,pub_mask,pub_tag);
/* append this published endpoint to any matching subscription */
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed);
item = ompi_list_get_next(item)) {
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
if(orte_iof_svc_fwd_match(sub,pub)) {
orte_iof_svc_fwd_create(sub,pub);
}
}
/* append this published endpoint to the global list */
ompi_list_append(&mca_iof_svc_component.svc_published, &pub->super);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
/**
* Look for a matching endpoint.
*/
orte_iof_svc_pub_t* orte_iof_svc_pub_lookup(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag)
{
ompi_list_item_t* item;
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_published);
item != ompi_list_get_end(&mca_iof_svc_component.svc_published);
item = ompi_list_get_next(item)) {
orte_iof_svc_pub_t* pub = (orte_iof_svc_pub_t*)item;
if (orte_ns.compare(ORTE_NS_CMP_ALL, &pub->pub_name,pub_name) == 0 &&
orte_ns.compare(ORTE_NS_CMP_ALL, &pub->pub_proxy,pub_proxy) == 0 &&
pub->pub_mask == pub_mask &&
pub->pub_tag == pub_tag) {
return pub;
}
}
return NULL;
}
/**
* Remove the published endpoint and cleanup any associated
* forwarding entries.
*/
int orte_iof_svc_pub_delete(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag)
{
ompi_list_item_t* item;
orte_iof_svc_pub_t* pub;
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
pub = orte_iof_svc_pub_lookup(pub_name,pub_proxy,pub_mask,pub_tag);
if(NULL == pub) {
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_ERR_NOT_FOUND;
}
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed);
item = ompi_list_get_next(item)) {
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
if(orte_iof_svc_fwd_match(sub,pub)) {
orte_iof_svc_fwd_delete(sub,pub);
}
}
ompi_list_remove_item(&mca_iof_svc_component.svc_published, &pub->super);
OBJ_RELEASE(pub);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}

65
src/mca/iof/svc/iof_svc_pub.h Обычный файл
Просмотреть файл

@ -0,0 +1,65 @@
#ifndef ORTE_IOF_SVC_PUBLISH_H
#define ORTE_IOF_SVC_PUBLISH_H
#include "ompi_config.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_svc.h"
/**
* Endpoints that are sinks of data are published by the
* processes that is acting as a proxy for the destination
* application. The published endpoints are matched against
* subscriptions to determine the sources of data that are
* forwarded to the endpoint.
*/
struct orte_iof_svc_pub_t {
ompi_list_item_t super;
orte_process_name_t pub_name;
orte_process_name_t pub_proxy;
orte_ns_cmp_bitmask_t pub_mask;
orte_iof_base_tag_t pub_tag;
orte_iof_base_endpoint_t* pub_endpoint;
};
typedef struct orte_iof_svc_pub_t orte_iof_svc_pub_t;
OBJ_CLASS_DECLARATION(orte_iof_svc_pub_t);
/**
* Create a new entry.
*/
int orte_iof_svc_pub_create(
const orte_process_name_t* pub_name,
const orte_process_name_t* pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag);
/**
* Lookup an existing entry.
*/
orte_iof_svc_pub_t* orte_iof_svc_pub_lookup(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag);
/**
* Cleanup an existing entry.
*/
int orte_iof_svc_pub_delete(
const orte_process_name_t* pub_name,
const orte_process_name_t* pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag);
#endif

Просмотреть файл

@ -1,63 +0,0 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
static void orte_iof_svc_publish_construct(orte_iof_svc_publish_t* publish)
{
}
static void orte_iof_svc_publish_destruct(orte_iof_svc_publish_t* publish)
{
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_publish_t,
ompi_list_item_t,
orte_iof_svc_publish_construct,
orte_iof_svc_publish_destruct);
/**
*
*/
int orte_iof_svc_publish_create(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag)
{
orte_iof_svc_publish_t* pub = OBJ_NEW(orte_iof_svc_publish_t);
pub->pub_name = *pub_name;
pub->pub_proxy = *pub_proxy;
pub->pub_mask = pub_mask;
pub->pub_tag = pub_tag;
pub->pub_endpoint = orte_iof_base_endpoint_match(pub_name,pub_mask,pub_tag);
ompi_list_append(&mca_iof_svc_component.svc_published, &pub->super);
return OMPI_SUCCESS;
}
/**
*
*/
int orte_iof_svc_publish_delete(
const orte_process_name_t *pub_name,
const orte_process_name_t *pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag)
{
return OMPI_SUCCESS;
}

Просмотреть файл

@ -1,50 +0,0 @@
#ifndef ORTE_IOF_SVC_PUBLISH_H
#define ORTE_IOF_SVC_PUBLISH_H
#include "ompi_config.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_svc.h"
/**
*
*/
struct orte_iof_svc_publish_t {
ompi_list_item_t super;
orte_process_name_t pub_name;
orte_process_name_t pub_proxy;
orte_ns_cmp_bitmask_t pub_mask;
orte_iof_base_tag_t pub_tag;
orte_iof_base_endpoint_t* pub_endpoint;
};
typedef struct orte_iof_svc_publish_t orte_iof_svc_publish_t;
OBJ_CLASS_DECLARATION(orte_iof_svc_publish_t);
/**
*
*/
int orte_iof_svc_publish_create(
const orte_process_name_t* pub_name,
const orte_process_name_t* pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag);
/**
*
*/
int orte_iof_svc_publish_delete(
const orte_process_name_t* pub_name,
const orte_process_name_t* pub_proxy,
orte_ns_cmp_bitmask_t pub_mask,
orte_iof_base_tag_t pub_tag);
#endif

273
src/mca/iof/svc/iof_svc_sub.c Обычный файл
Просмотреть файл

@ -0,0 +1,273 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "mca/errmgr/errmgr.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_pub.h"
#include "iof_svc_sub.h"
/**
* Subscription onstructor/destructor
*/
static void orte_iof_svc_sub_construct(orte_iof_svc_sub_t* sub)
{
sub->sub_endpoint = NULL;
OBJ_CONSTRUCT(&sub->sub_forward, ompi_list_t);
}
static void orte_iof_svc_sub_destruct(orte_iof_svc_sub_t* sub)
{
ompi_list_item_t* item;
if(sub->sub_endpoint != NULL)
OBJ_RELEASE(sub->sub_endpoint);
while(NULL != (item = ompi_list_remove_first(&sub->sub_forward))) {
OBJ_RELEASE(item);
}
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_sub_t,
ompi_list_item_t,
orte_iof_svc_sub_construct,
orte_iof_svc_sub_destruct);
/**
* Create a subscription/forwarding entry.
*/
int orte_iof_svc_sub_create(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
ompi_list_item_t* item;
orte_iof_svc_sub_t* sub = OBJ_NEW(orte_iof_svc_sub_t);
sub->src_name = *src_name;
sub->src_mask = src_mask;
sub->src_tag = src_tag;
sub->dst_name = *dst_name;
sub->dst_mask = dst_mask;
sub->dst_tag = dst_tag;
sub->sub_endpoint = orte_iof_base_endpoint_match(&sub->dst_name, sub->dst_mask, sub->dst_tag);
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
/* search through published endpoints for a match */
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_published);
item != ompi_list_get_end(&mca_iof_svc_component.svc_published);
item = ompi_list_get_next(item)) {
orte_iof_svc_pub_t* pub = (orte_iof_svc_pub_t*)item;
if(orte_iof_svc_fwd_match(sub,pub)) {
orte_iof_svc_fwd_create(sub,pub);
}
}
ompi_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
/**
* Delete all matching subscriptions.
*/
int orte_iof_svc_sub_delete(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
ompi_list_item_t *item;
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
while(item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed)) {
ompi_list_item_t* next = ompi_list_get_next(item);
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
if (sub->src_mask == src_mask &&
orte_ns.compare(sub->src_mask,&sub->src_name,src_name) == 0 &&
sub->src_tag == src_tag &&
sub->dst_mask == dst_mask &&
orte_ns.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 &&
sub->dst_tag == dst_tag) {
ompi_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
OBJ_RELEASE(item);
}
item = next;
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
/*
* Callback on send completion. Release send resources (fragment).
*/
static void orte_iof_svc_sub_send_cb(
int status,
orte_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
ORTE_IOF_BASE_FRAG_RETURN(frag);
if(status < 0) {
ORTE_ERROR_LOG(status);
}
}
/**
* Check for matching endpoints that have been published to the
* server. Forward data out each matching endpoint.
*/
int orte_iof_svc_sub_forward(
orte_iof_svc_sub_t* sub,
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data)
{
ompi_list_item_t* item;
for(item = ompi_list_get_first(&sub->sub_forward);
item != ompi_list_get_end(&sub->sub_forward);
item = ompi_list_get_next(item)) {
orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;
orte_iof_svc_pub_t* pub = fwd->fwd_pub;
int rc;
if(pub->pub_endpoint != NULL) {
rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);
} else {
/* forward */
orte_iof_base_frag_t* frag;
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
frag->frag_hdr.hdr_msg = *hdr;
frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
frag->frag_iov[0].iov_base = &frag->frag_hdr;
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
frag->frag_iov[1].iov_base = frag->frag_data;
frag->frag_iov[1].iov_len = frag->frag_len;
memcpy(frag->frag_data, data, frag->frag_len);
ORTE_IOF_BASE_HDR_MSG_NTOH(frag->frag_hdr.hdr_msg);
rc = mca_oob_send_nb(
&pub->pub_proxy,
frag->frag_iov,
2,
ORTE_RML_TAG_IOF_SVC,
0,
orte_iof_svc_sub_send_cb,
frag);
}
if(rc != ORTE_SUCCESS) {
return rc;
}
}
if(sub->sub_endpoint != NULL) {
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
}
return ORTE_SUCCESS;
}
/**
* I/O Forwarding entry - relates a published endpoint to
* a subscription.
*/
static void orte_iof_svc_fwd_construct(orte_iof_svc_fwd_t* fwd)
{
fwd->fwd_pub = NULL;
OBJ_CONSTRUCT(&fwd->fwd_seq, ompi_hash_table_t);
ompi_hash_table_init(&fwd->fwd_seq, 256);
}
static void orte_iof_svc_fwd_destruct(orte_iof_svc_fwd_t* fwd)
{
if(NULL != fwd->fwd_pub)
OBJ_RELEASE(fwd->fwd_pub);
OBJ_DESTRUCT(&fwd->fwd_seq);
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_fwd_t,
ompi_list_item_t,
orte_iof_svc_fwd_construct,
orte_iof_svc_fwd_destruct);
/**
* Does the published endpoint match the destination specified
* in the subscription?
*/
bool orte_iof_svc_fwd_match(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
if (orte_ns.compare(sub->dst_mask,&sub->dst_name,&pub->pub_name) == 0 &&
sub->src_tag == pub->pub_tag) {
return true;
} else {
return false;
}
}
/**
* Create a forwarding entry
*/
int orte_iof_svc_fwd_create(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
orte_iof_svc_fwd_t* fwd = OBJ_NEW(orte_iof_svc_fwd_t);
if(NULL == fwd) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
OBJ_RETAIN(pub);
fwd->fwd_pub = pub;
ompi_list_append(&sub->sub_forward, &fwd->super);
return ORTE_SUCCESS;
}
/**
* Remove any forwarding entries that match the
* published endpoint.
*/
int orte_iof_svc_fwd_delete(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
ompi_list_item_t* item;
for(item = ompi_list_get_first(&sub->sub_forward);
item != ompi_list_get_end(&sub->sub_forward);
item = ompi_list_get_next(item)) {
orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;
if(fwd->fwd_pub == pub) {
ompi_list_remove_item(&sub->sub_forward,item);
OBJ_RELEASE(fwd);
return ORTE_SUCCESS;
}
}
return ORTE_ERR_NOT_FOUND;
}

106
src/mca/iof/svc/iof_svc_sub.h Обычный файл
Просмотреть файл

@ -0,0 +1,106 @@
#ifndef MCA_IOF_SVC_SUBSCRIPT_H
#define MCA_IOF_SVC_SUBSCRIPT_H
#include "class/orte_pointer_array.h"
#include "class/ompi_proc_table.h"
/**
* A subscription routes data from a specified set
* of source endpoints to one or more destination
* endpoints.
*/
struct orte_iof_svc_fwd_t {
ompi_list_item_t super;
orte_iof_svc_pub_t* fwd_pub;
ompi_hash_table_t fwd_seq;
};
typedef struct orte_iof_svc_fwd_t orte_iof_svc_fwd_t;
OBJ_CLASS_DECLARATION(orte_iof_svc_fwd_t);
struct orte_iof_svc_sub_t {
ompi_list_item_t super;
orte_process_name_t src_name;
orte_ns_cmp_bitmask_t src_mask;
orte_iof_base_tag_t src_tag;
orte_process_name_t dst_name;
orte_ns_cmp_bitmask_t dst_mask;
orte_iof_base_tag_t dst_tag;
orte_iof_base_endpoint_t* sub_endpoint;
ompi_list_t sub_forward;
};
typedef struct orte_iof_svc_sub_t orte_iof_svc_sub_t;
OBJ_CLASS_DECLARATION(orte_iof_svc_sub_t);
/**
* Lookup an existing subscription.
*/
orte_iof_svc_sub_t* orte_iof_svc_sub_lookup(
const orte_process_name_t* src
);
/**
* Create a subscription
*/
int orte_iof_svc_sub_create(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag);
/**
* Cleanup/remove a subscription
*/
int orte_iof_svc_sub_delete(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag);
/**
* Forward message to any endpoints that
* match the subscription.
*/
int orte_iof_svc_sub_forward(
orte_iof_svc_sub_t* sub,
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data);
/**
* Check to see if the published endpoint matches
* the subscription.
*/
bool orte_iof_svc_fwd_match(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub);
/**
* Create or remove a forwarding entry on the
* current subscription.
*/
int orte_iof_svc_fwd_create(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub);
int orte_iof_svc_fwd_delete(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub);
#endif

Просмотреть файл

@ -1,176 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
static void orte_iof_svc_subscript_construct(orte_iof_svc_subscript_t* subscript)
{
subscript->sub_endpoint = NULL;
}
static void orte_iof_svc_subscript_destruct(orte_iof_svc_subscript_t* subscript)
{
if(subscript->sub_endpoint != NULL)
OBJ_RELEASE(subscript->sub_endpoint);
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_subscript_t,
ompi_list_item_t,
orte_iof_svc_subscript_construct,
orte_iof_svc_subscript_destruct);
/**
* Create a subscription/forwarding entry.
*/
int orte_iof_svc_subscript_create(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
orte_iof_svc_subscript_t* sub = OBJ_NEW(orte_iof_svc_subscript_t);
sub->src_name = *src_name;
sub->src_mask = src_mask;
sub->src_tag = src_tag;
sub->dst_name = *dst_name;
sub->dst_mask = dst_mask;
sub->dst_tag = dst_tag;
sub->sub_endpoint = orte_iof_base_endpoint_match(&sub->dst_name, sub->dst_mask, sub->dst_tag);
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
ompi_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return OMPI_SUCCESS;
}
/**
* Delete all matching subscriptions.
*/
int orte_iof_svc_subscript_delete(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
ompi_list_item_t *item;
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
while(item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed)) {
ompi_list_item_t* next = ompi_list_get_next(item);
orte_iof_svc_subscript_t* sub = (orte_iof_svc_subscript_t*)item;
if (sub->src_mask == src_mask &&
orte_ns.compare(sub->src_mask,&sub->src_name,src_name) == 0 &&
sub->src_tag == src_tag &&
sub->dst_mask == dst_mask &&
orte_ns.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 &&
sub->dst_tag == dst_tag) {
ompi_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
OBJ_RELEASE(item);
}
item = next;
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return OMPI_SUCCESS;
}
/*
* Callback on send completion. Release send resources (fragment).
*/
static void orte_iof_svc_subscript_send_cb(
int status,
orte_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)frag;
ORTE_IOF_BASE_FRAG_RETURN(frag);
}
/**
* Check for matching endpoints that have been published to the
* server. Forward data out each matching endpoint.
*/
int orte_iof_svc_subscript_forward(
orte_iof_svc_subscript_t* sub,
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data)
{
ompi_list_item_t* item;
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_published);
item != ompi_list_get_end(&mca_iof_svc_component.svc_published);
item = ompi_list_get_next(item)) {
orte_iof_svc_publish_t* pub = (orte_iof_svc_publish_t*)item;
int rc;
if(sub->dst_tag != pub->pub_tag && pub->pub_tag != ORTE_IOF_ANY)
continue;
if(pub->pub_endpoint != NULL) {
rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);
} else {
/* forward */
orte_iof_base_frag_t* frag;
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
frag->frag_hdr.hdr_msg = *hdr;
frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
frag->frag_iov[1].iov_len = frag->frag_len;
memcpy(frag->frag_data, data, frag->frag_len);
rc = mca_oob_send_nb(
&pub->pub_proxy,
frag->frag_iov,
2,
ORTE_RML_TAG_IOF_SVC,
0,
orte_iof_svc_subscript_send_cb,
frag);
}
if(rc != OMPI_SUCCESS) {
return rc;
}
}
if(sub->sub_endpoint != NULL) {
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -1,68 +0,0 @@
#ifndef MCA_IOF_SVC_SUBSCRIPT_H
#define MCA_IOF_SVC_SUBSCRIPT_H
/**
* A subscription maps data from a specified set
* of source endpoints to one or more destination(s).
*/
struct orte_iof_svc_subscript_t {
ompi_list_item_t super;
orte_process_name_t src_name;
orte_ns_cmp_bitmask_t src_mask;
orte_iof_base_tag_t src_tag;
orte_process_name_t dst_name;
orte_ns_cmp_bitmask_t dst_mask;
orte_iof_base_tag_t dst_tag;
orte_iof_base_endpoint_t* sub_endpoint;
};
typedef struct orte_iof_svc_subscript_t orte_iof_svc_subscript_t;
OBJ_CLASS_DECLARATION(orte_iof_svc_subscript_t);
/**
*
*/
orte_iof_svc_subscript_t* orte_iof_svc_subscript_lookup(
const orte_process_name_t* src
);
/**
*
*/
int orte_iof_svc_subscript_create(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag);
/**
*
*/
int orte_iof_svc_subscript_delete(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag);
/**
*
*/
int orte_iof_svc_subscript_forward(
orte_iof_svc_subscript_t* subscript,
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data);
#endif