1
1

added support for querying oob contact info and setting

contact info for seed daemon 

This commit was SVN r2167.
Этот коммит содержится в:
Tim Woodall 2004-08-16 19:39:54 +00:00
родитель bd8af9ecdf
Коммит 9c9037ef2b
9 изменённых файлов: 286 добавлений и 28 удалений

Просмотреть файл

@ -239,7 +239,6 @@ typedef void (*mca_oob_callback_packed_fn_t)(
int status,
ompi_process_name_t* peer,
ompi_buffer_t* buffer,
int count,
int* tag,
void* cbdata);

Просмотреть файл

@ -8,6 +8,7 @@
#include "runtime/runtime.h"
#include "util/output.h"
#include "util/proc_info.h"
#include "mca/mca.h"
#include "mca/base/base.h"
#include "mca/ns/base/base.h"
@ -33,6 +34,29 @@ ompi_process_name_t mca_oob_name_seed;
ompi_process_name_t mca_oob_name_self;
ompi_process_name_t mca_oob_name_any;
/**
* Parse contact info string into process name and list of uri strings.
*/
static int mca_oob_base_parse_contact_info(
char* contact_info,
ompi_process_name_t* name,
char*** uri)
{
/* parse the process name */
char* ptr = strchr(contact_info, '=');
if(NULL == ptr)
return OMPI_ERR_BAD_PARAM;
*ptr = '\0';
ptr++;
*name = *ns_base_convert_string_to_process_name(contact_info);
/* parse the remainder of the string into an array of uris */
*uri = ompi_argv_split(ptr, ';');
return OMPI_SUCCESS;
}
/**
* Function for selecting one module from all those that are
* available.
@ -48,6 +72,9 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
mca_oob_t *module;
extern ompi_list_t mca_oob_base_components;
ompi_process_name_t *self;
int i, id;
char* seed;
char** uri = NULL;
/* setup local name */
self = mca_pcm.pcm_self();
@ -58,12 +85,25 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
mca_oob_name_self = *self;
/* setup wildcard name */
mca_oob_name_any = *ompi_name_server.create_process_name(MCA_NS_BASE_CELLID_MAX,
MCA_NS_BASE_JOBID_MAX,
MCA_NS_BASE_VPID_MAX);
mca_oob_name_any = *ompi_name_server.create_process_name(
MCA_NS_BASE_CELLID_MAX,
MCA_NS_BASE_JOBID_MAX,
MCA_NS_BASE_VPID_MAX);
/* setup seed daemons name */
mca_oob_name_seed = *ompi_name_server.create_process_name(0,0,0);
/* setup seed daemons name and address */
id = mca_base_param_register_string("oob","base","seed",NULL,NULL);
mca_base_param_lookup_string(id,&seed);
if(seed == NULL) {
/* we are seed daemon */
mca_oob_name_seed = mca_oob_name_self;
} else {
/* resolve name of seed daemon */
mca_oob_base_parse_contact_info(seed,&mca_oob_name_seed, &uri);
if(NULL == uri || NULL == *uri) {
ompi_output(0, "mca_oob_base_init: unable to parse seed contact info.");
return OMPI_ERROR;
}
}
/* Traverse the list of available modules; call their init functions. */
for (item = ompi_list_get_first(&mca_oob_base_components);
@ -85,9 +125,23 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
inited->oob_component = component;
inited->oob_module = module;
ompi_list_append(&mca_oob_base_modules, &inited->super);
/* lookup contact info for this component */
if(NULL != uri) {
for(i=0; NULL != uri[i]; i++) {
if(strncmp(uri[i], component->oob_base.mca_component_name,
strlen(component->oob_base.mca_component_name)) == 0) {
module->oob_set_seed(uri[i]);
}
}
}
}
}
}
if(uri != NULL) {
ompi_argv_free(uri);
}
/* set the global variable to point to the first initialize module */
if (0 < ompi_list_get_size(&mca_oob_base_modules)) {
first = (mca_oob_base_info_t *) ompi_list_get_first(&mca_oob_base_modules);
@ -113,5 +167,13 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
char* mca_oob_get_contact_info()
{
return strdup("tcp:localhost:5000");
char *proc_name = ns_base_get_proc_name_string(MCA_OOB_NAME_SELF);
char *proc_addr = mca_oob.oob_get_addr();
size_t size = strlen(proc_name) + 1 + strlen(proc_addr) + 1;
char *contact_info = malloc(size);
sprintf(contact_info, "%s=%s", proc_name, proc_addr);
free(proc_name);
free(proc_addr);
return contact_info;
}

Просмотреть файл

@ -24,6 +24,9 @@ int mca_oob_cofs_close(void);
mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
int mca_oob_cofs_finalize(mca_oob_t*);
/* stubs */
char* mca_oob_cofs_get_addr(void);
int mca_oob_cofs_set_seed(const char*);
/**

Просмотреть файл

@ -41,6 +41,8 @@ mca_oob_base_component_1_0_0_t mca_oob_cofs_component = {
};
mca_oob_t mca_oob_cofs = {
mca_oob_cofs_get_addr,
mca_oob_cofs_set_seed,
mca_oob_cofs_send,
mca_oob_cofs_recv,
mca_oob_cofs_send_nb,
@ -54,6 +56,16 @@ int mca_oob_cofs_my_procid;
uint64_t mca_oob_cofs_serial;
char* mca_oob_cofs_get_addr(void)
{
return strdup("cofs://");
}
int mca_oob_cofs_set_seed(const char* addr)
{
return OMPI_SUCCESS;
}
mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
{
int len;
@ -93,7 +105,11 @@ mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_t
free(tmp);
mca_oob_cofs_serial = 0;
#if 0
return NULL;
#else
return &mca_oob_cofs;
#endif
}

Просмотреть файл

@ -38,6 +38,20 @@ typedef struct mca_oob_1_0_0_t mca_oob_t;
* OOB Component/Module function prototypes.
*/
/**
* Implementation of mca_oob_base_module_get_addr().
*/
typedef char* (*mca_oob_base_module_get_addr_fn_t)(void);
/**
* Implementation of mca_oob_base_module_set_seed().
*
* @param addr Address of seed in component specific uri format.
*/
typedef int (*mca_oob_base_module_set_seed_fn_t)(const char* addr);
/**
* Implementation of mca_oob_send().
*
@ -130,12 +144,15 @@ typedef int (*mca_oob_base_module_finalize_fn_t)(mca_oob_t*);
* OOB Module
*/
struct mca_oob_1_0_0_t {
mca_oob_base_module_send_fn_t oob_send;
mca_oob_base_module_recv_fn_t oob_recv;
mca_oob_base_module_send_nb_fn_t oob_send_nb;
mca_oob_base_module_recv_nb_fn_t oob_recv_nb;
mca_oob_base_module_finalize_fn_t oob_finalize;
mca_oob_base_module_get_addr_fn_t oob_get_addr;
mca_oob_base_module_set_seed_fn_t oob_set_seed;
mca_oob_base_module_send_fn_t oob_send;
mca_oob_base_module_recv_fn_t oob_recv;
mca_oob_base_module_send_nb_fn_t oob_send_nb;
mca_oob_base_module_recv_nb_fn_t oob_recv_nb;
mca_oob_base_module_finalize_fn_t oob_finalize;
};
/**
* OOB Component
*/
@ -147,7 +164,7 @@ typedef mca_oob_t* (*mca_oob_base_component_init_fn_t)(
* the standard component data structure
*/
struct mca_oob_base_component_1_0_0_t {
mca_base_component_t oob_version;
mca_base_component_t oob_base;
mca_base_component_data_1_0_0_t oob_data;
mca_oob_base_component_init_fn_t oob_init;
};

Просмотреть файл

@ -7,7 +7,10 @@
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "util/output.h"
#include "util/if.h"
#include "mca/oob/tcp/oob_tcp.h"
@ -38,6 +41,8 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
};
static mca_oob_t mca_oob_tcp = {
mca_oob_tcp_get_addr,
mca_oob_tcp_set_seed,
mca_oob_tcp_send,
mca_oob_tcp_recv,
mca_oob_tcp_send_nb,
@ -61,6 +66,17 @@ static inline int mca_oob_tcp_param_register_int(
}
static inline char* mca_oob_tcp_param_register_str(
const char* param_name,
const char* default_value)
{
int id = mca_base_param_register_string("ptl","tcp",param_name,NULL,default_value);
char* param_value = NULL;
mca_base_param_lookup_string(id,&param_value);
return param_value;
}
/*
* Initialize global variables used w/in this module.
*/
@ -80,6 +96,7 @@ int mca_oob_tcp_open(void)
mca_oob_tcp_param_register_int("peer_limit", -1);
mca_oob_tcp_component.tcp_peer_retries =
mca_oob_tcp_param_register_int("peer_retries", 60);
memset(&mca_oob_tcp_component.tcp_seed_addr, 0, sizeof(mca_oob_tcp_component.tcp_seed_addr));
/* initialize state */
mca_oob_tcp_component.tcp_listen_sd = -1;
@ -363,3 +380,104 @@ int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_p
return(0);
}
/*
* Return local process address as a URI string.
*/
char* mca_oob_tcp_get_addr(void)
{
int i;
char *contact_info = malloc((ompi_ifcount()+1) * 32);
char *ptr = contact_info;
*ptr = 0;
for(i=ompi_ifbegin(); i>0; i=ompi_ifnext(i)) {
struct sockaddr_in addr;
ompi_ifindextoaddr(i, (struct sockaddr*)&addr, sizeof(addr));
if(ptr != contact_info) {
ptr += sprintf(ptr, ";");
}
ptr += sprintf(ptr, "tcp://%s:%d", inet_ntoa(addr.sin_addr), ntohs(mca_oob_tcp_component.tcp_listen_port));
}
return contact_info;
}
/*
* Parse a URI string into an IP address and port number.
*/
static int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr)
{
char* tmp = strdup(uri);
char* ptr = tmp + 6;
char* addr = ptr;
char* port;
if(strncmp(tmp, "tcp://", 6) != 0) {
free(tmp);
return OMPI_ERR_BAD_PARAM;
}
ptr = strchr(addr, ':');
if(NULL == ptr) {
free(tmp);
return OMPI_ERR_BAD_PARAM;
}
*ptr = '\0';
ptr++;
port = ptr;
memset(inaddr, 0, sizeof(inaddr));
inaddr->sin_family = AF_INET;
inaddr->sin_addr.s_addr = inet_addr(addr);
if(inaddr->sin_addr.s_addr == INADDR_ANY) {
free(tmp);
return OMPI_ERR_BAD_PARAM;
}
inaddr->sin_port = htons(atoi(port));
free(tmp);
return OMPI_SUCCESS;
}
/*
* Set address for the seed daemon. Note that this could be called multiple
* times if the seed daemon exports multiple addresses.
*/
int mca_oob_tcp_set_seed(const char* uri)
{
struct sockaddr_in inaddr;
int rc;
int ifindex;
if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != OMPI_SUCCESS)
return rc;
/* scan through the list of interface address exported by this host
* and look for a match on a directly connected network
*/
for(ifindex=ompi_ifbegin(); ifindex>0; ifindex=ompi_ifnext(ifindex)) {
struct sockaddr_in ifaddr;
struct sockaddr_in ifmask;
ompi_ifindextoaddr(ifindex, (struct sockaddr*)&ifaddr, sizeof(ifaddr));
ompi_ifindextomask(ifindex, (struct sockaddr*)&ifmask, sizeof(ifmask));
if((ifaddr.sin_addr.s_addr & ifmask.sin_addr.s_addr) ==
(inaddr.sin_addr.s_addr & ifmask.sin_addr.s_addr)) {
mca_oob_tcp_component.tcp_seed_addr = inaddr;
return OMPI_SUCCESS;
}
}
/* if no match was found - may be still be reachable - go ahead and
* set this adddress as seed address.
*/
if (mca_oob_tcp_component.tcp_seed_addr.sin_family == 0) {
mca_oob_tcp_component.tcp_seed_addr = inaddr;
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -67,6 +67,18 @@ int mca_oob_tcp_finalize(mca_oob_t*);
*/
int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_process_name_t* n2);
/**
* Obtain contact information for this host (e.g. <ipaddress>:<port>)
*/
char* mca_oob_tcp_get_addr(void);
/**
* Set address for the seed.
*/
int mca_oob_tcp_set_seed(const char*);
/**
* Similiar to unix writev(2).
*
@ -160,21 +172,23 @@ int mca_oob_tcp_recv_nb(
*/
struct mca_oob_tcp_component_t {
mca_oob_base_component_1_0_0_t super; /**< base OOB component */
int tcp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< listen port */
ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
ompi_free_list_t tcp_peer_free; /**< free list of peers */
size_t tcp_peer_limit; /**< max size of tcp peer cache */
int tcp_peer_retries; /**< max number of retries before declaring peer gone */
ompi_free_list_t tcp_msgs; /**< free list of messages */
ompi_event_t tcp_send_event; /**< event structure for sends */
ompi_event_t tcp_recv_event; /**< event structure for recvs */
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
ompi_list_t tcp_msg_post; /**< list of recieves user has posted */
ompi_list_t tcp_msg_recv; /**< list of recieved messages */
ompi_mutex_t tcp_match_lock; /**< lock held while searching/posting messages */
int tcp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< listen port */
struct sockaddr_in tcp_seed_addr; /**< uri string of tcp peer address */
ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
ompi_free_list_t tcp_peer_free; /**< free list of peers */
size_t tcp_peer_limit; /**< max size of tcp peer cache */
int tcp_peer_retries; /**< max number of retries before declaring peer gone */
ompi_free_list_t tcp_msgs; /**< free list of messages */
ompi_event_t tcp_send_event; /**< event structure for sends */
ompi_event_t tcp_recv_event; /**< event structure for recvs */
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
ompi_list_t tcp_msg_post; /**< list of recieves user has posted */
ompi_list_t tcp_msg_recv; /**< list of recieved messages */
ompi_mutex_t tcp_match_lock; /**< lock held while searching/posting messages */
};
/**
* Convenience Typedef
*/

Просмотреть файл

@ -237,6 +237,12 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno);
}
/* resolve the peer address */
if ((rc = mca_oob_tcp_peer_name_lookup(peer)) != OMPI_SUCCESS) {
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
/* start the connect - will likely fail with EINPROGRESS */
if(connect(peer->peer_sd, (struct sockaddr*)&(peer->peer_addr), sizeof(peer->peer_addr)) < 0) {
/* non-blocking so wait for completion */
@ -247,7 +253,6 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
}
ompi_output(0, "mca_oob_tcp_msg_peer_start_connect: unable to connect to peer. errno=%d", errno);
mca_oob_tcp_peer_close(peer);
peer->peer_retries++;
return OMPI_ERR_UNREACH;
}
@ -745,3 +750,22 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
}
/*
* resolve process name to an actual internet address.
*/
int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
{
if(mca_oob_tcp_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SEED) == 0) {
peer->peer_addr = mca_oob_tcp_component.tcp_seed_addr;
return OMPI_SUCCESS;
} else {
peer->peer_addr.sin_family = AF_INET;
peer->peer_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
peer->peer_addr.sin_port = htons(5000+peer->peer_name.vpid);
}
/* insert code to resolve name via registry */
return OMPI_ERROR;
}

Просмотреть файл

@ -117,6 +117,11 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd);
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
/**
* Attempt to resolve peer address.
*/
int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif