openmpi/ompi/mca/common/ofacm/common_ofacm_oob.c
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2008-2012 Mellanox Technologies. All rights reserved.
*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/runtime/opal_progress.h"
#include "opal/dss/dss.h"
#include "opal/util/alfg.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "ompi/mca/rte/rte.h"
#include "connect.h"
#include "base.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_object.h"
#include "ompi/constants.h"
#include <inttypes.h>
#define MAX_LINE_LEN 80
#define NUM_OF_TOKENS 7
typedef enum {
ENDPOINT_CONNECT_REQUEST,
ENDPOINT_CONNECT_RESPONSE,
ENDPOINT_CONNECT_ACK
} connect_message_type_t;
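/*
 * A rough sketch of the handshake these message types drive, as
 * implemented by oob_module_start_connect() and rml_recv_cb() below
 * (the "master" is the side whose process name compares >= the peer's):
 *
 *   initiator: create QPs, send CONNECT_REQUEST, state = CONNECTING
 *   master:    create and connect QPs, send CONNECT_RESPONSE,
 *              state = CONNECT_ACK
 *   initiator: connect QPs, send CONNECT_ACK, state = CONNECTED
 *   master:    send a final CONNECT_ACK, state = CONNECTED
 */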
typedef struct port_to_switch_lids{
uint16_t port_lid;
uint16_t switch_lid;
struct port_to_switch_lids* next;
} port_to_switch_lids;
typedef struct switch_to_switch_sl{
uint16_t switch_lid;
uint8_t service_level;
struct switch_to_switch_sl* next;
} switch_to_switch_sl;
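/* Both structs above are nodes of singly-linked lists with a dummy head.
 * The dump-file parsers below fill them with (port LID, switch LID) and
 * (switch LID, service level) pairs, which set_port_to_switch_hash_table()
 * and set_switch_to_switch_hash_table() then move into hash tables. */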
static int oob_priority = 50;
static bool rml_recv_posted = false;
static rng_buff_t rand_buff;
static void oob_component_register(void);
static int oob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ompi_common_ofacm_base_module_t **cpc);
static int oob_component_finalize(void);
static int oob_module_start_connect(ompi_common_ofacm_base_local_connection_context_t* context);
static int reply_start_connect(ompi_common_ofacm_base_local_connection_context_t* context,
ompi_common_ofacm_base_remote_connection_context_t *remote_info);
static int set_remote_info(ompi_common_ofacm_base_local_connection_context_t *context,
ompi_common_ofacm_base_remote_connection_context_t *remote_info);
static int qp_connect_all(ompi_common_ofacm_base_local_connection_context_t* context);
static int qp_create_all(ompi_common_ofacm_base_local_connection_context_t* context);
static int qp_create_one(ompi_common_ofacm_base_local_connection_context_t* context, int qp);
static int send_connect_data(ompi_common_ofacm_base_local_connection_context_t* context,
uint8_t message_type);
static ompi_common_ofacm_base_local_connection_context_t*
oob_endpoint_init(ompi_proc_t *proc,
ompi_common_ofacm_base_qp_config_t *qp_config,
struct ibv_pd *pd, uint64_t subnet_id, int cpc_type,
uint16_t lid, uint16_t rem_lid,
int32_t user_context_index, void *user_context,
ompi_common_ofacm_base_module_t *cpc,
ompi_common_ofacm_base_context_connect_cb_fn_t connect_cb,
ompi_common_ofacm_base_context_error_cb_fn_t error_cb,
ompi_common_ofacm_base_context_prepare_recv_cb_fn_t prepare_recv_cb);
static int oob_endpoint_finalize(ompi_common_ofacm_base_local_connection_context_t *context);
static void report_error(ompi_common_ofacm_base_local_connection_context_t* context);
static void rml_send_cb(int status, ompi_process_name_t* endpoint,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata);
static void rml_recv_cb(int status, ompi_process_name_t* process_name,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata);
/* Build service level hashtables per port */
static int create_service_level_table_for_port(uint16_t lid,
opal_hash_table_t* port_to_switch_hash_table,
opal_hash_table_t* switch_to_switch_hash_table);
/* Pick the service level of the path between two endpoints */
static int pick_service_level(uint16_t src_port_lid, uint16_t dst_port_lid,
uint8_t* service_level,
opal_hash_table_t* port_to_switch_hash_table,
opal_hash_table_t* switch_to_switch_hash_table);
/*
* The "component" struct -- the top-level function pointers for the
* oob connection scheme.
*/
ompi_common_ofacm_base_component_t ompi_common_ofacm_oob = {
"oob",
/* Register */
oob_component_register,
/* Init */
NULL,
/* Query */
oob_component_query,
/* Finalize */
oob_component_finalize,
};
/* Open - this function sets up any oob-specific command line params */
static void oob_component_register(void)
{
oob_priority = 50;
(void) mca_base_var_register("ompi", "common", "ofacm", "connect_oob_priority",
"The selection method priority for oob",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&oob_priority);
}
/*
* Init function. Post non-blocking RML receive to accept incoming
* connection requests.
*/
static int oob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ompi_common_ofacm_base_module_t **cpc)
{
if (oob_priority > 100) {
oob_priority = 100;
} else if (oob_priority < -1) {
oob_priority = -1;
}
/* If we have the transport_type member, check to ensure we're on
IB (this CPC will not work with iWarp). If we do not have the
transport_type member, then we must be < OFED v1.2, and
therefore we must be IB. */
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE)
if (IBV_TRANSPORT_IB != dev->ib_dev->transport_type) {
OFACM_VERBOSE(("OFACM: oob CPC only supported on InfiniBand; skipped on device %s",
ibv_get_device_name(dev->ib_dev)));
return OMPI_ERR_NOT_SUPPORTED;
}
#endif
if (dev->capabilities & OMPI_COMMON_OFACM_XRC_ONLY) {
OFACM_VERBOSE(("OFACM: oob CPC not supported with XRC receive queues, please try xoob CPC; skipped"));
return OMPI_ERR_NOT_SUPPORTED;
}
/* If this btl supports OOB, then post the RML message. But
ensure to only post it *once*, because another btl may have
come in before this and already posted it. */
if (!rml_recv_posted) {
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_OFACM,
OMPI_RML_PERSISTENT,
rml_recv_cb,
NULL);
rml_recv_posted = true;
}
*cpc = malloc(sizeof(ompi_common_ofacm_base_module_t));
if (NULL == *cpc) {
ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_OFACM);
rml_recv_posted = false;
OFACM_VERBOSE(("openib BTL: oob CPC system error (malloc failed)"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Init global list of all connection contexts */
OBJ_CONSTRUCT(&ompi_common_ofacm_oob.all_procs, opal_list_t);
(*cpc)->data.cbm_component = &ompi_common_ofacm_oob;
(*cpc)->data.cbm_priority = oob_priority;
(*cpc)->data.cbm_modex_message = NULL;
(*cpc)->data.cbm_modex_message_len = 0;
(*cpc)->cbm_endpoint_init = oob_endpoint_init;
(*cpc)->cbm_start_connect = oob_module_start_connect;
(*cpc)->cbm_endpoint_finalize = oob_endpoint_finalize;
(*cpc)->cbm_finalize = NULL;
(*cpc)->cbm_uses_cts = false;
/* seed RNG */
opal_srand(&rand_buff,(uint32_t) getpid());
OFACM_VERBOSE(("openib BTL: oob CPC available for use on %s",
ibv_get_device_name(dev->ib_dev)));
return OMPI_SUCCESS;
}
static ompi_common_ofacm_base_proc_t* find_proc(ompi_proc_t *proc)
{
ompi_common_ofacm_base_proc_t *ret = NULL;
opal_list_item_t *item;
opal_list_t *list = &ompi_common_ofacm_oob.all_procs;
for (item = opal_list_get_first(list);
item != opal_list_get_end(list);
item = opal_list_get_next(item)) {
if (proc == ((ompi_common_ofacm_base_proc_t *)item)->proc_ompi){
ret = (ompi_common_ofacm_base_proc_t *)item;
}
}
return ret;
}
/* OOB connection context init */
static ompi_common_ofacm_base_local_connection_context_t*
oob_endpoint_init(ompi_proc_t *proc,
ompi_common_ofacm_base_qp_config_t *qp_config,
struct ibv_pd *pd, uint64_t subnet_id, int cpc_type,
uint16_t lid, uint16_t rem_lid,
int32_t user_context_index, void *user_context,
ompi_common_ofacm_base_module_t *cpc,
ompi_common_ofacm_base_context_connect_cb_fn_t connect_cb,
ompi_common_ofacm_base_context_error_cb_fn_t error_cb,
ompi_common_ofacm_base_context_prepare_recv_cb_fn_t prepare_recv_cb)
{
int ret;
bool new_proc;
ompi_common_ofacm_base_local_connection_context_t *context;
ompi_common_ofacm_base_proc_t *context_proc;
context = (ompi_common_ofacm_base_local_connection_context_t*)
OBJ_NEW(ompi_common_ofacm_base_local_connection_context_t);
context_proc = find_proc(proc);
if (NULL == context_proc) {
new_proc = true;
/* constructing new proc */
context_proc = (ompi_common_ofacm_base_proc_t *)
OBJ_NEW(ompi_common_ofacm_base_proc_t );
} else {
new_proc = false;
OBJ_RETAIN(context_proc);
}
ompi_common_ofacm_base_proc_setup(context_proc, context, proc);
ret = ompi_common_ofacm_base_context_init(context, cpc, connect_cb, error_cb,
prepare_recv_cb, context_proc, qp_config,
pd, subnet_id, cpc_type, lid, rem_lid, user_context_index, user_context);
if (OMPI_SUCCESS != ret) {
OBJ_DESTRUCT(context_proc);
OBJ_DESTRUCT(context);
return NULL;
}
if (new_proc) {
opal_list_append(&ompi_common_ofacm_oob.all_procs, (opal_list_item_t *)context_proc);
}
return context;
}
/* OOB connection context finalization */
static int oob_endpoint_finalize
(ompi_common_ofacm_base_local_connection_context_t *context)
{
opal_list_item_t *proc_item, *cntx_item, *cntx_item_next;
bool found = false;
bool pfound = false;
int qp;
opal_list_t *proc_list = &ompi_common_ofacm_oob.all_procs;
/* Proc cleanup. We must find the context's proc in the all-procs list and
* remove our context from that proc's context list. After that we try to
* release the proc. */
for (proc_item = opal_list_get_first(proc_list);
proc_item != opal_list_get_end(proc_list);
proc_item = opal_list_get_next(proc_item)) {
if (context->proc == ((ompi_common_ofacm_base_proc_t *)proc_item)){
ompi_common_ofacm_base_proc_t *proc =
(ompi_common_ofacm_base_proc_t *)proc_item;
opal_list_t *cntx_list = &proc->all_contexts;
pfound = true;
/* Remove the context from proc list */
cntx_item = opal_list_get_first(cntx_list);
while(cntx_item != opal_list_get_end(cntx_list)) {
/* take the next before removing from the list */
cntx_item_next = opal_list_get_next(cntx_item);
if (context == (ompi_common_ofacm_base_local_connection_context_t *)cntx_item) {
found = true;
opal_list_remove_item(cntx_list, cntx_item);
}
cntx_item = cntx_item_next;
}
/* Remove our proc from all list */
if (opal_list_is_empty(cntx_list)) {
opal_list_remove_item(proc_list, (opal_list_item_t *)proc);
}
OBJ_RELEASE(proc);
}
}
/* Release QPs */
for (qp = 0; qp < context->num_of_qps; qp++) {
if(NULL != context->qps[qp].lcl_qp) {
if(ibv_destroy_qp(context->qps[qp].lcl_qp)) {
OFACM_ERROR(("Failed to destroy QP:%d\n", qp));
}
}
}
assert(true == found);
assert(true == pfound);
/* We are done with the proc release and now we may destroy the context */
OBJ_RELEASE(context);
return OMPI_SUCCESS;
}
/*
* Connect function. Start initiation of connections to a remote
* peer. We send our Queue Pair information over the RML/OOB
* communication mechanism. On completion of our send, a send
* completion handler is called.
*/
static int oob_module_start_connect(ompi_common_ofacm_base_local_connection_context_t *context)
{
int rc;
if (OMPI_SUCCESS != (rc = qp_create_all(context))) {
return rc;
}
/* Send connection info over to remote endpoint */
context->state = MCA_COMMON_OFACM_CONNECTING;
if (OMPI_SUCCESS !=
(rc = send_connect_data(context, ENDPOINT_CONNECT_REQUEST))) {
OFACM_ERROR(("error sending connect request, error code %d", rc));
return rc;
}
return OMPI_SUCCESS;
}
/*
* Component finalize function. Cleanup RML non-blocking receive.
*/
static int oob_component_finalize(void)
{
if (rml_recv_posted) {
ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_OFACM);
rml_recv_posted = false;
}
return OMPI_SUCCESS;
}
/**************************************************************************/
/*
* Reply to a `start - connect' message
*/
static int reply_start_connect(ompi_common_ofacm_base_local_connection_context_t* context,
ompi_common_ofacm_base_remote_connection_context_t *remote_info)
{
int rc;
OFACM_VERBOSE(("Initialized QPs, LID = %d", context->lid));
/* Create local QP's and post receive resources */
if (OMPI_SUCCESS != (rc = qp_create_all(context))) {
return rc;
}
/* Set the remote side info */
set_remote_info(context, remote_info);
/* Connect to remote endpoint qp's */
if (OMPI_SUCCESS != (rc = qp_connect_all(context))) {
return rc;
}
/* Send connection info over to remote endpoint */
context->state = MCA_COMMON_OFACM_CONNECT_ACK;
if (OMPI_SUCCESS !=
(rc = send_connect_data(context, ENDPOINT_CONNECT_RESPONSE))) {
OFACM_ERROR(("error in endpoint send connect request error code is %d",
rc));
return rc;
}
return OMPI_SUCCESS;
}
static int set_remote_info(ompi_common_ofacm_base_local_connection_context_t *context,
ompi_common_ofacm_base_remote_connection_context_t *remote_info)
{
/* copy the remote_info stuff */
memcpy(&context->remote_info,
remote_info, sizeof(ompi_common_ofacm_base_remote_connection_context_t ));
OFACM_VERBOSE(("Setting QP info, LID = %d", context->remote_info.rem_lid));
return OMPI_SUCCESS;
}
/*
* Connect the local ends of all qp's to the remote side
*/
static int qp_connect_all(ompi_common_ofacm_base_local_connection_context_t* context)
{
int i;
uint8_t service_level = 0;
uint32_t rtr_mask = 0, rts_mask = 0;
int rc = OMPI_SUCCESS;
static bool is_hash_table_initialized = false;
static opal_hash_table_t switch_to_switch_hash_table;
static opal_hash_table_t port_to_switch_hash_table;
/* Create two hash tables for a given port in order to allow
* an efficient search of service level on any route exiting
* from it */
if((NULL != ompi_common_ofacm_three_dim_torus) &&
(false == is_hash_table_initialized)){
rc = create_service_level_table_for_port(context->lid, &port_to_switch_hash_table,
&switch_to_switch_hash_table);
if(OMPI_SUCCESS != rc){
/* Failed to create service table for port */
return OMPI_ERROR;
}
is_hash_table_initialized = true;
}
/* Pick the Service Level of each route from the table */
if(is_hash_table_initialized){
rc = pick_service_level(context->lid, context->remote_info.rem_lid, &service_level,
&port_to_switch_hash_table, &switch_to_switch_hash_table);
if(OMPI_SUCCESS != rc){
/* Failed to retrieve service level on the route */
return OMPI_ERROR;
}
/*printf("Debug: qp_connect_all: lid %hu rem lid %hu num_qps %d SL %c\n", context->lid,
context->remote_info.rem_lid, context->num_of_qps, service_level);*/
}
for (i = 0; i < context->num_of_qps; i++) {
struct ibv_qp_attr attr;
struct ibv_qp* qp = context->qps[i].lcl_qp;
enum ibv_mtu mtu = (context->attr[i].path_mtu < context->remote_info.rem_mtu) ?
context->attr[i].path_mtu : context->remote_info.rem_mtu;
memset(&attr, 0, sizeof(attr));
memcpy(&attr, context->attr, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = mtu;
attr.dest_qp_num = context->remote_info.rem_qps[i].rem_qp_num;
attr.rq_psn = context->remote_info.rem_qps[i].rem_psn;
attr.ah_attr.dlid = context->remote_info.rem_lid;
if(is_hash_table_initialized){
attr.ah_attr.sl = service_level;
}
/* JMS to be filled in later dynamically */
attr.ah_attr.static_rate = 0;
rtr_mask = IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER;
/* applying user specified rtr mask */
if (NULL != context->custom_rtr_attr_mask) {
rtr_mask |= context->custom_rtr_attr_mask[i];
}
OFACM_VERBOSE(("Set MTU to IBV value %d (%s bytes)", mtu,
(mtu == IBV_MTU_256) ? "256" :
(mtu == IBV_MTU_512) ? "512" :
(mtu == IBV_MTU_1024) ? "1024" :
(mtu == IBV_MTU_2048) ? "2048" :
(mtu == IBV_MTU_4096) ? "4096" :
"unknown (!)"));
if (ibv_modify_qp(qp, &attr, rtr_mask)) {
OFACM_ERROR(("Error modifing QP to RTR errno says %s",
strerror(errno)));
return OMPI_ERROR;
}
attr.qp_state = IBV_QPS_RTS;
/* On PP QPs we have SW flow control, no need for rnr retries. Setting
* it to zero helps to catch bugs */
/*
attr.rnr_retry = BTL_OPENIB_QP_TYPE_PP(i) ? 0 :
mca_btl_openib_component.ib_rnr_retry;
*/
attr.sq_psn = context->qps[i].lcl_psn;
rts_mask = IBV_QP_STATE |
IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC;
/* applying user specified rts mask */
if (NULL != context->custom_rts_attr_mask) {
rts_mask |= context->custom_rts_attr_mask[i];
}
if (ibv_modify_qp(qp, &attr, rts_mask)) {
OFACM_ERROR(("error modifying QP to RTS errno says %s",
strerror(errno)));
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
/*
* Create the local side of all the qp's. The remote sides will be
* connected later.
*/
static int qp_create_all(ompi_common_ofacm_base_local_connection_context_t* context)
{
int qp, rc;
for (qp = 0; qp < context->num_of_qps; ++qp) {
rc = qp_create_one(context, qp);
if (OMPI_SUCCESS != rc) {
return rc;
}
}
/* Now that all the qp's are created locally, post some receive
buffers, setup credits, etc. */
return context->prepare_recv_cb(context->user_context);
}
/*
* Create the local side of one qp. The remote side will be connected
* later.
*/
static int qp_create_one(ompi_common_ofacm_base_local_connection_context_t *context, int qp)
{
struct ibv_qp *my_qp;
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
size_t req_inline = context->init_attr[qp].cap.max_inline_data;
uint32_t init_mask = 0;
/* Taking default init attributes from user */
memcpy(&init_attr, &context->init_attr[qp], sizeof(init_attr));
my_qp = ibv_create_qp(context->ib_pd, &init_attr);
if (NULL == my_qp) {
OFACM_ERROR(("error creating qp errno says %s", strerror(errno)));
return OMPI_ERROR;
}
context->qps[qp].lcl_qp = my_qp;
if (init_attr.cap.max_inline_data < req_inline) {
context->qps[qp].ib_inline_max = init_attr.cap.max_inline_data;
opal_show_help("help-mpi-common-ofacm-cpc-base.txt",
"inline truncated", true, ompi_process_info.nodename,
req_inline, init_attr.cap.max_inline_data);
} else {
context->qps[qp].ib_inline_max = req_inline;
}
/* Taking default attributes from user */
memcpy(&attr, &context->attr[qp], sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
init_mask = IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS;
/* apply user specified init mask */
if (NULL != context->custom_init_attr_mask) {
init_mask |= context->custom_init_attr_mask[qp];
}
if (ibv_modify_qp(context->qps[qp].lcl_qp,
&attr, init_mask)) {
OFACM_ERROR(("Error modifying qp to INIT errno says %s", strerror(errno)));
return OMPI_ERROR;
}
/* Setup meta data on the endpoint */
//context->qps[qp].lcl_psn = lrand48() & 0xffffff;
context->qps[qp].lcl_psn = opal_rand(&rand_buff) & 0xffffff;
return OMPI_SUCCESS;
}
/*
* RML send connect information to remote endpoint
*/
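/*
 * For reference, the buffer layout packed below (and unpacked in the same
 * order by rml_recv_cb()) is:
 *
 *   message_type                  OPAL_UINT8         always
 *   subnet_id                     OPAL_UINT64        always
 *   remote qp_num, remote lid     UINT32, UINT16     unless CONNECT_REQUEST
 *   cpc_type                      OPAL_INT           unless CONNECT_ACK
 *   num_of_qps                    OPAL_UINT8         unless CONNECT_ACK
 *   per QP: qp_num, psn           UINT32, UINT32     unless CONNECT_ACK
 *   lid, path_mtu, index          UINT16, 2x UINT32  unless CONNECT_ACK
 */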
static int send_connect_data(ompi_common_ofacm_base_local_connection_context_t* context,
uint8_t message_type)
{
opal_buffer_t* buffer = OBJ_NEW(opal_buffer_t);
int rc;
if (NULL == buffer) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* pack the info in the send buffer */
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT8));
OFACM_VERBOSE(("type %d\n", message_type));
rc = opal_dss.pack(buffer, &message_type, 1, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT64));
rc = opal_dss.pack(buffer, &context->subnet_id, 1, OPAL_UINT64);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
if (message_type != ENDPOINT_CONNECT_REQUEST) {
/* send the QP connect request info we respond to */
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer,
&context->remote_info.rem_qps[0].rem_qp_num, 1,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT16));
rc = opal_dss.pack(buffer, &context->remote_info.rem_lid, 1, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
if (message_type != ENDPOINT_CONNECT_ACK) {
int qp;
/* send CM type/family */
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_INT));
rc = opal_dss.pack(buffer, &context->cpc_type, 1, OPAL_INT);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
/* Pasha: Send the number of QPs here. We don't strictly have to send it,
* BUT the recv side callback code is pretty complicated and I don't want
* to touch it now. So the best workaround at this stage is to send one
* more byte with the number of QPs.
*/
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT8));
rc = opal_dss.pack(buffer, &context->num_of_qps, 1, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
/* stuff all the QP info into the buffer */
for (qp = 0; qp < context->num_of_qps; qp++) {
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->qps[qp].lcl_qp->qp_num,
1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->qps[qp].lcl_psn, 1,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT16));
rc = opal_dss.pack(buffer, &context->lid, 1, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->attr[0].path_mtu, 1,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->index, 1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
/* send to remote endpoint */
rc = ompi_rte_send_buffer_nb(&context->proc->proc_ompi->proc_name,
buffer, OMPI_RML_TAG_OFACM,
rml_send_cb, NULL);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Sent QP Info, LID = %d, SUBNET = %lx\n",
context->lid,
context->subnet_id));
return OMPI_SUCCESS;
}
static void report_error(ompi_common_ofacm_base_local_connection_context_t* context)
{
if (NULL == context || NULL == context->error_cb) {
/* The context is undefined and we cannot print a specific error */
opal_show_help("help-mpi-common-ofacm-oob.txt",
"ofacm oob fatal error", true,
ompi_process_info.nodename,
__FILE__, __LINE__);
exit(1);
}
/* Otherwise, call the user error callback */
context->error_cb(context->user_context);
}
/*
* Callback when we have finished RML sending the connect data to a
* remote peer
*/
static void rml_send_cb(int status, ompi_process_name_t* endpoint,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
}
/*
* Non blocking RML recv callback. Read incoming QP and other info,
* and if this endpoint is trying to connect, reply with our QP info,
* otherwise try to modify QP's and establish reliable connection
*/
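/*
 * The state transitions driven by this callback, in sketch form:
 *
 *   CLOSED      + REQUEST -> master: reply_start_connect(), -> CONNECT_ACK
 *                            slave: oob_module_start_connect(), -> CONNECTING
 *   CONNECTING  + reply   -> connect QPs; master: send RESPONSE,
 *                            -> WAITING_ACK; slave: send ACK, -> CONNECTED
 *   WAITING_ACK + ACK     -> CONNECTED
 *   CONNECT_ACK + reply   -> send ACK, -> CONNECTED
 */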
static void rml_recv_cb(int status, ompi_process_name_t* process_name,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata)
{
int context_state;
int rc;
uint32_t lcl_qp = 0;
uint16_t lcl_lid = 0;
int32_t cnt = 1;
ompi_common_ofacm_base_remote_connection_context_t remote_info;
ompi_common_ofacm_base_local_connection_context_t *l_context;
ompi_common_ofacm_base_proc_t *proc;
uint8_t message_type, num_qps;
int cpc_type;
opal_list_t *procs_list = &ompi_common_ofacm_oob.all_procs;
opal_list_t *context_list;
bool master;
/* start by unpacking data first so we know who is knocking at
our door */
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT8));
rc = opal_dss.unpack(buffer, &message_type, &cnt, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT64));
rc = opal_dss.unpack(buffer, &remote_info.rem_subnet_id, &cnt, OPAL_UINT64);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
if (ENDPOINT_CONNECT_REQUEST != message_type) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &lcl_qp, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT16));
rc = opal_dss.unpack(buffer, &lcl_lid, &cnt, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
}
if (ENDPOINT_CONNECT_ACK != message_type) {
int qp;
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_INT));
rc = opal_dss.unpack(buffer, &cpc_type, &cnt, OPAL_INT);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
/* Pasha: Reading the number of QPs; in the original code we took it from
* the btl component. In the future we may change the order of operations
* here: we could start the lookup for the connection descriptor after
* receiving the subnet_id and lid. But to do that I would have to totally
* rewrite the recv callback... next time ;)
*/
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT8));
rc = opal_dss.unpack(buffer, &num_qps, &cnt, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
/* get ready for the data */
ompi_common_ofacm_base_remote_context_init(&remote_info,
num_qps, 0);
/* unpack all the qp info */
for (qp = 0; qp < num_qps; ++qp) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &remote_info.rem_qps[qp].rem_qp_num, &cnt,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &remote_info.rem_qps[qp].rem_psn, &cnt,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT16));
rc = opal_dss.unpack(buffer, &remote_info.rem_lid, &cnt, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &remote_info.rem_mtu, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &remote_info.rem_index, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
report_error(NULL);
return;
}
}
OFACM_VERBOSE(("Received QP Info, LID = %d, SUBNET = %lx, CPC_TYPE = %d",
remote_info.rem_lid,
remote_info.rem_subnet_id,
cpc_type));
master = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL, OMPI_PROC_MY_NAME,
process_name) >= 0 ? true : false;
for (proc = (ompi_common_ofacm_base_proc_t *)opal_list_get_first(procs_list);
proc != (ompi_common_ofacm_base_proc_t *)opal_list_get_end(procs_list);
proc = (ompi_common_ofacm_base_proc_t *)opal_list_get_next(proc)){
bool found = false;
if (ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
&proc->proc_ompi->proc_name,
process_name) != OPAL_EQUAL) {
continue;
}
context_list = &proc->all_contexts;
if (ENDPOINT_CONNECT_REQUEST != message_type) {
/* This is a reply message. Try to get the endpoint
instance the reply belongs to */
for (l_context = (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_first(context_list);
l_context != (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_end(context_list);
l_context = (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_next(l_context)) {
if (l_context->qps[0].lcl_qp != NULL &&
lcl_lid == l_context->lid &&
lcl_qp == l_context->qps[0].lcl_qp->qp_num &&
remote_info.rem_subnet_id == l_context->subnet_id) {
found = true;
break;
}
}
} else {
/* This is a new connection request. If this is the master,
try to find an endpoint in the connecting state. If this is
the slave, try to find an endpoint in the closed state and
initiate the connection back */
ompi_common_ofacm_base_local_connection_context_t *context_found = NULL;
for (l_context = (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_first(context_list);
l_context != (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_end(context_list);
l_context = (ompi_common_ofacm_base_local_connection_context_t *)opal_list_get_next(l_context)) {
if (l_context->subnet_id != remote_info.rem_subnet_id ||
l_context->cpc_type != cpc_type ||
(l_context->state != MCA_COMMON_OFACM_CONNECTING
&& l_context->state != MCA_COMMON_OFACM_CLOSED))
continue;
found = true;
context_found = l_context;
if ((master &&
MCA_COMMON_OFACM_CONNECTING == l_context->state) ||
(!master &&
MCA_COMMON_OFACM_CLOSED == l_context->state))
break; /* Found one. No point to continue */
}
l_context = context_found;
/* if this is the slave and there are no endpoints in the
closed state, then all connections are already in progress,
so just ignore this connection request */
if (found && !master &&
MCA_COMMON_OFACM_CLOSED != l_context->state) {
return;
}
}
if (!found) {
OFACM_ERROR(("can't find suitable endpoint for this peer\n"));
report_error(NULL);
return;
}
OPAL_THREAD_LOCK(&l_context->context_lock);
context_state = l_context->state;
/* Update status */
switch (context_state) {
case MCA_COMMON_OFACM_CLOSED:
/* We had this connection closed before. The endpoint is
trying to connect. Move the status of this connection
to CONNECTING, and then reply with our QP
information */
if (master) {
rc = reply_start_connect(l_context, &remote_info);
} else {
rc = oob_module_start_connect(l_context);
}
if (OMPI_SUCCESS != rc) {
OFACM_ERROR(("error in endpoint reply start connect"));
report_error(l_context);
break;
}
/* As long as we expect a message from the peer (in order
to set up the connection) let the event engine poll the
RML events. Note: we increment it once per active peer
connection. */
opal_progress_event_users_increment();
break;
case MCA_COMMON_OFACM_CONNECTING:
/* preparing remote info for this context */
ompi_common_ofacm_base_remote_context_init(&l_context->remote_info,
l_context->num_of_qps, 0);
/* need to check status here */
set_remote_info(l_context, &remote_info);
if (OMPI_SUCCESS != (rc = qp_connect_all(l_context))) {
OFACM_ERROR(("endpoint connect error: %d", rc));
report_error(l_context);
break;
}
if (master) {
l_context->state = MCA_COMMON_OFACM_WAITING_ACK;
/* Send him an ACK */
send_connect_data(l_context, ENDPOINT_CONNECT_RESPONSE);
} else {
send_connect_data(l_context, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
l_context->state = MCA_COMMON_OFACM_CONNECTED;
l_context->connect_cb(l_context->user_context);
}
break;
case MCA_COMMON_OFACM_WAITING_ACK:
/* Tell main BTL that we're done */
l_context->state = MCA_COMMON_OFACM_CONNECTED;
l_context->connect_cb(l_context->user_context);
break;
case MCA_COMMON_OFACM_CONNECT_ACK:
send_connect_data(l_context, ENDPOINT_CONNECT_ACK);
/* Tell main BTL that we're done */
l_context->state = MCA_COMMON_OFACM_CONNECTED;
l_context->connect_cb(l_context->user_context);
break;
case MCA_COMMON_OFACM_CONNECTED:
break;
default :
OFACM_ERROR(("Invalid endpoint state %d", context_state));
report_error(l_context);
}
OPAL_THREAD_UNLOCK(&l_context->context_lock);
break;
}
}
/*
* Get the service level on the route between
* source port LID and destination port LID.
* @Param src_port_lid - LID of the source port.
* @Param dst_port_lid - LID of destination port.
* @Param service_level - Returned value.
* The service level on the route between source port
* to destination port.
* @return - Error Code. Non Zero value on error.
*/
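/* The lookup is a two-step chase: both port LIDs are first mapped to the
 * LIDs of their attached switches via port_to_switch_hash_table, then the
 * destination switch LID is looked up in switch_to_switch_hash_table
 * (built for the source port's switch) to obtain the service level. */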
static int pick_service_level(uint16_t src_port_lid, uint16_t dst_port_lid, uint8_t* service_level,
opal_hash_table_t* port_to_switch_hash_table, opal_hash_table_t* switch_to_switch_hash_table)
{
uint8_t* sl;
uint16_t* dst_switch_lid;
void* p_src_switch_lid = NULL;
void* p_dst_switch_lid = NULL;
void* p_service_level = NULL;
int rc = OMPI_SUCCESS;
/* Get the switch LID connected to the source HCA LID */
rc = opal_hash_table_get_value_ptr(port_to_switch_hash_table, &src_port_lid, sizeof(uint16_t), &p_src_switch_lid);
if(OMPI_SUCCESS != rc){
/* Could not find source port LID */
rc = OMPI_ERROR;
return rc;
}
/* Get the switch LID connected to the destination HCA LID */
rc = opal_hash_table_get_value_ptr(port_to_switch_hash_table, &dst_port_lid, sizeof(uint16_t), &p_dst_switch_lid);
if(OMPI_SUCCESS != rc){
/* Could not find destination port LID */
rc = OMPI_ERROR;
return rc;
}
dst_switch_lid = (uint16_t*)p_dst_switch_lid;
/* Get the service level of the route between the source HCA LID and destination HCA LID */
rc = opal_hash_table_get_value_ptr(switch_to_switch_hash_table, dst_switch_lid, sizeof(uint16_t), &p_service_level);
if(OMPI_SUCCESS != rc){
/* Could not find destination switch LID in hashtable*/
rc = OMPI_ERROR;
return rc;
}
sl = (uint8_t*)p_service_level;
*service_level = *sl;
return rc;
}
/*
* Get the size of the port to switch hashtable from a file.
* @Param fp - Descriptor of the input file.
* @Param hash_table_size - Pointer to the size of
* the port to switch hashtable.
* @param head - pointer to a linked list containing
* the pairs to be stored in the hashtable.
* @return - Error code. Non zero value for failure.
*/
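/*
 * Sketch of the input block the scanf calls below accept (one block per
 * channel adapter; single separator characters between the fields are
 * consumed by the %c conversions, and the exact values are illustrative):
 *
 *   Channel Adapter <guid> ... base LID <hex> ... LMC <n> ... port <n>
 *   # LID : MTU : RATE
 *   <hex switch lid> : <mtu> : <rate>
 */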
static int get_port_to_switch_hashtable_data_from_file(FILE* fp, int* hash_table_size, port_to_switch_lids** head)
{
int i;
char c;
int num_items;
int rc = OMPI_SUCCESS;
int ret = OMPI_SUCCESS;
uint64_t guid;
uint16_t port_lid;
uint16_t switch_lid;
uint16_t mtu, rate, lmc; /* TODO: Check binary representation */
int port_number;
port_to_switch_lids* item = NULL;
port_to_switch_lids* p_head = *head;
port_to_switch_lids* p_next_item = NULL;
char str[MAX_LINE_LEN] = "\0";
char input_str[NUM_OF_TOKENS][MAX_LINE_LEN] = {"\0"};
char expected_str[NUM_OF_TOKENS][MAX_LINE_LEN] = {"\0"};
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
/* Init expected input strings */
strcpy(expected_str[0], "Channel");
strcpy(expected_str[1], "Adapter");
strcpy(expected_str[2], "base");
strcpy(expected_str[3], "LID");
strcpy(expected_str[4], "LMC");
strcpy(expected_str[5], "port");
/* Create list */
p_head = (port_to_switch_lids*)calloc(1, sizeof(port_to_switch_lids));
if(NULL == p_head){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
*head = p_head;
/* Pre-process the port-to-switch table */
while(EOF != c)
{
ret = fscanf(fp, "%s %s %" PRIx64 " %c", input_str[0], input_str[1], &guid, &c);
ret += fscanf(fp, "%s %s %hx %c", input_str[2], input_str[3], &port_lid, &c);
ret += fscanf(fp, "%s %hu %c", input_str[4], &lmc, &c);
ret += fscanf(fp, "%s %s %d", input_str[6], input_str[5], &port_number);
if(14 != ret){
rc = OMPI_ERR_FILE_READ_FAILURE;
return rc;
}
for(i = 0; i < 6; i++)
{
/*if(strncmp(str, table_header, hash_table_header_size)){*/
if(strcmp(input_str[i], expected_str[i])){
/* Incorrect table header */
rc = OMPI_ERROR;
return rc;
}
}
c = fgetc(fp);
fgets(str, MAX_LINE_LEN, fp);
if(strncmp(str, "# LID : MTU : RATE", strlen(str) - 1)){
/* Incorrect table header */
rc = OMPI_ERROR;
return rc;
}
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
/* Read next line */
fgets(str, MAX_LINE_LEN, fp);
/* Update the port to switch hashtable size if valid data was read */
num_items = sscanf(str, "%hx %c %hu %c %hu", &switch_lid, &c, &mtu, &c, &rate);
if(5 == num_items){
(*hash_table_size)++;
}
else{
/* Wrong file format */
rc = OMPI_ERROR;
return rc;
}
/* Store port LID and switch LID */
item = calloc(1, sizeof(port_to_switch_lids));
if(NULL == item){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
item->port_lid = port_lid;
item->switch_lid = switch_lid;
/* Insert the item to the head of the list */
p_next_item = p_head->next;
p_head->next = item;
item->next = p_next_item;
/* Get Next char */
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
}
return rc;
}
/*
* Get from the input file the size of the
* switch-to-switch hashtable dedicated for
* the input switch LID.
* @Param fp - Descriptor of the input file.
* @Param switch_lid - the source switch local ID (LID).
* @Param hash_table_size - Pointer to the hashtable size.
* Value returned by this routine.
* @Param head - pointer to a linked list containing the pairs
* to be stored in the hashtable.
* @return - Error code. Non zero value for failure.
*/
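/*
 * Sketch of the per-switch block scanned below. Tables are skipped until
 * the one whose "base LID" equals the requested switch_lid is found; its
 * (destination LID, SL) rows are then read until the next "Switch" header:
 *
 *   Switch <guid> ... base LID <hex> ... port <n>
 *   # LID : SL : MTU : RATE
 *   <hex dest lid> : <sl> : ...
 */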
static int get_switch_to_switch_hashtable_size_from_file(FILE* fp, uint16_t switch_lid, int* hash_table_size, switch_to_switch_sl** head)
{
int i;
char c;
int num_items;
int port;
uint64_t guid;
uint16_t source_lid;
uint16_t dest_lid;
int rc = OMPI_SUCCESS;
int ret = OMPI_SUCCESS;
uint8_t service_level;
switch_to_switch_sl* item = NULL;
switch_to_switch_sl* p_head = NULL;
switch_to_switch_sl* p_next_item = NULL;
int table_offset = 0;
int offset_in_table = 0;
char str[MAX_LINE_LEN] = "\0";
char input_str[NUM_OF_TOKENS][MAX_LINE_LEN] = {"\0"};
char expected_str[NUM_OF_TOKENS][MAX_LINE_LEN] = {"\0"};
/* Init expected strings */
strcpy(expected_str[0], "Switch");
strcpy(expected_str[1], "base");
strcpy(expected_str[2], "LID");
strcpy(expected_str[3], "port");
/* Allocate empty list */
p_head = (switch_to_switch_sl*)calloc(1, sizeof(switch_to_switch_sl));
if(NULL == p_head){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
*head = p_head;
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
/* Read info */
while(EOF != c){
/* Go over the switch-to-switch routing tables until the requested
* table dedicated for the input switch_lid is found */
ret = fscanf(fp, "%s %" PRIx64 " %c", input_str[0], &guid, &c);
ret += fscanf(fp, "%s %s %hx %c", input_str[1], input_str[2], &source_lid, &c);
ret += fscanf(fp, "%s %s %d", input_str[4], input_str[3], &port);
c = fgetc(fp);
if(10 != ret)
{
rc = OMPI_ERR_FILE_READ_FAILURE;
return rc;
}
for(i = 0; i < 4; i++){
/* Validate the table header correctness */
if(strncmp(input_str[i], expected_str[i], strlen(input_str[i]))){
/* Incorrect table header */
rc = OMPI_ERROR;
return rc;
}
}
/* Get the next line according to the current structure of the file */
fgets(str, MAX_LINE_LEN, fp);
if(strncmp(str, "# LID : SL : MTU : RATE", strlen(str) - 1)){
rc = OMPI_ERROR;
return rc;
}
/* Test if this is the requested table,
* dedicated for the input source switch lid */
if(source_lid != switch_lid){
/* Skip to next table */
while(EOF != c)
{
offset_in_table = ftell(fp);
fgets(str, MAX_LINE_LEN, fp);
if(!strncmp(str, "Switch", strlen("Switch"))){
/* Found a new table - start over */
fseek(fp, offset_in_table, SEEK_SET);
break;
}
/* Read the next character */
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
}
if(EOF == c){
/* End-of-file was reached without
* finding the required routing table */
rc = OMPI_ERROR;
}
}
else{
/* The right table was found */
while(EOF != c){
fgets(str, MAX_LINE_LEN, fp);
/* Test if a new table was found */
if(!strncmp(str, "Switch", strlen("Switch"))){
/* Quit the search - table was fully read */
return rc;
}
/* Still in the required switch route table */
else{
/* Check the correctness of the data and update the table size */
num_items = sscanf(str, "%hx %c %c", &dest_lid, &c, &service_level);
if(3 != num_items){
/* Failed to read input data / wrong input format */
rc = OMPI_ERROR;
return rc;
}
(*hash_table_size)++;
/* Add the data to the list*/
item = (switch_to_switch_sl*)calloc(1, sizeof(switch_to_switch_sl));
if(NULL == item){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
item->switch_lid = dest_lid;
item->service_level = service_level;
p_next_item = p_head->next;
p_head->next = item;
item->next = p_next_item;
}
/* Get the next character */
c = fgetc(fp);
fseek(fp, -1, SEEK_CUR);
}
/* Set the file descriptor to the beginning
* of the required table */
fseek(fp, table_offset, SEEK_SET);
}
}
return rc;
}
/*
* Set port to switch hashtable according to data read from an input file.
* The hashtable Key is the port local ID (uint16_t).
* The hashtable Value is the local ID (uint16_t) of the switch connected to the port in the fabric.
*
* @Param hashtable - the hashtable to set.
* @Param hashtable_size - the number of hashtable elements.
* @Param head - Pointer to a linked list containing
* the pairs to be stored in the hashtable.
* @return - Error code. Non Zero value on error.
*/
static int set_port_to_switch_hash_table(opal_hash_table_t* hashtable, size_t hashtable_size, port_to_switch_lids** p_head)
{
int ret;
uint16_t key;
uint16_t* value = NULL;
unsigned int i;
int rc = OMPI_SUCCESS;
port_to_switch_lids* head = NULL;
port_to_switch_lids* p_item = NULL;
port_to_switch_lids* p_item_next = NULL;
if((NULL == p_head) || (NULL == *p_head)){
rc = OMPI_ERROR;
return rc;
}
head = *p_head;
for(i = 0; i < hashtable_size; i++){
/* Read pairs of port-lid and switch-lid from
* file and store them in the input hashtable */
value = (uint16_t*)calloc(1, sizeof(uint16_t));
if(NULL == value){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
/* Get next pair to store */
p_item = head->next;
if(NULL == p_item){
rc = OMPI_ERROR;
return rc;
}
key = p_item->port_lid;
*value = p_item->switch_lid;
/* Remove item from list */
p_item_next = p_item->next;
head->next = p_item_next;
free(p_item);
/* Set the port to switch LIDS hashtable */
ret = opal_hash_table_set_value_ptr(hashtable, &key, sizeof(uint16_t), (void*)value);
if(OPAL_SUCCESS != ret){
OFACM_ERROR(("Failed to set port2switch hashtable\n"));
rc = OMPI_ERROR;
break;
}
}
free(*p_head);
*p_head = NULL;
return rc;
}
/*
* Set switch to switch hashtable according to data read from an input file.
* The hashtable Key is a switch local ID (uint16_t).
* The hashtable Value is the service level (uint8_t) of the route in the
* fabric between local switch LID (represented by key) and remote switch LID.
*
* @Param hashtable - The hashtable to set.
* @Param hashtable_size - The number of hashtable elements.
* @Param head - Pointer to a list of all the data
* pair to be inserted into the hashtable.
* @return - Error code. Non Zero value on error.
*/
static int set_switch_to_switch_hash_table(opal_hash_table_t* hashtable, size_t hashtable_size, switch_to_switch_sl** p_head)
{
uint16_t key; /* switch lid */
uint8_t* value = NULL;
unsigned int i;
int rc = OMPI_SUCCESS;
int ret = OMPI_SUCCESS;
switch_to_switch_sl* head = NULL;
switch_to_switch_sl* item = NULL;
switch_to_switch_sl* p_next_item = NULL;
if((NULL == p_head) || (NULL == *p_head)){
rc = OMPI_ERROR;
return rc;
}
head = *p_head;
/* Read pairs of remote switch (LID) and
* route service level (SL) from file
* and store them in the input hashtable */
for(i = 0; i < hashtable_size; i++)
{
value = (uint8_t*)calloc(1, sizeof(uint8_t));
if(NULL == value){
rc = OMPI_ERR_OUT_OF_RESOURCE;
return rc;
}
/* Get data from list */
item = head->next;
if(NULL == item){
rc = OMPI_ERROR;
return rc;
}
key = item->switch_lid;
*value = item->service_level;
/* Remove data item from list */
p_next_item = item->next;
head->next = p_next_item;
free(item);
ret = opal_hash_table_set_value_ptr(hashtable, &key, sizeof(uint16_t), value);
if(OPAL_SUCCESS != ret){
OFACM_ERROR(("Failed to set sw2sw hashtable\n"));
rc = OMPI_ERROR;
break;
}
}
free(*p_head);
*p_head = NULL;
return rc;
}
/*
* An efficient method for finding the service level of any
* route from an input port to any other port in the fabric.
*
* Create two hashtables according to data read from an input file.
* The first table maps any port LID in the fabric to the LID of
* the switch it is connected to.
* The second table is dedicated to the switch LID to which the
* local port is connected.
*
* The table maps a remote switch LID to the service level
* of the route between the table's LID and this remote LID.
*
* @Param lid - the local ID of the port.
* @return - Error Code. Non Zero value in case of error.
*/
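/* Note: the switch-to-switch file name is derived below from the
 * port-to-switch file name by dropping the trailing "peer-paths.dump"
 * (plus the separator before it) and appending
 * "-sw2sw-path-records.dump". */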
static int create_service_level_table_for_port(uint16_t lid, opal_hash_table_t* port_to_switch_hash_table,
opal_hash_table_t* switch_to_switch_hash_table)
{
FILE* fp = NULL;
uint16_t* switch_lid;
void* p_switch_lid = NULL;
int rc = OMPI_SUCCESS;
int ret = OMPI_SUCCESS;
int file_name_len;
char* switch_to_sl = NULL;
int port_to_switch_hash_table_size = 0;
int switch_to_switch_hash_table_size = 0;
port_to_switch_lids* port_switch_lids = NULL;
switch_to_switch_sl* switch_sl = NULL;
/* Open input configuration file */
fp = fopen(ompi_common_ofacm_three_dim_torus, "rt");
if(NULL == fp){
/* File Opening failed */
fprintf(stderr, "Failed to open the input file for the fabric's service level\n");
rc = OMPI_ERR_FILE_OPEN_FAILURE;
goto ERROR;
}
/* Get port-to-switch hashtable size */
rc = get_port_to_switch_hashtable_data_from_file(fp, &port_to_switch_hash_table_size, &port_switch_lids);
if(OMPI_SUCCESS != rc){
goto ERROR;
}
fclose(fp);
fp = NULL;
/* Build and initialize the port-to-switch hashtable */
OBJ_CONSTRUCT(port_to_switch_hash_table, opal_hash_table_t);
opal_hash_table_init(port_to_switch_hash_table, port_to_switch_hash_table_size);
/* Set the port-to-switch hashtable */
rc = set_port_to_switch_hash_table(port_to_switch_hash_table, port_to_switch_hash_table_size, &port_switch_lids);
if(OMPI_SUCCESS != rc){
goto ERROR;
}
/* Get the LID of the switch connected to the port's LID */
ret = opal_hash_table_get_value_ptr(port_to_switch_hash_table, &lid, sizeof(uint16_t), &p_switch_lid);
if(OPAL_SUCCESS != ret){
rc = OMPI_ERROR;
goto ERROR;
}
/* Open the file containing the mapping from switch-to-switch route to service level */
file_name_len = strlen(ompi_common_ofacm_three_dim_torus);
switch_to_sl = (char*)calloc(file_name_len + 7, sizeof(char));
if(NULL == switch_to_sl){
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto ERROR;
}
/* Build the switch-to-switch file name based on the port-to-switch file name */
strncpy(switch_to_sl, ompi_common_ofacm_three_dim_torus,
strlen(ompi_common_ofacm_three_dim_torus) - strlen("peer-paths.dump") - 1);
strcat(switch_to_sl, "-sw2sw-path-records.dump");
/* Open path-to-SL file */
fp = fopen(switch_to_sl, "rt");
if(NULL == fp){
/* File Opening failed */
fprintf(stderr, "Failed to open the input file for the fabric's service level\n");
rc = OMPI_ERR_FILE_OPEN_FAILURE;
goto ERROR;
}
free(switch_to_sl);
switch_lid = (uint16_t*)p_switch_lid;
rc = get_switch_to_switch_hashtable_size_from_file(fp, *(uint16_t*)switch_lid,
&switch_to_switch_hash_table_size, &switch_sl);
if(OMPI_SUCCESS != rc){
goto ERROR;
}
fclose(fp);
fp = NULL;
/* Build and initialize the switch-to-switch hashtable */
OBJ_CONSTRUCT(switch_to_switch_hash_table, opal_hash_table_t);
opal_hash_table_init(switch_to_switch_hash_table, switch_to_switch_hash_table_size);
/* Set the switch-to-switch hashtable */
rc = set_switch_to_switch_hash_table(switch_to_switch_hash_table,
switch_to_switch_hash_table_size, &switch_sl);
if(OMPI_SUCCESS != rc){
goto ERROR;
}
/* Use: opal_hash_table_get_value_uint64 */
return OMPI_SUCCESS;
ERROR:
/* Close open files */
if(NULL != fp){
fclose(fp);
}
/* Release allocated resources */
if(NULL != port_switch_lids){
port_to_switch_lids* p_list = port_switch_lids;
port_to_switch_lids* p_item = NULL;
while(p_list->next != NULL){
p_item = p_list->next;
if(NULL != p_item){
p_list->next = p_item->next;
free(p_item);
}
}
free(p_list);
}
if(NULL != switch_sl){
switch_to_switch_sl* p_list = switch_sl;
switch_to_switch_sl* p_item = NULL;
while(p_list->next != NULL){
p_item = p_list->next;
if(NULL != p_item){
p_list->next = p_item->next;
free(p_item);
}
}
free(p_list);
}
return rc;
}