1
1
openmpi/ompi/mca/sbgp/ibnet/sbgp_ibnet_module.c

1030 строки
36 KiB
C
Исходник Обычный вид История

/*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
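/**
* @file
*
* ibnet sbgp module: packs the local port data (lid, subnet id, mtu, CPC
* data), gathers the same data from the peers and selects the procs that
* form the subgroup.
*/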
#include "ompi_config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/sbgp/ibnet/sbgp_ibnet.h"
#include "ompi/mca/common/ofacm/base.h"
#include "ompi/mca/common/ofacm/connect.h"
#include "ompi/patterns/comm/coll_ops.h"
/*
* Unused
static int ibnet_module_enable(mca_sbgp_base_module_t *module,
struct ompi_communicator_t *comm);
*/
/*
* Local functions
*/
/*
 * Constructor of mca_sbgp_ibnet_module_t: the module starts with
 * group id 0 and without any connection groups.
 */
static void
mca_sbgp_ibnet_module_construct(mca_sbgp_ibnet_module_t *module)
{
    module->group_id = 0;
    module->cgroups = NULL;
}
/*
 * Destructor of mca_sbgp_ibnet_module_t.
 * Intentionally empty: nothing is released here.
 */
static void
mca_sbgp_ibnet_module_destruct(mca_sbgp_ibnet_module_t *module)
{
}
/* Class instance of the ibnet module: derived from mca_sbgp_base_module_t,
 * using the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE(mca_sbgp_ibnet_module_t,
mca_sbgp_base_module_t,
mca_sbgp_ibnet_module_construct,
mca_sbgp_ibnet_module_destruct);
/*
 * Constructor of mca_sbgp_ibnet_proc_t: a new proc has no ompi_proc,
 * no ports, no port-usage table and no duty assigned.
 */
static void
mca_sbgp_ibnet_proc_construct(mca_sbgp_ibnet_proc_t *proc)
{
    /* pointers first ... */
    proc->ompi_proc = NULL;
    proc->remote_ports_info = NULL;
    proc->use_port = NULL;

    /* ... then the plain values */
    proc->num_ports = 0;
    proc->duty = MCA_SBGP_IBNET_NONE;
}
/*
 * Destructor of mca_sbgp_ibnet_proc_t: frees the remote port array and
 * the port-usage table of the proc.
 */
static void
mca_sbgp_ibnet_proc_destruct(mca_sbgp_ibnet_proc_t *proc)
{
    /* free(NULL) is a no-op, so the pointers need no explicit NULL test.
     *
     * Pasha: need to check if we need
     * to release some data from inside of the proc */
    free(proc->remote_ports_info);
    free(proc->use_port);
}
/* Class instance of the ibnet proc: an opal_list_item_t, so procs can be
 * kept on opal lists, using the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE(mca_sbgp_ibnet_proc_t,
opal_list_item_t,
mca_sbgp_ibnet_proc_construct,
mca_sbgp_ibnet_proc_destruct);
/*
 * Copy 'len' bytes from 'src' to the current write position of the gather
 * buffer and advance the position.
 */
static void ibnet_pack_bytes(char **cursor, const void *src, size_t len)
{
    IBNET_VERBOSE(10, ("packing %d of %d\n", 1, (int) len));
    memcpy(*cursor, src, len);
    *cursor += len;
}

/*
 * Pack all data to gather buffer.
 *
 * Every field is first converted to a local variable of exactly the wire
 * width and only then copied, so the packed bytes do not depend on the
 * width or byte order of the in-memory field they come from.
 *
 * @param sbuffer  destination buffer; the caller must provide at least
 *                 mca_sbgp_ibnet_calc_sbuff_size() bytes.
 * @return OMPI_SUCCESS (the function has no failure path).
 */
static int pack_gather_sbuff(char* sbuffer)
{
    int port, cpc;
    char *pack_ptr = sbuffer;
    mca_sbgp_ibnet_device_t *device = NULL;
    opal_list_t *devices = &mca_sbgp_ibnet_component.devices;
    uint32_t my_rank = ompi_process_info.my_name.vpid;
    uint32_t num_ports = (uint32_t) mca_sbgp_ibnet_component.total_active_ports;
    /* Pasha: add query for collectives offload support */
    uint8_t coll_offload_flag = (uint8_t) OFFLOAD_CONNECTX_B0;

    /* Message format:
     * - my rank (uint32_t)
     * - number of active ports (uint32_t)
     * - for each active port:
     *   + port num (uint16_t)
     *   + lid (uint16_t)
     *   + subnetid (uint64_t)
     *   + mtu (uint32_t)
     *   + colloffload (uint8_t)
     *   + num of cpcs (uint8_t)
     *   + for each cpc:
     *     * cpc index (uint8_t)
     *     * cpc priority (uint8_t)
     *     * cpc buffer len (uint8_t)
     *     * cpc buffer (byte * buffer_len)
     */

    /* Pack my rank , I need it because allgather doesn't work as expected */
    IBNET_VERBOSE(10, ("Send pack rank = %d\n", my_rank));
    ibnet_pack_bytes(&pack_ptr, &my_rank, sizeof(uint32_t));

    /* Put number of ports that we send */
    IBNET_VERBOSE(10, ("Send pack num of ports = %d\n", mca_sbgp_ibnet_component.total_active_ports));
    ibnet_pack_bytes(&pack_ptr, &num_ports, sizeof(uint32_t));

    /* Go through list of device and build the message*/
    for (device = (mca_sbgp_ibnet_device_t *) opal_list_get_first(devices);
         device != (mca_sbgp_ibnet_device_t *) opal_list_get_end(devices);
         device = (mca_sbgp_ibnet_device_t *) opal_list_get_next((opal_list_item_t *)device)) {
        for (port = 0; port < device->num_allowed_ports; ++port) {
            uint16_t port_id, lid;
            uint64_t subnet_id;
            uint32_t mtu;
            uint8_t num_cpcs;

            /* only ports marked as used are part of the message */
            if (!device->ports[port].used) {
                continue;
            }

            port_id = (uint16_t) device->ports[port].id;
            lid = (uint16_t) device->ports[port].lid;
            subnet_id = (uint64_t) device->ports[port].subnet_id;
            mtu = (uint32_t) device->ports[port].mtu;
            num_cpcs = (uint8_t) device->num_cpcs;

            /* put port num */
            IBNET_VERBOSE(10, ("Send pack port num = %d\n", device->ports[port].id));
            ibnet_pack_bytes(&pack_ptr, &port_id, sizeof(uint16_t));

            /* put lid */
            IBNET_VERBOSE(10, ("Send pack lid = %d\n", device->ports[port].lid));
            ibnet_pack_bytes(&pack_ptr, &lid, sizeof(uint16_t));

            /* put subnetid */
            IBNET_VERBOSE(10, ("Send pack subnet id = %lx\n", device->ports[port].subnet_id));
            ibnet_pack_bytes(&pack_ptr, &subnet_id, sizeof(uint64_t));

            /* put default mtu */
            IBNET_VERBOSE(10, ("Send pack MTU = %d\n", device->ports[port].mtu));
            ibnet_pack_bytes(&pack_ptr, &mtu, sizeof(uint32_t));

            /* collectives offload support */
            IBNET_VERBOSE(10, ("Send pack collectives offload = %d\n", OFFLOAD_CONNECTX_B0));
            ibnet_pack_bytes(&pack_ptr, &coll_offload_flag, sizeof(uint8_t));

            /* number of cpcs for this port */
            IBNET_VERBOSE(10, ("Send pack number of cpcs = %d\n", device->num_cpcs));
            ibnet_pack_bytes(&pack_ptr, &num_cpcs, sizeof(uint8_t));

            for (cpc = 0; cpc < device->num_cpcs; cpc++) {
                uint8_t cpc_index =
                    ompi_common_ofacm_base_get_cpc_index(device->cpcs[cpc]->data.cbm_component);
                uint8_t cpc_priority = (uint8_t) device->cpcs[cpc]->data.cbm_priority;
                uint8_t cpc_buflen = device->cpcs[cpc]->data.cbm_modex_message_len;

                /* cpc index */
                IBNET_VERBOSE(10, ("Send pack cpc index = %d\n", cpc_index));
                ibnet_pack_bytes(&pack_ptr, &cpc_index, sizeof(uint8_t));

                /* cpc priority */
                IBNET_VERBOSE(10, ("Send pack cpc priority = %d\n",
                                   device->cpcs[cpc]->data.cbm_priority));
                ibnet_pack_bytes(&pack_ptr, &cpc_priority, sizeof(uint8_t));

                /* cpc buffer length in bytes */
                IBNET_VERBOSE(10, ("Send pack cpc message len = %d\n", cpc_buflen));
                ibnet_pack_bytes(&pack_ptr, &cpc_buflen, sizeof(uint8_t));

                /* cpc buffer */
                if (0 != cpc_buflen) {
                    IBNET_VERBOSE(10, ("Send pack cpc buffer len = %d\n", cpc_buflen));
                    ibnet_pack_bytes(&pack_ptr,
                                     device->cpcs[cpc]->data.cbm_modex_message,
                                     (size_t) cpc_buflen);
                }
            }
        }
    }

    return OMPI_SUCCESS;
}
/*
 * Translation vpid to ompi_proc.
 *
 * Looks for the first entry of 'procs' whose proc_name.vpid equals 'vpid'.
 *
 * @param vpid        vpid to look for
 * @param procs       array of proc pointers to search
 * @param n_procs_in  number of entries in 'procs'
 * @param out_proc    receives the matching proc; left untouched when
 *                    nothing matches
 * @return index of the match in 'procs', or OMPI_ERROR when not found.
 */
static int vpid_to_proc(ompi_vpid_t vpid,
        struct ompi_proc_t ** procs, int n_procs_in, ompi_proc_t** out_proc)
{
    int idx = 0;

    while (idx < n_procs_in) {
        struct ompi_proc_t *candidate = procs[idx];

        if (candidate->proc_name.vpid == vpid) {
            *out_proc = candidate;
            return idx;
        }
        ++idx;
    }

    return OMPI_ERROR;
}
static int unpack_and_load_gather_rbuff(char *rbuffer, int max_sent_bytes,
struct ompi_proc_t ** procs, int n_procs_in, opal_list_t *peers_data)
{
int i;
char* unpack_ptr;
/* Message format:
* - my rank (uint32_t)
* - number of active ports (uint32_t)
* - for each active port:
* + lid (uint16_t)
* + subnetid (uint64_t)
* + mtu (uint32_t)
* + colloffload (uint8_t)
* + num of cpcs (uint8_t)
* + for each cpc: (uint8_t)
* * cpc index (uint8_t)
* * cpc priority (uint8_t)
* * cpc buffer len (uint8_t)
* * cpc buffer (byte*buffer_len)
*
*/
/* Start to unpack data */
for(i = 0; i < n_procs_in; i++) {
uint32_t p;
mca_sbgp_ibnet_proc_t *ibnet_proc;
unpack_ptr = rbuffer + (size_t) (i * max_sent_bytes);
/* create new proc */
ibnet_proc = OBJ_NEW(mca_sbgp_ibnet_proc_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint32_t)));
IBNET_VERBOSE(10, ("Recive remote rank %d\n", ibnet_proc->rank));
memcpy(&ibnet_proc->rank, unpack_ptr, sizeof(uint32_t));
unpack_ptr += sizeof(uint32_t);
/* set back pointer to ompi_proc */
ibnet_proc->ompi_proc_index =
vpid_to_proc(ibnet_proc->rank, procs,
n_procs_in, &ibnet_proc->ompi_proc);
if (OMPI_ERROR == ibnet_proc->ompi_proc_index) {
return OMPI_ERROR;
}
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint32_t)));
IBNET_VERBOSE(10, ("Recive number of ports %d\n", ibnet_proc->num_ports));
memcpy(&ibnet_proc->num_ports, unpack_ptr, sizeof(uint32_t));
unpack_ptr += sizeof(uint32_t);
/* prepare place for port data*/
ibnet_proc->remote_ports_info = calloc(ibnet_proc->num_ports, sizeof(mca_sbgp_ibnet_port_t));
if (NULL == ibnet_proc->remote_ports_info) {
return OMPI_ERROR;
}
/* load the data */
for(p = 0; p < ibnet_proc->num_ports; p++) {
mca_sbgp_ibnet_port_t *port = &ibnet_proc->remote_ports_info[p];
uint32_t cpc;
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint16_t)));
IBNET_VERBOSE(10, ("Recive id %d\n", port->id));
memcpy(&port->id, unpack_ptr, sizeof(uint16_t));
unpack_ptr += sizeof(uint16_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint16_t)));
IBNET_VERBOSE(10, ("Recive lid %d\n", port->lid));
memcpy(&port->lid, unpack_ptr, sizeof(uint16_t));
unpack_ptr += sizeof(uint16_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint64_t)));
IBNET_VERBOSE(10, ("Recive subnet id %lx\n", port->subnet_id));
memcpy(&port->subnet_id, unpack_ptr, sizeof(uint64_t));
unpack_ptr += sizeof(uint64_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint32_t)));
IBNET_VERBOSE(10, ("Recive mtu %d\n", port->mtu));
memcpy(&port->mtu, unpack_ptr, sizeof(uint32_t));
unpack_ptr += sizeof(uint32_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint8_t)));
IBNET_VERBOSE(10, ("Recive offload %d\n", port->coll_offload));
memcpy(&port->coll_offload, unpack_ptr, sizeof(uint8_t));
unpack_ptr += sizeof(uint8_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint8_t)));
IBNET_VERBOSE(10, ("Recive number of cpcs %d\n", port->num_cpcs));
memcpy(&port->num_cpcs, unpack_ptr, sizeof(uint8_t));
unpack_ptr += sizeof(uint8_t);
port->pm_cpc_data = calloc(port->num_cpcs,
sizeof(ompi_common_ofacm_base_module_data_t));
if (NULL == port->pm_cpc_data) {
return OMPI_ERROR;
}
/* load cpc data */
for (cpc = 0; cpc < port->num_cpcs; cpc++) {
ompi_common_ofacm_base_module_data_t *cpc_data =
&port->pm_cpc_data[cpc];
uint8_t cpc_index = -1;
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint8_t)));
IBNET_VERBOSE(10, ("Recive cpc index %d\n", cpc_index));
memcpy(&cpc_index, unpack_ptr, sizeof(uint8_t));
unpack_ptr += sizeof(uint8_t);
cpc_data->cbm_component =
ompi_common_ofacm_base_get_cpc_byindex(cpc_index);
if (NULL == cpc_data->cbm_component) {
IBNET_VERBOSE(10, ("Failed to resolve cpc index %d\n", cpc_index));
return OMPI_ERROR;
}
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint8_t)));
IBNET_VERBOSE(10, ("Recive priority %d\n", cpc_data->cbm_priority));
memcpy(&cpc_data->cbm_priority, unpack_ptr, sizeof(uint8_t));
unpack_ptr += sizeof(uint8_t);
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, 1, sizeof(uint8_t)));
IBNET_VERBOSE(10, ("Recive cpc message len %d\n", cpc_data->cbm_modex_message_len));
memcpy(&cpc_data->cbm_modex_message_len, unpack_ptr, sizeof(uint8_t));
unpack_ptr += sizeof(uint8_t);
if (0 != cpc_data->cbm_modex_message_len) {
int cpc_buflen = cpc_data->cbm_modex_message_len;
IBNET_VERBOSE(10, ("Recive cpc message data with len %d\n", cpc_buflen));
IBNET_VERBOSE(10, ("element=%d unpacking %d of %d\n", i, cpc_buflen, cpc_buflen));
memcpy(&cpc_data->cbm_modex_message, unpack_ptr, cpc_buflen);
unpack_ptr += (size_t) cpc_buflen;
}
}
}
/* Put the new proc to the list */
opal_list_append(peers_data, (opal_list_item_t*) ibnet_proc);
}
assert((uint32_t) n_procs_in == opal_list_get_size(peers_data));
return OMPI_SUCCESS;
}
/*
 * qsort() comparator for connection groups: orders them by num_procs,
 * largest group first.
 *
 * The sizes are compared instead of subtracted, so the result cannot wrap
 * around whatever the integer type of num_procs is.
 *
 * @return 1 when p2 is larger than p1, -1 when it is smaller, 0 when equal.
 */
static int cmp_cgroups(const void *p1, const void *p2)
{
    const mca_sbgp_ibnet_connection_group_info_t *g1 =
        (const mca_sbgp_ibnet_connection_group_info_t *) p1;
    const mca_sbgp_ibnet_connection_group_info_t *g2 =
        (const mca_sbgp_ibnet_connection_group_info_t *) p2;

    if (g2->num_procs > g1->num_procs) {
        return 1;
    }
    if (g2->num_procs < g1->num_procs) {
        return -1;
    }
    return 0;
}
/*
 * Try to add one peer to a connection group.
 *
 * The peer is added through the first of its remote ports that
 *  - has the same subnet id as the local port of the group,
 *  - has a CPC matching one of the local CPCs, and
 *  - is not yet used by another connection group for this peer.
 * ibnet_proc->use_port[] holds, per connection group, the remote port
 * index + 1 (0 means that the group does not use the peer).
 *
 * @param cgroup      connection group to extend
 * @param ibnet_proc  candidate peer
 * @param device      local device that owns the port of the group
 * @param module      module owning the connection groups
 * @return OMPI_SUCCESS also when no usable port was found (the peer is
 *         then simply not added); OMPI_ERROR when the port of the group
 *         does not exist on the device or an allocation failed.
 */
static int set_ibnet_proc_on_cgroup(
        mca_sbgp_ibnet_connection_group_info_t *cgroup,
        mca_sbgp_ibnet_proc_t *ibnet_proc,
        mca_sbgp_ibnet_device_t *device,
        mca_sbgp_ibnet_module_t *module)
{
    uint32_t p;
    int k, rc, p_indx; /* port index in array of device */

    for (p_indx = 0; p_indx < device->num_allowed_ports; ++p_indx) {
        if (cgroup->port == device->ports[p_indx].id) {
            break;
        }
    }

    /* Without this check a missing port would only be caught by the
     * assert below, i.e. not at all in builds without assertions, and
     * device->ports[] would be read out of bounds. */
    if (p_indx >= device->num_allowed_ports) {
        IBNET_ERROR(("Port %d of the connection group was not found on the device.",
                     (int) cgroup->port));
        return OMPI_ERROR;
    }

    assert(device->num_act_ports > p_indx);

    if (NULL == ibnet_proc->use_port) {
        ibnet_proc->use_port = calloc(module->num_cgroups, sizeof(int));
        if (NULL == ibnet_proc->use_port) {
            IBNET_ERROR(("Failed to allocate use_port array."));
            return OMPI_ERROR;
        }
    }

    IBNET_VERBOSE(10, ("Local port is %d, idx - %d.\n",
                       device->ports[p_indx].id, p_indx));

    for (p = 0; p < ibnet_proc->num_ports; p++) {
        ompi_common_ofacm_base_module_t *local_cpc = NULL;
        ompi_common_ofacm_base_module_data_t *remote_cpc_data = NULL;

        if (device->ports[p_indx].subnet_id !=
                ibnet_proc->remote_ports_info[p].subnet_id) {
            continue;
        }

        /* check if we have matching cpc on both sides */
        if (OMPI_SUCCESS !=
                ompi_common_ofacm_base_find_match(device->cpcs,
                    device->num_cpcs,
                    ibnet_proc->remote_ports_info[p].pm_cpc_data,
                    ibnet_proc->remote_ports_info[p].num_cpcs,
                    &local_cpc,
                    &remote_cpc_data)) {
            /* Failed to match, can not use the port */
            IBNET_VERBOSE(10, ("Failed to match, can not use the port - %d.\n", p + 1));
            continue;
        }

        for (k = 0; k < module->num_cgroups && ((p + 1) != (uint32_t) ibnet_proc->use_port[k]); ++k)
            ;

        if (k < module->num_cgroups) {
            /* The port in use - another connection group use it */
            continue;
        }

        /* if it is no group array we need to create it*/
        if (OPAL_UNLIKELY(NULL == cgroup->ibnet_procs)) {
            cgroup->ibnet_procs = OBJ_NEW(opal_pointer_array_t);
            if (NULL == cgroup->ibnet_procs) {
                IBNET_ERROR(("Failed to allocate opal_pointer_array"));
                return OMPI_ERROR;
            }
            rc = opal_pointer_array_init(cgroup->ibnet_procs, 10, INT_MAX, 10);
            if (OPAL_SUCCESS != rc) {
                IBNET_ERROR(("Failed to allocate opal_pointer_array"));
                /* do not leave a half-initialized array behind */
                OBJ_RELEASE(cgroup->ibnet_procs);
                cgroup->ibnet_procs = NULL;
                return OMPI_ERROR;
            }
        }

        IBNET_VERBOSE(10, ("Device idx %d, local port idx %d; "
                           "adding rank %d to the module %p, rem port %d",
                           device->device_index, p_indx, ibnet_proc->rank,
                           module, ibnet_proc->remote_ports_info[p].id));

        /* No need to remove: opal_list_remove_item(peers_data, (opal_list_item_t*)ibnet_proc); */
        rc = opal_pointer_array_set_item(cgroup->ibnet_procs,
                /* num_selected, */ cgroup->num_procs,
                (void *) ibnet_proc);
        if (OPAL_SUCCESS != rc) {
            IBNET_ERROR( ("Failed to set rank %d to index %d",
                          ibnet_proc->rank, 1 + cgroup->num_procs));
            return OMPI_ERROR;
        }

        /* It means that connection group 'cgroup' communicates with
           this proc over its own remote port. Recorded only once the proc
           is really stored in the group. */
        ibnet_proc->use_port[cgroup->index] = p + 1;

        /* put selected cpc data to this proc */
        ibnet_proc->remote_ports_info[p].local_cpc = local_cpc;
        ibnet_proc->remote_ports_info[p].remote_cpc_data = remote_cpc_data;

        ++cgroup->num_procs;

        /* we done for the proc, go to next one */
        break;
    }

    return OMPI_SUCCESS;
}
/*
 * Offer every peer of 'peers_data' to the connection group.
 *
 * @return OMPI_SUCCESS, or the first error returned by
 *         set_ibnet_proc_on_cgroup() (the walk stops there).
 */
static int setup_cgroup_all(
        mca_sbgp_ibnet_connection_group_info_t *cgroup,
        mca_sbgp_ibnet_device_t *device,
        mca_sbgp_ibnet_module_t *module,
        opal_list_t *peers_data)
{
    opal_list_item_t *item;

    for (item = opal_list_get_first(peers_data);
         item != opal_list_get_end(peers_data);
         item = opal_list_get_next(item)) {
        int status = set_ibnet_proc_on_cgroup(cgroup,
                                              (mca_sbgp_ibnet_proc_t *) item,
                                              device, module);
        if (OMPI_SUCCESS != status) {
            return status;
        }
    }

    return OMPI_SUCCESS;
}
/*
 * Offer to the connection group only those peers of 'peers_data' whose
 * proc flags mark them as residing on the local node.
 *
 * @return OMPI_SUCCESS, or the first error returned by
 *         set_ibnet_proc_on_cgroup() (the walk stops there).
 */
static int setup_cgroup_node(mca_sbgp_ibnet_connection_group_info_t *cgroup, mca_sbgp_ibnet_device_t *device,
        mca_sbgp_ibnet_module_t *module, opal_list_t *peers_data)
{
    opal_list_item_t *item;

    for (item = opal_list_get_first(peers_data);
         item != opal_list_get_end(peers_data);
         item = opal_list_get_next(item)) {
        mca_sbgp_ibnet_proc_t *peer = (mca_sbgp_ibnet_proc_t *) item;
        int on_local_node = OPAL_PROC_ON_LOCAL_NODE(peer->ompi_proc->proc_flags);
        int status;

        if (0 != on_local_node) {
            /* the process resides on the same machine */
            status = set_ibnet_proc_on_cgroup(cgroup, peer, device, module);
            if (OMPI_SUCCESS != status) {
                return status;
            }
        }
        /* otherwise the remote process resides on different node: skip */
    }

    return OMPI_SUCCESS;
}
/* The function should be the heart of the ibnet component.
 * Main purpose:
 * The function should run over list of all peers and select only "reachable" peers.
 * Peer that have subnet_id equal to subnet id that I have on my ports is reachable.
 * All peers that have the same number of active ports on the same subnet maybe grouped
 * to subgroup?
 * Need to think more about the select logic on this stage I just return list of all
 * procs
 *
 * What the code does:
 *  1. builds one connection group per active local port,
 *  2. sorts the non-empty groups by size, largest first,
 *  3. keeps the largest groups that contain exactly the same peers,
 *  4. fills module->super.group_list/group_size from the first group and
 *     marks its first proc as MCA_SBGP_IBNET_NODE_LEADER.
 *
 * @param module      module to fill (cgroups, num_cgroups, super.group_*)
 * @param peers_data  list of mca_sbgp_ibnet_proc_t built from the gather
 * @return OMPI_SUCCESS, or an error code. On error everything allocated
 *         here is released, the module fields are reset and use_port of
 *         every peer is freed and set to NULL.
 */
static int select_procs(mca_sbgp_ibnet_module_t *module, opal_list_t *peers_data)
{
    mca_sbgp_ibnet_device_t *device = NULL;
    mca_sbgp_ibnet_proc_t *ibnet_proc = NULL;
    mca_sbgp_ibnet_connection_group_info_t *cgroup = NULL;
    uint32_t p = 0;
    int i = 0, j, rc = OMPI_SUCCESS;
    int num_grouped = 0,
        groups_to_use = 1,
        num_allocated = 0; /* number of entries really allocated in cgroups */
    mca_sbgp_ibnet_component_t *cs = &mca_sbgp_ibnet_component;

    IBNET_VERBOSE(10, ("Start to select procs.\n"));

    module->num_cgroups = 0;
    for (device = (mca_sbgp_ibnet_device_t *) opal_list_get_first(&cs->devices);
         device != (mca_sbgp_ibnet_device_t *) opal_list_get_end(&cs->devices);
         device = (mca_sbgp_ibnet_device_t *)
             opal_list_get_next((opal_list_item_t *) device)) {
        module->num_cgroups += device->num_act_ports;
        IBNET_VERBOSE(10, ("Device num %d with index %d num of active ports %d\n",
                           ++i, device->device_index, device->num_act_ports));
    }

    module->cgroups = calloc(module->num_cgroups,
                             sizeof(mca_sbgp_ibnet_connection_group_info_t));
    if (NULL == module->cgroups) {
        IBNET_ERROR(("Failed to allocate cgroups"));
        /* rc must be set: it still holds OMPI_SUCCESS here */
        rc = OMPI_ERR_OUT_OF_RESOURCE;
        goto select_error;
    }
    num_allocated = module->num_cgroups;

    IBNET_VERBOSE(10, ("Num of cgroups - %d.\n", module->num_cgroups));

    /* 1. Run over all active ports and build connection group
     * for each one */
    for (device = (mca_sbgp_ibnet_device_t *) opal_list_get_first(&cs->devices);
         device != (mca_sbgp_ibnet_device_t *) opal_list_get_end(&cs->devices);
         device = (mca_sbgp_ibnet_device_t *)
             opal_list_get_next((opal_list_item_t *)device)) {
        /* run over active ports on the device */
        for (j = 0; j < device->num_act_ports; j++) {
            /* a group that ended up empty is not counted, so its slot is
             * reused for the next port */
            cgroup = &module->cgroups[num_grouped];

            /* Init cgroups structs */
            cgroup->device_index = device->device_index;
            cgroup->index = num_grouped;
            cgroup->port = device->ports[j].id;
            cgroup->num_procs = 0;

            /* Setup comunication group */
            switch (module->mode) {
                case MCA_SBGP_IBNET_ALL_NET:
                    rc = setup_cgroup_all(cgroup, device, module, peers_data);
                    break;
                case MCA_SBGP_IBNET_NODE_NET:
                    rc = setup_cgroup_node(cgroup, device, module, peers_data);
                    break;
                default:
                    rc = OMPI_ERROR;
                    IBNET_ERROR(("Module mode is unknow, fatal error"));
            }

            if (OMPI_SUCCESS != rc) {
                IBNET_ERROR(("Failed to setup cgroup."));
                goto select_error;
            }

            if (0 != cgroup->num_procs) {
                ++num_grouped;
            }
        }
    }

    if (0 == num_grouped) {
        /* No connection group was found */
        IBNET_ERROR(("No connection group was found."));
        rc = OMPI_ERROR;
        goto select_error;
    }

    /* If we have more than one single cgroup,
     * we need to return groups that connects
     * to exactly the same peers
     */
    if (num_grouped > 1) {
        /* 2. Sort connection groups by size */
        qsort(module->cgroups, num_grouped,
              sizeof(mca_sbgp_ibnet_connection_group_info_t),
              cmp_cgroups);

        /* 3. What is the number of groups with maximal size */
        /* The first is Maximal */
        for (groups_to_use = 1; groups_to_use < num_grouped; groups_to_use++) {
            if (module->cgroups[0].num_procs != module->cgroups[groups_to_use].num_procs) {
                break;
            }
        }

        /* Ishai - It looks that noone is uses this groups_to_use value. In any case there is a bug in it. */
        /* 4. Check that all the maximal size groups are
         * connect to the same peers, if not we just use FIRST cgroup */
        if (groups_to_use > 1) {
            /* we need to check that all groups connects
             * the same set of peers. */
            for (j = groups_to_use - 1; j > 0; j--) {
                for (p = 0; p < module->cgroups[0].num_procs; p++) {
                    /* compare proc by proc....*/
                    if (opal_pointer_array_get_item(module->cgroups[0].ibnet_procs, p) !=
                            opal_pointer_array_get_item(module->cgroups[j].ibnet_procs, p)) {
                        /* peers are not equal, ignore this group and go to the next one */
                        groups_to_use--;
                        if (j != groups_to_use) {
                            /* it was not the last group, swap last and this one */
                            mca_sbgp_ibnet_connection_group_info_t tmp = module->cgroups[j];
                            module->cgroups[j] = module->cgroups[groups_to_use];
                            module->cgroups[groups_to_use] = tmp;
                        }
                        break; /* go to the next group */
                    }
                }
            }
        }
    }

    /* updating sgroup number */
    module->num_cgroups = groups_to_use;

    /* put array of ranks and size */
    module->super.group_size = module->cgroups[0].num_procs;
    module->super.group_list = (int *) calloc(module->super.group_size, sizeof(int));
    if (NULL == module->super.group_list) {
        IBNET_ERROR(("Failed to allocate memory for group list"));
        rc = OMPI_ERR_OUT_OF_RESOURCE;
        goto select_error;
    }

    for (i = 0; i < module->super.group_size; i++) {
        ibnet_proc = (mca_sbgp_ibnet_proc_t *)
            opal_pointer_array_get_item(module->cgroups[0].ibnet_procs, i);
        assert(NULL != ibnet_proc);
        IBNET_VERBOSE(10, ("Adding rank %d to group list", ibnet_proc->rank));
        module->super.group_list[i] = ibnet_proc->ompi_proc_index;
    }

    /* Let proc with lowest index be a leader of the subgroup */
    ibnet_proc = (mca_sbgp_ibnet_proc_t *)
        opal_pointer_array_get_item(module->cgroups[0].ibnet_procs, 0);
    assert(NULL != ibnet_proc);
    ibnet_proc->duty = MCA_SBGP_IBNET_NODE_LEADER;

#if OPAL_ENABLE_DEBUG
    IBNET_VERBOSE(10, ("Ibnet module: size - %d, num_cgroups - %d.\n",
                       module->super.group_size, module->num_cgroups));
    for (i = 0; i < module->num_cgroups; ++i) {
        IBNET_VERBOSE(10, ("cgroup %d uses port %d.\n",
                           i + 1, module->cgroups[i].port));
    }
#endif

    return OMPI_SUCCESS;

select_error:
    if (NULL != module->cgroups) {
        /* Walk all allocated slots, not only the counted groups: the group
         * being built when the error happened may already own an array.
         * Slots never used are zeroed by calloc and are skipped. */
        for (i = 0; i < num_allocated; i++) {
            if (NULL != module->cgroups[i].ibnet_procs) {
                /* Ishai: When do we destruct it if the fucntion was successful - only at the end of the process? */
                /* the arrays are created with OBJ_NEW, so they are
                 * released, not merely destructed */
                OBJ_RELEASE(module->cgroups[i].ibnet_procs);
            }
        }
        free(module->cgroups);
        module->cgroups = NULL;
    }
    module->num_cgroups = 0;

    if (0 != module->super.group_size &&
            NULL != module->super.group_list) {
        free(module->super.group_list);
    }
    /* do not leave stale values for the caller's own cleanup */
    module->super.group_list = NULL;
    module->super.group_size = 0;

    for (ibnet_proc = (mca_sbgp_ibnet_proc_t *) opal_list_get_first(peers_data);
         ibnet_proc != (mca_sbgp_ibnet_proc_t *) opal_list_get_end(peers_data);
         ibnet_proc = (mca_sbgp_ibnet_proc_t *)
             opal_list_get_next((opal_list_item_t *) ibnet_proc)) {
        /* reset the pointer: the proc destructor frees use_port as well */
        free(ibnet_proc->use_port);
        ibnet_proc->use_port = NULL;
    }

    return rc;
}
/* Key strings accepted by key2mode() */
#define IBNET_ALL "all"
#define IBNET_NODE "node"

/*
 * Translate the selection key to an ibnet mode.
 *
 * @param key  NULL, IBNET_ALL or IBNET_NODE (exact match)
 * @return MCA_SBGP_IBNET_ALL_NET for NULL or IBNET_ALL,
 *         MCA_SBGP_IBNET_NODE_NET for IBNET_NODE,
 *         MCA_SBGP_IBNET_NONE_NET for any other string.
 */
static int key2mode(char *key)
{
    int mode;

    if (NULL == key) {
        IBNET_VERBOSE(6, ("key is NULL, return MCA_SBGP_IBNET_ALL"));
        mode = MCA_SBGP_IBNET_ALL_NET;
    } else if (0 == strcmp(key, IBNET_ALL)) {
        /* equal length + equal prefix of that length == equal strings */
        IBNET_VERBOSE(6, ("key is MCA_SBGP_IBNET_ALL"));
        mode = MCA_SBGP_IBNET_ALL_NET;
    } else if (0 == strcmp(key, IBNET_NODE)) {
        IBNET_VERBOSE(6, ("key is NODE"));
        mode = MCA_SBGP_IBNET_NODE_NET;
    } else {
        IBNET_VERBOSE(6, ("key was not detected, return MCA_SBGP_IBNET_NONE"));
        mode = MCA_SBGP_IBNET_NONE_NET;
    }

    return mode;
}
static int mca_sbgp_ibnet_calc_sbuff_size(void)
{
int bytes_tosend = 0, port, cpc;
mca_sbgp_ibnet_device_t *device;
opal_list_t *devices = &mca_sbgp_ibnet_component.devices;
bytes_tosend += sizeof(uint32_t); /* OPAL_UINT32 rank */
bytes_tosend += sizeof(uint32_t); /* OPAL_UINT32 num of active ports */
/* Go through list of device and build the message*/
for (device = (mca_sbgp_ibnet_device_t *) opal_list_get_first(devices);
device != (mca_sbgp_ibnet_device_t *) opal_list_get_end(devices);
device = (mca_sbgp_ibnet_device_t *) opal_list_get_next((opal_list_item_t *) device)) {
for (port = 0; port < device->num_allowed_ports; ++port) {
if (!device->ports[port].used) {
continue;
}
/* OPAL_UINT16 port num */
bytes_tosend += sizeof(uint16_t);
/* OPAL_UINT16 lid */
bytes_tosend += sizeof(uint16_t);
/* OPAL_UINT64 subnetid */
bytes_tosend += sizeof(uint64_t);
/* OPAL_UINT32 default mtu */
bytes_tosend += sizeof(uint32_t);
/* OPAL_UINT8 collectives offload support */
bytes_tosend += sizeof(uint8_t);
/* OPAL_UINT8 number of cpcs for this port */
bytes_tosend += sizeof(uint8_t);
for (cpc = 0; cpc < device->num_cpcs; ++cpc) {
/* OPAL_UINT8 cpc index */
bytes_tosend += sizeof(uint8_t);
/* OPAL_UINT8 cpc priority */
bytes_tosend += sizeof(uint8_t);
/* cpc buffer length (OPAL_UINT8) in bytes */
bytes_tosend += device->cpcs[cpc]->data.cbm_modex_message_len;
bytes_tosend += sizeof(uint8_t);
}
}
}
return bytes_tosend;
}
/*
 * Create an ibnet subgroup module for the given procs.
 *
 * Steps: translate 'key' to a mode, agree with all peers on the maximal
 * message size, allgather the packed port data, unpack it, run the select
 * logic and finally agree on a group id inside the selected group.
 *
 * @param procs        procs to select from
 * @param n_procs_in   number of entries in 'procs'
 * @param comm         communicator stored in the module (group_comm)
 * @param key          selection key, see key2mode()
 * @param output_data  not used by this function
 * @return the new module, or NULL on any failure (the module is then
 *         released).
 *
 * NOTE(review): on the error path the procs that are still on peers_data
 * are not released; only the list itself is destructed.
 */
mca_sbgp_base_module_t *mca_sbgp_ibnet_select_procs(struct ompi_proc_t **procs,
        int n_procs_in,
        struct ompi_communicator_t *comm,
        char *key,
        void *output_data
        )
{
    /* local variables */
    opal_list_t peers_data;
    mca_sbgp_ibnet_module_t *module;
    int rc; /* the error codes are signed ints */
    char *sbuff = NULL, *rbuff = NULL;
    int *sbgp_procs_ranks = NULL, *ranks_in_comm = NULL;
    int i, my_rank_in_group = -1, my_rank, num_bytes_tosend;
    struct mca_sbgp_ibnet_proc_t *ibnet_proc = NULL;
    mca_sbgp_ibnet_component_t *cs = &mca_sbgp_ibnet_component;

    /* Create the module */
    module = OBJ_NEW(mca_sbgp_ibnet_module_t);
    if (OPAL_UNLIKELY(NULL == module)) {
        return NULL;
    }

    module->num_cgroups = 0;
    module->cgroups = NULL;

    module->mode = key2mode(key);
    if (OPAL_UNLIKELY(MCA_SBGP_IBNET_NONE_NET == module->mode)) {
        goto Error_module;
    }

    /* Prepare list for arraving data. It is constructed before the first
     * "goto Error", because the Error path destructs it unconditionally. */
    OBJ_CONSTRUCT(&peers_data, opal_list_t);

    module->super.group_size = 0;
    module->super.group_list = NULL;
    module->super.group_comm = comm;
    module->super.group_net = OMPI_SBGP_IBCX2;

    ranks_in_comm = (int *) malloc((size_t) n_procs_in * sizeof(int));
    if (OPAL_UNLIKELY(NULL == ranks_in_comm)) {
        IBNET_ERROR(("Cannot allocate memory.\n"));
        goto Error;
    }

    my_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);

    for (i = 0; i < n_procs_in; i++) {
        ranks_in_comm[i] = procs[i]->proc_name.vpid;
        if (my_rank == ranks_in_comm[i]) {
            my_rank_in_group = i;
        }
    }

    /* Prepare send data: every peer sends a record of the same (maximal)
     * size */
    num_bytes_tosend = mca_sbgp_ibnet_calc_sbuff_size();

    rc = comm_allreduce_pml(&num_bytes_tosend,
            &num_bytes_tosend, 1,
            MPI_INT, my_rank_in_group,
            MPI_MAX, n_procs_in,
            ranks_in_comm, &ompi_mpi_comm_world.comm);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        goto Error;
    }

    IBNET_VERBOSE(10, ("The size of the send buff is %d\n", num_bytes_tosend));

    assert(num_bytes_tosend > 0);

    /* Allocate send/recv buffers for allgather comunication; the size of
     * the receive buffer is computed in size_t to avoid int overflow */
    sbuff = (char *) malloc(num_bytes_tosend);
    rbuff = (char *) malloc((size_t) num_bytes_tosend * (size_t) n_procs_in);
    if (OPAL_UNLIKELY(NULL == sbuff || NULL == rbuff)) {
        IBNET_ERROR(("Failed to allocate buffers for send/recv ibnet allgather\n"));
        goto Error;
    }

    rc = pack_gather_sbuff(sbuff);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        goto Error;
    }

    rc = comm_allgather_pml((void *) sbuff, (void *) rbuff,
            num_bytes_tosend, MPI_BYTE,
            my_rank_in_group, n_procs_in,
            ranks_in_comm, &ompi_mpi_comm_world.comm);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        IBNET_ERROR(("Allgather call failed.\n"));
        goto Error;
    }

    /* Load the data to peers data */
    rc = unpack_and_load_gather_rbuff(rbuff, num_bytes_tosend,
            procs, n_procs_in, &peers_data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        goto Error;
    }

    /* Select logic */
    rc = select_procs(module, &peers_data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        goto Error;
    }

    /* Put group id */
    sbgp_procs_ranks = (int *) malloc(module->super.group_size *
                                      sizeof(int));
    if (OPAL_UNLIKELY(NULL == sbgp_procs_ranks)) {
        IBNET_ERROR(("Cannot allocate memory.\n"));
        goto Error;
    }

    for (i = 0; i < module->super.group_size; ++i) {
        ibnet_proc = (struct mca_sbgp_ibnet_proc_t *)
            opal_pointer_array_get_item(
                    module->cgroups[0].ibnet_procs, i);

        sbgp_procs_ranks[i] = ibnet_proc->ompi_proc->proc_name.vpid;
        if (my_rank == sbgp_procs_ranks[i]) {
            my_rank_in_group = i;
        }
    }

    assert(my_rank_in_group >= 0);

    /* all members of the group agree on the maximal group id seen so far */
    rc = comm_allreduce_pml(&cs->curr_max_group_id,
            &cs->curr_max_group_id, 1,
            MPI_INT, my_rank_in_group,
            MPI_MAX, module->super.group_size,
            sbgp_procs_ranks, &ompi_mpi_comm_world.comm);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        goto Error;
    }

    module->group_id = cs->curr_max_group_id;
    cs->curr_max_group_id++;

    /* successful completion */
    /* clean up the temporary structures */
    OBJ_DESTRUCT(&peers_data);

    free(sbuff);
    free(rbuff);
    free(ranks_in_comm);
    free(sbgp_procs_ranks);

    IBNET_VERBOSE(10, ("Return ibnet module.\n"));
    return (mca_sbgp_base_module_t *) module;

    /* return with error */
Error:
    /* clean up */
    if (NULL != module->super.group_list) {
        free(module->super.group_list);
        module->super.group_list = NULL;
    }

    /* clean up the temporary structures */
    OBJ_DESTRUCT(&peers_data);

    if (NULL != sbgp_procs_ranks) {
        free(sbgp_procs_ranks);
    }

    if (NULL != ranks_in_comm) {
        free(ranks_in_comm);
    }

    if (NULL != sbuff) {
        free(sbuff);
    }

    if (NULL != rbuff) {
        free(rbuff);
    }

Error_module:
    OBJ_RELEASE(module);

    return NULL;
}