2010-05-02 17:29:41 +00:00
/*
* Copyright ( c ) 2010 Cisco Systems , Inc . All rights reserved .
* $ COPYRIGHT $
*
* Additional copyrights may follow
*
* $ HEADER $
*
*/
# include "orte_config.h"
# include "orte/constants.h"
# include "opal/types.h"
# include <string.h>
# include <sys/types.h>
# ifdef HAVE_LIMITS_H
# include <limits.h>
# endif
# include <stdio.h>
# ifdef HAVE_FCNTL_H
# include <fcntl.h>
# endif
# ifdef HAVE_UNISTD_H
# include <unistd.h>
# endif
# include <sp.h>
# include "opal/class/opal_list.h"
# include "opal/opal_socket_errno.h"
# include "opal/util/output.h"
# include "opal/util/argv.h"
# include "opal/util/if.h"
# include "opal/util/net.h"
# include "opal/dss/dss.h"
# include "orte/runtime/orte_globals.h"
# include "orte/runtime/orte_wait.h"
# include "orte/mca/errmgr/errmgr.h"
# include "orte/util/name_fns.h"
# include "orte/util/show_help.h"
# include "orte/mca/rmcast/base/private.h"
# include "orte/mca/rmcast/base/base.h"
# include "rmcast_spread.h"
# define SPREAD_NAME "4803"
/* LOCAL DATA */
static opal_mutex_t lock ;
static opal_list_t recvs ;
static opal_list_t channels ;
static bool init_completed = false ;
static orte_rmcast_channel_t next_channel ;
static opal_pointer_array_t msg_log ;
static char private_group [ MAX_GROUP_NAME ] ;
static mailbox Mbox ;
/* LOCAL FUNCTIONS */
static void recv_handler ( int sd , short flags , void * user ) ;
static int setup_channel ( rmcast_base_channel_t * chan , uint8_t direction , mailbox Mbox ) ;
static void xmit_data ( int sd , short flags , void * send_req ) ;
/* LOCAL STRUCTURE VALUES */
static rmcast_base_channel_t * my_group_channel = NULL ;
/* API FUNCTIONS */
static int init ( void ) ;
static void finalize ( void ) ;
static int spread_send_buffer ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
opal_buffer_t * buf ) ;
static int spread_send_buffer_nb ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
opal_buffer_t * buf ,
orte_rmcast_callback_buffer_fn_t cbfunc ,
void * cbdata ) ;
static int spread_send ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
struct iovec * msg , int count ) ;
static int spread_send_nb ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
struct iovec * msg , int count ,
orte_rmcast_callback_fn_t cbfunc ,
void * cbdata ) ;
static int spread_recv_buffer ( orte_process_name_t * sender ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
opal_buffer_t * buf ) ;
static int spread_recv_buffer_nb ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
orte_rmcast_flag_t flags ,
orte_rmcast_callback_buffer_fn_t cbfunc ,
void * cbdata ) ;
static int spread_recv ( orte_process_name_t * sender ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
struct iovec * * msg , int * count ) ;
static int spread_recv_nb ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
orte_rmcast_flag_t flags ,
orte_rmcast_callback_fn_t cbfunc ,
void * cbdata ) ;
static void cancel_recv ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ) ;
static int open_channel ( orte_rmcast_channel_t * channel , char * name ,
char * network , int port , char * interface , uint8_t direction ) ;
static int close_channel ( orte_rmcast_channel_t channel ) ;
static orte_rmcast_channel_t query ( void ) ;
/* Define the module */
orte_rmcast_module_t orte_rmcast_spread_module = {
init ,
finalize ,
spread_send ,
spread_send_nb ,
spread_send_buffer ,
spread_send_buffer_nb ,
spread_recv ,
spread_recv_nb ,
spread_recv_buffer ,
spread_recv_buffer_nb ,
cancel_recv ,
open_channel ,
close_channel ,
query
} ;
static void SP_error2str ( int error , char * error_str )
{
switch ( error )
{
case ILLEGAL_SPREAD :
sprintf ( error_str , " SP_error: (%d) Illegal spread was provided \n " , error ) ;
break ;
case COULD_NOT_CONNECT :
sprintf ( error_str , " SP_error: (%d) Could not connect. Is Spread running? \n " , error ) ;
break ;
case REJECT_QUOTA :
sprintf ( error_str , " SP_error: (%d) Connection rejected, to many users \n " , error ) ;
break ;
case REJECT_NO_NAME :
sprintf ( error_str , " SP_error: (%d) Connection rejected, no name was supplied \n " , error ) ;
break ;
case REJECT_ILLEGAL_NAME :
sprintf ( error_str , " SP_error: (%d) Connection rejected, illegal name \n " , error ) ;
break ;
case REJECT_NOT_UNIQUE :
sprintf ( error_str , " SP_error: (%d) Connection rejected, name not unique \n " , error ) ;
break ;
case REJECT_VERSION :
sprintf ( error_str , " SP_error: (%d) Connection rejected, library does not fit daemon \n " , error ) ;
break ;
case CONNECTION_CLOSED :
sprintf ( error_str , " SP_error: (%d) Connection closed by spread \n " , error ) ;
break ;
case REJECT_AUTH :
sprintf ( error_str , " SP_error: (%d) Connection rejected, authentication failed \n " , error ) ;
break ;
case ILLEGAL_SESSION :
sprintf ( error_str , " SP_error: (%d) Illegal session was supplied \n " , error ) ;
break ;
case ILLEGAL_SERVICE :
sprintf ( error_str , " SP_error: (%d) Illegal service request \n " , error ) ;
break ;
case ILLEGAL_MESSAGE :
sprintf ( error_str , " SP_error: (%d) Illegal message \n " , error ) ;
break ;
case ILLEGAL_GROUP :
sprintf ( error_str , " SP_error: (%d) Illegal group \n " , error ) ;
break ;
case BUFFER_TOO_SHORT :
sprintf ( error_str , " SP_error: (%d) The supplied buffer was too short \n " , error ) ;
break ;
case GROUPS_TOO_SHORT :
sprintf ( error_str , " SP_error: (%d) The supplied groups list was too short \n " , error ) ;
break ;
case MESSAGE_TOO_LONG :
sprintf ( error_str , " SP_error: (%d) The message body + group names was too large to fit in a message \n " , error ) ;
break ;
case NET_ERROR_ON_SESSION :
sprintf ( error_str , " SP_error: (%d) The network socket experienced an error. This Spread mailbox will no longer work until the connection is disconnected and then reconnected \n " , error ) ;
break ;
default :
sprintf ( error_str , " SP_error: (%d) unrecognized error \n " , error ) ;
}
}
/* during init, we setup two channels for both xmit and recv:
* ( a ) a public address announcement channel . There are two variants
* of this :
* ( 1 ) system processes - e . g . , daemons , tools . This channel
* is reserved solely for their use in performing admin
* functions
* ( 2 ) application processes . This channel is used to announce
* their existence and contact info for auto - wireup
* ( b ) our own group ' s channel , which is where our own output
* will be sent . At this time , we assume that we always
* want to hear our peers , so this channels is also
* bidirectional
*
* In addition , the HNP opens a third channel which is used solely
* for cmd - control purposes . This is where a tool , for example , might
* send a cmd to the HNP to take some action - there is no point in
* having that cmd echo around to every daemon and / or other tool
* in the system .
*/
static int init ( void )
{
int rc ;
orte_rmcast_channel_t channel ;
if ( init_completed ) {
return ORTE_SUCCESS ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( ( rc = SP_connect ( SPREAD_NAME , getlogin ( ) , 0 , 1 , & Mbox , private_group ) ) < 0 ) {
2010-05-04 02:38:37 +00:00
char error_string [ 1024 ] ;
SP_error2str ( rc , error_string ) ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" rmcast:spread: init SP_connect failed %s " ,
error_string ) ) ;
rc = ORTE_ERROR ;
return rc ;
2010-05-02 17:29:41 +00:00
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
init_completed = true ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output , " %s rmcast:spread: init called " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ) ) ;
/* setup the globals */
OBJ_CONSTRUCT ( & lock , opal_mutex_t ) ;
OBJ_CONSTRUCT ( & recvs , opal_list_t ) ;
OBJ_CONSTRUCT ( & channels , opal_list_t ) ;
next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS ;
OBJ_CONSTRUCT ( & msg_log , opal_pointer_array_t ) ;
opal_pointer_array_init ( & msg_log , 8 , INT_MAX , 8 ) ;
/* setup the respective public address channel */
if ( ORTE_PROC_IS_HNP | | ORTE_PROC_IS_DAEMON | | ORTE_PROC_IS_TOOL ) {
channel = ORTE_RMCAST_SYS_CHANNEL ;
if ( ORTE_SUCCESS ! = ( rc = open_channel ( & channel , " system " ,
NULL , - 1 , NULL , ORTE_RMCAST_BIDIR ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
return rc ;
}
} else if ( ORTE_PROC_IS_APP ) {
channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL ;
if ( ORTE_SUCCESS ! = ( rc = open_channel ( & channel , " app-announce " ,
NULL , - 1 , NULL , ORTE_RMCAST_BIDIR ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
return rc ;
}
/* setup our grp channel, if one was given */
if ( NULL ! = orte_rmcast_base . my_group_name ) {
channel = orte_rmcast_base . my_group_number ;
if ( ORTE_SUCCESS ! = ( rc = open_channel ( & channel , orte_rmcast_base . my_group_name ,
NULL , - 1 , NULL , ORTE_RMCAST_BIDIR ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
return rc ;
}
my_group_channel = ( rmcast_base_channel_t * ) opal_list_get_last ( & channels ) ;
}
} else {
opal_output ( 0 , " rmcast:spread:init - unknown process type " ) ;
return ORTE_ERR_SILENT ;
}
return ORTE_SUCCESS ;
}
static void finalize ( void )
{
opal_list_item_t * item ;
rmcast_recv_log_t * log ;
int j ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output , " %s rmcast:spread: finalize called " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ) ) ;
/* deconstruct the globals */
OPAL_THREAD_LOCK ( & lock ) ;
while ( NULL ! = ( item = opal_list_remove_first ( & recvs ) ) ) {
OBJ_RELEASE ( item ) ;
}
OBJ_DESTRUCT ( & recvs ) ;
while ( NULL ! = ( item = opal_list_remove_first ( & channels ) ) ) {
OBJ_RELEASE ( item ) ;
}
OBJ_DESTRUCT ( & channels ) ;
for ( j = 0 ; j < msg_log . size ; j + + ) {
if ( NULL ! = ( log = opal_pointer_array_get_item ( & msg_log , j ) ) ) {
OBJ_RELEASE ( log ) ;
}
}
OBJ_DESTRUCT ( & msg_log ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OBJ_DESTRUCT ( & lock ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ;
}
static void internal_snd_cb ( int status ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
orte_process_name_t * sender ,
struct iovec * msg , int count , void * cbdata )
{
( ( rmcast_base_send_t * ) cbdata ) - > send_complete = true ;
}
static void internal_snd_buf_cb ( int status ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
orte_process_name_t * sender ,
opal_buffer_t * buf , void * cbdata )
{
( ( rmcast_base_send_t * ) cbdata ) - > send_complete = true ;
}
static rmcast_base_channel_t * get_chan_from_name ( char * name )
{
rmcast_base_channel_t * ret = NULL ;
opal_list_item_t * item ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
for ( item = opal_list_get_first ( & channels ) ;
item ! = opal_list_get_end ( & channels ) ;
item = opal_list_get_next ( item ) ) {
ret = ( rmcast_base_channel_t * ) item ;
if ( ! strcasecmp ( name , ret - > name ) ) {
2010-05-04 02:38:37 +00:00
return ret ;
2010-05-02 17:29:41 +00:00
}
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return NULL ;
}
static int queue_xmit ( rmcast_base_send_t * snd ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag )
{
opal_list_item_t * item ;
rmcast_base_channel_t * chptr , * ch ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: queue_xmit to %d:%d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
channel , tag ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* if we were asked to send this on our group output
* channel , substitute it
*/
2010-05-03 04:07:14 +00:00
if ( ORTE_RMCAST_GROUP_CHANNEL = = channel ) {
2010-05-02 17:29:41 +00:00
if ( NULL = = my_group_channel ) {
return ORTE_ERR_NOT_FOUND ;
}
ch = my_group_channel ;
goto process ;
}
/* find the channel */
ch = NULL ;
for ( item = opal_list_get_first ( & channels ) ;
item ! = opal_list_get_end ( & channels ) ;
item = opal_list_get_next ( item ) ) {
chptr = ( rmcast_base_channel_t * ) item ;
if ( channel = = chptr - > channel ) {
ch = chptr ;
break ;
}
}
if ( NULL = = ch ) {
/* didn't find it */
return ORTE_ERR_NOT_FOUND ;
}
process :
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: send of %d %s "
" called on multicast channel %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
( NULL = = snd - > iovec_array ) ? ( int ) snd - > buf - > bytes_used : ( int ) snd - > iovec_count ,
( NULL = = snd - > iovec_array ) ? " bytes " : " iovecs " ,
ch - > name ) ) ;
/* add it to this channel's pending sends */
OPAL_THREAD_LOCK ( & ch - > send_lock ) ;
opal_list_append ( & ch - > pending_sends , & snd - > item ) ;
/* do we need to start the send event? */
if ( ! ch - > sends_in_progress ) {
opal_event_add ( & ch - > send_ev , 0 ) ;
ch - > sends_in_progress = true ;
}
OPAL_THREAD_UNLOCK ( & ch - > send_lock ) ;
return ORTE_SUCCESS ;
}
static int spread_send ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
struct iovec * msg , int count )
2010-05-02 17:29:41 +00:00
{
rmcast_base_send_t * snd ;
int ret = ORTE_ERROR ;
/* queue it to be sent - preserves order! */
snd = OBJ_NEW ( rmcast_base_send_t ) ;
snd - > iovec_array = msg ;
snd - > iovec_count = count ;
snd - > tag = tag ;
snd - > cbfunc_iovec = internal_snd_cb ;
snd - > cbdata = snd ;
snd - > send_complete = false ;
if ( ( snd - > cbdata = = NULL ) | | ( ORTE_SUCCESS ! = ( ret = queue_xmit ( snd , channel , tag ) ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
return ret ;
}
/* now wait for the send to complete */
ORTE_PROGRESSED_WAIT ( snd - > send_complete , 0 , 1 ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static int spread_send_nb ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
struct iovec * msg , int count ,
orte_rmcast_callback_fn_t cbfunc ,
void * cbdata )
2010-05-02 17:29:41 +00:00
{
int ret ;
rmcast_base_send_t * snd ;
/* queue it to be sent - preserves order! */
snd = OBJ_NEW ( rmcast_base_send_t ) ;
snd - > iovec_array = msg ;
snd - > iovec_count = count ;
snd - > tag = tag ;
snd - > cbfunc_iovec = cbfunc ;
snd - > cbdata = cbdata ;
if ( ORTE_SUCCESS ! = ( ret = queue_xmit ( snd , channel , tag ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
return ret ;
}
return ORTE_SUCCESS ;
}
static int spread_send_buffer ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
opal_buffer_t * buf )
2010-05-02 17:29:41 +00:00
{
2010-05-04 02:38:37 +00:00
int ret ;
2010-05-02 17:29:41 +00:00
rmcast_base_send_t * snd ;
/* queue it to be sent - preserves order! */
snd = OBJ_NEW ( rmcast_base_send_t ) ;
snd - > buf = buf ;
snd - > tag = tag ;
snd - > cbfunc_buffer = internal_snd_buf_cb ;
snd - > cbdata = snd ;
snd - > send_complete = false ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( ORTE_SUCCESS ! = ( ret = queue_xmit ( snd , channel , tag ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
OBJ_RELEASE ( snd ) ;
return ret ;
}
/* now wait for the send to complete */
ORTE_PROGRESSED_WAIT ( snd - > send_complete , 0 , 1 ) ;
return ORTE_SUCCESS ;
}
static int spread_send_buffer_nb ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
opal_buffer_t * buf ,
orte_rmcast_callback_buffer_fn_t cbfunc ,
void * cbdata )
2010-05-02 17:29:41 +00:00
{
int ret ;
rmcast_base_send_t * snd ;
/* queue it to be sent - preserves order! */
snd = OBJ_NEW ( rmcast_base_send_t ) ;
snd - > buf = buf ;
snd - > tag = tag ;
snd - > cbfunc_buffer = cbfunc ;
snd - > cbdata = cbdata ;
if ( ORTE_SUCCESS ! = ( ret = queue_xmit ( snd , channel , tag ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
OBJ_RELEASE ( snd ) ;
return ret ;
}
return ORTE_SUCCESS ;
}
static int queue_recv ( rmcast_base_recv_t * recvptr ,
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
orte_rmcast_callback_fn_t cbfunc_iovec ,
orte_rmcast_callback_buffer_fn_t cbfunc_buffer ,
bool blocking )
{
opal_list_item_t * item ;
rmcast_base_channel_t * ch , * chptr ;
rmcast_base_recv_t * rptr ;
/* find the channel */
ch = NULL ;
for ( item = opal_list_get_first ( & channels ) ;
item ! = opal_list_get_end ( & channels ) ;
item = opal_list_get_next ( item ) ) {
chptr = ( rmcast_base_channel_t * ) item ;
if ( channel = = chptr - > channel ) {
ch = chptr ;
break ;
}
}
if ( NULL = = ch ) {
/* didn't find it */
return ORTE_ERR_NOT_FOUND ;
}
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: queue_recv called on spread channel %s tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , ch - > name , tag ) ) ;
if ( ! blocking ) {
/* do we already have a recv for this channel/tag/type? */
OPAL_THREAD_LOCK ( & lock ) ;
for ( item = opal_list_get_first ( & recvs ) ;
item ! = opal_list_get_end ( & recvs ) ;
item = opal_list_get_next ( item ) ) {
rptr = ( rmcast_base_recv_t * ) item ;
if ( channel ! = rptr - > channel ) {
/* different channel */
continue ;
}
if ( tag ! = rptr - > tag ) {
/* different tag */
continue ;
}
if ( ( NULL ! = cbfunc_iovec & & NULL ! = rptr - > cbfunc_iovec ) | |
( NULL ! = cbfunc_buffer & & NULL ! = rptr - > cbfunc_buffer ) ) {
/* matching type - recv already in place */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: matching recv already active on spread channel %s tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , ch - > name , tag ) ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
return ORTE_EXISTS ;
}
}
OPAL_THREAD_UNLOCK ( & lock ) ;
}
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: adding non-blocking recv on spread channel %s tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , ch - > name , tag ) ) ;
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_append ( & recvs , & recvptr - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static int spread_recv ( orte_process_name_t * name ,
2010-05-04 02:38:37 +00:00
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
struct iovec * * msg , int * count )
2010-05-02 17:29:41 +00:00
{
rmcast_base_recv_t * recvptr ;
int ret ;
recvptr = OBJ_NEW ( rmcast_base_recv_t ) ;
recvptr - > iovecs_requested = true ;
recvptr - > channel = channel ;
recvptr - > tag = tag ;
if ( ORTE_SUCCESS ! = ( ret = queue_recv ( recvptr , channel , tag , NULL , NULL , true ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
OBJ_RELEASE ( recvptr ) ;
return ret ;
}
ORTE_PROGRESSED_WAIT ( recvptr - > recvd , 0 , 1 ) ;
/* xfer the data */
if ( NULL ! = name ) {
/* caller requested id of sender */
name - > jobid = recvptr - > name . jobid ;
name - > vpid = recvptr - > name . vpid ;
}
* msg = recvptr - > iovec_array ;
* count = recvptr - > iovec_count ;
/* remove the recv */
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_remove_item ( & recvs , & recvptr - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OBJ_RELEASE ( recvptr ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static int spread_recv_nb ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
orte_rmcast_flag_t flags ,
orte_rmcast_callback_fn_t cbfunc , void * cbdata )
2010-05-02 17:29:41 +00:00
{
rmcast_base_recv_t * recvptr ;
int ret ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: recv_nb called on channel %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , channel ) ) ;
recvptr = OBJ_NEW ( rmcast_base_recv_t ) ;
recvptr - > iovecs_requested = true ;
recvptr - > channel = channel ;
recvptr - > tag = tag ;
recvptr - > flags = flags ;
recvptr - > cbfunc_iovec = cbfunc ;
recvptr - > cbdata = cbdata ;
if ( ORTE_SUCCESS ! = ( ret = queue_recv ( recvptr , channel , tag , cbfunc , NULL , false ) ) ) {
if ( ORTE_EXISTS = = ret ) {
/* this recv already exists - just release the copy */
OBJ_RELEASE ( recvptr ) ;
return ORTE_SUCCESS ;
}
ORTE_ERROR_LOG ( ret ) ;
OBJ_RELEASE ( recvptr ) ;
return ret ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static int spread_recv_buffer ( orte_process_name_t * name ,
2010-05-04 02:38:37 +00:00
orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag ,
opal_buffer_t * buf )
2010-05-02 17:29:41 +00:00
{
rmcast_base_recv_t * recvptr ;
int ret ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread: recv_buffer called on multicast channel %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , channel ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
recvptr = OBJ_NEW ( rmcast_base_recv_t ) ;
recvptr - > channel = channel ;
recvptr - > tag = tag ;
if ( ORTE_SUCCESS ! = ( ret = queue_recv ( recvptr , channel , tag , NULL , NULL , true ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
goto cleanup ;
}
ORTE_PROGRESSED_WAIT ( recvptr - > recvd , 0 , 1 ) ;
/* xfer the data */
if ( NULL ! = name ) {
/* caller requested id of sender */
name - > jobid = recvptr - > name . jobid ;
name - > vpid = recvptr - > name . vpid ;
}
if ( ORTE_SUCCESS ! = ( ret = opal_dss . copy_payload ( buf , recvptr - > buf ) ) ) {
ORTE_ERROR_LOG ( ret ) ;
}
/* release the data */
OBJ_RELEASE ( recvptr - > buf ) ;
cleanup :
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_remove_item ( & recvs , & recvptr - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OBJ_RELEASE ( recvptr ) ;
return ret ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
}
static int spread_recv_buffer_nb ( orte_rmcast_channel_t channel ,
2010-05-04 02:38:37 +00:00
orte_rmcast_tag_t tag ,
orte_rmcast_flag_t flags ,
orte_rmcast_callback_buffer_fn_t cbfunc , void * cbdata )
2010-05-02 17:29:41 +00:00
{
rmcast_base_recv_t * recvptr ;
int ret ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:udp: recv_buffer_nb called on multicast channel %d tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , channel , tag ) ) ;
recvptr = OBJ_NEW ( rmcast_base_recv_t ) ;
recvptr - > channel = channel ;
recvptr - > tag = tag ;
recvptr - > flags = flags ;
recvptr - > cbfunc_buffer = cbfunc ;
recvptr - > cbdata = cbdata ;
if ( ORTE_SUCCESS ! = ( ret = queue_recv ( recvptr , channel , tag , NULL , cbfunc , false ) ) ) {
if ( ORTE_EXISTS = = ret ) {
/* this recv already exists - just release the copy */
OBJ_RELEASE ( recvptr ) ;
return ORTE_SUCCESS ;
}
ORTE_ERROR_LOG ( ret ) ;
OBJ_RELEASE ( recvptr ) ;
return ret ;
}
return ORTE_SUCCESS ;
}
static void cancel_recv ( orte_rmcast_channel_t channel ,
orte_rmcast_tag_t tag )
{
opal_list_item_t * item , * next ;
rmcast_base_recv_t * ptr ;
/* find all recv's for this channel and tag */
item = opal_list_get_first ( & recvs ) ;
while ( item ! = opal_list_get_end ( & recvs ) ) {
next = opal_list_get_next ( item ) ;
ptr = ( rmcast_base_recv_t * ) item ;
if ( channel = = ptr - > channel & &
tag = = ptr - > tag ) {
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_remove_item ( & recvs , & ptr - > item ) ;
OBJ_RELEASE ( ptr ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
}
item = next ;
}
}
static int open_channel ( orte_rmcast_channel_t * channel , char * name ,
char * network , int port , char * interface , uint8_t direction )
{
opal_list_item_t * item ;
rmcast_base_channel_t * nchan , * chan ;
int rc ;
/*
* We ignore the network and interface parameters for the
* Spread component .
*/
/* see if this name has already been assigned a channel */
OPAL_OUTPUT_VERBOSE ( ( 7 , orte_rmcast_base . rmcast_output ,
" %s open_channel: searching for %s:%d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , name , * channel ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
chan = NULL ;
for ( item = opal_list_get_first ( & channels ) ;
item ! = opal_list_get_end ( & channels ) ;
item = opal_list_get_next ( item ) ) {
nchan = ( rmcast_base_channel_t * ) item ;
OPAL_OUTPUT_VERBOSE ( ( 7 , orte_rmcast_base . rmcast_output ,
" %s open_channel: channel %s:%d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
nchan - > name , * channel ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( nchan - > channel = = * channel | |
0 = = strcasecmp ( nchan - > name , name ) ) {
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* check the channel, if one was given */
if ( ORTE_RMCAST_INVALID_CHANNEL ! = * channel & &
nchan - > channel ! = * channel ) {
continue ;
}
chan = nchan ;
break ;
}
}
if ( NULL ! = chan ) {
/* already exists - check that the requested
* sockets are setup
*/
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread using existing channel %s:%d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
chan - > name , chan - > channel ) ) ;
if ( ORTE_SUCCESS ! = ( rc = setup_channel ( chan , direction , Mbox ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
return rc ;
}
return ORTE_SUCCESS ;
}
/* we didn't find an existing match, so create a new channel */
chan = OBJ_NEW ( rmcast_base_channel_t ) ;
chan - > name = strdup ( name ) ;
/* if we were given a channel, then just use it */
if ( ORTE_RMCAST_INVALID_CHANNEL ! = * channel ) {
chan - > channel = * channel ;
} else {
chan - > channel = next_channel + + ;
* channel = chan - > channel ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_append ( & channels , & chan - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread opening new channel %s:%d for%s%s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
chan - > name , chan - > channel ,
( ORTE_RMCAST_RECV & direction ) ? " RECV " : " " ,
( ORTE_RMCAST_XMIT & direction ) ? " XMIT " : " " ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( ORTE_SUCCESS ! = ( rc = setup_channel ( chan , direction , Mbox ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
return rc ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static int close_channel ( orte_rmcast_channel_t channel )
{
opal_list_item_t * item ;
rmcast_base_channel_t * chan ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
OPAL_THREAD_LOCK ( & lock ) ;
for ( item = opal_list_get_first ( & channels ) ;
item ! = opal_list_get_end ( & channels ) ;
item = opal_list_get_next ( item ) ) {
chan = ( rmcast_base_channel_t * ) item ;
if ( channel = = chan - > channel ) {
opal_list_remove_item ( & channels , item ) ;
OBJ_RELEASE ( chan ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
return ORTE_SUCCESS ;
}
}
OPAL_THREAD_UNLOCK ( & lock ) ;
return ORTE_ERR_NOT_FOUND ;
}
static orte_rmcast_channel_t query ( void )
{
return orte_rmcast_base . my_group_number ;
}
static int setup_channel ( rmcast_base_channel_t * chan , uint8_t direction , mailbox Mbox )
{
if ( 0 < = chan - > xmit & & 0 < = chan - > recv ) {
/* already setup */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s setup:channel %d already setup " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
chan - > channel ) ) ;
return ORTE_SUCCESS ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
SP_join ( Mbox , chan - > name ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( 0 > chan - > xmit & & ORTE_RMCAST_XMIT & direction ) {
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
chan - > xmit = Mbox ;
chan - > send_data = ( uint8_t * ) malloc ( mca_rmcast_spread_component . max_msg_size ) ;
/* setup the event to xmit messages, but don't activate it */
opal_event_set ( & chan - > send_ev , chan - > xmit , OPAL_EV_WRITE , xmit_data , chan ) ;
}
if ( 0 > chan - > recv & & ORTE_RMCAST_RECV & direction ) {
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
chan - > recv = Mbox ;
/* setup an event to catch messages */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s setup:channel activating recv event on fd %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , ( int ) chan - > recv ) ) ;
opal_event_set ( & chan - > recv_ev , chan - > recv , OPAL_EV_READ | OPAL_EV_PERSIST , recv_handler , chan ) ;
opal_event_add ( & chan - > recv_ev , 0 ) ;
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
return ORTE_SUCCESS ;
}
static void xmit_data ( int sd , short flags , void * send_req )
{
rmcast_base_channel_t * chan = ( rmcast_base_channel_t * ) send_req ;
rmcast_base_send_t * snd ;
opal_list_item_t * item ;
char * bytes ;
int32_t sz , outbound ;
int rc ;
int8_t flag ;
opal_buffer_t buf ;
int32_t tmp32 ;
rmcast_send_log_t * log , * lg ;
OPAL_THREAD_LOCK ( & chan - > send_lock ) ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s transmitting data for channel %d(%s) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , chan - > channel ,
2010-05-04 02:38:37 +00:00
chan - > name ) ) ;
2010-05-02 17:29:41 +00:00
while ( NULL ! = ( item = opal_list_remove_first ( & chan - > pending_sends ) ) ) {
snd = ( rmcast_base_send_t * ) item ;
/* setup a tmp buffer for a working area */
OBJ_CONSTRUCT ( & buf , opal_buffer_t ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* start the send data area with our header */
ORTE_MULTICAST_MESSAGE_HDR_HTON ( chan - > send_data , snd - > tag , chan - > seq_num ) ;
/* are we sending a buffer? */
if ( NULL = = snd - > buf ) {
/* flag the buffer as containing iovecs */
flag = 0 ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . pack ( & buf , & flag , 1 , OPAL_INT8 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s packing %d iovecs " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
snd - > iovec_count ) ) ;
/* pack the number of iovecs */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . pack ( & buf , & snd - > iovec_count , 1 , OPAL_INT32 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
/* pack each iovec into a buffer in prep for sending
* so we can recreate the array at the other end
*/
for ( sz = 0 ; sz < snd - > iovec_count ; sz + + ) {
/* pack the size */
tmp32 = snd - > iovec_array [ sz ] . iov_len ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s packing %d bytes for iovec %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
tmp32 , sz ) ) ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . pack ( & buf , & tmp32 , 1 , OPAL_INT32 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
if ( 0 < tmp32 ) {
/* pack the bytes */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . pack ( & buf , snd - > iovec_array [ sz ] . iov_base , tmp32 , OPAL_UINT8 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
}
}
} else {
/* flag it as being a buffer */
flag = 1 ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . pack ( & buf , & flag , 1 , OPAL_INT8 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s copying payload " , ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ) ) ;
/* copy the payload */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . copy_payload ( & buf , snd - > buf ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
}
/* store the working buf in the send ring buffer in case we
* need to retransmit it later
*/
log = OBJ_NEW ( rmcast_send_log_t ) ;
log - > channel = chan - > channel ;
log - > seq_num = chan - > seq_num ;
opal_dss . copy_payload ( log - > buf , & buf ) ;
if ( NULL ! = ( lg = ( rmcast_send_log_t * ) opal_ring_buffer_push ( & chan - > cache , log ) ) ) {
/* release the old message */
OPAL_OUTPUT_VERBOSE ( ( 5 , orte_rmcast_base . rmcast_output ,
" %s releasing message %d channel %d from log " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
lg - > seq_num , lg - > channel ) ) ;
OBJ_RELEASE ( lg ) ;
}
/* unload the working buf to obtain the payload */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . unload ( & buf , ( void * * ) & bytes , & sz ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto CLEANUP ;
}
/* done with the working buf */
OBJ_DESTRUCT ( & buf ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* add the payload, up to the limit */
ORTE_MULTICAST_LOAD_MESSAGE ( chan - > send_data , bytes , sz ,
mca_rmcast_spread_component . max_msg_size ,
& outbound ) ;
if ( outbound < 0 ) {
/* message was too large */
opal_output ( 0 , " %s message to multicast network %03d.%03d.%03d.%03d failed - size %d was too large (limit: %d) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , OPAL_IF_FORMAT_ADDR ( chan - > network ) ,
- 1 * outbound , mca_rmcast_spread_component . max_msg_size ) ;
if ( 1 = = flag ) {
/* reload into original buffer */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . load ( snd - > buf , ( void * ) bytes , sz ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
}
}
/* cleanup */
goto CLEANUP ;
}
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread multicasting %d bytes to group %s tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , outbound ,
chan - > name , ( int ) snd - > tag ) ) ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
if ( outbound ! = ( rc = SP_multicast ( chan - > xmit , RELIABLE_MESS , chan - > name , 0 , outbound , ( const char * ) chan - > send_data ) ) ) {
/* didn't get the message out */
opal_output ( 0 , " %s failed to send message to spread group %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , chan - > name ) ;
/* cleanup */
goto CLEANUP ;
}
if ( 1 = = flag ) {
/* call the cbfunc if required */
if ( NULL ! = snd - > cbfunc_buffer ) {
snd - > cbfunc_buffer ( rc , chan - > channel , snd - > tag ,
ORTE_PROC_MY_NAME , snd - > buf , snd - > cbdata ) ;
}
} else {
/* call the cbfunc if required */
if ( NULL ! = snd - > cbfunc_iovec ) {
snd - > cbfunc_iovec ( rc , chan - > channel , snd - > tag , ORTE_PROC_MY_NAME ,
snd - > iovec_array , snd - > iovec_count , snd - > cbdata ) ;
}
}
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* roll to next message sequence number */
ORTE_MULTICAST_NEXT_SEQUENCE_NUM ( chan - > seq_num ) ;
2010-05-04 02:38:37 +00:00
CLEANUP :
2010-05-02 17:29:41 +00:00
/* cleanup */
OBJ_RELEASE ( item ) ;
}
/* cleanup */
opal_event_del ( & chan - > send_ev ) ;
chan - > sends_in_progress = false ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
OPAL_THREAD_UNLOCK ( & chan - > send_lock ) ;
}
/**** LOCAL FUNCTIONS ****/
static void process_recv ( int fd , short event , void * cbdata )
{
orte_mcast_msg_event_t * msg = ( orte_mcast_msg_event_t * ) cbdata ;
rmcast_base_channel_t * chan = msg - > channel ;
opal_list_item_t * item ;
rmcast_base_recv_t * ptr ;
orte_process_name_t name ;
orte_rmcast_tag_t tag ;
opal_buffer_t buf ;
int8_t flag ;
struct iovec * iovec_array = NULL ;
int32_t iovec_count = 0 , i , sz , n ;
opal_buffer_t * recvd_buf = NULL ;
int rc ;
orte_rmcast_seq_t recvd_seq_num ;
rmcast_recv_log_t * log , * lg ;
2010-05-04 02:38:37 +00:00
2010-05-02 17:29:41 +00:00
/* extract the header */
ORTE_MULTICAST_MESSAGE_HDR_NTOH ( msg - > data , & name , tag , recvd_seq_num ) ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv sender: %s tag: %d seq_num: %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
ORTE_NAME_PRINT ( & name ) , ( int ) tag , recvd_seq_num ) ) ;
/* if this message is from myself, ignore it */
if ( name . jobid = = ORTE_PROC_MY_NAME - > jobid & & name . vpid = = ORTE_PROC_MY_NAME - > vpid ) {
OPAL_OUTPUT_VERBOSE ( ( 10 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv sent from myself: %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
ORTE_NAME_PRINT ( & name ) ) ) ;
goto cleanup ;
}
/* if this message is from a different job family, ignore it unless
* it is on the system channel . We ignore these messages to avoid
* confusion between different jobs since we all may be sharing
* multicast channels . The system channel is left open to support
* cross - job communications via the HNP .
*/
if ( ORTE_JOB_FAMILY ( name . jobid ) ! = ORTE_JOB_FAMILY ( ORTE_PROC_MY_NAME - > jobid ) ) {
/* if the channel is other than the system channel, ignore it */
if ( ORTE_RMCAST_SYS_CHANNEL ! = chan - > channel ) {
OPAL_OUTPUT_VERBOSE ( ( 10 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv from a different job family: %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
ORTE_NAME_PRINT ( & name ) ) ) ;
goto cleanup ;
}
/* if I am other than the HNP or a tool, ignore it */
if ( ! ORTE_PROC_IS_HNP & & ! ORTE_PROC_IS_TOOL ) {
OPAL_OUTPUT_VERBOSE ( ( 10 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv from a different job family: %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
ORTE_NAME_PRINT ( & name ) ) ) ;
goto cleanup ;
}
}
/* construct the buffer for unpacking */
OBJ_CONSTRUCT ( & buf , opal_buffer_t ) ;
/* unload the message */
ORTE_MULTICAST_UNLOAD_MESSAGE ( & buf , msg - > data , msg - > sz ) ;
/* unpack the iovec vs buf flag */
n = 1 ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . unpack ( & buf , & flag , & n , OPAL_INT8 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
/* find the recv for this channel, tag, and type */
for ( item = opal_list_get_first ( & recvs ) ;
item ! = opal_list_get_end ( & recvs ) ;
item = opal_list_get_next ( item ) ) {
ptr = ( rmcast_base_recv_t * ) item ;
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv checking channel %d tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
( int ) ptr - > channel , ( int ) ptr - > tag ) ) ;
if ( chan - > channel ! = ptr - > channel ) {
continue ;
}
if ( tag ! = ptr - > tag & & ORTE_RMCAST_TAG_WILDCARD ! = ptr - > tag ) {
continue ;
}
if ( 0 = = flag & & ! ptr - > iovecs_requested ) {
/* it's an iovec and this recv is for buffers */
continue ;
}
if ( 1 = = flag & & ptr - > iovecs_requested ) {
/* it's a buffer and this recv is for iovecs */
continue ;
}
/* we have a recv - unpack the data */
if ( 0 = = flag ) {
/* get the number of iovecs in the buffer */
n = 1 ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . unpack ( & buf , & iovec_count , & n , OPAL_INT32 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
/* malloc the required space */
iovec_array = ( struct iovec * ) malloc ( iovec_count * sizeof ( struct iovec ) ) ;
/* unpack the iovecs */
for ( i = 0 ; i < iovec_count ; i + + ) {
/* unpack the number of bytes in this iovec */
n = 1 ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . unpack ( & buf , & sz , & n , OPAL_INT32 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
iovec_array [ i ] . iov_base = NULL ;
iovec_array [ i ] . iov_len = sz ;
if ( 0 < sz ) {
/* allocate the space */
iovec_array [ i ] . iov_base = ( uint8_t * ) malloc ( sz ) ;
/* unpack the data */
if ( ORTE_SUCCESS ! = ( rc = opal_dss . unpack ( & buf , iovec_array [ i ] . iov_base , & sz , OPAL_UINT8 ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
}
}
} else {
/* buffer was included */
recvd_buf = OBJ_NEW ( opal_buffer_t ) ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . copy_payload ( recvd_buf , & buf ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
}
/* if the sender's vpid is invalid, then this is a request for
* assignment of a name - so don ' t log the message
*/
if ( ORTE_VPID_INVALID = = name . vpid ) {
goto MATCH ;
}
/* look up the message log for this sender */
log = NULL ;
for ( n = 0 ; n < msg_log . size ; n + + ) {
if ( NULL = = ( lg = ( rmcast_recv_log_t * ) opal_pointer_array_get_item ( & msg_log , n ) ) ) {
continue ;
}
if ( ( name . jobid = = lg - > name . jobid & & name . vpid = = lg - > name . vpid ) & &
chan - > channel = = lg - > channel ) {
log = lg ;
break ;
}
}
if ( NULL = = log ) {
/* new sender - create a log */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv creating new msg log for %s channel %d seq# %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
ORTE_NAME_PRINT ( & name ) , ( int ) msg - > channel - > channel , recvd_seq_num ) ) ;
log = OBJ_NEW ( rmcast_recv_log_t ) ;
log - > name . jobid = name . jobid ;
log - > name . vpid = name . vpid ;
log - > channel = chan - > channel ;
log - > seq_num = recvd_seq_num ;
opal_pointer_array_add ( & msg_log , log ) ;
goto MATCH ;
}
if ( recvd_seq_num < log - > seq_num ) {
/* this must be a repeat of an earlier message - ignore it */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv recvd repeat msg %d (log at %d) from %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
recvd_seq_num , log - > seq_num , ORTE_NAME_PRINT ( & name ) ) ) ;
goto cleanup ;
}
if ( log - > seq_num ! = ( recvd_seq_num - 1 ) ) {
/* this message out of sequence - tell
* the sender the last number we got
*/
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv msg %d is out of sequence (log at %d) from %s " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
recvd_seq_num , log - > seq_num , ORTE_NAME_PRINT ( & name ) ) ) ;
/* ignore this message */
goto cleanup ;
}
/* update the seq number */
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv update msg log to %d from %s:%d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
recvd_seq_num , ORTE_NAME_PRINT ( & name ) , log - > channel ) ) ;
log - > seq_num = recvd_seq_num ;
MATCH :
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread:recv delivering message to channel %d tag %d " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , ptr - > channel , ( int ) tag ) ) ;
if ( 0 = = flag ) {
/* dealing with iovecs */
if ( NULL ! = ptr - > cbfunc_iovec ) {
ptr - > cbfunc_iovec ( ORTE_SUCCESS , ptr - > channel , tag ,
& name , iovec_array , iovec_count , ptr - > cbdata ) ;
/* if it isn't persistent, remove it */
if ( ! ( ORTE_RMCAST_PERSISTENT & ptr - > flags ) ) {
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_remove_item ( & recvs , & ptr - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OBJ_RELEASE ( ptr ) ;
}
} else {
/* copy over the iovec array since it will be released by
* the blocking recv
*/
ptr - > iovec_array = ( struct iovec * ) malloc ( iovec_count * sizeof ( struct iovec ) ) ;
ptr - > iovec_count = iovec_count ;
for ( i = 0 ; i < iovec_count ; i + + ) {
ptr - > iovec_array [ i ] . iov_base = ( uint8_t * ) malloc ( iovec_array [ i ] . iov_len ) ;
ptr - > iovec_array [ i ] . iov_len = iovec_array [ i ] . iov_len ;
memcpy ( ptr - > iovec_array [ i ] . iov_base , iovec_array [ i ] . iov_base , iovec_array [ i ] . iov_len ) ;
}
/* copy the sender's name */
ptr - > name . jobid = name . jobid ;
ptr - > name . vpid = name . vpid ;
/* flag it as recvd to release blocking recv */
ptr - > recvd = true ;
}
} else {
if ( NULL ! = ptr - > cbfunc_buffer ) {
ptr - > cbfunc_buffer ( ORTE_SUCCESS , ptr - > channel , tag ,
& name , recvd_buf , ptr - > cbdata ) ;
/* if it isn't persistent, remove it */
if ( ! ( ORTE_RMCAST_PERSISTENT & ptr - > flags ) ) {
OPAL_THREAD_LOCK ( & lock ) ;
opal_list_remove_item ( & recvs , & ptr - > item ) ;
OPAL_THREAD_UNLOCK ( & lock ) ;
OBJ_RELEASE ( ptr ) ;
}
} else {
/* copy the buffer across since it will be released
* by the blocking recv
*/
ptr - > buf = OBJ_NEW ( opal_buffer_t ) ;
if ( ORTE_SUCCESS ! = ( rc = opal_dss . copy_payload ( ptr - > buf , recvd_buf ) ) ) {
ORTE_ERROR_LOG ( rc ) ;
goto cleanup ;
}
/* copy the sender's name */
ptr - > name . jobid = name . jobid ;
ptr - > name . vpid = name . vpid ;
/* flag it as recvd to release blocking recv */
ptr - > recvd = true ;
}
}
/* we are done - only one recv can match */
break ;
}
cleanup :
OBJ_RELEASE ( msg ) ;
if ( NULL ! = iovec_array ) {
for ( i = 0 ; i < iovec_count ; i + + ) {
free ( iovec_array [ i ] . iov_base ) ;
}
free ( iovec_array ) ;
}
if ( NULL ! = recvd_buf ) {
OBJ_RELEASE ( recvd_buf ) ;
}
return ;
}
2010-05-04 23:44:00 +00:00
static inline char * get_group_name ( char groups [ ] [ MAX_GROUP_NAME ] , int indx )
{
return groups [ indx ] ;
}
2010-05-02 17:29:41 +00:00
static void recv_handler ( int sd , short flags , void * cbdata )
{
uint8_t * data ;
2010-05-04 23:44:00 +00:00
int sz ;
2010-05-02 17:29:41 +00:00
rmcast_base_channel_t * chan = ( rmcast_base_channel_t * ) cbdata ;
service srvc ;
2010-05-04 02:38:37 +00:00
char sender [ MAX_GROUP_NAME ] ;
2010-05-04 23:44:00 +00:00
static void * groups ;
static int size_groups ;
int num_groups , size_data ;
2010-05-02 17:29:41 +00:00
int16 mess_type ;
int endian_mismatch ;
2010-05-04 23:44:00 +00:00
if ( ! groups ) {
size_groups = 1 ;
groups = malloc ( size_groups * MAX_GROUP_NAME ) ;
}
2010-05-02 17:29:41 +00:00
/* Read all available spread messages. */
while ( SP_poll ( sd ) > 0 ) {
2010-05-04 02:38:37 +00:00
size_data = mca_rmcast_spread_component . max_msg_size ;
data = ( uint8_t * ) malloc ( size_data * sizeof ( uint8_t ) ) ;
srvc = 0 ;
do {
2010-05-04 23:44:00 +00:00
sz = SP_receive ( sd , & srvc , sender , size_groups , & num_groups , groups , & mess_type , & endian_mismatch , size_data , ( char * ) data ) ;
2010-05-04 02:38:37 +00:00
if ( sz < 0 ) {
char error_string [ 1024 ] ;
SP_error2str ( sz , error_string ) ;
/* this shouldn't happen - report the errno */
opal_output ( 0 , " %s Error on multicast recv spread event: %s(%d:%d:%d) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , error_string , sz , num_groups , endian_mismatch ) ;
2010-05-04 23:44:00 +00:00
2010-05-04 02:38:37 +00:00
switch ( sz ) {
2010-05-04 23:44:00 +00:00
2010-05-04 02:38:37 +00:00
case GROUPS_TOO_SHORT :
/*
2010-05-04 23:44:00 +00:00
* Number of groups required is " -num_groups " so we
* free the old groups array and malloc a new one of
* the right size ( - num_groups ) * MAX_GROUP_NAME .
2010-05-04 02:38:37 +00:00
*/
2010-05-04 23:44:00 +00:00
size_groups = - num_groups ;
free ( groups ) ;
groups = malloc ( size_groups * MAX_GROUP_NAME ) ;
2010-05-04 02:38:37 +00:00
break ;
case BUFFER_TOO_SHORT :
/*
* Size of buffer required is " -endian_mismatch " so we
* free the old data array and malloc a new one of the
* right size ( - endian_mismatch ) * sizeof ( uint8_t ) .
*/
size_data = - endian_mismatch ;
free ( data ) ;
data = ( uint8_t * ) malloc ( size_data * sizeof ( uint8_t ) ) ;
if ( ! data ) {
opal_output ( 0 , " %s Error in allocating data buffer for incoming message (%d) \n " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) , size_data ) ;
exit ( - 1 ) ;
}
break ;
case ILLEGAL_SESSION :
case ILLEGAL_MESSAGE :
case CONNECTION_CLOSED :
exit ( - 1 ) ;
}
}
} while ( sz < 0 ) ;
2010-05-04 23:44:00 +00:00
2010-05-04 02:38:37 +00:00
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread recvd %d bytes from channel %d(%s) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
2010-05-04 23:44:00 +00:00
( int ) sz , num_groups , get_group_name ( groups , 0 ) ) ) ;
2010-05-04 02:38:37 +00:00
if ( Is_regular_mess ( srvc ) ) {
int i ;
for ( i = 0 ; i < num_groups ; i + + ) {
2010-05-04 23:44:00 +00:00
chan = get_chan_from_name ( get_group_name ( groups , i ) ) ;
2010-05-04 02:38:37 +00:00
if ( chan ) {
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread recvd %d bytes from channel %d(%s) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
( int ) sz , ( int ) chan - > channel , chan - > name ) ) ;
/* clear the way for the next message */
ORTE_MULTICAST_MESSAGE_EVENT ( data , sz , chan , process_recv ) ;
} else {
/*
* We ' ve just received a message on a channel whose name
* we don ' t recognize , so log a message and drop the
* message .
*/
OPAL_OUTPUT_VERBOSE ( ( 2 , orte_rmcast_base . rmcast_output ,
" %s rmcast:spread recvd %d bytes from unknown channel named (%s) " ,
ORTE_NAME_PRINT ( ORTE_PROC_MY_NAME ) ,
2010-05-04 23:44:00 +00:00
( int ) sz , get_group_name ( groups , i ) ) ) ;
2010-05-04 02:38:37 +00:00
free ( data ) ;
}
}
} else {
2010-05-04 23:44:00 +00:00
/*
2010-05-04 02:38:37 +00:00
* We ignore all membership change messages for now .
*/
free ( data ) ;
}
2010-05-02 17:29:41 +00:00
}
return ;
}