/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2011      Los Alamos Nation Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"
#include "orte/constants.h"

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#include "opal/mca/base/base.h"
#include "opal/util/output.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/mca/event/event.h"

#include "orte/mca/rml/base/base.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"

#include "orte/mca/oob/oob.h"
#include "orte/mca/oob/base/base.h"
#include "rml_oob.h"

static orte_rml_module_t* rml_oob_init(int* priority);
static int rml_oob_open(void);
static int rml_oob_close(void);

static void rml_oob_recv_route_callback(int status,
                                        struct orte_process_name_t* peer,
                                        struct iovec* iov,
                                        int count,
                                        orte_rml_tag_t tag,
                                        void *cbdata);
static void rml_oob_queued_progress(int fd, short event, void *arg);

/**
 * component definition
 */
orte_rml_component_t mca_rml_oob_component = {
    /* First, the mca_base_component_t struct containing meta
       information about the component itself */
    {
        ORTE_RML_BASE_VERSION_2_0_0,

        "oob",                 /* MCA component name */
        ORTE_MAJOR_VERSION,    /* MCA component major version */
        ORTE_MINOR_VERSION,    /* MCA component minor version */
        ORTE_RELEASE_VERSION,  /* MCA component release version */
        rml_oob_open,          /* component open */
        rml_oob_close,         /* component close */
    },
    {
        /* The component is checkpoint ready */
        MCA_BASE_METADATA_PARAM_CHECKPOINT
    },
    rml_oob_init
};

/**
 * Module instance: the table of RML entry points implemented by this
 * component.  The remaining fields of the struct are filled in by
 * rml_oob_init().
 */
orte_rml_oob_module_t orte_rml_oob_module = {
    {
        orte_rml_oob_init,
        orte_rml_oob_fini,

        orte_rml_oob_get_uri,
        orte_rml_oob_set_uri,

        orte_rml_oob_ping,

        orte_rml_oob_send,
        orte_rml_oob_send_nb,
        orte_rml_oob_send_buffer,
        orte_rml_oob_send_buffer_nb,

        orte_rml_oob_recv,
        orte_rml_oob_recv_nb,
        orte_rml_oob_recv_buffer,
        orte_rml_oob_recv_buffer_nb,
        orte_rml_oob_recv_cancel,

        orte_rml_oob_add_exception,
        orte_rml_oob_del_exception,

        orte_rml_oob_ft_event,

        orte_rml_oob_purge
    }
};

/* Local variables */

/* true once rml_oob_init() has built the module state; cleared again by
   orte_rml_oob_fini() */
static bool init_done = false;


/**
 * Component open: opens the OOB base framework.
 *
 * @return ORTE_SUCCESS, or the error from mca_oob_base_open() (logged).
 */
static int
rml_oob_open(void)
{
    int rc;

    if (ORTE_SUCCESS != (rc = mca_oob_base_open())) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}


/**
 * Component close: closes the OOB base framework.
 *
 * @return ORTE_SUCCESS, or the error from mca_oob_base_close() (logged,
 *         matching rml_oob_open()).
 */
static int
rml_oob_close(void)
{
    int rc;

    if (ORTE_SUCCESS != (rc = mca_oob_base_close())) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}


/**
 * Component init: initialise the OOB base and build the module state.
 *
 * Repeated calls after a successful init just hand back the module.
 *
 * @param priority  [out] 1 on success, -1 on failure.
 * @return pointer to the module's RML function table, or NULL on failure.
 */
static orte_rml_module_t*
rml_oob_init(int* priority)
{
    if (init_done) {
        *priority = 1;
        return &orte_rml_oob_module.super;
    }

    if (mca_oob_base_init() != ORTE_SUCCESS) {
        *priority = -1;
        return NULL;
    }

    /* Allocate the retry timer before constructing anything else so that an
       allocation failure leaves no half-built module state behind. */
    orte_rml_oob_module.timer_event = (opal_event_t *) malloc(sizeof(opal_event_t));
    if (NULL == orte_rml_oob_module.timer_event) {
        *priority = -1;
        return NULL;
    }

    *priority = 1;

    OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions, opal_list_t);
    OBJ_CONSTRUCT(&orte_rml_oob_module.exceptions_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&orte_rml_oob_module.queued_routing_messages, opal_list_t);
    OBJ_CONSTRUCT(&orte_rml_oob_module.queued_lock, opal_mutex_t);

    /* Set default timeout for queued messages to be 1/2 second */
    orte_rml_oob_module.timeout.tv_sec = 0;
    orte_rml_oob_module.timeout.tv_usec = 500000;

    /* the timer drives rml_oob_queued_progress(); it is only armed when a
       message is actually queued */
    opal_event_evtimer_set(opal_event_base, orte_rml_oob_module.timer_event,
                           rml_oob_queued_progress, NULL);

    orte_rml_oob_module.active_oob = &mca_oob;
    orte_rml_oob_module.active_oob->oob_exception_callback =
        orte_rml_oob_exception_callback;

    init_done = true;
    return &orte_rml_oob_module.super;
}


/* iovec handed to the persistent routing receive posted in
   orte_rml_oob_init(); must outlive that call, hence file scope */
static struct iovec route_recv_iov[1];

/**
 * Module init: initialise the active OOB, post the persistent receive for
 * routed messages (ORTE_RML_TAG_RML_ROUTE) and start the base receive.
 *
 * @return ORTE_SUCCESS, the error from oob_init() (logged), or the result
 *         of posting the routing receive.
 */
int
orte_rml_oob_init(void)
{
    int ret;

    /* previously this result was overwritten by the recv_nb call below and
       an OOB init failure went unnoticed */
    ret = orte_rml_oob_module.active_oob->oob_init();
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
        return ret;
    }

    route_recv_iov[0].iov_base = NULL;
    route_recv_iov[0].iov_len = 0;

    ret = orte_rml_oob_module.active_oob->oob_recv_nb(ORTE_NAME_WILDCARD,
                                                      route_recv_iov, 1,
                                                      ORTE_RML_TAG_RML_ROUTE,
                                                      ORTE_RML_ALLOC|ORTE_RML_PERSISTENT,
                                                      rml_oob_recv_route_callback,
                                                      NULL);

    /* enable the base receive to get updates on contact info */
    orte_rml_base_comm_start();

    return ret;
}


/**
 * Module finalize: tear down everything rml_oob_init() built.
 *
 * @return ORTE_SUCCESS always.
 */
int
orte_rml_oob_fini(void)
{
    opal_list_item_t *item;

    /* stop the retry timer first so rml_oob_queued_progress() cannot fire
       against the lists destroyed below, then release it (it was malloc'ed
       in rml_oob_init() and never freed before) */
    if (NULL != orte_rml_oob_module.timer_event) {
        opal_event_evtimer_del(orte_rml_oob_module.timer_event);
        free(orte_rml_oob_module.timer_event);
        orte_rml_oob_module.timer_event = NULL;
    }

    while (NULL !=
           (item = opal_list_remove_first(&orte_rml_oob_module.exceptions))) {
        OBJ_RELEASE(item);
    }
    /* messages still waiting for a route would otherwise be leaked */
    while (NULL !=
           (item = opal_list_remove_first(&orte_rml_oob_module.queued_routing_messages))) {
        OBJ_RELEASE(item);
    }

    OBJ_DESTRUCT(&orte_rml_oob_module.exceptions);
    OBJ_DESTRUCT(&orte_rml_oob_module.exceptions_lock);
    OBJ_DESTRUCT(&orte_rml_oob_module.queued_routing_messages);
    OBJ_DESTRUCT(&orte_rml_oob_module.queued_lock);

    orte_rml_oob_module.active_oob->oob_exception_callback = NULL;

    /* clear the base receive */
    orte_rml_base_comm_stop();

    /* module state is gone: a later rml_oob_init() must rebuild it rather
       than hand out the torn-down module */
    init_done = false;

    return ORTE_SUCCESS;
}


/**
 * Fault-tolerance event hook.
 *
 * Forwards the event to the active OOB module.  On OPAL_CRS_RESTART the OOB
 * base is additionally closed, re-opened and re-initialised.  All other
 * states need no work at this level.
 *
 * @param state  one of the OPAL_CRS_* states.
 * @return ORTE_SUCCESS, or the first error encountered (logged).
 */
int
orte_rml_oob_ft_event(int state)
{
    int ret;

    if (ORTE_SUCCESS !=
        (ret = orte_rml_oob_module.active_oob->oob_ft_event(state))) {
        ORTE_ERROR_LOG(ret);
        return ret;
    }

    if (OPAL_CRS_RESTART == state) {
        if (ORTE_SUCCESS != (ret = mca_oob_base_close())) {
            ORTE_ERROR_LOG(ret);
            return ret;
        }

        if (ORTE_SUCCESS != (ret = mca_oob_base_open())) {
            ORTE_ERROR_LOG(ret);
            return ret;
        }

        if (ORTE_SUCCESS != (ret = mca_oob_base_init())) {
            ORTE_ERROR_LOG(ret);
            return ret;
        }
    }

    return ORTE_SUCCESS;
}


/* Constructor for orte_rml_oob_msg_t: sets up the lock, condition and
   receive buffer, and clears the status fields. */
static void
msg_construct(orte_rml_oob_msg_t *msg)
{
    OBJ_CONSTRUCT(&msg->msg_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&msg->msg_cond, opal_condition_t);
    msg->msg_status = 0;
    msg->msg_complete = false;
    msg->msg_persistent = false;
    OBJ_CONSTRUCT(&msg->msg_recv_buffer, opal_buffer_t);
    msg->msg_data = NULL;
}

/* Destructor for orte_rml_oob_msg_t: frees msg_data (free(NULL) is a no-op)
   and destructs the embedded objects. */
static void
msg_destruct(orte_rml_oob_msg_t *msg)
{
    free(msg->msg_data);
    OBJ_DESTRUCT(&msg->msg_recv_buffer);
    OBJ_DESTRUCT(&msg->msg_lock);
    OBJ_DESTRUCT(&msg->msg_cond);
}

OBJ_CLASS_INSTANCE(orte_rml_oob_msg_t, opal_object_t,
                   msg_construct, msg_destruct);


/* Constructor for a queued routing message: starts with an empty payload. */
static void
queued_msg_construct(orte_rml_oob_queued_msg_t *msg)
{
    msg->payload[0].iov_base = NULL;
    msg->payload[0].iov_len = 0;
}

/* Destructor for a queued routing message: releases the payload copy. */
static void
queued_msg_destruct(orte_rml_oob_queued_msg_t *msg)
{
    free(msg->payload[0].iov_base);
}

OBJ_CLASS_INSTANCE(orte_rml_oob_queued_msg_t, opal_list_item_t,
                   queued_msg_construct, queued_msg_destruct);


/* Send-completion callback for messages sent from the retry queue: cbdata
   is the queued message, which is released here. */
static void
rml_oob_recv_route_queued_send_callback(int status,
                                        struct orte_process_name_t* peer,
                                        struct iovec* iov,
                                        int count,
                                        orte_rml_tag_t tag,
                                        void* cbdata)
{
    orte_rml_oob_queued_msg_t *qmsg = (orte_rml_oob_queued_msg_t*) cbdata;
    OBJ_RELEASE(qmsg);
}


/**
 * Timer callback: retry routing the messages parked on
 * queued_routing_messages.
 *
 * Each message that was queued when the callback started is attempted
 * exactly once.  A message whose send still reports
 * ORTE_ERR_ADDRESSEE_UNKNOWN is put back on the queue; if anything is left
 * on the queue afterwards the timer is re-armed.  (Looping until the queue
 * drained would re-dequeue the message that had just been re-queued and
 * spin inside the event callback instead of waiting for the timeout.)
 *
 * @param fd, event, arg  unused event-library callback arguments.
 */
static void
rml_oob_queued_progress(int fd, short event, void *arg)
{
    orte_rml_oob_queued_msg_t *qmsg;
    orte_rml_oob_msg_header_t *hdr;
    int real_tag;
    int ret;
    orte_process_name_t next, origin;
    size_t pending;

    /* snapshot how many messages are waiting right now; anything re-queued
       below waits for the next timer expiry */
    OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
    pending = opal_list_get_size(&orte_rml_oob_module.queued_routing_messages);
    OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);

    for ( ; pending > 0; --pending) {
        OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
        qmsg = (orte_rml_oob_queued_msg_t*)
            opal_list_remove_first(&orte_rml_oob_module.queued_routing_messages);
        OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
        if (NULL == qmsg) break;

        /* queued payloads begin with the routing header */
        hdr = (orte_rml_oob_msg_header_t*) qmsg->payload[0].iov_base;
        origin = hdr->origin;

        next = orte_routed.get_route(&hdr->destination);
        if (next.vpid == ORTE_VPID_INVALID) {
            opal_output(0,
                        "%s:queued progress tried routing message from %s to %s:%d, can't find route",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&hdr->origin),
                        ORTE_NAME_PRINT(&hdr->destination),
                        hdr->tag);
            opal_backtrace_print(stderr);
            orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
            /* should abort ever return, drop the message instead of
               sending it to an invalid hop */
            OBJ_RELEASE(qmsg);
            continue;
        }

        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
            opal_output(0, "%s:queued progress trying to get message from %s to %s:%d, routing loop",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&hdr->origin),
                        ORTE_NAME_PRINT(&hdr->destination),
                        hdr->tag);
            opal_backtrace_print(stderr);
            orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
            /* as above: never forward a message back to ourselves */
            OBJ_RELEASE(qmsg);
            continue;
        }

        /* last hop delivers on the real tag; intermediate hops keep the
           message on the routing tag */
        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
            real_tag = hdr->tag;
        } else {
            real_tag = ORTE_RML_TAG_RML_ROUTE;
        }

        OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
                             "%s routing message from %s for %s to %s (tag: %d)",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&hdr->origin),
                             ORTE_NAME_PRINT(&hdr->destination),
                             ORTE_NAME_PRINT(&next),
                             hdr->tag));

        /* header goes on the wire converted (HTON); it is converted back
           below (NTOH) if the message has to be re-queued */
        ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);

        ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
                                                          &origin,
                                                          qmsg->payload,
                                                          1,
                                                          real_tag,
                                                          0,
                                                          rml_oob_recv_route_queued_send_callback,
                                                          qmsg);

        if (ORTE_SUCCESS != ret) {
            if (ORTE_ERR_ADDRESSEE_UNKNOWN == ret) {
                /* still no route -- try again on the next timer expiry */
                ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
                OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
                opal_list_append(&orte_rml_oob_module.queued_routing_messages,
                                 &qmsg->super);
                OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
            } else {
                /* the message travels from origin towards next */
                opal_output(0,
                            "%s failed to send message from %s to %s:%d %s (rc = %d)",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&origin),
                            ORTE_NAME_PRINT(&next),
                            real_tag,
                            ORTE_ERROR_NAME(ret),
                            ret);
                abort();
            }
        }
    }

    /* Re-arm once if anything is (still or newly) waiting.  This covers
       messages re-queued above as well as messages appended by
       rml_oob_recv_route_callback() while the queue was non-empty, which
       does not arm the timer itself in that case. */
    OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
    if (0 < opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
        opal_event_evtimer_add(orte_rml_oob_module.timer_event,
                               &orte_rml_oob_module.timeout);
    }
    OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
}


/* Send-completion callback for directly forwarded messages. */
static void
rml_oob_recv_route_send_callback(int status,
                                 struct orte_process_name_t* peer,
                                 struct iovec* iov,
                                 int count,
                                 orte_rml_tag_t tag,
                                 void* cbdata)
{
    /* NTH: free the iovec allocated by rml_oob_recv_route_callback */
    free (iov);
}


/**
 * Receive callback for messages arriving on ORTE_RML_TAG_RML_ROUTE (the
 * persistent receive posted in orte_rml_oob_init()).
 *
 * Looks up the next hop for the header's destination and forwards the
 * message.  If the OOB reports ORTE_ERR_ADDRESSEE_UNKNOWN, a copy of the
 * first iovec is queued and the retry timer is armed; any other send
 * failure aborts.
 *
 * @param status  receive status; asserted to be non-negative.
 * @param peer    unused.
 * @param iov     received data; iov[0] starts with the routing header.
 * @param count   number of entries in iov.
 * @param tag     unused.
 * @param cbdata  unused.
 */
static void
rml_oob_recv_route_callback(int status,
                            struct orte_process_name_t* peer,
                            struct iovec* iov,
                            int count,
                            orte_rml_tag_t tag,
                            void *cbdata)
{
    orte_rml_oob_msg_header_t *hdr =
        (orte_rml_oob_msg_header_t*) iov[0].iov_base;
    int real_tag;
    int ret;
    orte_process_name_t next, origin;
    struct iovec *new_iov;

    /* BWB -- propagate errors here... */
    assert(status >= 0);

    ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);

    origin = hdr->origin;

    next = orte_routed.get_route(&hdr->destination);
    if (next.vpid == ORTE_VPID_INVALID) {
        opal_output(0, "%s:route_callback tried routing message from %s to %s:%d, can't find route",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&origin),
                    ORTE_NAME_PRINT(&hdr->destination),
                    hdr->tag);
        opal_backtrace_print(stderr);
        orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
        return;
    }

    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, ORTE_PROC_MY_NAME)) {
        opal_output(0, "%s:route_callback trying to get message from %s to %s:%d, routing loop",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&origin),
                    ORTE_NAME_PRINT(&hdr->destination),
                    hdr->tag);
        opal_backtrace_print(stderr);
        orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
        /* same as the no-route case: never fall through after abort */
        return;
    }

    /* last hop delivers on the real tag; intermediate hops keep the
       message on the routing tag */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &next, &hdr->destination)) {
        real_tag = hdr->tag;
    } else {
        real_tag = ORTE_RML_TAG_RML_ROUTE;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
                         "%s routing message from %s for %s to %s (tag: %d)",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&hdr->origin),
                         ORTE_NAME_PRINT(&hdr->destination),
                         ORTE_NAME_PRINT(&next),
                         hdr->tag));

    ORTE_RML_OOB_MSG_HEADER_HTON(*hdr);

    /* NTH: fix potential race condition. oob may modify iov before
       the oob send completes */
    new_iov = (struct iovec*) calloc (count, sizeof (struct iovec));
    if (NULL == new_iov) {
        opal_output (0, "%s:route_callback malloc error!",
                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        opal_backtrace_print(stderr);
        orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
        /* must not reach the memcpy below with a NULL destination */
        return;
    }

    memcpy (new_iov, iov, count * sizeof (struct iovec));

    ret = orte_rml_oob_module.active_oob->oob_send_nb(&next,
                                                      &origin,
                                                      new_iov,
                                                      count,
                                                      real_tag,
                                                      0,
                                                      rml_oob_recv_route_send_callback,
                                                      NULL);

    /* NOTE(review): new_iov is only freed by the send-completion callback.
       Whether the OOB keeps a reference to it when oob_send_nb() returns an
       error is not visible from this file, so it is deliberately not freed
       on the failure paths below -- confirm against the OOB implementation. */
    if (ORTE_SUCCESS != ret) {
        if (ORTE_ERR_ADDRESSEE_UNKNOWN == ret) {
            /* no route -- queue and hope we find a route */
            orte_rml_oob_queued_msg_t *qmsg = OBJ_NEW(orte_rml_oob_queued_msg_t);
            if (NULL == qmsg) abort();
            OPAL_OUTPUT_VERBOSE((1, orte_rml_base_output,
                                 "%s: no OOB information for %s. Queuing for later.",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&next)));
            /* the queue keeps the header un-converted;
               rml_oob_queued_progress() converts it again before sending */
            ORTE_RML_OOB_MSG_HEADER_NTOH(*hdr);
            qmsg->payload[0].iov_base = (IOVBASE_TYPE*) malloc(iov[0].iov_len);
            if (NULL == qmsg->payload[0].iov_base) abort();
            qmsg->payload[0].iov_len = iov[0].iov_len;
            memcpy(qmsg->payload[0].iov_base, iov[0].iov_base, iov[0].iov_len);
            OPAL_THREAD_LOCK(&orte_rml_oob_module.queued_lock);
            opal_list_append(&orte_rml_oob_module.queued_routing_messages,
                             &qmsg->super);
            /* first entry on an empty queue starts the retry timer */
            if (1 == opal_list_get_size(&orte_rml_oob_module.queued_routing_messages)) {
                opal_event_evtimer_add(orte_rml_oob_module.timer_event,
                                       &orte_rml_oob_module.timeout);
            }
            OPAL_THREAD_UNLOCK(&orte_rml_oob_module.queued_lock);
        } else {
            opal_output(0,
                        "%s failed to send message to %s: %s (rc = %d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&next),
                        opal_strerror(ret),
                        ret);
            orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
        }
    }
}