c3ec2e2614
This commit was SVN r24884.
567 строки
18 KiB
C
567 строки
18 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; -*- */
|
|
/*
|
|
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2011 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
|
* Copyright (c) 2011 Los Alamos Nation Security, LLC.
|
|
* All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
#include "orte/constants.h"
|
|
|
|
#ifdef HAVE_ARPA_INET_H
|
|
#include <arpa/inet.h>
|
|
#endif
|
|
|
|
#include "opal/mca/base/base.h"
|
|
#include "opal/util/output.h"
|
|
#include "opal/util/opal_sos.h"
|
|
#include "opal/mca/base/mca_base_param.h"
|
|
#include "opal/mca/backtrace/backtrace.h"
|
|
#include "opal/mca/event/event.h"
|
|
|
|
#include "orte/mca/rml/base/base.h"
|
|
#include "orte/mca/rml/rml_types.h"
|
|
#include "orte/mca/routed/routed.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/oob/oob.h"
|
|
#include "orte/mca/oob/base/base.h"
|
|
#include "rml_oob.h"
|
|
|
|
static orte_rml_module_t* rml_oob_init(int* priority);
|
|
static int rml_oob_open(void);
|
|
static int rml_oob_close(void);
|
|
static void rml_oob_recv_route_callback(int status,
|
|
struct orte_process_name_t* peer,
|
|
struct iovec* iov,
|
|
int count,
|
|
orte_rml_tag_t tag,
|
|
void *cbdata);
|
|
static void rml_oob_queued_progress(int fd, short event, void *arg);
|
|
|
|
/**
|
|
* component definition
|
|
*/
|
|
orte_rml_component_t mca_rml_oob_component = {
|
|
/* First, the mca_base_component_t struct containing meta
|
|
information about the component itself */
|
|
|
|
{
|
|
ORTE_RML_BASE_VERSION_2_0_0,
|
|
|
|
"oob", /* MCA component name */
|
|
ORTE_MAJOR_VERSION, /* MCA component major version */
|
|
ORTE_MINOR_VERSION, /* MCA component minor version */
|
|
ORTE_RELEASE_VERSION, /* MCA component release version */
|
|
rml_oob_open, /* component open */
|
|
rml_oob_close, /* component close */
|
|
},
|
|
{
|
|
/* The component is checkpoint ready */
|
|
MCA_BASE_METADATA_PARAM_CHECKPOINT
|
|
},
|
|
rml_oob_init
|
|
};
|
|
|
|
orte_rml_oob_module_t orte_rml_oob_module = {
|
|
{
|
|
orte_rml_oob_init,
|
|
orte_rml_oob_fini,
|
|
|
|
orte_rml_oob_get_uri,
|
|
orte_rml_oob_set_uri,
|
|
|
|
orte_rml_oob_ping,
|
|
|
|
orte_rml_oob_send,
|
|
orte_rml_oob_send_nb,
|
|
orte_rml_oob_send_buffer,
|
|
orte_rml_oob_send_buffer_nb,
|
|
|
|
orte_rml_oob_recv,
|
|
orte_rml_oob_recv_nb,
|
|
orte_rml_oob_recv_buffer,
|
|
orte_rml_oob_recv_buffer_nb,
|
|
orte_rml_oob_recv_cancel,
|
|
|
|
orte_rml_oob_add_exception,
|
|
orte_rml_oob_del_exception,
|
|
|
|
orte_rml_oob_ft_event,
|
|
|
|
orte_rml_oob_purge
|
|
}
|
|
};
|
|
|
|
/* Local variables */
|
|
static bool init_done = false;
|
|
|
|
static int
|
|
rml_oob_open(void)
|
|
{
|
|
int rc;
|
|
|
|
if (ORTE_SUCCESS != (rc = mca_oob_base_open())) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
|
|
static int
|
|
rml_oob_close(void)
|
|
{
|
|
int rc;
|
|
|
|
if (ORTE_SUCCESS != (rc = mca_oob_base_close())) {
|
|
return rc;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
static orte_rml_module_t*
|
|
rml_oob_init(int* priority)
|
|
{
|
|
if (init_done) {
|
|
*priority = 1;
|
|
return &orte_rml_oob_module.super;
|
|
}
|
|
|
|
if (mca_oob_base_init() != ORTE_SUCCESS) {
|
|
*priority = -1;
|
|
return NULL;
|
|
}
|
|
|
|
*priority = 1;
|
|
|
|
OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions, opal_list_t);
|
|
OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions_lock, opal_mutex_t);
|
|
OBJ_CONSTRUCT(&orte_rml_oob_module.queued_routing_messages, opal_list_t);
|
|
OBJ_CONSTRUCT(&orte_rml_oob_module.queued_lock, opal_mutex_t);
|
|
/* Set default timeout for queued messages to be 1/2 second */
|
|
orte_rml_oob_module.timeout.tv_sec = 0;
|
|
orte_rml_oob_module.timeout.tv_usec = 500000;
|
|
orte_rml_oob_module.timer_event = (opal_event_t *) malloc(sizeof(opal_event_t));
|
|
if (NULL == orte_rml_oob_module.timer_event) {
|
|
return NULL;
|
|
}
|
|
opal_event_evtimer_set(opal_event_base, orte_rml_oob_module.timer_event,
|
|
rml_oob_queued_progress,
|
|
NULL);
|
|
|
|
orte_rml_oob_module.active_oob = &mca_oob;
|
|
orte_rml_oob_module.active_oob->oob_exception_callback =
|
|
orte_rml_oob_exception_callback;
|
|
|
|
init_done = true;
|
|
return &orte_rml_oob_module.super;
|
|
}
|
|
|
|
static struct iovec route_recv_iov[1];
|
|
|
|
int
|
|
orte_rml_oob_init(void)
|
|
{
|
|
int ret;
|
|
|
|
ret = orte_rml_oob_module.active_oob->oob_init();
|
|
|
|
route_recv_iov[0].iov_base = NULL;
|
|
route_recv_iov[0].iov_len = 0;
|
|
|
|
ret = orte_rml_oob_module.active_oob->oob_recv_nb(ORTE_NAME_WILDCARD,
|
|
route_recv_iov, 1,
|
|
ORTE_RML_TAG_RML_ROUTE,
|
|
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
|
|
rml_oob_recv_route_callback,
|
|
NULL);
|
|
|
|
/* enable the base receive to get updates on contact info */
|
|
orte_rml_base_comm_start();
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
int
|
|
orte_rml_oob_fini(void)
|
|
{
|
|
opal_list_item_t *item;
|
|
|
|
while (NULL !=
|
|
(item = opal_list_remove_first(&orte_rml_oob_module.exceptions))) {
|
|
OBJ_RELEASE(item);
|
|
}
|
|
OBJ_DESTRUCT(&orte_rml_oob_module.exceptions);
|
|
OBJ_DESTRUCT(&orte_rml_oob_module.exceptions_lock);
|
|
OBJ_DESTRUCT(&orte_rml_oob_module.queued_routing_messages);
|
|
OBJ_DESTRUCT(&orte_rml_oob_module.queued_lock);
|
|
orte_rml_oob_module.active_oob->oob_exception_callback = NULL;
|
|
|
|
/* clear the base receive */
|
|
orte_rml_base_comm_stop();
|
|
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
|
|
int
|
|
orte_rml_oob_ft_event(int state) {
|
|
int exit_status = ORTE_SUCCESS;
|
|
int ret;
|
|
|
|
if(OPAL_CRS_CHECKPOINT == state) {
|
|
;
|
|
}
|
|
else if(OPAL_CRS_CONTINUE == state) {
|
|
;
|
|
}
|
|
else if(OPAL_CRS_RESTART == state) {
|
|
;
|
|
}
|
|
else if(OPAL_CRS_TERM == state ) {
|
|
;
|
|
}
|
|
else {
|
|
;
|
|
}
|
|
|
|
if( ORTE_SUCCESS !=
|
|
(ret = orte_rml_oob_module.active_oob->oob_ft_event(state)) ) {
|
|
ORTE_ERROR_LOG(ret);
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
|
|
if(OPAL_CRS_CHECKPOINT == state) {
|
|
;
|
|
}
|
|
else if(OPAL_CRS_CONTINUE == state) {
|
|
;
|
|
}
|
|
else if(OPAL_CRS_RESTART == state) {
|
|
if( ORTE_SUCCESS != (ret = mca_oob_base_close())) {
|
|
ORTE_ERROR_LOG(ret);
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
if( ORTE_SUCCESS != (ret = mca_oob_base_open())) {
|
|
ORTE_ERROR_LOG(ret);
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
|
|
if( ORTE_SUCCESS != (ret = mca_oob_base_init())) {
|
|
ORTE_ERROR_LOG(ret);
|
|
exit_status = ret;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
else if(OPAL_CRS_TERM == state ) {
|
|
;
|
|
}
|
|
else {
|
|
;
|
|
}
|
|
|
|
cleanup:
|
|
return exit_status;
|
|
}
|
|
|
|
|
|
static void
|
|
msg_construct(orte_rml_oob_msg_t *msg)
|
|
{
|
|
OBJ_CONSTRUCT(&msg->msg_lock, opal_mutex_t);
|
|
OBJ_CONSTRUCT(&msg->msg_cond, opal_condition_t);
|
|
msg->msg_status = 0;
|
|
msg->msg_complete = false;
|
|
msg->msg_persistent = false;
|
|
OBJ_CONSTRUCT(&msg->msg_recv_buffer, opal_buffer_t);
|
|
msg->msg_data = NULL;
|
|
}
|
|
|
|
static void
|
|
msg_destruct(orte_rml_oob_msg_t *msg)
|
|
{
|
|
if (NULL != msg->msg_data) free(msg->msg_data);
|
|
OBJ_DESTRUCT(&msg->msg_recv_buffer);
|
|
OBJ_DESTRUCT(&msg->msg_lock);
|
|
OBJ_DESTRUCT(&msg->msg_cond);
|
|
}
|
|
|
|
OBJ_CLASS_INSTANCE(orte_rml_oob_msg_t, opal_object_t,
|
|
msg_construct, msg_destruct);
|
|
|
|
static void
|
|
queued_msg_construct(orte_rml_oob_queued_msg_t *msg)
|
|
{
|
|
msg->payload[0].iov_base = NULL;
|
|
msg->payload[0].iov_len = 0;
|
|
}
|
|
|
|
static void
|
|
queued_msg_destruct(orte_rml_oob_queued_msg_t *msg)
|
|
{
|
|
if (NULL != msg->payload[0].iov_base) free(msg->payload[0].iov_base);
|
|
}
|
|
|
|
OBJ_CLASS_INSTANCE(orte_rml_oob_queued_msg_t, opal_list_item_t,
|
|
queued_msg_construct, queued_msg_destruct);
|
|
|
|
static void
|
|
rml_oob_recv_route_queued_send_callback(int status,
|
|
struct orte_process_name_t* peer,
|
|
struct iovec* iov,
|
|
int count,
|
|
orte_rml_tag_t tag,
|
|
void* cbdata)
|
|
{
|
|
orte_rml_oob_queued_msg_t *qmsg = (orte_rml_oob_queued_msg_t*) cbdata;
|
|
OBJ_RELEASE(qmsg);
|
|
}
|
|
|
|
|
|
static void
|
|
rml_oob_queued_progress(int fd, short event, void *arg)
|
|
{
|
|
orte_rml_oob_queued_msg_t *qmsg;
|
|
orte_rml_oob_msg_header_t *hdr;
|
|
int real_tag;
|
|
int ret;
|
|
orte_process_name_t next, origin;
|
|
|
|
while (true) {
|
|
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
|
|
qmsg = (orte_rml_oob_queued_msg_t*) opal_list_remove_first(&orte_rml_oob_module.queued_routing_messages);
|
|
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
|
|
if (NULL == qmsg) break;
|
|
|
|
hdr = (orte_rml_oob_msg_header_t*) qmsg->payload[0].iov_base;
|
|
origin = hdr->origin;
|
|
|
|
next = orte_routed.get_route(&hdr->destination);
|
|
#if 0
|
|
if (next.vpid == ORTE_VPID_INVALID) {
|
|
opal_output(0,
|
|
"%s:queued progress tried routing message from %s to %s:%d, can't find route",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&hdr->origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
hdr->tag);
|
|
opal_backtrace_print(stderr);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
}
|
|
#endif
|
|
|
|
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
|
|
opal_output(0, "%s:queued progress trying to get message from %s to %s:%d, routing loop",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&hdr->origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
hdr->tag);
|
|
opal_backtrace_print(stderr);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
}
|
|
|
|
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
|
|
real_tag = hdr->tag;
|
|
} else {
|
|
real_tag = ORTE_RML_TAG_RML_ROUTE;
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
|
|
"%s routing message from %s for %s to %s (tag: %d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&hdr->origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
ORTE_NAME_PRINT(&next),
|
|
hdr->tag));
|
|
|
|
ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);
|
|
|
|
ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
|
|
&origin,
|
|
qmsg->payload,
|
|
1,
|
|
real_tag,
|
|
0,
|
|
rml_oob_recv_route_queued_send_callback,
|
|
qmsg);
|
|
|
|
if (ORTE_SUCCESS != ret) {
|
|
if (ORTE_ERR_ADDRESSEE_UNKNOWN == OPAL_SOS_GET_ERROR_CODE(ret)) {
|
|
/* still no route -- try again */
|
|
ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
|
|
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
|
|
opal_list_append(&orte_rml_oob_module.queued_routing_messages,
|
|
&qmsg->super);
|
|
if (1 == opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
|
|
opal_event_evtimer_add(orte_rml_oob_module.timer_event,
|
|
&orte_rml_oob_module.timeout);
|
|
}
|
|
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
|
|
} else {
|
|
opal_output(0,
|
|
"%s failed to send message from %s to %s:%d %s (rc = %d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&next),
|
|
ORTE_NAME_PRINT(&origin),
|
|
real_tag,
|
|
ORTE_ERROR_NAME(ret),
|
|
ret);
|
|
abort();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
rml_oob_recv_route_send_callback(int status,
|
|
struct orte_process_name_t* peer,
|
|
struct iovec* iov,
|
|
int count,
|
|
orte_rml_tag_t tag,
|
|
void* cbdata)
|
|
{
|
|
/* NTH: free the iovec allocated by rml_oob_recv_route_callback */
|
|
free (iov);
|
|
}
|
|
|
|
|
|
static void
|
|
rml_oob_recv_route_callback(int status,
|
|
struct orte_process_name_t* peer,
|
|
struct iovec* iov,
|
|
int count,
|
|
orte_rml_tag_t tag,
|
|
void *cbdata)
|
|
{
|
|
orte_rml_oob_msg_header_t *hdr =
|
|
(orte_rml_oob_msg_header_t*) iov[0].iov_base;
|
|
int real_tag;
|
|
int ret;
|
|
orte_process_name_t next, origin;
|
|
struct iovec *new_iov;
|
|
|
|
/* BWB -- propogate errors here... */
|
|
assert(status >= 0);
|
|
|
|
ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
|
|
|
|
origin = hdr->origin;
|
|
|
|
next = orte_routed.get_route(&hdr->destination);
|
|
if (next.vpid == ORTE_VPID_INVALID) {
|
|
#if 0
|
|
opal_output(0, "%s:route_callback tried routing message from %s to %s:%d, can't find route",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
hdr->tag);
|
|
opal_backtrace_print(stderr);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
#endif
|
|
return;
|
|
}
|
|
|
|
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
|
|
opal_output(0, "%s:route_callback trying to get message from %s to %s:%d, routing loop",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
hdr->tag);
|
|
opal_backtrace_print(stderr);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
}
|
|
|
|
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
|
|
real_tag = hdr->tag;
|
|
} else {
|
|
real_tag = ORTE_RML_TAG_RML_ROUTE;
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
|
|
"%s routing message from %s for %s to %s (tag: %d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&hdr->origin),
|
|
ORTE_NAME_PRINT(&hdr->destination),
|
|
ORTE_NAME_PRINT(&next),
|
|
hdr->tag));
|
|
|
|
ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);
|
|
|
|
/* NTH: fix potential race condition. oob may modify iov before the oob send completes */
|
|
new_iov = calloc (count, sizeof (struct iovec));
|
|
if (NULL == new_iov) {
|
|
opal_output (0, "%s:route_callback malloc error!", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
opal_backtrace_print(stderr);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
}
|
|
|
|
memcpy (new_iov, iov, count * sizeof (struct iovec));
|
|
|
|
ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
|
|
&origin,
|
|
new_iov,
|
|
count,
|
|
real_tag,
|
|
0,
|
|
rml_oob_recv_route_send_callback,
|
|
NULL);
|
|
|
|
if (ORTE_SUCCESS != ret) {
|
|
if (ORTE_ERR_ADDRESSEE_UNKNOWN == OPAL_SOS_GET_ERROR_CODE(ret)) {
|
|
/* no route -- queue and hope we find a route */
|
|
orte_rml_oob_queued_msg_t *qmsg = OBJ_NEW(orte_rml_oob_queued_msg_t);
|
|
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
|
|
"%s: no OOB information for %s. Queuing for later.",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&next)));
|
|
ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
|
|
qmsg->payload[0].iov_base = (IOVBASE_TYPE*) malloc(iov[0].iov_len);
|
|
if (NULL == qmsg->payload[0].iov_base) abort();
|
|
qmsg->payload[0].iov_len = iov[0].iov_len;
|
|
memcpy(qmsg->payload[0].iov_base, iov[0].iov_base, iov[0].iov_len);
|
|
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
|
|
opal_list_append(&orte_rml_oob_module.queued_routing_messages,
|
|
&qmsg->super);
|
|
if (1 == opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
|
|
opal_event_evtimer_add(orte_rml_oob_module.timer_event,
|
|
&orte_rml_oob_module.timeout);
|
|
}
|
|
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
|
|
} else {
|
|
opal_output(0,
|
|
"%s failed to send message to %s: %s (rc = %d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&next),
|
|
opal_strerror(ret),
|
|
ret);
|
|
orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
|
|
}
|
|
}
|
|
}
|