1
1
openmpi/orte/mca/iof/svc/iof_svc_sub.c
Brian Barrett a13166b500 * rename ompi_output to opal_output
This commit was SVN r6329.
2005-07-03 23:31:27 +00:00

293 строки
8.1 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "opal/util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "mca/errmgr/errmgr.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_pub.h"
#include "iof_svc_sub.h"
/**
* Subscription onstructor/destructor
*/
static void orte_iof_svc_sub_construct(orte_iof_svc_sub_t* sub)
{
sub->sub_endpoint = NULL;
OBJ_CONSTRUCT(&sub->sub_forward, opal_list_t);
}
static void orte_iof_svc_sub_destruct(orte_iof_svc_sub_t* sub)
{
opal_list_item_t* item;
if(sub->sub_endpoint != NULL)
OBJ_RELEASE(sub->sub_endpoint);
while(NULL != (item = opal_list_remove_first(&sub->sub_forward))) {
OBJ_RELEASE(item);
}
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_sub_t,
opal_list_item_t,
orte_iof_svc_sub_construct,
orte_iof_svc_sub_destruct);
/**
* Create a subscription/forwarding entry.
*/
int orte_iof_svc_sub_create(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
opal_list_item_t* item;
orte_iof_svc_sub_t* sub = OBJ_NEW(orte_iof_svc_sub_t);
sub->src_name = *src_name;
sub->src_mask = src_mask;
sub->src_tag = src_tag;
sub->dst_name = *dst_name;
sub->dst_mask = dst_mask;
sub->dst_tag = dst_tag;
sub->sub_endpoint = orte_iof_base_endpoint_match(&sub->dst_name, sub->dst_mask, sub->dst_tag);
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
/* search through published endpoints for a match */
for(item = opal_list_get_first(&mca_iof_svc_component.svc_published);
item != opal_list_get_end(&mca_iof_svc_component.svc_published);
item = opal_list_get_next(item)) {
orte_iof_svc_pub_t* pub = (orte_iof_svc_pub_t*)item;
if(orte_iof_svc_fwd_match(sub,pub)) {
orte_iof_svc_fwd_create(sub,pub);
}
}
opal_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
/**
* Delete all matching subscriptions.
*/
int orte_iof_svc_sub_delete(
const orte_process_name_t *src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
const orte_process_name_t *dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag)
{
opal_list_item_t *item;
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
while(item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed)) {
opal_list_item_t* next = opal_list_get_next(item);
orte_iof_svc_sub_t* sub = (orte_iof_svc_sub_t*)item;
if (sub->src_mask == src_mask &&
orte_ns.compare(sub->src_mask,&sub->src_name,src_name) == 0 &&
sub->src_tag == src_tag &&
sub->dst_mask == dst_mask &&
orte_ns.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 &&
sub->dst_tag == dst_tag) {
opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
OBJ_RELEASE(item);
}
item = next;
}
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
/*
* Callback on send completion. Release send resources (fragment).
*/
static void orte_iof_svc_sub_send_cb(
int status,
orte_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)cbdata;
ORTE_IOF_BASE_FRAG_RETURN(frag);
if(status < 0) {
ORTE_ERROR_LOG(status);
}
}
/**
* Check for matching endpoints that have been published to the
* server. Forward data out each matching endpoint.
*/
int orte_iof_svc_sub_forward(
orte_iof_svc_sub_t* sub,
const orte_process_name_t* src,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data)
{
opal_list_item_t* item;
for(item = opal_list_get_first(&sub->sub_forward);
item != opal_list_get_end(&sub->sub_forward);
item = opal_list_get_next(item)) {
orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;
orte_iof_svc_pub_t* pub = fwd->fwd_pub;
int rc;
if(pub->pub_endpoint != NULL) {
rc = orte_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);
} else {
/* forward */
orte_iof_base_frag_t* frag;
ORTE_IOF_BASE_FRAG_ALLOC(frag,rc);
frag->frag_hdr.hdr_msg = *hdr;
frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
frag->frag_iov[0].iov_base = (void*)&frag->frag_hdr;
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
frag->frag_iov[1].iov_base = (void*)frag->frag_data;
frag->frag_iov[1].iov_len = frag->frag_len;
memcpy(frag->frag_data, data, frag->frag_len);
ORTE_IOF_BASE_HDR_MSG_NTOH(frag->frag_hdr.hdr_msg);
rc = mca_oob_send_nb(
&pub->pub_proxy,
frag->frag_iov,
2,
ORTE_RML_TAG_IOF_SVC,
0,
orte_iof_svc_sub_send_cb,
frag);
}
if(rc != ORTE_SUCCESS) {
return rc;
}
}
if(sub->sub_endpoint != NULL) {
return orte_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
}
return ORTE_SUCCESS;
}
/**
* I/O Forwarding entry - relates a published endpoint to
* a subscription.
*/
static void orte_iof_svc_fwd_construct(orte_iof_svc_fwd_t* fwd)
{
fwd->fwd_pub = NULL;
OBJ_CONSTRUCT(&fwd->fwd_seq, opal_hash_table_t);
opal_hash_table_init(&fwd->fwd_seq, 256);
}
static void orte_iof_svc_fwd_destruct(orte_iof_svc_fwd_t* fwd)
{
if(NULL != fwd->fwd_pub)
OBJ_RELEASE(fwd->fwd_pub);
OBJ_DESTRUCT(&fwd->fwd_seq);
}
OBJ_CLASS_INSTANCE(
orte_iof_svc_fwd_t,
opal_list_item_t,
orte_iof_svc_fwd_construct,
orte_iof_svc_fwd_destruct);
/**
* Does the published endpoint match the destination specified
* in the subscription?
*/
bool orte_iof_svc_fwd_match(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
if (orte_ns.compare(sub->dst_mask,&sub->dst_name,&pub->pub_name) == 0 &&
sub->src_tag == pub->pub_tag) {
return true;
} else {
return false;
}
}
/**
* Create a forwarding entry
*/
int orte_iof_svc_fwd_create(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
orte_iof_svc_fwd_t* fwd = OBJ_NEW(orte_iof_svc_fwd_t);
if(NULL == fwd) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
OBJ_RETAIN(pub);
fwd->fwd_pub = pub;
opal_list_append(&sub->sub_forward, &fwd->super);
return ORTE_SUCCESS;
}
/**
* Remove any forwarding entries that match the
* published endpoint.
*/
int orte_iof_svc_fwd_delete(
orte_iof_svc_sub_t* sub,
orte_iof_svc_pub_t* pub)
{
opal_list_item_t* item;
for(item = opal_list_get_first(&sub->sub_forward);
item != opal_list_get_end(&sub->sub_forward);
item = opal_list_get_next(item)) {
orte_iof_svc_fwd_t* fwd = (orte_iof_svc_fwd_t*)item;
if(fwd->fwd_pub == pub) {
opal_list_remove_item(&sub->sub_forward,item);
OBJ_RELEASE(fwd);
return ORTE_SUCCESS;
}
}
return ORTE_ERR_NOT_FOUND;
}