
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:

*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***

Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
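A minimal sketch of that pattern (stubbing out opal_progress and the completion flag so it is self-contained; the real macro definition lives in ompi/mca/rte/rte.h):

```c
#include <assert.h>

/* Illustrative stand-ins so the sketch is self-contained - NOT the real
 * opal_progress() or RTE event code. */
static int progress_calls = 0;
static volatile int event_active = 1;

static void opal_progress_stub(void)
{
    /* pretend the RTE event completes on the third progress cycle */
    if (++progress_calls >= 3) {
        event_active = 0;
    }
}

/* The macro simply spins the progress engine until the flag goes false. */
#define OMPI_WAIT_FOR_COMPLETION(flag)  \
    do {                                \
        while (flag) {                  \
            opal_progress_stub();       \
        }                               \
    } while (0)
```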

***************************************************************************************

I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.

The code is in https://bitbucket.org/rhc/ompi-oob2


WHAT:    Rewrite of ORTE OOB

WHY:       Support asynchronous progress and a host of other features

WHEN:    Wed, August 21

SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:

* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)

* we've had issues when multiple NICs are available, as the code doesn't "shift" messages between transports - thus, all nodes have to be reachable via the same TCP interface.

* the OOB "unloads" incoming opal_buffer_t objects during transmission, thus preventing use of OBJ_RETAIN when repeatedly sending the same message to multiple recipients

* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort

* only one transport (i.e., component) can be "active"


The revised OOB resolves these problems:

* async progress is used for all application processes, with the progress thread blocking in the event library

* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")

* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.

* a message that arrives on one TCP NIC is automatically shifted to whatever NIC is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module can reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.

* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object

* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions

* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel

* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport

* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active

* all blocking send/recv APIs have been removed. Everything operates asynchronously.
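The reachability rule above (comparing a peer's contact info against the NIC's address/mask pairs) boils down to a subnet match. A minimal sketch, with illustrative names rather than the actual OOB/TCP symbols, and IPv4-only for brevity:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* A NIC may carry several address/mask pairs (IPv4, IPv6, virtual
 * interfaces); a peer is considered reachable through the NIC if its
 * address falls inside any of those subnets. */
typedef struct {
    uint32_t addr;     /* interface address, host byte order */
    uint32_t netmask;  /* e.g. 0xFFFFFF00 for a /24 */
} nic_subnet_t;

static int nic_reaches_peer(const nic_subnet_t *subnets, size_t n,
                            uint32_t peer_addr)
{
    for (size_t i = 0; i < n; i++) {
        if ((peer_addr & subnets[i].netmask) ==
            (subnets[i].addr & subnets[i].netmask)) {
            return 1;
        }
    }
    return 0;   /* no shared subnet: no gateway-aware routing is attempted */
}
```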


KNOWN LIMITATIONS:

* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if the lost connection was a lifeline

* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker

* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways

* obviously, not every error path has been tested, nor is every one necessarily covered

* determining abnormal termination is more challenging than in the old code, as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" only when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior, only somewhat modified: if a module sees its connection fail, it checks whether it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.

* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways

* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
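A toy sketch of that per-peer sequencing (names and structures are hypothetical, not the actual RML types): hold an out-of-order arrival until the gap before it is filled, then deliver the contiguous run.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define WINDOW 8   /* assumed max number of outstanding messages per peer */

typedef struct {
    uint32_t next_seq;        /* next sequence number owed to the app */
    int      held[WINDOW];    /* held[s % WINDOW]: message s arrived early */
    int      delivered;       /* count of messages handed up in order */
} peer_seq_state_t;

static void rml_msg_arrived(peer_seq_state_t *p, uint32_t seq)
{
    p->held[seq % WINDOW] = 1;
    /* deliver every message that is now contiguous with next_seq */
    while (p->held[p->next_seq % WINDOW]) {
        p->held[p->next_seq % WINDOW] = 0;
        p->next_seq++;
        p->delivered++;
    }
}
```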

This commit was SVN r29058.
Author: Ralph Castain, 2013-08-22 16:37:40 +00:00
parent 63d10d2d0d
commit a200e4f865
187 changed files with 8647 additions and 8906 deletions


@ -88,23 +88,6 @@ AC_DEFINE_UNQUOTED([ORTE_ENABLE_HEARTBEAT],
[$orte_want_heartbeats],
[Whether we want daemon heartbeat monitoring enabled])
#
# Do we want a separate orte progress thread?
AC_MSG_CHECKING([if want orte progress threads])
AC_ARG_ENABLE([orte-progress-threads],
[AC_HELP_STRING([--enable-orte-progress-threads],
[Enable orte progress thread - for experiment by developers only! (default: disabled)])])
if test "$enable_orte_progress_threads" = "yes"; then
AC_MSG_RESULT([yes])
orte_enable_progress_threads=1
else
AC_MSG_RESULT([no])
orte_enable_progress_threads=0
fi
AC_DEFINE_UNQUOTED([ORTE_ENABLE_PROGRESS_THREADS],
[$orte_enable_progress_threads],
[Whether we want orte progress threads enabled])
AC_MSG_CHECKING([if want orte static ports])
AC_ARG_ENABLE([orte-static-ports],
[AC_HELP_STRING([--enable-orte-static-ports],


@ -42,7 +42,7 @@ struct mca_btl_tcp2_addr_t {
machines (read: byte order), so use network byte order
for everything and don't add padding
*/
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct in6_addr addr_inet; /**< IPv4/IPv6 listen address > */
#else
/* Bug, FIXME: needs testing */
@ -65,7 +65,7 @@ struct mca_btl_tcp2_addr_t {
typedef struct mca_btl_tcp2_addr_t mca_btl_tcp2_addr_t;
#define MCA_BTL_TCP_AF_INET 0
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
# define MCA_BTL_TCP_AF_INET6 1
#endif


@ -71,7 +71,7 @@ struct mca_btl_tcp2_component_t {
unsigned short tcp_listen_port; /**< IPv4 listen port */
int32_t tcp_port_min; /**< IPv4 minimum port */
int32_t tcp_port_range; /**< IPv4 port range */
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
opal_event_t tcp6_recv_event; /**< recv event for IPv6 listen socket */
int tcp6_listen_sd; /**< IPv6 listen socket for incoming connection requests */
unsigned short tcp6_listen_port; /**< IPv6 listen port */


@ -42,7 +42,7 @@ struct mca_btl_tcp2_addr_t {
machines (read: byte order), so use network byte order
for everything and don't add padding
*/
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct in6_addr addr_inet; /**< IPv4/IPv6 listen address > */
#else
/* Bug, FIXME: needs testing */
@ -65,7 +65,7 @@ struct mca_btl_tcp2_addr_t {
typedef struct mca_btl_tcp2_addr_t mca_btl_tcp2_addr_t;
#define MCA_BTL_TCP_AF_INET 0
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
# define MCA_BTL_TCP_AF_INET6 1
#endif


@ -40,7 +40,7 @@
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
# ifdef HAVE_NETDB_H
# include <netdb.h>
# endif
@ -175,7 +175,7 @@ int mca_btl_tcp2_component_open(void)
/* initialize state */
mca_btl_tcp2_component.tcp_listen_sd = -1;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
mca_btl_tcp2_component.tcp6_listen_sd = -1;
#endif
mca_btl_tcp2_component.tcp_num_btls=0;
@ -234,7 +234,7 @@ int mca_btl_tcp2_component_open(void)
mca_btl_tcp2_param_register_int( "port_range_v4", message,
(0x1 << 16) - mca_btl_tcp2_component.tcp_port_min - 1);
free(message);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
mca_btl_tcp2_component.tcp6_port_min =
mca_btl_tcp2_param_register_int( "port_min_v6",
"The minimum port where the TCP BTL will try to bind (default 1024)", 1024 );
@ -304,7 +304,7 @@ int mca_btl_tcp2_component_close(void)
CLOSE_THE_SOCKET(mca_btl_tcp2_component.tcp_listen_sd);
mca_btl_tcp2_component.tcp_listen_sd = -1;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (mca_btl_tcp2_component.tcp6_listen_sd >= 0) {
opal_event_del(&mca_btl_tcp2_component.tcp6_recv_event);
CLOSE_THE_SOCKET(mca_btl_tcp2_component.tcp6_listen_sd);
@ -642,7 +642,7 @@ static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
mca_btl_tcp2_set_socket_options(sd);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
struct addrinfo hints, *res = NULL;
int error;
@ -698,19 +698,19 @@ static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
range = mca_btl_tcp2_component.tcp_port_range;
port = mca_btl_tcp2_component.tcp_port_min;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
range = mca_btl_tcp2_component.tcp6_port_range;
port = mca_btl_tcp2_component.tcp6_port_min;
}
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
for( index = 0; index < range; index++ ) {
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
((struct sockaddr_in6*) &inaddr)->sin6_port = htons(port + index);
#else
((struct sockaddr_in*) &inaddr)->sin_port = htons(port + index);
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
if(bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
continue;
@ -727,13 +727,13 @@ static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
mca_btl_tcp2_component.tcp_port_min,
mca_btl_tcp2_component.tcp_port_min + range));
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
BTL_ERROR(("bind6() failed: no port available in the range [%d..%d]",
mca_btl_tcp2_component.tcp6_port_min,
mca_btl_tcp2_component.tcp6_port_min + range));
}
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
CLOSE_THE_SOCKET(sd);
return OMPI_ERROR;
}
@ -750,7 +750,7 @@ static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
mca_btl_tcp2_component.tcp_listen_port = ((struct sockaddr_in*) &inaddr)->sin_port;
mca_btl_tcp2_component.tcp_listen_sd = sd;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
mca_btl_tcp2_component.tcp6_listen_port = ((struct sockaddr_in6*) &inaddr)->sin6_port;
mca_btl_tcp2_component.tcp6_listen_sd = sd;
@ -790,7 +790,7 @@ static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
0 );
opal_event_add(&mca_btl_tcp2_component.tcp_recv_event, 0);
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
opal_event_set(opal_event_base, &mca_btl_tcp2_component.tcp6_recv_event,
mca_btl_tcp2_component.tcp6_listen_sd,
@ -865,7 +865,7 @@ static int mca_btl_tcp2_component_exchange(void)
opal_ifindextokindex (index);
current_addr++;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if ((AF_INET6 == my_ss.ss_family) &&
(6 != mca_btl_tcp2_component.tcp_disable_family)) {
memcpy(&addrs[current_addr].addr_inet,
@ -947,7 +947,7 @@ mca_btl_base_module_t** mca_btl_tcp2_component_init(int *num_btl_modules,
if(OMPI_SUCCESS != (ret = mca_btl_tcp2_component_create_listen(AF_INET) )) {
return 0;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if((ret = mca_btl_tcp2_component_create_listen(AF_INET6)) != OMPI_SUCCESS) {
if (!(OMPI_ERR_IN_ERRNO == OPAL_SOS_GET_ERROR_CODE(ret) &&
EAFNOSUPPORT == opal_socket_errno)) {
@ -993,7 +993,7 @@ static void mca_btl_tcp2_component_accept_handler( int incoming_sd,
void* unused )
{
while(true) {
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct sockaddr_in6 addr;
#else
struct sockaddr_in addr;


@ -129,7 +129,7 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co
char src[64];
char dst[64];
int sndbuf,rcvbuf,nodelay,flags;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct sockaddr_storage inaddr;
#else
struct sockaddr_in inaddr;
@ -138,7 +138,7 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co
opal_socklen_t addrlen = sizeof(inaddr);
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
@ -150,7 +150,7 @@ static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, co
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
#endif
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
@ -536,7 +536,7 @@ static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endp
uint16_t af_family = AF_INET;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
af_family = AF_INET6;
addrlen = sizeof (struct sockaddr_in6);


@ -158,7 +158,7 @@ mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
if (MCA_BTL_TCP_AF_INET == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (MCA_BTL_TCP_AF_INET6 == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET6;
}
@ -740,7 +740,7 @@ bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* ad
continue;
}
break;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
case AF_INET6:
if( memcmp( &btl_endpoint->endpoint_addr->addr_inet,
&(((struct sockaddr_in6*)addr)->sin6_addr),
@ -777,7 +777,7 @@ bool mca_btl_tcp2_proc_tosocks(mca_btl_tcp2_addr_t* proc_addr,
&proc_addr->addr_inet, sizeof(struct in_addr));
((struct sockaddr_in*)output)->sin_port = proc_addr->addr_port;
break;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
case AF_INET6:
{
struct sockaddr_in6* inaddr = (struct sockaddr_in6*)output;


@ -10,6 +10,7 @@
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2013 Intel Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -20,6 +21,8 @@
use File::Find;
use File::Basename;
use File::Compare;
use Text::Diff;
use Getopt::Long;
if (scalar(@ARGV) != 2) {
print "Usage: search_compare.pl src_dir target_dir\n";
@ -34,6 +37,25 @@ my @src_tree = ();
my @tgt_tree = ();
my $flag;
my $ok = Getopt::Long::GetOptions("diff" => \$diff_arg,
"debug|d" => \$debug_arg,
"help|h" => \$help_arg,
"ignore=s" => \$exclude_arg,
);
if (!$ok || $help_arg) {
print "Invalid command line argument.\n\n"
if (!$ok);
print "Options:
--diff | -diff Output a diff of files that were modified - turns
off the output of files added/deleted
--debug | -d Output lots of debug information
--help | -h This help list
--ignore | -ignore Name of file containing typical ignore list - e.g.,
.hgignore or .gitignore\n";
my_exit($ok ? 0 : 1);
}
sub construct {
# don't process directories or links, and dont' recurse down
# "special" directories
@ -84,6 +106,7 @@ my $tgt_file;
my @modified = ();
my @src_pared = ();
my $i;
my $d;
foreach $src (@src_tree) {
# strip the leading elements of the path that was given to us
$src_file = substr($src, $len_src_dir);
@ -97,42 +120,46 @@ foreach $src (@src_tree) {
# file has been found - ignore it
$found = 1;
if (compare($src, $tgt) != 0) {
push(@modified, $src);
$d = diff $src, $tgt;
print "Index: $tgt\n";
print "===================================================================\n";
print "$d\n";
# push(@modified, $src);
}
# remove this file from the target tree as it has been found
# splice @tgt_tree, $i, 1;
break;
}
}
if ($found == 0) {
print "Add: " . $src . "\n";
} else {
push(@src_pared, $src);
}
# if ($found == 0) {
# print "Add: " . $src . "\n";
## } else {
# push(@src_pared, $src);
# }
}
print "\n";
# print a list of files in the target tree that need to be deleted
foreach $tgt (@tgt_tree) {
$found = 0;
$tgt_file = substr($tgt, $len_tgt_dir);
foreach $src (@src_pared) {
$src_file = substr($src, $len_src_dir);
if ($src_file eq $tgt_file) {
# file has been found - ignore it
$found = 1;
break;
}
}
if ($found == 0) {
print "Delete: " . $tgt . "\n";
}
}
#foreach $tgt (@tgt_tree) {
# $found = 0;
# $tgt_file = substr($tgt, $len_tgt_dir);
# foreach $src (@src_pared) {
# $src_file = substr($src, $len_src_dir);
# if ($src_file eq $tgt_file) {
# # file has been found - ignore it
# $found = 1;
# break;
# }
# }
# if ($found == 0) {
# print "Delete: " . $tgt . "\n";
# }
#}
print "\n";
#print "\n";
# print a list of files that have been modified
foreach $tgt (@modified) {
print "Modified: " . $tgt . "\n";
}
#foreach $tgt (@modified) {
# print "Modified: " . $tgt . "\n";
#}


@ -14,9 +14,10 @@
* Copyright (c) 2007 Voltaire All rights reserved.
* Copyright (c) 2006-2010 University of Houston. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -762,6 +763,23 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
return (rc);
}
typedef struct {
opal_buffer_t buf;
bool active;
} comm_cid_return_t;
static void comm_cid_recv(int status,
ompi_process_name_t* peer,
opal_buffer_t* buffer,
ompi_rml_tag_t tag,
void* cbdata)
{
comm_cid_return_t *rcid = (comm_cid_return_t*)cbdata;
opal_dss.copy_payload(&rcid->buf, buffer);
rcid->active = false;
}
/* Arguments not used in this implementation:
* - bridgecomm
*
@ -782,6 +800,7 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int local_leader, local_rank;
ompi_process_name_t *remote_leader=NULL;
int32_t size_count;
comm_cid_return_t rcid;
local_leader = (*((int*)lleader));
remote_leader = (ompi_process_name_t*)rleader;
@ -809,37 +828,46 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
if (local_rank == local_leader ) {
opal_buffer_t *sbuf;
opal_buffer_t *rbuf;
sbuf = OBJ_NEW(opal_buffer_t);
rbuf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(sbuf, tmpbuf, (int32_t)count, OPAL_INT))) {
goto exit;
}
if ( send_first ) {
if (0 > (rc = ompi_rte_send_buffer(remote_leader, sbuf, OMPI_RML_TAG_COMM_CID_INTRA, 0))) {
if (0 > (rc = ompi_rte_send_buffer_nb(remote_leader, sbuf,
OMPI_RML_TAG_COMM_CID_INTRA,
ompi_rte_send_cbfunc, NULL))) {
goto exit;
}
if (0 > (rc = ompi_rte_recv_buffer(remote_leader, rbuf, OMPI_RML_TAG_COMM_CID_INTRA, 0))) {
goto exit;
OBJ_CONSTRUCT(&rcid.buf, opal_buffer_t);
rcid.active = true;
ompi_rte_recv_buffer_nb(remote_leader, OMPI_RML_TAG_COMM_CID_INTRA,
OMPI_RML_NON_PERSISTENT, comm_cid_recv, &rcid);
while (rcid.active) {
opal_progress();
}
}
else {
if (0 > (rc = ompi_rte_recv_buffer(remote_leader, rbuf, OMPI_RML_TAG_COMM_CID_INTRA, 0))) {
goto exit;
OBJ_CONSTRUCT(&rcid.buf, opal_buffer_t);
rcid.active = true;
ompi_rte_recv_buffer_nb(remote_leader, OMPI_RML_TAG_COMM_CID_INTRA,
OMPI_RML_NON_PERSISTENT, comm_cid_recv, &rcid);
while (rcid.active) {
opal_progress();
}
if (0 > (rc = ompi_rte_send_buffer(remote_leader, sbuf, OMPI_RML_TAG_COMM_CID_INTRA, 0))) {
if (0 > (rc = ompi_rte_send_buffer_nb(remote_leader, sbuf,
OMPI_RML_TAG_COMM_CID_INTRA,
ompi_rte_send_cbfunc, NULL))) {
goto exit;
}
}
if (OPAL_SUCCESS != (rc = opal_dss.unpack(rbuf, outbuf, &size_count, OPAL_INT))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&rcid.buf, outbuf, &size_count, OPAL_INT))) {
goto exit;
}
OBJ_RELEASE(sbuf);
OBJ_RELEASE(rbuf);
OBJ_DESTRUCT(&rcid.buf);
count = (int)size_count;
if ( &ompi_mpi_op_max.op == op ) {


@ -121,8 +121,6 @@ static void oob_component_register(void)
static int oob_component_query(mca_btl_openib_module_t *btl,
ompi_btl_openib_connect_base_module_t **cpc)
{
int rc;
/* If we have the transport_type member, check to ensure we're on
IB (this CPC will not work with iWarp). If we do not have the
transport_type member, then we must be < OFED v1.2, and
@ -148,17 +146,11 @@ static int oob_component_query(mca_btl_openib_module_t *btl,
ensure to only post it *once*, because another btl may have
come in before this and already posted it. */
if (!rml_recv_posted) {
rc = ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_OPENIB,
OMPI_RML_PERSISTENT,
rml_recv_cb,
NULL);
if (OMPI_SUCCESS != rc) {
opal_output_verbose(5, ompi_btl_base_framework.framework_output,
"openib BTL: oob CPC system error %d (%s)",
rc, opal_strerror(rc));
return rc;
}
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_OPENIB,
OMPI_RML_PERSISTENT,
rml_recv_cb,
NULL);
rml_recv_posted = true;
}
@ -625,7 +617,7 @@ static int send_connect_data(mca_btl_base_endpoint_t* endpoint,
/* send to remote endpoint */
rc = ompi_rte_send_buffer_nb(&endpoint->endpoint_proc->proc_ompi->proc_name,
buffer, OMPI_RML_TAG_OPENIB, 0,
buffer, OMPI_RML_TAG_OPENIB,
rml_send_cb, NULL);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);


@ -70,7 +70,7 @@ struct mca_btl_tcp_component_t {
unsigned short tcp_listen_port; /**< IPv4 listen port */
int tcp_port_min; /**< IPv4 minimum port */
int tcp_port_range; /**< IPv4 port range */
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
opal_event_t tcp6_recv_event; /**< recv event for IPv6 listen socket */
int tcp6_listen_sd; /**< IPv6 listen socket for incoming connection requests */
unsigned short tcp6_listen_port; /**< IPv6 listen port */


@ -41,7 +41,7 @@ struct mca_btl_tcp_addr_t {
machines (read: byte order), so use network byte order
for everything and don't add padding
*/
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct in6_addr addr_inet; /**< IPv4/IPv6 listen address > */
#else
/* Bug, FIXME: needs testing */
@ -64,7 +64,7 @@ struct mca_btl_tcp_addr_t {
typedef struct mca_btl_tcp_addr_t mca_btl_tcp_addr_t;
#define MCA_BTL_TCP_AF_INET 0
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
# define MCA_BTL_TCP_AF_INET6 1
#endif


@ -42,7 +42,7 @@
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
# ifdef HAVE_NETDB_H
# include <netdb.h>
# endif
@ -201,7 +201,7 @@ static int mca_btl_tcp_component_verify(void)
mca_btl_tcp_component.tcp_port_min );
mca_btl_tcp_component.tcp_port_min = 1024;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if( mca_btl_tcp_component.tcp6_port_min > USHRT_MAX ) {
opal_show_help("help-mpi-btl-tcp.txt", "invalid minimum port",
true, "v6", ompi_process_info.nodename,
@ -253,7 +253,7 @@ static int mca_btl_tcp_component_register(void)
(0x1 << 16) - mca_btl_tcp_component.tcp_port_min - 1,
OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_range);
free(message);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
mca_btl_tcp_param_register_int( "port_min_v6",
"The minimum port where the TCP BTL will try to bind (default 1024)", 1024,
OPAL_INFO_LVL_2, & mca_btl_tcp_component.tcp6_port_min );
@ -350,7 +350,7 @@ static int mca_btl_tcp_component_open(void)
/* initialize state */
mca_btl_tcp_component.tcp_listen_sd = -1;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
mca_btl_tcp_component.tcp6_listen_sd = -1;
#endif
mca_btl_tcp_component.tcp_num_btls=0;
@ -405,7 +405,7 @@ static int mca_btl_tcp_component_close(void)
CLOSE_THE_SOCKET(mca_btl_tcp_component.tcp_listen_sd);
mca_btl_tcp_component.tcp_listen_sd = -1;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (mca_btl_tcp_component.tcp6_listen_sd >= 0) {
opal_event_del(&mca_btl_tcp_component.tcp6_recv_event);
CLOSE_THE_SOCKET(mca_btl_tcp_component.tcp6_listen_sd);
@ -754,7 +754,7 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
mca_btl_tcp_set_socket_options(sd);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
struct addrinfo hints, *res = NULL;
int error;
@ -810,19 +810,19 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
range = mca_btl_tcp_component.tcp_port_range;
port = mca_btl_tcp_component.tcp_port_min;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
range = mca_btl_tcp_component.tcp6_port_range;
port = mca_btl_tcp_component.tcp6_port_min;
}
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
for( index = 0; index < range; index++ ) {
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
((struct sockaddr_in6*) &inaddr)->sin6_port = htons(port + index);
#else
((struct sockaddr_in*) &inaddr)->sin_port = htons(port + index);
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
if(bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
continue;
@ -839,13 +839,13 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
mca_btl_tcp_component.tcp_port_min,
mca_btl_tcp_component.tcp_port_min + range));
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
BTL_ERROR(("bind6() failed: no port available in the range [%d..%d]",
mca_btl_tcp_component.tcp6_port_min,
mca_btl_tcp_component.tcp6_port_min + range));
}
#endif /* OPAL_WANT_IPV6 */
#endif /* OPAL_ENABLE_IPV6 */
CLOSE_THE_SOCKET(sd);
return OMPI_ERROR;
}
@ -862,7 +862,7 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
mca_btl_tcp_component.tcp_listen_port = ((struct sockaddr_in*) &inaddr)->sin_port;
mca_btl_tcp_component.tcp_listen_sd = sd;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
mca_btl_tcp_component.tcp6_listen_port = ((struct sockaddr_in6*) &inaddr)->sin6_port;
mca_btl_tcp_component.tcp6_listen_sd = sd;
@ -902,7 +902,7 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
0 );
opal_event_add(&mca_btl_tcp_component.tcp_recv_event, 0);
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == af_family) {
opal_event_set(opal_event_base, &mca_btl_tcp_component.tcp6_recv_event,
mca_btl_tcp_component.tcp6_listen_sd,
@ -977,7 +977,7 @@ static int mca_btl_tcp_component_exchange(void)
opal_ifindextokindex (index);
current_addr++;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if ((AF_INET6 == my_ss.ss_family) &&
(6 != mca_btl_tcp_component.tcp_disable_family)) {
memcpy(&addrs[current_addr].addr_inet,
@ -1059,7 +1059,7 @@ mca_btl_base_module_t** mca_btl_tcp_component_init(int *num_btl_modules,
if(OMPI_SUCCESS != (ret = mca_btl_tcp_component_create_listen(AF_INET) )) {
return 0;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if((ret = mca_btl_tcp_component_create_listen(AF_INET6)) != OMPI_SUCCESS) {
if (!(OMPI_ERR_IN_ERRNO == ret &&
EAFNOSUPPORT == opal_socket_errno)) {
@ -1096,7 +1096,7 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
void* unused )
{
while(true) {
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct sockaddr_in6 addr;
#else
struct sockaddr_in addr;


@ -131,7 +131,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
char src[64];
char dst[64];
int sndbuf,rcvbuf,nodelay,flags;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
struct sockaddr_storage inaddr;
#else
struct sockaddr_in inaddr;
@ -140,7 +140,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
opal_socklen_t addrlen = sizeof(inaddr);
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
@ -152,7 +152,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
#endif
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
{
char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
@ -572,7 +572,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
uint16_t af_family = AF_INET;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
af_family = AF_INET6;
addrlen = sizeof (struct sockaddr_in6);


@ -158,7 +158,7 @@ mca_btl_tcp_proc_t* mca_btl_tcp_proc_create(ompi_proc_t* ompi_proc)
if (MCA_BTL_TCP_AF_INET == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET;
}
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
if (MCA_BTL_TCP_AF_INET6 == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET6;
}
@ -745,7 +745,7 @@ bool mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
continue;
}
break;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
case AF_INET6:
if( memcmp( &btl_endpoint->endpoint_addr->addr_inet,
&(((struct sockaddr_in6*)addr)->sin6_addr),
@ -782,7 +782,7 @@ bool mca_btl_tcp_proc_tosocks(mca_btl_tcp_addr_t* proc_addr,
&proc_addr->addr_inet, sizeof(struct in_addr));
((struct sockaddr_in*)output)->sin_port = proc_addr->addr_port;
break;
#if OPAL_WANT_IPV6
#if OPAL_ENABLE_IPV6
case AF_INET6:
{
struct sockaddr_in6* inaddr = (struct sockaddr_in6*)output;


@ -146,8 +146,6 @@ static void oob_component_register(void)
static int oob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ompi_common_ofacm_base_module_t **cpc)
{
int rc;
if (oob_priority > 100) {
oob_priority = 100;
} else if (oob_priority < -1) {
@ -174,16 +172,11 @@ static int oob_component_query(ompi_common_ofacm_base_dev_desc_t *dev,
ensure to only post it *once*, because another btl may have
come in before this and already posted it. */
if (!rml_recv_posted) {
rc = ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_OFACM,
OMPI_RML_PERSISTENT,
rml_recv_cb,
NULL);
if (OMPI_SUCCESS != rc) {
OFACM_VERBOSE(("OFACM: oob CPC system error %d (%s)",
rc, opal_strerror(rc)));
return rc;
}
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_OFACM,
OMPI_RML_PERSISTENT,
rml_recv_cb,
NULL);
rml_recv_posted = true;
}
@ -728,7 +721,7 @@ static int send_connect_data(ompi_common_ofacm_base_local_connection_context_t*
/* send to remote endpoint */
rc = ompi_rte_send_buffer_nb(&context->proc->proc_ompi->proc_name,
buffer, OMPI_RML_TAG_OFACM, 0,
buffer, OMPI_RML_TAG_OFACM,
rml_send_cb, NULL);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);


@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2008-2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010-2012 Los Alamos National Security, LLC.
* Copyright (c) 2010-2013 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
@@ -36,6 +36,25 @@
#include <string.h>
#endif
+typedef struct {
+opal_buffer_t buf;
+bool active;
+int status;
+} sm_return_t;
+static void sml_recv(int status,
+ompi_process_name_t* peer,
+opal_buffer_t* buffer,
+ompi_rml_tag_t tag,
+void* cbdata)
+{
+sm_return_t *smr = (sm_return_t*)cbdata;
+opal_dss.copy_payload(&smr->buf, buffer);
+smr->active = false;
+smr->status = status;
+}
/* ////////////////////////////////////////////////////////////////////////// */
/**
* this routine assumes that sorted_procs is in the following state:
@@ -53,10 +72,13 @@ mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
int rc = OMPI_SUCCESS, tmprc;
char *msg_id_str_to_tx = NULL;
opal_buffer_t *buffer = NULL;
+sm_return_t smr;
if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
+OBJ_CONSTRUCT(&smr.buf, opal_buffer_t);
/* figure out if i am the root proc in the group. if i am, bcast the
* message the rest of the local procs. */
if (proc0) {
@@ -83,8 +105,8 @@ mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
/* first num_local_procs items should be local procs */
for (p = 1; p < num_local_procs; ++p) {
/* a potential future optimization: use non-blocking routines */
-tmprc = ompi_rte_send_buffer(&(procs[p]->proc_name), buffer, tag,
-0);
+tmprc = ompi_rte_send_buffer_nb(&(procs[p]->proc_name), buffer, tag,
+ompi_rte_send_cbfunc, NULL);
if (0 > tmprc) {
OMPI_ERROR_LOG(tmprc);
opal_progress_event_users_decrement();
@@ -100,16 +122,24 @@ mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
/* bump up the libevent polling frequency while we're in this RML recv,
* just to ensure we're checking libevent frequently. */
opal_progress_event_users_increment();
-tmprc = ompi_rte_recv_buffer(&(procs[0]->proc_name), buffer, tag, 0);
+smr.active = true;
+smr.status = OMPI_ERROR;
+ompi_rte_recv_buffer_nb(&(procs[0]->proc_name),tag,
+OMPI_RML_NON_PERSISTENT,
+sml_recv, &smr);
+while (smr.active) {
+opal_progress();
+}
opal_progress_event_users_decrement();
-if (0 > tmprc) {
-OMPI_ERROR_LOG(tmprc);
-rc = OMPI_ERROR;
+if (OMPI_SUCCESS != smr.status) {
+OMPI_ERROR_LOG(smr.status);
+rc = smr.status;
goto out;
}
/* unpack the buffer */
num_vals = 1;
-tmprc = opal_dss.unpack(buffer, &msg_id_str_to_tx, &num_vals,
+tmprc = opal_dss.unpack(&smr.buf, &msg_id_str_to_tx, &num_vals,
OPAL_STRING);
if (0 > tmprc) {
OMPI_ERROR_LOG(OMPI_ERR_UNPACK_FAILURE);
@@ -117,7 +147,7 @@ mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
goto out;
}
num_vals = (int32_t)sizeof(opal_shmem_ds_t);
-tmprc = opal_dss.unpack(buffer, out_ds_buf, &num_vals, OPAL_BYTE);
+tmprc = opal_dss.unpack(&smr.buf, out_ds_buf, &num_vals, OPAL_BYTE);
if (0 > tmprc) {
OMPI_ERROR_LOG(OMPI_ERR_UNPACK_FAILURE);
rc = OMPI_ERROR;
@@ -143,6 +173,6 @@ out:
free(msg_id_str_to_tx);
msg_id_str_to_tx = NULL;
}
-OBJ_RELEASE(buffer);
+OBJ_DESTRUCT(&smr.buf);
return rc;
}

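The common/sm change above is the canonical conversion pattern of this RFC: replace a blocking `ompi_rte_recv_buffer()` with a non-blocking post into a caller-owned completion struct (`sm_return_t`), then spin on `opal_progress()` until the callback clears the `active` flag. Below is a self-contained simulation of that handshake — `opal_progress` and the RML delivery are stubbed out and every name is hypothetical; only the flag protocol mirrors the real code:

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Caller-owned completion struct, mirroring sm_return_t in the diff. */
typedef struct {
    char buf[64];   /* stands in for the opal_buffer_t payload copy */
    bool active;    /* true until the callback fires */
    int  status;    /* delivery status reported by the callback */
} sm_return_t;

static sm_return_t *pending = NULL;   /* the recv "posted" with the RML */

/* Mirrors sml_recv(): copy the payload, record status, clear the flag. */
static void sml_recv_sim(int status, const char *payload, void *cbdata)
{
    sm_return_t *smr = (sm_return_t *)cbdata;
    strncpy(smr->buf, payload, sizeof(smr->buf) - 1);
    smr->status = status;
    smr->active = false;       /* releases the spinning waiter below */
}

/* Simulated progress engine: delivers the message on the third poll. */
static void opal_progress_sim(void)
{
    static int polls = 0;
    if (NULL != pending && ++polls == 3) {
        sml_recv_sim(0 /* "OMPI_SUCCESS" */, "shmem-ds-info", pending);
        pending = NULL;
    }
}

/* The blocking-recv replacement from the diff: post, then spin. */
int recv_info_blocking(sm_return_t *smr)
{
    smr->active = true;
    smr->status = -1;          /* "OMPI_ERROR" until the callback fires */
    pending = smr;             /* "ompi_rte_recv_buffer_nb(...)" */
    while (smr->active) {      /* the while(smr.active)/opal_progress loop */
        opal_progress_sim();
    }
    return smr->status;
}
```

The key design point, as in the diff, is that the completion state lives on the caller's stack rather than in file-scope statics, so concurrent exchanges cannot trample each other.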
View File

@@ -3034,7 +3034,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_ft_event(
if( opal_cr_timing_barrier_enabled ) {
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCPBR0);
ompi_rte_barrier(&coll);
-ORTE_WAIT_FOR_COMPLETION(coll.active);
+OMPI_WAIT_FOR_COMPLETION(coll.active);
}
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCP0);
@@ -3103,7 +3103,7 @@ ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_ft_event(
if( opal_cr_timing_barrier_enabled ) {
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_COREBR1);
ompi_rte_barrier(&coll);
-ORTE_WAIT_FOR_COMPLETION(coll.active);
+OMPI_WAIT_FOR_COMPLETION(coll.active);
}
OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE2);
}
@@ -6270,7 +6270,7 @@ static void display_all_timers(int state) {
}
else if( 2 == timing_enabled ) {
ompi_rte_barrier(&coll);
-ORTE_WAIT_FOR_COMPLETION(coll.active);
+OMPI_WAIT_FOR_COMPLETION(coll.active);
goto done;
}
}
@@ -6292,7 +6292,7 @@ static void display_all_timers(int state) {
if( timing_enabled >= 2) {
barrier_start = get_time();
ompi_rte_barrier(&coll);
-ORTE_WAIT_FOR_COMPLETION(coll.active);
+OMPI_WAIT_FOR_COMPLETION(coll.active);
barrier_stop = get_time();
opal_output(0,
"crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",

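Every `ORTE_WAIT_FOR_COMPLETION` in the crcp hunks above becomes `OMPI_WAIT_FOR_COMPLETION`, which per the RFC text simply cycles `opal_progress()` until the provided flag becomes false, keeping the MPI layer off the RTE progress engine. A plausible minimal form of the macro, exercised here against a stubbed progress engine — the actual definition lives in ompi/mca/rte/rte.h and may differ:

```c
#include <assert.h>
#include <stdbool.h>

/* Plausible minimal form of the macro described in the RFC: spin the
 * MPI-side progress engine until the completion flag goes false. */
#define OMPI_WAIT_FOR_COMPLETION(flg)   \
    do {                                \
        while ((flg)) {                 \
            opal_progress();            \
        }                               \
    } while (0)

/* Stand-ins for the real symbols so the macro can run standalone. */
static bool coll_active;      /* e.g. the coll.active flag in the diff */
static int  progress_calls;

/* Stubbed progress engine: "completes the barrier" on the fifth cycle,
 * as the rte callback would in the real code. */
static void opal_progress(void)
{
    if (++progress_calls >= 5) {
        coll_active = false;
    }
}

int wait_demo(void)
{
    coll_active = true;
    progress_calls = 0;
    OMPI_WAIT_FOR_COMPLETION(coll_active);
    return progress_calls;   /* number of progress cycles until done */
}
```

Note the macro deliberately does nothing but poll: unlike the old ORTE macro it never conditions libevent or recurses into the RTE event base, which is what allows the two progress engines to be fully separated.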
View File

@@ -12,7 +12,7 @@
* Copyright (c) 2007-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2009 University of Houston. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
-* Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
+* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@@ -30,8 +30,8 @@
#include "opal/util/argv.h"
#include "opal/util/opal_getcwd.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/plm/plm.h"
@@ -50,6 +50,7 @@
#include "ompi/group/group.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/pml/pml.h"
+#include "ompi/mca/rte/rte.h"
#include "ompi/info/info.h"
#include "ompi/mca/dpm/base/base.h"
@@ -58,14 +59,6 @@
/* Local static variables */
static opal_mutex_t ompi_dpm_port_mutex;
static orte_rml_tag_t next_tag;
-static bool waiting_for_recv = false;
-static opal_buffer_t *cabuf=NULL;
-static orte_process_name_t carport;
-/* Local static functions */
-static void recv_cb(int status, orte_process_name_t* sender,
-opal_buffer_t *buffer,
-orte_rml_tag_t tag, void *cbdata);
/* API functions */
static int init(void);
@@ -104,14 +97,6 @@ ompi_dpm_base_module_t ompi_dpm_orte_module = {
finalize
};
-static void rml_cbfunc(int status, orte_process_name_t* sender,
-opal_buffer_t* buffer, orte_rml_tag_t tag,
-void* cbdata)
-{
-OBJ_RELEASE(buffer);
-}
/*
* Init the module
*/
@@ -147,6 +132,8 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
orte_grpcomm_collective_t modex;
opal_list_item_t *item;
orte_namelist_t *nm;
+orte_rml_recv_cb_t xfer;
+orte_process_name_t carport;
OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept with port %s %s",
@@ -190,6 +177,7 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
opal_progress_event_users_increment();
if ( rank == root ) {
+OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
if (send_first) {
/* Get a collective id for the modex we need later on - we
* have to get a globally unique id for this purpose as
@@ -203,21 +191,23 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
return OMPI_ERROR;
}
/* send the request - doesn't have to include any data */
-rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, nbuf, ORTE_RML_TAG_COLL_ID_REQ, 0, rml_cbfunc, NULL);
+rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, nbuf,
+ORTE_RML_TAG_COLL_ID_REQ,
+orte_rml_send_callback, NULL);
/* wait for the id */
-waiting_for_recv = true;
-cabuf = OBJ_NEW(opal_buffer_t);
-rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
-ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
+orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
+ORTE_RML_NON_PERSISTENT,
+orte_rml_recv_callback, &xfer);
/* wait for response */
-ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
+xfer.active = true;
+OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
-if (OPAL_SUCCESS != (rc = opal_dss.unpack(cabuf, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
+if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
-OBJ_RELEASE(cabuf);
+OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
-OBJ_RELEASE(cabuf);
+OBJ_DESTRUCT(&xfer);
/* send it to my peer on the other side */
nbuf = OBJ_NEW(opal_buffer_t);
if (NULL == nbuf) {
@@ -227,22 +217,22 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
ORTE_ERROR_LOG(rc);
goto exit;
}
-rc = orte_rml.send_buffer_nb(&port, nbuf, tag, 0, rml_cbfunc, NULL);
+rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL);
} else {
/* wait to recv the collective id */
-waiting_for_recv = true;
-cabuf = OBJ_NEW(opal_buffer_t);
-rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
-ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
+orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
+ORTE_RML_NON_PERSISTENT,
+orte_rml_recv_callback, &xfer);
/* wait for response */
-ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
+xfer.active = true;
+OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
-if (OPAL_SUCCESS != (rc = opal_dss.unpack(cabuf, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
+if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
-OBJ_RELEASE(cabuf);
+OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
-OBJ_RELEASE(cabuf);
+OBJ_DESTRUCT(&xfer);
}
/* Generate the message buffer containing the number of processes and the list of
@@ -296,59 +286,56 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
goto exit;
}
-if (NULL != cabuf) {
-OBJ_RELEASE(cabuf);
-}
-cabuf = OBJ_NEW(opal_buffer_t);
-if (NULL == cabuf ) {
-rc = OMPI_ERROR;
-goto exit;
-}
+OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
/* Exchange the number and the list of processes in the groups */
if ( send_first ) {
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept sending first to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&port)));
-rc = orte_rml.send_buffer(&port, nbuf, tag, 0);
+rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL);
/* setup to recv */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept waiting for response",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
-waiting_for_recv = true;
-rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
-ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
+xfer.active = true;
+orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
+ORTE_RML_NON_PERSISTENT,
+orte_rml_recv_callback, &xfer);
/* wait for response */
-ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
+OMPI_WAIT_FOR_COMPLETION(xfer.active);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept got data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-ORTE_NAME_PRINT(&carport)));
+ORTE_NAME_PRINT(&xfer.name)));
} else {
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept recving first",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup to recv */
-waiting_for_recv = true;
-rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
-ORTE_RML_NON_PERSISTENT, recv_cb, NULL);
+xfer.active = true;
+orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
+ORTE_RML_NON_PERSISTENT,
+orte_rml_recv_callback, &xfer);
/* wait for response */
-ORTE_WAIT_FOR_COMPLETION(waiting_for_recv);
+OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* now send our info */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept sending info to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&carport)));
-rc = orte_rml.send_buffer(&carport, nbuf, tag, 0);