1
1

- modifications to oob to support bringing up oob before anything else (ns)

- changed ns/gpr to register callback in component init (and ignore errors if not supported)

This commit was SVN r2303.
Этот коммит содержится в:
Tim Woodall 2004-08-25 17:39:08 +00:00
родитель cbb53f305d
Коммит 3d754c4941
13 изменённых файлов: 106 добавлений и 112 удалений

@ -269,7 +269,7 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
setup and return the module */
if (ompi_process_info.seed) {
int rc;
/* Return a module (choose an arbitrary, positive priority --
it's only relevant compared to other ns components). If
@ -292,7 +292,10 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
mca_gpr_replica_head.lastkey = 0;
/* issue the non-blocking receive */
/* mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
return NULL;
}
/* Return the module */

@ -454,13 +454,6 @@ typedef mca_ns_base_cellid_t (*mca_ns_base_module_get_cellid_fn_t)(const ompi_pr
*/
typedef int (*mca_ns_base_module_compare_fn_t)(ompi_ns_cmp_bitmask_t fields, const ompi_process_name_t *name1, const ompi_process_name_t *name2);
/**
* Called after all RTE components have been initialized. Provides the selected
* module the chance to perform additional initialization (e.g. register w/ OOB).
*/
typedef int (*mca_ns_base_module_init_fn_t)(void);
/*
* Ver 1.0.0
*/
@ -481,7 +474,6 @@ struct mca_ns_base_module_1_0_0_t {
mca_ns_base_module_get_jobid_fn_t get_jobid;
mca_ns_base_module_get_cellid_fn_t get_cellid;
mca_ns_base_module_compare_fn_t compare;
mca_ns_base_module_init_fn_t init;
};
typedef struct mca_ns_base_module_1_0_0_t mca_ns_base_module_1_0_0_t;

@ -25,10 +25,6 @@
#include "mca/ns/base/base.h"
#include "ns_proxy.h"
/* empty stub for proxy */
static int ns_proxy_init(void)
{ return OMPI_SUCCESS; }
/*
* Struct of function pointers that need to be initialized
@ -70,8 +66,7 @@ static mca_ns_base_module_t mca_ns_proxy = {
ns_base_get_vpid,
ns_base_get_jobid,
ns_base_get_cellid,
ns_base_compare,
ns_proxy_init
ns_base_compare
};
/*

@ -25,7 +25,6 @@
#include "mca/ns/base/base.h"
#include "ns_replica.h"
static int mca_ns_replica_module_init(void);
/*
* Struct of function pointers that need to be initialized
@ -67,8 +66,7 @@ static mca_ns_base_module_t mca_ns_replica = {
ns_base_get_vpid,
ns_base_get_jobid,
ns_base_get_cellid,
ns_base_compare,
mca_ns_replica_module_init
ns_base_compare
};
/*
@ -122,12 +120,12 @@ int mca_ns_replica_close(void)
mca_ns_base_module_t* mca_ns_replica_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority)
{
/* If we're the seed, then we want to be selected, so do all the
setup and return the module */
if (ompi_process_info.seed) {
int rc;
mca_ns_replica_last_used_cellid = 0;
mca_ns_replica_last_used_jobid = 0;
@ -156,6 +154,11 @@ mca_ns_base_module_t* mca_ns_replica_init(bool *allow_multi_user_threads, bool *
initialized = true;
/* issue non-blocking receive for call_back function */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
ompi_output(0, "mca_ns_replica_init: unable to post non-blocking recv\n");
return NULL;
}
return &mca_ns_replica;
} else {
@ -260,14 +263,3 @@ void mca_ns_replica_recv(int status, ompi_process_name_t* sender,
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL);
}
/*
* init the selected module - this is called after all RTE components
* have been initialized
*/
static int mca_ns_replica_module_init(void)
{
return mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL);
}

@ -33,9 +33,9 @@ OBJ_CLASS_INSTANCE(
NULL
);
ompi_process_name_t mca_oob_name_seed;
ompi_process_name_t mca_oob_name_self;
ompi_process_name_t mca_oob_name_any;
ompi_process_name_t mca_oob_name_seed = { 0, 0, 0 };
ompi_process_name_t mca_oob_name_self = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX };
ompi_process_name_t mca_oob_name_any = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX };
/**
* Parse contact info string into process name and list of uri strings.
@ -74,7 +74,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
mca_oob_base_component_t *component;
mca_oob_t *module;
extern ompi_list_t mca_oob_base_components;
ompi_process_name_t *self;
int i, id;
char* seed;
char** uri = NULL;
@ -82,20 +81,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
char** include = ompi_argv_split(mca_oob_base_include, ',');
char** exclude = ompi_argv_split(mca_oob_base_exclude, ',');
/* setup local name */
self = mca_pcmclient.pcmclient_get_self();
if(NULL == self) {
ompi_output(0, "mca_oob_base_init: could not get pcmclient self pointer");
return OMPI_ERROR;
}
mca_oob_name_self = *self;
/* setup wildcard name */
mca_oob_name_any = *ompi_name_server.create_process_name(
MCA_NS_BASE_CELLID_MAX,
MCA_NS_BASE_JOBID_MAX,
MCA_NS_BASE_VPID_MAX);
/* setup seed daemons name and address */
id = mca_base_param_register_string("oob","base","seed",NULL,NULL);
mca_base_param_lookup_string(id,&seed);
@ -230,24 +215,20 @@ int mca_oob_set_contact_info(const char* seed)
return OMPI_SUCCESS;
}
/**
* Obtains the contact info (oob implementation specific) URI strings through
* which this process can be contacted on an OOB channel.
*
* @return A null terminated string.
*
* The caller is responsible for freeing the returned string.
*/
/**
* Called to request the selected oob components to
* register their address with the seed deamon.
*/
int mca_oob_base_register(void)
int mca_oob_base_module_init(void)
{
ompi_list_item_t* item;
/* setup self to point to actual process name */
if(ompi_name_server.compare(OMPI_NS_CMP_ALL, &mca_oob_name_self, &mca_oob_name_any) == 0) {
mca_oob_name_self = *mca_pcmclient.pcmclient_get_self();
}
/* Initialize all modules after oob/gpr/ns have initialized */
for (item = ompi_list_get_first(&mca_oob_base_modules);
item != ompi_list_get_end(&mca_oob_base_modules);

@ -138,10 +138,7 @@ mca_oob_cofs_recv_nb(
mca_oob_callback_fn_t cbfunc,
void* cbdata)
{
int status = mca_oob_cofs_recv(peer, iov, count, &tag, flags);
if(NULL != cbfunc)
cbfunc(status, peer, iov, count, tag, cbdata);
return status;
return OMPI_ERR_NOT_IMPLEMENTED;
}

@ -228,7 +228,7 @@ extern "C" {
#endif
int mca_oob_base_open(void);
int mca_oob_base_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
int mca_oob_base_register(void);
int mca_oob_base_module_init(void);
int mca_oob_base_close(void);
#if defined(c_plusplus) || defined(__cplusplus)
}

@ -1,7 +1,5 @@
/* -*- C -*-
*
/*
* $HEADER$
*
*/
#include <unistd.h>
@ -15,6 +13,8 @@
#include "mca/ns/ns.h"
#include "mca/gpr/base/base.h"
#include "mca/gpr/gpr.h"
#include "mca/pcmclient/pcmclient.h"
#include "mca/pcmclient/base/base.h"
static int mca_oob_tcp_create_listen(void);
@ -265,6 +265,15 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
}
}
/* check for wildcard name - if this is true - we allocate a name from the name server
* and return to the peer
*/
if(mca_oob_tcp_process_name_compare(&name, MCA_OOB_NAME_ANY) == 0) {
name.jobid = ompi_name_server.create_jobid();
name.vpid = ompi_name_server.reserve_range(name.jobid,1);
ompi_name_server.assign_cellid_to_process(&name);
}
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(&name, true);
if(NULL == peer) {
@ -331,13 +340,18 @@ int mca_oob_tcp_init(void)
char *addr;
int rc;
/* setup self to point to actual process name */
if(mca_oob_tcp_process_name_compare(&mca_oob_name_self, &mca_oob_name_any) == 0) {
mca_oob_name_self = *mca_pcmclient.pcmclient_get_self();
}
/* put contact info in registry */
keys[0] = "tcp";
keys[1] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
keys[2] = NULL;
addr = mca_oob_tcp_get_addr();
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "oob", keys, addr, strlen(addr)+1);
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "oob", keys, (ompi_registry_object_t)addr, strlen(addr)+1);
free(addr);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_oob_tcp_init: unable to contact registry.");

@ -1,3 +1,6 @@
/*
* $HEADER$
*/
#include "mca/oob/tcp/oob_tcp.h"
#include "mca/oob/tcp/oob_tcp_msg.h"

@ -1,3 +1,6 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
@ -350,10 +353,16 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
*/
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
/* send process identifier to remote peer */
ompi_process_name_t guid = mca_oob_name_self;
OMPI_PROCESS_NAME_HTON(guid);
if(mca_oob_tcp_peer_send_blocking( peer, &guid, sizeof(guid)) != sizeof(guid)) {
/* send process identifier of self and peer - note that we may
* have assigned the peer a unique process name - if it came up
* without one.
*/
ompi_process_name_t guid[2];
guid[0] = mca_oob_name_self;
guid[1] = peer->peer_name;
OMPI_PROCESS_NAME_HTON(guid[0]);
OMPI_PROCESS_NAME_HTON(guid[1]);
if(mca_oob_tcp_peer_send_blocking(peer, guid, sizeof(guid)) != sizeof(guid)) {
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
@ -366,19 +375,25 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
*/
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
{
ompi_process_name_t guid;
if((mca_oob_tcp_peer_recv_blocking(peer, &guid, sizeof(ompi_process_name_t))) != sizeof(ompi_process_name_t)) {
ompi_process_name_t guid[2];
if((mca_oob_tcp_peer_recv_blocking(peer, guid, sizeof(guid))) != sizeof(guid)) {
return OMPI_ERR_UNREACH;
}
OMPI_PROCESS_NAME_NTOH(guid);
OMPI_PROCESS_NAME_NTOH(guid[0]);
OMPI_PROCESS_NAME_NTOH(guid[1]);
/* compare this to the expected values */
if(memcmp(&peer->peer_name, &guid, sizeof(ompi_process_name_t)) != 0) {
/* compare the peers name to the expected value */
if(memcmp(&peer->peer_name, &guid[0], sizeof(ompi_process_name_t)) != 0) {
ompi_output(0, "mca_oob_tcp_peer_connect: received unexpected process identifier");
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
/* if we have a wildcard name - use the name returned by the peer */
if(mca_oob_tcp_process_name_compare(&mca_oob_name_self, &mca_oob_name_any) == 0) {
mca_oob_name_self = guid[1];
}
/* connected */
mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG

@ -1,3 +1,6 @@
/*
* $HEADER$
*/
#include "mca/oob/tcp/oob_tcp.h"
/*

@ -1,3 +1,6 @@
/*
* $HEADER$
*/
#include "mca/oob/tcp/oob_tcp.h"
/*

@ -92,10 +92,32 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads = true;
*have_hidden_threads = false;
ompi_output(0, "entered rte_init - starting name server");
ompi_output(0, "entered rte_init");
/*
* Out of Band Messaging
*/
ompi_output(0, "starting oob");
if (OMPI_SUCCESS != (ret = mca_oob_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in oob_base_open\n");
return ret;
}
if (OMPI_SUCCESS != (ret = mca_oob_base_init(&user_threads,
&hidden_threads))) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in mca_oob_base_init()\n");
return ret;
}
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
/*
* Name Server
*/
ompi_output(0, "starting name server");
if (OMPI_SUCCESS != (ret = mca_ns_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in ns_base_open\n");
@ -110,10 +132,11 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "starting pcm-client");
/*
* Process Control and Monitoring Client
*/
ompi_output(0, "starting pcm-client");
if (OMPI_SUCCESS != (ret = mca_pcmclient_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in pcmclient_base_open\n");
@ -158,25 +181,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "starting oob");
/*
* Out of Band Messaging
*/
if (OMPI_SUCCESS != (ret = mca_oob_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in oob_base_open\n");
return ret;
}
if (OMPI_SUCCESS != (ret = mca_oob_base_init(&user_threads,
&hidden_threads))) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in mca_oob_base_init()\n");
return ret;
}
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "starting gpr");
/*
@ -220,21 +224,14 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
return OMPI_ERROR;
}
/*
* Call back into NS/GPR/OOB to allow them to do any final initialization
* (e.g. register callbacks w/ OOB, put contact info in register).
* Call back into OOB to allow do any final initialization
* (e.g. put contact info in register).
*/
/* if (OMPI_SUCCESS != (ret = ompi_name_server.init())) { */
/* printf("show_help: ompi_rte_init failed in ompi_name_server.init()\n"); */
/* return ret; */
/* } */
/* if (OMPI_SUCCESS != (ret = mca_oob_base_register())) { */
/* printf("show_help: ompi_rte_init failed in mca_oob_base_register()\n"); */
/* return ret; */
/* } */
if (OMPI_SUCCESS != (ret = mca_oob_base_module_init())) {
ompi_output(0, "ompi_rte_init: failed in mca_oob_base_module_init()\n");
return ret;
}
/*
* All done
@ -243,7 +240,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
}
/*
* interface type support
*/