From 3d754c49417a76364061eb4a8bd5355a04f9fd96 Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Wed, 25 Aug 2004 17:39:08 +0000 Subject: [PATCH] - modifications to oob to support bringing up oob before anything else (ns) - changed ns/gpr to register callback in component init (and ignore errors if not supported) This commit was SVN r2303. --- src/mca/gpr/replica/gpr_replica_component.c | 7 +- src/mca/ns/ns.h | 8 --- src/mca/ns/proxy/src/ns_proxy_component.c | 7 +- src/mca/ns/replica/src/ns_replica_component.c | 22 ++----- src/mca/oob/base/oob_base_init.c | 37 +++-------- src/mca/oob/cofs/src/oob_cofs.c | 5 +- src/mca/oob/oob.h | 2 +- src/mca/oob/tcp/oob_tcp.c | 22 +++++-- src/mca/oob/tcp/oob_tcp_msg.c | 3 + src/mca/oob/tcp/oob_tcp_peer.c | 33 +++++++--- src/mca/oob/tcp/oob_tcp_recv.c | 3 + src/mca/oob/tcp/oob_tcp_send.c | 3 + src/runtime/ompi_rte_init.c | 66 +++++++++---------- 13 files changed, 106 insertions(+), 112 deletions(-) diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index 042dd3b2a3..1f466c3808 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -269,7 +269,7 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool setup and return the module */ if (ompi_process_info.seed) { - + int rc; /* Return a module (choose an arbitrary, positive priority -- it's only relevant compared to other ns components). If @@ -292,7 +292,10 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool mca_gpr_replica_head.lastkey = 0; /* issue the non-blocking receive */ -/* mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); */ + rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); + if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { + return NULL; + } /* Return the module */ diff --git a/src/mca/ns/ns.h b/src/mca/ns/ns.h index b9e5532837..ce25ecb4da 100644 --- a/src/mca/ns/ns.h +++ b/src/mca/ns/ns.h @@ -454,13 +454,6 @@ typedef mca_ns_base_cellid_t (*mca_ns_base_module_get_cellid_fn_t)(const ompi_pr */ typedef int (*mca_ns_base_module_compare_fn_t)(ompi_ns_cmp_bitmask_t fields, const ompi_process_name_t *name1, const ompi_process_name_t *name2); -/** - * Called after all RTE components have been initialized. Provides the selected - * module the chance to perform additional initialization (e.g. register w/ OOB). - */ - -typedef int (*mca_ns_base_module_init_fn_t)(void); - /* * Ver 1.0.0 */ @@ -481,7 +474,6 @@ struct mca_ns_base_module_1_0_0_t { mca_ns_base_module_get_jobid_fn_t get_jobid; mca_ns_base_module_get_cellid_fn_t get_cellid; mca_ns_base_module_compare_fn_t compare; - mca_ns_base_module_init_fn_t init; }; typedef struct mca_ns_base_module_1_0_0_t mca_ns_base_module_1_0_0_t; diff --git a/src/mca/ns/proxy/src/ns_proxy_component.c b/src/mca/ns/proxy/src/ns_proxy_component.c index ee09386f34..9ac8c5ed7d 100644 --- a/src/mca/ns/proxy/src/ns_proxy_component.c +++ b/src/mca/ns/proxy/src/ns_proxy_component.c @@ -25,10 +25,6 @@ #include "mca/ns/base/base.h" #include "ns_proxy.h" -/* empty stub for proxy */ -static int ns_proxy_init(void) -{ return OMPI_SUCCESS; } - /* * Struct of function pointers that need to be initialized @@ -70,8 +66,7 @@ static mca_ns_base_module_t mca_ns_proxy = { ns_base_get_vpid, ns_base_get_jobid, ns_base_get_cellid, - ns_base_compare, - ns_proxy_init + ns_base_compare }; /* diff --git a/src/mca/ns/replica/src/ns_replica_component.c b/src/mca/ns/replica/src/ns_replica_component.c index 091ea2b1e0..9a93a1d029 100644 --- a/src/mca/ns/replica/src/ns_replica_component.c +++ b/src/mca/ns/replica/src/ns_replica_component.c @@ -25,7 +25,6 @@ #include "mca/ns/base/base.h" #include "ns_replica.h" -static int mca_ns_replica_module_init(void); /* * Struct of function pointers that need to be initialized @@ -67,8 +66,7 @@ static mca_ns_base_module_t mca_ns_replica = { ns_base_get_vpid, ns_base_get_jobid, ns_base_get_cellid, - ns_base_compare, - mca_ns_replica_module_init + ns_base_compare }; /* @@ -122,12 +120,12 @@ int mca_ns_replica_close(void) mca_ns_base_module_t* mca_ns_replica_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority) { - /* If we're the seed, then we want to be selected, so do all the setup and return the module */ if (ompi_process_info.seed) { + int rc; mca_ns_replica_last_used_cellid = 0; mca_ns_replica_last_used_jobid = 0; @@ -156,6 +154,11 @@ mca_ns_base_module_t* mca_ns_replica_init(bool *allow_multi_user_threads, bool * initialized = true; /* issue non-blocking receive for call_back function */ + rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL); + if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { + ompi_output(0, "mca_ns_replica_init: unable to post non-blocking recv\n"); + return NULL; + } return &mca_ns_replica; } else { @@ -260,14 +263,3 @@ void mca_ns_replica_recv(int status, ompi_process_name_t* sender, mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL); } - -/* - * init the selected module - this is called after all RTE components - * have been initialized - */ - -static int mca_ns_replica_module_init(void) -{ - return mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_NS, 0, mca_ns_replica_recv, NULL); -} - diff --git a/src/mca/oob/base/oob_base_init.c b/src/mca/oob/base/oob_base_init.c index f375813e61..6b34456ca2 100644 --- a/src/mca/oob/base/oob_base_init.c +++ b/src/mca/oob/base/oob_base_init.c @@ -33,9 +33,9 @@ OBJ_CLASS_INSTANCE( NULL ); -ompi_process_name_t mca_oob_name_seed; -ompi_process_name_t mca_oob_name_self; -ompi_process_name_t mca_oob_name_any; +ompi_process_name_t mca_oob_name_seed = { 0, 0, 0 }; +ompi_process_name_t mca_oob_name_self = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX }; +ompi_process_name_t mca_oob_name_any = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX }; /** * Parse contact info string into process name and list of uri strings. @@ -74,7 +74,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads) mca_oob_base_component_t *component; mca_oob_t *module; extern ompi_list_t mca_oob_base_components; - ompi_process_name_t *self; int i, id; char* seed; char** uri = NULL; @@ -82,20 +81,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads) char** include = ompi_argv_split(mca_oob_base_include, ','); char** exclude = ompi_argv_split(mca_oob_base_exclude, ','); - /* setup local name */ - self = mca_pcmclient.pcmclient_get_self(); - if(NULL == self) { - ompi_output(0, "mca_oob_base_init: could not get pcmclient self pointer"); - return OMPI_ERROR; - } - mca_oob_name_self = *self; - - /* setup wildcard name */ - mca_oob_name_any = *ompi_name_server.create_process_name( - MCA_NS_BASE_CELLID_MAX, - MCA_NS_BASE_JOBID_MAX, - MCA_NS_BASE_VPID_MAX); - /* setup seed daemons name and address */ id = mca_base_param_register_string("oob","base","seed",NULL,NULL); mca_base_param_lookup_string(id,&seed); @@ -230,24 +215,20 @@ int mca_oob_set_contact_info(const char* seed) return OMPI_SUCCESS; } -/** -* Obtains the contact info (oob implementation specific) URI strings through -* which this process can be contacted on an OOB channel. -* -* @return A null terminated string. -* -* The caller is responsible for freeing the returned string. -*/ - /** * Called to request the selected oob components to * register their address with the seed deamon. */ -int mca_oob_base_register(void) +int mca_oob_base_module_init(void) { ompi_list_item_t* item; + /* setup self to point to actual process name */ + if(ompi_name_server.compare(OMPI_NS_CMP_ALL, &mca_oob_name_self, &mca_oob_name_any) == 0) { + mca_oob_name_self = *mca_pcmclient.pcmclient_get_self(); + } + /* Initialize all modules after oob/gpr/ns have initialized */ for (item = ompi_list_get_first(&mca_oob_base_modules); item != ompi_list_get_end(&mca_oob_base_modules); diff --git a/src/mca/oob/cofs/src/oob_cofs.c b/src/mca/oob/cofs/src/oob_cofs.c index d505a30602..c9c02dacaa 100644 --- a/src/mca/oob/cofs/src/oob_cofs.c +++ b/src/mca/oob/cofs/src/oob_cofs.c @@ -138,10 +138,7 @@ mca_oob_cofs_recv_nb( mca_oob_callback_fn_t cbfunc, void* cbdata) { - int status = mca_oob_cofs_recv(peer, iov, count, &tag, flags); - if(NULL != cbfunc) - cbfunc(status, peer, iov, count, tag, cbdata); - return status; + return OMPI_ERR_NOT_IMPLEMENTED; } diff --git a/src/mca/oob/oob.h b/src/mca/oob/oob.h index 336fce136d..de6bdba694 100644 --- a/src/mca/oob/oob.h +++ b/src/mca/oob/oob.h @@ -228,7 +228,7 @@ extern "C" { #endif int mca_oob_base_open(void); int mca_oob_base_init(bool *allow_multi_user_threads, bool *have_hidden_threads); - int mca_oob_base_register(void); + int mca_oob_base_module_init(void); int mca_oob_base_close(void); #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/src/mca/oob/tcp/oob_tcp.c b/src/mca/oob/tcp/oob_tcp.c index 3874dd6919..558eef4bcc 100644 --- a/src/mca/oob/tcp/oob_tcp.c +++ b/src/mca/oob/tcp/oob_tcp.c @@ -1,7 +1,5 @@ -/* -*- C -*- - * +/* * $HEADER$ - * */ #include @@ -15,6 +13,8 @@ #include "mca/ns/ns.h" #include "mca/gpr/base/base.h" #include "mca/gpr/gpr.h" +#include "mca/pcmclient/pcmclient.h" +#include "mca/pcmclient/base/base.h" static int mca_oob_tcp_create_listen(void); @@ -265,6 +265,15 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user) } } + /* check for wildcard name - if this is true - we allocate a name from the name server + * and return to the peer + */ + if(mca_oob_tcp_process_name_compare(&name, MCA_OOB_NAME_ANY) == 0) { + name.jobid = ompi_name_server.create_jobid(); + name.vpid = ompi_name_server.reserve_range(name.jobid,1); + ompi_name_server.assign_cellid_to_process(&name); + } + /* lookup the corresponding process */ peer = mca_oob_tcp_peer_lookup(&name, true); if(NULL == peer) { @@ -331,13 +340,18 @@ int mca_oob_tcp_init(void) char *addr; int rc; + /* setup self to point to actual process name */ + if(mca_oob_tcp_process_name_compare(&mca_oob_name_self, &mca_oob_name_any) == 0) { + mca_oob_name_self = *mca_pcmclient.pcmclient_get_self(); + } + /* put contact info in registry */ keys[0] = "tcp"; keys[1] = ompi_name_server.get_proc_name_string(&mca_oob_name_self); keys[2] = NULL; addr = mca_oob_tcp_get_addr(); - rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "oob", keys, addr, strlen(addr)+1); + rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "oob", keys, (ompi_registry_object_t)addr, strlen(addr)+1); free(addr); if(rc != OMPI_SUCCESS) { ompi_output(0, "mca_oob_tcp_init: unable to contact registry."); diff --git a/src/mca/oob/tcp/oob_tcp_msg.c b/src/mca/oob/tcp/oob_tcp_msg.c index 0e9903bb67..30be00cede 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.c +++ b/src/mca/oob/tcp/oob_tcp_msg.c @@ -1,3 +1,6 @@ +/* + * $HEADER$ + */ #include "mca/oob/tcp/oob_tcp.h" #include "mca/oob/tcp/oob_tcp_msg.h" diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c index c471181008..7b61f30e50 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.c +++ b/src/mca/oob/tcp/oob_tcp_peer.c @@ -1,3 +1,6 @@ +/* + * $HEADER$ + */ #include #include #include @@ -350,10 +353,16 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer) */ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) { - /* send process identifier to remote peer */ - ompi_process_name_t guid = mca_oob_name_self; - OMPI_PROCESS_NAME_HTON(guid); - if(mca_oob_tcp_peer_send_blocking( peer, &guid, sizeof(guid)) != sizeof(guid)) { + /* send process identifier of self and peer - note that we may + * have assigned the peer a unique process name - if it came up + * without one. + */ + ompi_process_name_t guid[2]; + guid[0] = mca_oob_name_self; + guid[1] = peer->peer_name; + OMPI_PROCESS_NAME_HTON(guid[0]); + OMPI_PROCESS_NAME_HTON(guid[1]); + if(mca_oob_tcp_peer_send_blocking(peer, guid, sizeof(guid)) != sizeof(guid)) { return OMPI_ERR_UNREACH; } return OMPI_SUCCESS; @@ -366,19 +375,25 @@ static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) */ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer) { - ompi_process_name_t guid; - if((mca_oob_tcp_peer_recv_blocking(peer, &guid, sizeof(ompi_process_name_t))) != sizeof(ompi_process_name_t)) { + ompi_process_name_t guid[2]; + if((mca_oob_tcp_peer_recv_blocking(peer, guid, sizeof(guid))) != sizeof(guid)) { return OMPI_ERR_UNREACH; } - OMPI_PROCESS_NAME_NTOH(guid); + OMPI_PROCESS_NAME_NTOH(guid[0]); + OMPI_PROCESS_NAME_NTOH(guid[1]); - /* compare this to the expected values */ - if(memcmp(&peer->peer_name, &guid, sizeof(ompi_process_name_t)) != 0) { + /* compare the peers name to the expected value */ + if(memcmp(&peer->peer_name, &guid[0], sizeof(ompi_process_name_t)) != 0) { ompi_output(0, "mca_oob_tcp_peer_connect: received unexpected process identifier"); mca_oob_tcp_peer_close(peer); return OMPI_ERR_UNREACH; } + /* if we have a wildcard name - use the name returned by the peer */ + if(mca_oob_tcp_process_name_compare(&mca_oob_name_self, &mca_oob_name_any) == 0) { + mca_oob_name_self = guid[1]; + } + /* connected */ mca_oob_tcp_peer_connected(peer); #if OMPI_ENABLE_DEBUG diff --git a/src/mca/oob/tcp/oob_tcp_recv.c b/src/mca/oob/tcp/oob_tcp_recv.c index b48fc99c66..1cb7214b30 100644 --- a/src/mca/oob/tcp/oob_tcp_recv.c +++ b/src/mca/oob/tcp/oob_tcp_recv.c @@ -1,3 +1,6 @@ +/* + * $HEADER$ + */ #include "mca/oob/tcp/oob_tcp.h" /* diff --git a/src/mca/oob/tcp/oob_tcp_send.c b/src/mca/oob/tcp/oob_tcp_send.c index 6343eb6e07..3820f34ceb 100644 --- a/src/mca/oob/tcp/oob_tcp_send.c +++ b/src/mca/oob/tcp/oob_tcp_send.c @@ -1,3 +1,6 @@ +/* + * $HEADER$ + */ #include "mca/oob/tcp/oob_tcp.h" /* diff --git a/src/runtime/ompi_rte_init.c b/src/runtime/ompi_rte_init.c index ed0c2bb034..7ad71e6291 100644 --- a/src/runtime/ompi_rte_init.c +++ b/src/runtime/ompi_rte_init.c @@ -92,10 +92,32 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads) *allow_multi_user_threads = true; *have_hidden_threads = false; - ompi_output(0, "entered rte_init - starting name server"); + ompi_output(0, "entered rte_init"); + + /* + * Out of Band Messaging + */ + ompi_output(0, "starting oob"); + + if (OMPI_SUCCESS != (ret = mca_oob_base_open())) { + /* JMS show_help */ + printf("show_help: ompi_rte_init failed in oob_base_open\n"); + return ret; + } + if (OMPI_SUCCESS != (ret = mca_oob_base_init(&user_threads, + &hidden_threads))) { + /* JMS show_help */ + printf("show_help: ompi_rte_init failed in mca_oob_base_init()\n"); + return ret; + } + *allow_multi_user_threads &= user_threads; + *have_hidden_threads |= hidden_threads; + /* * Name Server */ + ompi_output(0, "starting name server"); + if (OMPI_SUCCESS != (ret = mca_ns_base_open())) { /* JMS show_help */ printf("show_help: ompi_rte_init failed in ns_base_open\n"); @@ -110,10 +132,11 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads) *allow_multi_user_threads &= user_threads; *have_hidden_threads |= hidden_threads; - ompi_output(0, "starting pcm-client"); /* * Process Control and Monitoring Client */ + ompi_output(0, "starting pcm-client"); + if (OMPI_SUCCESS != (ret = mca_pcmclient_base_open())) { /* JMS show_help */ printf("show_help: ompi_rte_init failed in pcmclient_base_open\n"); @@ -158,25 +181,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads) *allow_multi_user_threads &= user_threads; *have_hidden_threads |= hidden_threads; - ompi_output(0, "starting oob"); - - /* - * Out of Band Messaging - */ - if (OMPI_SUCCESS != (ret = mca_oob_base_open())) { - /* JMS show_help */ - printf("show_help: ompi_rte_init failed in oob_base_open\n"); - return ret; - } - if (OMPI_SUCCESS != (ret = mca_oob_base_init(&user_threads, - &hidden_threads))) { - /* JMS show_help */ - printf("show_help: ompi_rte_init failed in mca_oob_base_init()\n"); - return ret; - } - *allow_multi_user_threads &= user_threads; - *have_hidden_threads |= hidden_threads; - ompi_output(0, "starting gpr"); /* @@ -220,21 +224,14 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads) return OMPI_ERROR; } - /* - * Call back into NS/GPR/OOB to allow them to do any final initialization - * (e.g. register callbacks w/ OOB, put contact info in register). + * Call back into OOB to allow do any final initialization + * (e.g. put contact info in register). */ - /* if (OMPI_SUCCESS != (ret = ompi_name_server.init())) { */ -/* printf("show_help: ompi_rte_init failed in ompi_name_server.init()\n"); */ -/* return ret; */ -/* } */ - -/* if (OMPI_SUCCESS != (ret = mca_oob_base_register())) { */ -/* printf("show_help: ompi_rte_init failed in mca_oob_base_register()\n"); */ -/* return ret; */ -/* } */ - + if (OMPI_SUCCESS != (ret = mca_oob_base_module_init())) { + ompi_output(0, "ompi_rte_init: failed in mca_oob_base_module_init()\n"); + return ret; + } /* * All done @@ -243,7 +240,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads) } - /* * interface type support */