- further integration w/ gpr - use synchro/subscribe to asynchronously populate
  a cache of peer addresses as they come online
- resolved issues with the event library

This commit was SVN r2434.
This commit is contained in:
parent 6548b50fce
commit f43f072673
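In outline, the resolution path this commit adds works as sketched below (an illustrative distillation, not an excerpt; it uses the names and registry API introduced in the hunks that follow, and assumes the OMPI headers from this tree are in scope):

static int resolve_sketch(mca_oob_tcp_peer_t* peer)
{
    char segment[32];

    /* one registry segment per job; every process publishes its
       packed address list under "oob-tcp-<jobid>" */
    sprintf(segment, "oob-tcp-%u", peer->peer_name.jobid);

    /* the callback fires as peers come online and fills the
       tcp_peer_names cache (see mca_oob_tcp_registry_callback below) */
    return ompi_registry.subscribe(
        OMPI_REGISTRY_OR,
        OMPI_REGISTRY_NOTIFY_ADD_ENTRY,
        segment, NULL,
        mca_oob_tcp_registry_callback, NULL);
}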
@@ -48,9 +48,8 @@ extern ompi_process_name_t mca_oob_name_self;

#define MCA_OOB_TAG_NS 1
#define MCA_OOB_TAG_GPR 2
#define MCA_OOB_TAG_GPR_NOTIFY 3
#define MCA_OOB_TAG_RTE 4
#define MCA_OOB_TAG_EXEC 5

/*

@@ -3,12 +3,13 @@
 */

#include "ompi_config.h"
#include <string.h>
#include <netinet/in.h>

#include "include/constants.h"
#include "mca/oob/oob.h"
#include "mca/oob/base/base.h"
#include <string.h>
#include <netinet/in.h>
#include "util/output.h"

/*
 * Similar to unix send(2).
 *

@@ -9,6 +9,8 @@ include $(top_ompi_srcdir)/config/Makefile.options
sources = \
        oob_tcp.c \
        oob_tcp.h \
        oob_tcp_addr.c \
        oob_tcp_addr.h \
        oob_tcp_hdr.h \
        oob_tcp_msg.c \
        oob_tcp_msg.h \

@@ -22,6 +22,20 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_accept(void);

struct mca_oob_tcp_subscription_t {
    ompi_list_item_t item;
    mca_ns_base_jobid_t jobid;
};
typedef struct mca_oob_tcp_subscription_t mca_oob_tcp_subscription_t;

OBJ_CLASS_INSTANCE(
    mca_oob_tcp_subscription_t,
    ompi_list_item_t,
    NULL,
    NULL);
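The subscription record is a plain list item; OBJ_CLASS_INSTANCE registers its (here trivial, hence the two NULLs) constructor and destructor with the OMPI class system. A minimal usage sketch, mirroring what mca_oob_tcp_resolve() does later in this diff (assumes the OMPI class/list headers):

static void record_subscription(mca_ns_base_jobid_t jobid)
{
    mca_oob_tcp_subscription_t* s = OBJ_NEW(mca_oob_tcp_subscription_t);
    s->jobid = jobid;
    ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &s->item);
    /* released via OBJ_RELEASE when the component closes */
}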

/*
 * Struct of function pointers and all that to let us be initialized
 */

@@ -63,7 +77,7 @@ static inline int mca_oob_tcp_param_register_int(
    const char* param_name,
    int default_value)
{
    int id = mca_base_param_register_int("ptl","tcp",param_name,NULL,default_value);
    int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value);
    int param_value = default_value;
    mca_base_param_lookup_int(id,&param_value);
    return param_value;

@@ -74,7 +88,7 @@ static inline char* mca_oob_tcp_param_register_str(
    const char* param_name,
    const char* default_value)
{
    int id = mca_base_param_register_string("ptl","tcp",param_name,NULL,default_value);
    int id = mca_base_param_register_string("oob","tcp",param_name,NULL,default_value);
    char* param_value = NULL;
    mca_base_param_lookup_string(id,&param_value);
    return param_value;
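The one-word fix in both helpers ("ptl" to "oob") registers the parameters under the correct MCA namespace, so user overrides are actually picked up at open time. A usage sketch of the register-then-lookup pattern, exactly as it appears in mca_oob_tcp_component_open() below (the externally visible parameter name presumably follows the usual framework_component_param convention, e.g. oob_tcp_peer_retries - an assumption, not stated in this diff):

static void register_params_example(void)
{
    /* register with a default, then read back any user override */
    mca_oob_tcp_component.tcp_peer_retries =
        mca_oob_tcp_param_register_int("peer_retries", 60);
}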

@@ -86,20 +100,24 @@ static inline char* mca_oob_tcp_param_register_str(
 */
int mca_oob_tcp_component_open(void)
{
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_subscriptions, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_names, ompi_rb_tree_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
    OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);

    /* register oob module parameters */
    mca_oob_tcp_component.tcp_peer_limit =
        mca_oob_tcp_param_register_int("peer_limit", -1);
    mca_oob_tcp_component.tcp_peer_retries =
        mca_oob_tcp_param_register_int("peer_retries", 60);
    mca_oob_tcp_component.tcp_debug =
        mca_oob_tcp_param_register_int("debug", 1);
    memset(&mca_oob_tcp_component.tcp_seed_addr, 0, sizeof(mca_oob_tcp_component.tcp_seed_addr));

    /* initialize state */

@@ -117,6 +135,7 @@ int mca_oob_tcp_component_close(void)
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
    OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);

@@ -188,7 +207,7 @@ static int mca_oob_tcp_create_listen(void)
        return OMPI_ERROR;
    }

    /* resolve system assignend port */
    /* resolve system assigned port */
    addrlen = sizeof(struct sockaddr_in);
    if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
        ompi_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", errno);

@@ -246,12 +265,16 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
    free(user);

    /* recv the process identifier */
    rc = recv(sd, guid, sizeof(guid), 0);
    if(rc != sizeof(guid)) {
        ompi_output(0, "mca_oob_tcp_recv_handler: recv() return value %d != %d, errno = %d",
            rc, sizeof(guid), errno);
        close(sd);
        return;
    while((rc = recv(sd, guid, sizeof(guid), 0)) != sizeof(guid)) {
        if(rc >= 0) {
            close(sd);
            return;
        }
        if(errno != EINTR) {
            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n", errno);
            close(sd);
            return;
        }
    }
    OMPI_PROCESS_NAME_NTOH(guid[0]);
    OMPI_PROCESS_NAME_NTOH(guid[1]);
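The rewritten receive of the connection identifier now retries on EINTR instead of failing the handshake outright. A standalone helper capturing the same pattern (a sketch; like the loop above, it restarts the full-size recv(), which is adequate here because the peer writes the small identifier in a single send):

#include <errno.h>
#include <sys/socket.h>

/* EINTR-robust fixed-size receive: retry if a signal interrupted
 * recv() before any data arrived; treat anything else (short read,
 * orderly close, hard error) as a failed handshake. */
static int recv_exact_or_fail(int sd, void* buf, size_t len)
{
    ssize_t rc;
    while ((rc = recv(sd, buf, len, 0)) != (ssize_t)len) {
        if (rc < 0 && errno == EINTR)
            continue;          /* interrupted - try again */
        return -1;             /* short read, close, or real error */
    }
    return 0;
}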

@@ -276,7 +299,7 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
    }

    /* lookup the corresponding process */
    peer = mca_oob_tcp_peer_lookup(guid, true);
    peer = mca_oob_tcp_peer_lookup(guid);
    if(NULL == peer) {
        ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
        close(sd);

@@ -306,6 +329,7 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_thre

    /* initialize data structures */
    ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);
    ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_names, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);

    ompi_free_list_init(&mca_oob_tcp_component.tcp_peer_free,
        sizeof(mca_oob_tcp_peer_t),

@@ -339,25 +363,206 @@ mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_thre
    return &mca_oob_tcp;
}

/*
 * Callback from registry on change to subscribed segments.
 */
static void mca_oob_tcp_registry_callback(
    ompi_registry_notify_message_t* msg,
    void* cbdata)
{
    ompi_list_item_t* item;
    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self));
    }

    /* process the callback */
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    while((item = ompi_list_remove_first(&msg->data)) != NULL) {

        ompi_registry_value_t* value = (ompi_registry_value_t*)item;
        ompi_buffer_t buffer;
        mca_oob_tcp_addr_t* addr, *existing;
        mca_oob_tcp_peer_t* peer;

        /* transfer ownership of registry object to buffer and unpack */
        ompi_buffer_init_preallocated(&buffer, value->object, value->object_size);
        value->object = NULL;
        value->object_size = 0;
        addr = mca_oob_tcp_addr_unpack(buffer);
        ompi_buffer_free(buffer);
        if(NULL == addr) {
            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback: unable to unpack peer address\n",
                OMPI_NAME_COMPONENTS(mca_oob_name_self));
            OBJ_RELEASE(item);
            continue;
        }

        if(mca_oob_tcp_component.tcp_debug > 1) {
            ompi_output(0, "[%d,%d,%d] mca_oob_tcp_registry_callback: received peer [%d,%d,%d]\n",
                OMPI_NAME_COMPONENTS(mca_oob_name_self),
                OMPI_NAME_COMPONENTS(addr->addr_name));
        }

        /* check for existing cache entry */
        existing = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name);
        if(NULL != existing) {
            /* TSW - need to update existing entry */
            OBJ_RELEASE(addr);
            continue;
        }

        /* insert into cache and notify peer */
        ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
        peer = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_tree, &addr->addr_name);
        if(NULL != peer)
            mca_oob_tcp_peer_resolved(peer, addr);

        OBJ_RELEASE(item);
    }
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}

/*
 * Attempt to resolve peer name.
 */
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_addr_t* addr;
    mca_oob_tcp_subscription_t* subscription;
    ompi_list_item_t* item;
    char segment[32];
    int rc;

    /* if the address is already cached - simply return it */
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    addr = ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &peer->peer_name);
    if(NULL != addr) {
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        mca_oob_tcp_peer_resolved(peer, addr);
        return OMPI_SUCCESS;
    }

    /* check to see if we have subscribed to this registry segment */
    for( item =  ompi_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);
         item != ompi_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);
         item =  ompi_list_get_next(item)) {
        subscription = (mca_oob_tcp_subscription_t*)item;
        if(subscription->jobid == peer->peer_name.jobid) {
            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
            return OMPI_SUCCESS;
        }
    }

    /* otherwise - need to subscribe to this registry segment
     * record the subscription
     */
    subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
    subscription->jobid = peer->peer_name.jobid;
    ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);

    /* subscribe */
    sprintf(segment, "oob-tcp-%u", peer->peer_name.jobid);
    rc = ompi_registry.subscribe(
        OMPI_REGISTRY_OR,
        OMPI_REGISTRY_NOTIFY_ADD_ENTRY|OMPI_REGISTRY_NOTIFY_DELETE_ENTRY|OMPI_REGISTRY_NOTIFY_MODIFICATION,
        segment,
        NULL,
        mca_oob_tcp_registry_callback,
        NULL);
    if(rc != OMPI_SUCCESS) {
        ompi_output(0, "mca_oob_tcp_resolve: ompi_registry.subscribe failed with error status: %d\n", rc);
        return rc;
    }
    return OMPI_SUCCESS;
}
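Taken together with the mca_oob_tcp_peer_send() hunk later in this diff, this replaces the old synchronous registry get at send time. What the send path now does for a closed peer, distilled (a sketch, not an excerpt; locking omitted):

static int send_on_closed_peer_sketch(mca_oob_tcp_peer_t* peer)
{
    /* park the peer in the new RESOLVE state and hand off to the
       async resolver; mca_oob_tcp_peer_resolved() - invoked either
       immediately on a cache hit or later from the registry
       callback - then drives mca_oob_tcp_peer_start_connect() */
    peer->peer_state = MCA_OOB_TCP_RESOLVE;
    return mca_oob_tcp_resolve(peer);
}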

/*
 * Setup contact information in the registry.
 */
int mca_oob_tcp_init(void)
{
    char *keys[3];
    char *addr;
    char *keys[2];
    void *addr;
    int32_t size;
    char segment[32];
    ompi_buffer_t buffer;
    ompi_process_name_t* peers;
    mca_oob_tcp_subscription_t* subscription;
    size_t npeers;
    int rc;
    ompi_list_item_t* item;

    /* put contact info in registry */
    keys[0] = "tcp";
    keys[1] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
    keys[2] = NULL;
    /* iterate through the open connections and send an ident message to all peers -
     * note that we initially come up w/out knowing our process name - and are assigned
     * a temporary name by our peer. once we have determined our real name - we send it
     * to the peer.
     */
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    for(item =  ompi_list_get_first(&mca_oob_tcp_component.tcp_peer_list);
        item != ompi_list_get_end(&mca_oob_tcp_component.tcp_peer_list);
        item =  ompi_list_get_next(item)) {
        mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
        mca_oob_tcp_peer_send_ident(peer);
    }

    addr = mca_oob_tcp_get_addr();
    rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, "oob", keys, (ompi_registry_object_t)addr, strlen(addr)+1);
    free(addr);
    rc = mca_pcmclient.pcmclient_get_peers(&peers, &npeers);
    if(rc != OMPI_SUCCESS) {
        ompi_output(0, "mca_oob_tcp_init: unable to contact registry.");
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return rc;
    }

    sprintf(segment, "oob-tcp-%u", mca_oob_name_self.jobid);
    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: calling ompi_registry.synchro(%s,%d)\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            segment,
            npeers);
    }

    /* register synchro callback to receive notification when all processes have registered */
    subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
    subscription->jobid = mca_oob_name_self.jobid;
    ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, subscription);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);

    rc = ompi_registry.synchro(
        OMPI_REGISTRY_OR,
        OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
        segment,
        NULL,
        npeers,
        mca_oob_tcp_registry_callback,
        NULL);
    if(rc != OMPI_SUCCESS) {
        ompi_output(0, "mca_oob_tcp_init: registry synchro failed with error code %d.", rc);
        return rc;
    }

    /* put our contact info in registry */
    keys[0] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);
    keys[1] = NULL;

    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: calling ompi_registry.put(%s,%s)\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            segment,
            keys[0]);
    }

    ompi_buffer_init(&buffer, 128);
    mca_oob_tcp_addr_pack(buffer);
    ompi_buffer_get(buffer, &addr, &size);
    rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment, keys, addr, size);
    ompi_buffer_free(buffer);
    if(rc != OMPI_SUCCESS) {
        ompi_output(0, "[%d,%d,%d] mca_oob_tcp_init: registry put failed with error code %d.",
            OMPI_NAME_COMPONENTS(mca_oob_name_self), rc);
        return rc;
    }
    return OMPI_SUCCESS;

@@ -434,6 +639,8 @@ char* mca_oob_tcp_get_addr(void)
    for(i=ompi_ifbegin(); i>0; i=ompi_ifnext(i)) {
        struct sockaddr_in addr;
        ompi_ifindextoaddr(i, (struct sockaddr*)&addr, sizeof(addr));
        if(addr.sin_addr.s_addr == inet_addr("127.0.0.1"))
            continue;
        if(ptr != contact_info) {
            ptr += sprintf(ptr, ";");
        }

@@ -488,35 +695,20 @@ int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr)
int mca_oob_tcp_set_seed(const char* uri)
{
    struct sockaddr_in inaddr;
    mca_oob_tcp_addr_t* addr;
    int rc;
    int ifindex;

    if((rc = mca_oob_tcp_parse_uri(uri,&inaddr)) != OMPI_SUCCESS)
        return rc;

    /* scan through the list of interface address exported by this host
     * and look for a match on a directly connected network
     */
    for(ifindex=ompi_ifbegin(); ifindex>0; ifindex=ompi_ifnext(ifindex)) {
        struct sockaddr_in ifaddr;
        struct sockaddr_in ifmask;
        ompi_ifindextoaddr(ifindex, (struct sockaddr*)&ifaddr, sizeof(ifaddr));
        ompi_ifindextomask(ifindex, (struct sockaddr*)&ifmask, sizeof(ifmask));
        if((ifaddr.sin_addr.s_addr & ifmask.sin_addr.s_addr) ==
            (inaddr.sin_addr.s_addr & ifmask.sin_addr.s_addr)) {
            mca_oob_tcp_component.tcp_seed_addr = inaddr;
            return OMPI_SUCCESS;
        }
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    addr = (mca_oob_tcp_addr_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &mca_oob_name_seed);
    if(NULL == addr) {
        addr = OBJ_NEW(mca_oob_tcp_addr_t);
        addr->addr_name = mca_oob_name_seed;
        ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
    }

    /* if no match was found - may be still be reachable - go ahead and
     * set this address as seed address.
     */
    if (mca_oob_tcp_component.tcp_seed_addr.sin_family == 0) {
        mca_oob_tcp_component.tcp_seed_addr = inaddr;
    }
    return OMPI_SUCCESS;
    rc = mca_oob_tcp_addr_insert(addr, &inaddr);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    return rc;
}
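Note that set_seed now funnels the parsed address into the same tcp_peer_names cache, keyed by the well-known seed name, so the bootstrap daemon is resolved by exactly the same path as any other peer. A hypothetical caller sketch (the URI format is whatever mca_oob_tcp_parse_uri() accepts; "<ip>:<port>" is assumed here purely for illustration):

static int bootstrap_example(void)
{
    /* must run before the first send to the seed daemon */
    return mca_oob_tcp_set_seed("10.0.0.1:5000");
}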

@@ -25,6 +25,10 @@
extern "C" {
#endif

#define OMPI_NAME_COMPONENTS(n) (n).cellid,(n).jobid,(n).vpid

/*
 * standard component functions
 */

@@ -177,6 +181,12 @@ int mca_oob_tcp_recv_nb(
    mca_oob_callback_fn_t cbfunc,
    void* cbdata);

/**
 * Attempt to map a peer name to its corresponding address.
 */
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t*);

/**
 * Parse a URI string into an IP address and port number.
 */

@@ -194,8 +204,10 @@ struct mca_oob_tcp_component_t {
    int tcp_listen_sd;                 /**< listen socket for incoming connection requests */
    unsigned short tcp_listen_port;    /**< listen port */
    struct sockaddr_in tcp_seed_addr;  /**< uri string of tcp peer address */
    ompi_list_t tcp_subscriptions;     /**< list of registry subscriptions */
    ompi_list_t tcp_peer_list;         /**< list of peers sorted in mru order */
    ompi_rb_tree_t tcp_peer_tree;      /**< tree of peers sorted by name */
    ompi_rb_tree_t tcp_peer_names;     /**< cache of peer contact info sorted by name */
    ompi_free_list_t tcp_peer_free;    /**< free list of peers */
    size_t tcp_peer_limit;             /**< max size of tcp peer cache */
    int tcp_peer_retries;              /**< max number of retries before declaring peer gone */

@@ -206,6 +218,7 @@ struct mca_oob_tcp_component_t {
    ompi_list_t tcp_msg_post;          /**< list of receives user has posted */
    ompi_list_t tcp_msg_recv;          /**< list of received messages */
    ompi_mutex_t tcp_match_lock;       /**< lock held while searching/posting messages */
    int tcp_debug;                     /**< debug level */
};

/**

@@ -9,10 +9,16 @@
#ifndef _MCA_OOB_TCP_HDR_H_
#define _MCA_OOB_TCP_HDR_H_

#define MCA_OOB_TCP_IDENT 1
#define MCA_OOB_TCP_MSG 2

/**
 * Header used by tcp oob protocol.
 */
struct mca_oob_tcp_hdr_t {
    ompi_process_name_t msg_src;
    ompi_process_name_t msg_dst;
    uint32_t msg_type;  /**< type of message */
    uint32_t msg_size;  /**< the total size of the message body - excluding header */
    int32_t msg_tag;    /**< user provided tag */
};

@@ -21,16 +27,22 @@ typedef struct mca_oob_tcp_hdr_t mca_oob_tcp_hdr_t;
/**
 * Convert the message header to host byte order
 */
#define MCA_OOB_TCP_HDR_NTOHL(h) \
    ntohl(h->msg_size); \
    ntohl(h->msg_tag);
#define MCA_OOB_TCP_HDR_NTOH(h) \
    OMPI_PROCESS_NAME_NTOH((h)->msg_src); \
    OMPI_PROCESS_NAME_NTOH((h)->msg_dst); \
    ntohl((h)->msg_type); \
    ntohl((h)->msg_size); \
    ntohl((h)->msg_tag);

/**
 * Convert the message header to network byte order
 */
#define MCA_OOB_TCP_HDR_HTONL(h) \
    htonl(h->msg_size); \
    htonl(h->msg_tag);
#define MCA_OOB_TCP_HDR_HTON(h) \
    OMPI_PROCESS_NAME_HTON((h)->msg_src); \
    OMPI_PROCESS_NAME_HTON((h)->msg_dst); \
    htonl((h)->msg_type); \
    htonl((h)->msg_size); \
    htonl((h)->msg_tag);

#endif /* _MCA_OOB_TCP_MESSAGE_H_ */
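The per-field HDR_NTOHL/HDR_HTONL pair gives way to whole-header macros that also cover the new msg_src/msg_dst names and msg_type, so conversion happens once per header rather than field by field at every fill site. Intended usage, matching the oob_tcp_send.c/oob_tcp_recv.c hunks later in this diff (a sketch; assumes the header above and that the macros convert the fields they name):

static void wire_header_sketch(mca_oob_tcp_msg_t* msg, int size, int tag)
{
    /* fill the header in host byte order ... */
    msg->msg_hdr.msg_type = MCA_OOB_TCP_MSG;
    msg->msg_hdr.msg_size = size;
    msg->msg_hdr.msg_tag  = tag;
    /* ... then convert the whole thing once, just before the wire */
    MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);
    /* the receiver mirrors this with MCA_OOB_TCP_HDR_NTOH(&hdr)
       right after recv() - see mca_oob_tcp_peer_recv_start() */
}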

@@ -78,7 +78,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer)
    ompi_mutex_lock(&msg->msg_lock);
    msg->msg_complete = true;
    if(NULL != msg->msg_cbfunc) {
        msg->msg_cbfunc(msg->msg_rc, peer, msg->msg_uiov, msg->msg_ucnt, ntohl(msg->msg_hdr.msg_tag), msg->msg_cbdata);
        msg->msg_cbfunc(msg->msg_rc, peer, msg->msg_uiov, msg->msg_ucnt, msg->msg_hdr.msg_tag, msg->msg_cbdata);
        ompi_mutex_unlock(&msg->msg_lock);
        MCA_OOB_TCP_MSG_RETURN(msg);
    } else {

@@ -10,9 +10,10 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include "util/output.h"
#include "mca/oob/tcp/oob_tcp_peer.h"
#include "mca/gpr/base/base.h"
#include "mca/gpr/gpr.h"
#include "oob_tcp.h"
#include "oob_tcp_peer.h"

static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);

@@ -28,6 +29,7 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_msg_start(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr);

OBJ_CLASS_INSTANCE(

@@ -73,6 +75,8 @@ static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
 */
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
{
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));
    ompi_event_set(
        &peer->peer_recv_event,
        peer->peer_sd,

@@ -101,35 +105,15 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CONNECT_ACK:
    case MCA_OOB_TCP_CLOSED:
    case MCA_OOB_TCP_RESOLVE:
        /*
         * queue the message and start the connection to the peer
         * queue the message and attempt to resolve the peer address
         */
        ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);

        if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
            peer->peer_state = MCA_OOB_TCP_CONNECTING;
            peer->peer_state = MCA_OOB_TCP_RESOLVE;
            OMPI_THREAD_UNLOCK(&peer->peer_lock);

            /*
             * attempt to resolve peer address
             */
            if (mca_oob_tcp_peer_name_lookup(peer) != OMPI_SUCCESS) {
                OMPI_THREAD_LOCK(&peer->peer_lock);
                if(peer->peer_retries++ < mca_oob_tcp_component.tcp_peer_retries) {
                    struct timeval tv = { 1, 0 };
                    ompi_evtimer_add(&peer->peer_timer_event, &tv);
                }
                OMPI_THREAD_UNLOCK(&peer->peer_lock);

            /*
             * start connection
             */
            } else {
                OMPI_THREAD_LOCK(&peer->peer_lock);
                rc = mca_oob_tcp_peer_start_connect(peer);
                OMPI_THREAD_UNLOCK(&peer->peer_lock);
            }
            return rc;
            return mca_oob_tcp_resolve(peer);
        }
        break;
    case MCA_OOB_TCP_FAILED:

@@ -159,38 +143,31 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
/*
 * Lookup a peer by name, create one if it doesn't exist.
 * @param name Peers globally unique identifier.
 * @param get_lock says whether the function should get the main tcp lock or not.
 *        this should be true unless the caller already owns the lock.
 * @retval Pointer to the newly created structure or NULL on error.
 */
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get_lock)
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name)
{
    int rc;
    mca_oob_tcp_peer_t * peer, * old;

    if(get_lock) {
        OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    }
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_tree,
        (ompi_process_name_t *) name);
    if(NULL != peer) {
        if(get_lock) {
            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        }
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return peer;
    }

    /* allocate from free list */
    MCA_OOB_TCP_PEER_ALLOC(peer, rc);
    if(NULL == peer) {
        if(get_lock) {
            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        }
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* initialize peer state */
    peer->peer_name = *name;
    peer->peer_addr = NULL;
    peer->peer_sd = -1;
    peer->peer_state = MCA_OOB_TCP_CLOSED;
    peer->peer_recv_msg = NULL;

@@ -200,9 +177,7 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get
    /* add to lookup table */
    if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer)) {
        MCA_OOB_TCP_PEER_RETURN(peer);
        if(get_lock) {
            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        }
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

@@ -229,12 +204,11 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get
            }
        }
    }
    if(get_lock) {
        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    }
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    return peer;
}

/*
 * Start a connection to the peer. This will likely not complete,
 * as the socket is set to non-blocking, so register for event

@@ -246,6 +220,10 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
    int rc,flags;
    struct sockaddr_in inaddr;

    /* create socket */
    peer->peer_state = MCA_OOB_TCP_CONNECTING;
    peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (peer->peer_sd < 0) {
        peer->peer_retries++;

@@ -257,24 +235,46 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
    /* setup the socket as non-blocking */
    if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
        ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n", errno);
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            errno);
    } else {
        flags |= O_NONBLOCK;
        flags |= O_NONBLOCK;
        if(fcntl(peer->peer_sd, F_SETFL, flags) < 0)
            ompi_output(0, "mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n", errno);
            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n",
                OMPI_NAME_COMPONENTS(mca_oob_name_self),
                OMPI_NAME_COMPONENTS(peer->peer_name),
                errno);
    }

    /* pick an address in round-robin fashion from the list exported by the peer */
    if((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != OMPI_SUCCESS) {
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: mca_oob_tcp_addr_get_next failed with error=%d",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            rc);
        return rc;
    }

    if(mca_oob_tcp_component.tcp_debug > 2) {
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: connecting to: %s:%d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            inet_ntoa(inaddr.sin_addr),
            ntohs(inaddr.sin_port));
    }

    /* start the connect - will likely fail with EINPROGRESS */
    if(connect(peer->peer_sd, (struct sockaddr*)&(peer->peer_addr), sizeof(peer->peer_addr)) < 0) {
    if(connect(peer->peer_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
        /* non-blocking so wait for completion */
        if(errno == EINPROGRESS) {
            peer->peer_state = MCA_OOB_TCP_CONNECTING;
            ompi_event_add(&peer->peer_send_event, 0);
            return OMPI_SUCCESS;
        }
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: connect failed with errno=%d",
            mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
            peer->peer_name.cellid, peer->peer_name.jobid,peer->peer_name.vpid,
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            errno);
        mca_oob_tcp_peer_close(peer);
        return OMPI_ERR_UNREACH;

@@ -288,8 +288,8 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
        ompi_output(0,
            "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: "
            "mca_oob_tcp_peer_send_connect_ack failed with errno=%d",
            mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
            peer->peer_name.cellid, peer->peer_name.jobid,peer->peer_name.vpid,
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            rc);
        mca_oob_tcp_peer_close(peer);
    }

@@ -312,7 +312,10 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
    /* check connect completion status */
    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
        ompi_output(0, "mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            errno);
        mca_oob_tcp_peer_close(peer);
        return;
    }

@@ -328,7 +331,10 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
        ompi_evtimer_add(&peer->peer_timer_event, &tv);
        return;
    } else if(so_error != 0) {
        ompi_output(0, "mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            so_error);
        mca_oob_tcp_peer_close(peer);
        return;
    }

@@ -337,7 +343,9 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
        peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
        ompi_event_add(&peer->peer_recv_event, 0);
    } else {
        ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to send connect ack.");
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_complete_connect: unable to send connect ack.",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name));
        mca_oob_tcp_peer_close(peer);
    }
}

@@ -365,6 +373,14 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
 */
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d] closing peer [%d,%d,%d] sd %d state %d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            peer->peer_sd,
            peer->peer_state);
    }

    /* giving up and cleanup any pending messages */
    if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
        mca_oob_tcp_msg_t *msg = peer->peer_send_msg;

@@ -376,13 +392,13 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
        peer->peer_send_msg = NULL;
    }

    if(peer->peer_state != MCA_OOB_TCP_CLOSED &&
       peer->peer_sd >= 0) {
    if (peer->peer_state != MCA_OOB_TCP_CLOSED && peer->peer_sd >= 0) {
        ompi_event_del(&peer->peer_recv_event);
        ompi_event_del(&peer->peer_send_event);
        close(peer->peer_sd);
        peer->peer_sd = -1;
    }

    ompi_event_del(&peer->peer_timer_event);
    peer->peer_state = MCA_OOB_TCP_CLOSED;
}

@@ -436,9 +452,9 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
    /* connected */
    mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG && 0
    mca_oob_tcp_peer_dump(peer, "connected");
#endif
    if(mca_oob_tcp_component.tcp_debug > 2) {
        mca_oob_tcp_peer_dump(peer, "connected");
    }
    return OMPI_SUCCESS;
}

@@ -463,8 +479,8 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
    if(retval < 0) {
        if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_blocking: recv() failed with errno=%d\n",
                mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
                peer->peer_name.cellid, peer->peer_name.jobid, peer->peer_name.vpid,
                OMPI_NAME_COMPONENTS(mca_oob_name_self),
                OMPI_NAME_COMPONENTS(peer->peer_name),
                errno);
            mca_oob_tcp_peer_close(peer);
            return -1;

@@ -473,8 +489,6 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
        }
        cnt += retval;
    }
    if((int)cnt == -1)
        ompi_output(0, "mca_oob_tcp_peer_recv_blocking: invalid cnt\n");
    return cnt;
}

@@ -491,8 +505,8 @@ static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data,
    if(retval < 0) {
        if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_send_blocking: send() failed with errno=%d\n",
                mca_oob_name_self.cellid, mca_oob_name_self.jobid, mca_oob_name_self.vpid,
                peer->peer_name.cellid, peer->peer_name.jobid, peer->peer_name.vpid,
                OMPI_NAME_COMPONENTS(mca_oob_name_self),
                OMPI_NAME_COMPONENTS(peer->peer_name),
                errno);
            mca_oob_tcp_peer_close(peer);
            return -1;

@@ -575,6 +589,33 @@ static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp
}

int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;
    if(peer->peer_state != MCA_OOB_TCP_CONNECTED)
        return OMPI_SUCCESS;
    hdr.msg_src = mca_oob_name_self;
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_IDENT;
    hdr.msg_size = 0;
    hdr.msg_tag = 0;
    MCA_OOB_TCP_HDR_HTON(&hdr);
    if(mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
        return OMPI_ERR_UNREACH;
    return OMPI_SUCCESS;
}
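The ident message is a bare header: once a process learns its real name (it initially runs under a temporary name assigned by its peer, per the comment in mca_oob_tcp_init()), it announces the new name on every open connection so the peer can re-key it. The call site is the loop added to mca_oob_tcp_init() earlier in this diff, restated here as a sketch:

static void announce_name_sketch(void)
{
    ompi_list_item_t* item;
    /* tell every already-connected peer our (newly determined) name;
       the receiver re-keys us via mca_oob_tcp_peer_recv_ident() below */
    for(item =  ompi_list_get_first(&mca_oob_tcp_component.tcp_peer_list);
        item != ompi_list_get_end(&mca_oob_tcp_component.tcp_peer_list);
        item =  ompi_list_get_next(item)) {
        mca_oob_tcp_peer_send_ident((mca_oob_tcp_peer_t*)item);
    }
}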

static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr)
{
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name);
    peer->peer_name = hdr->msg_src;
    ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}

/*
 * Start receiving a new message.
 * (1) receive header

@@ -584,21 +625,45 @@ static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp
 */
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_msg_t* msg;
    mca_oob_tcp_hdr_t hdr;
    uint32_t size;
    mca_oob_tcp_hdr_t hdr;

    /* blocking receive of the message header */
    if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
        return NULL;
    size = ntohl(hdr.msg_size);
    if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) == sizeof(hdr)) {
        MCA_OOB_TCP_HDR_NTOH(&hdr);

        if(mca_oob_tcp_component.tcp_debug > 2) {
            ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_handler: src [%d,%d,%d] dst [%d,%d,%d] tag %d type %d\n",
                OMPI_NAME_COMPONENTS(mca_oob_name_self),
                OMPI_NAME_COMPONENTS(peer->peer_name),
                OMPI_NAME_COMPONENTS(hdr.msg_src),
                OMPI_NAME_COMPONENTS(hdr.msg_dst),
                hdr.msg_tag,
                hdr.msg_type);
        }

        switch(hdr.msg_type) {
        case MCA_OOB_TCP_IDENT:
            mca_oob_tcp_peer_recv_ident(peer, &hdr);
            return NULL;
        case MCA_OOB_TCP_MSG:
            return mca_oob_tcp_peer_msg_start(peer,&hdr);
        }
    }
    return NULL;
}

static mca_oob_tcp_msg_t* mca_oob_tcp_peer_msg_start(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr)
{
    mca_oob_tcp_msg_t* msg;
    uint32_t size = hdr->msg_size;

    /* attempt to match posted receive
     * however - don't match message w/ peek attribute, as we need to
     * queue the message anyway to match subsequent recv.
     */
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
    msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr.msg_tag, false);
    msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr->msg_tag, false);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
    if(NULL != msg) {
        uint32_t posted_size = 0;

@@ -648,9 +713,8 @@ static mca_oob_tcp_msg_t* mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
    }

    msg->msg_rwptr = msg->msg_rwiov;
    msg->msg_hdr = hdr;
    msg->msg_hdr = *hdr;
    return msg;
}

@@ -732,7 +796,9 @@ static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
        break;
    }
    default:
        ompi_output(0, "mca_oob_tcp_peer_send_handler: invalid connection state (%d)",
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_send_handler: invalid connection state (%d)",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            peer->peer_state);
        ompi_event_del(&peer->peer_send_event);
        break;

@@ -788,7 +854,9 @@ static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
    nodelay = 0;
#endif

    sprintf(buff, "%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
    sprintf(buff, "[%d,%d,%d]-[%d,%d,%d] %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
        OMPI_NAME_COMPONENTS(mca_oob_name_self),
        OMPI_NAME_COMPONENTS(peer->peer_name),
        msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
    ompi_output(0, buff);
}

@@ -819,9 +887,9 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
    mca_oob_tcp_peer_connected(peer);
    ompi_event_add(&peer->peer_recv_event, 0);
#if OMPI_ENABLE_DEBUG && 0
    mca_oob_tcp_peer_dump(peer, "accepted");
#endif
    if(mca_oob_tcp_component.tcp_debug > 2) {
        mca_oob_tcp_peer_dump(peer, "accepted");
    }
    OMPI_THREAD_UNLOCK(&peer->peer_lock);
    return true;
}

@@ -834,40 +902,14 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
 * resolve process name to an actual internet address.
 */
int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr)
{
    if(mca_oob_tcp_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SEED) == 0) {
        peer->peer_addr = mca_oob_tcp_component.tcp_seed_addr;
        return OMPI_SUCCESS;
    } else {
        ompi_registry_value_t *item;
        ompi_list_t* items;
        char *keys[3];
        char *uri = NULL;

        /* lookup the name in the registry */
        keys[0] = "tcp";
        keys[1] = ompi_name_server.get_proc_name_string(&peer->peer_name);
        keys[2] = NULL;
        items = ompi_registry.get(OMPI_REGISTRY_AND, "oob", keys);
        if(items == NULL || ompi_list_get_size(items) == 0) {
            return OMPI_ERR_UNREACH;
        }

        /* unpack the results into a uri string */
        item = (ompi_registry_value_t*)ompi_list_remove_first(items);
        if((uri = item->object) == NULL) {
            return OMPI_ERR_UNREACH;
        }

        /* validate the result */
        if(mca_oob_tcp_parse_uri(uri, &peer->peer_addr) != OMPI_SUCCESS) {
            OBJ_RELEASE(item);
            return OMPI_ERR_UNREACH;
        }
        OBJ_RELEASE(item);
        return OMPI_SUCCESS;
    OMPI_THREAD_LOCK(&peer->peer_lock);
    peer->peer_addr = addr;
    if(peer->peer_state == MCA_OOB_TCP_RESOLVE) {
        mca_oob_tcp_peer_start_connect(peer);
    }
    OMPI_THREAD_UNLOCK(&peer->peer_lock);
}

/*

@@ -876,21 +918,11 @@ int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{
    /* resolve the peer address */
    mca_oob_tcp_peer_t *peer = (mca_oob_tcp_peer_t*)user;
    if (mca_oob_tcp_peer_name_lookup(peer) != OMPI_SUCCESS) {
        OMPI_THREAD_LOCK(&peer->peer_lock);
        if(peer->peer_retries++ < mca_oob_tcp_component.tcp_peer_retries) {
            struct timeval tv = { 1, 0 };
            ompi_evtimer_add(&peer->peer_timer_event, &tv);
        }
        OMPI_THREAD_UNLOCK(&peer->peer_lock);
    } else {
        /* start the connection to the peer */
        OMPI_THREAD_LOCK(&peer->peer_lock);
        mca_oob_tcp_peer_start_connect(peer);
        OMPI_THREAD_UNLOCK(&peer->peer_lock);
    }
    /* start the connection to the peer */
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;
    OMPI_THREAD_LOCK(&peer->peer_lock);
    mca_oob_tcp_peer_start_connect(peer);
    OMPI_THREAD_UNLOCK(&peer->peer_lock);
}
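With name lookup gone from the timer path, the handler only restarts the connect; address resolution happens at most once, up front. The one-second re-arm pattern that drives it (visible in the code removed above and in mca_oob_tcp_peer_complete_connect()), restated as a sketch assuming the ompi_event API:

static void schedule_retry_sketch(mca_oob_tcp_peer_t* peer)
{
    if(peer->peer_retries++ < mca_oob_tcp_component.tcp_peer_retries) {
        struct timeval tv = { 1, 0 };   /* retry in one second */
        ompi_evtimer_add(&peer->peer_timer_event, &tv);
    }
    /* the timer handler above then re-enters
       mca_oob_tcp_peer_start_connect() */
}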

@@ -9,19 +9,23 @@
#ifndef _MCA_OOB_TCP_PEER_H_
#define _MCA_OOB_TCP_PEER_H_

#include "mca/ns/ns.h"
#include "ompi_config.h"
#include <netinet/in.h>
#include <string.h>

#include "class/ompi_list.h"
#include "class/ompi_rb_tree.h"
#include <netinet/in.h>
#include "threads/mutex.h"
#include <string.h>
#include "mca/oob/tcp/oob_tcp.h"
#include "mca/oob/tcp/oob_tcp_msg.h"
#include "mca/ns/ns.h"
#include "oob_tcp_msg.h"
#include "oob_tcp_addr.h"

/**
 * the state of the connection
 */
typedef enum {
    MCA_OOB_TCP_CLOSED,
    MCA_OOB_TCP_RESOLVE,
    MCA_OOB_TCP_CONNECTING,
    MCA_OOB_TCP_CONNECT_ACK,
    MCA_OOB_TCP_CONNECTED,

@@ -37,11 +41,11 @@ struct mca_oob_tcp_peer_t {
    ompi_process_name_t peer_name;    /**< the name of the peer */
    mca_oob_tcp_state_t peer_state;   /**< the state of the connection */
    int peer_retries;                 /**< number of times connection attempt has failed */
    struct sockaddr_in peer_addr;     /**< the address of the peer process */
    mca_oob_tcp_addr_t* peer_addr;    /**< the addresses of the peer process */
    int peer_sd;                      /**< socket descriptor of the connection */
    ompi_event_t peer_send_event;     /**< registration with event thread for send events */
    ompi_event_t peer_recv_event;     /**< registration with event thread for recv events */
    ompi_event_t peer_timer_event;    /**< used for timer callback */
    ompi_event_t peer_timer_event;    /**< timer for retrying connection failures */
    ompi_mutex_t peer_lock;           /**< protect critical data structures */
    ompi_list_t peer_send_queue;      /**< list of messages to send */
    mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */

@@ -85,12 +89,10 @@ extern "C" {
 * create one and cache it.
 *
 * @param peer_name the name of the peer
 * @param get_lock get the lock on the tcp struct. This should always be true
 *                 unless the caller already owns the lock.
 * @retval pointer to the peer's (possibly newly created) structure
 * @retval NULL if there was a problem
 */
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(ompi_process_name_t* peer_name, bool get_lock);
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(ompi_process_name_t* peer_name);

/**
 * Start sending a message to the specified peer. The routine

@@ -119,9 +121,14 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd);
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);

/**
 * Attempt to resolve peer address.
 * The peer's address has been resolved.
 */
int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer);
void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr);

/*
 *
 */
int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer);

#if defined(c_plusplus) || defined(__cplusplus)
}

@@ -26,11 +26,18 @@ int mca_oob_tcp_recv(
    int i, rc, size = 0;
    int tag = (tagp != NULL) ? *tagp : MCA_OOB_TAG_ANY;

    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_recv: tag %d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(*peer),
            tag);
    }

    /* lock the tcp struct */
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);

    /* check to see if a matching receive is on the list */
    msg = mca_oob_tcp_msg_match_recv(peer, htonl(tag));
    msg = mca_oob_tcp_msg_match_recv(peer, tag);
    if(NULL != msg) {

        if(msg->msg_rc < 0) {

@@ -64,7 +71,7 @@ int mca_oob_tcp_recv(
    }

    if(NULL != tagp) {
        *tagp = ntohl(msg->msg_hdr.msg_tag);
        *tagp = msg->msg_hdr.msg_tag;
    }

    /* otherwise dequeue the message and return to free list */

@@ -87,8 +94,11 @@ int mca_oob_tcp_recv(
    }

    /* fill in the struct */
    msg->msg_hdr.msg_size = htonl(size);
    msg->msg_hdr.msg_tag = htonl(tag);
    msg->msg_hdr.msg_size = size;
    msg->msg_hdr.msg_tag = tag;
    msg->msg_hdr.msg_type = MCA_OOB_TCP_MSG;
    msg->msg_hdr.msg_src = *peer;
    msg->msg_hdr.msg_dst = mca_oob_name_self;
    msg->msg_type = MCA_OOB_TCP_POSTED;
    msg->msg_rc = 0;
    msg->msg_flags = flags;

@@ -137,7 +147,7 @@ int mca_oob_tcp_recv_nb(
    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);

    /* check to see if a matching receive is on the list */
    msg = mca_oob_tcp_msg_match_recv(peer, htonl(tag));
    msg = mca_oob_tcp_msg_match_recv(peer, tag);
    if(NULL != msg) {

        if(msg->msg_rc < 0)

@@ -159,7 +169,7 @@ int mca_oob_tcp_recv_nb(
    /* otherwise dequeue the message and return to free list */
    ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
    cbfunc(rc, &msg->msg_peer, iov, count, ntohl(msg->msg_hdr.msg_tag), cbdata);
    cbfunc(rc, &msg->msg_peer, iov, count, msg->msg_hdr.msg_tag, cbdata);
    MCA_OOB_TCP_MSG_RETURN(msg);
    return 0;
}

@@ -176,9 +186,13 @@ int mca_oob_tcp_recv_nb(
        size += iov[i].iov_len;
    }

    /* fill in the struct */
    msg->msg_hdr.msg_size = htonl(size);
    msg->msg_hdr.msg_tag = htonl(tag);
    /* fill in the header */
    msg->msg_hdr.msg_src = mca_oob_name_self;
    msg->msg_hdr.msg_dst = *peer;
    msg->msg_hdr.msg_size = size;
    msg->msg_hdr.msg_tag = tag;
    MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);

    msg->msg_type = MCA_OOB_TCP_POSTED;
    msg->msg_rc = 0;
    msg->msg_flags = flags;

@@ -20,11 +20,17 @@ int mca_oob_tcp_send(
    int tag,
    int flags)
{
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
    mca_oob_tcp_msg_t* msg;
    int size;
    int rc;

    if(mca_oob_tcp_component.tcp_debug > 1) {
        ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_send: tag %d\n",
            OMPI_NAME_COMPONENTS(mca_oob_name_self),
            OMPI_NAME_COMPONENTS(peer->peer_name),
            tag);
    }
    if(NULL == peer)
        return OMPI_ERR_UNREACH;

@@ -39,8 +45,12 @@ int mca_oob_tcp_send(
    }

    /* turn the size to network byte order so there will be no problems */
    msg->msg_hdr.msg_size = htonl(size);
    msg->msg_hdr.msg_tag = htonl(tag);
    msg->msg_hdr.msg_type = MCA_OOB_TCP_MSG;
    msg->msg_hdr.msg_size = size;
    msg->msg_hdr.msg_tag = tag;
    msg->msg_hdr.msg_src = mca_oob_name_self;
    msg->msg_hdr.msg_dst = *name;
    MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);

    /* create one additional iovect that will hold the header */
    msg->msg_type = MCA_OOB_TCP_POSTED;

@@ -96,7 +106,7 @@ int mca_oob_tcp_send_nb(
    mca_oob_callback_fn_t cbfunc,
    void* cbdata)
{
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name);
    mca_oob_tcp_msg_t* msg;
    int size;
    int rc;

@@ -114,8 +124,12 @@ int mca_oob_tcp_send_nb(
        size += iov[rc].iov_len;
    }
    /* turn the size to network byte order so there will be no problems */
    msg->msg_hdr.msg_size = htonl(size);
    msg->msg_hdr.msg_tag = htonl(tag);
    msg->msg_hdr.msg_type = MCA_OOB_TCP_MSG;
    msg->msg_hdr.msg_size = size;
    msg->msg_hdr.msg_tag = tag;
    msg->msg_hdr.msg_src = mca_oob_name_self;
    msg->msg_hdr.msg_dst = *name;
    MCA_OOB_TCP_HDR_HTON(&msg->msg_hdr);

    /* create one additional iovect that will hold the size of the message */
    msg->msg_type = MCA_OOB_TCP_POSTED;