diff --git a/orte/mca/notifier/hnp/Makefile.am b/orte/mca/notifier/hnp/Makefile.am new file mode 100644 index 0000000000..3fc9656203 --- /dev/null +++ b/orte/mca/notifier/hnp/Makefile.am @@ -0,0 +1,45 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + notifier_hnp.h \ + notifier_hnp_module.c \ + notifier_hnp_recv.c \ + notifier_hnp_component.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_notifier_hnp_DSO +component_noinst = +component_install = mca_notifier_hnp.la +else +component_noinst = libmca_notifier_hnp.la +component_install = +endif + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_notifier_hnp_la_SOURCES = $(sources) +mca_notifier_hnp_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_notifier_hnp_la_SOURCES =$(sources) +libmca_notifier_hnp_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/notifier/hnp/configure.m4 b/orte/mca/notifier/hnp/configure.m4 new file mode 100644 index 0000000000..153d8a2524 --- /dev/null +++ b/orte/mca/notifier/hnp/configure.m4 @@ -0,0 +1,14 @@ +# -*- shell-script -*- +# +# Copyright (c) 2007 Sandia National Laboratories. All rights reserved. +# Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_notifier_hnp_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_notifier_hnp_CONFIG], [$1]) diff --git a/orte/mca/notifier/hnp/configure.params b/orte/mca/notifier/hnp/configure.params new file mode 100644 index 0000000000..3513f8d956 --- /dev/null +++ b/orte/mca/notifier/hnp/configure.params @@ -0,0 +1,24 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2007 Los Alamos National Security, LLC. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Specific to this module + +PARAM_CONFIG_FILES="Makefile" diff --git a/orte/mca/notifier/hnp/notifier_hnp.h b/orte/mca/notifier/hnp/notifier_hnp.h new file mode 100644 index 0000000000..06355ce835 --- /dev/null +++ b/orte/mca/notifier/hnp/notifier_hnp.h @@ -0,0 +1,53 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ +#ifndef NOTIFIER_HNP_H +#define NOTIFIER_HNP_H + +#include "orte_config.h" + +#include "orte/types.h" +#include "orte/mca/notifier/notifier.h" +#include "orte/mca/rml/rml.h" +#include "opal/class/opal_pointer_array.h" + +BEGIN_C_DECLS + +void orte_notifier_hnp_recv_cb(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata); +#if OPAL_ENABLE_DEBUG +void orte_notifier_hnp_exception_cb(const orte_process_name_t* peer, + orte_rml_exception_t reason); +#endif + +extern opal_pointer_array_t orte_notifier_hnp_tables; +extern opal_mutex_t orte_notifier_hnp_tables_lock; + +/* + * Notifier interfaces + */ + +ORTE_MODULE_DECLSPEC extern orte_notifier_base_component_t mca_notifier_hnp_component; +extern orte_notifier_base_module_t orte_notifier_hnp_module; + +END_C_DECLS + +#endif diff --git a/orte/mca/notifier/hnp/notifier_hnp_component.c b/orte/mca/notifier/hnp/notifier_hnp_component.c new file mode 100644 index 0000000000..cbc1498355 --- /dev/null +++ b/orte/mca/notifier/hnp/notifier_hnp_component.c @@ -0,0 +1,71 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/** + * @file + * + * This component proxies notification messages up to the HNP. This + * component runs in both the HNP and non-HNP processes for ease of + * selection (e.g., so you can "--mca notifier hnp" (vs. "--mca + * notifier hnp,non_hnp"). It auto-detects where it is running and + * does the Right Thing -- if it's in the HNP process, it sets up to + * receive incoming proxied messages. If it's not in the HNP, then it + * proxies all messages to the HNP. + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "notifier_hnp.h" + + +static int orte_notifier_hnp_component_query(mca_base_module_t **module, + int *priority); + + +/* + * Struct of function pointers that need to be initialized + */ +orte_notifier_base_component_t mca_notifier_hnp_component = { + { + ORTE_NOTIFIER_BASE_VERSION_1_0_0, + + "hnp", /* MCA module name */ + ORTE_MAJOR_VERSION, /* MCA module major version */ + ORTE_MINOR_VERSION, /* MCA module minor version */ + ORTE_RELEASE_VERSION, /* MCA module release version */ + + NULL, + NULL, + orte_notifier_hnp_component_query /* module query */ + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + } +}; + +static int orte_notifier_hnp_component_query(mca_base_module_t **module, + int *priority) +{ + *priority = 10; + *module = (mca_base_module_t *)&orte_notifier_hnp_module; + return ORTE_SUCCESS; +} diff --git a/orte/mca/notifier/hnp/notifier_hnp_module.c b/orte/mca/notifier/hnp/notifier_hnp_module.c new file mode 100644 index 0000000000..d2e13a2b04 --- /dev/null +++ b/orte/mca/notifier/hnp/notifier_hnp_module.c @@ -0,0 +1,321 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ +#ifdef HAVE_HNP_H +#include +#endif +#ifdef HAVE_STDARG_H +#include +#endif + +#include "opal/util/show_help.h" +#include "opal/util/opal_sos.h" +#include "opal/dss/dss.h" +#include "opal/dss/dss_types.h" + +#include "orte/runtime/orte_globals.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/notifier/base/base.h" +#include "notifier_hnp.h" + +/* Global variables */ +opal_pointer_array_t orte_notifier_hnp_tables; +opal_mutex_t orte_notifier_hnp_tables_lock; + +/* Static API's */ +static int init(void); +static void finalize(void); +static void mylog(orte_notifier_base_severity_t severity, int errcode, + const char *msg, va_list ap); +static void myhelplog(orte_notifier_base_severity_t severity, int errcode, + const char *filename, const char *topic, va_list ap); +static void mypeerlog(orte_notifier_base_severity_t severity, int errcode, + orte_process_name_t *peer_proc, const char *msg, + va_list ap); +static void myeventlog(const char *msg); + +/* Module definition */ +orte_notifier_base_module_t orte_notifier_hnp_module = { + init, + finalize, + mylog, + myhelplog, + mypeerlog, + myeventlog +}; + +static int send_command(orte_notifier_base_severity_t severity, int errcode, + char *msg) +{ + /* JMS: As an example, I'll pack up the severity, errcode, and + string message. I assume you'll want to pack up the whole + corresponding OPAL SOS table. */ + opal_buffer_t *buf; + int rc; + uint8_t u8 = (uint8_t) severity; + uint32_t u32 = (uint32_t) errcode; + + buf = OBJ_NEW(opal_buffer_t); + if (NULL == buf) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* Pack the severity (need to use a fixed-size type) */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &u8, 1, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return rc; + } + + /* Pack the errcode (need to use a fixed-size type) */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &u32, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return rc; + } + + /* Pack the message */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &msg, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return rc; + } + + /* Now send the buffer (rc = number of bytes sent) */ + rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, + ORTE_RML_TAG_NOTIFIER_HNP, 0); + if (rc <= 0) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return rc; + } + + return ORTE_SUCCESS; +} + +static int opal_dss_pack_sos_error(opal_buffer_t *buf, opal_sos_error_t *error) +{ + int rc; + if (NULL == error) { + return ORTE_ERROR; + } + + /* Pack errnum */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &error->errnum, 1, OPAL_INT))) { + return rc; + } + + /* Pack the file name in which the error occurred */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, error->file, 1, OPAL_STRING))) { + return rc; + } + + /* Pack the line number on which the error was encountered */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &error->line, 1, OPAL_INT))) { + return rc; + } + + /* Pack the function name (if any) */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, error->func, 1, OPAL_STRING))) { + return rc; + } + + /* Pack the error message */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, error->msg, 1, OPAL_STRING))) { + return rc; + } + + /* Pack the pointer to the previous opal sos error object in the + opal sos table */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &error->prev, 1, OPAL_INT))) { + return rc; + } + + /* Pack the pointer to the next error */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &error->next, 1, OPAL_INT))) { + return rc; + } + + return ORTE_SUCCESS; +} + +static int opal_sos_send_table(void) +{ + opal_sos_error_t *opal_error; + opal_buffer_t *buf; + uint32_t key; + int rc; + size_t table_size; + void *prev_error, *next_error; + next_error = NULL; + + buf = OBJ_NEW(opal_buffer_t); + if (NULL == buf) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + + OPAL_THREAD_LOCK(&opal_sos_table_lock); + table_size = opal_hash_table_get_size(&opal_sos_table); + + /* Pack the size of the SOS error table */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &table_size, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto error; + } + + if (OPAL_SUCCESS != opal_hash_table_get_first_key_uint32(&opal_sos_table, + &key, (void**)&opal_error, + &prev_error)) { + rc = ORTE_ERROR; + goto error; + } + + /* Pack the sos error object */ + if (ORTE_SUCCESS != (rc = opal_dss_pack_sos_error(buf, opal_error))) { + ORTE_ERROR_LOG(rc); + goto error; + } + + while (OPAL_SUCCESS == opal_hash_table_get_next_key_uint32(&opal_sos_table, + &key, (void**)&opal_error, + &prev_error, &next_error)) + { + if (ORTE_SUCCESS != (rc = opal_dss_pack_sos_error(buf, opal_error))) { + ORTE_ERROR_LOG(rc); + goto error; + } + } + OPAL_THREAD_UNLOCK(&opal_sos_table_lock); + + /* Now send the buffer (rc = number of bytes sent) */ + rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, + ORTE_RML_TAG_NOTIFIER_HNP, 0); + if (rc <= 0) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + return rc; + } + + return ORTE_SUCCESS; + +error: + OPAL_THREAD_UNLOCK(&opal_sos_table_lock); + OBJ_RELEASE(buf); + return rc; +} + +static int init(void) +{ + int rc; + + /* If I'm the HNP, post a non-blocking RML receive */ + if (ORTE_PROC_IS_HNP) { + if (ORTE_SUCCESS != + (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_NOTIFIER_HNP, + ORTE_RML_NON_PERSISTENT, + orte_notifier_hnp_recv_cb, + NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + OBJ_CONSTRUCT(&orte_notifier_hnp_tables, opal_pointer_array_t); + opal_pointer_array_init(&orte_notifier_hnp_tables, + orte_process_info.num_procs, + INT32_MAX, 8); + OBJ_CONSTRUCT(&orte_notifier_hnp_tables_lock, opal_mutex_t); + +#if OPAL_ENABLE_DEBUG + /* If we're debugging, also add an exception handler -- just to + watch for problems in the RML */ + if (ORTE_SUCCESS != + (rc = orte_rml.add_exception_handler(orte_notifier_hnp_exception_cb))) { + ORTE_ERROR_LOG(rc); + orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFIER_HNP); + return rc; + } +#endif + } + + return ORTE_SUCCESS; +} + +static void finalize(void) +{ + /* If I'm the HNP, then cancel the non-blocking RML receive */ + if (ORTE_PROC_IS_HNP) { + OBJ_DESTRUCT(&orte_notifier_hnp_tables); + OBJ_DESTRUCT(&orte_notifier_hnp_tables_lock); + + orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFIER_HNP); + } +} + +static void mylog(orte_notifier_base_severity_t severity, int errcode, + const char *msg, va_list ap) +{ + char *output; + + /* If there was a message, output it */ + vasprintf(&output, msg, ap); + + if (NULL != output && !ORTE_PROC_IS_HNP) { + send_command(severity, errcode, output); + free(output); + } +} + +static void myhelplog(orte_notifier_base_severity_t severity, int errcode, + const char *filename, const char *topic, va_list ap) +{ + char *output; + + output = opal_show_help_vstring(filename, topic, false, ap); + + if (NULL != output && !ORTE_PROC_IS_HNP) { + send_command(severity, errcode, output); + free(output); + } +} + +static void mypeerlog(orte_notifier_base_severity_t severity, int errcode, + orte_process_name_t *peer_proc, const char *msg, + va_list ap) +{ + char *buf = orte_notifier_base_peer_log(errcode, peer_proc, msg, ap); + + if (NULL != buf && !ORTE_PROC_IS_HNP) { + send_command(severity, errcode, buf); + free(buf); + } +} + +static void myeventlog(const char *msg) +{ + send_command(ORTE_NOTIFIER_NOTICE, ORTE_SUCCESS, (char *)msg); +} diff --git a/orte/mca/notifier/hnp/notifier_hnp_recv.c b/orte/mca/notifier/hnp/notifier_hnp_recv.c new file mode 100644 index 0000000000..95ad79406e --- /dev/null +++ b/orte/mca/notifier/hnp/notifier_hnp_recv.c @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" + +#include "orte/constants.h" +#include "orte/runtime/orte_globals.h" +#include "orte/mca/notifier/base/base.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/util/show_help.h" +#include "opal/util/opal_sos.h" +#include "opal/class/opal_hash_table.h" + +#include "notifier_hnp.h" + +/* + * This function is called back *after* the RML receive callback to avoid + */ +static void process_msg(int fd, short event, void *cbdata) +{ + orte_message_event_t *mev = (orte_message_event_t*)cbdata; + uint8_t u8; + uint32_t u32; + int rc, count; + orte_notifier_base_severity_t severity; + int errcode; + char *msg; + + /* JMS: As an example, we packed the severity, errcode, and string + in notifier_hnp_module.c. We'll unpack it here. */ + + /* Unpack the severity */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(mev->buffer, &u8, &count, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto CLEAN_RETURN; + } + severity = (orte_notifier_base_severity_t) u8; + + /* Unpack the errcode */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(mev->buffer, &u32, &count, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto CLEAN_RETURN; + } + errcode = (int) u32; + + /* Unpack the string */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(mev->buffer, &msg, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto CLEAN_RETURN; + } + + orte_show_help("opal_sos_reporter.txt", "notifier message", false, msg); + +CLEAN_RETURN: + /* release the message event */ + OBJ_RELEASE(mev); + return; +} + +static int opal_dss_unpack_sos_error(opal_buffer_t *buf, opal_sos_error_t *error) +{ + int count, rc; + if (NULL == error) { + return ORTE_ERROR; + } + + /* Unpack the errcode */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, &error->errnum, &count, OPAL_INT))) { + return rc; + } + + /* Unpack the filename */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, error->file, &count, OPAL_STRING))) { + return rc; + } + + /* Unpack the line number */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, &error->line, &count, OPAL_INT))) { + return rc; + } + + /* Unpack the function name */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, error->func, &count, OPAL_STRING))) { + return rc; + } + + /* Unpack the error message */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, error->msg, &count, OPAL_STRING))) { + return rc; + } + + /* Unpack the pointer to the previous error */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, &error->prev, &count, OPAL_INT))) { + return rc; + } + + /* Unpack the pointer to the next error */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(buf, &error->next, &count, OPAL_INT))) { + return rc; + } + + return ORTE_SUCCESS; +} + +static void process_sos_table_msg(int fd, short event, void *cbdata) +{ + orte_message_event_t *mev = (orte_message_event_t*)cbdata; + size_t table_size; + int i, rc = ORTE_SUCCESS, count, numerrors; + opal_sos_error_t *opal_error; + opal_hash_table_t *sos_table, *old_sos_table; + + /* Allocate a new SOS table */ + sos_table = OBJ_NEW(opal_hash_table_t); + if (NULL == sos_table) { + ORTE_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + OBJ_RELEASE(mev); + return; + } + + /* Unpack the size of the SOS table */ + count = 1; + if (ORTE_SUCCESS != + (rc = opal_dss.unpack(mev->buffer, &table_size, &count, OPAL_SIZE))) { + goto error; + } + numerrors = (int) table_size; + + /* Initialize the SOS table */ + opal_hash_table_init(sos_table, table_size); + + for (i = 0; i < numerrors; i++) { + + opal_error = OBJ_NEW(opal_sos_error_t); + if (NULL == opal_error) { + rc = OPAL_ERR_OUT_OF_RESOURCE; + goto error; + } + + if (ORTE_SUCCESS != + (rc = opal_dss_unpack_sos_error(mev->buffer, opal_error))) { + goto error; + } + + opal_hash_table_set_value_uint32(sos_table, + opal_error->errnum, + (void *)opal_error); + } + + /* Add this SOS table to the list of SOS tables. + If it already exists, we destroy the old table + and set the new one as the current SOS table. */ + OPAL_THREAD_LOCK(&orte_notifier_hnp_tables_lock); + if (false == + opal_pointer_array_test_and_set_item(&orte_notifier_hnp_tables, + mev->sender.vpid, + (void *)sos_table)) { + old_sos_table = opal_pointer_array_get_item(&orte_notifier_hnp_tables, + mev->sender.vpid); + OBJ_DESTRUCT(old_sos_table); + old_sos_table = NULL; + opal_pointer_array_set_item(&orte_notifier_hnp_tables, + mev->sender.vpid, + (void *)sos_table); + } + OPAL_THREAD_UNLOCK(&orte_notifier_hnp_tables_lock); + OBJ_RELEASE(mev); + return; + +error: + ORTE_ERROR_LOG(rc); + /* release the message event */ + OBJ_RELEASE(mev); + + /* destroy the sos table */ + OBJ_DESTRUCT(sos_table); + return; +} + +void orte_notifier_hnp_recv_cb(int status, orte_process_name_t* sender, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((5, orte_notifier_base_output, + "%s notifier:hnp:receive got message from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* Don't process the message right away - remember that we're in a + * callback during the actual RML receive! We need to get out of + * the receive before we process the message to avoid performing + * the rest of the job while still inside this receive. Instead, + * setup an event so that the message gets processed as soon as we + * leave the receive. This avoids the "receive recursion of + * death" scenarios. + * + * The ORTE_MESSAGE_EVENT macro makes a copy of the buffer, which + * we release in the process_msg() callback - the incoming buffer, + * however, is NOT released here, although its payload IS + * transferred to the message buffer for later processing. + */ + ORTE_MESSAGE_EVENT(sender, buffer, tag, process_msg); + + /* reissue the receive, since it is non-persistent */ + if (ORTE_SUCCESS != + (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, + ORTE_RML_TAG_NOTIFIER_HNP, + ORTE_RML_NON_PERSISTENT, + orte_notifier_hnp_recv_cb, + NULL))) { + ORTE_ERROR_LOG(rc); + } +} + + +#if OPAL_ENABLE_DEBUG +void orte_notifier_hnp_exception_cb(const orte_process_name_t* peer, + orte_rml_exception_t reason) +{ + opal_output(orte_notifier_base_output, + "Notifier HNP RML receive exception from %s", + ORTE_NAME_PRINT((orte_process_name_t*)peer)); +} +#endif