Коммит
164fc6436d
@ -11,7 +11,7 @@
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
|
||||
@ -47,12 +47,12 @@ BEGIN_C_DECLS
|
||||
/*
|
||||
* The default starting chunk size
|
||||
*/
|
||||
#define OPAL_DSS_DEFAULT_INITIAL_SIZE 128
|
||||
#define OPAL_DSS_DEFAULT_INITIAL_SIZE 2048
|
||||
/*
|
||||
* The default threshold size when we switch from doubling the
|
||||
* buffer size to addatively increasing it
|
||||
*/
|
||||
#define OPAL_DSS_DEFAULT_THRESHOLD_SIZE 1024
|
||||
#define OPAL_DSS_DEFAULT_THRESHOLD_SIZE 4096
|
||||
|
||||
/*
|
||||
* Internal type corresponding to size_t. Do not use this in
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
|
@ -261,7 +261,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
|
||||
orte_proc_t *pptr, *dmn;
|
||||
opal_buffer_t *bptr;
|
||||
orte_app_context_t *app;
|
||||
bool found;
|
||||
orte_node_t *node;
|
||||
bool newmap = false;
|
||||
|
||||
@ -409,6 +408,13 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
|
||||
if (NULL == jdata->map) {
|
||||
jdata->map = OBJ_NEW(orte_job_map_t);
|
||||
newmap = true;
|
||||
} else if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_MAP_INITIALIZED)) {
|
||||
/* zero all the node map flags */
|
||||
for (n=0; n < jdata->map->nodes->size; n++) {
|
||||
if (NULL != (node = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, n))) {
|
||||
ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if we have a file map, then we need to load it */
|
||||
@ -454,17 +460,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
|
||||
opal_pointer_array_add(dmn->node->procs, pptr);
|
||||
|
||||
/* add the node to the map, if not already there */
|
||||
found = false;
|
||||
for (k=0; k < jdata->map->nodes->size; k++) {
|
||||
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, k))) {
|
||||
continue;
|
||||
}
|
||||
if (node->daemon == dmn) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
if (!ORTE_FLAG_TEST(dmn->node, ORTE_NODE_FLAG_MAPPED)) {
|
||||
OBJ_RETAIN(dmn->node);
|
||||
opal_pointer_array_add(jdata->map->nodes, dmn->node);
|
||||
if (newmap) {
|
||||
@ -497,6 +493,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
|
||||
app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, pptr->app_idx);
|
||||
ORTE_FLAG_SET(app, ORTE_APP_FLAG_USED_ON_NODE);
|
||||
}
|
||||
ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_MAP_INITIALIZED);
|
||||
}
|
||||
|
||||
COMPLETE:
|
||||
|
@ -11,6 +11,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -54,13 +55,14 @@ OPAL_TIMING_DECLARE_EXT(ORTE_DECLSPEC, tm_oob)
|
||||
* Convenience Typedef
|
||||
*/
|
||||
typedef struct {
|
||||
opal_event_base_t *ev_base;
|
||||
char *include;
|
||||
char *exclude;
|
||||
opal_list_t components;
|
||||
opal_list_t actives;
|
||||
int max_uri_length;
|
||||
opal_hash_table_t peers;
|
||||
bool use_module_threads;
|
||||
int num_threads;
|
||||
#if OPAL_ENABLE_TIMING
|
||||
bool timing;
|
||||
#endif
|
||||
@ -119,7 +121,7 @@ ORTE_DECLSPEC void orte_oob_base_send_nb(int fd, short args, void *cbdata);
|
||||
__FILE__, __LINE__); \
|
||||
cd = OBJ_NEW(orte_oob_send_t); \
|
||||
cd->msg = (m); \
|
||||
opal_event_set(orte_event_base, &cd->ev, -1, \
|
||||
opal_event_set(orte_oob_base.ev_base, &cd->ev, -1, \
|
||||
OPAL_EV_WRITE, \
|
||||
orte_oob_base_send_nb, cd); \
|
||||
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI); \
|
||||
@ -173,7 +175,7 @@ OBJ_CLASS_DECLARATION(mca_oob_uri_req_t);
|
||||
mca_oob_uri_req_t *rq; \
|
||||
rq = OBJ_NEW(mca_oob_uri_req_t); \
|
||||
rq->uri = strdup((u)); \
|
||||
opal_event_set(orte_event_base, &(rq)->ev, -1, \
|
||||
opal_event_set(orte_oob_base.ev_base, &(rq)->ev, -1, \
|
||||
OPAL_EV_WRITE, \
|
||||
orte_oob_base_set_addr, (rq)); \
|
||||
opal_event_set_priority(&(rq)->ev, ORTE_MSG_PRI); \
|
||||
@ -193,4 +195,3 @@ ORTE_DECLSPEC void orte_oob_base_ft_event(int fd, short args, void *cbdata);
|
||||
|
||||
END_C_DECLS
|
||||
#endif
|
||||
|
||||
|
@ -29,6 +29,7 @@
|
||||
|
||||
#include "opal/class/opal_bitmap.h"
|
||||
#include "orte/mca/mca.h"
|
||||
#include "opal/runtime/opal_progress_threads.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
|
||||
@ -53,19 +54,20 @@
|
||||
orte_oob_base_t orte_oob_base = {0};
|
||||
OPAL_TIMING_DECLARE(tm_oob)
|
||||
|
||||
|
||||
static int orte_oob_base_register(mca_base_register_flag_t flags)
|
||||
{
|
||||
if (ORTE_PROC_IS_APP || ORTE_PROC_IS_TOOL) {
|
||||
orte_oob_base.use_module_threads = false;
|
||||
orte_oob_base.num_threads = 0;
|
||||
} else {
|
||||
orte_oob_base.use_module_threads = true;
|
||||
orte_oob_base.num_threads = 8;
|
||||
}
|
||||
(void)mca_base_var_register("orte", "oob", "base", "enable_module_progress_threads",
|
||||
"Whether to independently progress OOB messages for each interface",
|
||||
(void)mca_base_var_register("orte", "oob", "base", "num_progress_threads",
|
||||
"Number of independent progress OOB messages for each interface",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&orte_oob_base.use_module_threads);
|
||||
&orte_oob_base.num_threads);
|
||||
|
||||
#if OPAL_ENABLE_TIMING
|
||||
/* Detailed timing setup */
|
||||
@ -107,6 +109,11 @@ static int orte_oob_base_close(void)
|
||||
|
||||
OBJ_DESTRUCT(&orte_oob_base.peers);
|
||||
|
||||
if (ORTE_PROC_IS_APP || ORTE_PROC_IS_TOOL) {
|
||||
opal_progress_thread_finalize(NULL);
|
||||
} else {
|
||||
opal_progress_thread_finalize("OOB-BASE");
|
||||
}
|
||||
|
||||
OPAL_TIMING_EVENT((&tm_oob, "Finish"));
|
||||
OPAL_TIMING_REPORT(orte_oob_base.timing, &tm_oob);
|
||||
@ -126,6 +133,13 @@ static int orte_oob_base_open(mca_base_open_flag_t flags)
|
||||
opal_hash_table_init(&orte_oob_base.peers, 128);
|
||||
OBJ_CONSTRUCT(&orte_oob_base.actives, opal_list_t);
|
||||
|
||||
if (ORTE_PROC_IS_APP || ORTE_PROC_IS_TOOL) {
|
||||
orte_oob_base.ev_base = opal_progress_thread_init(NULL);
|
||||
} else {
|
||||
orte_oob_base.ev_base = opal_progress_thread_init("OOB-BASE");
|
||||
}
|
||||
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
/* register the FT events callback */
|
||||
orte_state.add_job_state(ORTE_JOB_STATE_FT_CHECKPOINT, orte_oob_base_ft_event, ORTE_ERROR_PRI);
|
||||
|
@ -12,7 +12,7 @@
|
||||
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
|
||||
# Copyright (c) 2012-2013 Los Alamos National Security, LLC.
|
||||
# All rights reserved
|
||||
# Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
# Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
@ -32,7 +32,6 @@ sources = \
|
||||
oob_tcp_sendrecv.h \
|
||||
oob_tcp_hdr.h \
|
||||
oob_tcp_peer.h \
|
||||
oob_tcp_ping.h \
|
||||
oob_tcp.c \
|
||||
oob_tcp_listener.c \
|
||||
oob_tcp_common.c \
|
||||
@ -59,4 +58,3 @@ mca_oob_tcp_la_LDFLAGS = -module -avoid-version
|
||||
noinst_LTLIBRARIES = $(component_noinst)
|
||||
libmca_oob_tcp_la_SOURCES = $(sources)
|
||||
libmca_oob_tcp_la_LDFLAGS = -module -avoid-version
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -46,6 +46,7 @@
|
||||
#endif
|
||||
#include <ctype.h>
|
||||
|
||||
#include "opal/runtime/opal_progress_threads.h"
|
||||
#include "opal/util/show_help.h"
|
||||
#include "opal/util/error.h"
|
||||
#include "opal/util/output.h"
|
||||
@ -69,118 +70,27 @@
|
||||
#include "orte/mca/oob/tcp/oob_tcp_peer.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_common.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_ping.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_sendrecv.h"
|
||||
|
||||
static void tcp_init(void);
|
||||
static void tcp_fini(void);
|
||||
static void accept_connection(const int accepted_fd,
|
||||
const struct sockaddr *addr);
|
||||
static void set_peer(const orte_process_name_t* name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports);
|
||||
static void ping(const orte_process_name_t *proc);
|
||||
static void send_nb(orte_rml_send_t *msg);
|
||||
static void resend(struct mca_oob_tcp_msg_error_t *mop);
|
||||
static void ft_event(int state);
|
||||
|
||||
mca_oob_tcp_module_t mca_oob_tcp_module = {
|
||||
{
|
||||
tcp_init,
|
||||
tcp_fini,
|
||||
accept_connection,
|
||||
set_peer,
|
||||
ping,
|
||||
send_nb,
|
||||
resend,
|
||||
ft_event
|
||||
}
|
||||
.accept_connection = accept_connection,
|
||||
.ping = ping,
|
||||
.send_nb = send_nb,
|
||||
.resend = resend,
|
||||
.ft_event = ft_event
|
||||
};
|
||||
|
||||
/*
|
||||
* Local utility functions
|
||||
*/
|
||||
static void recv_handler(int sd, short flags, void* user);
|
||||
static void* progress_thread_engine(opal_object_t *obj)
|
||||
{
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP OOB PROGRESS THREAD RUNNING",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
while (mca_oob_tcp_module.ev_active) {
|
||||
opal_event_loop(mca_oob_tcp_module.ev_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
return OPAL_THREAD_CANCELLED;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Initialize global variables used w/in this module.
|
||||
*/
|
||||
static void tcp_init(void)
|
||||
{
|
||||
/* setup the module's state variables */
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mca_oob_tcp_module.peers, 32);
|
||||
mca_oob_tcp_module.ev_active = false;
|
||||
|
||||
if (orte_oob_base.use_module_threads) {
|
||||
/* if we are to use independent progress threads at
|
||||
* the module level, start it now
|
||||
*/
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s STARTING TCP PROGRESS THREAD",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
mca_oob_tcp_module.ev_base = opal_event_base_create();
|
||||
/* construct the thread object */
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.progress_thread, opal_thread_t);
|
||||
/* fork off a thread to progress it */
|
||||
mca_oob_tcp_module.progress_thread.t_run = progress_thread_engine;
|
||||
mca_oob_tcp_module.ev_active = true;
|
||||
if (OPAL_SUCCESS != opal_thread_start(&mca_oob_tcp_module.progress_thread)) {
|
||||
opal_output(0, "%s progress thread failed to start",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Module cleanup.
|
||||
*/
|
||||
static void tcp_fini(void)
|
||||
{
|
||||
uint64_t ui64;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
|
||||
/* cleanup all peers */
|
||||
OPAL_HASH_TABLE_FOREACH(ui64, uint64, peer, &mca_oob_tcp_module.peers) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s RELEASING PEER OBJ %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == peer) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
|
||||
if (NULL != peer) {
|
||||
OBJ_RELEASE(peer);
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peers);
|
||||
|
||||
if (mca_oob_tcp_module.ev_active) {
|
||||
/* if we used an independent progress thread at
|
||||
* the module level, stop it now
|
||||
*/
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s STOPPING TCP PROGRESS THREAD",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* stop the progress thread */
|
||||
mca_oob_tcp_module.ev_active = false;
|
||||
/* break the event loop */
|
||||
opal_event_base_loopexit(mca_oob_tcp_module.ev_base);
|
||||
/* wait for thread to exit */
|
||||
opal_thread_join(&mca_oob_tcp_module.progress_thread, NULL);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.progress_thread);
|
||||
/* release the event base */
|
||||
opal_event_base_free(mca_oob_tcp_module.ev_base);
|
||||
}
|
||||
}
|
||||
|
||||
/* Called by mca_oob_tcp_accept() and connection_handler() on
|
||||
* a socket that has been accepted. This call finishes processing the
|
||||
@ -206,133 +116,19 @@ static void accept_connection(const int accepted_fd,
|
||||
ORTE_ACTIVATE_TCP_ACCEPT_STATE(accepted_fd, addr, recv_handler);
|
||||
}
|
||||
|
||||
/* the host in this case is always in "dot" notation, and
|
||||
* thus we do not need to do a DNS lookup to convert it */
|
||||
static int parse_uri(const uint16_t af_family,
|
||||
const char* host,
|
||||
const char *port,
|
||||
struct sockaddr_storage* inaddr)
|
||||
{
|
||||
struct sockaddr_in *in;
|
||||
|
||||
if (AF_INET == af_family) {
|
||||
memset(inaddr, 0, sizeof(struct sockaddr_in));
|
||||
in = (struct sockaddr_in*) inaddr;
|
||||
in->sin_family = AF_INET;
|
||||
in->sin_addr.s_addr = inet_addr(host);
|
||||
if (in->sin_addr.s_addr == INADDR_NONE) {
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
((struct sockaddr_in*) inaddr)->sin_port = htons(atoi(port));
|
||||
}
|
||||
#if OPAL_ENABLE_IPV6
|
||||
else if (AF_INET6 == af_family) {
|
||||
struct sockaddr_in6 *in6;
|
||||
memset(inaddr, 0, sizeof(struct sockaddr_in6));
|
||||
in6 = (struct sockaddr_in6*) inaddr;
|
||||
|
||||
if (0 == inet_pton(AF_INET6, host, (void*)&in6->sin6_addr)) {
|
||||
opal_output (0, "oob_tcp_parse_uri: Could not convert %s\n", host);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Record listening address for this peer - the connection
|
||||
* is created on first-send
|
||||
*/
|
||||
static void process_set_peer(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
int rc=ORTE_SUCCESS;
|
||||
uint64_t *ui64 = (uint64_t*)(&pop->peer);
|
||||
mca_oob_tcp_addr_t *maddr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp:processing set_peer cmd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
if (AF_INET != pop->af_family) {
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s NOT AF_INET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&pop->peer))) {
|
||||
peer = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
peer->name.jobid = pop->peer.jobid;
|
||||
peer->name.vpid = pop->peer.vpid;
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s SET_PEER ADDING PEER %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) {
|
||||
OBJ_RELEASE(peer);
|
||||
return;
|
||||
}
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
/* we have to initiate the connection because otherwise the
|
||||
* daemon has no way to communicate to us via this component
|
||||
* as the app doesn't have a listening port */
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
}
|
||||
}
|
||||
|
||||
maddr = OBJ_NEW(mca_oob_tcp_addr_t);
|
||||
if (ORTE_SUCCESS != (rc = parse_uri(pop->af_family, pop->net, pop->port, (struct sockaddr_storage*) &(maddr->addr)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(maddr);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s set_peer: peer %s is listening on net %s port %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer),
|
||||
(NULL == pop->net) ? "NULL" : pop->net,
|
||||
(NULL == pop->port) ? "NULL" : pop->port);
|
||||
opal_list_append(&peer->addrs, &maddr->super);
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(pop);
|
||||
}
|
||||
|
||||
static void set_peer(const orte_process_name_t *name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports)
|
||||
{
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp set addr for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(name));
|
||||
|
||||
/* have to push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_PEER_OP(name, af_family, net, ports, process_set_peer);
|
||||
}
|
||||
|
||||
|
||||
/* API functions */
|
||||
static void process_ping(int fd, short args, void *cbdata)
|
||||
static void ping(const orte_process_name_t *proc)
|
||||
{
|
||||
mca_oob_tcp_ping_t *op = (mca_oob_tcp_ping_t*)cbdata;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:[%s:%d] processing ping to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
ORTE_NAME_PRINT(proc));
|
||||
|
||||
/* do we know this peer? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&op->peer))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(proc))) {
|
||||
/* push this back to the component so it can try
|
||||
* another module within this transport. If no
|
||||
* module can be found, the component can push back
|
||||
@ -342,9 +138,15 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
"%s:[%s:%d] hop %s unknown",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, NULL, &op->peer, mca_oob_tcp_component_hop_unknown);
|
||||
goto cleanup;
|
||||
ORTE_NAME_PRINT(proc));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, NULL, proc, mca_oob_tcp_component_hop_unknown);
|
||||
return;
|
||||
}
|
||||
|
||||
/* has this peer had a progress thread assigned yet? */
|
||||
if (NULL == peer->ev_base) {
|
||||
/* nope - assign one */
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
|
||||
/* if we are already connected, there is nothing to do */
|
||||
@ -353,8 +155,8 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
"%s:[%s:%d] already connected to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
goto cleanup;
|
||||
ORTE_NAME_PRINT(proc));
|
||||
return;
|
||||
}
|
||||
|
||||
/* if we are already connecting, there is nothing to do */
|
||||
@ -364,39 +166,23 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
"%s:[%s:%d] already connecting to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
goto cleanup;
|
||||
ORTE_NAME_PRINT(proc));
|
||||
return;
|
||||
}
|
||||
|
||||
/* attempt the connection */
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
static void ping(const orte_process_name_t *proc)
|
||||
static void send_nb(orte_rml_send_t *msg)
|
||||
{
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:[%s:%d] pinging peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(proc));
|
||||
|
||||
/* push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_PING(proc, process_ping);
|
||||
}
|
||||
|
||||
static void process_send(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_msg_op_t *op = (mca_oob_tcp_msg_op_t*)cbdata;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
orte_process_name_t hop;
|
||||
|
||||
|
||||
/* do we have a route to this peer (could be direct)? */
|
||||
hop = orte_routed.get_route(op->msg->routed, &op->msg->dst);
|
||||
hop = orte_routed.get_route(msg->routed, &msg->dst);
|
||||
/* do we know this hop? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&hop))) {
|
||||
/* push this back to the component so it can try
|
||||
@ -408,32 +194,37 @@ static void process_send(int fd, short args, void *cbdata)
|
||||
"%s:[%s:%d] processing send to peer %s:%d seq_num = %d hop %s unknown",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->msg->dst), op->msg->tag, op->msg->seq_num,
|
||||
ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
|
||||
ORTE_NAME_PRINT(&hop));
|
||||
ORTE_ACTIVATE_TCP_NO_ROUTE(op->msg, &hop, mca_oob_tcp_component_no_route);
|
||||
goto cleanup;
|
||||
ORTE_ACTIVATE_TCP_NO_ROUTE(msg, &hop, mca_oob_tcp_component_no_route);
|
||||
return;
|
||||
}
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:[%s:%d] processing send to peer %s:%d seq_num = %d via %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->msg->dst), op->msg->tag, op->msg->seq_num,
|
||||
ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
/* has this peer had a progress thread assigned yet? */
|
||||
if (NULL == peer->ev_base) {
|
||||
/* nope - assign one */
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
/* add the msg to the hop's send queue */
|
||||
if (MCA_OOB_TCP_CONNECTED == peer->state) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:send_nb: already connected to %s - queueing for send",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
MCA_OOB_TCP_QUEUE_SEND(op->msg, peer);
|
||||
goto cleanup;
|
||||
MCA_OOB_TCP_QUEUE_SEND(msg, peer);
|
||||
return;
|
||||
}
|
||||
|
||||
/* add the message to the queue for sending after the
|
||||
* connection is formed
|
||||
*/
|
||||
MCA_OOB_TCP_QUEUE_PENDING(op->msg, peer);
|
||||
MCA_OOB_TCP_QUEUE_PENDING(msg, peer);
|
||||
|
||||
if (MCA_OOB_TCP_CONNECTING != peer->state &&
|
||||
MCA_OOB_TCP_CONNECT_ACK != peer->state) {
|
||||
@ -449,34 +240,20 @@ static void process_send(int fd, short args, void *cbdata)
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
static void send_nb(orte_rml_send_t *msg)
|
||||
static void resend(struct mca_oob_tcp_msg_error_t *mpi)
|
||||
{
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:send_nb to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&msg->dst));
|
||||
|
||||
/* push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_POST_SEND(msg, process_send);
|
||||
}
|
||||
|
||||
static void process_resend(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_msg_error_t *op = (mca_oob_tcp_msg_error_t*)cbdata;
|
||||
mca_oob_tcp_msg_error_t *mp = (mca_oob_tcp_msg_error_t*)mpi;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp processing resend to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&op->hop));
|
||||
ORTE_NAME_PRINT(&mp->hop));
|
||||
|
||||
/* do we know this peer? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&op->hop))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&mp->hop))) {
|
||||
/* push this back to the component so it can try
|
||||
* another module within this transport. If no
|
||||
* module can be found, the component can push back
|
||||
@ -486,9 +263,15 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
"%s:[%s:%d] peer %s unknown",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->hop));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(op->snd, NULL, &op->hop, mca_oob_tcp_component_hop_unknown);
|
||||
goto cleanup;
|
||||
ORTE_NAME_PRINT(&mp->hop));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(mp->snd, NULL, &mp->hop, mca_oob_tcp_component_hop_unknown);
|
||||
return;
|
||||
}
|
||||
|
||||
/* should be impossible, but...has this peer had a progress thread assigned yet? */
|
||||
if (NULL == peer->ev_base) {
|
||||
/* nope - assign one */
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||
}
|
||||
|
||||
/* add the msg to this peer's send queue */
|
||||
@ -497,8 +280,8 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
"%s tcp:resend: already connected to %s - queueing for send",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
MCA_OOB_TCP_QUEUE_MSG(peer, op->snd, true);
|
||||
goto cleanup;
|
||||
MCA_OOB_TCP_QUEUE_MSG(peer, mp->snd, true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (MCA_OOB_TCP_CONNECTING != peer->state &&
|
||||
@ -506,7 +289,7 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
/* add the message to the queue for sending after the
|
||||
* connection is formed
|
||||
*/
|
||||
MCA_OOB_TCP_QUEUE_MSG(peer, op->snd, false);
|
||||
MCA_OOB_TCP_QUEUE_MSG(peer, mp->snd, false);
|
||||
/* we have to initiate the connection - again, we do not
|
||||
* want to block while the connection is created.
|
||||
* So throw us into an event that will create
|
||||
@ -519,22 +302,6 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
static void resend(struct mca_oob_tcp_msg_error_t *mp)
|
||||
{
|
||||
mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)mp;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:resend to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop));
|
||||
|
||||
/* push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_POST_RESEND(mop, process_resend);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -548,7 +315,6 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
|
||||
int flags;
|
||||
uint64_t *ui64;
|
||||
mca_oob_tcp_hdr_t hdr;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
|
||||
@ -591,9 +357,6 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
peer->state);
|
||||
}
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
ui64 = (uint64_t*)(&peer->name);
|
||||
(void)opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), NULL);
|
||||
OBJ_RELEASE(peer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -55,34 +55,19 @@ typedef struct {
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_nicaddr_t);
|
||||
|
||||
/* Module definition */
|
||||
typedef void (*mca_oob_tcp_module_init_fn_t)(void);
|
||||
typedef void (*mca_oob_tcp_module_fini_fn_t)(void);
|
||||
typedef void (*mca_oob_tcp_module_accept_connection_fn_t)(const int accepted_fd,
|
||||
const struct sockaddr *addr);
|
||||
typedef void (*mca_oob_tcp_module_set_peer_fn_t)(const orte_process_name_t* name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports);
|
||||
typedef void (*mca_oob_tcp_module_ping_fn_t)(const orte_process_name_t *proc);
|
||||
typedef void (*mca_oob_tcp_module_send_nb_fn_t)(orte_rml_send_t *msg);
|
||||
typedef void (*mca_oob_tcp_module_resend_nb_fn_t)(struct mca_oob_tcp_msg_error_t *mop);
|
||||
typedef void (*mca_oob_tcp_module_ft_event_fn_t)(int state);
|
||||
|
||||
typedef struct {
|
||||
mca_oob_tcp_module_init_fn_t init;
|
||||
mca_oob_tcp_module_fini_fn_t finalize;
|
||||
mca_oob_tcp_module_accept_connection_fn_t accept_connection;
|
||||
mca_oob_tcp_module_set_peer_fn_t set_peer;
|
||||
mca_oob_tcp_module_ping_fn_t ping;
|
||||
mca_oob_tcp_module_send_nb_fn_t send_nb;
|
||||
mca_oob_tcp_module_resend_nb_fn_t resend;
|
||||
mca_oob_tcp_module_ft_event_fn_t ft_event;
|
||||
} mca_oob_tcp_module_api_t;
|
||||
typedef struct {
|
||||
mca_oob_tcp_module_api_t api;
|
||||
opal_event_base_t *ev_base; /* event base for the module progress thread */
|
||||
bool ev_active;
|
||||
opal_thread_t progress_thread;
|
||||
opal_hash_table_t peers; // connection addresses for peers
|
||||
} mca_oob_tcp_module_t;
|
||||
ORTE_MODULE_DECLSPEC extern mca_oob_tcp_module_t mca_oob_tcp_module;
|
||||
|
||||
@ -103,9 +88,9 @@ typedef enum {
|
||||
/* module-level shared functions */
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_send_handler(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_recv_handler(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_queue_msg(int sd, short args, void *cbdata);
|
||||
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* MCA_OOB_TCP_H_ */
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -194,7 +194,7 @@ mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(const orte_process_name_t *name)
|
||||
uint64_t ui64;
|
||||
|
||||
memcpy(&ui64, (char*)name, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_module.peers, ui64, (void**)&peer)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&peer)) {
|
||||
return NULL;
|
||||
}
|
||||
return peer;
|
||||
@ -221,4 +221,3 @@ char* mca_oob_tcp_state_print(mca_oob_tcp_state_t state)
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,7 @@
|
||||
* reserved.
|
||||
* Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 NVIDIA Corporation. All rights reserved.
|
||||
* Copyright (c) 2015-2017 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
@ -61,6 +61,8 @@
|
||||
#include "opal/util/argv.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/mca/event/event.h"
|
||||
#include "opal/runtime/opal_progress_threads.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
@ -75,11 +77,11 @@
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/oob/tcp/oob_tcp.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_common.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_component.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_peer.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_listener.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_ping.h"
|
||||
/*
|
||||
* Local utility functions
|
||||
*/
|
||||
@ -139,7 +141,13 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
|
||||
*/
|
||||
static int tcp_component_open(void)
|
||||
{
|
||||
/* initialize state */
|
||||
mca_oob_tcp_component.next_base = 0;
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mca_oob_tcp_component.peers, 32);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.ev_bases, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&mca_oob_tcp_component.ev_bases,
|
||||
orte_oob_base.num_threads, 256, 8);
|
||||
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.listen_thread, opal_thread_t);
|
||||
@ -174,9 +182,24 @@ static int tcp_component_open(void)
|
||||
*/
|
||||
static int tcp_component_close(void)
|
||||
{
|
||||
/* cleanup listen event list */
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
uint64_t ui64;
|
||||
|
||||
/* cleanup listen event list */
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.listeners);
|
||||
|
||||
/* cleanup all peers */
|
||||
OPAL_HASH_TABLE_FOREACH(ui64, uint64, peer, &mca_oob_tcp_component.peers) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s RELEASING PEER OBJ %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == peer) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
|
||||
if (NULL != peer) {
|
||||
OBJ_RELEASE(peer);
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.peers);
|
||||
|
||||
if (NULL != mca_oob_tcp_component.ipv4conns) {
|
||||
opal_argv_free(mca_oob_tcp_component.ipv4conns);
|
||||
}
|
||||
@ -193,6 +216,8 @@ static int tcp_component_close(void)
|
||||
}
|
||||
#endif
|
||||
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.ev_bases);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
static char *static_port_string;
|
||||
@ -608,10 +633,6 @@ static int component_available(void)
|
||||
return ORTE_ERR_NOT_AVAILABLE;
|
||||
}
|
||||
|
||||
/* set the module event base - this is where we would spin off a separate
|
||||
* progress thread if so desired */
|
||||
mca_oob_tcp_module.ev_base = orte_event_base;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -650,14 +671,25 @@ static orte_rml_pathway_t* component_query_transports(void)
|
||||
static int component_startup(void)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
int i;
|
||||
char *tmp;
|
||||
opal_event_base_t *evb;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP STARTUP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* start the module */
|
||||
if (NULL != mca_oob_tcp_module.api.init) {
|
||||
mca_oob_tcp_module.api.init();
|
||||
/* initialize state */
|
||||
if (0 == orte_oob_base.num_threads) {
|
||||
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, orte_oob_base.ev_base);
|
||||
} else {
|
||||
for (i=0; i < orte_oob_base.num_threads; i++) {
|
||||
asprintf(&tmp, "OOB-TCP-%d", i);
|
||||
evb = opal_progress_thread_init(tmp);
|
||||
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, evb);
|
||||
opal_argv_append_nosize(&mca_oob_tcp_component.ev_threads, tmp);
|
||||
free(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
/* if we are a daemon/HNP, or we are a standalone app,
|
||||
@ -697,6 +729,14 @@ static void component_shutdown(void)
|
||||
"%s TCP SHUTDOWN",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
if (0 < orte_oob_base.num_threads) {
|
||||
for (i=0; i < orte_oob_base.num_threads; i++) {
|
||||
opal_progress_thread_finalize(mca_oob_tcp_component.ev_threads[i]);
|
||||
opal_pointer_array_set_item(&mca_oob_tcp_component.ev_bases, i, NULL);
|
||||
}
|
||||
opal_argv_free(mca_oob_tcp_component.ev_threads);
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP && mca_oob_tcp_component.listen_thread_active) {
|
||||
mca_oob_tcp_component.listen_thread_active = false;
|
||||
/* tell the thread to exit */
|
||||
@ -723,13 +763,7 @@ static void component_shutdown(void)
|
||||
/* we can call the destruct directly */
|
||||
cleanup(0, 0, NULL);
|
||||
}
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"all listeners released");
|
||||
|
||||
/* shutdown the module */
|
||||
if (NULL != mca_oob_tcp_module.api.finalize) {
|
||||
mca_oob_tcp_module.api.finalize();
|
||||
}
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP SHUTDOWN done",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
@ -742,17 +776,14 @@ static int component_send(orte_rml_send_t *msg)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num );
|
||||
|
||||
/* the module is potentially running on its own event
|
||||
* base, so all it can do is push our send request
|
||||
* onto an event - it cannot tell us if it will
|
||||
* succeed. The module will first see if it knows
|
||||
/* The module will first see if it knows
|
||||
* of a way to send the data to the target, and then
|
||||
* attempt to send the data. It will call the cbfunc
|
||||
* with the status upon completion - if it can't do it for
|
||||
* some reason, it will call the component error
|
||||
* function so we can do something about it
|
||||
* some reason, it will pass the error to our fn below so
|
||||
* it can do something about it
|
||||
*/
|
||||
mca_oob_tcp_module.api.send_nb(msg);
|
||||
mca_oob_tcp_module.send_nb(msg);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -803,15 +834,54 @@ static char* component_get_addr(void)
|
||||
return cptr;
|
||||
}
|
||||
|
||||
/* the host in this case is always in "dot" notation, and
|
||||
* thus we do not need to do a DNS lookup to convert it */
|
||||
static int parse_uri(const uint16_t af_family,
|
||||
const char* host,
|
||||
const char *port,
|
||||
struct sockaddr_storage* inaddr)
|
||||
{
|
||||
struct sockaddr_in *in;
|
||||
|
||||
if (AF_INET == af_family) {
|
||||
memset(inaddr, 0, sizeof(struct sockaddr_in));
|
||||
in = (struct sockaddr_in*) inaddr;
|
||||
in->sin_family = AF_INET;
|
||||
in->sin_addr.s_addr = inet_addr(host);
|
||||
if (in->sin_addr.s_addr == INADDR_NONE) {
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
((struct sockaddr_in*) inaddr)->sin_port = htons(atoi(port));
|
||||
}
|
||||
#if OPAL_ENABLE_IPV6
|
||||
else if (AF_INET6 == af_family) {
|
||||
struct sockaddr_in6 *in6;
|
||||
memset(inaddr, 0, sizeof(struct sockaddr_in6));
|
||||
in6 = (struct sockaddr_in6*) inaddr;
|
||||
|
||||
if (0 == inet_pton(AF_INET6, host, (void*)&in6->sin6_addr)) {
|
||||
opal_output (0, "oob_tcp_parse_uri: Could not convert %s\n", host);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int component_set_addr(orte_process_name_t *peer,
|
||||
char **uris)
|
||||
{
|
||||
char **addrs, *hptr;
|
||||
char *tcpuri=NULL, *host, *ports;
|
||||
int i, j;
|
||||
int i, j, rc;
|
||||
uint16_t af_family = AF_UNSPEC;
|
||||
uint64_t ui64;
|
||||
bool found;
|
||||
mca_oob_tcp_peer_t *pr;
|
||||
mca_oob_tcp_addr_t *maddr;
|
||||
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
/* cycle across component parts and see if one belongs to us */
|
||||
@ -908,17 +978,37 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
host = addrs[j];
|
||||
}
|
||||
|
||||
/* pass this proc, and its ports, to the
|
||||
* module for handling - this module will be responsible
|
||||
* for communicating with the proc via this network.
|
||||
* Note that the modules are *not* necessarily running
|
||||
* on our event base - thus, the modules will push this
|
||||
* call into their own event base for processing.
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s PASSING ADDR %s TO MODULE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host);
|
||||
mca_oob_tcp_module.api.set_peer(peer, af_family, host, ports);
|
||||
if (NULL == (pr = mca_oob_tcp_peer_lookup(peer))) {
|
||||
pr = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
pr->name.jobid = peer->jobid;
|
||||
pr->name.vpid = peer->vpid;
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s SET_PEER ADDING PEER %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer));
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, pr)) {
|
||||
OBJ_RELEASE(pr);
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
}
|
||||
|
||||
maddr = OBJ_NEW(mca_oob_tcp_addr_t);
|
||||
if (ORTE_SUCCESS != (rc = parse_uri(af_family, host, ports, (struct sockaddr_storage*) &(maddr->addr)))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(maddr);
|
||||
opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, NULL);
|
||||
OBJ_RELEASE(pr);
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s set_peer: peer %s is listening on net %s port %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer),
|
||||
(NULL == host) ? "NULL" : host,
|
||||
(NULL == ports) ? "NULL" : ports);
|
||||
opal_list_append(&pr->addrs, &maddr->super);
|
||||
|
||||
found = true;
|
||||
}
|
||||
opal_argv_free(addrs);
|
||||
@ -1285,6 +1375,7 @@ static char **split_and_resolve(char **orig_str, char *name)
|
||||
|
||||
static void peer_cons(mca_oob_tcp_peer_t *peer)
|
||||
{
|
||||
peer->ev_base = NULL;
|
||||
peer->auth_method = NULL;
|
||||
peer->sd = -1;
|
||||
OBJ_CONSTRUCT(&peer->addrs, opal_list_t);
|
||||
@ -1367,10 +1458,6 @@ OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t,
|
||||
opal_object_t,
|
||||
NULL, NULL);
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_oob_tcp_ping_t,
|
||||
opal_object_t,
|
||||
NULL, NULL);
|
||||
|
||||
static void nicaddr_cons(mca_oob_tcp_nicaddr_t *ptr)
|
||||
{
|
||||
ptr->af_family = PF_UNSPEC;
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -32,6 +32,8 @@
|
||||
#include "opal/class/opal_bitmap.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "opal/mca/event/event.h"
|
||||
|
||||
#include "orte/mca/oob/oob.h"
|
||||
#include "oob_tcp.h"
|
||||
@ -46,6 +48,10 @@ typedef struct {
|
||||
int max_retries; /**< max number of retries before declaring peer gone */
|
||||
opal_list_t events; /**< events for monitoring connections */
|
||||
int peer_limit; /**< max size of tcp peer cache */
|
||||
opal_pointer_array_t ev_bases; // event base array for progress threads
|
||||
char** ev_threads; // event progress thread names
|
||||
int next_base; // counter to load-level thread use
|
||||
opal_hash_table_t peers; // connection addresses for peers
|
||||
|
||||
/* Port specifications */
|
||||
char* if_include; /**< list of ip interfaces to include */
|
||||
@ -90,4 +96,13 @@ ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_failed_to_connect(int fd, short
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata);
|
||||
|
||||
#define ORTE_OOB_TCP_NEXT_BASE(p) \
|
||||
do { \
|
||||
++mca_oob_tcp_component.next_base; \
|
||||
if (orte_oob_base.num_threads <= mca_oob_tcp_component.next_base) { \
|
||||
mca_oob_tcp_component.next_base = 0; \
|
||||
} \
|
||||
(p)->ev_base = (opal_event_base_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.ev_bases, mca_oob_tcp_component.next_base); \
|
||||
} while(0)
|
||||
|
||||
#endif /* _MCA_OOB_TCP_COMPONENT_H_ */
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2014 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2015 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
|
||||
@ -312,7 +312,7 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
* an event in the component event base, and so it will fire async
|
||||
* from us if we are in our own progress thread
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, NULL, mca_oob_tcp_component_failed_to_connect);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(peer, NULL, mca_oob_tcp_component_failed_to_connect);
|
||||
/* FIXME: post any messages in the send queue back to the OOB
|
||||
* level for reassignment
|
||||
*/
|
||||
@ -501,7 +501,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
if (peer->sd >= 0) {
|
||||
assert(!peer->send_ev_active && !peer->recv_ev_active);
|
||||
opal_event_set(mca_oob_tcp_module.ev_base,
|
||||
opal_event_set(peer->ev_base,
|
||||
&peer->recv_event,
|
||||
peer->sd,
|
||||
OPAL_EV_READ|OPAL_EV_PERSIST,
|
||||
@ -513,7 +513,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
||||
peer->recv_ev_active = false;
|
||||
}
|
||||
|
||||
opal_event_set(mca_oob_tcp_module.ev_base,
|
||||
opal_event_set(peer->ev_base,
|
||||
&peer->send_event,
|
||||
peer->sd,
|
||||
OPAL_EV_WRITE|OPAL_EV_PERSIST,
|
||||
@ -792,9 +792,10 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
peer = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
peer->name = hdr.origin;
|
||||
ORTE_OOB_TCP_NEXT_BASE(peer); // assign it an event base
|
||||
peer->state = MCA_OOB_TCP_ACCEPTING;
|
||||
ui64 = (uint64_t*)(&peer->name);
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, (*ui64), peer)) {
|
||||
OBJ_RELEASE(peer);
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
@ -941,7 +942,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
|
||||
/* set the peer into the component and OOB-level peer tables to indicate
|
||||
* that we know this peer and we will be handling him
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, NULL, mca_oob_tcp_component_set_module);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(peer, NULL, mca_oob_tcp_component_set_module);
|
||||
|
||||
/* connected */
|
||||
tcp_peer_connected(peer);
|
||||
@ -1030,7 +1031,7 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
|
||||
/* inform the component-level that we have lost a connection so
|
||||
* it can decide what to do about it.
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, NULL, mca_oob_tcp_component_lost_connection);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(peer, NULL, mca_oob_tcp_component_lost_connection);
|
||||
|
||||
if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
|
||||
/* nothing more to do */
|
||||
@ -1241,7 +1242,7 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer)
|
||||
/* set the peer into the component and OOB-level peer tables to indicate
|
||||
* that we know this peer and we will be handling him
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, NULL, mca_oob_tcp_component_set_module);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(peer, NULL, mca_oob_tcp_component_set_module);
|
||||
|
||||
tcp_peer_connected(peer);
|
||||
if (!peer->recv_ev_active) {
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -59,23 +59,23 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->peer = (p); \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &cop->ev, -1, \
|
||||
opal_event_set((p)->ev_base, &cop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), cop); \
|
||||
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&cop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_ACCEPT_STATE(s, a, cbfunc) \
|
||||
#define ORTE_ACTIVATE_TCP_ACCEPT_STATE(s, a, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &cop->ev, s, \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
opal_event_set(orte_oob_base.ev_base, &cop->ev, s, \
|
||||
OPAL_EV_READ, (cbfunc), cop); \
|
||||
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_add(&cop->ev, 0); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_RETRY_TCP_CONN_STATE(p, cbfunc, tv) \
|
||||
#define ORTE_RETRY_TCP_CONN_STATE(p, cbfunc, tv) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
@ -85,7 +85,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->peer = (p); \
|
||||
opal_event_evtimer_set(mca_oob_tcp_module.ev_base, \
|
||||
opal_event_evtimer_set((p)->ev_base, \
|
||||
&cop->ev, \
|
||||
(cbfunc), cop); \
|
||||
opal_event_evtimer_add(&cop->ev, (tv)); \
|
||||
|
@ -13,7 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -156,7 +156,7 @@ int orte_oob_tcp_start_listening(void)
|
||||
/* otherwise, setup to listen via the event lib */
|
||||
OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
|
||||
listener->ev_active = true;
|
||||
opal_event_set(orte_event_base, &listener->event,
|
||||
opal_event_set(orte_oob_base.ev_base, &listener->event,
|
||||
listener->sd,
|
||||
OPAL_EV_READ|OPAL_EV_PERSIST,
|
||||
connection_event_handler,
|
||||
@ -736,7 +736,7 @@ static void* listen_thread(opal_object_t *obj)
|
||||
* OS might start rejecting connections due to timeout.
|
||||
*/
|
||||
pending_connection = OBJ_NEW(mca_oob_tcp_pending_connection_t);
|
||||
opal_event_set(orte_event_base, &pending_connection->ev, -1,
|
||||
opal_event_set(orte_oob_base.ev_base, &pending_connection->ev, -1,
|
||||
OPAL_EV_WRITE, connection_handler, pending_connection);
|
||||
opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
|
||||
pending_connection->fd = accept(sd,
|
||||
@ -863,8 +863,8 @@ static void connection_handler(int sd, short flags, void* cbdata)
|
||||
opal_net_get_port((struct sockaddr*) &new_connection->addr));
|
||||
|
||||
/* process the connection */
|
||||
mca_oob_tcp_module.api.accept_connection(new_connection->fd,
|
||||
(struct sockaddr*) &(new_connection->addr));
|
||||
mca_oob_tcp_module.accept_connection(new_connection->fd,
|
||||
(struct sockaddr*) &(new_connection->addr));
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(new_connection);
|
||||
}
|
||||
@ -927,7 +927,7 @@ static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
|
||||
}
|
||||
|
||||
/* process the connection */
|
||||
mca_oob_tcp_module.api.accept_connection(sd, &addr);
|
||||
mca_oob_tcp_module.accept_connection(sd, &addr);
|
||||
}
|
||||
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -25,6 +25,8 @@
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/mca/event/event.h"
|
||||
|
||||
#include "oob_tcp.h"
|
||||
#include "oob_tcp_sendrecv.h"
|
||||
|
||||
@ -49,6 +51,7 @@ typedef struct {
|
||||
mca_oob_tcp_addr_t *active_addr;
|
||||
mca_oob_tcp_state_t state;
|
||||
int num_retries;
|
||||
opal_event_base_t *ev_base; // progress thread this peer is assigned to
|
||||
opal_event_t send_event; /**< registration with event thread for send events */
|
||||
bool send_ev_active;
|
||||
opal_event_t recv_event; /**< registration with event thread for recv events */
|
||||
@ -86,7 +89,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_op_t);
|
||||
if (NULL != (pts)) { \
|
||||
pop->port = strdup((pts)); \
|
||||
} \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
opal_event_set((p)->ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
@ -97,13 +100,13 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_op_t);
|
||||
mca_oob_tcp_peer_op_t *pop; \
|
||||
char *proxy; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_peer_op_t); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
pop->peer.jobid = (p)->name.jobid; \
|
||||
pop->peer.vpid = (p)->name.vpid; \
|
||||
proxy = (r); \
|
||||
if (NULL != proxy) { \
|
||||
pop->rtmod = strdup(proxy); \
|
||||
} \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
opal_event_set((p)->ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
|
@ -1,52 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2006 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef _MCA_OOB_TCP_PING_H_
|
||||
#define _MCA_OOB_TCP_PING_H_
|
||||
|
||||
#include "orte_config.h"
|
||||
|
||||
#include "opal/mca/event/event.h"
|
||||
|
||||
#include "oob_tcp.h"
|
||||
#include "oob_tcp_sendrecv.h"
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
orte_process_name_t peer;
|
||||
} mca_oob_tcp_ping_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_ping_t);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_PING(p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_ping_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_ping_t); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#endif /* _MCA_OOB_TCP_PING_H_ */
|
@ -75,6 +75,33 @@
|
||||
#include "orte/mca/oob/tcp/oob_tcp_common.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
|
||||
|
||||
void mca_oob_tcp_queue_msg(int sd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_send_t *snd = (mca_oob_tcp_send_t*)cbdata;
|
||||
mca_oob_tcp_peer_t *peer = (mca_oob_tcp_peer_t*)snd->peer;
|
||||
|
||||
/* if there is no message on-deck, put this one there */
|
||||
if (NULL == peer->send_msg) {
|
||||
peer->send_msg = snd;
|
||||
} else {
|
||||
/* add it to the queue */
|
||||
opal_list_append(&peer->send_queue, &snd->super);
|
||||
}
|
||||
if (snd->activate) {
|
||||
/* if we aren't connected, then start connecting */
|
||||
if (MCA_OOB_TCP_CONNECTED != peer->state) {
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
} else {
|
||||
/* ensure the send event is active */
|
||||
if (!peer->send_ev_active) {
|
||||
opal_event_add(&peer->send_event, 0);
|
||||
peer->send_ev_active = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int send_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_send_t* msg)
|
||||
{
|
||||
struct iovec iov[2];
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -32,9 +32,15 @@
|
||||
#include "oob_tcp.h"
|
||||
#include "oob_tcp_hdr.h"
|
||||
|
||||
/* forward declare */
|
||||
struct mca_oob_tcp_peer_t;
|
||||
|
||||
/* tcp structure for sending a message */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_event_t ev;
|
||||
struct mca_oob_tcp_peer_t *peer;
|
||||
bool activate;
|
||||
mca_oob_tcp_hdr_t hdr;
|
||||
orte_rml_send_t *msg;
|
||||
char *data;
|
||||
@ -74,27 +80,13 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
*/
|
||||
#define MCA_OOB_TCP_QUEUE_MSG(p, s, f) \
|
||||
do { \
|
||||
/* if there is no message on-deck, put this one there */ \
|
||||
if (NULL == (p)->send_msg) { \
|
||||
(p)->send_msg = (s); \
|
||||
} else { \
|
||||
/* add it to the queue */ \
|
||||
opal_list_append(&(p)->send_queue, &(s)->super); \
|
||||
} \
|
||||
if ((f)) { \
|
||||
/* if we aren't connected, then start connecting */ \
|
||||
if (MCA_OOB_TCP_CONNECTED != (p)->state) { \
|
||||
(p)->state = MCA_OOB_TCP_CONNECTING; \
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE((p), mca_oob_tcp_peer_try_connect); \
|
||||
} else { \
|
||||
/* ensure the send event is active */ \
|
||||
if (!(p)->send_ev_active) { \
|
||||
opal_event_add(&(p)->send_event, 0); \
|
||||
(p)->send_ev_active = true; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
}while(0);
|
||||
(s)->peer = (struct mca_oob_tcp_peer_t*)(p); \
|
||||
(s)->activate = (f); \
|
||||
opal_event_set((p)->ev_base, &(s)->ev, -1, \
|
||||
OPAL_EV_WRITE, mca_oob_tcp_queue_msg, (s)); \
|
||||
opal_event_set_priority(&(s)->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&(s)->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0)
|
||||
|
||||
/* queue a message to be sent by one of our modules - must
|
||||
* provide the following params:
|
||||
@ -104,44 +96,44 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
*/
|
||||
#define MCA_OOB_TCP_QUEUE_SEND(m, p) \
|
||||
do { \
|
||||
mca_oob_tcp_send_t *msg; \
|
||||
mca_oob_tcp_send_t *_s; \
|
||||
int i; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
"%s:[%s:%d] queue send to %s", \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT(&((m)->dst))); \
|
||||
msg = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
_s = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
/* setup the header */ \
|
||||
msg->hdr.origin = (m)->origin; \
|
||||
msg->hdr.dst = (m)->dst; \
|
||||
msg->hdr.type = MCA_OOB_TCP_USER; \
|
||||
msg->hdr.tag = (m)->tag; \
|
||||
msg->hdr.seq_num = (m)->seq_num; \
|
||||
_s->hdr.origin = (m)->origin; \
|
||||
_s->hdr.dst = (m)->dst; \
|
||||
_s->hdr.type = MCA_OOB_TCP_USER; \
|
||||
_s->hdr.tag = (m)->tag; \
|
||||
_s->hdr.seq_num = (m)->seq_num; \
|
||||
if (NULL != (m)->routed) { \
|
||||
(void)strncpy(msg->hdr.routed, (m)->routed, \
|
||||
(void)strncpy(_s->hdr.routed, (m)->routed, \
|
||||
ORTE_MAX_RTD_SIZE); \
|
||||
} \
|
||||
/* point to the actual message */ \
|
||||
msg->msg = (m); \
|
||||
_s->msg = (m); \
|
||||
/* set the total number of bytes to be sent */ \
|
||||
if (NULL != (m)->buffer) { \
|
||||
msg->hdr.nbytes = (m)->buffer->bytes_used; \
|
||||
_s->hdr.nbytes = (m)->buffer->bytes_used; \
|
||||
} else if (NULL != (m)->iov) { \
|
||||
msg->hdr.nbytes = 0; \
|
||||
_s->hdr.nbytes = 0; \
|
||||
for (i=0; i < (m)->count; i++) { \
|
||||
msg->hdr.nbytes += (m)->iov[i].iov_len; \
|
||||
_s->hdr.nbytes += (m)->iov[i].iov_len; \
|
||||
} \
|
||||
} else { \
|
||||
msg->hdr.nbytes = (m)->count; \
|
||||
_s->hdr.nbytes = (m)->count; \
|
||||
} \
|
||||
/* prep header for xmission */ \
|
||||
MCA_OOB_TCP_HDR_HTON(&msg->hdr); \
|
||||
MCA_OOB_TCP_HDR_HTON(&_s->hdr); \
|
||||
/* start the send with the header */ \
|
||||
msg->sdptr = (char*)&msg->hdr; \
|
||||
msg->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
_s->sdptr = (char*)&_s->hdr; \
|
||||
_s->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
/* add to the msg queue for this peer */ \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), msg, true); \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), _s, true); \
|
||||
}while(0);
|
||||
|
||||
/* queue a message to be sent by one of our modules upon completing
|
||||
@ -152,44 +144,44 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
*/
|
||||
#define MCA_OOB_TCP_QUEUE_PENDING(m, p) \
|
||||
do { \
|
||||
mca_oob_tcp_send_t *msg; \
|
||||
mca_oob_tcp_send_t *_s; \
|
||||
int i; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
"%s:[%s:%d] queue pending to %s", \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT(&((m)->dst))); \
|
||||
msg = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
_s = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
/* setup the header */ \
|
||||
msg->hdr.origin = (m)->origin; \
|
||||
msg->hdr.dst = (m)->dst; \
|
||||
msg->hdr.type = MCA_OOB_TCP_USER; \
|
||||
msg->hdr.tag = (m)->tag; \
|
||||
msg->hdr.seq_num = (m)->seq_num; \
|
||||
_s->hdr.origin = (m)->origin; \
|
||||
_s->hdr.dst = (m)->dst; \
|
||||
_s->hdr.type = MCA_OOB_TCP_USER; \
|
||||
_s->hdr.tag = (m)->tag; \
|
||||
_s->hdr.seq_num = (m)->seq_num; \
|
||||
if (NULL != (m)->routed) { \
|
||||
(void)strncpy(msg->hdr.routed, (m)->routed, \
|
||||
(void)strncpy(_s->hdr.routed, (m)->routed, \
|
||||
ORTE_MAX_RTD_SIZE); \
|
||||
} \
|
||||
/* point to the actual message */ \
|
||||
msg->msg = (m); \
|
||||
_s->msg = (m); \
|
||||
/* set the total number of bytes to be sent */ \
|
||||
if (NULL != (m)->buffer) { \
|
||||
msg->hdr.nbytes = (m)->buffer->bytes_used; \
|
||||
_s->hdr.nbytes = (m)->buffer->bytes_used; \
|
||||
} else if (NULL != (m)->iov) { \
|
||||
msg->hdr.nbytes = 0; \
|
||||
_s->hdr.nbytes = 0; \
|
||||
for (i=0; i < (m)->count; i++) { \
|
||||
msg->hdr.nbytes += (m)->iov[i].iov_len; \
|
||||
_s->hdr.nbytes += (m)->iov[i].iov_len; \
|
||||
} \
|
||||
} else { \
|
||||
msg->hdr.nbytes = (m)->count; \
|
||||
_s->hdr.nbytes = (m)->count; \
|
||||
} \
|
||||
/* prep header for xmission */ \
|
||||
MCA_OOB_TCP_HDR_HTON(&msg->hdr); \
|
||||
MCA_OOB_TCP_HDR_HTON(&_s->hdr); \
|
||||
/* start the send with the header */ \
|
||||
msg->sdptr = (char*)&msg->hdr; \
|
||||
msg->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
_s->sdptr = (char*)&_s->hdr; \
|
||||
_s->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
/* add to the msg queue for this peer */ \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), msg, false); \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), _s, false); \
|
||||
}while(0);
|
||||
|
||||
/* queue a message for relay by one of our modules - must
|
||||
@ -200,31 +192,31 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
*/
|
||||
#define MCA_OOB_TCP_QUEUE_RELAY(m, p) \
|
||||
do { \
|
||||
mca_oob_tcp_send_t *msg; \
|
||||
mca_oob_tcp_send_t *_s; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
"%s:[%s:%d] queue relay to %s", \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT(&((p)->name))); \
|
||||
msg = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
_s = OBJ_NEW(mca_oob_tcp_send_t); \
|
||||
/* setup the header */ \
|
||||
msg->hdr.origin = (m)->hdr.origin; \
|
||||
msg->hdr.dst = (m)->hdr.dst; \
|
||||
msg->hdr.type = MCA_OOB_TCP_USER; \
|
||||
msg->hdr.tag = (m)->hdr.tag; \
|
||||
(void)strncpy(msg->hdr.routed, (m)->hdr.routed, \
|
||||
_s->hdr.origin = (m)->hdr.origin; \
|
||||
_s->hdr.dst = (m)->hdr.dst; \
|
||||
_s->hdr.type = MCA_OOB_TCP_USER; \
|
||||
_s->hdr.tag = (m)->hdr.tag; \
|
||||
(void)strncpy(_s->hdr.routed, (m)->hdr.routed, \
|
||||
ORTE_MAX_RTD_SIZE); \
|
||||
/* point to the actual message */ \
|
||||
msg->data = (m)->data; \
|
||||
_s->data = (m)->data; \
|
||||
/* set the total number of bytes to be sent */ \
|
||||
msg->hdr.nbytes = (m)->hdr.nbytes; \
|
||||
_s->hdr.nbytes = (m)->hdr.nbytes; \
|
||||
/* prep header for xmission */ \
|
||||
MCA_OOB_TCP_HDR_HTON(&msg->hdr); \
|
||||
MCA_OOB_TCP_HDR_HTON(&_s->hdr); \
|
||||
/* start the send with the header */ \
|
||||
msg->sdptr = (char*)&msg->hdr; \
|
||||
msg->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
_s->sdptr = (char*)&_s->hdr; \
|
||||
_s->sdbytes = sizeof(mca_oob_tcp_hdr_t); \
|
||||
/* add to the msg queue for this peer */ \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), msg, true); \
|
||||
MCA_OOB_TCP_QUEUE_MSG((p), _s, true); \
|
||||
}while(0);
|
||||
|
||||
/* State machine for processing message */
|
||||
@ -245,7 +237,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
|
||||
ORTE_NAME_PRINT(&((ms)->dst))); \
|
||||
mop = OBJ_NEW(mca_oob_tcp_msg_op_t); \
|
||||
mop->msg = (ms); \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &mop->ev, -1, \
|
||||
opal_event_set((ms)->peer->ev_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \
|
||||
@ -293,7 +285,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
mop->hop.jobid = (h)->jobid; \
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the OOB framework, so use that event base */ \
|
||||
opal_event_set(orte_event_base, &mop->ev, -1, \
|
||||
opal_event_set(orte_oob_base.ev_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \
|
||||
@ -310,7 +302,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
mp = OBJ_NEW(mca_oob_tcp_msg_error_t); \
|
||||
mp->snd = (mop)->snd; \
|
||||
mp->hop = (mop)->hop; \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &mp->ev, -1, \
|
||||
opal_event_set(op->snd->peer->ev_base, &mp->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mp); \
|
||||
opal_event_set_priority(&mp->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mp->ev, OPAL_EV_WRITE, 1); \
|
||||
@ -329,7 +321,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
mop->hop.jobid = (h)->jobid; \
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the OOB framework, so use that event base */ \
|
||||
opal_event_set(orte_event_base, &mop->ev, -1, \
|
||||
opal_event_set(orte_oob_base.ev_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (c), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \
|
||||
|
@ -389,6 +389,8 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
|
||||
OBJ_RELEASE(caddy);
|
||||
return;
|
||||
}
|
||||
/* mark that nodes were assigned to this job */
|
||||
ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_MAP_INITIALIZED);
|
||||
|
||||
/* if any node is oversubscribed, then check to see if a binding
|
||||
* directive was given - if not, then we want to clear the default
|
||||
|
@ -12,7 +12,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -67,11 +67,14 @@ static void send_self_exe(int fd, short args, void* data)
|
||||
OBJ_RELEASE(xfer);
|
||||
}
|
||||
|
||||
static void send_msg(int fd, short args, void *cbdata)
|
||||
int orte_rml_oob_send_nb(struct orte_rml_base_module_t *mod,
|
||||
orte_process_name_t* peer,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata;
|
||||
orte_process_name_t *peer = &(req->send.dst);
|
||||
orte_rml_tag_t tag = req->send.tag;
|
||||
orte_rml_recv_t *rcv;
|
||||
orte_rml_send_t *snd;
|
||||
int bytes;
|
||||
@ -80,9 +83,22 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
char* ptr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send_msg to peer %s at tag %d",
|
||||
"%s rml_send to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag));
|
||||
|
||||
if (ORTE_RML_TAG_INVALID == tag) {
|
||||
/* cannot send to an invalid tag */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
if (NULL == peer ||
|
||||
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
|
||||
/* cannot send to an invalid peer */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
OPAL_TIMING_EVENT((&tm_rml, "to %s", ORTE_NAME_PRINT(peer)));
|
||||
|
||||
/* if this is a message to myself, then just post the message
|
||||
@ -110,16 +126,11 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
|
||||
/* setup the send callback */
|
||||
xfer = OBJ_NEW(orte_self_send_xfer_t);
|
||||
if (NULL != req->send.iov) {
|
||||
xfer->iov = req->send.iov;
|
||||
xfer->count = req->send.count;
|
||||
xfer->cbfunc.iov = req->send.cbfunc.iov;
|
||||
} else {
|
||||
xfer->buffer = req->send.buffer;
|
||||
xfer->cbfunc.buffer = req->send.cbfunc.buffer;
|
||||
}
|
||||
xfer->iov = iov;
|
||||
xfer->count = count;
|
||||
xfer->cbfunc.iov = cbfunc;
|
||||
xfer->tag = tag;
|
||||
xfer->cbdata = req->send.cbdata;
|
||||
xfer->cbdata = cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
|
||||
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
|
||||
@ -129,99 +140,42 @@ static void send_msg(int fd, short args, void *cbdata)
|
||||
rcv = OBJ_NEW(orte_rml_recv_t);
|
||||
rcv->sender = *peer;
|
||||
rcv->tag = tag;
|
||||
if (NULL != req->send.iov) {
|
||||
/* get the total number of bytes in the iovec array */
|
||||
bytes = 0;
|
||||
for (i = 0 ; i < req->send.count ; ++i) {
|
||||
bytes += req->send.iov[i].iov_len;
|
||||
/* get the total number of bytes in the iovec array */
|
||||
bytes = 0;
|
||||
for (i = 0 ; i < count ; ++i) {
|
||||
bytes += iov[i].iov_len;
|
||||
}
|
||||
/* get the required memory allocation */
|
||||
if (0 < bytes) {
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
|
||||
rcv->iov.iov_len = bytes;
|
||||
/* transfer the bytes */
|
||||
ptr = (char*)rcv->iov.iov_base;
|
||||
for (i = 0 ; i < count ; ++i) {
|
||||
memcpy(ptr, iov[i].iov_base, iov[i].iov_len);
|
||||
ptr += iov[i].iov_len;
|
||||
}
|
||||
/* get the required memory allocation */
|
||||
if (0 < bytes) {
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes);
|
||||
rcv->iov.iov_len = bytes;
|
||||
/* transfer the bytes */
|
||||
ptr = (char*)rcv->iov.iov_base;
|
||||
for (i = 0 ; i < req->send.count ; ++i) {
|
||||
memcpy(ptr, req->send.iov[i].iov_base, req->send.iov[i].iov_len);
|
||||
ptr += req->send.iov[i].iov_len;
|
||||
}
|
||||
}
|
||||
} else if (0 < req->send.buffer->bytes_used) {
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->send.buffer->bytes_used);
|
||||
memcpy(rcv->iov.iov_base, req->send.buffer->base_ptr, req->send.buffer->bytes_used);
|
||||
rcv->iov.iov_len = req->send.buffer->bytes_used;
|
||||
}
|
||||
/* post the message for receipt - since the send callback was posted
|
||||
* first and has the same priority, it will execute first
|
||||
*/
|
||||
ORTE_RML_ACTIVATE_MESSAGE(rcv);
|
||||
OBJ_RELEASE(req);
|
||||
return;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->dst = *peer;
|
||||
snd->origin = *ORTE_PROC_MY_NAME;
|
||||
snd->tag = tag;
|
||||
if (NULL != req->send.iov) {
|
||||
snd->iov = req->send.iov;
|
||||
snd->count = req->send.count;
|
||||
snd->cbfunc.iov = req->send.cbfunc.iov;
|
||||
} else {
|
||||
snd->buffer = req->send.buffer;
|
||||
snd->cbfunc.buffer = req->send.cbfunc.buffer;
|
||||
}
|
||||
snd->cbdata = req->send.cbdata;
|
||||
snd->routed = strdup(req->send.routed);
|
||||
snd->iov = iov;
|
||||
snd->count = count;
|
||||
snd->cbfunc.iov = cbfunc;
|
||||
snd->cbdata = cbdata;
|
||||
snd->routed = strdup(mod->routed);
|
||||
|
||||
/* activate the OOB send state */
|
||||
ORTE_OOB_SEND(snd);
|
||||
|
||||
OBJ_RELEASE(req);
|
||||
}
|
||||
|
||||
int orte_rml_oob_send_nb(struct orte_rml_base_module_t *mod,
|
||||
orte_process_name_t* peer,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_rml_send_request_t *req;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send to peer %s at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), tag));
|
||||
|
||||
if (ORTE_RML_TAG_INVALID == tag) {
|
||||
/* cannot send to an invalid tag */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
if (NULL == peer ||
|
||||
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_NAME_INVALID, peer)) {
|
||||
/* cannot send to an invalid peer */
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
/* get ourselves into an event to protect against
|
||||
* race conditions and threads
|
||||
*/
|
||||
req = OBJ_NEW(orte_rml_send_request_t);
|
||||
req->send.dst = *peer;
|
||||
req->send.iov = iov;
|
||||
req->send.count = count;
|
||||
req->send.tag = tag;
|
||||
req->send.cbfunc.iov = cbfunc;
|
||||
req->send.cbdata = cbdata;
|
||||
req->send.routed = strdup(mod->routed);
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req);
|
||||
opal_event_set_priority(&req->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&req->ev, OPAL_EV_WRITE, 1);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -232,7 +186,9 @@ int orte_rml_oob_send_buffer_nb(struct orte_rml_base_module_t *mod,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_rml_send_request_t *req;
|
||||
orte_rml_recv_t *rcv;
|
||||
orte_rml_send_t *snd;
|
||||
orte_self_send_xfer_t *xfer;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send_buffer to peer %s at tag %d",
|
||||
@ -250,20 +206,68 @@ int orte_rml_oob_send_buffer_nb(struct orte_rml_base_module_t *mod,
|
||||
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
/* get ourselves into an event to protect against
|
||||
* race conditions and threads
|
||||
|
||||
OPAL_TIMING_EVENT((&tm_rml, "to %s", ORTE_NAME_PRINT(peer)));
|
||||
|
||||
/* if this is a message to myself, then just post the message
|
||||
* for receipt - no need to dive into the oob
|
||||
*/
|
||||
req = OBJ_NEW(orte_rml_send_request_t);
|
||||
req->send.dst = *peer;
|
||||
req->send.buffer = buffer;
|
||||
req->send.tag = tag;
|
||||
req->send.cbfunc.buffer = cbfunc;
|
||||
req->send.cbdata = cbdata;
|
||||
req->send.routed = strdup(mod->routed);
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &req->ev, -1, OPAL_EV_WRITE, send_msg, req);
|
||||
opal_event_set_priority(&req->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&req->ev, OPAL_EV_WRITE, 1);
|
||||
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { /* local delivery */
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rml_base_framework.framework_output,
|
||||
"%s rml_send_iovec_to_self at tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
|
||||
/* send to self is a tad tricky - we really don't want
|
||||
* to track the send callback function throughout the recv
|
||||
* process and execute it upon receipt as this would provide
|
||||
* very different timing from a non-self message. Specifically,
|
||||
* if we just retain a pointer to the incoming data
|
||||
* and then execute the send callback prior to the receive,
|
||||
* then the caller will think we are done with the data and
|
||||
* can release it. So we have to copy the data in order to
|
||||
* execute the send callback prior to receiving the message.
|
||||
*
|
||||
* In truth, this really is a better mimic of the non-self
|
||||
* message behavior. If we actually pushed the message out
|
||||
* on the wire and had it loop back, then we would receive
|
||||
* a new block of data anyway.
|
||||
*/
|
||||
|
||||
/* setup the send callback */
|
||||
xfer = OBJ_NEW(orte_self_send_xfer_t);
|
||||
xfer->buffer = buffer;
|
||||
xfer->cbfunc.buffer = cbfunc;
|
||||
xfer->tag = tag;
|
||||
xfer->cbdata = cbdata;
|
||||
/* setup the event for the send callback */
|
||||
opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer);
|
||||
opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI);
|
||||
opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1);
|
||||
|
||||
/* copy the message for the recv */
|
||||
rcv = OBJ_NEW(orte_rml_recv_t);
|
||||
rcv->sender = *peer;
|
||||
rcv->tag = tag;
|
||||
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(buffer->bytes_used);
|
||||
memcpy(rcv->iov.iov_base, buffer->base_ptr, buffer->bytes_used);
|
||||
rcv->iov.iov_len = buffer->bytes_used;
|
||||
/* post the message for receipt - since the send callback was posted
|
||||
* first and has the same priority, it will execute first
|
||||
*/
|
||||
ORTE_RML_ACTIVATE_MESSAGE(rcv);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->dst = *peer;
|
||||
snd->origin = *ORTE_PROC_MY_NAME;
|
||||
snd->tag = tag;
|
||||
snd->buffer = buffer;
|
||||
snd->cbfunc.buffer = cbfunc;
|
||||
snd->cbdata = cbdata;
|
||||
snd->routed = strdup(mod->routed);
|
||||
|
||||
/* activate the OOB send state */
|
||||
ORTE_OOB_SEND(snd);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -74,6 +74,11 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(jdata->jobid));
|
||||
|
||||
/* if this job has no local procs, then no need to register them */
|
||||
if (0 == jdata->num_local_procs) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* setup the info list */
|
||||
info = OBJ_NEW(opal_list_t);
|
||||
uid = geteuid();
|
||||
|
@ -12,7 +12,7 @@
|
||||
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -168,9 +168,9 @@ int orte_dt_unpack_job(opal_buffer_t *buffer, void *dest,
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the map is NULL, then we din't pack it as there was
|
||||
/* if the map is NULL, then we didn't pack it as there was
|
||||
* nothing to pack. Instead, we packed a flag to indicate whether or not
|
||||
* the map is included */
|
||||
* the map is included */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_unpack_buffer(buffer,
|
||||
&j, &n, ORTE_STD_CNTR))) {
|
||||
@ -204,6 +204,8 @@ int orte_dt_unpack_job(opal_buffer_t *buffer, void *dest,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* mark the map as uninitialized as we don't pack the node map */
|
||||
ORTE_FLAG_UNSET(jobs[i], ORTE_JOB_FLAG_MAP_INITIALIZED);
|
||||
|
||||
/* unpack the attributes */
|
||||
n=1;
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved
|
||||
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -89,7 +89,7 @@ typedef uint16_t orte_job_flags_t;
|
||||
#define ORTE_JOB_FLAG_RESTART 0x0200 //
|
||||
#define ORTE_JOB_FLAG_PROCS_MIGRATING 0x0400 // some procs in job are migrating from one node to another
|
||||
#define ORTE_JOB_FLAG_OVERSUBSCRIBED 0x0800 // at least one node in the job is oversubscribed
|
||||
|
||||
#define ORTE_JOB_FLAG_MAP_INITIALIZED 0x1000 // nodes have been assigned to this job map
|
||||
|
||||
/*** JOB ATTRIBUTE KEYS ***/
|
||||
#define ORTE_JOB_START_KEY ORTE_NODE_MAX_KEY
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user