1
1
This commit was SVN r24370.
Этот коммит содержится в:
Ralph Castain 2011-02-12 16:52:03 +00:00
родитель 81222e1fe7
Коммит 33b68132cc
8 изменённых файлов: 117 добавлений и 32 удалений

Просмотреть файл

@ -25,7 +25,6 @@
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h" #include "opal/util/argv.h"
#include "opal/util/if.h" #include "opal/util/if.h"
#include "opal/util/opal_sos.h"
#include "opal/class/opal_ring_buffer.h" #include "opal/class/opal_ring_buffer.h"
#include "opal/class/opal_list.h" #include "opal/class/opal_list.h"

Просмотреть файл

@ -12,7 +12,6 @@
#include "opal/mca/mca.h" #include "opal/mca/mca.h"
#include "opal/mca/base/base.h" #include "opal/mca/base/base.h"
#include "opal/util/opal_sos.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"

Просмотреть файл

@ -120,10 +120,10 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
goto cleanup; goto cleanup;
} }
/* if this is a heartbeat and I am not a daemon, then ignore it /* if this is a heartbeat and I am not a daemon or a scheduler, then ignore it
* to avoid swamping tools * to avoid swamping tools
*/ */
if (!ORTE_PROC_IS_DAEMON && ORTE_RMCAST_TAG_HEARTBEAT == tag) { if (!(ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_SCHEDULER) && ORTE_RMCAST_TAG_HEARTBEAT == tag) {
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
"%s rmcast:base:process_recv ignoring heartbeat", "%s rmcast:base:process_recv ignoring heartbeat",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -205,7 +205,7 @@ void orte_rmcast_base_process_msg(orte_rmcast_msg_t *msg)
(ORTE_RMCAST_SEQ_MAX == trkr->seq_num && 0 != recvd_seq_num)) { (ORTE_RMCAST_SEQ_MAX == trkr->seq_num && 0 != recvd_seq_num)) {
/* missing a message - request it */ /* missing a message - request it */
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output,
"%s Missing msg %d (%d) on channel %d from source %s", "%s Missed msg %d (%d) on channel %d from source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recvd_seq_num,
trkr->seq_num, channel, ORTE_NAME_PRINT(&name))); trkr->seq_num, channel, ORTE_NAME_PRINT(&name)));
OBJ_CONSTRUCT(&alert, opal_buffer_t); OBJ_CONSTRUCT(&alert, opal_buffer_t);

Просмотреть файл

@ -77,6 +77,7 @@ typedef int32_t orte_rmcast_tag_t;
#define ORTE_RMCAST_TAG_COMMAND 12 #define ORTE_RMCAST_TAG_COMMAND 12
#define ORTE_RMCAST_TAG_ERRMGR 13 #define ORTE_RMCAST_TAG_ERRMGR 13
#define ORTE_RMCAST_TAG_UPDATE_STATE 14 #define ORTE_RMCAST_TAG_UPDATE_STATE 14
#define ORTE_RMCAST_TAG_TERMINATE 15
/* starting value for dynamically assignable tags */ /* starting value for dynamically assignable tags */
#define ORTE_RMCAST_TAG_DYNAMIC 100 #define ORTE_RMCAST_TAG_DYNAMIC 100

Просмотреть файл

@ -47,6 +47,7 @@ static bool init_completed = false;
static orte_job_t *daemons=NULL; static orte_job_t *daemons=NULL;
static bool comm_enabled = false; static bool comm_enabled = false;
static orte_thread_ctl_t ctl; static orte_thread_ctl_t ctl;
static opal_list_t tools;
/* LOCAL FUNCTIONS */ /* LOCAL FUNCTIONS */
static void recv_handler(int status, orte_process_name_t* sender, static void recv_handler(int status, orte_process_name_t* sender,
@ -169,6 +170,7 @@ static int init(void)
/* setup local ctl */ /* setup local ctl */
OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t); OBJ_CONSTRUCT(&ctl, orte_thread_ctl_t);
OBJ_CONSTRUCT(&tools, opal_list_t);
/* setup the respective public address channel */ /* setup the respective public address channel */
if (ORTE_PROC_IS_TOOL) { if (ORTE_PROC_IS_TOOL) {
@ -180,7 +182,7 @@ static int init(void)
} }
orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
orte_rmcast_base.my_input_channel = NULL; orte_rmcast_base.my_input_channel = NULL;
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { } else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_SCHEDULER) {
/* daemons and hnp open the sys and data server channels */ /* daemons and hnp open the sys and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system", if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
@ -274,6 +276,8 @@ static int init(void)
static void finalize(void) static void finalize(void)
{ {
opal_list_item_t *item;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: finalize called", "%s rmcast:tcp: finalize called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -286,6 +290,11 @@ static void finalize(void)
/* stop the processing thread */ /* stop the processing thread */
orte_rmcast_base_stop_threads(); orte_rmcast_base_stop_threads();
while (NULL != (item = opal_list_remove_first(&tools))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&tools);
OBJ_DESTRUCT(&ctl); OBJ_DESTRUCT(&ctl);
return; return;
} }
@ -306,15 +315,26 @@ static void disable_comm(void)
ORTE_RELEASE_THREAD(&ctl); ORTE_RELEASE_THREAD(&ctl);
} }
static void cbfunc(int status,
struct orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
OBJ_RELEASE(buffer);
}
static int send_data(rmcast_base_send_t *snd, static int send_data(rmcast_base_send_t *snd,
orte_rmcast_channel_t channel) orte_rmcast_channel_t channel)
{ {
opal_list_item_t *item; opal_list_item_t *item, *next;
orte_proc_t *proc; orte_proc_t *proc;
orte_odls_child_t *child; orte_odls_child_t *child;
int rc, v; int rc, v;
opal_buffer_t *buf; opal_buffer_t *buf;
rmcast_base_channel_t *ch; rmcast_base_channel_t *ch;
orte_namelist_t *tool;
if (!comm_enabled) { if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED; return ORTE_ERR_COMM_DISABLED;
@ -353,10 +373,12 @@ static int send_data(rmcast_base_send_t *snd,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP))); ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* ignore errors */ /* ignore errors */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
} }
} else { } else {
/* if we don't already have it, get the daemon object */ /* if we don't already have it, get the daemon object */
@ -381,10 +403,30 @@ static int send_data(rmcast_base_send_t *snd,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name))); ORTE_NAME_PRINT(&proc->name)));
/* ignore errors */ /* ignore errors */
if (0 > (rc = orte_rml.send_buffer(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
}
/* now send it to all attached tools */
item = opal_list_get_first(&tools);
while (item != opal_list_get_end(&tools)) {
tool = (orte_namelist_t*)item;
next = opal_list_get_next(item);
OBJ_RETAIN(buf);
if (0 > (rc = orte_rml.send_buffer_nb(&tool->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc);
}
opal_list_remove_item(&tools, item);
OBJ_RELEASE(item);
OBJ_RELEASE(buf);
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
}
item = next;
} }
} }
} }
@ -401,10 +443,12 @@ static int send_data(rmcast_base_send_t *snd,
continue; continue;
} }
/* ignore errors */ /* ignore errors */
if (0 > (rc = orte_rml.send_buffer(child->name, buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(child->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
} }
} }
rc = ORTE_SUCCESS; rc = ORTE_SUCCESS;
@ -414,13 +458,10 @@ static int send_data(rmcast_base_send_t *snd,
"%s rmcast:tcp sending multicast to HNP %s", "%s rmcast:tcp sending multicast to HNP %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP))); ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
ORTE_ERROR_LOG(rc); orte_errmgr.abort(rc, "%s Failed to send message to multicast channel %d",
/* didn't get the message out */
opal_output(0, "%s failed to send message to multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ch->channel); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ch->channel);
}
goto cleanup; goto cleanup;
} }
rc = ORTE_SUCCESS; rc = ORTE_SUCCESS;
@ -831,17 +872,33 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
static void process_msg(orte_rmcast_msg_t *msg) static void process_msg(orte_rmcast_msg_t *msg)
{ {
int rc; int rc;
opal_list_item_t *item; opal_list_item_t *item, *next;
int v; int v;
orte_proc_t *proc; orte_proc_t *proc;
orte_odls_child_t *child; orte_odls_child_t *child;
opal_buffer_t *buf;
orte_namelist_t *tool;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp processing message from %s", "%s rmcast:tcp processing message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->sender))); ORTE_NAME_PRINT(&msg->sender)));
buf = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buf, msg->buf);
if (ORTE_PROC_IS_HNP) { if (ORTE_PROC_IS_HNP) {
/* if this message came from a different job family, then we have
* to track the sender so we can relay mcast messages to them as
* they won't be a member of the daemon job
*/
if (ORTE_JOB_FAMILY(msg->sender.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
tool = OBJ_NEW(orte_namelist_t);
tool->name.jobid = msg->sender.jobid;
tool->name.vpid = msg->sender.vpid;
opal_list_append(&tools, &tool->item);
}
/* if we don't already have it, get the daemon object */ /* if we don't already have it, get the daemon object */
if (NULL == daemons) { if (NULL == daemons) {
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
@ -868,12 +925,35 @@ static void process_msg(orte_rmcast_msg_t *msg)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name))); ORTE_NAME_PRINT(&proc->name)));
/* ignore errors */ /* ignore errors */
if (0 > (rc = orte_rml.send_buffer(&proc->name, msg->buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
}
/* now send it to all attached tools except whomever sent it to me, if applicable */
item = opal_list_get_first(&tools);
while (item != opal_list_get_end(&tools)) {
tool = (orte_namelist_t*)item;
next = opal_list_get_next(item);
if (msg->sender.jobid == tool->name.jobid &&
msg->sender.vpid == tool->name.vpid) {
item = next;
continue;
}
OBJ_RETAIN(buf);
if (0 > (rc = orte_rml.send_buffer_nb(&tool->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
opal_list_remove_item(&tools, item);
OBJ_RELEASE(item);
OBJ_RELEASE(buf);
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
}
item = next;
}
} }
} }
@ -900,13 +980,16 @@ static void process_msg(orte_rmcast_msg_t *msg)
ORTE_NAME_PRINT(child->name))); ORTE_NAME_PRINT(child->name)));
/* ignore errors */ /* ignore errors */
if (0 > (rc = orte_rml.send_buffer(child->name, msg->buf, ORTE_RML_TAG_MULTICAST, 0))) { OBJ_RETAIN(buf);
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) { if (0 > (rc = orte_rml.send_buffer_nb(child->name, buf, ORTE_RML_TAG_MULTICAST, 0, cbfunc, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc && ORTE_ERR_UNREACH != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
rc = ORTE_SUCCESS; /* don't confuse up-stream client */
} }
} }
} }
OBJ_RELEASE(buf);
/* now process it myself - this releases the msg */ /* now process it myself - this releases the msg */
orte_rmcast_base_process_msg(msg); orte_rmcast_base_process_msg(msg);

Просмотреть файл

@ -186,7 +186,7 @@ static int init(void)
} }
orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels); orte_rmcast_base.my_output_channel = (rmcast_base_channel_t*)opal_list_get_last(&orte_rmcast_base.channels);
orte_rmcast_base.my_input_channel = NULL; orte_rmcast_base.my_input_channel = NULL;
} else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { } else if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_SCHEDULER) {
/* daemons and hnp open the sys and data server channels */ /* daemons and hnp open the sys and data server channels */
if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system", if (ORTE_SUCCESS != (rc = open_channel(ORTE_RMCAST_SYS_CHANNEL, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
@ -789,7 +789,6 @@ static void missed_msg(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag, opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata) void* cbdata)
{ {
opal_output(0, "%s RECVD MISSING MESSAGE", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_MULTICAST_MESSAGE_EVENT(sender, buffer); ORTE_MULTICAST_MESSAGE_EVENT(sender, buffer);
} }
@ -841,7 +840,8 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
"%s setup:channel activating recv event on fd %d", "%s setup:channel activating recv event on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv));
opal_event_set(opal_event_base, &chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); opal_event_set(opal_event_base, &chan->recv_ev, chan->recv,
OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0); opal_event_add(&chan->recv_ev, 0);
} }

Просмотреть файл

@ -12,7 +12,6 @@
#include "opal/mca/base/base.h" #include "opal/mca/base/base.h"
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
#include "opal/mca/event/event.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
@ -107,7 +106,7 @@ static int orte_rmcast_udp_close(void)
static int orte_rmcast_udp_query(mca_base_module_t **module, int *priority) static int orte_rmcast_udp_query(mca_base_module_t **module, int *priority)
{ {
/* selected by default */ /* selected by default */
*priority = 10; *priority = 100;
*module = (mca_base_module_t *) &orte_rmcast_udp_module; *module = (mca_base_module_t *) &orte_rmcast_udp_module;
initialized = true; initialized = true;

Просмотреть файл

@ -53,6 +53,8 @@ typedef uint32_t orte_proc_type_t;
#define ORTE_PROC_MPI 0x0020 #define ORTE_PROC_MPI 0x0020
#define ORTE_PROC_APP 0x0030 #define ORTE_PROC_APP 0x0030
#define ORTE_PROC_CM 0x0040 #define ORTE_PROC_CM 0x0040
#define ORTE_PROC_IOF_ENDPT 0x1000
#define ORTE_PROC_SCHEDULER 0x2000
#define ORTE_PROC_IS_SINGLETON (ORTE_PROC_SINGLETON & orte_process_info.proc_type) #define ORTE_PROC_IS_SINGLETON (ORTE_PROC_SINGLETON & orte_process_info.proc_type)
#define ORTE_PROC_IS_DAEMON (ORTE_PROC_DAEMON & orte_process_info.proc_type) #define ORTE_PROC_IS_DAEMON (ORTE_PROC_DAEMON & orte_process_info.proc_type)
@ -62,6 +64,8 @@ typedef uint32_t orte_proc_type_t;
#define ORTE_PROC_IS_MPI (ORTE_PROC_MPI & orte_process_info.proc_type) #define ORTE_PROC_IS_MPI (ORTE_PROC_MPI & orte_process_info.proc_type)
#define ORTE_PROC_IS_APP (ORTE_PROC_APP & orte_process_info.proc_type) #define ORTE_PROC_IS_APP (ORTE_PROC_APP & orte_process_info.proc_type)
#define ORTE_PROC_IS_CM (ORTE_PROC_CM & orte_process_info.proc_type) #define ORTE_PROC_IS_CM (ORTE_PROC_CM & orte_process_info.proc_type)
#define ORTE_PROC_IS_IOF_ENDPT (ORTE_PROC_IOF_ENDPT & orte_process_info.proc_type)
#define ORTE_PROC_IS_SCHEDULER (ORTE_PROC_SCHEDULER & orte_process_info.proc_type)
/** /**