We do have to track the origin of messages sent over usock as the daemon does route them back down, and we need to get the "sender" info correct. Also do a better job of dealing with simultaneous connections to avoid binding to a used socket.
Refs trac:4280 This commit was SVN r30781. The following Trac tickets were found above: Ticket 4280 --> https://svn.open-mpi.org/trac/ompi/ticket/4280
Этот коммит содержится в:
родитель
d35019ba19
Коммит
5520d6971b
@ -521,7 +521,7 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
|
|||||||
}
|
}
|
||||||
|
|
||||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s connect-ack sent to socket %d",
|
"%s blocking send complete to socket %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -342,123 +342,35 @@ static void send_nb(orte_rml_send_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Handle probe
|
* Event callback when there is data available on the registered
|
||||||
|
* socket to recv. This is called for the listen sockets to accept an
|
||||||
|
* incoming connection, on new sockets trying to complete the software
|
||||||
|
* connection process, and for probes. Data on an established
|
||||||
|
* connection is handled elsewhere.
|
||||||
*/
|
*/
|
||||||
static void recv_probe(int sd, mca_oob_usock_hdr_t* hdr)
|
static void recv_handler(int sd, short flags, void *cbdata)
|
||||||
{
|
{
|
||||||
unsigned char* ptr = (unsigned char*)hdr;
|
mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata;
|
||||||
size_t cnt = 0;
|
mca_oob_usock_hdr_t hdr;
|
||||||
|
mca_oob_usock_peer_t *peer;
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
|
||||||
"%s:usock:recv:probe called",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
|
|
||||||
hdr->type = MCA_OOB_USOCK_PROBE;
|
|
||||||
hdr->dst = *ORTE_PROC_MY_NAME;
|
|
||||||
|
|
||||||
while (cnt < sizeof(mca_oob_usock_hdr_t)) {
|
|
||||||
int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_usock_hdr_t)-cnt, 0);
|
|
||||||
if (retval < 0) {
|
|
||||||
if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
|
||||||
opal_output(0, "%s-%s mca_oob_usock_peer_recv_probe: send() failed: %s (%d)\n",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&(hdr->dst)),
|
|
||||||
strerror(opal_socket_errno),
|
|
||||||
opal_socket_errno);
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
cnt += retval;
|
|
||||||
}
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Complete the OOB-level handshake to establish a connection with
|
|
||||||
* another peer. Called when the remote peer replies with his process
|
|
||||||
* identifier. Used in both the threaded and event listen modes.
|
|
||||||
*/
|
|
||||||
static void recv_connect(int sd, mca_oob_usock_hdr_t* hdr)
|
|
||||||
{
|
|
||||||
mca_oob_usock_peer_t* peer;
|
|
||||||
int flags;
|
|
||||||
int cmpval;
|
|
||||||
uint64_t *ui64;
|
uint64_t *ui64;
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s:usock:recv:connect called",
|
"%s:usock:recv:handler called",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||||
|
|
||||||
/* check for invalid name - if this is true, then we have an error
|
/* get the handshake */
|
||||||
*/
|
if (ORTE_SUCCESS != mca_oob_usock_peer_recv_connect_ack(NULL, sd, &hdr)) {
|
||||||
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_NAME_INVALID);
|
goto cleanup;
|
||||||
if (cmpval == OPAL_EQUAL) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
/* finish processing ident */
|
||||||
"%s mca_oob_usock_recv_connect: processing connection from %s for socket %d",
|
if (MCA_OOB_USOCK_IDENT == hdr.type) {
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
if (NULL == (peer = mca_oob_usock_peer_lookup(&hdr.origin))) {
|
||||||
ORTE_NAME_PRINT(&hdr->dst), sd);
|
/* should never happen */
|
||||||
|
mca_oob_usock_peer_close(peer);
|
||||||
/* lookup the corresponding process */
|
goto cleanup;
|
||||||
peer = mca_oob_usock_peer_lookup(&hdr->dst);
|
|
||||||
if (NULL == peer) {
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
|
||||||
"%s mca_oob_usock_recv_connect: connection from new peer %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&hdr->dst));
|
|
||||||
peer = OBJ_NEW(mca_oob_usock_peer_t);
|
|
||||||
peer->name = hdr->dst;
|
|
||||||
peer->state = MCA_OOB_USOCK_ACCEPTING;
|
|
||||||
ui64 = (uint64_t*)(&peer->name);
|
|
||||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), peer)) {
|
|
||||||
OBJ_RELEASE(peer);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
/* check for a race condition - if I was in the process of
|
|
||||||
* creating a connection to the peer, or have already established
|
|
||||||
* such a connection, then we need to reject this connection. We will
|
|
||||||
* let the higher ranked process retry - if I'm the lower ranked
|
|
||||||
* process, I'll simply defer until I receive the request
|
|
||||||
*/
|
|
||||||
if (MCA_OOB_USOCK_CONNECTED == peer->state ||
|
|
||||||
MCA_OOB_USOCK_CONNECTING == peer->state ||
|
|
||||||
MCA_OOB_USOCK_CONNECT_ACK == peer->state) {
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
|
||||||
"%s SIMUL CONNECTION WITH %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&hdr->dst));
|
|
||||||
if (peer->recv_ev_active) {
|
|
||||||
opal_event_del(&peer->recv_event);
|
|
||||||
peer->recv_ev_active = false;
|
|
||||||
}
|
|
||||||
if (peer->send_ev_active) {
|
|
||||||
opal_event_del(&peer->send_event);
|
|
||||||
peer->send_ev_active = false;
|
|
||||||
}
|
|
||||||
if (0 < peer->sd) {
|
|
||||||
CLOSE_THE_SOCKET(peer->sd);
|
|
||||||
peer->sd = -1;
|
|
||||||
}
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_PROC_MY_NAME);
|
|
||||||
if (OPAL_VALUE1_GREATER == cmpval) {
|
|
||||||
/* force the other end to retry the connection */
|
|
||||||
peer->state = MCA_OOB_USOCK_UNCONNECTED;
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
/* retry the connection */
|
|
||||||
peer->state = MCA_OOB_USOCK_CONNECTING;
|
|
||||||
ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* set socket up to be non-blocking */
|
/* set socket up to be non-blocking */
|
||||||
if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
|
if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
|
||||||
opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
|
opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
|
||||||
@ -486,67 +398,6 @@ static void recv_connect(int sd, mca_oob_usock_hdr_t* hdr)
|
|||||||
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), NULL);
|
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), NULL);
|
||||||
OBJ_RELEASE(peer);
|
OBJ_RELEASE(peer);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Event callback when there is data available on the registered
|
|
||||||
* socket to recv. This is called for the listen sockets to accept an
|
|
||||||
* incoming connection, on new sockets trying to complete the software
|
|
||||||
* connection process, and for probes. Data on an established
|
|
||||||
* connection is handled elsewhere.
|
|
||||||
*/
|
|
||||||
static void recv_handler(int sd, short flags, void *cbdata)
|
|
||||||
{
|
|
||||||
mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata;
|
|
||||||
mca_oob_usock_hdr_t hdr;
|
|
||||||
int rc;
|
|
||||||
size_t cnt;
|
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
|
||||||
"%s:usock:recv:handler called",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
|
|
||||||
/* ensure all is zero'd */
|
|
||||||
memset(&hdr, 0, sizeof(hdr));
|
|
||||||
|
|
||||||
/* recv the process identifier */
|
|
||||||
cnt = 0;
|
|
||||||
while (cnt < sizeof(hdr)) {
|
|
||||||
rc = recv(sd, (char *)&hdr, sizeof(hdr), 0);
|
|
||||||
if (0 == rc) {
|
|
||||||
if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
|
|
||||||
opal_output(0, "%s mca_oob_usock_recv_handler: peer closed connection",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
||||||
}
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
goto cleanup;
|
|
||||||
} else if (rc < 0) {
|
|
||||||
if (opal_socket_errno != EINTR &&
|
|
||||||
opal_socket_errno != EAGAIN &&
|
|
||||||
opal_socket_errno != EWOULDBLOCK) {
|
|
||||||
opal_output(0, "%s mca_oob_usock_recv_handler: recv() failed: %s (%d)\n",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
cnt += rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* dispatch based on message type */
|
|
||||||
switch (hdr.type) {
|
|
||||||
case MCA_OOB_USOCK_PROBE:
|
|
||||||
recv_probe(sd, &hdr);
|
|
||||||
break;
|
|
||||||
case MCA_OOB_USOCK_IDENT:
|
|
||||||
recv_connect(sd, &hdr);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
opal_output(0, "%s mca_oob_usock_recv_handler: invalid message type: %d\n",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr.type);
|
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
#include "opal_stdint.h"
|
#include "opal_stdint.h"
|
||||||
#include "opal/mca/backtrace/backtrace.h"
|
#include "opal/mca/backtrace/backtrace.h"
|
||||||
#include "opal/mca/base/mca_base_var.h"
|
#include "opal/mca/base/mca_base_var.h"
|
||||||
|
#include "opal/mca/sec/sec.h"
|
||||||
#include "opal/util/output.h"
|
#include "opal/util/output.h"
|
||||||
#include "opal/util/net.h"
|
#include "opal/util/net.h"
|
||||||
#include "opal/util/error.h"
|
#include "opal/util/error.h"
|
||||||
@ -73,9 +74,9 @@
|
|||||||
static void usock_peer_event_init(mca_oob_usock_peer_t* peer);
|
static void usock_peer_event_init(mca_oob_usock_peer_t* peer);
|
||||||
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer);
|
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer);
|
||||||
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
||||||
void* data, size_t size);
|
int sd, void* data, size_t size);
|
||||||
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
||||||
void* data, size_t size);
|
int sd, void* data, size_t size);
|
||||||
static void usock_peer_connected(mca_oob_usock_peer_t* peer);
|
static void usock_peer_connected(mca_oob_usock_peer_t* peer);
|
||||||
|
|
||||||
static int usock_peer_create_socket(mca_oob_usock_peer_t* peer)
|
static int usock_peer_create_socket(mca_oob_usock_peer_t* peer)
|
||||||
@ -276,7 +277,11 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata)
|
|||||||
|
|
||||||
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer)
|
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer)
|
||||||
{
|
{
|
||||||
|
char *msg;
|
||||||
mca_oob_usock_hdr_t hdr;
|
mca_oob_usock_hdr_t hdr;
|
||||||
|
int rc;
|
||||||
|
size_t sdsize;
|
||||||
|
opal_sec_cred_t *cred;
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
"%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||||
@ -284,11 +289,34 @@ static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer)
|
|||||||
/* send a handshake that includes our process identifier
|
/* send a handshake that includes our process identifier
|
||||||
* to ensure we are talking to another OMPI process
|
* to ensure we are talking to another OMPI process
|
||||||
*/
|
*/
|
||||||
hdr.dst = *ORTE_PROC_MY_NAME;
|
hdr.origin = *ORTE_PROC_MY_NAME;
|
||||||
|
hdr.dst = peer->name;
|
||||||
hdr.type = MCA_OOB_USOCK_IDENT;
|
hdr.type = MCA_OOB_USOCK_IDENT;
|
||||||
hdr.tag = 0;
|
hdr.tag = 0;
|
||||||
hdr.nbytes = 0;
|
|
||||||
if (ORTE_SUCCESS != usock_peer_send_blocking(peer, &hdr, sizeof(hdr))) {
|
/* get our security credential*/
|
||||||
|
if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* set the number of bytes to be read beyond the header */
|
||||||
|
hdr.nbytes = strlen(orte_version_string) + 1 + cred->size;
|
||||||
|
|
||||||
|
/* create a space for our message */
|
||||||
|
sdsize = (sizeof(hdr) + strlen(orte_version_string) + 1 + cred->size);
|
||||||
|
if (NULL == (msg = (char*)malloc(sdsize))) {
|
||||||
|
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
memset(msg, 0, sdsize);
|
||||||
|
|
||||||
|
/* load the message */
|
||||||
|
memcpy(msg, &hdr, sizeof(hdr));
|
||||||
|
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
|
||||||
|
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred->credential, cred->size);
|
||||||
|
|
||||||
|
|
||||||
|
if (ORTE_SUCCESS != usock_peer_send_blocking(peer, peer->sd, msg, sdsize)) {
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
|
ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
|
||||||
return ORTE_ERR_UNREACH;
|
return ORTE_ERR_UNREACH;
|
||||||
}
|
}
|
||||||
@ -413,24 +441,23 @@ void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t *peer)
|
|||||||
* information that identifies the peers endpoint.
|
* information that identifies the peers endpoint.
|
||||||
*/
|
*/
|
||||||
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
||||||
void* data, size_t size)
|
int sd, void* data, size_t size)
|
||||||
{
|
{
|
||||||
unsigned char* ptr = (unsigned char*)data;
|
unsigned char* ptr = (unsigned char*)data;
|
||||||
size_t cnt = 0;
|
size_t cnt = 0;
|
||||||
int retval;
|
int retval;
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s sending connect-ack to %s",
|
"%s send blocking of %"PRIsize_t" bytes to socket %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&(peer->name)));
|
size, sd);
|
||||||
|
|
||||||
while (cnt < size) {
|
while (cnt < size) {
|
||||||
retval = send(peer->sd, (char*)ptr+cnt, size-cnt, 0);
|
retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
|
||||||
if (retval < 0) {
|
if (retval < 0) {
|
||||||
if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
||||||
opal_output(0, "%s usock_peer_send_blocking: send() to %s failed: %s (%d)\n",
|
opal_output(0, "%s usock_peer_send_blocking: send() to socket %d failed: %s (%d)\n",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd,
|
||||||
ORTE_NAME_PRINT(&(peer->name)),
|
|
||||||
strerror(opal_socket_errno),
|
strerror(opal_socket_errno),
|
||||||
opal_socket_errno);
|
opal_socket_errno);
|
||||||
peer->state = MCA_OOB_USOCK_FAILED;
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
@ -443,9 +470,8 @@ static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
}
|
}
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s connect-ack sent to %s",
|
"%s blocking send complete to socket %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
|
||||||
ORTE_NAME_PRINT(&(peer->name)));
|
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -455,19 +481,28 @@ static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
* connected socket and verify the expected response. If so, move the
|
* connected socket and verify the expected response. If so, move the
|
||||||
* socket to a connected state.
|
* socket to a connected state.
|
||||||
*/
|
*/
|
||||||
int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer)
|
int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* pr, int sd,
|
||||||
|
mca_oob_usock_hdr_t *dhdr)
|
||||||
{
|
{
|
||||||
|
char *msg;
|
||||||
|
char *version;
|
||||||
|
int rc, cmpval;
|
||||||
|
opal_sec_cred_t creds;
|
||||||
|
mca_oob_usock_peer_t *peer;
|
||||||
mca_oob_usock_hdr_t hdr;
|
mca_oob_usock_hdr_t hdr;
|
||||||
|
uint64_t *ui64;
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
|
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&peer->name), peer->sd);
|
(NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd);
|
||||||
|
|
||||||
|
peer = pr;
|
||||||
/* ensure all is zero'd */
|
/* ensure all is zero'd */
|
||||||
memset(&hdr, 0, sizeof(hdr));
|
memset(&hdr, 0, sizeof(mca_oob_usock_hdr_t));
|
||||||
|
|
||||||
if (usock_peer_recv_blocking(peer, &hdr, sizeof(hdr))) {
|
if (usock_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t))) {
|
||||||
|
if (NULL != peer) {
|
||||||
/* If the peer state is CONNECT_ACK, then we were waiting for
|
/* If the peer state is CONNECT_ACK, then we were waiting for
|
||||||
* the connection to be ack'd
|
* the connection to be ack'd
|
||||||
*/
|
*/
|
||||||
@ -475,31 +510,182 @@ int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer)
|
|||||||
/* handshake broke down - abort this connection */
|
/* handshake broke down - abort this connection */
|
||||||
opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d",
|
opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&peer->name), peer->sd);
|
ORTE_NAME_PRINT(&peer->name), sd);
|
||||||
mca_oob_usock_peer_close(peer);
|
mca_oob_usock_peer_close(peer);
|
||||||
return ORTE_ERR_UNREACH;
|
return ORTE_ERR_UNREACH;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
/* unable to complete the recv */
|
/* unable to complete the recv */
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
|
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&peer->name), peer->sd);
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
|
||||||
return ORTE_ERR_UNREACH;
|
return ORTE_ERR_UNREACH;
|
||||||
}
|
}
|
||||||
|
/* if the requestor wanted the header returned, then do so now */
|
||||||
|
if (NULL != dhdr) {
|
||||||
|
*dhdr = hdr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (MCA_OOB_USOCK_PROBE == hdr.type) {
|
||||||
|
/* send a header back */
|
||||||
|
hdr.type = MCA_OOB_USOCK_PROBE;
|
||||||
|
hdr.dst = hdr.origin;
|
||||||
|
hdr.origin = *ORTE_PROC_MY_NAME;
|
||||||
|
usock_peer_send_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t));
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
|
return ORTE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (hdr.type != MCA_OOB_USOCK_IDENT) {
|
if (hdr.type != MCA_OOB_USOCK_IDENT) {
|
||||||
opal_output(0, "usock_peer_recv_connect_ack: invalid header type: %d\n", hdr.type);
|
opal_output(0, "usock_peer_recv_connect_ack: invalid header type: %d\n", hdr.type);
|
||||||
|
if (NULL != peer) {
|
||||||
peer->state = MCA_OOB_USOCK_FAILED;
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
mca_oob_usock_peer_close(peer);
|
mca_oob_usock_peer_close(peer);
|
||||||
|
} else {
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
|
}
|
||||||
return ORTE_ERR_UNREACH;
|
return ORTE_ERR_UNREACH;
|
||||||
}
|
}
|
||||||
|
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s connect-ack recvd from %s",
|
"%s connect-ack recvd from %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name));
|
||||||
|
|
||||||
|
/* if we don't already have it, get the peer */
|
||||||
|
if (NULL == peer) {
|
||||||
|
peer = mca_oob_usock_peer_lookup(&hdr.origin);
|
||||||
|
if (NULL == peer) {
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s mca_oob_usock_recv_connect: connection from new peer",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||||
|
peer = OBJ_NEW(mca_oob_usock_peer_t);
|
||||||
|
peer->name = hdr.origin;
|
||||||
|
peer->state = MCA_OOB_USOCK_ACCEPTING;
|
||||||
|
peer->sd = sd;
|
||||||
|
ui64 = (uint64_t*)(&peer->name);
|
||||||
|
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), peer)) {
|
||||||
|
OBJ_RELEASE(peer);
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* check for a race condition - if I was in the process of
|
||||||
|
* creating a connection to the peer, or have already established
|
||||||
|
* such a connection, then we need to reject this connection. We will
|
||||||
|
* let the higher ranked process retry - if I'm the lower ranked
|
||||||
|
* process, I'll simply defer until I receive the request
|
||||||
|
*/
|
||||||
|
if (MCA_OOB_USOCK_CONNECTED == peer->state ||
|
||||||
|
MCA_OOB_USOCK_CONNECTING == peer->state ||
|
||||||
|
MCA_OOB_USOCK_CONNECT_ACK == peer->state) {
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s SIMUL CONNECTION WITH %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&hdr.origin));
|
||||||
|
if (peer->recv_ev_active) {
|
||||||
|
opal_event_del(&peer->recv_event);
|
||||||
|
peer->recv_ev_active = false;
|
||||||
|
}
|
||||||
|
if (peer->send_ev_active) {
|
||||||
|
opal_event_del(&peer->send_event);
|
||||||
|
peer->send_ev_active = false;
|
||||||
|
}
|
||||||
|
if (0 < peer->sd) {
|
||||||
|
CLOSE_THE_SOCKET(peer->sd);
|
||||||
|
peer->sd = -1;
|
||||||
|
}
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
|
peer->retries = 0;
|
||||||
|
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME);
|
||||||
|
if (OPAL_VALUE1_GREATER == cmpval) {
|
||||||
|
/* force the other end to retry the connection */
|
||||||
|
peer->state = MCA_OOB_USOCK_UNCONNECTED;
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
} else {
|
||||||
|
/* retry the connection */
|
||||||
|
peer->state = MCA_OOB_USOCK_CONNECTING;
|
||||||
|
ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect);
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* compare the peers name to the expected value */
|
||||||
|
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
|
||||||
|
opal_output(0, "%s usock_peer_recv_connect_ack: "
|
||||||
|
"received unexpected process identifier %s from %s\n",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&(hdr.origin)),
|
||||||
|
ORTE_NAME_PRINT(&(peer->name)));
|
||||||
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
|
mca_oob_usock_peer_close(peer);
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s connect-ack header from %s is okay",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&peer->name));
|
ORTE_NAME_PRINT(&peer->name));
|
||||||
|
|
||||||
|
/* get the authentication and version payload */
|
||||||
|
if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
|
||||||
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
|
mca_oob_usock_peer_close(peer);
|
||||||
|
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||||
|
}
|
||||||
|
if (!usock_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) {
|
||||||
|
/* unable to complete the recv */
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&peer->name), peer->sd);
|
||||||
|
free(msg);
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* check that this is from a matching version */
|
||||||
|
version = (char*)(msg);
|
||||||
|
if (0 != strcmp(version, orte_version_string)) {
|
||||||
|
opal_output(0, "%s usock_peer_recv_connect_ack: "
|
||||||
|
"received different version from %s: %s instead of %s\n",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&(peer->name)),
|
||||||
|
version, orte_version_string);
|
||||||
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
|
mca_oob_usock_peer_close(peer);
|
||||||
|
free(msg);
|
||||||
|
return ORTE_ERR_UNREACH;
|
||||||
|
}
|
||||||
|
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s connect-ack version from %s matches ours",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&peer->name));
|
||||||
|
|
||||||
|
/* check security token */
|
||||||
|
creds.credential = (char*)(msg + strlen(version) + 1);
|
||||||
|
creds.size = hdr.nbytes - strlen(version) - 1;
|
||||||
|
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
free(msg);
|
||||||
|
|
||||||
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
|
"%s connect-ack %s authenticated",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&peer->name));
|
||||||
|
|
||||||
|
/* if the requestor wanted the header returned, then they
|
||||||
|
* will complete their processing
|
||||||
|
*/
|
||||||
|
if (NULL != dhdr) {
|
||||||
|
return ORTE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
/* set the peer into the component and OOB-level peer tables to indicate
|
/* set the peer into the component and OOB-level peer tables to indicate
|
||||||
* that we know this peer and we will be handling him
|
* that we know this peer and we will be handling him
|
||||||
*/
|
*/
|
||||||
@ -591,7 +777,7 @@ void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer)
|
|||||||
* information that identifies the peers endpoint.
|
* information that identifies the peers endpoint.
|
||||||
*/
|
*/
|
||||||
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
||||||
void* data, size_t size)
|
int sd, void* data, size_t size)
|
||||||
{
|
{
|
||||||
unsigned char* ptr = (unsigned char*)data;
|
unsigned char* ptr = (unsigned char*)data;
|
||||||
size_t cnt = 0;
|
size_t cnt = 0;
|
||||||
@ -599,10 +785,10 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s waiting for connect ack from %s",
|
"%s waiting for connect ack from %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&(peer->name)));
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
|
||||||
|
|
||||||
while (cnt < size) {
|
while (cnt < size) {
|
||||||
int retval = recv(peer->sd, (char *)ptr+cnt, size-cnt, 0);
|
int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0);
|
||||||
|
|
||||||
/* remote closed connection */
|
/* remote closed connection */
|
||||||
if (retval == 0) {
|
if (retval == 0) {
|
||||||
@ -610,8 +796,8 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
"%s-%s usock_peer_recv_blocking: "
|
"%s-%s usock_peer_recv_blocking: "
|
||||||
"peer closed connection: peer state %d",
|
"peer closed connection: peer state %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&(peer->name)),
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
|
||||||
peer->state);
|
(NULL == peer) ? 0 : peer->state);
|
||||||
mca_oob_usock_peer_close(peer);
|
mca_oob_usock_peer_close(peer);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -640,18 +826,22 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
"%s connect ack received error %s from %s",
|
"%s connect ack received error %s from %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
strerror(opal_socket_errno),
|
strerror(opal_socket_errno),
|
||||||
ORTE_NAME_PRINT(&(peer->name)));
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
opal_output(0,
|
opal_output(0,
|
||||||
"%s usock_peer_recv_blocking: "
|
"%s usock_peer_recv_blocking: "
|
||||||
"recv() failed for %s: %s (%d)\n",
|
"recv() failed for %s: %s (%d)\n",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&(peer->name)),
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
|
||||||
strerror(opal_socket_errno),
|
strerror(opal_socket_errno),
|
||||||
opal_socket_errno);
|
opal_socket_errno);
|
||||||
|
if (NULL != peer) {
|
||||||
peer->state = MCA_OOB_USOCK_FAILED;
|
peer->state = MCA_OOB_USOCK_FAILED;
|
||||||
mca_oob_usock_peer_close(peer);
|
mca_oob_usock_peer_close(peer);
|
||||||
|
} else {
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -663,7 +853,7 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
|
|||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s connect ack received from %s",
|
"%s connect ack received from %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&(peer->name)));
|
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +95,8 @@ ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_try_connect(int fd, short args, voi
|
|||||||
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg);
|
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg);
|
||||||
ORTE_MODULE_DECLSPEC bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer);
|
ORTE_MODULE_DECLSPEC bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer);
|
||||||
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t* peer);
|
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t* peer);
|
||||||
ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer);
|
ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer,
|
||||||
|
int sd, mca_oob_usock_hdr_t *hdr);
|
||||||
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer);
|
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer);
|
||||||
|
|
||||||
#endif /* _MCA_OOB_USOCK_CONNECTION_H_ */
|
#endif /* _MCA_OOB_USOCK_CONNECTION_H_ */
|
||||||
|
@ -40,6 +40,8 @@ typedef enum {
|
|||||||
|
|
||||||
/* header for usock msgs */
|
/* header for usock msgs */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
/* the original sender */
|
||||||
|
orte_process_name_t origin;
|
||||||
/* the intended final recipient */
|
/* the intended final recipient */
|
||||||
orte_process_name_t dst;
|
orte_process_name_t dst;
|
||||||
/* type of message */
|
/* type of message */
|
||||||
|
@ -393,7 +393,7 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata)
|
|||||||
|
|
||||||
switch (peer->state) {
|
switch (peer->state) {
|
||||||
case MCA_OOB_USOCK_CONNECT_ACK:
|
case MCA_OOB_USOCK_CONNECT_ACK:
|
||||||
if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer))) {
|
if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer, peer->sd, NULL))) {
|
||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s:usock:recv:handler starting send/recv events",
|
"%s:usock:recv:handler starting send/recv events",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||||
@ -499,7 +499,7 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata)
|
|||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s RECVD COMPLETE MESSAGE FROM %s OF %d BYTES FOR DEST %s TAG %d",
|
"%s RECVD COMPLETE MESSAGE FROM %s OF %d BYTES FOR DEST %s TAG %d",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_NAME_PRINT(&peer->name),
|
ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin),
|
||||||
(int)peer->recv_msg->hdr.nbytes,
|
(int)peer->recv_msg->hdr.nbytes,
|
||||||
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
|
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
|
||||||
peer->recv_msg->hdr.tag);
|
peer->recv_msg->hdr.tag);
|
||||||
@ -510,18 +510,20 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata)
|
|||||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||||
"%s DELIVERING TO RML",
|
"%s DELIVERING TO RML",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||||
ORTE_RML_POST_MESSAGE(&peer->name, peer->recv_msg->hdr.tag,
|
ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin, peer->recv_msg->hdr.tag,
|
||||||
peer->recv_msg->data,
|
peer->recv_msg->data,
|
||||||
peer->recv_msg->hdr.nbytes);
|
peer->recv_msg->hdr.nbytes);
|
||||||
OBJ_RELEASE(peer->recv_msg);
|
OBJ_RELEASE(peer->recv_msg);
|
||||||
} else {
|
} else {
|
||||||
/* no - we don't route things, so we promote this
|
/* no - we don't route things, so we promote this
|
||||||
* back to the OOB and let another transport move
|
* back to the OOB and let another transport move
|
||||||
* it along
|
* it along. If we are a daemon and it is intended
|
||||||
|
* for another of our local procs, it will just come
|
||||||
|
* back to us and be handled then
|
||||||
*/
|
*/
|
||||||
snd = OBJ_NEW(orte_rml_send_t);
|
snd = OBJ_NEW(orte_rml_send_t);
|
||||||
snd->dst = peer->recv_msg->hdr.dst;
|
snd->dst = peer->recv_msg->hdr.dst;
|
||||||
snd->origin = peer->name;
|
snd->origin = peer->recv_msg->hdr.origin;
|
||||||
snd->tag = peer->recv_msg->hdr.tag;
|
snd->tag = peer->recv_msg->hdr.tag;
|
||||||
snd->data = peer->recv_msg->data;
|
snd->data = peer->recv_msg->data;
|
||||||
snd->count = peer->recv_msg->hdr.nbytes;
|
snd->count = peer->recv_msg->hdr.nbytes;
|
||||||
|
@ -119,6 +119,7 @@ OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t);
|
|||||||
ORTE_NAME_PRINT(&((m)->dst))); \
|
ORTE_NAME_PRINT(&((m)->dst))); \
|
||||||
msg = OBJ_NEW(mca_oob_usock_send_t); \
|
msg = OBJ_NEW(mca_oob_usock_send_t); \
|
||||||
/* setup the header */ \
|
/* setup the header */ \
|
||||||
|
msg->hdr.origin = (m)->origin; \
|
||||||
msg->hdr.dst = (m)->dst; \
|
msg->hdr.dst = (m)->dst; \
|
||||||
msg->hdr.type = MCA_OOB_USOCK_USER; \
|
msg->hdr.type = MCA_OOB_USOCK_USER; \
|
||||||
msg->hdr.tag = (m)->tag; \
|
msg->hdr.tag = (m)->tag; \
|
||||||
@ -159,6 +160,7 @@ OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t);
|
|||||||
ORTE_NAME_PRINT(&((m)->dst))); \
|
ORTE_NAME_PRINT(&((m)->dst))); \
|
||||||
msg = OBJ_NEW(mca_oob_usock_send_t); \
|
msg = OBJ_NEW(mca_oob_usock_send_t); \
|
||||||
/* setup the header */ \
|
/* setup the header */ \
|
||||||
|
msg->hdr.origin = (m)->origin; \
|
||||||
msg->hdr.dst = (m)->dst; \
|
msg->hdr.dst = (m)->dst; \
|
||||||
msg->hdr.type = MCA_OOB_USOCK_USER; \
|
msg->hdr.type = MCA_OOB_USOCK_USER; \
|
||||||
msg->hdr.tag = (m)->tag; \
|
msg->hdr.tag = (m)->tag; \
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user