Add a binomial routed module - for now, still completely wires up the daemons, but that will be changed later.
Modify grpcomm xcast so it now uses the selected routed module - eliminates cross-wiring of xcast and routing paths. Suboptimal at the moment, but better implementation is on its way. Cleanup ignore properties on the new routed components. This commit was SVN r18377.
Этот коммит содержится в:
родитель
4154e587de
Коммит
40904dd152
@ -45,18 +45,6 @@
|
||||
#include "grpcomm_basic.h"
|
||||
|
||||
|
||||
/* Local functions */
|
||||
static int xcast_binomial_tree(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag);
|
||||
|
||||
static int xcast_linear(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag);
|
||||
|
||||
static int xcast_direct(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag);
|
||||
static int find_parent(int rank, int parent, int me, int num_procs,
|
||||
int *num_children, opal_list_t *children);
|
||||
|
||||
@ -74,7 +62,6 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int barrier(void);
|
||||
static opal_list_t* next_recips(orte_grpcomm_mode_t mode);
|
||||
static int daemon_collective(orte_jobid_t jobid,
|
||||
orte_std_cntr_t num_local_contributors,
|
||||
orte_grpcomm_coll_t type,
|
||||
@ -92,7 +79,6 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||
barrier,
|
||||
daemon_collective,
|
||||
update_trees,
|
||||
next_recips,
|
||||
orte_grpcomm_base_set_proc_attr,
|
||||
orte_grpcomm_base_get_proc_attr,
|
||||
orte_grpcomm_base_modex,
|
||||
@ -173,6 +159,8 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
opal_buffer_t buf;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast sent to job %s tag %ld",
|
||||
@ -183,193 +171,25 @@ static int xcast(orte_jobid_t job,
|
||||
if (NULL == buffer) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast: num_procs %ld linear xover: %ld binomial xover: %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)orte_process_info.num_procs,
|
||||
(long)orte_grpcomm_basic.xcast_linear_xover,
|
||||
(long)orte_grpcomm_basic.xcast_binomial_xover));
|
||||
|
||||
if (orte_abnormal_term_ordered) {
|
||||
/* We insist that the direct xcast mode be used when
|
||||
* an orted has failed as we cannot rely on alternative
|
||||
* methods to reach all orteds and/or procs
|
||||
*/
|
||||
rc = xcast_direct(job, buffer, tag);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
/* now use the crossover points to select the proper transmission
|
||||
* mode. We have built-in default crossover points for this
|
||||
* decision tree, but the user is free to alter them as
|
||||
* they wish via MCA params
|
||||
*/
|
||||
|
||||
if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_linear_xover) {
|
||||
rc = xcast_direct(job, buffer, tag);
|
||||
} else if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_binomial_xover) {
|
||||
rc = xcast_linear(job, buffer, tag);
|
||||
} else {
|
||||
rc = xcast_binomial_tree(job, buffer, tag);
|
||||
}
|
||||
|
||||
DONE:
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int xcast_binomial_tree(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag)
|
||||
{
|
||||
orte_daemon_cmd_flag_t command;
|
||||
orte_grpcomm_mode_t mode;
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:entering xcast_binomial",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* binomial xcast can only go through the daemons as app procs are
|
||||
* not allowed to relay messages.
|
||||
* first, need to pack the msg and be sure to include routing info so it
|
||||
* can properly be sent through the daemons
|
||||
*/
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/* tell the daemon to process and relay */
|
||||
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* tell the daemon the routing algorithm this xmission is using */
|
||||
mode = ORTE_GRPCOMM_BINOMIAL;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* if this isn't intended for the daemon command tag, then we better
|
||||
* tell the daemon to deliver it to the procs, and what job is supposed
|
||||
* to get it - this occurs when a caller just wants to send something
|
||||
* to all the procs in a job. In that use-case, the caller doesn't know
|
||||
* anything about inserting daemon commands or what routing algo might
|
||||
* be used, so we have to help them out a little. Functions that are
|
||||
* sending commands to the daemons themselves are smart enough to know
|
||||
* what they need to do.
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
|
||||
/* copy the payload into the new buffer - this is non-destructive, so our
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast_binomial: buffer size %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)buf->bytes_used));
|
||||
|
||||
/* setup a buffer to handle the xcast command */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
/* all we need to do is send this to the HNP - the relay logic
|
||||
* will ensure everyone else gets it!
|
||||
* will ensure everyone else gets it! So tell the HNP to
|
||||
* process and relay it. The HNP will use the routed.get_routing_tree
|
||||
* to find out who it should relay the message to.
|
||||
*/
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast_binomial: sending %s => %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
/* if I am the HNP, just set things up so the cmd processor gets called.
|
||||
* We don't want to message ourselves as this can create circular logic
|
||||
* in the RML. Instead, this macro will set a zero-time event which will
|
||||
* cause the buffer to be processed by the cmd processor - probably will
|
||||
* fire right away, but that's okay
|
||||
* The macro makes a copy of the buffer, so it's okay to release it here
|
||||
*/
|
||||
if (orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
} else {
|
||||
/* otherwise, send it to the HNP for relay */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
OBJ_RELEASE(buf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast_binomial: completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int xcast_linear(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag)
|
||||
{
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
orte_grpcomm_mode_t mode;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:entering xcast_linear",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* since we have to pack some additional info into the buffer to be
|
||||
* sent to the daemons, we create a new buffer into which we will
|
||||
* put the intermediate payload - i.e., the info that goes to the
|
||||
* daemon. This buffer will contain all the info needed by the
|
||||
* daemon, plus the payload intended for the processes themselves
|
||||
*/
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/* if we are an application proc, then send this to our HNP so
|
||||
* we don't try to talk to every daemon directly ourselves. This
|
||||
* is necessary since we don't know how many daemons there are!
|
||||
*
|
||||
* Likewise, a daemon who is not the HNP will also let the HNP
|
||||
* act as the relay to avoid opening unnecessary connections
|
||||
* and rattling messages around the system if daemons are not
|
||||
* fully connected
|
||||
*/
|
||||
|
||||
/* tell the HNP to relay */
|
||||
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* tell the HNP the routing algorithm this xmission is using */
|
||||
mode = ORTE_GRPCOMM_LINEAR;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
/* pack the target jobid and tag for use in relay */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
@ -385,15 +205,15 @@ static int xcast_linear(orte_jobid_t job,
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
@ -403,16 +223,11 @@ static int xcast_linear(orte_jobid_t job,
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast_linear: buffer size %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)buf->bytes_used));
|
||||
|
||||
/* if I am the HNP, just set things up so the cmd processor gets called.
|
||||
* We don't want to message ourselves as this can create circular logic
|
||||
* in the RML. Instead, this macro will set a zero-time event which will
|
||||
@ -421,342 +236,21 @@ static int xcast_linear(orte_jobid_t job,
|
||||
* The macro makes a copy of the buffer, so it's okay to release it here
|
||||
*/
|
||||
if (orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
} else {
|
||||
/* otherwise, send it to the HNP for relay */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
CLEANUP:
|
||||
/* release the buffer */
|
||||
OBJ_RELEASE(buf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:xcast_linear: completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int relay_via_hnp(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag) {
|
||||
opal_buffer_t *buf;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
orte_grpcomm_mode_t mode;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: relaying buffer to HNP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* since we have to pack some additional info into the buffer
|
||||
* for this case, we create a new buffer into to contain all the
|
||||
* info needed plus the payload
|
||||
*/
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
/* start by telling the HNP to relay */
|
||||
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* default to the LINEAR mode since this is equivalent to
|
||||
* DIRECT for daemons
|
||||
*/
|
||||
mode = ORTE_GRPCOMM_LINEAR;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* if the target isn't the daemon tag, then we have to add the proper
|
||||
* command so the daemon's know what to do
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
/* copy the payload into the new buffer - this is non-destructive, so our
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
OBJ_RELEASE(buf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: buffer relayed to HNP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int xcast_direct(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag)
|
||||
{
|
||||
int rc;
|
||||
orte_process_name_t peer;
|
||||
orte_vpid_t i, num_targets=0;
|
||||
opal_buffer_t *buf=NULL, *bfr=buffer;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
orte_rml_tag_t target=tag;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: entering xcast_direct",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am applicaton proc */
|
||||
if (!orte_process_info.hnp &&
|
||||
!orte_process_info.daemon &&
|
||||
!orte_process_info.tool) {
|
||||
/* if this is going to some job other
|
||||
* than my own, then we have to send it via the HNP as I have
|
||||
* no way of knowing how many procs are in the other job.
|
||||
*/
|
||||
if (ORTE_PROC_MY_NAME->jobid != job) {
|
||||
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* if it is my jobid, then we can just send this ourselves -
|
||||
* set the target tag
|
||||
*/
|
||||
target = tag;
|
||||
/* set number of procs to the #procs in our job */
|
||||
num_targets = orte_process_info.num_procs;
|
||||
/* point to the right buffer */
|
||||
bfr = buffer;
|
||||
/* go to send it */
|
||||
goto SEND;
|
||||
}
|
||||
|
||||
/* if I am a daemon */
|
||||
if (orte_process_info.daemon) {
|
||||
/* if this is going to another job, then I have to relay
|
||||
* it through the HNP as I have no idea how many procs
|
||||
* are in that job
|
||||
*/
|
||||
if (ORTE_PROC_MY_NAME->jobid != job) {
|
||||
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* if this is going to the daemon job to
|
||||
* someplace other than the daemon cmd processor, then I need to add
|
||||
* a command to the buffer so the recipient daemons know what to do
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
/* setup a buffer to handle the additional info */
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
/* add the proper command so the daemon's know what to do */
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* copy the payload into the new buffer - this is non-destructive, so our
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* point to correct buffer to be sent */
|
||||
bfr = buf;
|
||||
/* send this to the daemon tag so it gets processed correctly */
|
||||
target = ORTE_RML_TAG_DAEMON;
|
||||
/* set the number of targets to be the number of daemons */
|
||||
num_targets = orte_process_info.num_procs;
|
||||
/* send it */
|
||||
goto SEND;
|
||||
}
|
||||
}
|
||||
|
||||
/* if I am the HNP */
|
||||
if (orte_process_info.hnp) {
|
||||
orte_job_t *jdata;
|
||||
|
||||
/* if this is going to the daemon job */
|
||||
if (ORTE_PROC_MY_NAME->jobid == job) {
|
||||
/* if this is going someplace other than the daemon cmd
|
||||
* processor, then I need to add a command to the buffer
|
||||
* so the recipient daemons know what to do
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
/* since we have to pack some additional info into the buffer
|
||||
* for this case, we create a new buffer to contain all the
|
||||
* info needed plus the payload
|
||||
*/
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
/* if the target isn't the daemon tag, then we have to add the proper
|
||||
* command so the daemon's know what to do
|
||||
*/
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* copy the payload into the new buffer - this is non-destructive, so our
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* point to correct buffer to be sent */
|
||||
bfr = buf;
|
||||
/* send this to the daemon tag so it gets processed correctly */
|
||||
target = ORTE_RML_TAG_DAEMON;
|
||||
/* set the number of targets to be the number of daemons */
|
||||
num_targets = orte_process_info.num_procs;
|
||||
/* send it */
|
||||
goto SEND;
|
||||
} else {
|
||||
/* if already going to the daemon tag, then just point to
|
||||
* the right places and send it
|
||||
*/
|
||||
bfr = buffer;
|
||||
target = tag;
|
||||
num_targets = orte_process_info.num_procs;
|
||||
goto SEND;
|
||||
}
|
||||
}
|
||||
/* if this is going to any other job,
|
||||
* then I need to know the number of procs in that job so I can
|
||||
* send it
|
||||
*/
|
||||
if (NULL == (jdata = orte_get_job_data_object(job))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* set the number of targets */
|
||||
num_targets = jdata->num_procs;
|
||||
/* set the tag */
|
||||
target = tag;
|
||||
/* point to correct buffer to be sent */
|
||||
bfr = buffer;
|
||||
/* send it */
|
||||
goto SEND;
|
||||
}
|
||||
|
||||
|
||||
SEND:
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s xcast_direct: buffer size %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)buffer->bytes_used));
|
||||
|
||||
peer.jobid = job;
|
||||
for(i=0; i<num_targets; i++) {
|
||||
peer.vpid = i;
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s xcast_direct: %s => %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer)));
|
||||
|
||||
/* if I am the HNP, just set things up so the cmd processor gets called.
|
||||
* We don't want to message ourselves as this can create circular logic
|
||||
* in the RML. Instead, this macro will set a zero-time event which will
|
||||
* cause the buffer to be processed by the cmd processor - probably will
|
||||
* fire right away, but that's okay
|
||||
* The macro makes a copy of the buffer, so it's okay to release it later
|
||||
*/
|
||||
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
peer.vpid == ORTE_PROC_MY_NAME->vpid &&
|
||||
orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, bfr, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
} else {
|
||||
if (0 > (rc = orte_rml.send_buffer(&peer, bfr, target, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
}
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
/* release buf if used */
|
||||
if (NULL != buf) {
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: xcast_direct completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static opal_list_t* next_recips(orte_grpcomm_mode_t mode)
|
||||
{
|
||||
/* check the mode to select the proper algo */
|
||||
switch (mode) {
|
||||
case ORTE_GRPCOMM_CHAIN:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
|
||||
return NULL;
|
||||
break;
|
||||
case ORTE_GRPCOMM_BINOMIAL:
|
||||
return my_children;
|
||||
break;
|
||||
case ORTE_GRPCOMM_LINEAR:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
|
||||
return NULL;
|
||||
break;
|
||||
default:
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static bool barrier_recvd;
|
||||
static bool barrier_timer;
|
||||
|
@ -54,8 +54,6 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
|
||||
static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
|
||||
static opal_list_t* next_recips(orte_grpcomm_mode_t mode);
|
||||
|
||||
static int set_proc_attr(const char *attr_name,
|
||||
const void *data,
|
||||
size_t size);
|
||||
@ -86,7 +84,6 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = {
|
||||
orte_grpcomm_cnos_barrier,
|
||||
daemon_collective,
|
||||
update_trees,
|
||||
next_recips,
|
||||
set_proc_attr,
|
||||
get_proc_attr,
|
||||
modex,
|
||||
@ -177,12 +174,6 @@ static int update_trees(void)
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static opal_list_t* next_recips(orte_grpcomm_mode_t mode)
|
||||
{
|
||||
/* nothing to do here */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int set_proc_attr(const char *attr_name,
|
||||
const void *data,
|
||||
size_t size)
|
||||
|
@ -86,9 +86,6 @@ typedef int (*orte_grpcomm_base_module_daemon_collective_fn_t)(orte_jobid_t jobi
|
||||
*/
|
||||
typedef int (*orte_grpcomm_base_module_update_trees_fn_t)(void);
|
||||
|
||||
/* for collectives, return next recipients in the chain */
|
||||
typedef opal_list_t* (*orte_gprcomm_base_module_next_recipients_fn_t)(orte_grpcomm_mode_t mode);
|
||||
|
||||
/** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION
|
||||
* OF HOW THIS ALL WORKS
|
||||
*/
|
||||
@ -122,7 +119,6 @@ struct orte_grpcomm_base_module_2_0_0_t {
|
||||
orte_grpcomm_base_module_barrier_fn_t barrier;
|
||||
orte_grpcomm_base_module_daemon_collective_fn_t daemon_collective;
|
||||
orte_grpcomm_base_module_update_trees_fn_t update_trees;
|
||||
orte_gprcomm_base_module_next_recipients_fn_t next_recipients;
|
||||
/* modex functions */
|
||||
orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr;
|
||||
orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr;
|
||||
|
@ -234,6 +234,8 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto REPORT_ERROR;
|
||||
}
|
||||
/* update the routing tree */
|
||||
orte_routed.update_routing_tree();
|
||||
|
||||
/* unpack the #bytes of daemon wireup info in the message */
|
||||
cnt=1;
|
||||
|
@ -419,7 +419,10 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* if a tree-launch was underway, clear out the cmd */
|
||||
if (NULL != orte_tree_launch_cmd) {
|
||||
OBJ_RELEASE(orte_tree_launch_cmd);
|
||||
|
36
orte/mca/routed/binomial/Makefile.am
Обычный файл
36
orte/mca/routed/binomial/Makefile.am
Обычный файл
@ -0,0 +1,36 @@
|
||||
#
|
||||
# Copyright (c) 2007 Los Alamos National Security, LLC.
|
||||
# All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
sources = \
|
||||
routed_binomial.h \
|
||||
routed_binomial.c \
|
||||
routed_binomial_component.c
|
||||
|
||||
# Make the output library in this directory, and name it either
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
# (for static builds).
|
||||
|
||||
if OMPI_BUILD_routed_binomial_DSO
|
||||
component_noinst =
|
||||
component_install = mca_routed_binomial.la
|
||||
else
|
||||
component_noinst = libmca_routed_binomial.la
|
||||
component_install =
|
||||
endif
|
||||
|
||||
mcacomponentdir = $(pkglibdir)
|
||||
mcacomponent_LTLIBRARIES = $(component_install)
|
||||
mca_routed_binomial_la_SOURCES = $(sources)
|
||||
mca_routed_binomial_la_LDFLAGS = -module -avoid-version
|
||||
|
||||
noinst_LTLIBRARIES = $(component_noinst)
|
||||
libmca_routed_binomial_la_SOURCES = $(sources)
|
||||
libmca_routed_binomial_la_LDFLAGS = -module -avoid-version
|
||||
|
14
orte/mca/routed/binomial/configure.params
Обычный файл
14
orte/mca/routed/binomial/configure.params
Обычный файл
@ -0,0 +1,14 @@
|
||||
# -*- shell-script -*-
|
||||
#
|
||||
# Copyright (c) 2007 Los Alamos National Security, LLC.
|
||||
# All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
# Specific to this module
|
||||
|
||||
PARAM_CONFIG_FILES="Makefile"
|
751
orte/mca/routed/binomial/routed_binomial.c
Обычный файл
751
orte/mca/routed/binomial/routed_binomial.c
Обычный файл
@ -0,0 +1,751 @@
|
||||
/*
|
||||
* Copyright (c) 2007 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/runtime/opal_progress.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/grpcomm/grpcomm.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
|
||||
#include "orte/mca/routed/base/base.h"
|
||||
#include "routed_binomial.h"
|
||||
|
||||
static int init(void);
|
||||
static int finalize(void);
|
||||
static int update_route(orte_process_name_t *target,
|
||||
orte_process_name_t *route);
|
||||
static orte_process_name_t get_route(orte_process_name_t *target);
|
||||
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
|
||||
static int route_lost(const orte_process_name_t *route);
|
||||
static bool route_is_defined(const orte_process_name_t *target);
|
||||
static int update_routing_tree(void);
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
|
||||
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf);
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
static int binomial_ft_event(int state);
|
||||
#endif
|
||||
|
||||
static orte_process_name_t *lifeline=NULL;
|
||||
static orte_process_name_t my_parent;
|
||||
static int num_children;
|
||||
static opal_list_t my_children;
|
||||
|
||||
orte_routed_module_t orte_routed_binomial_module = {
|
||||
init,
|
||||
finalize,
|
||||
update_route,
|
||||
get_route,
|
||||
init_routes,
|
||||
route_lost,
|
||||
route_is_defined,
|
||||
update_routing_tree,
|
||||
get_routing_tree,
|
||||
get_wireup_info,
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
binomial_ft_event
|
||||
#else
|
||||
NULL
|
||||
#endif
|
||||
};
|
||||
|
||||
/* local globals */
|
||||
static opal_hash_table_t peer_list;
|
||||
static opal_hash_table_t vpid_wildcard_list;
|
||||
static orte_process_name_t wildcard_route;
|
||||
static opal_condition_t cond;
|
||||
static opal_mutex_t lock;
|
||||
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&peer_list, opal_hash_table_t);
|
||||
opal_hash_table_init(&peer_list, 128);
|
||||
|
||||
OBJ_CONSTRUCT(&vpid_wildcard_list, opal_hash_table_t);
|
||||
opal_hash_table_init(&vpid_wildcard_list, 128);
|
||||
|
||||
wildcard_route.jobid = ORTE_NAME_INVALID->jobid;
|
||||
wildcard_route.vpid = ORTE_NAME_INVALID->vpid;
|
||||
|
||||
/* setup the global condition and lock */
|
||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
||||
|
||||
lifeline = NULL;
|
||||
|
||||
/* setup the list of children */
|
||||
OBJ_CONSTRUCT(&my_children, opal_list_t);
|
||||
num_children = 0;
|
||||
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
 * Tear down the module. Application procs first send a final sync to
 * their daemon; the HNP stops its comm recv. Only then are the route
 * tables and child list destructed, since the OOB may still query us
 * for routes while the sync is in flight.
 */
static int finalize(void)
{
    int rc;
    uint64_t key;
    void * value, *node, *next_node;
    opal_list_item_t *item;

    /* if I am an application process, indicate that I am
     * truly finalizing prior to departure
     */
    if (!orte_process_info.hnp &&
        !orte_process_info.daemon &&
        !orte_process_info.tool) {
        if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync(false))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
    }

    /* if I am the HNP, I need to stop the comm recv */
    if (orte_process_info.hnp) {
        orte_routed_base_comm_stop();
    }

    /* don't destruct the routes until *after* we send the
     * sync as the oob will be asking us how to route
     * the message!
     */
    /* free every malloc'd route copy stored in the exact-match table */
    rc = opal_hash_table_get_first_key_uint64(&peer_list,
                                              &key, &value, &node);
    while(OPAL_SUCCESS == rc) {
        if(NULL != value) {
            free(value);
        }
        rc = opal_hash_table_get_next_key_uint64(&peer_list,
                                                 &key, &value, node, &next_node);
        node = next_node;
    }
    OBJ_DESTRUCT(&peer_list);
    /* NOTE(review): route copies stored in vpid_wildcard_list by
     * update_route are not freed here - only the table itself is
     * destructed; confirm whether those entries leak */
    OBJ_DESTRUCT(&vpid_wildcard_list);
    /* destruct the global condition and lock */
    OBJ_DESTRUCT(&cond);
    OBJ_DESTRUCT(&lock);

    lifeline = NULL;

    /* deconstruct the list of children */
    while (NULL != (item = opal_list_remove_first(&my_children))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&my_children);
    num_children = 0;

    return ORTE_SUCCESS;
}
|
||||
|
||||
/*
 * Record that messages for <target> should be relayed via <route>.
 *
 * target - destination process (may use ORTE_VPID_WILDCARD to define
 *          a route for an entire job); invalid names are rejected
 * route  - next hop to use for that destination
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM on an invalid target,
 * ORTE_ERR_OUT_OF_RESOURCE on allocation failure, or
 * ORTE_ERR_NOT_SUPPORTED for a jobid-wildcard target.
 *
 * The stored route is a heap copy owned by the hash tables (freed in
 * finalize for the exact-match table).
 */
static int update_route(orte_process_name_t *target,
                        orte_process_name_t *route)
{
    int rc;
    orte_process_name_t * route_copy;

    if (target->jobid == ORTE_JOBID_INVALID ||
        target->vpid == ORTE_VPID_INVALID) {
        return ORTE_ERR_BAD_PARAM;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                         "%s routed_binomial_update: %s --> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(target),
                         ORTE_NAME_PRINT(route)));

    /* if I am an application process, we don't update the route unless
     * the conditions dictate it. This is done to avoid creating large
     * hash tables when they aren't needed
     */
    if (!orte_process_info.hnp && !orte_process_info.daemon &&
        !orte_process_info.tool) {
        /* if the route is the daemon, then do nothing - we already route
         * everything through the daemon anyway
         */
        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route,
                                                        ORTE_PROC_MY_DAEMON)) {
            return ORTE_SUCCESS;
        }

        /* if this is for my own job family, then do nothing - we -always- route
         * our own job family through the daemons
         */
        if (ORTE_JOB_FAMILY(target->jobid) == ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
            return ORTE_SUCCESS;
        }
    }

    /* the tables own a private copy of the route - check the
     * allocation before dereferencing it (was previously unchecked) */
    route_copy = malloc(sizeof *route_copy);
    if (NULL == route_copy) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    *route_copy = *route;

    /* exact match */
    if (target->jobid != ORTE_JOBID_WILDCARD &&
        target->vpid != ORTE_VPID_WILDCARD) {
        rc = opal_hash_table_set_value_uint64(&peer_list,
                                              orte_util_hash_name(target), route_copy);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
        }
        return rc;
    }

    /* vpid wildcard: one route for every proc in the job */
    if (target->jobid != ORTE_JOBID_WILDCARD &&
        target->vpid == ORTE_VPID_WILDCARD) {
        rc = opal_hash_table_set_value_uint32(&vpid_wildcard_list,
                                              target->jobid, route_copy);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
        }
        return rc;
    }

    /* a jobid-wildcard target cannot be stored - discard the copy */
    free(route_copy);

    return ORTE_ERR_NOT_SUPPORTED;
}
|
||||
|
||||
|
||||
/*
 * Resolve the next hop for a message destined to <target>:
 * self -> direct; else the exact-match table; else the per-job
 * wildcard table; else the module-wide wildcard route.
 */
static orte_process_name_t get_route(orte_process_name_t *target)
{
    orte_process_name_t *hop;

    if (OPAL_EQUAL == opal_dss.compare(ORTE_PROC_MY_NAME, target, ORTE_NAME)) {
        /* messages to myself go direct */
        hop = target;
    } else if (ORTE_SUCCESS == opal_hash_table_get_value_uint64(
                   &peer_list, orte_util_hash_name(target), (void**)&hop)) {
        /* exact match found - hop was filled in by the lookup */
    } else if (ORTE_SUCCESS == opal_hash_table_get_value_uint32(
                   &vpid_wildcard_list, target->jobid, (void**)&hop)) {
        /* a route was defined for this entire job */
    } else {
        /* nothing specific - fall back to the wildcard route */
        hop = &wildcard_route;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                         "%s routed_binomial_get(%s) --> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(target),
                         ORTE_NAME_PRINT(hop)));

    return *hop;
}
|
||||
|
||||
/*
 * Process a callback buffer containing the RML contact URIs reported
 * by the procs of <job>. Each URI is stored on the corresponding
 * orte_proc_t record and the proc is marked RUNNING; once every proc
 * has reported, the job itself is advanced to RUNNING.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND if the job is unknown, or
 * the unpack error code on a malformed buffer.
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_proc_t **procs;
    orte_job_t *jdata;
    orte_std_cntr_t cnt;
    char *rml_uri;
    orte_process_name_t name;
    int rc;

    /* lookup the job object for this process */
    if (NULL == (jdata = orte_get_job_data_object(job))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    procs = (orte_proc_t**)jdata->procs->addr;

    /* unpack the data for each entry */
    cnt = 1;
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_binomial:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == rml_uri) ? "NULL" : rml_uri));

        if (rml_uri == NULL) continue;

        /* we don't need to set the contact info into our rml
         * hash table as we won't talk to the proc directly
         */

        /* extract the proc's name; on failure, skip this entry but
         * keep processing the rest of the buffer */
        if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
            ORTE_ERROR_LOG(rc);
            free(rml_uri);
            continue;
        }
        /* the procs are stored in vpid order, so update the record */
        procs[name.vpid]->rml_uri = strdup(rml_uri);
        free(rml_uri);

        /* update the proc state */
        if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) {
            procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING;
        }

        ++jdata->num_reported;
        cnt = 1;
    }
    /* running off the end of the buffer is the normal loop exit;
     * any other unpack status is a genuine error */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* if all procs have reported, update our job state */
    if (jdata->num_reported == jdata->num_procs) {
        /* update the job state */
        if (jdata->state < ORTE_JOB_STATE_RUNNING) {
            jdata->state = ORTE_JOB_STATE_RUNNING;
        }
    }

    return ORTE_SUCCESS;
}
|
||||
|
||||
/*
 * Establish the initial routing state for <job>. Behavior splits on
 * the caller's role: tools stand alone; daemons seed a route to the
 * HNP (ndat == NULL, i.e. during orte_init) or absorb an RML update
 * (ndat != NULL); the HNP starts its callback recv or processes
 * reported contact info; application procs route everything through
 * their local daemon.
 *
 * job  - jobid the routes apply to
 * ndat - NULL during orte_init, otherwise a buffer of RML update data
 */
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
{
    /* the binomial module routes all proc communications through
     * the local daemon. Daemons must identify which of their
     * daemon-peers is "hosting" the specified recipient and
     * route the message to that daemon. Daemon contact info
     * is handled elsewhere, so all we need to do here is
     * ensure that the procs are told to route through their
     * local daemon, and that daemons are told how to route
     * for each proc
     */
    int rc;

    /* if I am a tool, then I stand alone - there is nothing to do */
    if (orte_process_info.tool) {
        return ORTE_SUCCESS;
    }

    /* if I am a daemon or HNP, then I have to extract the routing info for this job
     * from the data sent to me for launch and update the routing tables to
     * point at the daemon for each proc
     */
    if (orte_process_info.daemon) {

        OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                             "%s routed_binomial: init routes for daemon job %s\n\thnp_uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(job),
                             (NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri));

        if (NULL == ndat) {
            /* indicates this is being called during orte_init.
             * Get the HNP's name for possible later use
             */
            if (NULL == orte_process_info.my_hnp_uri) {
                /* fatal error */
                ORTE_ERROR_LOG(ORTE_ERR_FATAL);
                return ORTE_ERR_FATAL;
            }
            /* set the contact info into the hash table */
            if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
                ORTE_ERROR_LOG(rc);
                return(rc);
            }

            /* extract the hnp name and store it */
            if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
                                                               ORTE_PROC_MY_HNP, NULL))) {
                ORTE_ERROR_LOG(rc);
                return rc;
            }

            /* if ndat is NULL, then this is being called during init,
             * so just seed the routing table with a path back to the HNP...
             */
            if (ORTE_SUCCESS != (rc = update_route(ORTE_PROC_MY_HNP, ORTE_PROC_MY_HNP))) {
                ORTE_ERROR_LOG(rc);
                return rc;
            }
            /* set the wildcard route for anybody whose name we don't recognize
             * to be the HNP
             */
            wildcard_route.jobid = ORTE_PROC_MY_HNP->jobid;
            wildcard_route.vpid = ORTE_PROC_MY_HNP->vpid;

            /* set our lifeline to the the HNP - we will abort if that connection is lost */
            lifeline = ORTE_PROC_MY_HNP;

            /* daemons will send their contact info back to the HNP as
             * part of the message confirming they are read to go. HNP's
             * load their contact info during orte_init
             */
        } else {
            /* ndat != NULL means we are getting an update of RML info
             * for the daemons - so update our contact info and routes
             */
            if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
                ORTE_ERROR_LOG(rc);
            }
            return rc;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_binomial: completed init routes",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

        return ORTE_SUCCESS;
    }


    if (orte_process_info.hnp) {

        OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                             "%s routed_binomial: init routes for HNP job %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(job)));

        if (NULL == ndat) {
            /* if ndat is NULL, then this is being called during init, so just
             * make myself available to catch any reported contact info
             */
            if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
                ORTE_ERROR_LOG(rc);
                return rc;
            }
            /* the HNP has no lifeline */
            lifeline = NULL;
        } else {
            /* if this is for my own jobid, then I am getting an update of RML info
             * for the daemons - so update our contact info and routes
             */
            if (ORTE_PROC_MY_NAME->jobid == job) {
                if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
                    ORTE_ERROR_LOG(rc);
                    return rc;
                }
            } else {
                /* if not, then I need to process the callback */
                if (ORTE_SUCCESS != (rc = process_callback(job, ndat))) {
                    ORTE_ERROR_LOG(rc);
                    return rc;
                }
            }
        }

        return ORTE_SUCCESS;
    }

    { /* MUST BE A PROC */
        /* if ndat != NULL, then this is being invoked by the proc to
         * init a route to a specified process that is outside of our
         * job family. We want that route to go through our HNP, routed via
         * out local daemon - however, we cannot know for
         * certain that the HNP already knows how to talk to the specified
         * procs. For example, in OMPI's publish/subscribe procedures, the
         * DPM framework looks for an mca param containing the global ompi-server's
         * uri. This info will come here so the proc can setup a route to
         * the server - we need to pass the routing info to our HNP
         */
        if (NULL != ndat) {
            int rc;

            OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                                 "%s routed_binomial: init routes w/non-NULL data",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

            /* send the buffer to the proper tag on the daemon */
            if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
                                               ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
                ORTE_ERROR_LOG(rc);
                return rc;
            }
            /* we already have defined our routes to everyone to
             * be through the local daemon, so nothing further to do
             */
            return ORTE_SUCCESS;
        }

        /* if ndat=NULL, then we are being called during orte_init. In this
         * case, we need to setup a few critical pieces of info
         */

        OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                             "%s routed_binomial: init routes for proc job %s\n\thnp_uri %s\n\tdaemon uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
                             (NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri,
                             (NULL == orte_process_info.my_daemon_uri) ? "NULL" : orte_process_info.my_daemon_uri));

        if (NULL == orte_process_info.my_daemon_uri) {
            /* in this module, we absolutely MUST have this information - if
             * we didn't get it, then error out
             */
            opal_output(0, "%s ERROR: Failed to identify the local daemon's URI",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            opal_output(0, "%s ERROR: This is a fatal condition when the binomial router",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            opal_output(0, "%s ERROR: has been selected - either select the unity router",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            opal_output(0, "%s ERROR: or ensure that the local daemon info is provided",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            return ORTE_ERR_FATAL;
        }

        /* we have to set the HNP's name, even though we won't route messages directly
         * to it. This is required to ensure that we -do- send messages to the correct
         * HNP name
         */
        if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
                                                           ORTE_PROC_MY_HNP, NULL))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        /* Set the contact info in the RML - this won't actually establish
         * the connection, but just tells the RML how to reach the daemon
         * if/when we attempt to send to it
         */
        if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_daemon_uri))) {
            ORTE_ERROR_LOG(rc);
            return(rc);
        }
        /* extract the daemon's name so we can update the routing table */
        if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
                                                           ORTE_PROC_MY_DAEMON, NULL))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        /* setup the route to all other procs to flow through the daemon */
        wildcard_route.jobid = ORTE_PROC_MY_DAEMON->jobid;
        wildcard_route.vpid = ORTE_PROC_MY_DAEMON->vpid;

        /* set our lifeline to the local daemon - we will abort if this connection is lost */
        lifeline = ORTE_PROC_MY_DAEMON;

        /* register ourselves -this sends a message to the daemon (warming up that connection)
         * and sends our contact info to the HNP when all local procs have reported
         *
         * NOTE: it may seem odd that we send our contact info to the HNP - after all,
         * the HNP doesn't really need to know how to talk to us directly if we are
         * using this routing method. However, this is good for two reasons:
         *
         * (1) some debuggers and/or tools may need RML contact
         * info to set themselves up
         *
         * (2) doing so allows the HNP to "block" in a dynamic launch
         * until all procs are reported running, thus ensuring that no communication
         * is attempted until the overall ORTE system knows how to talk to everyone -
         * otherwise, the system can just hang.
         */
        if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync(true))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        /* no answer is expected or coming */

        return ORTE_SUCCESS;
    }
}
|
||||
|
||||
static int route_lost(const orte_process_name_t *route)
|
||||
{
|
||||
/* if we lose the connection to the lifeline and we are NOT already,
|
||||
* in finalize, tell the OOB to abort.
|
||||
* NOTE: we cannot call abort from here as the OOB needs to first
|
||||
* release a thread-lock - otherwise, we will hang!!
|
||||
*/
|
||||
if (!orte_finalizing &&
|
||||
NULL != lifeline &&
|
||||
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route, lifeline)) {
|
||||
opal_output(0, "%s routed:binomial: Connection to lifeline %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(lifeline));
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
|
||||
/* we don't care about this one, so return success */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/******* stub functions - to be implemented ******/
|
||||
static bool route_is_defined(const orte_process_name_t *target)
{
    /* stub: unconditionally report that a route exists - everything
     * is currently routed via the wildcard (daemon/HNP) path, so no
     * per-target lookup is performed yet */
    return true;
}
|
||||
|
||||
/*************************************/
|
||||
|
||||
/*
 * Recursively walk the binomial spanning tree rooted at vpid 0.
 *
 * rank      - vpid of the node currently being expanded
 * parent    - vpid of that node's parent in the tree
 * me        - vpid of the calling process (the node we are looking for)
 * num_procs - total number of processes in the tree
 *
 * When the walk reaches "me", the children of "me" are appended to the
 * module-global my_children list (num_children updated accordingly)
 * and my parent's vpid is returned. Returns -1 if "me" was not found
 * in this subtree.
 */
static int binomial_tree(int rank, int parent, int me, int num_procs)
{
    int i, bitmap, peer, hibit, mask, found;
    orte_namelist_t *child;

    /* is this me? */
    if (me == rank) {
        bitmap = opal_cube_dim(num_procs);

        hibit = opal_hibit(rank, bitmap);
        --bitmap;

        /* my children are the vpids formed by setting, one at a time,
         * each bit above my highest set bit - keeping only valid vpids */
        for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
            peer = rank | mask;
            if (peer < num_procs) {
                child = OBJ_NEW(orte_namelist_t);
                child->name.jobid = ORTE_PROC_MY_NAME->jobid;
                child->name.vpid = peer;
                OPAL_OUTPUT_VERBOSE((3, orte_routed_base_output,
                                     "%s routed:binomial found child %s",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                     ORTE_NAME_PRINT(&child->name)));

                opal_list_append(&my_children, &child->item);
                num_children++;
            }
        }
        OPAL_OUTPUT_VERBOSE((3, orte_routed_base_output,
                             "%s routed:binomial found parent %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             parent));
        return parent;
    }

    /* not me - recurse into the children of this rank */
    bitmap = opal_cube_dim(num_procs);

    hibit = opal_hibit(rank, bitmap);
    --bitmap;

    for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
        peer = rank | mask;
        if (peer < num_procs) {
            /* execute compute on this child; a non-negative result
             * means "me" was located in that subtree */
            if (0 <= (found = binomial_tree(peer, rank, me, num_procs))) {
                return found;
            }
        }
    }
    return -1;
}
|
||||
|
||||
static int update_routing_tree(void)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
|
||||
/* clear the list of children if any are already present */
|
||||
while (NULL != (item = opal_list_remove_first(&my_children))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
num_children = 0;
|
||||
|
||||
/* recompute the tree */
|
||||
my_parent.vpid = binomial_tree(0, 0, ORTE_PROC_MY_NAME->vpid,
|
||||
orte_process_info.num_procs);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job,
|
||||
opal_list_t *children)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
orte_namelist_t *nm, *child;
|
||||
|
||||
/* the binomial routing tree always goes to our children,
|
||||
* for any job
|
||||
*/
|
||||
for (item = opal_list_get_first(&my_children);
|
||||
item != opal_list_get_end(&my_children);
|
||||
item = opal_list_get_next(item)) {
|
||||
child = (orte_namelist_t*)item;
|
||||
nm = OBJ_NEW(orte_namelist_t);
|
||||
nm->name.jobid = child->name.jobid;
|
||||
nm->name.vpid = child->name.vpid;
|
||||
opal_list_append(children, &nm->item);
|
||||
}
|
||||
/* return my parent's vpid */
|
||||
return my_parent.vpid;
|
||||
}
|
||||
|
||||
|
||||
/*
 * Pack my job's RML contact info into <buf> so daemons can be wired
 * up. A no-op when static ports are in use, since no exchange of comm
 * info is required then.
 */
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
{
    int rc;

    /* if we are not using static ports, then we need to share the
     * comm info - otherwise, just return
     */
    if (orte_static_ports) {
        return ORTE_SUCCESS;
    }

    if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
        ORTE_ERROR_LOG(rc);
        /* NOTE(review): releasing the caller-supplied buffer here means
         * this function takes ownership of it on error - confirm that
         * callers do not also release it on failure */
        OBJ_RELEASE(buf);
        return rc;
    }

    return ORTE_SUCCESS;
}
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
/*
 * Fault-tolerance state handler. The only state requiring action is
 * RESTART, where the routes must be re-exchanged; all other states
 * are no-ops for this module.
 */
static int binomial_ft_event(int state)
{
    int rc;
    int exit_status = ORTE_SUCCESS;

    switch (state) {
    case OPAL_CRS_CHECKPOINT:
        /* nothing to prepare */
        break;
    case OPAL_CRS_CONTINUE:
        /* nothing to recover */
        break;
    case OPAL_CRS_RESTART:
        /* re-exchange the routes */
        if (ORTE_SUCCESS != (rc = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, NULL))) {
            exit_status = rc;
        }
        break;
    case OPAL_CRS_TERM:
        /* nothing to do */
        break;
    default:
        /* error state - nothing to do */
        break;
    }

    return exit_status;
}
#endif
|
||||
|
27
orte/mca/routed/binomial/routed_binomial.h
Обычный файл
27
orte/mca/routed/binomial/routed_binomial.h
Обычный файл
@ -0,0 +1,27 @@
|
||||
/*
 * Copyright (c) 2007 Los Alamos National Security, LLC.
 * All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

/**
 * @file
 * Public declarations for the binomial routed component: the
 * component (selection metadata) and module (routing entry points).
 */

#ifndef MCA_ROUTED_BINOMIAL_H
#define MCA_ROUTED_BINOMIAL_H

#include "orte_config.h"
#include "orte/types.h"

#include "orte/mca/routed/routed.h"

BEGIN_C_DECLS

/* component struct used by the MCA framework for selection */
ORTE_MODULE_DECLSPEC extern orte_routed_component_t mca_routed_binomial_component;

/* module struct holding this component's routing functions */
extern orte_routed_module_t orte_routed_binomial_module;

END_C_DECLS

#endif
|
64
orte/mca/routed/binomial/routed_binomial_component.c
Обычный файл
64
orte/mca/routed/binomial/routed_binomial_component.c
Обычный файл
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (c) 2007 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
#include "orte/types.h"
|
||||
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "opal/mca/base/base.h"
|
||||
#include "opal/mca/base/mca_base_param.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/mca/routed/base/base.h"
|
||||
#include "routed_binomial.h"
|
||||
|
||||
static orte_routed_module_t* routed_binomial_init(int* priority);
|
||||
|
||||
/**
|
||||
* component definition
|
||||
*/
|
||||
/**
 * component definition
 */
orte_routed_component_t mca_routed_binomial_component = {
    /* First, the mca_base_component_t struct containing meta
       information about the component itself */

    {
    /* Indicate that we are a routed v1.0.0 component (which also
       implies a specific MCA version) */

        ORTE_ROUTED_BASE_VERSION_1_0_0,

        "binomial", /* MCA component name */
        ORTE_MAJOR_VERSION,  /* MCA component major version */
        ORTE_MINOR_VERSION,  /* MCA component minor version */
        ORTE_RELEASE_VERSION,  /* MCA component release version */
        NULL,   /* presumably the component open function - none needed */
        NULL    /* presumably the component close function - none needed */
    },

    /* Next the MCA v1.0.0 component meta data */
    {
        /* This component can be checkpointed */
        MCA_BASE_METADATA_PARAM_CHECKPOINT
    },
    routed_binomial_init
};
|
||||
|
||||
/*
 * Component init/selection function: report our priority and hand
 * back the module. Priority 70 outranks the linear component's 40,
 * making binomial the preferred routed module when both are built.
 */
static orte_routed_module_t*
routed_binomial_init(int* priority)
{
    *priority = 70;

    return &orte_routed_binomial_module;
}
|
||||
|
@ -45,7 +45,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
|
||||
static int route_lost(const orte_process_name_t *route);
|
||||
static bool route_is_defined(const orte_process_name_t *target);
|
||||
static int update_routing_tree(void);
|
||||
static orte_vpid_t get_routing_tree(opal_list_t *children);
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
|
||||
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf);
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
@ -609,19 +609,41 @@ static bool route_is_defined(const orte_process_name_t *target)
|
||||
return false;
|
||||
}
|
||||
|
||||
/*************************************/
|
||||
|
||||
|
||||
static int update_routing_tree(void)
|
||||
{
|
||||
/* nothing to do here as the routing tree is fixed */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static orte_vpid_t get_routing_tree(opal_list_t *children)
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job,
|
||||
opal_list_t *children)
|
||||
{
|
||||
orte_namelist_t *nm;
|
||||
|
||||
/* for anyone other than the HNP, the direct routing
|
||||
* does not go anywhere - we don't relay - and our
|
||||
* parent is the HNP
|
||||
*/
|
||||
if (!orte_process_info.hnp) {
|
||||
return ORTE_PROC_MY_HNP->vpid;
|
||||
}
|
||||
|
||||
/* if we are the HNP, then the direct routing tree
|
||||
* consists of every process in the job - indicate that by
|
||||
* adding a proc name of the jobid and a wildcard vpid. The
|
||||
* HNP is capable of looking up the vpid range for this job
|
||||
*/
|
||||
nm = OBJ_NEW(orte_namelist_t);
|
||||
nm->name.jobid = job;
|
||||
nm->name.vpid = ORTE_VPID_WILDCARD;
|
||||
opal_list_append(children, &nm->item);
|
||||
/* the parent of the HNP is invalid */
|
||||
return ORTE_VPID_INVALID;
|
||||
}
|
||||
|
||||
/*************************************/
|
||||
|
||||
|
||||
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
|
||||
{
|
||||
int rc;
|
||||
|
@ -39,7 +39,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
|
||||
static int route_lost(const orte_process_name_t *route);
|
||||
static bool route_is_defined(const orte_process_name_t *target);
|
||||
static int update_routing_tree(void);
|
||||
static orte_vpid_t get_routing_tree(opal_list_t *children);
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children);
|
||||
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf);
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
@ -603,18 +603,40 @@ static bool route_is_defined(const orte_process_name_t *target)
|
||||
return false;
|
||||
}
|
||||
|
||||
/*************************************/
|
||||
|
||||
|
||||
static int update_routing_tree(void)
|
||||
{
|
||||
/* nothing to do here as the routing tree is fixed */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static orte_vpid_t get_routing_tree(opal_list_t *children)
|
||||
static orte_vpid_t get_routing_tree(orte_jobid_t job,
|
||||
opal_list_t *children)
|
||||
{
|
||||
orte_namelist_t *nm;
|
||||
|
||||
/* for anyone other than the HNP, the linear routing
|
||||
* does not go anywhere - we don't relay - and our
|
||||
* parent is the HNP
|
||||
*/
|
||||
if (!orte_process_info.hnp) {
|
||||
return ORTE_PROC_MY_HNP->vpid;
|
||||
}
|
||||
|
||||
/* if we are the HNP, then the linear routing tree
|
||||
* consists of every daemon - indicate that by
|
||||
* adding a proc name of our jobid and a wildcard vpid
|
||||
*/
|
||||
nm = OBJ_NEW(orte_namelist_t);
|
||||
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
nm->name.vpid = ORTE_VPID_WILDCARD;
|
||||
opal_list_append(children, &nm->item);
|
||||
/* the parent of the HNP is invalid */
|
||||
return ORTE_VPID_INVALID;
|
||||
}
|
||||
|
||||
/*************************************/
|
||||
|
||||
|
||||
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
|
||||
{
|
||||
|
@ -57,7 +57,7 @@ orte_routed_component_t mca_routed_linear_component = {
|
||||
static orte_routed_module_t*
|
||||
routed_linear_init(int* priority)
|
||||
{
|
||||
*priority = 70;
|
||||
*priority = 40;
|
||||
|
||||
return &orte_routed_linear_module;
|
||||
}
|
||||
|
@ -213,7 +213,8 @@ typedef int (*orte_routed_module_update_routing_tree_fn_t)(void);
|
||||
* Fills the provided list with the direct children of this process
|
||||
* in the routing tree, and returns the vpid of the parent
|
||||
*/
|
||||
typedef orte_vpid_t (*orte_routed_module_get_routing_tree_fn_t)(opal_list_t *children);
|
||||
typedef orte_vpid_t (*orte_routed_module_get_routing_tree_fn_t)(orte_jobid_t job,
|
||||
opal_list_t *children);
|
||||
|
||||
/**
|
||||
* Handle fault tolerance updates
|
||||
|
@ -99,85 +99,161 @@ static void send_callback(int status, orte_process_name_t *peer,
|
||||
static void send_relay(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
opal_buffer_t *buffer = mev->buffer;
|
||||
opal_buffer_t *buffer=NULL;
|
||||
orte_rml_tag_t tag = mev->tag;
|
||||
orte_grpcomm_mode_t relay_mode;
|
||||
opal_list_t *recips;
|
||||
orte_jobid_t target_job = mev->sender.jobid;
|
||||
opal_list_t recips;
|
||||
opal_list_item_t *item;
|
||||
orte_namelist_t *nm;
|
||||
int ret;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* we pass the relay_mode in the mev "tag" field. This is a bit
|
||||
* of a hack as the two sizes may not exactly match. However, since
|
||||
* the rml_tag is an int32, it is doubtful we will ever see a
|
||||
* truncation error
|
||||
*/
|
||||
relay_mode = (orte_grpcomm_mode_t)tag;
|
||||
/* get the list of next recipients from the routed module */
|
||||
OBJ_CONSTRUCT(&recips, opal_list_t);
|
||||
/* ignore returned parent vpid - we don't care here */
|
||||
orte_routed.get_routing_tree(target_job, &recips);
|
||||
|
||||
if (ORTE_GRPCOMM_LINEAR == relay_mode) {
|
||||
orte_process_name_t dummy;
|
||||
orte_vpid_t i;
|
||||
|
||||
/* if we are NOT the HNP, do nothing */
|
||||
if (!orte_process_info.hnp) {
|
||||
/* if list is empty, nothing for us to do */
|
||||
if (opal_list_is_empty(&recips)) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay - recipient list is empty!",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* get the first recipient so we can look at it */
|
||||
item = opal_list_get_first(&recips);
|
||||
nm = (orte_namelist_t*)item;
|
||||
|
||||
/* check to see if this message is going directly to the
|
||||
* target jobid and that jobid is not my own
|
||||
*/
|
||||
if (nm->name.jobid == target_job &&
|
||||
target_job != ORTE_PROC_MY_NAME->jobid) {
|
||||
orte_daemon_cmd_flag_t command;
|
||||
orte_jobid_t job;
|
||||
orte_rml_tag_t msg_tag;
|
||||
int32_t n;
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending directly to job %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(nm->name.jobid)));
|
||||
/* this is going directly to the job, and not being
|
||||
* relayed any further. We need to remove the process-and-relay
|
||||
* command and the target jobid/tag from the buffer so the
|
||||
* recipients can correctly process it
|
||||
*/
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &command, &n, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay linear mode with %d procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)orte_process_info.num_procs));
|
||||
|
||||
/* if the mode is linear and we are the HNP, don't ask for
|
||||
* next recipients as this will generate a potentially very
|
||||
* long list! Instead, just send the message to each daemon
|
||||
* as fast as we can - but not to us!
|
||||
*/
|
||||
dummy.jobid = ORTE_PROC_MY_HNP->jobid;
|
||||
for (i=1; i < orte_process_info.num_procs; i++) {
|
||||
dummy.vpid = i;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending relay msg to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&dummy)));
|
||||
|
||||
if (0 > (ret = orte_rml.send_buffer(&dummy, buffer, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &job, &n, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &msg_tag, &n, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* if this isn't going to the daemon tag, then we have more to extract */
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
/* remove the message_local_procs cmd data */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &command, &n, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &job, &n, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(mev->buffer, &msg_tag, &n, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
goto CLEANUP;
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
opal_dss.copy_payload(buffer, mev->buffer);
|
||||
} else {
|
||||
/* buffer is already setup - just point to it */
|
||||
buffer = mev->buffer;
|
||||
/* tag needs to be set to daemon_tag */
|
||||
tag = ORTE_RML_TAG_DAEMON;
|
||||
}
|
||||
|
||||
/* ask the active grpcomm module for the next recipients */
|
||||
if (NULL == (recips = orte_grpcomm.next_recipients(relay_mode))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* send the message - do not deconstruct the list! it doesn't belong
|
||||
* to us
|
||||
|
||||
/* if the list has only one entry, and that entry has a wildcard
|
||||
* vpid, then we will handle it separately
|
||||
*/
|
||||
for (item = opal_list_get_first(recips);
|
||||
item != opal_list_get_end(recips);
|
||||
item = opal_list_get_next(item)) {
|
||||
orte_namelist_t *target = (orte_namelist_t*)item;
|
||||
if (1 == opal_list_get_size(&recips) && nm->name.vpid == ORTE_VPID_WILDCARD) {
|
||||
/* okay, this is a wildcard case. First, look up the #procs in the
|
||||
* specified job - only the HNP can do this. Fortunately, the routed
|
||||
* modules are smart enough not to ask a remote daemon to do it!
|
||||
* However, just to be safe, in case some foolish future developer
|
||||
* doesn't get that logic right... ;-)
|
||||
*/
|
||||
orte_job_t *jdata;
|
||||
orte_vpid_t i;
|
||||
orte_process_name_t target;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending relay msg to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&target->name)));
|
||||
|
||||
if (0 > (ret = orte_rml.send_buffer(&target->name, buffer, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
if (!orte_process_info.hnp) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
ret = ORTE_ERR_NOT_FOUND;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* send the buffer to all members of the specified job */
|
||||
target.jobid = nm->name.jobid;
|
||||
for (i=0; i < jdata->num_procs; i++) {
|
||||
if (target.jobid == ORTE_PROC_MY_NAME->jobid &&
|
||||
i == ORTE_PROC_MY_NAME->vpid) {
|
||||
/* do not send to myself! */
|
||||
continue;
|
||||
}
|
||||
|
||||
target.vpid = i;
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending relay msg to %s tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&target), tag));
|
||||
if (0 > (ret = orte_rml.send_buffer(&target, buffer, tag, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* send the message to each recipient on list, deconstructing it as we go */
|
||||
while (NULL != (item = opal_list_remove_first(&recips))) {
|
||||
nm = (orte_namelist_t*)item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending relay msg to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&nm->name)));
|
||||
|
||||
if (0 > (ret = orte_rml.send_buffer(&nm->name, buffer, tag, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
/* cleanup */
|
||||
OBJ_DESTRUCT(&recips);
|
||||
if (NULL != buffer && buffer != mev->buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
}
|
||||
OBJ_RELEASE(mev);
|
||||
}
|
||||
|
||||
@ -209,11 +285,12 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
|
||||
void orte_daemon_cmd_processor(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
orte_process_name_t *sender = &(mev->sender);
|
||||
orte_process_name_t *sender = &(mev->sender), target;
|
||||
opal_buffer_t *buffer = mev->buffer;
|
||||
orte_rml_tag_t tag = mev->tag;
|
||||
orte_rml_tag_t tag = mev->tag, target_tag;
|
||||
orte_jobid_t job;
|
||||
int ret;
|
||||
char *unpack_ptr;
|
||||
char *unpack_ptr, *save;
|
||||
orte_std_cntr_t n;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
|
||||
@ -237,47 +314,35 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* see if this is a "process-and-relay" command */
|
||||
/* see if this is a "process-and-relay" command - i.e., an xcast is underway */
|
||||
if (ORTE_DAEMON_PROCESS_AND_RELAY_CMD == command) {
|
||||
opal_buffer_t relay_buf;
|
||||
orte_grpcomm_mode_t relay_mode;
|
||||
orte_rml_tag_t relay_tag;
|
||||
|
||||
/* unpack the routing mode in use */
|
||||
/* get the target jobid and tag */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &relay_mode, &n, ORTE_GRPCOMM_MODE))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job, &n, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* we are going to use the message event's tag field to pass the relay mode
|
||||
* see comment in send_relay
|
||||
*/
|
||||
relay_tag = (orte_rml_tag_t)relay_mode;
|
||||
/* initialize the relay buffer */
|
||||
OBJ_CONSTRUCT(&relay_buf, opal_buffer_t);
|
||||
/* tell the downstream daemons to process-and-relay */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&relay_buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto PROCESS;
|
||||
}
|
||||
/* tell the downstream daemons the routing algo in use */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&relay_buf, &relay_mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &target_tag, &n, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* copy the message payload to the relay buffer - this is non-destructive */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(&relay_buf, buffer))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto PROCESS;
|
||||
}
|
||||
/* let the send_relay function know the target jobid */
|
||||
target.jobid = job;
|
||||
target.vpid = ORTE_VPID_INVALID; /* irrelevant, but better than random */
|
||||
/* save this buffer location */
|
||||
save = buffer->unpack_ptr;
|
||||
/* rewind the buffer so we can relay it correctly */
|
||||
buffer->unpack_ptr = unpack_ptr;
|
||||
/* setup an event to actually perform the relay */
|
||||
ORTE_MESSAGE_EVENT(sender, &relay_buf, relay_tag, send_relay);
|
||||
ORTE_MESSAGE_EVENT(&target, buffer, target_tag, send_relay);
|
||||
/* rewind the buffer to the right place for processing */
|
||||
buffer->unpack_ptr = save;
|
||||
} else {
|
||||
/* rewind the buffer so we can process it correctly */
|
||||
buffer->unpack_ptr = unpack_ptr;
|
||||
}
|
||||
|
||||
PROCESS:
|
||||
/* process the command */
|
||||
if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
|
@ -265,6 +265,11 @@ int orte_util_decode_nodemap(opal_byte_object_t *bo, opal_pointer_array_t *nodes
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we are a daemon or the HNP, update our num_procs */
|
||||
if (orte_process_info.hnp || orte_process_info.daemon) {
|
||||
orte_process_info.num_procs = num_nodes;
|
||||
}
|
||||
|
||||
/* set the size of the nidmap storage so we minimize
|
||||
* realloc's
|
||||
*/
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user