2005-03-30 01:31:30 +00:00
/*
2005-11-05 19:57:48 +00:00
* Copyright ( c ) 2004 - 2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation . All rights reserved .
2006-08-23 03:32:36 +00:00
* Copyright ( c ) 2004 - 2006 The University of Tennessee and The University
2005-11-05 19:57:48 +00:00
* of Tennessee Research Foundation . All rights
* reserved .
2005-03-30 01:31:30 +00:00
* Copyright ( c ) 2004 - 2005 High Performance Computing Center Stuttgart ,
* University of Stuttgart . All rights reserved .
* Copyright ( c ) 2004 - 2005 The Regents of the University of California .
* All rights reserved .
* $ COPYRIGHT $
*
* Additional copyrights may follow
*
* $ HEADER $
*/
2006-02-12 01:33:29 +00:00
# include "orte_config.h"
2005-03-30 01:31:30 +00:00
# include <string.h>
2005-07-03 23:31:27 +00:00
# include "opal/util/output.h"
2007-06-08 22:59:31 +00:00
# include "orte/mca/rml/rml.h"
2006-02-12 01:33:29 +00:00
# include "orte/mca/iof/base/iof_base_header.h"
# include "orte/mca/iof/base/iof_base_fragment.h"
# include "orte/mca/errmgr/errmgr.h"
2007-06-08 22:59:31 +00:00
# include "orte/class/orte_proc_table.h"
2005-03-29 19:40:38 +00:00
# include "iof_svc.h"
# include "iof_svc_proxy.h"
# include "iof_svc_pub.h"
# include "iof_svc_sub.h"
/**
 * Subscription constructor / destructor
*/
/* Constructor: put a freshly allocated subscription into its initial state. */
static void orte_iof_svc_sub_construct(orte_iof_svc_sub_t *sub)
{
    /* Start with an empty list of forwarding entries. */
    OBJ_CONSTRUCT(&sub->sub_forward, opal_list_t);

    /* ACK bookkeeping: flagged as acked, with a forwarded count of zero. */
    sub->last_ack_forwarded = 0;
    sub->has_been_acked = true;

    /* No endpoint is attached yet. */
    sub->sub_endpoint = NULL;
}
/*
 * Destructor: drop the reference held on the attached endpoint (if any),
 * release every forwarding entry, and tear down the list that held them.
 */
static void orte_iof_svc_sub_destruct(orte_iof_svc_sub_t *sub)
{
    opal_list_item_t *item;

    if (NULL != sub->sub_endpoint) {
        OBJ_RELEASE(sub->sub_endpoint);
    }

    /* Drain the forwarding list, releasing each entry as it comes off. */
    while (NULL != (item = opal_list_remove_first(&sub->sub_forward))) {
        OBJ_RELEASE(item);
    }

    /* Balance the OBJ_CONSTRUCT performed on this list by the constructor. */
    OBJ_DESTRUCT(&sub->sub_forward);
}
/*
 * Class instance for orte_iof_svc_sub_t.  Arguments, in order: the type,
 * opal_list_item_t as the second (parent) argument, then the constructor
 * and destructor defined above.
 */
OBJ_CLASS_INSTANCE(orte_iof_svc_sub_t,
                   opal_list_item_t,
                   orte_iof_svc_sub_construct,
                   orte_iof_svc_sub_destruct);
/**
 * Create a subscription / forwarding entry.
 *
 * If a subscription with the same origin and target name/mask/tag is
 * already on the component's subscription list, nothing is created.
 * Otherwise a new subscription is allocated, a forwarding entry is added
 * for every already-published endpoint that matches it, and it is
 * appended to the subscription list.  The component lock is held for the
 * whole operation.
 *
 * @param origin_name  Origin process name (copied into the subscription).
 * @param origin_mask  Comparison mask applied to the origin name.
 * @param origin_tag   Tag of the origin.
 * @param target_name  Target process name (copied into the subscription).
 * @param target_mask  Comparison mask applied to the target name.
 * @param target_tag   Tag of the target.
 *
 * @return ORTE_SUCCESS if the subscription already existed or was
 *         created; ORTE_ERR_OUT_OF_RESOURCE if it could not be allocated.
 */
int orte_iof_svc_sub_create(
    const orte_process_name_t *origin_name,
    orte_ns_cmp_bitmask_t origin_mask,
    orte_iof_base_tag_t origin_tag,
    const orte_process_name_t *target_name,
    orte_ns_cmp_bitmask_t target_mask,
    orte_iof_base_tag_t target_tag)
{
    orte_iof_svc_sub_t *sub;
    opal_list_item_t *item;

    /* See if the subscription already exists */
    OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
    for (item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
         item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed);
         item = opal_list_get_next(item)) {
        sub = (orte_iof_svc_sub_t *) item;
        if (sub->origin_mask == origin_mask &&
            ORTE_EQUAL == orte_ns.compare_fields(sub->origin_mask, &sub->origin_name, origin_name) &&
            sub->origin_tag == origin_tag &&
            sub->target_mask == target_mask &&
            ORTE_EQUAL == orte_ns.compare_fields(sub->target_mask, &sub->target_name, target_name) &&
            sub->target_tag == target_tag) {
            OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
            return ORTE_SUCCESS;
        }
    }

    /* No, it does not -- create a new one */
    sub = OBJ_NEW(orte_iof_svc_sub_t);
    if (NULL == sub) {
        /* Allocation failed: do not touch sub, and do not leave the
           component lock held on the way out. */
        OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    sub->origin_name = *origin_name;
    sub->origin_mask = origin_mask;
    sub->origin_tag = origin_tag;
    sub->target_name = *target_name;
    sub->target_mask = target_mask;
    sub->target_tag = target_tag;
    /* The lookup result is stored as-is; the destructor only releases it
       when it is non-NULL. */
    sub->sub_endpoint = orte_iof_base_endpoint_match(&sub->target_name, sub->target_mask, sub->target_tag);
    opal_output(orte_iof_base.iof_output, " created svc sub, origin %s tag %d / mask %x, target %s, tag %d / mask %x \n ",
                ORTE_NAME_PRINT((orte_process_name_t *) origin_name), origin_tag, origin_mask,
                ORTE_NAME_PRINT((orte_process_name_t *) target_name), target_tag, target_mask);

    /* search through published endpoints for a match */
    for (item = opal_list_get_first(&mca_iof_svc_component.svc_published);
         item != opal_list_get_end(&mca_iof_svc_component.svc_published);
         item = opal_list_get_next(item)) {
        orte_iof_svc_pub_t *pub = (orte_iof_svc_pub_t *) item;
        if (orte_iof_svc_fwd_match(sub, pub)) {
            /* Best effort: a forwarding entry that cannot be created is
               reported, but the subscription itself is still added. */
            int rc = orte_iof_svc_fwd_create(sub, pub);
            if (ORTE_SUCCESS != rc) {
                ORTE_ERROR_LOG(rc);
            }
        }
    }
    opal_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
    OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
    return ORTE_SUCCESS;
}
2007-06-08 22:59:31 +00:00
/**
 * Completion callback for the non-blocking send that forwards an ACK.
 * Hands the fragment passed as cbdata back via ORTE_IOF_BASE_FRAG_RETURN
 * and logs the status if it is negative.  The peer, msg, count and tag
 * arguments are not used.
 */
static void ack_send_cb(
    int status,
    orte_process_name_t *peer,
    struct iovec *msg,
    int count,
    orte_rml_tag_t tag,
    void *cbdata)
{
    orte_iof_base_frag_t *ack_frag = (orte_iof_base_frag_t *) cbdata;

    /* The fragment is returned whether or not the send succeeded. */
    ORTE_IOF_BASE_FRAG_RETURN(ack_frag);

    if (status >= 0) {
        return;
    }
    ORTE_ERROR_LOG(status);
}
/**
 * We have received an ACK from one of the targets that we previously
 * forwarded a message to.  However, given the one-to-many nature of
 * IOF forwarding, we don't automatically forward that ACK on to the
 * origin of the original message.  Instead, we wait for *all* the
 * targets of the original message to reply with the appropriate ACK,
 * and *then* we forward the ACK on to the original message's origin.
 *
 * In this way, the origin will only broadcast as fast as the slowest
 * target.
 *
 * Think of it this way: this function serves as a clearinghouse for
 * ACKs.  It will only push an ACK upstream to an origin when all the
 * corresponding targets have ACK'ed.
 *
 * @param peer      Process the ACK was received from.
 * @param hdr       Header of the ACK; its msg_origin, msg_proxy, msg_tag,
 *                  msg_seq and msg_len fields are read.
 * @param do_close  When true, the peer's entry is removed from the
 *                  per-forward sequence table instead of being updated,
 *                  and a matching local subscription endpoint is
 *                  detached.
 */
void orte_iof_svc_sub_ack(
    const orte_process_name_t *peer,
    orte_iof_base_msg_header_t *hdr,
    bool do_close)
{
    opal_list_item_t *s_item;
    uint32_t seq_min = UINT32_MAX;
    uint32_t last_ack_forwarded = 0;
    bool has_been_acked = false;
    /* The sequence tables store void* values; the union lets a 32-bit
       sequence number travel through that pointer-sized slot. */
    union {
        uint32_t uval;
        void *vval;
    } value;

    /* Never leave the union indeterminate: it is read for logging and
       for the seq_min calculation below. */
    value.vval = NULL;

    opal_output(orte_iof_base.iof_output, " orte_iof_svc_proxy_ack ");
    if (do_close) {
        opal_output(orte_iof_base.iof_output, " CLOSE ACK! \n ");
    }

    /* for each of the subscriptions that match the origin of the ACK:
     * (1) find all forwarding entries that match the origin of the ACK
     * (2) update their sequence number
     * (3) find the minimum sequence number across all endpoints
     */
    OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
    for (s_item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
         s_item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed);
         s_item = opal_list_get_next(s_item)) {
        orte_iof_svc_sub_t *sub = (orte_iof_svc_sub_t *) s_item;
        opal_list_item_t *f_item;

        opal_output(orte_iof_base.iof_output, " ack: checking sub origin %s tag %d / mask %x, target %s, tag %d / mask %x \n ",
                    ORTE_NAME_PRINT(&sub->origin_name), sub->origin_tag, sub->origin_mask,
                    ORTE_NAME_PRINT(&sub->target_name), sub->target_tag, sub->target_mask);

        /* If the subscription origin/tag doesn't match the ACK
           origin/tag, skip it */
        if (ORTE_EQUAL != orte_ns.compare_fields(sub->origin_mask,
                                                 &sub->origin_name,
                                                 &hdr->msg_origin) ||
            sub->origin_tag != hdr->msg_tag) {
            continue;
        }

        /* We match, so keep a running tally of whether the ACK has
           been forwarded or not, and if so, how many bytes have been
           ACK'ed.  These two tallies only feed the log line below.
           NOTE(review): last_ack_forwarded starts at 0 and is unsigned,
           so the comparison below can never be true -- confirm whether
           a running minimum (or maximum) was intended. */
        has_been_acked |= sub->has_been_acked;
        if (sub->has_been_acked) {
            if (last_ack_forwarded > sub->last_ack_forwarded) {
                last_ack_forwarded = sub->last_ack_forwarded;
            }
        }
        opal_output(orte_iof_base.iof_output,
                    " ack: has_beed_acked: %d, last forwarded %d ",
                    has_been_acked, last_ack_forwarded);

        /* If the subscription has a local endpoint and the ACK is
           coming from this process, then update the seq_min
           calculation */
        if (NULL != sub->sub_endpoint &&
            ORTE_EQUAL == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
                                                 orte_process_info.my_name,
                                                 peer)) {
            /* Compute the ACK'ed position before branching so that both
               log messages print a defined value. */
            value.vval = NULL;
            value.uval = hdr->msg_seq + hdr->msg_len;
            if (do_close) {
                /* JMS what to do here?  Need to set sub->sub_endpoint
                   to NULL.  Have similar leak for do_close for
                   streams.  See ticket #1048. */
                sub->sub_endpoint = NULL;
                opal_output(orte_iof_base.iof_output,
                            " ack: CLOSED local ack to %u ", value.uval);
            } else {
                opal_output(orte_iof_base.iof_output,
                            " ack: local ack to %u ", value.uval);
                if (value.uval < seq_min) {
                    seq_min = value.uval;
                }
            }
        }

        /* Find the minimum amount ack'ed by all the origins (or,
           technically speaking, ack'ed by their proxies on their
           behalf) */
        for (f_item = opal_list_get_first(&sub->sub_forward);
             f_item != opal_list_get_end(&sub->sub_forward);
             f_item = opal_list_get_next(f_item)) {
            orte_iof_svc_fwd_t *fwd = (orte_iof_svc_fwd_t *) f_item;
            orte_iof_svc_pub_t *pub = fwd->fwd_pub;
            bool value_set = true;

            opal_output(orte_iof_base.iof_output, " ack: checking fwd %s tag %d / mask %x \n ",
                        ORTE_NAME_PRINT(&pub->pub_name), pub->pub_tag, pub->pub_mask);

            /* If the publication origin or publication proxy matches
               the ACK'ing proxy, save the ACK'ed byte count for this
               *origin* (not the proxy). */
            if (ORTE_EQUAL == orte_ns.compare_fields(pub->pub_mask, &pub->pub_name, peer) ||
                ORTE_EQUAL == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &pub->pub_proxy, peer)) {
                opal_output(orte_iof_base.iof_output,
                            " ack: found matching pub ");
                /* If we're closing, then remove this proc from
                   the table -- we won't be using its value to
                   calculate seq_min anymore.  Otherwise, put its
                   updated value in the table. */
                if (do_close) {
                    orte_hash_table_remove_proc(&fwd->fwd_seq_hash,
                                                &hdr->msg_origin);
                    value_set = false;
                } else {
                    /* Store the value itself, not the address of this
                       stack-local union: the lookup in the else branch
                       below reads the stored pointer back as the
                       sequence number.  Clearing vval first keeps the
                       bytes not covered by uval well defined. */
                    value.vval = NULL;
                    value.uval = hdr->msg_seq + hdr->msg_len;
                    orte_hash_table_set_proc(&fwd->fwd_seq_hash,
                                             &hdr->msg_origin, value.vval);
                }
            }
            /* Otherwise, if the publication origin and publication
               proxy do not match the ACK'ing proxy, then lookup
               whatever byte count was previously ACK'ed for the origin
               and use that to compute the minimum byte count ACK'ed
               so far.

               As such, even though the logic is confusing, at the end
               of this loop, seq_min will have the minimum number of
               bytes ACK'ed across all the forwards on this
               subscription. */
            else {
                value.vval = orte_hash_table_get_proc(&fwd->fwd_seq_hash,
                                                      &hdr->msg_origin);
            }

            /* If we got a valid value, update the seq_min calculation */
            if (value_set && value.uval < seq_min) {
                seq_min = value.uval;
            }
        }
    }
    OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);

    /* If nothing changed ACK-wise (including the situation where we
       are closing and there's no subscriber left to ACK), then we're
       done.  NOTE: this isn't technically right; if there's no
       subscribers left, we should do some more cleanup than this.
       But that's coming in ticket #1049 and/or #1051. */
    if (seq_min == UINT32_MAX) {
        return;
    }

    /* If everyone has ACK'ed, then push the ACK up to the original
       message's proxy */
    if (seq_min == hdr->msg_seq + hdr->msg_len) {
        /* If the original message was initiated from this process,
           then the ACK delivery is local. */
        if (ORTE_EQUAL == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
                                                 orte_process_info.my_name,
                                                 &hdr->msg_origin) ||
            ORTE_EQUAL == orte_ns.compare_fields(ORTE_NS_CMP_ALL,
                                                 orte_process_info.my_name,
                                                 &hdr->msg_proxy)) {
            orte_iof_base_endpoint_t *endpoint;

            endpoint = orte_iof_base_endpoint_match(&hdr->msg_origin,
                                                    ORTE_NS_CMP_ALL,
                                                    hdr->msg_tag);
            if (NULL != endpoint) {
                opal_output(orte_iof_base.iof_output,
                            " ack: forwarding ack locally: %u ", seq_min);
                orte_iof_base_endpoint_ack(endpoint, seq_min);
                OBJ_RELEASE(endpoint);
            }
        }
        /* Otherwise, the original message was initiated in another
           process, and we need to forward the ACK to it. */
        else {
            orte_iof_base_frag_t *frag;
            int rc;

            ORTE_IOF_BASE_FRAG_ALLOC(frag, rc);
            if (NULL == frag) {
                ORTE_ERROR_LOG(rc);
                return;
            }

            /* Send a copy of the header only (one iovec), converted by
               ORTE_IOF_BASE_HDR_MSG_HTON before it goes out. */
            frag->frag_hdr.hdr_msg = *hdr;
            frag->frag_iov[0].iov_base = (IOVBASE_TYPE *) &frag->frag_hdr;
            frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
            ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);

            opal_output(orte_iof_base.iof_output,
                        " ack: forwarding ack remotely: %u ", seq_min);
            rc = orte_rml.send_nb(
                &hdr->msg_proxy,
                frag->frag_iov,
                1,
                ORTE_RML_TAG_IOF_SVC,
                0,
                ack_send_cb,
                frag);
            if (rc < 0) {
                /* NOTE(review): if send_nb does not invoke ack_send_cb
                   on failure, the fragment is never returned -- confirm
                   against the RML contract. */
                ORTE_ERROR_LOG(rc);
            }
        }
    }
}
2005-03-29 19:40:38 +00:00
/**
 * Delete all matching subscriptions.
 *
 * Walks the component's subscription list under the component lock and
 * removes and releases every entry whose origin and target name, mask
 * and tag all equal the arguments.  Always returns ORTE_SUCCESS, whether
 * or not anything was removed.
 */
int orte_iof_svc_sub_delete(
    const orte_process_name_t *origin_name,
    orte_ns_cmp_bitmask_t origin_mask,
    orte_iof_base_tag_t origin_tag,
    const orte_process_name_t *target_name,
    orte_ns_cmp_bitmask_t target_mask,
    orte_iof_base_tag_t target_tag)
{
    opal_list_item_t *item;
    opal_list_item_t *following;

    OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
    for (item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
         item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed);
         item = following) {
        orte_iof_svc_sub_t *sub = (orte_iof_svc_sub_t *) item;

        /* Fetch the successor first: item may be unlinked below. */
        following = opal_list_get_next(item);

        /* Skip anything that differs in any of the six fields. */
        if (sub->origin_mask != origin_mask ||
            ORTE_EQUAL != orte_ns.compare_fields(sub->origin_mask, &sub->origin_name, origin_name) ||
            sub->origin_tag != origin_tag ||
            sub->target_mask != target_mask ||
            ORTE_EQUAL != orte_ns.compare_fields(sub->target_mask, &sub->target_name, target_name) ||
            sub->target_tag != target_tag) {
            continue;
        }

        opal_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
        OBJ_RELEASE(item);
    }
    OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);

    return ORTE_SUCCESS;
}
2005-10-06 21:21:26 +00:00
int orte_iof_svc_sub_delete_all (
const orte_process_name_t * name )
{
opal_list_item_t * item ;
OPAL_THREAD_LOCK ( & mca_iof_svc_component . svc_lock ) ;
item = opal_list_get_first ( & mca_iof_svc_component . svc_subscribed ) ;
while ( item ! = opal_list_get_end ( & mca_iof_svc_component . svc_subscribed ) ) {
opal_list_item_t * next = opal_list_get_next ( item ) ;
orte_iof_svc_sub_t * sub = ( orte_iof_svc_sub_t * ) item ;
2007-06-08 22:59:31 +00:00
if ( ( sub - > origin_mask = = ORTE_NS_CMP_ALL & &
2007-08-01 18:59:37 +00:00
ORTE_EQUAL = = orte_ns . compare_fields ( ORTE_NS_CMP_ALL , & sub - > origin_name , name ) ) | |
2007-06-08 22:59:31 +00:00
( sub - > target_mask = = ORTE_NS_CMP_ALL & &
2007-08-01 18:59:37 +00:00
ORTE_EQUAL = = orte_ns . compare_fields ( ORTE_NS_CMP_ALL , & sub - > target_name , name ) ) ) {
2005-10-06 21:21:26 +00:00
opal_list_remove_item ( & mca_iof_svc_component . svc_subscribed , item ) ;
OBJ_RELEASE ( item ) ;
}
item = next ;
}
OPAL_THREAD_UNLOCK ( & mca_iof_svc_component . svc_lock ) ;
return ORTE_SUCCESS ;
}
2005-03-29 19:40:38 +00:00
/*
* Callback on send completion . Release send resources ( fragment ) .
*/
2007-06-08 22:59:31 +00:00
2005-03-29 19:40:38 +00:00
static void orte_iof_svc_sub_send_cb (
int status ,
orte_process_name_t * peer ,
struct iovec * msg ,
int count ,
2007-06-08 22:59:31 +00:00
orte_rml_tag_t tag ,
2005-03-29 19:40:38 +00:00
void * cbdata )
{
orte_iof_base_frag_t * frag = ( orte_iof_base_frag_t * ) cbdata ;
ORTE_IOF_BASE_FRAG_RETURN ( frag ) ;
if ( status < 0 ) {
ORTE_ERROR_LOG ( status ) ;
}
}
/**
 * Check for matching endpoints that have been published to the
 * server.  Forward data out each matching endpoint.
 *
 * Every forwarding entry of the subscription is visited.  If its
 * publication has an endpoint attached, the data is handed to that
 * endpoint directly; otherwise header and payload are copied into a
 * fragment and sent non-blocking to the publication's proxy.  Finally,
 * if the subscription itself has an endpoint, the data is forwarded
 * there too.
 *
 * @param sub      Subscription whose forwarding entries are walked.
 * @param src      Source process name, passed through to the endpoints.
 * @param hdr      Message header; hdr->msg_len bytes of data are sent.
 * @param data     Payload to forward.
 * @param forward  Set to true once the data was forwarded at least once
 *                 (also set before the final endpoint forward is tried);
 *                 never written otherwise.
 *
 * @return ORTE_SUCCESS, or the first error reported by a forward, a
 *         send, or the fragment allocation.  Remaining entries are not
 *         visited after an error.
 */
int orte_iof_svc_sub_forward(
    orte_iof_svc_sub_t *sub,
    const orte_process_name_t *src,
    orte_iof_base_msg_header_t *hdr,
    const unsigned char *data,
    bool *forward)
{
    opal_list_item_t *item;

    for (item = opal_list_get_first(&sub->sub_forward);
         item != opal_list_get_end(&sub->sub_forward);
         item = opal_list_get_next(item)) {
        orte_iof_svc_fwd_t *fwd = (orte_iof_svc_fwd_t *) item;
        orte_iof_svc_pub_t *pub = fwd->fwd_pub;
        int rc;

        if (NULL != pub->pub_endpoint) {
            opal_output(orte_iof_base.iof_output, " sub_forward: forwarding to pub local endpoint ");
            rc = orte_iof_base_endpoint_forward(pub->pub_endpoint, src, hdr, data);
        } else {
            /* forward */
            orte_iof_base_frag_t *frag;

            opal_output(orte_iof_base.iof_output, " sub_forward: forwarding to pub stream / remote endpoint ");
            ORTE_IOF_BASE_FRAG_ALLOC(frag, rc);
            if (NULL == frag) {
                /* No fragment available: report it instead of writing
                   through a NULL pointer.  Make sure the caller sees a
                   failure even if rc was left at ORTE_SUCCESS. */
                if (ORTE_SUCCESS == rc) {
                    rc = ORTE_ERR_OUT_OF_RESOURCE;
                }
                ORTE_ERROR_LOG(rc);
                return rc;
            }

            /* iovec 0 carries the header, iovec 1 the payload copy. */
            frag->frag_hdr.hdr_msg = *hdr;
            frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
            frag->frag_iov[0].iov_base = (IOVBASE_TYPE *) &frag->frag_hdr;
            frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
            frag->frag_iov[1].iov_base = (IOVBASE_TYPE *) frag->frag_data;
            frag->frag_iov[1].iov_len = frag->frag_len;
            /* NOTE(review): assumes msg_len never exceeds the capacity
               of frag->frag_data -- TODO confirm against the fragment
               definition. */
            memcpy(frag->frag_data, data, frag->frag_len);
            /* Convert the header only after msg_len has been read in
               host order above. */
            ORTE_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);
            /* NOTE(review): if send_nb does not invoke the callback on
               failure, the fragment is never returned -- confirm
               against the RML contract. */
            rc = orte_rml.send_nb(
                &pub->pub_proxy,
                frag->frag_iov,
                2,
                ORTE_RML_TAG_IOF_SVC,
                0,
                orte_iof_svc_sub_send_cb,
                frag);
        }
        if (ORTE_SUCCESS != rc) {
            return rc;
        }
        *forward = true;
    }

    if (NULL != sub->sub_endpoint) {
        opal_output(orte_iof_base.iof_output, " sub_forward: forwarding to sub local endpoint ");
        *forward = true;
        return orte_iof_base_endpoint_forward(sub->sub_endpoint, src, hdr, data);
    }
    return ORTE_SUCCESS;
}
/**
 * I/O Forwarding entry - relates a published endpoint to
 * a subscription.
 */

/* Constructor: set up the sequence hash table and clear the publication. */
static void orte_iof_svc_fwd_construct(orte_iof_svc_fwd_t *fwd)
{
    /* Table is constructed, then initialized with a size argument of 256. */
    OBJ_CONSTRUCT(&fwd->fwd_seq_hash, opal_hash_table_t);
    opal_hash_table_init(&fwd->fwd_seq_hash, 256);

    /* No publication is attached yet. */
    fwd->fwd_pub = NULL;
}
/*
 * Destructor: drop the reference on the attached publication, if there
 * is one, then destruct the sequence hash table.
 */
static void orte_iof_svc_fwd_destruct(orte_iof_svc_fwd_t *fwd)
{
    if (fwd->fwd_pub != NULL) {
        OBJ_RELEASE(fwd->fwd_pub);
    }

    OBJ_DESTRUCT(&fwd->fwd_seq_hash);
}
/*
 * Class instance for orte_iof_svc_fwd_t.  Arguments, in order: the type,
 * opal_list_item_t as the second (parent) argument, then the constructor
 * and destructor defined above.
 */
OBJ_CLASS_INSTANCE(orte_iof_svc_fwd_t,
                   opal_list_item_t,
                   orte_iof_svc_fwd_construct,
                   orte_iof_svc_fwd_destruct);
/**
 * Does the published endpoint match the destination specified
 * in the subscription?
 *
 * @return true when the publication's name equals the subscription's
 *         target name under the subscription's target mask, and the
 *         publication's tag equals the subscription's origin tag;
 *         false otherwise.
 */
bool orte_iof_svc_fwd_match(
    orte_iof_svc_sub_t *sub,
    orte_iof_svc_pub_t *pub)
{
    /* NOTE(review): the tag compared is the subscription's *origin* tag
       while the name compared is its *target* name -- presumably
       intentional; confirm against the callers. */
    return ORTE_EQUAL == orte_ns.compare_fields(sub->target_mask, &sub->target_name, &pub->pub_name) &&
           sub->origin_tag == pub->pub_tag;
}
/**
 * Create a forwarding entry
 *
 * Allocates a forwarding entry that holds a retained reference to pub
 * and appends it to the subscription's forwarding list.
 *
 * @return ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if the entry could
 *         not be allocated (nothing is retained or appended then).
 */
int orte_iof_svc_fwd_create(
    orte_iof_svc_sub_t *sub,
    orte_iof_svc_pub_t *pub)
{
    orte_iof_svc_fwd_t *entry;

    entry = OBJ_NEW(orte_iof_svc_fwd_t);
    if (NULL == entry) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* The entry keeps its own reference on the publication; the fwd
       destructor releases it. */
    OBJ_RETAIN(pub);
    entry->fwd_pub = pub;

    opal_output(orte_iof_base.iof_output, " created svc forward, sub origin %s, tag %d / mask %x, sub target %s, tag %d / mask %x :::: pub name %s, tag %d / mask %x \n ",
                ORTE_NAME_PRINT(&sub->origin_name), sub->origin_tag,
                sub->origin_mask,
                ORTE_NAME_PRINT(&sub->target_name), sub->target_tag,
                sub->target_mask,
                ORTE_NAME_PRINT(&pub->pub_name), pub->pub_tag, pub->pub_mask);

    opal_list_append(&sub->sub_forward, &entry->super);
    return ORTE_SUCCESS;
}
/**
 * Remove the forwarding entry that refers to the published endpoint.
 *
 * Only the first entry whose fwd_pub is pub is unlinked and released;
 * the walk stops there.
 *
 * @return ORTE_SUCCESS if an entry was removed, ORTE_ERR_NOT_FOUND if no
 *         entry on the subscription's forwarding list refers to pub.
 */
int orte_iof_svc_fwd_delete(
    orte_iof_svc_sub_t *sub,
    orte_iof_svc_pub_t *pub)
{
    opal_list_item_t *cursor = opal_list_get_first(&sub->sub_forward);

    while (cursor != opal_list_get_end(&sub->sub_forward)) {
        orte_iof_svc_fwd_t *candidate = (orte_iof_svc_fwd_t *) cursor;

        if (candidate->fwd_pub != pub) {
            cursor = opal_list_get_next(cursor);
            continue;
        }

        /* Found it: unlink, release, and stop. */
        opal_list_remove_item(&sub->sub_forward, cursor);
        OBJ_RELEASE(candidate);
        return ORTE_SUCCESS;
    }

    return ORTE_ERR_NOT_FOUND;
}