1
1
openmpi/ompi/mca/common/ofacm/common_ofacm_xoob.c
Joshua Ladd 9ea9bec4ad Addressing Jeff's comments:
1. Changed rng_buff_t --> opal_rng_buff_t
2. All global variables obey the prefix rule
3. Old code has been removed 
4. Found a couple of unnecessary includes

Refs trac:4298

This commit was SVN r30807.

The following Trac tickets were found above:
  Ticket 4298 --> https://svn.open-mpi.org/trac/ompi/ticket/4298
2014-02-24 23:18:35 +00:00

1538 строки
60 KiB
C

/*
* Copyright (c) 2007-2012 Mellanox Technologies. All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2013 NVIDIA Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/runtime/opal_progress.h"
#include "opal/dss/dss.h"
#include "opal/util/alfg.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "ompi/mca/rte/rte.h"
#include "common_ofacm_xoob.h"
#include "opal/class/opal_hash_table.h"
#include "base.h"
#include "connect.h"
#include "ompi/constants.h"
#define SIZE_OF3(A, B, C) (sizeof(A) + sizeof(B) + sizeof(C))
#define BASE_TO_XOOB(context) (ompi_common_ofacm_xoob_local_connection_context_t *)context
#define XOOB_TO_BASE(xcontext) (ompi_common_ofacm_base_local_connection_context_t *)xcontext
static void xoob_component_register(void);
static int xoob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ompi_common_ofacm_base_module_t **cpc);
static int xoob_component_finalize(void);
static int xoob_module_start_connect
(ompi_common_ofacm_base_local_connection_context_t *context);
static void xoob_ib_address_constructor(ofacm_ib_address_t *ib_addr);
static void xoob_ib_address_destructor(ofacm_ib_address_t *ib_addr);
OBJ_CLASS_INSTANCE(ofacm_ib_address_t,
opal_list_item_t,
xoob_ib_address_constructor,
xoob_ib_address_destructor);
/*
* The "component" struct -- the top-level function pointers for the
* xoob connection scheme.
*/
ompi_common_ofacm_base_component_t ompi_common_ofacm_xoob = {
"xoob",
/* Register */
xoob_component_register,
/* Init */
NULL,
/* Query */
xoob_component_query,
/* Finalize */
xoob_component_finalize,
};
typedef enum {
ENDPOINT_XOOB_CONNECT_REQUEST,
ENDPOINT_XOOB_CONNECT_RESPONSE,
ENDPOINT_XOOB_CONNECT_XRC_REQUEST,
ENDPOINT_XOOB_CONNECT_XRC_RESPONSE,
ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE /* The xrc recv qp already was destroyed */
} connect_message_type_t;
static int xoob_priority = 60;
static bool rml_recv_posted = false;
static opal_rng_buff_t rand_buff;
#define XOOB_SET_REMOTE_INFO(EP, INFO) \
do { \
/* copy the rem_info stuff */ \
EP.rem_lid = INFO.rem_lid; \
EP.rem_subnet_id = INFO.rem_subnet_id; \
EP.rem_mtu = INFO.rem_mtu; \
EP.rem_index = INFO.rem_index; \
memcpy((void*)EP.rem_qps, (void*)INFO.rem_qps, \
sizeof(mca_btl_openib_rem_qp_info_t)); \
/* copy the rem_info stuff */ \
memcpy((void*)EP.rem_srqs, (void*)INFO.rem_srqs, \
sizeof(mca_btl_openib_rem_srq_info_t) * \
mca_btl_openib_component.num_xrc_qps); \
} while (0)
/* Constructor destructor for xoob context. */
static void xoob_local_context_constructor
(ompi_common_ofacm_xoob_local_connection_context_t *context)
{
context->addr = NULL;
context->xrc_recv_psn = 0;
}
static void xoob_local_context_destructor
(ompi_common_ofacm_xoob_local_connection_context_t *context)
{
if(NULL != context->addr) {
OBJ_RELEASE(context->addr);
}
}
OBJ_CLASS_INSTANCE(ompi_common_ofacm_xoob_local_connection_context_t,
ompi_common_ofacm_base_local_connection_context_t,
xoob_local_context_constructor,
xoob_local_context_destructor);
static void xoob_pending_context_constructor(pending_context_t *pcontext)
{
pcontext->xcontext = NULL;
}
static void xoob_pending_context_destructor(pending_context_t *pcontext)
{
/* I have nothing to do !*/
}
static void xoob_pending_context_init(pending_context_t *pcontext,
ompi_common_ofacm_xoob_local_connection_context_t *xcontext)
{
pcontext->xcontext = xcontext;
}
OBJ_CLASS_INSTANCE(pending_context_t,
opal_list_item_t,
xoob_pending_context_constructor,
xoob_pending_context_destructor);
static void xoob_ib_address_constructor(ofacm_ib_address_t *ib_addr)
{
ib_addr->key = NULL;
ib_addr->subnet_id = 0;
ib_addr->lid = 0;
ib_addr->status = XOOB_ADDR_CLOSED;
ib_addr->qps = NULL;
OBJ_CONSTRUCT(&ib_addr->addr_lock, opal_mutex_t);
OBJ_CONSTRUCT(&ib_addr->pending_contexts, opal_list_t);
}
static void xoob_ib_address_destructor(ofacm_ib_address_t *ib_addr)
{
if(NULL != ib_addr->qps && NULL != ib_addr->qps[0].lcl_qp) {
if(ibv_destroy_qp(ib_addr->qps[0].lcl_qp)) {
OFACM_ERROR(("Failed to destroy QP:%d\n", 0));
}
}
if (NULL != ib_addr->key) {
free(ib_addr->key);
}
OBJ_DESTRUCT(&ib_addr->addr_lock);
OBJ_DESTRUCT(&ib_addr->pending_contexts);
}
static int xoob_ib_address_init(ofacm_ib_address_t *ib_addr, uint16_t lid, uint64_t s_id, ompi_jobid_t ep_jobid)
{
ib_addr->key = malloc(SIZE_OF3(s_id, lid, ep_jobid));
if (NULL == ib_addr->key) {
OFACM_ERROR(("Failed to allocate memory for key\n"));
return OMPI_ERROR;
}
memset(ib_addr->key, 0, SIZE_OF3(s_id, lid, ep_jobid));
/* creating the key = lid + s_id + ep_jobid */
memcpy(ib_addr->key, &lid, sizeof(lid));
memcpy((void*)((char*)ib_addr->key + sizeof(lid)), &s_id, sizeof(s_id));
memcpy((void*)((char*)ib_addr->key + sizeof(lid) + sizeof(s_id)),
&ep_jobid, sizeof(ep_jobid));
/* caching lid and subnet id */
ib_addr->subnet_id = s_id;
ib_addr->lid = lid;
return OMPI_SUCCESS;
}
/* Create new entry in hash table for subnet_id and lid,
* update the context pointer.
* Before call to this function you need to protect with
*/
static ofacm_ib_address_t* xoob_ib_address_add_new (ompi_common_ofacm_xoob_module_t *xcpc,
uint16_t lid, uint64_t s_id, ompi_jobid_t ep_jobid)
{
void *tmp;
int ret;
struct ofacm_ib_address_t *ib_addr = OBJ_NEW(ofacm_ib_address_t);
ret = xoob_ib_address_init(ib_addr, lid, s_id, ep_jobid);
if (OMPI_SUCCESS != ret ) {
OFACM_ERROR(("XRC Internal error. Failed to init ib_addr\n"));
OBJ_DESTRUCT(ib_addr);
return NULL;
}
/* is it already in the table ?*/
if (OPAL_SUCCESS != opal_hash_table_get_value_ptr(&xcpc->ib_addr_table,
ib_addr->key,
SIZE_OF3(s_id, lid, ep_jobid), &tmp)) {
/* It is new one, lets put it on the table */
ret = opal_hash_table_set_value_ptr(&xcpc->ib_addr_table,
ib_addr->key, SIZE_OF3(s_id, lid, ep_jobid), (void*)ib_addr);
if (OPAL_SUCCESS != ret) {
OFACM_ERROR(("XRC Internal error."
" Failed to add element to ib_addr_table\n"));
OBJ_DESTRUCT(ib_addr);
return NULL;
}
} else {
/* so we have this one in the table, just return the pointer */
OBJ_DESTRUCT(ib_addr);
ib_addr = (ofacm_ib_address_t *)tmp;
OBJ_RETAIN(ib_addr);
assert(lid == ib_addr->lid && s_id == ib_addr->subnet_id);
}
/* update the context with pointer to ib address */
return ib_addr;
}
static void xoob_connection_complete(ompi_common_ofacm_xoob_local_connection_context_t *xcontext)
{
bool master = false;
pending_context_t *pcon;
ompi_common_ofacm_base_local_connection_context_t *con;
ompi_common_ofacm_base_local_connection_context_t *context =
XOOB_TO_BASE(xcontext);
OFACM_VERBOSE(("Now we are CONNECTED"));
OPAL_THREAD_LOCK(&xcontext->addr->addr_lock);
if (XOOB_ADDR_CONNECTED == xcontext->addr->status) {
/* We are not xrc master */
/* set our qp pointer to master qp */
master = false;
} else {
/* I'm master of XRC */
xcontext->addr->status = XOOB_ADDR_CONNECTED;
master = true;
}
/* The status was moved down to cpc */
context->state = MCA_COMMON_OFACM_CONNECTED;
while(master && !opal_list_is_empty(&xcontext->addr->pending_contexts)) {
pcon = (pending_context_t *)opal_list_remove_first(&xcontext->addr->pending_contexts);
con = XOOB_TO_BASE(pcon->xcontext);
OBJ_RELEASE(pcon);
if (OMPI_SUCCESS !=
xoob_module_start_connect(con)) {
OFACM_ERROR(("Failed to connect pending endpoint\n"));
}
}
OPAL_THREAD_UNLOCK(&xcontext->addr->addr_lock);
context->connect_cb(context->user_context);
}
static int xoob_init_rem_info_alloc_qp(ompi_common_ofacm_base_remote_connection_context_t *rem_info)
{
rem_info->rem_qps = (ompi_common_ofacm_base_rem_qp_info_t *)
malloc(sizeof(ompi_common_ofacm_base_rem_qp_info_t));
if (NULL == rem_info->rem_qps) {
OFACM_ERROR(("Failed to allocate memory for remote QP data\n"));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
static int xoob_init_rem_info_alloc_srq(ompi_common_ofacm_base_remote_connection_context_t *rem_info, uint8_t num_srqs)
{
rem_info->rem_srqs = (ompi_common_ofacm_base_rem_srq_info_t*)
calloc(num_srqs, sizeof(ompi_common_ofacm_base_rem_srq_info_t));
if (NULL == rem_info->rem_srqs) {
OFACM_ERROR(("Failed to allocate memory for remote SRQ data\n"));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/* Free remote information structs */
static void xoob_free_rem_info(ompi_common_ofacm_base_remote_connection_context_t *rem_info)
{
if (NULL != rem_info->rem_qps) {
free(rem_info->rem_qps);
}
if (NULL != rem_info->rem_srqs) {
free(rem_info->rem_srqs);
}
}
static int xoob_set_remote_info(ompi_common_ofacm_xoob_local_connection_context_t *xcontext,
ompi_common_ofacm_base_remote_connection_context_t *remote_info)
{
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
/* If we got qp information - copy it */
if (NULL != remote_info->rem_qps) {
xoob_init_rem_info_alloc_qp(&context->remote_info);
memcpy(context->remote_info.rem_qps,
remote_info->rem_qps,
sizeof(ompi_common_ofacm_base_rem_qp_info_t));
}
if (NULL != remote_info->rem_srqs) {
xoob_init_rem_info_alloc_srq(&context->remote_info, context->num_of_srqs);
memcpy(context->remote_info.rem_srqs, remote_info->rem_srqs,
sizeof(ompi_common_ofacm_base_rem_srq_info_t)*context->num_of_srqs);
}
context->remote_info.rem_lid = remote_info->rem_lid;
context->remote_info.rem_subnet_id = remote_info->rem_subnet_id;
context->remote_info.rem_mtu = remote_info->rem_mtu;
context->remote_info.rem_index = remote_info->rem_index;
OFACM_VERBOSE(("Setting QP info, LID = %d", context->remote_info.rem_lid));
return OMPI_SUCCESS;
}
static void xoob_report_error(ompi_common_ofacm_xoob_local_connection_context_t *xcontext)
{
if (NULL == xcontext || NULL == (XOOB_TO_BASE(xcontext))->error_cb) {
/* The context is undefined and we can not print specific error */
opal_show_help("help-mpi-common-ofacm-oob.txt",
"ofacm oob fatal error", true,
ompi_process_info.nodename,
__FILE__, __LINE__);
exit(1);
}
/* Other way, call to user error callback */
(XOOB_TO_BASE(xcontext))->error_cb((XOOB_TO_BASE(xcontext))->user_context);
}
static int xoob_context_init(ompi_common_ofacm_xoob_local_connection_context_t *xcontext,
ompi_common_ofacm_xoob_module_t *xcpc,
ompi_common_ofacm_base_context_connect_cb_fn_t connect_cb,
ompi_common_ofacm_base_context_error_cb_fn_t error_cb,
ompi_common_ofacm_base_context_prepare_recv_cb_fn_t prepare_recv_cb,
ompi_common_ofacm_base_proc_t *proc,
ompi_common_ofacm_base_qp_config_t *qp_config,
struct ibv_pd *pd, uint64_t subnet_id, int cpc_type,
uint16_t lid, uint16_t rem_lid,
int32_t user_context_index, void *user_context)
{
int ret;
ompi_common_ofacm_base_local_connection_context_t *context =
XOOB_TO_BASE(xcontext);
ompi_common_ofacm_base_module_t *cpc =
(ompi_common_ofacm_base_module_t *)xcpc;
/* Set IB address for this context */
xcontext->addr = xoob_ib_address_add_new(xcpc, rem_lid, subnet_id, proc->proc_ompi->proc_name.jobid);
if (NULL == xcontext->addr) {
OFACM_ERROR(("Failed to allocate or found xoob ib address"));
return OMPI_ERROR;
}
/* Allocate memory for QPs */
if (NULL == xcontext->addr->qps) {
xcontext->addr->qps =
calloc(qp_config->num_qps, sizeof(ompi_common_ofacm_base_qp_t));
if(NULL == xcontext->addr->qps) {
OFACM_ERROR(("Failed to allocate memory for qps"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
}
/* Update QP pointers */
context->qps = xcontext->addr->qps;
/* Init base context */
ret = ompi_common_ofacm_base_context_init(context, cpc, connect_cb, error_cb,
prepare_recv_cb, proc, qp_config,
pd, subnet_id, cpc_type, lid, rem_lid, user_context_index, user_context);
if (OMPI_SUCCESS != ret) {
return ret;
}
return OMPI_SUCCESS;
}
/* XOOB connection context init */
static ompi_common_ofacm_base_local_connection_context_t*
xoob_endpoint_init(ompi_proc_t *proc,
ompi_common_ofacm_base_qp_config_t *qp_config,
struct ibv_pd *pd, uint64_t subnet_id, int cpc_type,
uint16_t lid, uint16_t rem_lid, int32_t user_context_index, void *user_context,
ompi_common_ofacm_base_module_t *cpc,
ompi_common_ofacm_base_context_connect_cb_fn_t connect_cb,
ompi_common_ofacm_base_context_error_cb_fn_t error_cb,
ompi_common_ofacm_base_context_prepare_recv_cb_fn_t prepare_recv_cb)
{
int ret;
bool new_proc;
ompi_common_ofacm_xoob_local_connection_context_t *xcontext;
ompi_common_ofacm_base_proc_t *context_proc;
ompi_common_ofacm_xoob_module_t *xcpc =
(ompi_common_ofacm_xoob_module_t *)cpc;
xcontext = (ompi_common_ofacm_xoob_local_connection_context_t*)
OBJ_NEW(ompi_common_ofacm_xoob_local_connection_context_t);
context_proc = ompi_common_ofacm_base_find_proc(&ompi_common_ofacm_xoob, proc);
if (NULL == context_proc) {
new_proc = true;
/* constructing new proc */
context_proc = (ompi_common_ofacm_base_proc_t *)
OBJ_NEW(ompi_common_ofacm_base_proc_t );
} else {
new_proc = false;
OBJ_RETAIN(context_proc);
}
OFACM_VERBOSE(("Xoob endpoint init: cpc_type %d, rem_lid %d, my_lid %d, subnet id %d",
cpc_type, rem_lid, lid, subnet_id));
ompi_common_ofacm_base_proc_setup(context_proc, XOOB_TO_BASE(xcontext), proc);
ret = xoob_context_init(xcontext, xcpc, connect_cb, error_cb,
prepare_recv_cb, context_proc, qp_config,
pd, subnet_id, cpc_type, lid, rem_lid, user_context_index, user_context);
if (OMPI_SUCCESS != ret) {
OBJ_DESTRUCT(context_proc);
OBJ_DESTRUCT(xcontext);
return NULL;
}
if(new_proc) {
opal_list_append(&ompi_common_ofacm_xoob.all_procs,
(opal_list_item_t *)context_proc);
}
return &xcontext->super;
}
static int xoob_endpoint_finalize
(ompi_common_ofacm_base_local_connection_context_t *context)
{
opal_list_item_t *proc_item, *cntx_item, *cntx_item_next;
opal_list_t *proc_list = &ompi_common_ofacm_xoob.all_procs;
ompi_common_ofacm_xoob_local_connection_context_t *xcontext;
/* Proc cleanup. We should find the context proc in all proc list and remove
* from the proc list our context. After it we try to release the proc context */
for (proc_item = opal_list_get_first(proc_list);
proc_item != opal_list_get_end(proc_list);
proc_item = opal_list_get_next(proc_item)) {
if (context->proc == ((ompi_common_ofacm_base_proc_t *)proc_item)){
ompi_common_ofacm_base_proc_t *proc =
(ompi_common_ofacm_base_proc_t *)proc_item;
opal_list_t *cntx_list = &proc->all_contexts;
/* Remove the context from proc list */
cntx_item = opal_list_get_first(cntx_list);
while(cntx_item != opal_list_get_end(cntx_list)) {
/* take the next before removing from the list */
cntx_item_next = opal_list_get_next(cntx_item);
if (context == (ompi_common_ofacm_base_local_connection_context_t *)cntx_item) {
opal_list_remove_item(cntx_list, cntx_item);
}
cntx_item = cntx_item_next;
}
/* Remove our proc from all list */
if (opal_list_is_empty(cntx_list)) {
opal_list_remove_item(proc_list, (opal_list_item_t *)proc);
}
OBJ_RELEASE(proc);
}
}
if (0 != context->xrc_recv_qp_num) {
if(ibv_unreg_xrc_rcv_qp(context->init_attr[0].xrc_domain,
context->xrc_recv_qp_num)) {
OFACM_ERROR(("Failed to unregister XRC recv QP:%d\n", context->xrc_recv_qp_num));
}
}
xcontext = BASE_TO_XOOB(context);
/* We done with proc release and now we way destroy the context */
OBJ_DESTRUCT(xcontext);
return OMPI_SUCCESS;
}
/*
* Callback when we have finished RML sending the connect data to a
* remote peer
*/
static void xoob_rml_send_cb(int status, ompi_process_name_t* context,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
}
/* Receive connect information to remote context */
static int xoob_receive_connect_data(ompi_common_ofacm_base_remote_connection_context_t *info, uint16_t *lid, int *cpc_type,
uint8_t *message_type, opal_buffer_t* buffer)
{
int cnt = 1, rc, srq;
uint8_t num_srqs;
/* Recv standart header */
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT8));
rc = opal_dss.unpack(buffer, message_type, &cnt, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack Message type = %d", *message_type));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT64));
rc = opal_dss.unpack(buffer, &info->rem_subnet_id, &cnt, OPAL_UINT64);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack sid = %d", info->rem_subnet_id));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT16));
rc = opal_dss.unpack(buffer, &info->rem_lid, &cnt, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack lid = %d", info->rem_lid));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_INT));
rc = opal_dss.unpack(buffer, cpc_type, &cnt, OPAL_INT);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack cpc_type = %d", *cpc_type));
/* Till now we got the standart header, now we continue to recieve data for
* different packet types
*/
if (ENDPOINT_XOOB_CONNECT_REQUEST == *message_type ||
ENDPOINT_XOOB_CONNECT_RESPONSE == *message_type) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &info->rem_qps->rem_qp_num, &cnt,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote qp = %x", info->rem_qps->rem_qp_num));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &info->rem_qps->rem_psn, &cnt,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote psn = %d", info->rem_qps->rem_psn));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &info->rem_mtu, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote mtu = %d", info->rem_mtu));
}
if (ENDPOINT_XOOB_CONNECT_REQUEST == *message_type ||
ENDPOINT_XOOB_CONNECT_XRC_REQUEST == *message_type) {
/* unpack requested lid info */
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT16));
rc = opal_dss.unpack(buffer, lid, &cnt, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack requested lid = %d", *lid));
}
/* Unpack requested recv qp number */
if (ENDPOINT_XOOB_CONNECT_XRC_REQUEST == *message_type) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
/* In XRC request case we will use rem_qp_num as container for requested qp number */
rc = opal_dss.unpack(buffer, &info->rem_qps->rem_qp_num, &cnt,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Recv unpack requested qp = %x", info->rem_qps->rem_qp_num));
}
if (ENDPOINT_XOOB_CONNECT_RESPONSE == *message_type ||
ENDPOINT_XOOB_CONNECT_XRC_RESPONSE == *message_type) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT32));
rc = opal_dss.unpack(buffer, &info->rem_index, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote index = %d", info->rem_index));
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT8));
rc = opal_dss.unpack(buffer, &num_srqs, &cnt, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote num of srqs = %d", num_srqs));
rc = xoob_init_rem_info_alloc_srq(info, num_srqs);
if (OMPI_SUCCESS != rc) {
return OMPI_ERROR;
}
for (srq = 0; srq < num_srqs; srq++) {
OFACM_VERBOSE(("unpacking %d of %d\n", cnt, OPAL_UINT8));
rc = opal_dss.unpack(buffer, &info->rem_srqs[srq].rem_srq_num, &cnt, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return OMPI_ERROR;
}
OFACM_VERBOSE(("Recv unpack remote index srq num[%d]= %d", srq, info->rem_srqs[srq].rem_srq_num));
}
}
return OMPI_SUCCESS;
}
/*
* send connect information to remote context
*/
static int xoob_send_connect_data(ompi_common_ofacm_xoob_local_connection_context_t* xcontext,
uint8_t message_type)
{
opal_buffer_t* buffer = OBJ_NEW(opal_buffer_t);
int rc, srq;
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
if (NULL == buffer) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Bulding standart header that we use in all messages:
* - Message type,
* - Our subnet id
* - Our LID
*/
/* pack the info in the send buffer */
OFACM_VERBOSE(("Send pack Message type = %d", message_type));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT8));
rc = opal_dss.pack(buffer, &message_type, 1, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack sid = %d", context->subnet_id));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT64));
rc = opal_dss.pack(buffer, &context->subnet_id, 1, OPAL_UINT64);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack lid = %d", context->lid));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT16));
rc = opal_dss.pack(buffer, &context->lid, 1, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack cpc type = %d", context->cpc_type));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_INT));
rc = opal_dss.pack(buffer, &context->cpc_type, 1, OPAL_INT);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
/* Now we append to standart header additional information
* that is required for full (open qp,etc..) connect request and response:
* - qp_num of first qp
* - psn of first qp
* - MTU
*/
if (ENDPOINT_XOOB_CONNECT_REQUEST == message_type ||
ENDPOINT_XOOB_CONNECT_RESPONSE == message_type) {
uint32_t psn, qp_num;
if (ENDPOINT_XOOB_CONNECT_REQUEST == message_type) {
qp_num = context->qps[0].lcl_qp->qp_num;
psn = context->qps[0].lcl_psn;
} else {
qp_num = context->xrc_recv_qp_num;
psn = xcontext->xrc_recv_psn;
}
/* stuff all the QP info into the buffer */
/* we need to send only one QP */
OFACM_VERBOSE(("Send pack qp num = %x", qp_num));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &qp_num, 1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack lpsn = %d", psn));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &psn, 1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack mtu = %d", context->attr[0].path_mtu));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->attr[0].path_mtu, 1,
OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
/* We append to header above additional information
* that is required for full & XRC connect request:
* - The lid ob btl on remote site that we want to connect
*/
if (ENDPOINT_XOOB_CONNECT_REQUEST == message_type ||
ENDPOINT_XOOB_CONNECT_XRC_REQUEST == message_type) {
/* when we are sending request we add remote lid that we want to connect */
OFACM_VERBOSE(("Send pack remote lid = %d", context->rem_lid));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT16));
rc = opal_dss.pack(buffer, &context->rem_lid, 1, OPAL_UINT16);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
/* when we are sending xrc request we add remote
* recv qp number that we want to connect. */
if (ENDPOINT_XOOB_CONNECT_XRC_REQUEST == message_type) {
OFACM_VERBOSE(("Send pack remote qp = %x", xcontext->addr->remote_xrc_rcv_qp_num));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &xcontext->addr->remote_xrc_rcv_qp_num,
1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
/* We append to header above additional information
* that is required for full & XRC connect response:
* - index of our context
* - array of xrc-srq numbers
*/
if (ENDPOINT_XOOB_CONNECT_RESPONSE == message_type ||
ENDPOINT_XOOB_CONNECT_XRC_RESPONSE == message_type) {
/* we need to send the context index for immidate send */
OFACM_VERBOSE(("Send pack index = %d", context->index));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->index, 1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send pack number of srqs = %d", context->num_of_srqs));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT8));
rc = opal_dss.pack(buffer, &context->num_of_srqs, 1, OPAL_UINT8);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
/* on response we add all SRQ numbers */
for (srq = 0; srq < context->num_of_srqs; srq++) {
OFACM_VERBOSE(("Send pack srq[%d] num = %d", srq, context->srq_num[srq]));
OFACM_VERBOSE(("packing %d of %d\n", 1, OPAL_UINT32));
rc = opal_dss.pack(buffer, &context->srq_num[srq],
1, OPAL_UINT32);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
}
}
/* send to remote endpoint */
rc = ompi_rte_send_buffer_nb(&context->proc->proc_ompi->proc_name,
buffer, OMPI_RML_TAG_XOFACM,
xoob_rml_send_cb, NULL);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
return rc;
}
OFACM_VERBOSE(("Send QP Info, LID = %d, SUBNET = %d, Message type = %d",
context->lid,
context->subnet_id,
message_type));
return OMPI_SUCCESS;
}
/* Create XRC send qp */
static int xoob_send_qp_create
(ompi_common_ofacm_xoob_local_connection_context_t* xcontext)
{
struct ibv_qp *qp;
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
int ret;
size_t req_inline;
uint32_t init_mask = 0;
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
/* Prepare QP structs */
memcpy(&init_attr, &context->init_attr[0], sizeof(init_attr));
req_inline = init_attr.cap.max_inline_data;
qp = ibv_create_qp(context->ib_pd, &init_attr);
if (NULL == qp) {
OFACM_ERROR(("Error creating QP, errno says: %s", strerror(errno)));
return OMPI_ERROR;
}
context->qps[0].lcl_qp = qp;
if (init_attr.cap.max_inline_data < req_inline) {
context->qps[0].ib_inline_max = init_attr.cap.max_inline_data;
opal_show_help("help-mpi-common-ofacm-cpc-base.txt",
"inline truncated", true, ompi_process_info.nodename,
req_inline, init_attr.cap.max_inline_data);
} else {
context->qps[0].ib_inline_max = req_inline;
}
memcpy(&attr, &context->attr[0], sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
init_mask = IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS;
/* applying user specified init mask */
if (NULL != context->custom_init_attr_mask) {
init_mask |= context->custom_init_attr_mask[0];
}
ret = ibv_modify_qp(qp, &attr, init_mask);
if (ret) {
OFACM_ERROR(("Error modifying QP[%x] to IBV_QPS_INIT errno says: %s [%d]",
qp->qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
/* Setup meta data on the context */
context->qps[0].lcl_psn = opal_rand(&rand_buff) & 0xffffff;
/* Now that all the qp's are created locally, post some receive
buffers, setup credits, etc. */
return context->prepare_recv_cb(context->user_context);
}
/* Send qp connect */
static int xoob_send_qp_connect(ompi_common_ofacm_xoob_local_connection_context_t *xcontext)
{
struct ibv_qp* qp;
struct ibv_qp_attr attr;
uint32_t psn, rtr_mask = 0, rts_mask = 0;
int ret;
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
enum ibv_mtu mtu = (context->attr[0].path_mtu < context->remote_info.rem_mtu) ?
context->attr[0].path_mtu : context->remote_info.rem_mtu;
OFACM_VERBOSE(("Connecting Send QP\n"));
assert(NULL != context->qps);
qp = context->qps[0].lcl_qp;
psn = context->qps[0].lcl_psn;
memset(&attr, 0, sizeof(attr));
memcpy(&attr, context->attr, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = mtu;
attr.dest_qp_num = context->remote_info.rem_qps[0].rem_qp_num;
attr.rq_psn = context->remote_info.rem_qps[0].rem_psn;
attr.ah_attr.dlid = context->remote_info.rem_lid;
attr.ah_attr.static_rate = 0;
rtr_mask = IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER;
/* applying user specified rtr mask */
if (NULL != context->custom_rtr_attr_mask) {
rtr_mask |= context->custom_rtr_attr_mask[0];
}
OFACM_VERBOSE(("Set MTU to IBV value %d (%s bytes)", attr.path_mtu,
(attr.path_mtu == IBV_MTU_256) ? "256" :
(attr.path_mtu == IBV_MTU_512) ? "512" :
(attr.path_mtu == IBV_MTU_1024) ? "1024" :
(attr.path_mtu == IBV_MTU_2048) ? "2048" :
(attr.path_mtu == IBV_MTU_4096) ? "4096" :
"unknown (!)"));
ret = ibv_modify_qp(qp, &attr, rtr_mask);
if (ret) {
OFACM_ERROR(("Error modifying QP[%x] to IBV_QPS_RTR errno says: %s [%d]",
qp->qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = context->qps[0].lcl_psn;
/* applying user specified rts mask */
rts_mask = IBV_QP_STATE |
IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC;
/* applying user specified rts mask */
if (NULL != context->custom_rts_attr_mask) {
rts_mask |= context->custom_rts_attr_mask[0];
}
ret = ibv_modify_qp(qp, &attr, rts_mask);
if (ret) {
OFACM_ERROR(("Error modifying QP[%x] to IBV_QPS_RTS errno says: %s [%d]",
qp->qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/* Recv qp create */
static int xoob_recv_qp_create(ompi_common_ofacm_xoob_local_connection_context_t *xcontext,
ompi_common_ofacm_base_remote_connection_context_t *remote_info)
{
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
int ret;
uint32_t init_mask = 0, rtr_mask = 0;
struct ibv_xrc_domain *xrc_domain;
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
enum ibv_mtu mtu = (context->attr[0].path_mtu < remote_info->rem_mtu) ?
context->attr[0].path_mtu : remote_info->rem_mtu;
OFACM_VERBOSE(("Connecting Recv QP\n"));
memcpy(&init_attr, &context->init_attr[0], sizeof(init_attr));
xrc_domain = init_attr.xrc_domain;
/* Only xrc_domain is required, all other are ignored */
ret = ibv_create_xrc_rcv_qp(&init_attr, &context->xrc_recv_qp_num);
if (ret) {
OFACM_ERROR(("Error creating XRC recv QP[%x], errno says: %s [%d]",
context->xrc_recv_qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
memcpy(&attr, &context->attr[0], sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
init_mask = IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS;
/* applying user specified init mask */
if (NULL != context->custom_init_attr_mask) {
init_mask |= context->custom_init_attr_mask[0];
}
ret = ibv_modify_xrc_rcv_qp(xrc_domain, context->xrc_recv_qp_num,
&attr, init_mask);
if (ret) {
OFACM_ERROR(("Error modifying XRC recv QP[%x] to IBV_QPS_INIT, errno says: %s [%d]",
context->xrc_recv_qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
memcpy(&attr, &context->attr[0], sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = mtu;
attr.dest_qp_num = remote_info->rem_qps[0].rem_qp_num;
attr.rq_psn = remote_info->rem_qps[0].rem_psn;
attr.ah_attr.dlid = remote_info->rem_lid;
attr.ah_attr.static_rate = 0;
rtr_mask = IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC|
IBV_QP_MIN_RNR_TIMER;
/* applying user specified rtr mask */
if (NULL != context->custom_rtr_attr_mask) {
rtr_mask |= context->custom_rtr_attr_mask[0];
}
ret = ibv_modify_xrc_rcv_qp(xrc_domain, context->xrc_recv_qp_num,
&attr, rtr_mask);
if (ret) {
OFACM_ERROR(("Error modifying XRC recv QP[%x] to IBV_QPS_RTR, errno says: %s [%d]",
context->xrc_recv_qp_num, strerror(ret), ret));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/* Recv qp connect */
static int xoob_recv_qp_connect(ompi_common_ofacm_xoob_local_connection_context_t *xcontext,
ompi_common_ofacm_base_remote_connection_context_t *rem_info)
{
int ret;
ompi_common_ofacm_base_local_connection_context_t *context = XOOB_TO_BASE(xcontext);
struct ibv_xrc_domain *xrc_domain = context->init_attr[0].xrc_domain;
OFACM_VERBOSE(("Connecting Recv QP\n"));
ret = ibv_reg_xrc_rcv_qp(xrc_domain, rem_info->rem_qps->rem_qp_num);
if (ret) { /* failed to regester the qp, so it is already die and we should create new one */
/* Return NOT READY !!!*/
OFACM_ERROR(("Failed to register qp_num: %d , get error: %s (%d)\n. Replying with RNR",
rem_info->rem_qps->rem_qp_num, strerror(ret), ret));
return OMPI_ERROR;
} else {
/* save the qp number for unregister */
context->xrc_recv_qp_num = rem_info->rem_qps->rem_qp_num;
return OMPI_SUCCESS;
}
}
/*
* Reply to a `start - connect' message
*/
static int xoob_reply_first_connect(ompi_common_ofacm_xoob_local_connection_context_t *xcontext,
ompi_common_ofacm_base_remote_connection_context_t *remote_info)
{
int rc;
ompi_common_ofacm_base_local_connection_context_t *context =
XOOB_TO_BASE(xcontext);
OFACM_VERBOSE(("Initialized QPs, LID = %d", (XOOB_TO_BASE(xcontext))->lid));
/* Create local QP's and post receive resources */
if (OMPI_SUCCESS != (rc = xoob_recv_qp_create(xcontext, remote_info))) {
return rc;
}
/* prepost data on receiver site */
if (OMPI_SUCCESS != (rc = context->prepare_recv_cb(context->user_context))) {
OFACM_ERROR(("Failed to post on XRC SRQs"));
xoob_report_error(xcontext);
return rc;
}
if (OMPI_SUCCESS !=
(rc = xoob_send_connect_data(xcontext, ENDPOINT_XOOB_CONNECT_RESPONSE))) {
OFACM_ERROR(("Error in send connect request error code is %d",
rc));
return rc;
}
return OMPI_SUCCESS;
}
/* Find context for specific subnet/lid/message/cpc type */
static ompi_common_ofacm_xoob_local_connection_context_t* xoob_find_context
(ompi_process_name_t* process_name, uint64_t subnet_id,
uint16_t lid, uint8_t message_type, int cpc_type)
{
ompi_common_ofacm_xoob_local_connection_context_t *xcontext = NULL;
ompi_common_ofacm_base_proc_t *context_proc = NULL;
bool found = false;
opal_list_t *all_procs =
&ompi_common_ofacm_xoob.all_procs;
OFACM_VERBOSE(("Searching for ep and proc with follow parameters:"
"jobid %d, vpid %d, sid %d, lid %d, cpc type %d",
process_name->jobid, process_name->vpid, subnet_id, lid, cpc_type));
/* find ibproc */
for (context_proc = (ompi_common_ofacm_base_proc_t*)opal_list_get_first(all_procs);
context_proc != (ompi_common_ofacm_base_proc_t*)opal_list_get_end(all_procs);
context_proc = (ompi_common_ofacm_base_proc_t*)opal_list_get_next(context_proc)) {
if (ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
&context_proc->proc_ompi->proc_name, process_name) == OPAL_EQUAL) {
found = true;
break;
}
}
/* we found our context_proc, lets find context now */
if (found) {
opal_list_t *context_list = &context_proc->all_contexts;
ompi_common_ofacm_base_local_connection_context_t *context;
for (context = (ompi_common_ofacm_base_local_connection_context_t *)
opal_list_get_first(context_list);
context != (ompi_common_ofacm_base_local_connection_context_t *)
opal_list_get_end(context_list);
context = (ompi_common_ofacm_base_local_connection_context_t *)
opal_list_get_next(context)) {
/* we need to check different
* lid for different message type */
if (ENDPOINT_XOOB_CONNECT_RESPONSE == message_type ||
ENDPOINT_XOOB_CONNECT_XRC_RESPONSE == message_type) {
/* response message */
if (context->subnet_id == subnet_id &&
context->rem_lid == lid) {
xcontext = BASE_TO_XOOB(context);
break; /* Found one */
}
} else {
/* request message */
if (context->subnet_id == subnet_id &&
context->lid == lid) {
xcontext = BASE_TO_XOOB(context);
break; /* Found one */
}
}
}
if (NULL == xcontext) {
OFACM_ERROR(("can't find suitable context for this peer\n"));
}
} else {
OFACM_ERROR(("can't find suitable context for this peer\n"));
}
return xcontext;
}
/* In case if XRC recv qp was closed and sender still don't know about it
* we need close the qp, reset the ib_adrr status to CLOSED and start everything
* from scratch.
*/
static void xoob_restart_connect
(ompi_common_ofacm_xoob_local_connection_context_t *xcontext)
{
ompi_common_ofacm_base_local_connection_context_t *context =
XOOB_TO_BASE(xcontext);
OFACM_VERBOSE(("Restarting the connection for the context"));
OPAL_THREAD_LOCK(&xcontext->addr->addr_lock);
switch (xcontext->addr->status) {
case XOOB_ADDR_CONNECTED:
/* so we have the send qp, we just need the recive site.
* Send request for SRQ numbers */
OFACM_VERBOSE(("Restart The IB addr: sid %d lid %d"
"in XOOB_ADDR_CONNECTED status,"
" Changing to XOOB_ADDR_CLOSED and starting from scratch\n",
context->subnet_id, context->lid));
/* Switching back to closed and starting from scratch */
xcontext->addr->status = XOOB_ADDR_CLOSED;
/* destroy the qp */
if(ibv_destroy_qp(context->qps[0].lcl_qp))
OFACM_ERROR(("Failed to destroy QP"));
case XOOB_ADDR_CLOSED:
case XOOB_ADDR_CONNECTING:
OFACM_VERBOSE(("Restart The IB addr: sid %d lid %d"
"in XOOB_ADDR_CONNECTING or XOOB_ADDR_CLOSED status,"
" starting from scratch\n",
context->subnet_id, context->lid));
OPAL_THREAD_UNLOCK(&xcontext->addr->addr_lock);
/* xoob_module_start_connect() should automaticly handle all other cases */
if (OMPI_SUCCESS != xoob_module_start_connect(XOOB_TO_BASE(xcontext)))
OFACM_ERROR(("Failed to restart connection from XOOB_ADDR_CONNECTING/CLOSED"));
break;
default :
OFACM_ERROR(("Invalid context status %d", xcontext->addr->status));
OPAL_THREAD_UNLOCK(&xcontext->addr->addr_lock);
}
}
/*
* Non blocking RML recv callback. Read incoming QP and other info,
* and if this endpoint is trying to connect, reply with our QP info,
* otherwise try to modify QP's and establish reliable connection
*/
static void xoob_rml_recv_cb(int status, ompi_process_name_t* process_name,
opal_buffer_t* buffer, ompi_rml_tag_t tag,
void* cbdata)
{
int rc;
uint8_t message_type;
uint16_t requested_lid = 0;
int cpc_type = -1;
ompi_common_ofacm_base_local_connection_context_t *context;
ompi_common_ofacm_xoob_local_connection_context_t *xcontext;
ompi_common_ofacm_base_remote_connection_context_t remote_info;
/* Init remote info */
memset(&remote_info, 0,
sizeof(ompi_common_ofacm_base_remote_connection_context_t));
if ( OMPI_SUCCESS != xoob_init_rem_info_alloc_qp(&remote_info)) {
return;
}
/* Get data. */
if ( OMPI_SUCCESS !=
xoob_receive_connect_data(&remote_info, &requested_lid, &cpc_type, &message_type, buffer)) {
OFACM_ERROR(("Failed to read data\n"));
xoob_report_error(NULL);
return;
}
/* Processing message */
switch (message_type) {
case ENDPOINT_XOOB_CONNECT_REQUEST:
OFACM_VERBOSE(("Received ENDPOINT_XOOB_CONNECT_REQUEST: lid %d, sid %d, rlid %d\n",
remote_info.rem_lid,
remote_info.rem_subnet_id,
requested_lid));
xcontext = xoob_find_context(process_name,remote_info.rem_subnet_id,
requested_lid, message_type, cpc_type);
if ( NULL == xcontext) {
OFACM_ERROR(("Got ENDPOINT_XOOB_CONNECT_REQUEST."
" Failed to find context with subnet %d and LID %d",
remote_info.rem_subnet_id, requested_lid));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
context = XOOB_TO_BASE(xcontext);
OPAL_THREAD_LOCK(&context->context_lock);
/* we should create qp and send the info + srq to requestor */
rc = xoob_reply_first_connect(xcontext, &remote_info);
if (OMPI_SUCCESS != rc) {
OFACM_ERROR(("error in context reply start connect"));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
/* enable pooling for this btl */
OPAL_THREAD_UNLOCK(&context->context_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_REQUEST:
OFACM_VERBOSE(("Received ENDPOINT_XOOB_CONNECT_XRC_REQUEST: lid %d, sid %d\n",
remote_info.rem_lid,
remote_info.rem_subnet_id));
xcontext = xoob_find_context(process_name, remote_info.rem_subnet_id,
requested_lid, message_type, cpc_type);
if (NULL == xcontext) {
OFACM_ERROR(("Got ENDPOINT_XOOB_CONNECT_XRC_REQUEST."
" Failed to find context with subnet %d and LID %d",
remote_info.rem_subnet_id, requested_lid));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
context = XOOB_TO_BASE(xcontext);
if (OMPI_SUCCESS == xoob_recv_qp_connect(xcontext, &remote_info)) {
if (OMPI_SUCCESS != context->prepare_recv_cb(context->user_context)) {
OFACM_ERROR(("Failed to post on XRC SRQs"));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
OPAL_THREAD_LOCK(&context->context_lock);
rc = xoob_send_connect_data(xcontext, ENDPOINT_XOOB_CONNECT_XRC_RESPONSE);
if (OMPI_SUCCESS != rc) {
OFACM_ERROR(("error in context reply start connect"));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
OPAL_THREAD_UNLOCK(&context->context_lock);
} else {
/* The XRC recv qp was destroyed */
OPAL_THREAD_LOCK(&context->context_lock);
rc = xoob_send_connect_data(xcontext, ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE);
if (OMPI_SUCCESS != rc) {
OFACM_ERROR(("error in context reply start connect"));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
OPAL_THREAD_UNLOCK(&context->context_lock);
}
break;
case ENDPOINT_XOOB_CONNECT_RESPONSE:
OFACM_VERBOSE(("Received ENDPOINT_XOOB_CONNECT_RESPONSE: lid %d, sid %d\n",
remote_info.rem_lid,
remote_info.rem_subnet_id));
xcontext = xoob_find_context(process_name, remote_info.rem_subnet_id,
remote_info.rem_lid, message_type, cpc_type);
if (NULL == xcontext) {
OFACM_ERROR(("Got ENDPOINT_XOOB_CONNECT_RESPONSE."
" Failed to find context with subnet %d and LID %d",
remote_info.rem_subnet_id, remote_info.rem_lid));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
context = XOOB_TO_BASE(xcontext);
OPAL_THREAD_LOCK(&context->context_lock);
/* we got all the data srq. switch the context to connect mode */
xoob_set_remote_info(xcontext, &remote_info);
/* update ib_addr with remote qp number */
xcontext->addr->remote_xrc_rcv_qp_num =
remote_info.rem_qps->rem_qp_num;
OFACM_VERBOSE(("rem_info: lid %d, sid %d ep %d %d",
remote_info.rem_lid,
remote_info.rem_subnet_id,
context->remote_info.rem_lid,
context->remote_info.rem_subnet_id));
if (OMPI_SUCCESS != xoob_send_qp_connect(xcontext)) {
OFACM_ERROR(("Failed to connect context\n"));
xoob_free_rem_info(&remote_info);
xoob_report_error(xcontext);
return;
}
xoob_connection_complete(xcontext);
OPAL_THREAD_UNLOCK(&context->context_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_RESPONSE:
OFACM_VERBOSE(("Received ENDPOINT_XOOB_CONNECT_XRC_RESPONSE: lid %d, sid %d\n",
remote_info.rem_lid,
remote_info.rem_subnet_id));
xcontext = xoob_find_context(process_name, remote_info.rem_subnet_id,
remote_info.rem_lid, message_type, cpc_type);
if ( NULL == xcontext) {
OFACM_ERROR(("Got ENDPOINT_XOOB_CONNECT_XRC_RESPONSE."
" Failed to find context with subnet %d and LID %d",
remote_info.rem_subnet_id, remote_info.rem_lid));
xoob_report_error(xcontext);
return;
}
context = XOOB_TO_BASE(xcontext);
OPAL_THREAD_LOCK(&context->context_lock);
/* we got srq numbers on our request */
xoob_set_remote_info(xcontext, &remote_info);
xoob_connection_complete(xcontext);
OPAL_THREAD_UNLOCK(&context->context_lock);
break;
case ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE:
/* The XRC recv site already was destroyed so we need
* start to bringup the connection from scratch */
OFACM_VERBOSE(("Received ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE: lid %d, sid %d\n",
remote_info.rem_lid,
remote_info.rem_subnet_id));
xcontext = xoob_find_context(process_name, remote_info.rem_subnet_id,
remote_info.rem_lid, message_type, cpc_type);
if ( NULL == xcontext) {
OFACM_ERROR(("Got ENDPOINT_XOOB_CONNECT_XRC_NR_RESPONSE."
" Failed to find context with subnet %d and LID %d",
remote_info.rem_subnet_id, remote_info.rem_lid));
xoob_report_error(xcontext);
return;
}
xoob_restart_connect(xcontext);
break;
default :
OFACM_ERROR(("Invalid message type %d", message_type));
}
xoob_free_rem_info(&remote_info);
}
/*
* XOOB interface functions
*/
/* Quere for the XOOB priority - will be highest in XRC case */
static int xoob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ompi_common_ofacm_base_module_t **cpc)
{
ompi_common_ofacm_xoob_module_t *xcpc; /* xoob cpc module */
ompi_common_ofacm_base_module_t *bcpc; /* base cpc module */
if (xoob_priority > 100) {
xoob_priority = 100;
} else if (xoob_priority < -1) {
xoob_priority = -1;
}
if (!(dev->capabilities & OMPI_COMMON_OFACM_XRC_ONLY)) {
OFACM_VERBOSE(("openib BTL: xoob CPC only supported with XRC receive queues; skipped on device %s",
ibv_get_device_name(dev->ib_dev)));
return OMPI_ERR_NOT_SUPPORTED;
}
xcpc = malloc(sizeof(ompi_common_ofacm_xoob_module_t));
if (NULL == xcpc) {
OFACM_VERBOSE(("openib BTL: xoob CPC system error (malloc failed)"));
return OMPI_ERR_OUT_OF_RESOURCE;
}
bcpc = &xcpc->super;
/* If this btl supports XOOB, then post the RML message. But
ensure to only post it *once*, because another btl may have
come in before this and already posted it. */
if (!rml_recv_posted) {
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_XOFACM,
OMPI_RML_PERSISTENT,
xoob_rml_recv_cb,
NULL);
rml_recv_posted = true;
}
OBJ_CONSTRUCT(&ompi_common_ofacm_xoob.all_procs, opal_list_t);
bcpc->data.cbm_component = &ompi_common_ofacm_xoob;
bcpc->data.cbm_priority = xoob_priority;
bcpc->data.cbm_modex_message = NULL;
bcpc->data.cbm_modex_message_len = 0;
bcpc->cbm_endpoint_init = xoob_endpoint_init;
bcpc->cbm_start_connect = xoob_module_start_connect;
bcpc->cbm_endpoint_finalize = xoob_endpoint_finalize;
bcpc->cbm_finalize = NULL;
bcpc->cbm_uses_cts = false;
/* seed RNG */
opal_srand(&rand_buff,(uint32_t)(getpid()));
/* Build our hash table for subnetid-lid */
OBJ_CONSTRUCT(&xcpc->ib_addr_table, opal_hash_table_t);
assert(ompi_process_info.num_procs > 1);
if(NULL == xcpc->ib_addr_table.ht_table) {
if(OPAL_SUCCESS != opal_hash_table_init(
&xcpc->ib_addr_table, ompi_process_info.num_procs)) {
OFACM_ERROR(("XRC internal error. Failed to allocate ib_table"));
return OMPI_ERROR;
}
}
*cpc = bcpc;
OFACM_VERBOSE(("openib BTL: xoob CPC available for use on %s",
ibv_get_device_name(dev->ib_dev)));
return OMPI_SUCCESS;
}
/* Open - this functions sets up any xoob specific commandline params */
static void xoob_component_register(void)
{
xoob_priority = 60;
(void) mca_base_var_register("ompi", "common", "ofacm", "connect_xoob_priority",
"The selection method priority for xoob",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &xoob_priority);
}
/*
* Connect function. Start initiation of connections to a remote
* peer. We send our Queue Pair information over the RML/OOB
* communication mechanism. On completion of our send, a send
* completion handler is called.
*/
static int xoob_module_start_connect
(ompi_common_ofacm_base_local_connection_context_t *context)
{
int rc = OMPI_SUCCESS;
ompi_common_ofacm_xoob_local_connection_context_t *xcontext =
(ompi_common_ofacm_xoob_local_connection_context_t *)context;
pending_context_t *pcontext;
OPAL_THREAD_LOCK(&xcontext->addr->addr_lock);
switch (xcontext->addr->status) {
case XOOB_ADDR_CLOSED:
OFACM_VERBOSE(("The IB addr: sid %d lid %d"
"in XOOB_ADDR_CLOSED status,"
" sending ENDPOINT_XOOB_CONNECT_REQUEST\n",
xcontext->addr->subnet_id, xcontext->addr->lid));
if (OMPI_SUCCESS != (rc = xoob_send_qp_create(xcontext))) {
break;
}
/* Send connection info over to remote endpoint */
xcontext->super.state = MCA_COMMON_OFACM_CONNECTING;
xcontext->addr->status = XOOB_ADDR_CONNECTING;
if (OMPI_SUCCESS !=
(rc = xoob_send_connect_data(xcontext, ENDPOINT_XOOB_CONNECT_REQUEST))) {
OFACM_ERROR(("Error sending connect request, error code %d", rc));
}
break;
case XOOB_ADDR_CONNECTING:
OFACM_VERBOSE(("The IB addr: sid %d lid %d"
"in XOOB_ADDR_CONNECTING status,"
" Subscribing to this address\n",
xcontext->addr->subnet_id, xcontext->addr->lid));
pcontext = OBJ_NEW(pending_context_t);
xoob_pending_context_init(pcontext, xcontext);
/* some body already connectng to this machine, lets wait */
opal_list_append(&xcontext->addr->pending_contexts,
(opal_list_item_t *)pcontext);
xcontext->super.state = MCA_COMMON_OFACM_CONNECTING;
break;
case XOOB_ADDR_CONNECTED:
/* so we have the send qp, we just need the recive site.
* Send request for SRQ numbers */
OFACM_VERBOSE(("The IB addr: sid %d lid %d"
"in XOOB_ADDR_CONNECTED status,"
" sending ENDPOINT_XOOB_CONNECT_XRC_REQUEST\n",
context->subnet_id, context->lid));
xcontext->super.state = MCA_COMMON_OFACM_CONNECTING;
if (OMPI_SUCCESS !=
(rc = xoob_send_connect_data(xcontext, ENDPOINT_XOOB_CONNECT_XRC_REQUEST))) {
OFACM_ERROR(("error sending xrc connect request, error code %d", rc));
}
break;
default :
OFACM_ERROR(("Invalid context status %d", xcontext->addr->status));
}
OPAL_THREAD_UNLOCK(&xcontext->addr->addr_lock);
return rc;
}
/*
* Finalize function. Cleanup RML non-blocking receive.
*/
static int xoob_component_finalize(void)
{
if (rml_recv_posted) {
ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_XOFACM);
rml_recv_posted = false;
}
return OMPI_SUCCESS;
}