1
1
openmpi/orte/mca/rml/oob/rml_oob_component.c
Jeff Squyres 7ae9589d70 The header is at the address of the buffer pointed to by the iov, not
the address of the iov.

This commit was SVN r16513.
2007-10-19 12:40:14 +00:00

516 строки
16 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "rml_oob.h"
#include "opal/util/output.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/orte_constants.h"
#include "orte/mca/rml/base/base.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns.h"
static orte_rml_module_t* rml_oob_init(int* priority);
static int rml_oob_open(void);
static int rml_oob_close(void);
static void rml_oob_recv_route_callback(int status,
struct orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
void *cbdata);
static void rml_oob_queued_progress(int fd, short event, void *arg);
/**
* component definition
*/
orte_rml_component_t mca_rml_oob_component = {
/* First, the mca_base_component_t struct containing meta
information about the component itself */
{
/* Indicate that we are a rml v1.0.0 component (which also
implies a specific MCA version) */
ORTE_RML_BASE_VERSION_1_0_0,
"oob", /* MCA component name */
ORTE_MAJOR_VERSION, /* MCA component major version */
ORTE_MINOR_VERSION, /* MCA component minor version */
ORTE_RELEASE_VERSION, /* MCA component release version */
rml_oob_open, /* component open */
rml_oob_close, /* component close */
},
/* Next the MCA v1.0.0 component meta data */
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
rml_oob_init
};
orte_rml_oob_module_t orte_rml_oob_module = {
{
orte_rml_oob_init,
orte_rml_oob_fini,
orte_rml_oob_get_uri,
orte_rml_oob_set_uri,
orte_rml_oob_get_new_name,
orte_rml_oob_ping,
orte_rml_oob_send,
orte_rml_oob_send_nb,
orte_rml_oob_send_buffer,
orte_rml_oob_send_buffer_nb,
orte_rml_oob_recv,
orte_rml_oob_recv_nb,
orte_rml_oob_recv_buffer,
orte_rml_oob_recv_buffer_nb,
orte_rml_oob_recv_cancel,
orte_rml_oob_add_exception,
orte_rml_oob_del_exception,
orte_rml_oob_ft_event
}
};
static int
rml_oob_open(void)
{
int rc;
if (ORTE_SUCCESS != (rc = mca_oob_base_open())) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}
static int
rml_oob_close(void)
{
int rc;
if (ORTE_SUCCESS != (rc = mca_oob_base_close())) {
return rc;
}
return rc;
}
static orte_rml_module_t*
rml_oob_init(int* priority)
{
if (mca_oob_base_init() != ORTE_SUCCESS)
return NULL;
*priority = 1;
OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions, opal_list_t);
OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions_lock, opal_mutex_t);
OBJ_CONSTRUCT(&orte_rml_oob_module.queued_routing_messages, opal_list_t);
OBJ_CONSTRUCT(&orte_rml_oob_module.queued_lock, opal_mutex_t);
/* Set default timeout for queued messages to be 1/2 second */
orte_rml_oob_module.timeout.tv_sec = 0;
orte_rml_oob_module.timeout.tv_usec = 500000;
orte_rml_oob_module.timer_event = malloc(sizeof(opal_event_t));
if (NULL == orte_rml_oob_module.timer_event) {
return NULL;
}
opal_evtimer_set(orte_rml_oob_module.timer_event, rml_oob_queued_progress,
NULL);
orte_rml_oob_module.active_oob = &mca_oob;
orte_rml_oob_module.active_oob->oob_exception_callback =
orte_rml_oob_exception_callback;
return &orte_rml_oob_module.super;
}
static struct iovec route_recv_iov[1];
int
orte_rml_oob_init(void)
{
int ret;
ret = orte_rml_oob_module.active_oob->oob_init();
route_recv_iov[0].iov_base = NULL;
route_recv_iov[0].iov_len = 0;
ret = orte_rml_oob_module.active_oob->oob_recv_nb(ORTE_NAME_WILDCARD,
route_recv_iov, 1,
ORTE_RML_TAG_RML_ROUTE,
ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
rml_oob_recv_route_callback,
NULL);
return ret;
}
int
orte_rml_oob_fini(void)
{
opal_list_item_t *item;
while (NULL !=
(item = opal_list_remove_first(&orte_rml_oob_module.exceptions))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&orte_rml_oob_module.exceptions);
OBJ_DESTRUCT(&orte_rml_oob_module.exceptions_lock);
OBJ_DESTRUCT(&orte_rml_oob_module.queued_routing_messages);
OBJ_DESTRUCT(&orte_rml_oob_module.queued_lock);
orte_rml_oob_module.active_oob->oob_exception_callback = NULL;
return ORTE_SUCCESS;
}
int
orte_rml_oob_ft_event(int state) {
int exit_status = ORTE_SUCCESS;
int ret;
if(OPAL_CRS_CHECKPOINT == state) {
;
}
else if(OPAL_CRS_CONTINUE == state) {
;
}
else if(OPAL_CRS_RESTART == state) {
;
}
else if(OPAL_CRS_TERM == state ) {
;
}
else {
;
}
if( ORTE_SUCCESS !=
(ret = orte_rml_oob_module.active_oob->oob_ft_event(state)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if(OPAL_CRS_CHECKPOINT == state) {
;
}
else if(OPAL_CRS_CONTINUE == state) {
;
}
else if(OPAL_CRS_RESTART == state) {
if( ORTE_SUCCESS != (ret = mca_oob_base_close())) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = mca_oob_base_open())) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = mca_oob_base_init())) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if(NULL != orte_process_info.ns_replica_uri) {
orte_rml_oob_set_uri(orte_process_info.ns_replica_uri);
}
if(NULL != orte_process_info.gpr_replica_uri) {
orte_rml_oob_set_uri(orte_process_info.gpr_replica_uri);
}
}
else if(OPAL_CRS_TERM == state ) {
;
}
else {
;
}
cleanup:
return exit_status;
}
static void
msg_construct(orte_rml_oob_msg_t *msg)
{
OBJ_CONSTRUCT(&msg->msg_lock, opal_mutex_t);
OBJ_CONSTRUCT(&msg->msg_cond, opal_condition_t);
msg->msg_status = 0;
msg->msg_complete = false;
msg->msg_persistent = false;
OBJ_CONSTRUCT(&msg->msg_recv_buffer, orte_buffer_t);
msg->msg_data = NULL;
}
static void
msg_destruct(orte_rml_oob_msg_t *msg)
{
if (NULL != msg->msg_data) free(msg->msg_data);
OBJ_DESTRUCT(&msg->msg_recv_buffer);
OBJ_DESTRUCT(&msg->msg_lock);
OBJ_DESTRUCT(&msg->msg_cond);
}
OBJ_CLASS_INSTANCE(orte_rml_oob_msg_t, opal_object_t,
msg_construct, msg_destruct);
static void
queued_msg_construct(orte_rml_oob_queued_msg_t *msg)
{
msg->payload[0].iov_base = NULL;
msg->payload[0].iov_len = 0;
}
static void
queued_msg_destruct(orte_rml_oob_queued_msg_t *msg)
{
if (NULL != msg->payload[0].iov_base) free(msg->payload[0].iov_base);
}
OBJ_CLASS_INSTANCE(orte_rml_oob_queued_msg_t, opal_list_item_t,
queued_msg_construct, queued_msg_destruct);
static void
rml_oob_recv_route_queued_send_callback(int status,
struct orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
void* cbdata)
{
orte_rml_oob_queued_msg_t *qmsg = (orte_rml_oob_queued_msg_t*) cbdata;
OBJ_RELEASE(qmsg);
}
static void
rml_oob_queued_progress(int fd, short event, void *arg)
{
orte_rml_oob_queued_msg_t *qmsg;
orte_rml_oob_msg_header_t *hdr;
int real_tag;
int ret;
orte_process_name_t next, origin;
while (true) {
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
qmsg = (orte_rml_oob_queued_msg_t*) opal_list_remove_first(&orte_rml_oob_module.queued_routing_messages);
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
if (NULL == qmsg) break;
hdr = (orte_rml_oob_msg_header_t*) qmsg->payload[0].iov_base;
origin = hdr->origin;
next = orte_routed.get_route(&hdr->destination);
if (next.vpid == ORTE_VPID_INVALID) {
opal_output(0,
"%s tried routing message to %s, can't find route",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->destination));
abort();
}
if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
opal_output(0, "%s trying to get message to %s, routing loop",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->destination));
abort();
}
if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
real_tag = hdr->tag;
} else {
real_tag = ORTE_RML_TAG_RML_ROUTE;
}
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
"%s routing message from %s for %s to %s (tag: %d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin),
ORTE_NAME_PRINT(&hdr->destination),
ORTE_NAME_PRINT(&next),
hdr->tag));
ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);
ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
&origin,
qmsg->payload,
1,
real_tag,
0,
rml_oob_recv_route_queued_send_callback,
qmsg);
if (ORTE_SUCCESS != ret) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN == ret) {
/* still no route -- try again */
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
opal_list_append(&orte_rml_oob_module.queued_routing_messages,
&qmsg->super);
if (1 == opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
opal_evtimer_add(orte_rml_oob_module.timer_event,
&orte_rml_oob_module.timeout);
}
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
} else {
opal_output(0,
"%s failed to send message to %s: %s (rc = %d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&next),
opal_strerror(ret),
ret);
abort();
}
}
}
}
static void
rml_oob_recv_route_send_callback(int status,
struct orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
void* cbdata)
{
}
static void
rml_oob_recv_route_callback(int status,
struct orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
void *cbdata)
{
orte_rml_oob_msg_header_t *hdr =
(orte_rml_oob_msg_header_t*) iov[0].iov_base;
int real_tag;
int ret;
orte_process_name_t next, origin;
/* BWB -- propogate errors here... */
assert(status >= 0);
ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
origin = hdr->origin;
next = orte_routed.get_route(&hdr->destination);
if (next.vpid == ORTE_VPID_INVALID) {
opal_output(0,
"%s tried routing message to %s, can't find route",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->destination));
abort();
}
if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
opal_output(0, "%s trying to get message to %s, routing loop",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->destination));
abort();
}
if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
real_tag = hdr->tag;
} else {
real_tag = ORTE_RML_TAG_RML_ROUTE;
}
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
"%s routing message from %s for %s to %s (tag: %d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin),
ORTE_NAME_PRINT(&hdr->destination),
ORTE_NAME_PRINT(&next),
hdr->tag));
ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);
ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
&origin,
iov,
count,
real_tag,
0,
rml_oob_recv_route_send_callback,
NULL);
if (ORTE_SUCCESS != ret) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN == ret) {
/* no route -- queue and hope we find a route */
orte_rml_oob_queued_msg_t *qmsg = OBJ_NEW(orte_rml_oob_queued_msg_t);
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
"%s: no OOB information for %s. Queuing for later.",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&next)));
ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
qmsg->payload[0].iov_base = malloc(iov[0].iov_len);
if (NULL == qmsg->payload[0].iov_base) abort();
qmsg->payload[0].iov_len = iov[0].iov_len;
memcpy(qmsg->payload[0].iov_base, iov[0].iov_base, iov[0].iov_len);
OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
opal_list_append(&orte_rml_oob_module.queued_routing_messages,
&qmsg->super);
if (1 == opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
opal_evtimer_add(orte_rml_oob_module.timer_event,
&orte_rml_oob_module.timeout);
}
OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
} else {
opal_output(0,
"%s failed to send message to %s: %s (rc = %d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&next),
opal_strerror(ret),
ret);
abort();
}
}
}