1
1
openmpi/orte/mca/iof/proxy/iof_proxy.c
Josh Hursey dadca7da88 Merging in the jjhursey-ft-cr-stable branch (r13912 : HEAD).
This merge adds Checkpoint/Restart support to Open MPI. The initial
frameworks and components support a LAM/MPI-like implementation.

This commit follows the risk assessment presented to the Open MPI core
development group on Feb. 22, 2007.

This commit closes trac:158

More details to follow.

This commit was SVN r14051.

The following SVN revisions from the original message are invalid or
inconsistent and therefore were not cross-referenced:
  r13912

The following Trac tickets were found above:
  Ticket 158 --> https://svn.open-mpi.org/trac/ompi/ticket/158
2007-03-16 23:11:45 +00:00

353 строки
8.9 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif /* HAVE_STRING_H */
#include "orte/orte_constants.h"
#include "opal/util/output.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/iof/base/iof_base_endpoint.h"
#include "orte/mca/errmgr/errmgr.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
orte_iof_base_module_t orte_iof_proxy_module = {
orte_iof_proxy_publish,
orte_iof_proxy_unpublish,
orte_iof_proxy_push,
orte_iof_proxy_pull,
orte_iof_proxy_subscribe,
orte_iof_proxy_unsubscribe,
orte_iof_base_flush,
orte_iof_proxy_finalize,
orte_iof_proxy_ft_event
};
int orte_iof_proxy_finalize(void ) {
return ORTE_SUCCESS;
}
/**
* Publish a local file descriptor as an endpoint that is logically
* associated with the specified process name (e.g. master side of a
* pipe/pty connected to a child process)
*
* @param name
* @param mode
* @param tag
* @param fd
*
*/
int orte_iof_proxy_publish(
const orte_process_name_t* name,
orte_iof_base_mode_t mode,
orte_iof_base_tag_t tag,
int fd)
{
int rc;
if(mca_iof_proxy_component.proxy_debug > 1) {
char* name_str;
orte_ns.get_proc_name_string(&name_str, name);
opal_output(0, "orte_iof_proxy_publish(%s,%d,%d,%d)\n", name_str, mode, tag, fd);
free(name_str);
}
/* publish to server */
if(mode == ORTE_IOF_SINK) {
rc = orte_iof_proxy_svc_publish(name,tag);
if(rc != ORTE_SUCCESS)
return rc;
}
/* setup a local endpoint to reflect registration */
rc = orte_iof_base_endpoint_create(
name,
mode,
tag,
fd);
return rc;
}
/**
* Remove all registrations matching the specified process
* name, mask and tag values.
*
* @param name
* @param mask
* @param tag
*
*/
int orte_iof_proxy_unpublish(
const orte_process_name_t* name,
orte_ns_cmp_bitmask_t mask,
orte_iof_base_tag_t tag)
{
int rc;
/* cleanup server */
orte_iof_proxy_svc_unpublish(
name,
mask,
tag);
/* setup a local endpoint to reflect registration */
rc = orte_iof_base_endpoint_delete(
name,
mask,
tag);
return rc;
}
/**
* Explicitly push data from the specified file descriptor
* to the indicated set of peers.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int orte_iof_proxy_push(
const orte_process_name_t* dst_name,
orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag,
int fd)
{
int rc;
/* send a subscription to server on behalf of the destination */
rc = orte_iof_proxy_svc_subscribe(
ORTE_PROC_MY_NAME,
ORTE_NS_CMP_ALL,
dst_tag,
dst_name,
dst_mask,
dst_tag
);
if(rc != ORTE_SUCCESS)
return rc;
/* setup a local endpoint to reflect registration */
rc = orte_iof_base_endpoint_create(
ORTE_PROC_MY_NAME,
ORTE_IOF_SOURCE,
dst_tag,
fd);
return rc;
}
/**
* Explicitly pull data from the specified set of peers
* and dump to the indicated file descriptor.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int orte_iof_proxy_pull(
const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
int fd)
{
/* setup a local endpoint */
int rc;
rc = orte_iof_base_endpoint_create(
ORTE_PROC_MY_NAME,
ORTE_IOF_SINK,
src_tag,
fd);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* publish this endpoint */
rc = orte_iof_proxy_svc_publish(
ORTE_PROC_MY_NAME,
src_tag);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* subscribe to peer */
rc = orte_iof_proxy_svc_subscribe(
src_name,
src_mask,
src_tag,
ORTE_PROC_MY_NAME,
ORTE_NS_CMP_ALL,
src_tag);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}
/**
* Setup buffering for a specified set of endpoints.
*/
int orte_iof_proxy_buffer(
const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
size_t buffer_size)
{
return ORTE_ERROR;
}
/*
* Subscribe to receive a callback on receipt of data
* from a specified set of peers.
*/
int orte_iof_proxy_subscribe(
const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag,
orte_iof_base_callback_fn_t cbfunc,
void* cbdata)
{
int rc;
/* create a local registration to reflect the callback */
rc = orte_iof_base_callback_create(ORTE_PROC_MY_NAME,src_tag,cbfunc,cbdata);
if(rc != ORTE_SUCCESS)
return rc;
/* send a subscription message to the service */
rc = orte_iof_proxy_svc_subscribe(
src_name,
src_mask,
src_tag,
ORTE_PROC_MY_NAME,
ORTE_NS_CMP_ALL,
src_tag);
return rc;
}
int orte_iof_proxy_unsubscribe(
const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag)
{
int rc;
/* send an unsubscribe message to the service */
rc = orte_iof_proxy_svc_unsubscribe(
src_name,
src_mask,
src_tag,
ORTE_PROC_MY_NAME,
ORTE_NS_CMP_ALL,
src_tag);
if(rc != ORTE_SUCCESS)
return rc;
/* remove local callback */
return orte_iof_base_callback_delete(ORTE_PROC_MY_NAME,src_tag);
}
int orte_iof_proxy_ft_event(int state) {
int ret, exit_status = ORTE_SUCCESS;
if(OPAL_CRS_CHECKPOINT == state) {
/*
* Flush
*/
if( ORTE_SUCCESS != (ret = orte_iof_base_flush() ) ) {
return ret;
}
/*
* Stop receiving events
*/
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_SVC);
}
else if(OPAL_CRS_CONTINUE == state) {
/*
* Restart Receiving events
*/
if(ORTE_SUCCESS != (ret = orte_rml.recv_nb(
ORTE_NAME_WILDCARD,
mca_iof_proxy_component.proxy_iov,
1,
ORTE_RML_TAG_IOF_SVC,
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
orte_iof_proxy_svc_recv,
NULL
) ) ) {
exit_status = ret;
goto cleanup;
}
}
else if(OPAL_CRS_RESTART == state) {
/*
* Restart Receiving events
*/
if(ORTE_SUCCESS != (ret = orte_rml.recv_nb(
ORTE_NAME_WILDCARD,
mca_iof_proxy_component.proxy_iov,
1,
ORTE_RML_TAG_IOF_SVC,
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
orte_iof_proxy_svc_recv,
NULL
) ) ) {
exit_status = ret;
goto cleanup;
}
}
else if(OPAL_CRS_TERM == state ) {
;
}
else {
;
}
cleanup:
return exit_status;
}