9613b3176c
After much work by Jeff and myself, and quite a lot of discussion, it has become clear that we simply cannot resolve the infinite loops caused by RML-involved subsystems calling orte_output. The original rationale for the change to orte_output has also been reduced by shifting the output of XML-formatted vs human readable messages to an alternative approach. I have globally replaced the orte_output/ORTE_OUTPUT calls in the code base, as well as the corresponding .h file name. I have test compiled and run this on the various environments within my reach, so hopefully this will prove minimally disruptive. This commit was SVN r18619.
374 строки
9.6 KiB
C
374 строки
9.6 KiB
C
/*
|
|
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2007 Cisco, Inc. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
#include "orte/constants.h"
|
|
|
|
#include <errno.h>
|
|
#ifdef HAVE_UNISTD_H
|
|
#include <unistd.h>
|
|
#endif /* HAVE_UNISTD_H */
|
|
#ifdef HAVE_STRING_H
|
|
#include <string.h>
|
|
#endif /* HAVE_STRING_H */
|
|
|
|
#include "orte/util/show_help.h"
|
|
|
|
#include "orte/mca/iof/iof.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/rml/rml_types.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/iof/iof.h"
|
|
#include "orte/mca/iof/base/base.h"
|
|
#include "orte/mca/iof/base/iof_base_endpoint.h"
|
|
|
|
#include "iof_proxy.h"
|
|
#include "iof_proxy_svc.h"
|
|
|
|
|
|
orte_iof_base_module_t orte_iof_proxy_module = {
|
|
orte_iof_proxy_publish,
|
|
orte_iof_proxy_unpublish,
|
|
orte_iof_proxy_subscribe,
|
|
orte_iof_proxy_unsubscribe,
|
|
orte_iof_proxy_push,
|
|
orte_iof_proxy_pull,
|
|
orte_iof_base_flush,
|
|
orte_iof_proxy_finalize,
|
|
orte_iof_proxy_ft_event
|
|
};
|
|
|
|
/*
|
|
* Finalize module; nothing to do
|
|
*/
|
|
|
|
int orte_iof_proxy_finalize(void )
|
|
{
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* Create an endpoint for a local file descriptor and "publish" it
|
|
* under the name of the origin process. If the publish mode is a
|
|
* SINK, then create a publication entry for it so that incoming
|
|
* messages can be forwarded to it.
|
|
*
|
|
* SOURCEs do not need to create publication records because a) the
|
|
* endpoint will automatically wake up the event engine and read off
|
|
* the fd whenever there is data available, and b) this data is then
|
|
* automatically sent to the iof svc component for possible
|
|
* forwarding.
|
|
*/
|
|
|
|
int orte_iof_proxy_publish(
|
|
const orte_process_name_t* origin,
|
|
orte_iof_base_mode_t mode,
|
|
orte_iof_base_tag_t tag,
|
|
int fd)
|
|
{
|
|
int rc;
|
|
|
|
if (orte_iof_base.iof_output >= 0) {
|
|
char* name_str;
|
|
orte_util_convert_process_name_to_string(&name_str, origin);
|
|
opal_output_verbose(1, orte_iof_base.iof_output,
|
|
"orte_iof_proxy_publish(%s,%d,%d,%d)\n",
|
|
name_str, mode, tag, fd);
|
|
free(name_str);
|
|
}
|
|
|
|
rc = orte_iof_base_endpoint_create(
|
|
origin,
|
|
mode,
|
|
tag,
|
|
fd);
|
|
if (ORTE_SUCCESS != rc) {
|
|
return rc;
|
|
}
|
|
|
|
/* publish to server */
|
|
if (ORTE_IOF_SINK == mode) {
|
|
rc = orte_iof_proxy_svc_publish(origin, tag);
|
|
if (rc != ORTE_SUCCESS) {
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* Remove all registrations matching the specified origin process
|
|
* name, mask and tag values.
|
|
*/
|
|
|
|
int orte_iof_proxy_unpublish(
|
|
const orte_process_name_t* origin,
|
|
orte_ns_cmp_bitmask_t mask,
|
|
orte_iof_base_tag_t tag)
|
|
{
|
|
int rc;
|
|
|
|
#if 0
|
|
{
|
|
int i = 0;
|
|
opal_output_verbose(1, orte_iof_base.iof_output, "%s orted: ******** ABOUT TO IOF PROXY UNPUBLISH, %d", ORTE_NAME_PRINT(orte_process_info.my_name), getpid());
|
|
fflush(stderr);
|
|
while (0 == i) sleep(5);
|
|
}
|
|
#endif
|
|
|
|
/* cleanup server */
|
|
orte_iof_proxy_svc_unpublish(
|
|
origin,
|
|
mask,
|
|
tag);
|
|
|
|
/* delete local endpoint. Note that the endpoint may have already
|
|
been deleted (e.g., if some entity noticed that the fd closed
|
|
and called orte_iof_base_endpoint_delete on the corresopnding
|
|
endpoint already). So if we get NOT_FOUND, ignore that error
|
|
-- the end result is what we want: the endpoint is deleted when
|
|
we return. */
|
|
rc = orte_iof_base_endpoint_delete(
|
|
origin,
|
|
mask,
|
|
tag);
|
|
if (ORTE_ERR_NOT_FOUND == rc || ORTE_SUCCESS == rc) {
|
|
return ORTE_SUCCESS;
|
|
} else {
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Explicitly push data from the specified file descriptor
|
|
* to the indicated SINK set of peers.
|
|
*/
|
|
|
|
int orte_iof_proxy_push(
|
|
const orte_process_name_t* sink_name,
|
|
orte_ns_cmp_bitmask_t sink_mask,
|
|
orte_iof_base_tag_t sink_tag,
|
|
int fd)
|
|
{
|
|
int rc;
|
|
|
|
/* setup a local endpoint to reflect registration. Do this before
|
|
we send the subscription to the server in case a callback
|
|
occurs *while* we are sending the subscription request. */
|
|
rc = orte_iof_base_endpoint_create(
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_IOF_SOURCE,
|
|
sink_tag,
|
|
fd);
|
|
if (ORTE_SUCCESS != rc) {
|
|
return rc;
|
|
}
|
|
|
|
/* send a subscription to server on behalf of the destination */
|
|
rc = orte_iof_proxy_svc_subscribe(
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_NS_CMP_ALL,
|
|
sink_tag,
|
|
sink_name,
|
|
sink_mask,
|
|
sink_tag
|
|
);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Explicitly pull data from the specified set of SOURCE peers and
|
|
* dump to the indicated file descriptor.
|
|
*/
|
|
|
|
int orte_iof_proxy_pull(
|
|
const orte_process_name_t* source_name,
|
|
orte_ns_cmp_bitmask_t source_mask,
|
|
orte_iof_base_tag_t source_tag,
|
|
int fd)
|
|
{
|
|
/* setup a local endpoint */
|
|
int rc;
|
|
rc = orte_iof_base_endpoint_create(
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_IOF_SINK,
|
|
source_tag,
|
|
fd);
|
|
if (ORTE_SUCCESS != rc) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
/* publish this endpoint */
|
|
rc = orte_iof_proxy_svc_publish(
|
|
ORTE_PROC_MY_NAME,
|
|
source_tag);
|
|
if (ORTE_SUCCESS != rc) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
/* subscribe to peer */
|
|
rc = orte_iof_proxy_svc_subscribe(
|
|
source_name,
|
|
source_mask,
|
|
source_tag,
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_NS_CMP_ALL,
|
|
source_tag);
|
|
if (ORTE_SUCCESS != rc) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
/*
|
|
* Subscribe to receive a callback on receipt of data
|
|
* from a specified set of origin peers.
|
|
*/
|
|
|
|
int orte_iof_proxy_subscribe(
|
|
const orte_process_name_t* origin_name,
|
|
orte_ns_cmp_bitmask_t origin_mask,
|
|
orte_iof_base_tag_t origin_tag,
|
|
orte_iof_base_callback_fn_t cbfunc,
|
|
void* cbdata)
|
|
{
|
|
int rc;
|
|
|
|
/* create a local registration to reflect the callback */
|
|
rc = orte_iof_base_callback_create(ORTE_PROC_MY_NAME,origin_tag,cbfunc,cbdata);
|
|
if (ORTE_SUCCESS != rc) {
|
|
return rc;
|
|
}
|
|
|
|
/* send a subscription message to the service */
|
|
rc = orte_iof_proxy_svc_subscribe(
|
|
origin_name,
|
|
origin_mask,
|
|
origin_tag,
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_NS_CMP_ALL,
|
|
origin_tag);
|
|
return rc;
|
|
}
|
|
|
|
/*
|
|
* Remove a subscription
|
|
*/
|
|
|
|
int orte_iof_proxy_unsubscribe(
|
|
const orte_process_name_t* origin_name,
|
|
orte_ns_cmp_bitmask_t origin_mask,
|
|
orte_iof_base_tag_t origin_tag)
|
|
{
|
|
int rc;
|
|
|
|
/* send an unsubscribe message to the service */
|
|
rc = orte_iof_proxy_svc_unsubscribe(
|
|
origin_name,
|
|
origin_mask,
|
|
origin_tag,
|
|
ORTE_PROC_MY_NAME,
|
|
ORTE_NS_CMP_ALL,
|
|
origin_tag);
|
|
if (ORTE_SUCCESS != rc) {
|
|
return rc;
|
|
}
|
|
|
|
/* remove local callback */
|
|
return orte_iof_base_callback_delete(ORTE_PROC_MY_NAME,origin_tag);
|
|
}
|
|
|
|
/*
|
|
* FT event
|
|
*/
|
|
|
|
int orte_iof_proxy_ft_event(int state) {
|
|
int ret, exit_status = ORTE_SUCCESS;
|
|
|
|
if(OPAL_CRS_CHECKPOINT == state) {
|
|
/*
|
|
* Flush
|
|
*/
|
|
if( ORTE_SUCCESS != (ret = orte_iof_base_flush() ) ) {
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Stop receiving events
|
|
*/
|
|
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_SVC);
|
|
}
|
|
else if(OPAL_CRS_CONTINUE == state) {
|
|
/*
|
|
* Restart Receiving events
|
|
*/
|
|
if(ORTE_SUCCESS != (ret = orte_rml.recv_nb(
|
|
ORTE_NAME_WILDCARD,
|
|
mca_iof_proxy_component.proxy_iov,
|
|
1,
|
|
ORTE_RML_TAG_IOF_SVC,
|
|
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
|
|
orte_iof_proxy_svc_recv,
|
|
NULL
|
|
) ) ) {
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
}
|
|
else if(OPAL_CRS_RESTART == state) {
|
|
/*
|
|
* Restart Receiving events
|
|
*/
|
|
if(ORTE_SUCCESS != (ret = orte_rml.recv_nb(
|
|
ORTE_NAME_WILDCARD,
|
|
mca_iof_proxy_component.proxy_iov,
|
|
1,
|
|
ORTE_RML_TAG_IOF_SVC,
|
|
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
|
|
orte_iof_proxy_svc_recv,
|
|
NULL
|
|
) ) ) {
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
else if(OPAL_CRS_TERM == state ) {
|
|
;
|
|
}
|
|
else {
|
|
;
|
|
}
|
|
|
|
cleanup:
|
|
return exit_status;
|
|
}
|