IOF components should not assume they will be selected when queried - thus, they should not perform init functions until after selection. Create init/finalize entry points for that purpose, and have select init the module after it has been selected.
This commit was SVN r22982.
Этот коммит содержится в:
родитель
2ecc9fc2b3
Коммит
6c379fed2e
@ -21,23 +21,19 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "opal/event/event.h"
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
|
||||
#include "orte/mca/iof/iof.h"
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
|
||||
|
||||
int orte_iof_base_close(void)
|
||||
{
|
||||
bool dump;
|
||||
opal_list_item_t *item;
|
||||
orte_iof_write_output_t *output;
|
||||
orte_iof_write_event_t *wev;
|
||||
int num_written;
|
||||
/* finalize the module */
|
||||
if (NULL != orte_iof.finalize) {
|
||||
orte_iof.finalize();
|
||||
}
|
||||
|
||||
/* shutdown any remaining opened components */
|
||||
if (0 != opal_list_get_size(&orte_iof_base.iof_components_opened)) {
|
||||
@ -46,49 +42,6 @@ int orte_iof_base_close(void)
|
||||
}
|
||||
OBJ_DESTRUCT(&orte_iof_base.iof_components_opened);
|
||||
|
||||
OPAL_THREAD_LOCK(&orte_iof_base.iof_write_output_lock);
|
||||
if (!ORTE_PROC_IS_DAEMON) {
|
||||
/* check if anything is still trying to be written out */
|
||||
wev = orte_iof_base.iof_write_stdout->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stdout);
|
||||
if (!orte_xml_output) {
|
||||
/* we only opened stderr channel if we are NOT doing xml output */
|
||||
wev = orte_iof_base.iof_write_stderr->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stderr);
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock);
|
||||
|
||||
OBJ_DESTRUCT(&orte_iof_base.iof_write_output_lock);
|
||||
|
||||
|
||||
|
@ -23,12 +23,13 @@
|
||||
#include "opal/mca/base/base.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
|
||||
#include "orte/mca/iof/iof.h"
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
|
||||
/**
|
||||
* Call the init function on all available components to find out if
|
||||
* Call the query function on all available components to find out if
|
||||
* they want to run. Select the single component with the highest
|
||||
* priority.
|
||||
*/
|
||||
@ -36,6 +37,7 @@ int orte_iof_base_select(void)
|
||||
{
|
||||
orte_iof_base_component_t *best_component = NULL;
|
||||
orte_iof_base_module_t *best_module = NULL;
|
||||
int rc;
|
||||
|
||||
/*
|
||||
* Select the best component
|
||||
@ -54,6 +56,13 @@ int orte_iof_base_select(void)
|
||||
|
||||
/* Save the winner */
|
||||
orte_iof = *best_module;
|
||||
/* init it */
|
||||
if (NULL != orte_iof.init) {
|
||||
if (ORTE_SUCCESS != (rc = orte_iof.init())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -40,6 +40,7 @@
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
|
||||
@ -49,8 +50,12 @@
|
||||
/* LOCAL FUNCTIONS */
|
||||
static void stdin_write_handler(int fd, short event, void *cbdata);
|
||||
|
||||
static void
|
||||
orte_iof_hnp_exception_handler(const orte_process_name_t* peer, orte_rml_exception_t reason);
|
||||
|
||||
/* API FUNCTIONS */
|
||||
static int init(void);
|
||||
|
||||
static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
|
||||
|
||||
static int hnp_pull(const orte_process_name_t* src_name,
|
||||
@ -60,6 +65,8 @@ static int hnp_pull(const orte_process_name_t* src_name,
|
||||
static int hnp_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag);
|
||||
|
||||
static int finalize(void);
|
||||
|
||||
static int hnp_ft_event(int state);
|
||||
|
||||
/* The API's in this module are solely used to support LOCAL
|
||||
@ -69,12 +76,45 @@ static int hnp_ft_event(int state);
|
||||
*/
|
||||
|
||||
orte_iof_base_module_t orte_iof_hnp_module = {
|
||||
init,
|
||||
hnp_push,
|
||||
hnp_pull,
|
||||
hnp_close,
|
||||
finalize,
|
||||
hnp_ft_event
|
||||
};
|
||||
|
||||
/* Initialize the module */
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* post non-blocking recv to catch forwarded IO from
|
||||
* the orteds
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_HNP,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_hnp_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.add_exception_handler(orte_iof_hnp_exception_handler))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
|
||||
mca_iof_hnp_component.stdinev = NULL;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* Setup to read local data. If the tag is other than STDIN,
|
||||
* then this is output being pushed from one of my child processes
|
||||
@ -376,6 +416,79 @@ static int hnp_close(const orte_process_name_t* peer,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
opal_list_item_t* item;
|
||||
orte_iof_write_output_t *output;
|
||||
orte_iof_write_event_t *wev;
|
||||
int num_written;
|
||||
bool dump;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
|
||||
|
||||
OPAL_THREAD_LOCK(&orte_iof_base.iof_write_output_lock);
|
||||
/* check if anything is still trying to be written out */
|
||||
wev = orte_iof_base.iof_write_stdout->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stdout);
|
||||
if (!orte_xml_output) {
|
||||
/* we only opened stderr channel if we are NOT doing xml output */
|
||||
wev = orte_iof_base.iof_write_stderr->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stderr);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock);
|
||||
|
||||
/* if the stdin event is active, delete it */
|
||||
if (NULL != mca_iof_hnp_component.stdinev) {
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
}
|
||||
/* cleanout all registered sinks */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
|
||||
/* cleanout all pending proc objects holding receive events */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
|
||||
/* release and cleanup the lock */
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int hnp_ft_event(int state) {
|
||||
/*
|
||||
* Replica doesn't need to do anything for a checkpoint
|
||||
@ -497,3 +610,37 @@ DEPART:
|
||||
/* unlock and go */
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback when peer is disconnected
|
||||
*/
|
||||
|
||||
static void
|
||||
orte_iof_hnp_exception_handler(const orte_process_name_t* peer, orte_rml_exception_t reason)
|
||||
{
|
||||
#if 0
|
||||
orte_iof_base_endpoint_t *endpoint;
|
||||
opal_output_verbose(1, orte_iof_base.iof_output,
|
||||
"iof svc exception handler! %s\n",
|
||||
ORTE_NAME_PRINT((orte_process_name_t*)peer));
|
||||
|
||||
/* If we detect an exception on the RML connection to a peer,
|
||||
delete all of its subscriptions and publications. Note that
|
||||
exceptions can be detected during a normal RML shutdown; they
|
||||
are recoverable events (no need to abort). */
|
||||
orte_iof_hnp_sub_delete_all(peer);
|
||||
orte_iof_hnp_pub_delete_all(peer);
|
||||
opal_output_verbose(1, orte_iof_base.iof_output, "deleted all pubs and subs\n");
|
||||
|
||||
/* Find any streams on any endpoints for this peer and close them */
|
||||
while (NULL !=
|
||||
(endpoint = orte_iof_base_endpoint_match(peer, ORTE_NS_CMP_ALL,
|
||||
ORTE_IOF_ANY))) {
|
||||
orte_iof_base_endpoint_closed(endpoint);
|
||||
|
||||
/* Delete the endpoint that we just matched */
|
||||
orte_iof_base_endpoint_delete(peer, ORTE_NS_CMP_ALL, ORTE_IOF_ANY);
|
||||
}
|
||||
#endif
|
||||
opal_output_verbose(1, orte_iof_base.iof_output, "done with exception handler\n");
|
||||
}
|
||||
|
@ -25,10 +25,6 @@
|
||||
#include "opal/event/event.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
#include "iof_hnp.h"
|
||||
@ -39,13 +35,6 @@
|
||||
static int orte_iof_hnp_open(void);
|
||||
static int orte_iof_hnp_close(void);
|
||||
static int orte_iof_hnp_query(mca_base_module_t **module, int *priority);
|
||||
static void
|
||||
orte_iof_hnp_exception_handler(const orte_process_name_t* peer, orte_rml_exception_t reason);
|
||||
|
||||
/*
|
||||
* Local variables
|
||||
*/
|
||||
static bool initialized = false;
|
||||
|
||||
/*
|
||||
* Public string showing the iof hnp component version number
|
||||
@ -91,30 +80,6 @@ static int orte_iof_hnp_open(void)
|
||||
|
||||
static int orte_iof_hnp_close(void)
|
||||
{
|
||||
opal_list_item_t* item;
|
||||
|
||||
if (initialized) {
|
||||
OPAL_THREAD_LOCK(&mca_iof_hnp_component.lock);
|
||||
/* if the stdin event is active, delete it */
|
||||
if (NULL != mca_iof_hnp_component.stdinev) {
|
||||
OBJ_RELEASE(mca_iof_hnp_component.stdinev);
|
||||
}
|
||||
/* cleanout all registered sinks */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.sinks);
|
||||
/* cleanout all pending proc objects holding receive events */
|
||||
while ((item = opal_list_remove_first(&mca_iof_hnp_component.procs)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
|
||||
/* release and cleanup the lock */
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_hnp_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_hnp_component.lock);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -124,81 +89,15 @@ static int orte_iof_hnp_close(void)
|
||||
|
||||
static int orte_iof_hnp_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* set default */
|
||||
*module = NULL;
|
||||
*priority = -1;
|
||||
|
||||
/* if we are not the HNP, then don't use this module */
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
*priority = -1;
|
||||
*module = NULL;
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* post non-blocking recv to catch forwarded IO from
|
||||
* the orteds
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_HNP,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_hnp_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.add_exception_handler(orte_iof_hnp_exception_handler))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);
|
||||
return rc;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_hnp_component.procs, opal_list_t);
|
||||
mca_iof_hnp_component.stdinev = NULL;
|
||||
|
||||
/* we must be selected */
|
||||
*priority = 100;
|
||||
*module = (mca_base_module_t *) &orte_iof_hnp_module;
|
||||
initialized = true;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Callback when peer is disconnected
|
||||
*/
|
||||
|
||||
static void
|
||||
orte_iof_hnp_exception_handler(const orte_process_name_t* peer, orte_rml_exception_t reason)
|
||||
{
|
||||
#if 0
|
||||
orte_iof_base_endpoint_t *endpoint;
|
||||
opal_output_verbose(1, orte_iof_base.iof_output,
|
||||
"iof svc exception handler! %s\n",
|
||||
ORTE_NAME_PRINT((orte_process_name_t*)peer));
|
||||
|
||||
/* If we detect an exception on the RML connection to a peer,
|
||||
delete all of its subscriptions and publications. Note that
|
||||
exceptions can be detected during a normal RML shutdown; they
|
||||
are recoverable events (no need to abort). */
|
||||
orte_iof_hnp_sub_delete_all(peer);
|
||||
orte_iof_hnp_pub_delete_all(peer);
|
||||
opal_output_verbose(1, orte_iof_base.iof_output, "deleted all pubs and subs\n");
|
||||
|
||||
/* Find any streams on any endpoints for this peer and close them */
|
||||
while (NULL !=
|
||||
(endpoint = orte_iof_base_endpoint_match(peer, ORTE_NS_CMP_ALL,
|
||||
ORTE_IOF_ANY))) {
|
||||
orte_iof_base_endpoint_closed(endpoint);
|
||||
|
||||
/* Delete the endpoint that we just matched */
|
||||
orte_iof_base_endpoint_delete(peer, ORTE_NS_CMP_ALL, ORTE_IOF_ANY);
|
||||
}
|
||||
#endif
|
||||
opal_output_verbose(1, orte_iof_base.iof_output, "done with exception handler\n");
|
||||
}
|
||||
|
@ -124,7 +124,9 @@
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/* Predefined tag values */
|
||||
/* Initialize the selected module */
|
||||
typedef int (*orte_iof_base_init_fn_t)(void);
|
||||
|
||||
/**
|
||||
* Explicitly push data from the specified input file descriptor to
|
||||
* the stdin of the indicated peer(s). The provided peer name can
|
||||
@ -155,6 +157,9 @@ typedef int (*orte_iof_base_pull_fn_t)(const orte_process_name_t* peer,
|
||||
typedef int (*orte_iof_base_close_fn_t)(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag);
|
||||
|
||||
/* finalize the selected module */
|
||||
typedef int (*orte_iof_base_finalize_fn_t)(void);
|
||||
|
||||
/**
|
||||
* FT Event Notification
|
||||
*/
|
||||
@ -164,9 +169,11 @@ typedef int (*orte_iof_base_ft_event_fn_t)(int state);
|
||||
* IOF module.
|
||||
*/
|
||||
struct orte_iof_base_module_2_0_0_t {
|
||||
orte_iof_base_init_fn_t init;
|
||||
orte_iof_base_push_fn_t push;
|
||||
orte_iof_base_pull_fn_t pull;
|
||||
orte_iof_base_close_fn_t close;
|
||||
orte_iof_base_finalize_fn_t finalize;
|
||||
orte_iof_base_ft_event_fn_t ft_event;
|
||||
};
|
||||
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
|
||||
#include "orte/mca/iof/iof.h"
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
@ -54,6 +55,8 @@ static void stdin_write_handler(int fd, short event, void *cbdata);
|
||||
|
||||
|
||||
/* API FUNCTIONS */
|
||||
static int init(void);
|
||||
|
||||
static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
|
||||
|
||||
static int orted_pull(const orte_process_name_t* src_name,
|
||||
@ -63,6 +66,8 @@ static int orted_pull(const orte_process_name_t* src_name,
|
||||
static int orted_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag);
|
||||
|
||||
static int finalize(void);
|
||||
|
||||
static int orted_ft_event(int state);
|
||||
|
||||
/* The API's in this module are solely used to support LOCAL
|
||||
@ -74,12 +79,38 @@ static int orted_ft_event(int state);
|
||||
*/
|
||||
|
||||
orte_iof_base_module_t orte_iof_orted_module = {
|
||||
init,
|
||||
orted_push,
|
||||
orted_pull,
|
||||
orted_close,
|
||||
finalize,
|
||||
orted_ft_event
|
||||
};
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* post a non-blocking RML receive to get messages
|
||||
from the HNP IOF component */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_PROXY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_orted_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/* setup the local global variables */
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
|
||||
mca_iof_orted_component.xoff = false;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Push data from the specified file descriptor
|
||||
@ -278,6 +309,26 @@ static int orted_close(const orte_process_name_t* peer,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
int rc;
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
|
||||
/* Cancel the RML receive */
|
||||
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.lock);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* FT event
|
||||
|
@ -50,6 +50,8 @@
|
||||
|
||||
#include "opal/class/opal_list.h"
|
||||
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
|
||||
#include "orte/mca/iof/iof.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
@ -22,11 +22,7 @@
|
||||
#include "opal/mca/base/base.h"
|
||||
#include "opal/mca/base/mca_base_param.h"
|
||||
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "iof_orted.h"
|
||||
|
||||
@ -38,11 +34,6 @@ static int orte_iof_orted_close(void);
|
||||
static int orte_iof_orted_query(mca_base_module_t **module, int *priority);
|
||||
|
||||
|
||||
/*
|
||||
* Local variables
|
||||
*/
|
||||
static bool initialized = false;
|
||||
|
||||
/*
|
||||
* Public string showing the iof orted component version number
|
||||
*/
|
||||
@ -83,63 +74,21 @@ static int orte_iof_orted_open(void)
|
||||
|
||||
static int orte_iof_orted_close(void)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
opal_list_item_t *item;
|
||||
|
||||
if (initialized) {
|
||||
OPAL_THREAD_LOCK(&mca_iof_orted_component.lock);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.sinks)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.sinks);
|
||||
while ((item = opal_list_remove_first(&mca_iof_orted_component.procs)) != NULL) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.procs);
|
||||
/* Cancel the RML receive */
|
||||
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_orted_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_orted_component.lock);
|
||||
}
|
||||
return rc;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int orte_iof_orted_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* set default */
|
||||
*module = NULL;
|
||||
*priority = -1;
|
||||
|
||||
/* if we are not a daemon, then don't use this module */
|
||||
if (!ORTE_PROC_IS_DAEMON) {
|
||||
*module = NULL;
|
||||
*priority = -1;
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* post a non-blocking RML receive to get messages
|
||||
from the HNP IOF component */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_PROXY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_orted_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/* setup the local global variables */
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.sinks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_iof_orted_component.procs, opal_list_t);
|
||||
mca_iof_orted_component.xoff = false;
|
||||
|
||||
/* we must be selected */
|
||||
*priority = 100;
|
||||
*module = (mca_base_module_t *) &orte_iof_orted_module;
|
||||
initialized = true;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -41,6 +41,7 @@
|
||||
|
||||
#include "iof_tool.h"
|
||||
|
||||
static int init(void);
|
||||
|
||||
static int tool_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag, int fd);
|
||||
|
||||
@ -51,16 +52,42 @@ static int tool_pull(const orte_process_name_t* src_name,
|
||||
static int tool_close(const orte_process_name_t* peer,
|
||||
orte_iof_tag_t source_tag);
|
||||
|
||||
static int finalize(void);
|
||||
|
||||
static int tool_ft_event(int state);
|
||||
|
||||
orte_iof_base_module_t orte_iof_tool_module = {
|
||||
init,
|
||||
tool_push,
|
||||
tool_pull,
|
||||
tool_close,
|
||||
finalize,
|
||||
tool_ft_event
|
||||
};
|
||||
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* post a non-blocking RML receive to get messages
|
||||
from the HNP IOF component */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_PROXY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_tool_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_tool_component.lock, opal_mutex_t);
|
||||
mca_iof_tool_component.closed = false;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Push data from the specified file descriptor
|
||||
* to the indicated SINK set of peers.
|
||||
@ -197,6 +224,65 @@ static int tool_close(const orte_process_name_t* src_name,
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
int rc;
|
||||
opal_list_item_t* item;
|
||||
orte_iof_write_output_t *output;
|
||||
orte_iof_write_event_t *wev;
|
||||
int num_written;
|
||||
bool dump;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_iof_tool_component.lock);
|
||||
|
||||
OPAL_THREAD_LOCK(&orte_iof_base.iof_write_output_lock);
|
||||
/* check if anything is still trying to be written out */
|
||||
wev = orte_iof_base.iof_write_stdout->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stdout);
|
||||
if (!orte_xml_output) {
|
||||
/* we only opened stderr channel if we are NOT doing xml output */
|
||||
wev = orte_iof_base.iof_write_stderr->wev;
|
||||
if (!opal_list_is_empty(&wev->outputs)) {
|
||||
dump = false;
|
||||
/* make one last attempt to write this out */
|
||||
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
|
||||
output = (orte_iof_write_output_t*)item;
|
||||
if (!dump) {
|
||||
num_written = write(wev->fd, output->data, output->numbytes);
|
||||
if (num_written < output->numbytes) {
|
||||
/* don't retry - just cleanout the list and dump it */
|
||||
dump = true;
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(output);
|
||||
}
|
||||
}
|
||||
OBJ_RELEASE(orte_iof_base.iof_write_stderr);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_write_output_lock);
|
||||
|
||||
/* Cancel the RML receive */
|
||||
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_tool_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_tool_component.lock);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* FT event
|
||||
|
@ -38,11 +38,6 @@ static int orte_iof_tool_close(void);
|
||||
static int orte_iof_tool_query(mca_base_module_t **module, int *priority);
|
||||
|
||||
|
||||
/*
|
||||
* Local variables
|
||||
*/
|
||||
static bool initialized = false;
|
||||
|
||||
/*
|
||||
* Public string showing the iof tool component version number
|
||||
*/
|
||||
@ -83,51 +78,21 @@ static int orte_iof_tool_open(void)
|
||||
|
||||
static int orte_iof_tool_close(void)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
|
||||
if (initialized) {
|
||||
OPAL_THREAD_LOCK(&mca_iof_tool_component.lock);
|
||||
/* Cancel the RML receive */
|
||||
rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
|
||||
OPAL_THREAD_UNLOCK(&mca_iof_tool_component.lock);
|
||||
OBJ_DESTRUCT(&mca_iof_tool_component.lock);
|
||||
}
|
||||
return rc;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int orte_iof_tool_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* set default */
|
||||
*module = NULL;
|
||||
*priority = -1;
|
||||
|
||||
/* if we are not a tool, then don't use this module */
|
||||
if (!ORTE_PROC_IS_TOOL) {
|
||||
*module = NULL;
|
||||
*priority = -1;
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
/* post a non-blocking RML receive to get messages
|
||||
from the HNP IOF component */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_IOF_PROXY,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
orte_iof_tool_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_iof_tool_component.lock, opal_mutex_t);
|
||||
mca_iof_tool_component.closed = false;
|
||||
|
||||
/* we must be selected */
|
||||
*priority = 100;
|
||||
*module = (mca_base_module_t *) &orte_iof_tool_module;
|
||||
initialized = true;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user