1
1
openmpi/orte/mca/iof/base/iof_base_endpoint.h
Jeff Squyres 4f3a11b4db Fixes trac:967.
A bunch of fixes from the /tmp/iof-fixes branch that fix up ''some''
(but not ''all'') of the problems that we have seen with iof:

 * Reading very large files via stdin redirected to orteun (Sun saw
   this)
 * Reading a little bit of a large file redirected to orterun's stdin
   and then either closing stdin or exiting the process

The Big Change was to make the proxy iof (the one running in non-HNP
orteds) send back a "I'm closing the stream" ACK back to the service
iof.  This tells the HNP that there will be nothing more coming from
that peer, and therefore the iof forward should be removed.

Many other minor cleanups/fixes, terminology changes, and
documentation additions are included in this commit as well.  However,
there are still some pretty big outstanding issues with IOF that are
not addressed either by #967 or this commit.  A few examples:

 * IOF was designed to allow multiple subscribers to a single stream.
   We're not entirely sure that this works (for one thing, there is
   nothing in the ORTE/OMPI code base that uses this functionality).
 * There are also resources leaked when processes/jobs exit (per
   Ralph's first comment on this ticket).  
 * There is no feedback to close orterun's stdin when all subscribers
   to the corresponding stream have closed stdin.

This commit was SVN r14967.

The following Trac tickets were found above:
  Ticket 967 --> https://svn.open-mpi.org/trac/ompi/ticket/967
2007-06-08 22:59:31 +00:00

220 строки
6.5 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2007 Cisco, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _IOF_BASE_ENDPOINT_
#define _IOF_BASE_ENDPOINT_
#include "orte_config.h"
#include "opal/class/opal_list.h"
#include "opal/event/event.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/iof_base_header.h"
BEGIN_C_DECLS
/**
* Structure store callbacks
*/
struct orte_iof_base_callback_t {
opal_list_item_t super;
orte_iof_base_callback_fn_t cb_func;
void* cb_data;
};
typedef struct orte_iof_base_callback_t orte_iof_base_callback_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_base_callback_t);
/**
* Structure that represents a published endpoint.
*/
struct orte_iof_base_endpoint_t {
/** Parent */
opal_list_item_t super;
/** ORTE_IOF_SOURCE or ORTE_IOF_SINK */
orte_iof_base_mode_t ep_mode;
/** The origin process for this endpoint. Will either by myself
(i.e., it's an fd that represents a source or a sink in my
process) or another process (i.e., this process is acting as a
proxy for another process and [typically] has a pipe/fd optn
to that process to get their stdin, stdout, or stderr). */
orte_process_name_t ep_origin;
/** Predefined tags: ORTE_IOF_ANY, ORTE_IOF_STDIN, ORTE_IOF_STDOUT,
ORTE_IOF_STDERR */
int ep_tag;
/** File descriptor to read or write from (or -1 if it has been
closed */
int ep_fd;
/** Rollover byte count of what has been forwarded from the fd to
other targets */
uint32_t ep_seq;
/** Minimum byte count of what has been ACK'ed from all the targets
that are listening to this endpoint */
uint32_t ep_ack;
/** Event library event for this file descriptor */
opal_event_t ep_event;
/** Special event library event for the case of stdin */
opal_event_t ep_stdin_event;
/** The list for fragments that are in-flight from a SOURCE
endpoint */
opal_list_t ep_source_frags;
/** The list for fragments that are in-flight from a SINK
endpoint */
opal_list_t ep_sink_frags;
/** List of callbacks for subscriptions */
opal_list_t ep_callbacks;
};
typedef struct orte_iof_base_endpoint_t orte_iof_base_endpoint_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_base_endpoint_t);
/*
* Diff between two sequence numbers allowing for rollover
*/
#define ORTE_IOF_BASE_SEQDIFF(s1,s2) \
((s1 >= s2) ? (s1 - s2) : (s1 + (ULONG_MAX - s2)))
/**
* Create a local endpoint.
*
* @param name Origin process name corresponding to endpoint.
* @param mode Source or sink of data (exclusive).
* @param tag Logical tag for matching.
* @param fd Local file descriptor corresponding to endpoint. If the
* endpoint originates in this process, it'll be an fd in this
* process. If this process is acting as a proxy for another process,
* then the fd will be a pipe to that other process (e.g., the origin
* process' stdin, stdout, or stderr).
*/
ORTE_DECLSPEC int orte_iof_base_endpoint_create(
const orte_process_name_t* name,
orte_iof_base_mode_t mode,
int tag,
int fd);
/**
* Associate a callback on receipt of data.
*
* @param name Process name corresponding to endpoint.
* @param cbfunc Logical tag for matching.
* @aram cbdata Local file descriptor corresponding to endpoint.
*/
ORTE_DECLSPEC int orte_iof_base_callback_create(
const orte_process_name_t *name,
int tag,
orte_iof_base_callback_fn_t cbfunc,
void* cbdata);
ORTE_DECLSPEC int orte_iof_base_callback_delete(
const orte_process_name_t *name,
int tag);
/**
* Delete all local endpoints matching the specified origin / mask /
* tag parameters.
*
* @paran name Origin process name corresponding to one or more endpoint(s).
* @param mask Mask used for name comparisons.
* @param tag Tag for matching endpoints.
*/
ORTE_DECLSPEC int orte_iof_base_endpoint_delete(
const orte_process_name_t* name,
orte_ns_cmp_bitmask_t mask,
int tag);
/**
* Disable forwarding through the specified endpoint.
*/
ORTE_DECLSPEC int orte_iof_base_endpoint_close(
orte_iof_base_endpoint_t* endpoint);
/**
* Attempt to match an endpoint based on the origin process name /
* mask / tag.
*/
ORTE_DECLSPEC orte_iof_base_endpoint_t* orte_iof_base_endpoint_match(
const orte_process_name_t* target_name,
orte_ns_cmp_bitmask_t target_mask,
int target_tag);
/**
* Forward the specified message out the endpoint.
*/
ORTE_DECLSPEC int orte_iof_base_endpoint_forward(
orte_iof_base_endpoint_t* endpoint,
const orte_process_name_t* origin,
orte_iof_base_msg_header_t* hdr,
const unsigned char* data);
/*
* Close the file descriptor associated with an endpoint and perform
* any necessary cleanup.
*/
ORTE_DECLSPEC void orte_iof_base_endpoint_closed(
orte_iof_base_endpoint_t* endpoint);
/**
* Callback when the next set of bytes has been acknowledged.
*/
ORTE_DECLSPEC int orte_iof_base_endpoint_ack(
orte_iof_base_endpoint_t* endpoint,
uint32_t seq);
/**
* Simple check for whether we have any frags "in flight".
*
* Return "true" for SOURCEs if source_frags is not empty, indicating
* that there are frags in-flight via the RML.
*
* Return "true" for SINKs if sink_frags is not empty, indicating that
* there are pending frags for the fd that are either partially
* written or have not yet been written (because writing to the fd
* would have blocked).
*/
bool orte_iof_base_endpoint_have_pending_frags(
orte_iof_base_endpoint_t* endpoint);
/**
* Simple check for whether we have all the ACKs that we expect.
*
* Return "true" for SOURCEs if ep_seq == ep_ack.
*
* Return "true" for SINKs always; SINK endpoints don't receive ACKs.
*/
bool orte_iof_base_endpoint_have_pending_acks(
orte_iof_base_endpoint_t* endpoint);
END_C_DECLS
#endif