1
1

Track the origin of a message so it can be passed across transports

Refs trac:4184

This commit was SVN r30433.

The following Trac tickets were found above:
  Ticket 4184 --> https://svn.open-mpi.org/trac/ompi/ticket/4184
Этот коммит содержится в:
Ralph Castain 2014-01-26 21:09:26 +00:00
родитель 09267731f8
Коммит 956aab03a7
10 изменённых файлов: 118 добавлений и 70 удалений

Просмотреть файл

@ -100,6 +100,10 @@ OBJ_CLASS_INSTANCE(mca_oob_base_component_t,
opal_list_item_t,
NULL, NULL);
OBJ_CLASS_INSTANCE(orte_oob_send_t,
opal_object_t,
NULL, NULL);
static void pr_cons(orte_oob_base_peer_t *ptr)
{
ptr->component = NULL;

Просмотреть файл

@ -98,7 +98,7 @@ int orte_oob_base_select(void)
c2 = OBJ_NEW(mca_base_component_list_item_t);
c2->cli_component = (mca_base_component_t*)component;
opal_list_insert_pos(&orte_oob_base.actives,
&c2->super, &cmp->super);
&cmp->super, &c2->super);
added = true;
break;
}

Просмотреть файл

@ -1,7 +1,7 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -23,10 +23,6 @@
#include "orte/mca/oob/base/base.h"
OBJ_CLASS_INSTANCE(orte_oob_send_t,
opal_object_t,
NULL, NULL);
static void process_uri(char *uri);
void orte_oob_base_send_nb(int fd, short args, void *cbdata)
@ -45,22 +41,22 @@ void orte_oob_base_send_nb(int fd, short args, void *cbdata)
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send to target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer));
ORTE_NAME_PRINT(&msg->dst));
/* check if we have this peer in our hash table */
memcpy(&ui64, (char*)&msg->peer, sizeof(uint64_t));
memcpy(&ui64, (char*)&msg->dst, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&pr) ||
NULL == pr) {
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send unknown peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer));
ORTE_NAME_PRINT(&msg->dst));
/* for direct launched procs, the URI might be in the database,
* so check there next - if it is, the peer object will be added
* to our hash table
*/
if (OPAL_SUCCESS == opal_db.fetch_pointer((opal_identifier_t*)&msg->peer, ORTE_DB_RMLURI,
if (OPAL_SUCCESS == opal_db.fetch_pointer((opal_identifier_t*)&msg->dst, ORTE_DB_RMLURI,
(void **)&rmluri, OPAL_STRING)) {
process_uri(rmluri);
free(rmluri);
@ -83,7 +79,7 @@ void orte_oob_base_send_nb(int fd, short args, void *cbdata)
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
if (NULL != component->is_reachable) {
if (component->is_reachable(&msg->peer)) {
if (component->is_reachable(&msg->dst)) {
/* there is a way to reach this peer - record it
* so we don't waste this time again
*/
@ -121,7 +117,7 @@ void orte_oob_base_send_nb(int fd, short args, void *cbdata)
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send known transport for peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer));
ORTE_NAME_PRINT(&msg->dst));
if (ORTE_SUCCESS == (rc = pr->component->send_nb(msg))) {
OBJ_RELEASE(cd);
return;
@ -166,7 +162,7 @@ void orte_oob_base_send_nb(int fd, short args, void *cbdata)
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send no path to target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer));
ORTE_NAME_PRINT(&msg->dst));
msg->status = ORTE_ERR_NO_PATH_TO_TARGET;
ORTE_RML_SEND_COMPLETE(msg);
}
@ -223,20 +219,22 @@ void orte_oob_base_get_addr(char **uri)
* do NOT free it!
*/
turi = component->get_addr();
/* check overall length for limits */
if (0 < orte_oob_base.max_uri_length &&
orte_oob_base.max_uri_length < (int)(len + strlen(turi))) {
/* cannot accept the payload */
continue;
if (NULL != turi) {
/* check overall length for limits */
if (0 < orte_oob_base.max_uri_length &&
orte_oob_base.max_uri_length < (int)(len + strlen(turi))) {
/* cannot accept the payload */
continue;
}
/* add new value to final one */
asprintf(&tmp, "%s;%s", final, turi);
free(turi);
free(final);
final = tmp;
len = strlen(final);
/* flag that at least one contributed */
one_added = true;
}
/* add new value to final one */
asprintf(&tmp, "%s;%s", final, turi);
free(turi);
free(final);
final = tmp;
len = strlen(final);
/* flag that at least one contributed */
one_added = true;
}
if (!one_added) {
@ -359,6 +357,10 @@ static void process_uri(char *uri)
/* this component found reachable addresses
* in the uris
*/
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s: peer %s is reachable via component %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
opal_bitmap_set_bit(&pr->addressable, component->idx);
}
}

Просмотреть файл

@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -416,10 +416,10 @@ static void process_send(int fd, short args, void *cbdata)
"%s:[%s:%d] processing send to peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
__FILE__, __LINE__,
ORTE_NAME_PRINT(&op->msg->peer));
ORTE_NAME_PRINT(&op->msg->dst));
/* do we have a route to this peer (could be direct)? */
hop = orte_routed.get_route(&op->msg->peer);
hop = orte_routed.get_route(&op->msg->dst);
/* do we know this hop? */
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &hop))) {
/* push this back to the component so it can try
@ -477,7 +477,7 @@ static void send_nb(struct mca_oob_tcp_module_t *md,
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s tcp:send_nb to peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer));
ORTE_NAME_PRINT(&msg->dst));
/* push this into our event base for processing */
ORTE_ACTIVATE_TCP_POST_SEND(mod, msg, process_send);

Просмотреть файл

@ -658,10 +658,10 @@ static int component_send(orte_rml_send_t *msg)
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:tcp:send_nb to peer %s:%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->peer), msg->tag);
ORTE_NAME_PRINT(&msg->dst), msg->tag);
/* do we know some way of potentially reaching this peer? */
memcpy(&ui64, (char*)&msg->peer, sizeof(uint64_t));
memcpy(&ui64, (char*)&msg->dst, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
ui64, (void**)&pr)) {
/* nope - let someone else try */
@ -1191,6 +1191,7 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
uint64_t ui64;
int k;
mca_oob_tcp_component_peer_t *pr;
orte_rml_send_t *snd;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:unknown hop called for peer %s on interface %s",
@ -1198,14 +1199,21 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(&mop->hop),
mop->mod->if_name);
if (orte_finalizing || orte_abnormal_term_ordered) {
/* just ignore the problem */
OBJ_RELEASE(mop);
return;
}
/* retrieve the hop's name */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
/* get the peer object */
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
ui64, (void**)&pr) || NULL == pr) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
/* cleanup */
goto cleanup;
}
/* ensure we mark that this peer isn't reachable by this module */
@ -1228,20 +1236,24 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
}
}
/* if we get here, then we have no other modules - so we report
* the error back to the OOB and let it try other components
* or declare a problem
*/
if (!orte_finalizing && !orte_abnormal_term_ordered) {
/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
}
}
cleanup:
/* post the message to the OOB so it can see
* if another component can transfer it
*/
MCA_OOB_TCP_HDR_NTOH(&mop->snd->hdr);
snd = OBJ_NEW(orte_rml_send_t);
snd->dst = mop->snd->hdr.dst;
snd->origin = mop->snd->hdr.origin;
snd->tag = mop->snd->hdr.tag;
snd->data = mop->snd->data;
snd->count = mop->snd->hdr.nbytes;
snd->cbfunc.iov = NULL;
snd->cbdata = NULL;
/* activate the OOB send state */
ORTE_OOB_SEND(snd);
/* protect the data */
mop->snd->data = NULL;
OBJ_RELEASE(mop);
}

Просмотреть файл

@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -172,11 +172,15 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
/* send the buffer data as a single block */
msg->sdptr = msg->msg->buffer->base_ptr;
msg->sdbytes = msg->msg->buffer->bytes_used;
} else {
} else if (NULL != msg->msg->iov) {
/* start with the first iovec */
msg->sdptr = msg->msg->iov[0].iov_base;
msg->sdbytes = msg->msg->iov[0].iov_len;
msg->iovnum = 0;
} else {
/* just send the data */
msg->sdptr = msg->msg->data;
msg->sdbytes = msg->msg->count;
}
/* fall thru and let the send progress */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
@ -220,7 +224,20 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else {
} else if (NULL != msg->msg->data) {
/* this was a relay we have now completed - no need to
* notify the RML as the local proc didn't initiate
* the send
*/
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
msg->msg->status = ORTE_SUCCESS;
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else {
/* rotate to the next iovec */
msg->iovnum++;
if (msg->iovnum < msg->msg->count) {

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -111,11 +111,11 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
"%s:[%s:%d] queue send to %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__, \
ORTE_NAME_PRINT(&((m)->peer))); \
ORTE_NAME_PRINT(&((m)->dst))); \
msg = OBJ_NEW(mca_oob_tcp_send_t); \
/* setup the header */ \
msg->hdr.origin = *ORTE_PROC_MY_NAME; \
msg->hdr.dst = (m)->peer; \
msg->hdr.origin = (m)->origin; \
msg->hdr.dst = (m)->dst; \
msg->hdr.type = MCA_OOB_TCP_USER; \
msg->hdr.tag = (m)->tag; \
/* point to the actual message */ \
@ -123,11 +123,13 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
/* set the total number of bytes to be sent */ \
if (NULL != (m)->buffer) { \
msg->hdr.nbytes = (m)->buffer->bytes_used; \
} else { \
} else if (NULL != (m)->iov) { \
msg->hdr.nbytes = 0; \
for (i=0; i < (m)->count; i++) { \
msg->hdr.nbytes += (m)->iov[i].iov_len; \
} \
} else { \
msg->hdr.nbytes = (m)->count; \
} \
/* prep header for xmission */ \
MCA_OOB_TCP_HDR_HTON(&msg->hdr); \
@ -152,11 +154,11 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
"%s:[%s:%d] queue pending to %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__, \
ORTE_NAME_PRINT(&((m)->peer))); \
ORTE_NAME_PRINT(&((m)->dst))); \
msg = OBJ_NEW(mca_oob_tcp_send_t); \
/* setup the header */ \
msg->hdr.origin = *ORTE_PROC_MY_NAME; \
msg->hdr.dst = (m)->peer; \
msg->hdr.origin = (m)->origin; \
msg->hdr.dst = (m)->dst; \
msg->hdr.type = MCA_OOB_TCP_USER; \
msg->hdr.tag = (m)->tag; \
/* point to the actual message */ \
@ -164,11 +166,13 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
/* set the total number of bytes to be sent */ \
if (NULL != (m)->buffer) { \
msg->hdr.nbytes = (m)->buffer->bytes_used; \
} else { \
} else if (NULL != (m)->iov) { \
msg->hdr.nbytes = 0; \
for (i=0; i < (m)->count; i++) { \
msg->hdr.nbytes += (m)->iov[i].iov_len; \
} \
} else { \
msg->hdr.nbytes = (m)->count; \
} \
/* prep header for xmission */ \
MCA_OOB_TCP_HDR_HTON(&msg->hdr); \
@ -228,7 +232,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
"%s:[%s:%d] post send to %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__, \
ORTE_NAME_PRINT(&((ms)->peer))); \
ORTE_NAME_PRINT(&((ms)->dst))); \
mop = OBJ_NEW(mca_oob_tcp_msg_op_t); \
mop->mod = (m); \
mop->msg = (ms); \
@ -267,6 +271,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
proxy = (r); \
/* create a send object for this message */ \
snd = OBJ_NEW(mca_oob_tcp_send_t); \
mop->snd = snd; \
/* transfer and prep the header */ \
snd->hdr = proxy->hdr; \
MCA_OOB_TCP_HDR_HTON(&snd->hdr); \

Просмотреть файл

@ -112,9 +112,10 @@ ORTE_DECLSPEC extern orte_rml_component_t *orte_rml_component;
/* structure to send RML messages - used internally */
typedef struct {
opal_list_item_t super;
orte_process_name_t peer; // targeted recipient
int status; // returned status on send
orte_rml_tag_t tag; // targeted tag
orte_process_name_t dst; // targeted recipient
orte_process_name_t origin;
int status; // returned status on send
orte_rml_tag_t tag; // targeted tag
/* user's send callback functions and data */
union {
@ -128,6 +129,10 @@ typedef struct {
int count;
/* pointer to the user's buffer */
opal_buffer_t *buffer;
/* pointer to raw data for cross-transport
* transfers
*/
char *data;
} orte_rml_send_t;
OBJ_CLASS_DECLARATION(orte_rml_send_t);
@ -208,18 +213,18 @@ OBJ_CLASS_DECLARATION(orte_rml_recv_request_t);
opal_output_verbose(5, orte_rml_base_framework.framework_output, \
"%s-%s Send message complete at %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
ORTE_NAME_PRINT(&((m)->peer)), \
ORTE_NAME_PRINT(&((m)->dst)), \
__FILE__, __LINE__); \
if (NULL != (m)->iov) { \
if (NULL != (m)->cbfunc.iov) { \
(m)->cbfunc.iov((m)->status, \
&((m)->peer), \
&((m)->dst), \
(m)->iov, (m)->count, \
(m)->tag, (m)->cbdata); \
} \
} else { \
/* non-blocking buffer send */ \
(m)->cbfunc.buffer((m)->status, &((m)->peer), \
(m)->cbfunc.buffer((m)->status, &((m)->origin), \
(m)->buffer, \
(m)->tag, (m)->cbdata); \
} \

Просмотреть файл

@ -227,6 +227,7 @@ static void send_cons(orte_rml_send_t *ptr)
ptr->cbdata = NULL;
ptr->iov = NULL;
ptr->buffer = NULL;
ptr->data = NULL;
}
OBJ_CLASS_INSTANCE(orte_rml_send_t,
opal_list_item_t,

Просмотреть файл

@ -11,7 +11,8 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
* reserved.
* reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -94,7 +95,7 @@ static void send_self_exe(int fd, short args, void* data)
static void send_msg(int fd, short args, void *cbdata)
{
orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata;
orte_process_name_t *peer = &(req->post.peer);
orte_process_name_t *peer = &(req->post.dst);
orte_rml_tag_t tag = req->post.tag;
orte_rml_recv_t *rcv;
orte_rml_send_t *snd;
@ -181,7 +182,8 @@ static void send_msg(int fd, short args, void *cbdata)
}
snd = OBJ_NEW(orte_rml_send_t);
snd->peer = *peer;
snd->dst = *peer;
snd->origin = *ORTE_PROC_MY_NAME;
snd->tag = tag;
if (NULL != req->post.iov) {
snd->iov = req->post.iov;
@ -223,7 +225,7 @@ int orte_rml_oob_send_nb(orte_process_name_t* peer,
* race conditions and threads
*/
req = OBJ_NEW(orte_rml_send_request_t);
req->post.peer = *peer;
req->post.dst = *peer;
req->post.iov = iov;
req->post.count = count;
req->post.tag = tag;
@ -261,7 +263,7 @@ int orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
* race conditions and threads
*/
req = OBJ_NEW(orte_rml_send_request_t);
req->post.peer = *peer;
req->post.dst = *peer;
req->post.buffer = buffer;
req->post.tag = tag;
req->post.cbfunc.buffer = cbfunc;