1
1

Fix MXM connection establishment flow

This commit was SVN r28329.
Этот коммит содержится в:
Alex Margolin 2013-04-12 16:37:42 +00:00
родитель 538a4f92d3
Коммит 0ab7675019
4 изменённых файлов: 204 добавлений и 231 удалений

Просмотреть файл

@ -20,12 +20,17 @@ active on the node and the hardware is functioning.
Error: %s
[unable to extract endpoint address]
[unable to extract endpoint ptl address]
MXM was unable to read settings for endpoint
PTL ID: %d
Error: %s
[unable to extract endpoint address]
MXM was unable to read settings for endpoint
Error: %s
[mxm mq create]
Failed to create MQ for endpoint

Просмотреть файл

@ -20,7 +20,6 @@
#include "mtl_mxm_types.h"
#include "mtl_mxm_endpoint.h"
#include "mtl_mxm_request.h"
#include <mxm/api/mxm_addr.h>
mca_mtl_mxm_module_t ompi_mtl_mxm = {
{
@ -90,7 +89,7 @@ static int ompi_mtl_mxm_get_ep_address(ompi_mtl_mxm_ep_conn_info_t *ep_info, mxm
err = mxm_ep_address(ompi_mtl_mxm.ep, ptlid,
(struct sockaddr *) &ep_info->ptl_addr[ptlid], &addrlen);
if (MXM_OK != err) {
opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint address",
opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint ptl address",
true, (int)ptlid, mxm_error_string(err));
return OMPI_ERROR;
}
@ -98,36 +97,40 @@ static int ompi_mtl_mxm_get_ep_address(ompi_mtl_mxm_ep_conn_info_t *ep_info, mxm
return OMPI_SUCCESS;
}
#else
static int ompi_mtl_mxm_get_ep_address(ompi_mtl_mxm_ep_conn_info_t *ep_info, int dest_rank,
mxm_domain_id_t domain)
static int ompi_mtl_mxm_get_ep_address(void **address_p, size_t *address_len_p)
{
size_t addrlen;
mxm_error_t err;
addrlen = sizeof(ep_info->dest_addr[domain]);
err = mxm_ep_address(ompi_mtl_mxm.ep, domain, dest_rank,
(struct sockaddr *) &ep_info->dest_addr[domain], &addrlen);
if (MXM_OK == err) {
ep_info->domain_bitmap |= MXM_BIT(domain);
return OMPI_SUCCESS;
} else if (MXM_ERR_UNREACHABLE == err) {
return OMPI_SUCCESS;
} else {
opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint address",
true, (int)domain, mxm_error_string(err));
*address_len_p = 0;
err = mxm_ep_get_address(ompi_mtl_mxm.ep, NULL, address_len_p);
if (err != MXM_ERR_BUFFER_TOO_SMALL) {
MXM_ERROR("Failed to get ep address length");
return OMPI_ERROR;
}
*address_p = malloc(*address_len_p);
if (*address_p == NULL) {
MXM_ERROR("Failed to allocate ep address buffer");
return OMPI_ERR_OUT_OF_RESOURCE;
}
err = mxm_ep_get_address(ompi_mtl_mxm.ep, *address_p, address_len_p);
if (MXM_OK != err) {
opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint address",
true, mxm_error_string(err));
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
#endif
#define max(a,b) ((a)>(b)?(a):(b))
static mxm_error_t ompi_mtl_mxm_create_ep(mxm_h ctx, mxm_ep_h *ep, unsigned ptl_bitmap, int lr,
uint32_t jobid, uint64_t mxlr, int nlps
#if MXM_API >= MXM_VERSION(2, 0)
, int totps
#endif
) {
static mxm_error_t
ompi_mtl_mxm_create_ep(mxm_h ctx, mxm_ep_h *ep, unsigned ptl_bitmap, int lr,
uint32_t jobid, uint64_t mxlr, int nlps)
{
mxm_error_t err;
#if MXM_API < MXM_VERSION(1,5)
@ -165,6 +168,19 @@ static mxm_error_t ompi_mtl_mxm_create_ep(mxm_h ctx, mxm_ep_h *ep, unsigned ptl_
MXM_VERBOSE(1, "MXM version is old, consider to upgrade");
err = mxm_ep_create(ctx, &ep_opt, ep);
#elif MXM_API < MXM_VERSION(2,0)
mxm_ep_opts_t *ep_opts;
err = mxm_config_read_ep_opts(&ep_opts);
if (err != MXM_OK) {
MXM_ERROR("Failed to parse MXM configuration");
return err;
}
ep_opts->job_id = jobid;
ep_opts->local_rank = lr;
ep_opts->num_local_procs = nlps;
err = mxm_ep_create(ctx, ep_opts, ep);
mxm_config_free(ep_opts);
#else
mxm_ep_opts_t *ep_opts;
err = mxm_config_read_ep_opts(&ep_opts);
@ -173,119 +189,129 @@ static mxm_error_t ompi_mtl_mxm_create_ep(mxm_h ctx, mxm_ep_h *ep, unsigned ptl_
return err;
}
#if MXM_API >= MXM_VERSION(2, 0)
err = mxm_ep_create(ctx, ep_opts, ep, totps);
#else
ep_opts->job_id = jobid;
ep_opts->local_rank = lr;
ep_opts->num_local_procs = nlps;
err = mxm_ep_create(ctx, ep_opts, ep);
#endif
mxm_config_free(ep_opts);
mxm_config_free_ep_opts(ep_opts);
#endif
return err;
}
#if MXM_API >= MXM_VERSION(2,0)
static void ompi_mtl_mxm_set_conn_req(mxm_conn_req_t *conn_req, ompi_mtl_mxm_ep_conn_info_t *ep_info,
mxm_domain_id_t domain)
/*
* send information using modex (in some case there is limitation on data size for example ess/pmi)
* set size of data sent for once
*
*/
static int ompi_mtl_mxm_send_ep_address(void *address, size_t address_len)
{
if (ep_info->domain_bitmap & MXM_BIT(domain)) {
conn_req->addr[domain] = (struct sockaddr *)&(ep_info->dest_addr[domain]);
} else {
conn_req->addr[domain] = NULL;
}
}
#endif
char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version);
char *modex_name = malloc(strlen(modex_component_name) + 5);
const size_t modex_max_size = 0x60;
unsigned char *modex_buf_ptr;
size_t modex_buf_size;
size_t modex_cur_size;
int modex_name_id = 0;
int rc;
#if MXM_API >= MXM_VERSION(2,0)
#define MTL_MXM_MODEX_MAX_SIZE ((size_t)0x60)
static int ompi_mtl_mxm_send_ep_address(ompi_mtl_mxm_ep_conn_info_t *ep_info, int totps)
{
int rc, dest;
mca_mtl_base_component_2_0_0_t *cm = &mca_mtl_mxm_component.super;
char *modex_key, *mxm_version = mca_base_component_to_string(&cm->mtl_version);
/* Rough approximation of the next string length: mxm_version-dest_rank-portion_num */
modex_key = malloc(strlen(mxm_version) + 8 * sizeof(int) + 8 * sizeof(size_t) + 2);
if (NULL == modex_key) {
MXM_ERROR("Cannot allocate memory.");
return OMPI_ERR_OUT_OF_RESOURCE;
/* Send address length */
sprintf(modex_name, "%s-len", modex_component_name);
rc = ompi_modex_send_string((const char *)modex_name, &address_len, sizeof(address_len));
if (OMPI_SUCCESS != rc) {
MXM_ERROR("failed to send address length");
goto bail;
}
/*
* Send information using modex (in some case there is limitation on data size for example ess/pmi)
* set size of data sent for once
/* Send address, in parts.
* modex name looks as mtl.mxm.1.5-18 where mtl.mxm.1.5 is the component and 18 is part index.
*/
for (dest = 0; dest < totps; ++dest) {
/*
* Get address for each PTL on this endpoint, and share it with other ranks.
*/
int modex_name_id = 0;
size_t modex_cur_size, modex_buf_size = sizeof(*ep_info);
unsigned char *modex_buf_ptr = (unsigned char *) ep_info;
modex_cur_size = modex_buf_size < MTL_MXM_MODEX_MAX_SIZE ?
modex_buf_size : MTL_MXM_MODEX_MAX_SIZE;
ep_info->domain_bitmap = 0;
rc = ompi_mtl_mxm_get_ep_address(ep_info, dest, MXM_DOMAIN_SELF);
modex_buf_size = address_len;
modex_buf_ptr = address;
while (modex_buf_size) {
sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id);
modex_cur_size = (modex_buf_size < modex_max_size) ? modex_buf_size : modex_max_size;
rc = ompi_modex_send_string(modex_name, modex_buf_ptr, modex_cur_size);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Failed to get endpoint address: for domain SELF dest %d.", dest);
return OMPI_ERROR;
MXM_ERROR("Open MPI couldn't distribute EP connection details");
goto bail;
}
rc = ompi_mtl_mxm_get_ep_address(ep_info, dest, MXM_DOMAIN_SHM);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Failed to get endpoint address: for domain SHM dest %d.", dest);
return OMPI_ERROR;
}
rc = ompi_mtl_mxm_get_ep_address(ep_info, dest, MXM_DOMAIN_IB);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Failed to get endpoint address: for domain IB dest %d.", dest);
return OMPI_ERROR;
}
while (modex_buf_size) {
/* Modex key looks as mtl.mxm.1.5-1-18 where mtl.mxm.1.5 is the component,
1 is a destination rank and 18 is a portion index */
sprintf(modex_key, "%s-%d-%d", mxm_version, dest, modex_name_id);
rc = ompi_modex_send_string((const char *) modex_key, modex_buf_ptr, modex_cur_size);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Open MPI couldn't distribute EP connection details");
free(modex_key);
free(mxm_version);
return OMPI_ERROR;
}
++modex_name_id;
modex_buf_ptr += modex_cur_size;
modex_buf_size -= modex_cur_size;
modex_cur_size = modex_buf_size < MTL_MXM_MODEX_MAX_SIZE ?
modex_buf_size : MTL_MXM_MODEX_MAX_SIZE;
}
modex_name_id++;
modex_buf_ptr += modex_cur_size;
modex_buf_size -= modex_cur_size;
}
free(modex_key);
free(mxm_version);
rc = OMPI_SUCCESS;
return OMPI_SUCCESS;
bail:
free(modex_component_name);
free(modex_name);
return rc;
}
/*
* recieve information using modex
*/
static int ompi_mtl_mxm_recv_ep_address(ompi_proc_t *source_proc, void **address_p,
size_t *address_len_p)
{
char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version);
char *modex_name = malloc(strlen(modex_component_name) + 5);
unsigned char *modex_buf_ptr;
size_t modex_cur_size;
size_t modex_buf_size;
size_t *address_len_buf_ptr;
int modex_name_id = 0;
int rc;
*address_p = NULL;
*address_len_p = 0;
/* Receive address length */
sprintf(modex_name, "%s-len", modex_component_name);
rc = ompi_modex_recv_string(modex_name, source_proc, (void**)&address_len_buf_ptr,
&modex_cur_size);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Failed to receive ep address length");
goto bail;
}
/* Allocate buffer to hold the address */
*address_len_p = *address_len_buf_ptr;
*address_p = malloc(*address_len_p);
if (*address_p == NULL) {
MXM_ERROR("Failed to allocate modex receive buffer");
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto bail;
}
/* Receive the data, in parts */
modex_buf_size = 0;
while (modex_buf_size < *address_len_p) {
sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id);
rc = ompi_modex_recv_string(modex_name, source_proc, (void**)&modex_buf_ptr,
&modex_cur_size);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Open MPI couldn't distribute EP connection details");
goto bail;
}
memcpy((char*)(*address_p) + modex_buf_size, modex_buf_ptr, modex_cur_size);
modex_buf_size += modex_cur_size;
modex_name_id++;
}
rc = OMPI_SUCCESS;
bail:
free(modex_component_name);
free(modex_name);
return rc;
}
#endif
int ompi_mtl_mxm_module_init(void)
{
#if MXM_API < MXM_VERSION(2,0)
ompi_mtl_mxm_ep_conn_info_t ep_info;
#endif
void *ep_address;
size_t ep_address_len;
mxm_error_t err;
uint32_t jobid;
uint64_t mxlr;
@ -293,6 +319,7 @@ int ompi_mtl_mxm_module_init(void)
unsigned ptl_bitmap;
size_t totps, proc;
int lr, nlps;
int rc;
mxlr = 0;
lr = -1;
@ -340,12 +367,7 @@ int ompi_mtl_mxm_module_init(void)
/* Open MXM endpoint */
err = ompi_mtl_mxm_create_ep(ompi_mtl_mxm.mxm_context, &ompi_mtl_mxm.ep,
ptl_bitmap, lr, jobid, mxlr, nlps
#if MXM_API >= MXM_VERSION(2, 0)
, totps
#endif
);
ptl_bitmap, lr, jobid, mxlr, nlps);
if (MXM_OK != err) {
opal_show_help("help-mtl-mxm.txt", "unable to create endpoint", true,
mxm_error_string(err));
@ -369,47 +391,24 @@ int ompi_mtl_mxm_module_init(void)
return OMPI_ERROR;
}
/*
* send information using modex (in some case there is limitation on data size for example ess/pmi)
* set size of data sent for once
*/
{
size_t modex_max_size = 0x60;
unsigned char *modex_buf_ptr = (unsigned char *)&ep_info;
size_t modex_buf_size = sizeof(ep_info);
size_t modex_cur_size = (modex_buf_size < modex_max_size ? modex_buf_size : modex_max_size);
char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version);
char *modex_name = malloc(strlen(modex_component_name) + 5);
int modex_name_id = 0;
while (modex_buf_size) {
/* modex name looks as mtl.mxm.1.5-18 where mtl.mxm.1.5 is the component and 18 is portion index */
sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id);
if (OMPI_SUCCESS != ompi_modex_send_string((const char *)modex_name, modex_buf_ptr, modex_cur_size)) {
MXM_ERROR("Open MPI couldn't distribute EP connection details");
free(modex_component_name);
free(modex_name);
return OMPI_ERROR;
}
modex_name_id++;
modex_buf_ptr += modex_cur_size;
modex_buf_size -= modex_cur_size;
modex_cur_size = (modex_buf_size < modex_max_size ? modex_buf_size : modex_max_size);
}
free(modex_component_name);
free(modex_name);
}
ep_address = &ep_info;
ep_address_len = sizeof(ep_info);
#else
{
int rc;
rc = ompi_mtl_mxm_send_ep_address(&ep_info, totps);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Modex session failed.");
return rc;
}
rc = ompi_mtl_mxm_get_ep_address(&ep_address, &ep_address_len);
if (OMPI_SUCCESS != rc) {
return rc;
}
#endif
rc = ompi_mtl_mxm_send_ep_address(ep_address, ep_address_len);
if (OMPI_SUCCESS != rc) {
MXM_ERROR("Modex session failed.");
return rc;
}
#if MXM_API >= MXM_VERSION(2,0)
free(ep_address);
#endif
/* Register the MXM progress function */
opal_progress_register(ompi_mtl_mxm_progress);
@ -427,94 +426,63 @@ int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs,
struct ompi_proc_t** procs, /*const*/
struct mca_mtl_base_endpoint_t **mtl_peer_data)
{
#if MXM_API < MXM_VERSION(2,0)
ompi_mtl_mxm_ep_conn_info_t *ep_info;
mxm_conn_req_t *conn_reqs;
int timeout;
#endif
void *ep_address;
size_t ep_address_len;
mxm_error_t err;
size_t i;
int rc;
int timeout;
assert(mtl == &ompi_mtl_mxm.super);
#if MXM_API < MXM_VERSION(2,0)
/* Allocate connection requests */
conn_reqs = malloc(nprocs * sizeof(mxm_conn_req_t));
ep_info = malloc(nprocs * sizeof(ompi_mtl_mxm_ep_conn_info_t));
memset(conn_reqs, 0x0, sizeof(mxm_conn_req_t));
memset(ep_info, 0x0, sizeof(ompi_mtl_mxm_ep_conn_info_t));
conn_reqs = calloc(nprocs, sizeof(mxm_conn_req_t));
ep_info = calloc(nprocs, sizeof(ompi_mtl_mxm_ep_conn_info_t));
if (NULL == conn_reqs || NULL == ep_info) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto bail;
}
#endif
/* Get the EP connection requests for all the processes from modex */
for (i = 0; i < nprocs; ++i) {
/*
* recieve information using modex
*/
{
unsigned char *ep_info_ptr = (unsigned char*)(ep_info + i);
unsigned char *modex_buf_ptr = NULL;
size_t modex_buf_size = sizeof(ompi_mtl_mxm_ep_conn_info_t);
size_t modex_cur_size = 0;
char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version);
int modex_name_id = 0;
#if MXM_API < MXM_VERSION(2,0)
char *modex_name = malloc(strlen(modex_component_name) + 5);
#else
char *modex_name = malloc(strlen(modex_component_name) + 8 * sizeof(int) + 8 * sizeof(size_t) + 2);
#endif
while (modex_buf_size > 0) {
/* modex name looks as mtl.mxm.1.5-18 where mtl.mxm.1.5 is the component and 18 is portion index */
#if MXM_API < MXM_VERSION(2,0)
sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id);
#else
sprintf(modex_name, "%s-%d-%d", modex_component_name, ompi_process_info.my_name.vpid, modex_name_id);
#endif
if (OMPI_SUCCESS != ompi_modex_recv_string((const char *)modex_name, procs[i], (void**)&modex_buf_ptr, &modex_cur_size)) {
MXM_ERROR("Open MPI couldn't distribute EP connection details");
free(modex_name);
rc = OMPI_ERROR;
goto bail;
}
/* fill in ep_info[i] with recieved data */
memcpy((void*)ep_info_ptr, modex_buf_ptr, modex_cur_size);
if (modex_cur_size > modex_buf_size) {
MXM_ERROR("Open MPI couldn't distribute EP connection details: incorrect size of message");
free(modex_component_name);
free(modex_name);
rc = OMPI_ERROR;
goto bail;
}
modex_name_id++;
ep_info_ptr += modex_cur_size;
modex_buf_size -= modex_cur_size;
modex_cur_size = 0;
}
free(modex_component_name);
free(modex_name);
rc = ompi_mtl_mxm_recv_ep_address(procs[i], &ep_address, &ep_address_len);
if (rc != OMPI_SUCCESS) {
goto bail;
}
#if MXM_API < MXM_VERSION(2,0)
if (ep_address_len != sizeof(ep_info[i])) {
MXM_ERROR("Invalid endpoint address length");
rc = OMPI_ERROR;
goto bail;
}
memcpy(&ep_info[i], ep_address, ep_address_len);
conn_reqs[i].ptl_addr[MXM_PTL_SELF] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_SELF]);
conn_reqs[i].ptl_addr[MXM_PTL_SHM] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_SHM]);
conn_reqs[i].ptl_addr[MXM_PTL_SHM] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_SHM]);
conn_reqs[i].ptl_addr[MXM_PTL_RDMA] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_RDMA]);
#else
ompi_mtl_mxm_set_conn_req(&conn_reqs[i], &ep_info[i], MXM_DOMAIN_SELF);
ompi_mtl_mxm_set_conn_req(&conn_reqs[i], &ep_info[i], MXM_DOMAIN_SHM);
ompi_mtl_mxm_set_conn_req(&conn_reqs[i], &ep_info[i], MXM_DOMAIN_IB);
mtl_peer_data[i] = (mca_mtl_mxm_endpoint_t *) OBJ_NEW(mca_mtl_mxm_endpoint_t);
mtl_peer_data[i]->mtl_mxm_module = &ompi_mtl_mxm;
err = mxm_ep_connect(ompi_mtl_mxm.ep, ep_address, &mtl_peer_data[i]->mxm_conn);
if (err != MXM_OK) {
MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err));
rc = OMPI_ERROR;
goto bail;
}
#endif
free(ep_address);
}
#if MXM_API < MXM_VERSION(2,0)
/* Connect to remote peers */
if (mxm_get_version() < MXM_VERSION(1,5)) {
timeout = 1000;
} else {
timeout = -1;
}
timeout = (mxm_get_version() < MXM_VERSION(1,5)) ? 1000 : -1;
err = mxm_ep_connect(ompi_mtl_mxm.ep, conn_reqs, nprocs, timeout);
if (MXM_OK != err) {
MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err));
@ -534,13 +502,14 @@ int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs,
mtl_peer_data[i]->mtl_mxm_module = &ompi_mtl_mxm;
mtl_peer_data[i]->mxm_conn = conn_reqs[i].conn;
}
rc = OMPI_SUCCESS;
#endif
rc = OMPI_SUCCESS;
bail:
if (conn_reqs)
free(conn_reqs);
if (ep_info)
free(ep_info);
#if MXM_API < MXM_VERSION(2,0)
free(conn_reqs);
free(ep_info);
#endif
return rc;
}

Просмотреть файл

@ -15,7 +15,9 @@
#include <unistd.h>
#include <mxm/api/mxm_api.h>
#if MXM_API < MXM_VERSION(2, 0)
#include <mxm/api/mxm_addr.h>
#endif
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/mtl/mtl.h"

Просмотреть файл

@ -38,14 +38,11 @@ typedef struct mca_mtl_mxm_module_t {
} mca_mtl_mxm_module_t;
typedef struct ompi_mtl_mxm_ep_conn_info_t {
#if MXM_API < MXM_VERSION(2,0)
typedef struct ompi_mtl_mxm_ep_conn_info_t {
struct sockaddr_storage ptl_addr[MXM_PTL_LAST];
#else
unsigned domain_bitmap;
struct sockaddr_storage dest_addr[MXM_DOMAIN_LAST];
#endif
} ompi_mtl_mxm_ep_conn_info_t;
#endif
extern mca_mtl_mxm_module_t ompi_mtl_mxm;