Merge pull request #7018 from rhc54/cmr40/oob
v4.0.x: Cleanup stale code in ORTE/OOB
Этот коммит содержится в:
Коммит
2d0dcaeedc
@ -55,14 +55,12 @@ BEGIN_C_DECLS
|
||||
* Convenience Typedef
|
||||
*/
|
||||
typedef struct {
|
||||
opal_event_base_t *ev_base;
|
||||
char *include;
|
||||
char *exclude;
|
||||
opal_list_t components;
|
||||
opal_list_t actives;
|
||||
int max_uri_length;
|
||||
opal_hash_table_t peers;
|
||||
int num_threads;
|
||||
#if OPAL_ENABLE_TIMING
|
||||
bool timing;
|
||||
#endif
|
||||
@ -121,7 +119,7 @@ ORTE_DECLSPEC void orte_oob_base_send_nb(int fd, short args, void *cbdata);
|
||||
__FILE__, __LINE__); \
|
||||
cd = OBJ_NEW(orte_oob_send_t); \
|
||||
cd->msg = (m); \
|
||||
ORTE_THREADSHIFT(cd, orte_oob_base.ev_base, \
|
||||
ORTE_THREADSHIFT(cd, orte_event_base, \
|
||||
orte_oob_base_send_nb, ORTE_MSG_PRI); \
|
||||
}while(0)
|
||||
|
||||
|
@ -15,7 +15,7 @@
|
||||
* reserved.
|
||||
* Copyright (c) 2015-2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2017-2019 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -55,14 +55,6 @@ orte_oob_base_t orte_oob_base = {0};
|
||||
|
||||
static int orte_oob_base_register(mca_base_register_flag_t flags)
|
||||
{
|
||||
orte_oob_base.num_threads = 0;
|
||||
(void)mca_base_var_register("orte", "oob", "base", "num_progress_threads",
|
||||
"Number of independent progress OOB messages for each interface",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&orte_oob_base.num_threads);
|
||||
|
||||
#if OPAL_ENABLE_TIMING
|
||||
/* Detailed timing setup */
|
||||
orte_oob_base.timing = false;
|
||||
@ -91,10 +83,6 @@ static int orte_oob_base_close(void)
|
||||
OBJ_RELEASE(cli);
|
||||
}
|
||||
|
||||
if (!ORTE_PROC_IS_APP && !ORTE_PROC_IS_TOOL) {
|
||||
opal_progress_thread_finalize("OOB-BASE");
|
||||
}
|
||||
|
||||
/* destruct our internal lists */
|
||||
OBJ_DESTRUCT(&orte_oob_base.actives);
|
||||
|
||||
@ -122,13 +110,6 @@ static int orte_oob_base_open(mca_base_open_flag_t flags)
|
||||
opal_hash_table_init(&orte_oob_base.peers, 128);
|
||||
OBJ_CONSTRUCT(&orte_oob_base.actives, opal_list_t);
|
||||
|
||||
if (ORTE_PROC_IS_APP || ORTE_PROC_IS_TOOL) {
|
||||
orte_oob_base.ev_base = orte_event_base;
|
||||
} else {
|
||||
orte_oob_base.ev_base = opal_progress_thread_init("OOB-BASE");
|
||||
}
|
||||
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
/* register the FT events callback */
|
||||
orte_state.add_job_state(ORTE_JOB_STATE_FT_CHECKPOINT, orte_oob_base_ft_event, ORTE_ERROR_PRI);
|
||||
|
@ -141,12 +141,6 @@ static void ping(const orte_process_name_t *proc)
|
||||
return;
|
||||
}
|
||||
|
||||
/* has this peer had a progress thread assigned yet? */
|
||||
if (NULL == peer->ev_base) {
|
||||
/* nope - assign one */
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
|
||||
/* if we are already connected, there is nothing to do */
|
||||
if (MCA_OOB_TCP_CONNECTED == peer->state) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
@ -204,11 +198,7 @@ static void send_nb(orte_rml_send_t *msg)
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
/* has this peer had a progress thread assigned yet? */
|
||||
if (NULL == peer->ev_base) {
|
||||
/* nope - assign one */
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
|
||||
/* add the msg to the hop's send queue */
|
||||
if (MCA_OOB_TCP_CONNECTED == peer->state) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
|
@ -147,12 +147,8 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
|
||||
*/
|
||||
static int tcp_component_open(void)
|
||||
{
|
||||
mca_oob_tcp_component.next_base = 0;
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mca_oob_tcp_component.peers, 32);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.ev_bases, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&mca_oob_tcp_component.ev_bases,
|
||||
orte_oob_base.num_threads, 256, 8);
|
||||
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
@ -206,8 +202,6 @@ static int tcp_component_close(void)
|
||||
}
|
||||
#endif
|
||||
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.ev_bases);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
static char *static_port_string;
|
||||
@ -664,27 +658,11 @@ static orte_rml_pathway_t* component_query_transports(void)
|
||||
static int component_startup(void)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
int i;
|
||||
char *tmp;
|
||||
opal_event_base_t *evb;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP STARTUP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* initialize state */
|
||||
if (0 == orte_oob_base.num_threads) {
|
||||
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, orte_oob_base.ev_base);
|
||||
} else {
|
||||
for (i=0; i < orte_oob_base.num_threads; i++) {
|
||||
asprintf(&tmp, "OOB-TCP-%d", i);
|
||||
evb = opal_progress_thread_init(tmp);
|
||||
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, evb);
|
||||
opal_argv_append_nosize(&mca_oob_tcp_component.ev_threads, tmp);
|
||||
free(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
/* if we are a daemon/HNP, or we are a standalone app,
|
||||
* then it is possible that someone else may initiate a
|
||||
* connection to us. In these cases, we need to start the
|
||||
@ -712,14 +690,6 @@ static void component_shutdown(void)
|
||||
"%s TCP SHUTDOWN",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
if (0 < orte_oob_base.num_threads) {
|
||||
for (i=0; i < orte_oob_base.num_threads; i++) {
|
||||
opal_progress_thread_finalize(mca_oob_tcp_component.ev_threads[i]);
|
||||
opal_pointer_array_set_item(&mca_oob_tcp_component.ev_bases, i, NULL);
|
||||
}
|
||||
opal_argv_free(mca_oob_tcp_component.ev_threads);
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP && mca_oob_tcp_component.listen_thread_active) {
|
||||
mca_oob_tcp_component.listen_thread_active = false;
|
||||
/* tell the thread to exit */
|
||||
@ -1366,7 +1336,6 @@ static char **split_and_resolve(char **orig_str, char *name)
|
||||
|
||||
static void peer_cons(mca_oob_tcp_peer_t *peer)
|
||||
{
|
||||
peer->ev_base = NULL;
|
||||
peer->auth_method = NULL;
|
||||
peer->sd = -1;
|
||||
OBJ_CONSTRUCT(&peer->addrs, opal_list_t);
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -48,9 +48,6 @@ typedef struct {
|
||||
int max_retries; /**< max number of retries before declaring peer gone */
|
||||
opal_list_t events; /**< events for monitoring connections */
|
||||
int peer_limit; /**< max size of tcp peer cache */
|
||||
opal_pointer_array_t ev_bases; // event base array for progress threads
|
||||
char** ev_threads; // event progress thread names
|
||||
int next_base; // counter to load-level thread use
|
||||
opal_hash_table_t peers; // connection addresses for peers
|
||||
|
||||
/* Port specifications */
|
||||
@ -96,13 +93,4 @@ ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_failed_to_connect(int fd, short
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata);
|
||||
|
||||
#define ORTE_OOB_TCP_NEXT_BASE(p) \
|
||||
do { \
|
||||
++mca_oob_tcp_component.next_base; \
|
||||
if (orte_oob_base.num_threads <= mca_oob_tcp_component.next_base) { \
|
||||
mca_oob_tcp_component.next_base = 0; \
|
||||
} \
|
||||
(p)->ev_base = (opal_event_base_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.ev_bases, mca_oob_tcp_component.next_base); \
|
||||
} while(0)
|
||||
|
||||
#endif /* _MCA_OOB_TCP_COMPONENT_H_ */
|
||||
|
@ -515,10 +515,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
if (peer->sd >= 0) {
|
||||
assert(!peer->send_ev_active && !peer->recv_ev_active);
|
||||
if (NULL == peer->ev_base) {
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
opal_event_set(peer->ev_base,
|
||||
opal_event_set(orte_event_base,
|
||||
&peer->recv_event,
|
||||
peer->sd,
|
||||
OPAL_EV_READ|OPAL_EV_PERSIST,
|
||||
@ -530,7 +527,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
||||
peer->recv_ev_active = false;
|
||||
}
|
||||
|
||||
opal_event_set(peer->ev_base,
|
||||
opal_event_set(orte_event_base,
|
||||
&peer->send_event,
|
||||
peer->sd,
|
||||
OPAL_EV_WRITE|OPAL_EV_PERSIST,
|
||||
@ -811,7 +808,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
peer = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
peer->name = hdr.origin;
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer); // assign it an event base
|
||||
peer->state = MCA_OOB_TCP_ACCEPTING;
|
||||
ui64 = (uint64_t*)(&peer->name);
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, (*ui64), peer)) {
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -60,14 +60,14 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->peer = (p); \
|
||||
ORTE_THREADSHIFT(cop, (p)->ev_base, (cbfunc), ORTE_MSG_PRI); \
|
||||
ORTE_THREADSHIFT(cop, orte_event_base, (cbfunc), ORTE_MSG_PRI); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_ACCEPT_STATE(s, a, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
opal_event_set(orte_oob_base.ev_base, &cop->ev, s, \
|
||||
opal_event_set(orte_event_base, &cop->ev, s, \
|
||||
OPAL_EV_READ, (cbfunc), cop); \
|
||||
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
|
||||
ORTE_POST_OBJECT(cop); \
|
||||
@ -84,7 +84,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->peer = (p); \
|
||||
opal_event_evtimer_set((p)->ev_base, \
|
||||
opal_event_evtimer_set(orte_event_base, \
|
||||
&cop->ev, \
|
||||
(cbfunc), cop); \
|
||||
ORTE_POST_OBJECT(cop); \
|
||||
|
@ -157,7 +157,7 @@ int orte_oob_tcp_start_listening(void)
|
||||
/* otherwise, setup to listen via the event lib */
|
||||
OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
|
||||
listener->ev_active = true;
|
||||
opal_event_set(orte_oob_base.ev_base, &listener->event,
|
||||
opal_event_set(orte_event_base, &listener->event,
|
||||
listener->sd,
|
||||
OPAL_EV_READ|OPAL_EV_PERSIST,
|
||||
connection_event_handler,
|
||||
@ -744,7 +744,7 @@ static void* listen_thread(opal_object_t *obj)
|
||||
* OS might start rejecting connections due to timeout.
|
||||
*/
|
||||
pending_connection = OBJ_NEW(mca_oob_tcp_pending_connection_t);
|
||||
opal_event_set(orte_oob_base.ev_base, &pending_connection->ev, -1,
|
||||
opal_event_set(orte_event_base, &pending_connection->ev, -1,
|
||||
OPAL_EV_WRITE, connection_handler, pending_connection);
|
||||
opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
|
||||
pending_connection->fd = accept(sd,
|
||||
|
@ -52,7 +52,6 @@ typedef struct {
|
||||
mca_oob_tcp_addr_t *active_addr;
|
||||
mca_oob_tcp_state_t state;
|
||||
int num_retries;
|
||||
opal_event_base_t *ev_base; // progress thread this peer is assigned to
|
||||
opal_event_t send_event; /**< registration with event thread for send events */
|
||||
bool send_ev_active;
|
||||
opal_event_t recv_event; /**< registration with event thread for recv events */
|
||||
@ -88,7 +87,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_op_t);
|
||||
if (NULL != proxy) { \
|
||||
pop->rtmod = strdup(proxy); \
|
||||
} \
|
||||
ORTE_THREADSHIFT(pop, orte_oob_base.ev_base, \
|
||||
ORTE_THREADSHIFT(pop, orte_event_base, \
|
||||
(cbfunc), ORTE_MSG_PRI); \
|
||||
} while(0);
|
||||
|
||||
|
@ -82,7 +82,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
do { \
|
||||
(s)->peer = (struct mca_oob_tcp_peer_t*)(p); \
|
||||
(s)->activate = (f); \
|
||||
ORTE_THREADSHIFT((s), (p)->ev_base, \
|
||||
ORTE_THREADSHIFT((s), orte_event_base, \
|
||||
mca_oob_tcp_queue_msg, ORTE_MSG_PRI); \
|
||||
} while(0)
|
||||
|
||||
@ -235,7 +235,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
|
||||
ORTE_NAME_PRINT(&((ms)->dst))); \
|
||||
mop = OBJ_NEW(mca_oob_tcp_msg_op_t); \
|
||||
mop->msg = (ms); \
|
||||
ORTE_THREADSHIFT(mop, (ms)->peer->ev_base, \
|
||||
ORTE_THREADSHIFT(mop, orte_event_base, \
|
||||
(cbfunc), ORTE_MSG_PRI); \
|
||||
} while(0);
|
||||
|
||||
@ -281,7 +281,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
mop->hop.jobid = (h)->jobid; \
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the OOB framework, so use that event base */ \
|
||||
ORTE_THREADSHIFT(mop, orte_oob_base.ev_base, \
|
||||
ORTE_THREADSHIFT(mop, orte_event_base, \
|
||||
(cbfunc), ORTE_MSG_PRI); \
|
||||
} while(0)
|
||||
|
||||
@ -299,7 +299,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the component, so use the framework \
|
||||
* event base */ \
|
||||
ORTE_THREADSHIFT(mop, orte_oob_base.ev_base, \
|
||||
ORTE_THREADSHIFT(mop, orte_event_base, \
|
||||
(c), ORTE_MSG_PRI); \
|
||||
} while(0)
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user