tcp ptl initialization
This commit was SVN r389.
Этот коммит содержится в:
родитель
deb6aa4f8d
Коммит
b6457fdaa9
@ -3,12 +3,10 @@
|
||||
|
||||
void mca_pml_ptl_comm_init(mca_pml_comm_t* comm, size_t size)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_ptl_comm_destroy(mca_pml_comm_t* comm)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
@ -125,12 +125,12 @@ struct mca_ptl_t {
|
||||
|
||||
/* PTL common attributes */
|
||||
mca_ptl_base_module_t* ptl_module;
|
||||
size_t ptl_frag_first_size; /**< maximum size of first fragment */
|
||||
size_t ptl_frag_min_size; /**< threshold below which the CDI will not fragment */
|
||||
size_t ptl_frag_max_size; /**< maximum fragment size supported by the CDI */
|
||||
uint32_t ptl_endpoint_latency; /**< relative/absolute measure of latency */
|
||||
uint64_t ptl_endpoint_bandwidth; /**< bandwidth (bytes/sec) supported by each endpoint */
|
||||
size_t ptl_endpoint_count; /**< number endpoints supported by this CDI */
|
||||
bool ptl_exclusive; /**< indicates this PTL should be used exclusively */
|
||||
size_t ptl_first_frag_size; /**< maximum size of first fragment */
|
||||
size_t ptl_min_frag_size; /**< threshold below which the PTL will not fragment */
|
||||
size_t ptl_max_frag_size; /**< maximum fragment size supported by the PTL */
|
||||
uint32_t ptl_latency; /**< relative/absolute measure of latency */
|
||||
uint64_t ptl_bandwidth; /**< bandwidth (bytes/sec) supported by each endpoint */
|
||||
|
||||
/* PTL function table */
|
||||
mca_ptl_base_add_procs_fn_t ptl_add_procs;
|
||||
|
@ -14,6 +14,5 @@ noinst_LTLIBRARIES = libmca_ptl_tcp.la
|
||||
libmca_ptl_tcp_la_SOURCES = \
|
||||
ptl_tcp.c \
|
||||
ptl_tcp.h \
|
||||
ptl_tcp_init.c \
|
||||
ptl_tcp_module.c \
|
||||
ptl_tcp_send.c
|
||||
|
@ -3,6 +3,7 @@
|
||||
*/
|
||||
|
||||
#include "lam/mem/malloc.h"
|
||||
#include "lam/util/output.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "ptl_tcp.h"
|
||||
@ -10,13 +11,13 @@
|
||||
|
||||
mca_ptl_tcp_t mca_ptl_tcp = {
|
||||
{
|
||||
&mca_ptl_tcp_module_1_0_0_0,
|
||||
&mca_ptl_tcp_module.super,
|
||||
0, /* ptl_exclusive */
|
||||
0, /* ptl_frag_first_size */
|
||||
0, /* ptl_frag_min_size */
|
||||
0, /* ptl_frag_max_size */
|
||||
0, /* ptl_endpoint_latency */
|
||||
0, /* ptl_endpoint_bandwidth */
|
||||
0, /* ptl_endpoint_count */
|
||||
0, /* ptl_latency */
|
||||
0, /* ptl_andwidth */
|
||||
mca_ptl_tcp_add_procs,
|
||||
mca_ptl_tcp_fini,
|
||||
mca_ptl_tcp_send,
|
||||
@ -24,3 +25,21 @@ mca_ptl_tcp_t mca_ptl_tcp = {
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
int mca_ptl_tcp_create(int if_index)
|
||||
{
|
||||
mca_ptl_tcp_t* ptl = (mca_ptl_tcp_t*)LAM_MALLOC(sizeof(mca_ptl_tcp_t));
|
||||
if(NULL == ptl) {
|
||||
lam_output(0,"mca_ptl_tcp: unable to allocate ptl");
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp));
|
||||
mca_ptl_tcp_module.tcp_ptls[mca_ptl_tcp_module.tcp_num_ptls++] = ptl;
|
||||
|
||||
/* initialize the ptl */
|
||||
ptl->tcp_ifindex = if_index;
|
||||
lam_ifindextoaddr(if_index, &ptl->tcp_addr, sizeof(ptl->tcp_addr));
|
||||
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1,15 +1,14 @@
|
||||
/** @file
|
||||
/* @file
|
||||
*
|
||||
* TCP PTL
|
||||
*/
|
||||
|
||||
/*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef MCA_PTL_TCP_H_
|
||||
#define MCA_PTL_TCP_H
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include "lam/util/reactor.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
@ -21,12 +20,19 @@
|
||||
|
||||
struct mca_ptl_tcp_module_1_0_0_t {
|
||||
mca_ptl_base_module_1_0_0_t super;
|
||||
struct mca_ptl_tcp_t** tcp_ptls;
|
||||
size_t tcp_num_ptls; /**< number of ptls actually used */
|
||||
size_t tcp_max_ptls; /**< maximum number of ptls - available kernel ifs */
|
||||
lam_reactor_t tcp_reactor;
|
||||
int tcp_listen;
|
||||
unsigned short tcp_port;
|
||||
char* tcp_if_include; /**< comma seperated list of interface to include */
|
||||
char* tcp_if_exclude; /**< comma seperated list of interface to exclude */
|
||||
};
|
||||
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_t;
|
||||
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_t;
|
||||
|
||||
extern mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_0;
|
||||
extern mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module;
|
||||
|
||||
extern int mca_ptl_tcp_module_open(void);
|
||||
extern int mca_ptl_tcp_module_close(void);
|
||||
@ -35,7 +41,6 @@ extern mca_ptl_t** mca_ptl_tcp_module_init(
|
||||
int *num_ptls,
|
||||
int *thread_min,
|
||||
int *thread_max
|
||||
|
||||
);
|
||||
|
||||
extern void mca_ptl_tcp_module_progress(
|
||||
@ -45,18 +50,23 @@ extern void mca_ptl_tcp_module_progress(
|
||||
|
||||
/**
|
||||
* TCP PTL Interface
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
struct mca_ptl_tcp_t {
|
||||
mca_ptl_t super; /**< comment */
|
||||
mca_ptl_t super;
|
||||
int tcp_ifindex;
|
||||
struct sockaddr_in tcp_addr;
|
||||
};
|
||||
typedef struct mca_ptl_tcp_t mca_ptl_tcp_t;
|
||||
|
||||
extern mca_ptl_tcp_t mca_ptl_tcp;
|
||||
|
||||
|
||||
extern int mca_ptl_tcp_create(
|
||||
int if_index
|
||||
);
|
||||
|
||||
extern int mca_ptl_tcp_fini(
|
||||
struct mca_ptl_t* ptl
|
||||
);
|
||||
@ -66,7 +76,7 @@ extern int mca_ptl_tcp_add_procs(
|
||||
struct lam_proc_t **procs,
|
||||
size_t nprocs
|
||||
);
|
||||
|
||||
|
||||
extern int mca_ptl_tcp_request_alloc(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_send_request_t**
|
||||
|
@ -1,17 +1,15 @@
|
||||
/*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include "lam/constants.h"
|
||||
#include "lam/util/argv.h"
|
||||
#include "lam/mem/malloc.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "mca/lam/base/module_exchange.h"
|
||||
#include "ptl_tcp.h"
|
||||
|
||||
#define mca_ptl_tcp_param_register_int(n,v) \
|
||||
mca_base_param_lookup_int( \
|
||||
mca_base_param_register_int("ptl","tcp",n,0,v))
|
||||
|
||||
|
||||
mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
|
||||
{
|
||||
/* First, the mca_base_module_t struct containing meta information
|
||||
@ -43,18 +41,65 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
|
||||
mca_ptl_tcp_module_progress /* module progress */
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* data structure for receiving reactor callbacks
|
||||
*/
|
||||
|
||||
static void mca_ptl_tcp_module_recv(int sd, void*);
|
||||
static void mca_ptl_tcp_module_send(int sd, void*);
|
||||
static void mca_ptl_tcp_module_except(int sd, void*);
|
||||
|
||||
static lam_reactor_listener_t mca_ptl_tcp_module_listener = {
|
||||
mca_ptl_tcp_module_recv,
|
||||
mca_ptl_tcp_module_send,
|
||||
mca_ptl_tcp_module_except,
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* utility routines for parameter registration
|
||||
*/
|
||||
|
||||
static inline char* mca_ptl_tcp_param_register_string(
|
||||
const char* param_name,
|
||||
const char* default_value)
|
||||
{
|
||||
char *param_value;
|
||||
int id = mca_base_param_register_string("ptl","tcp",param_name,NULL,default_value);
|
||||
mca_base_param_lookup_string(id, ¶m_value);
|
||||
return param_value;
|
||||
}
|
||||
|
||||
/**
|
||||
* some comment
|
||||
*
|
||||
* @param foo description
|
||||
* @return
|
||||
*
|
||||
* long description
|
||||
static inline int mca_ptl_tcp_param_register_int(
|
||||
const char* param_name,
|
||||
int default_value)
|
||||
{
|
||||
int id = mca_base_param_register_string("ptl","tcp",param_name,NULL,default_value);
|
||||
return mca_base_param_lookup_int(id);
|
||||
}
|
||||
|
||||
/*
|
||||
* Called by MCA framework to open the module, registers
|
||||
* module parameters.
|
||||
*/
|
||||
|
||||
int mca_ptl_tcp_module_open(void)
|
||||
{
|
||||
/* register TCP module parameters */
|
||||
mca_ptl_tcp_module.tcp_if_include =
|
||||
mca_ptl_tcp_param_register_string("if-include", "");
|
||||
mca_ptl_tcp_module.tcp_if_exclude =
|
||||
mca_ptl_tcp_param_register_string("if-exclude", "");
|
||||
mca_ptl_tcp.super.ptl_exclusive =
|
||||
mca_ptl_tcp_param_register_int("exclusive", 0);
|
||||
mca_ptl_tcp.super.ptl_first_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("first-frag-size", 16*1024);
|
||||
mca_ptl_tcp.super.ptl_min_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("min-frag-size", 64*1024);
|
||||
mca_ptl_tcp.super.ptl_max_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("max-frag-size", -1);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -63,14 +108,190 @@ int mca_ptl_tcp_module_close(void)
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
mca_ptl_t** mca_ptl_tcp_module_init(int* num_tcps, int* thread_min, int* thread_max)
|
||||
|
||||
/*
|
||||
* Create a TCP PTL instance for either:
|
||||
* (1) all interfaces specified by the user
|
||||
* (2) all available interfaces
|
||||
* (3) all available interfaces except for those excluded by the user
|
||||
*/
|
||||
|
||||
static int mca_ptl_tcp_module_create_instances()
|
||||
{
|
||||
lam_reactor_init(&mca_ptl_tcp_module.tcp_reactor);
|
||||
return NULL;
|
||||
int if_count = lam_ifcount();
|
||||
int if_index;
|
||||
char **include;
|
||||
char **exclude;
|
||||
char **argv;
|
||||
|
||||
if(if_count <= 0)
|
||||
return LAM_ERROR;
|
||||
|
||||
/* allocate memory for ptls */
|
||||
mca_ptl_tcp_module.tcp_max_ptls = if_count;
|
||||
mca_ptl_tcp_module.tcp_ptls = (mca_ptl_tcp_t**)LAM_MALLOC(if_count * sizeof(mca_ptl_tcp_t*));
|
||||
if(NULL == mca_ptl_tcp_module.tcp_ptls) {
|
||||
lam_output(0, "mca_ptl_tcp: unable to initialize module");
|
||||
return LAM_ERROR;
|
||||
}
|
||||
|
||||
/* if the user specified an interface list - use these only */
|
||||
argv = include = lam_argv_split(mca_ptl_tcp_module.tcp_if_include,'\'');
|
||||
while(argv && *argv) {
|
||||
char* if_name = *argv;
|
||||
int if_index = lam_ifnametoindex(if_name);
|
||||
if(if_index < 0) {
|
||||
lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name);
|
||||
} else {
|
||||
mca_ptl_tcp_create(if_index);
|
||||
}
|
||||
argv++;
|
||||
}
|
||||
lam_argv_free(include);
|
||||
|
||||
/* if the interface list was not specified by the user, create
|
||||
* a PTL for each interface that was not excluded.
|
||||
*/
|
||||
exclude = lam_argv_split(mca_ptl_tcp_module.tcp_if_exclude,'\'');
|
||||
for(if_index = lam_ifbegin(); if_index >= 0; if_index = lam_ifnext()) {
|
||||
int argc;
|
||||
char if_name[32];
|
||||
lam_ifindextoname(if_index, if_name, sizeof(if_name));
|
||||
|
||||
/* check to see if this interface exists in the exclude list */
|
||||
argv = exclude;
|
||||
while(argv && *argv) {
|
||||
if(strcmp(*argv,if_name) == 0)
|
||||
break;
|
||||
argv++;
|
||||
}
|
||||
/* if this interface was not found in the excluded list - create a PTL */
|
||||
if(argv == 0 || *argv == 0)
|
||||
mca_ptl_tcp_create(if_index);
|
||||
}
|
||||
lam_argv_free(exclude);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a listen socket and bind to all interfaces
|
||||
*/
|
||||
|
||||
static int mca_ptl_tcp_module_create_listen()
|
||||
{
|
||||
/* create a listen socket for incoming connections */
|
||||
mca_ptl_tcp_module.tcp_listen = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if(mca_ptl_tcp_module.tcp_listen < 0) {
|
||||
lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno);
|
||||
return LAM_ERROR;
|
||||
}
|
||||
|
||||
/* bind to all addresses and dynamically assigned port */
|
||||
struct sockaddr_in inaddr;
|
||||
inaddr.sin_family = AF_INET;
|
||||
inaddr.sin_addr.s_addr = INADDR_ANY;
|
||||
inaddr.sin_port = 0;
|
||||
|
||||
if(bind(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
|
||||
lam_output(0,"mca_ptl_tcp_module_init: bind() failed with errno=%d", errno);
|
||||
return LAM_ERROR;
|
||||
}
|
||||
|
||||
/* resolve system assignend port */
|
||||
#if defined(__linux__)
|
||||
socklen_t addrlen = sizeof(struct sockaddr_in);
|
||||
#else
|
||||
int addrlen = sizeof(struct sockaddr_in);
|
||||
#endif
|
||||
if(getsockname(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, &addrlen) < 0) {
|
||||
lam_output(0, "mca_ptl_tcp_module_init: getsockname() failed with errno=%d", errno);
|
||||
return LAM_ERROR;
|
||||
}
|
||||
mca_ptl_tcp_module.tcp_port = inaddr.sin_port;
|
||||
|
||||
/* initialize reactor and register listen port */
|
||||
lam_reactor_init(&mca_ptl_tcp_module.tcp_reactor);
|
||||
lam_reactor_insert(
|
||||
&mca_ptl_tcp_module.tcp_reactor,
|
||||
mca_ptl_tcp_module.tcp_listen,
|
||||
&mca_ptl_tcp_module_listener,
|
||||
0,
|
||||
LAM_NOTIFY_RECV|LAM_NOTIFY_EXCEPT);
|
||||
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register TCP module addressing information. The MCA framework
|
||||
* will make this available to all peers.
|
||||
*
|
||||
* FIX: just pass around sockaddr_in for now
|
||||
*/
|
||||
|
||||
static int mca_ptl_tcp_module_exchange()
|
||||
{
|
||||
size_t i;
|
||||
struct sockaddr_in* addrs = (struct sockaddr_in*)LAM_MALLOC
|
||||
(mca_ptl_tcp_module.tcp_num_ptls * sizeof(struct sockaddr_in));
|
||||
for(i=0; i<mca_ptl_tcp_module.tcp_num_ptls; i++) {
|
||||
mca_ptl_tcp_t* ptl = mca_ptl_tcp_module.tcp_ptls[i];
|
||||
addrs[i] = ptl->tcp_addr;
|
||||
addrs[i].sin_port = mca_ptl_tcp_module.tcp_listen;
|
||||
}
|
||||
return mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version,
|
||||
addrs, sizeof(struct sockaddr_in),mca_ptl_tcp_module.tcp_num_ptls);
|
||||
}
|
||||
|
||||
/*
|
||||
* TCP module initialization:
|
||||
* (1) read interface list from kernel and compare against module parameters
|
||||
* then create a PTL instance for selected interfaces
|
||||
* (2) setup TCP listen socket for incoming connection attempts
|
||||
* (3) register PTL parameters with the MCA
|
||||
*/
|
||||
mca_ptl_t** mca_ptl_tcp_module_init(int* num_ptls, int* thread_min, int* thread_max)
|
||||
{
|
||||
*num_ptls = 0;
|
||||
*thread_min = MPI_THREAD_MULTIPLE;
|
||||
*thread_max = MPI_THREAD_MULTIPLE;
|
||||
|
||||
/* create a PTL TCP module for selected interfaces */
|
||||
if(mca_ptl_tcp_module_create_instances() != LAM_SUCCESS)
|
||||
return 0;
|
||||
|
||||
/* create a TCP listen socket for incoming connection attempts */
|
||||
if(mca_ptl_tcp_module_create_listen() != LAM_SUCCESS)
|
||||
return 0;
|
||||
|
||||
/* register TCP parameters with the MCA framework */
|
||||
if(mca_ptl_tcp_module_exchange() != LAM_SUCCESS)
|
||||
return 0;
|
||||
|
||||
*num_ptls = mca_ptl_tcp_module.tcp_num_ptls;
|
||||
return (mca_ptl_t**)mca_ptl_tcp_module.tcp_ptls;
|
||||
}
|
||||
|
||||
/*
|
||||
* All TCP progress is handled via an event loop based on select. Events
|
||||
* are dispatched to the appropriate callbacks as file descriptors become
|
||||
* available for read/write.
|
||||
*/
|
||||
void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp)
|
||||
{
|
||||
lam_reactor_poll(&mca_ptl_tcp_module.tcp_reactor);
|
||||
}
|
||||
|
||||
|
||||
static void mca_ptl_tcp_module_recv(int sd, void* user)
|
||||
{
|
||||
mca_ptl_tcp_module_accept();
|
||||
}
|
||||
|
||||
static void mca_ptl_tcp_module_send(int sd, void* user)
|
||||
{
|
||||
}
|
||||
|
||||
static void mca_ptl_tcp_module_except(int sd, void* user)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
/*
|
||||
* $HEADER$
|
||||
*/
|
||||
#include "ptl_tcp.h"
|
||||
|
||||
|
||||
int mca_ptl_tcp_send(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_send_request_t* sendreq,
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user