Fix xcast so it works in multi-node situations where the user specifies a particular mode to use (e.g., direct).
This commit was SVN r17682.
This commit is contained in:
parent
841d0e5208
commit
ffa232687a
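In short: xcast() previously forced every application proc onto the binomial tree, so a user-selected mode (e.g., direct or linear) was silently ignored on multi-node runs. After this change, every proc chooses the algorithm from orte_process_info.num_procs and the MCA crossover parameters, and the linear/direct paths relay through the HNP when invoked from an application proc. A condensed sketch of the resulting selection logic, derived from the diff below (error handling, timing instrumentation, and the DONE bookkeeping are omitted):

/* Condensed sketch of the new selection logic in xcast(), per the
 * diff below; error paths and timing code omitted for brevity. */
static int xcast(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag)
{
    int rc;

    /* all procs - app, daemon, or HNP - now choose from num_procs and the
     * MCA crossover params, so a user-specified mode is honored */
    if (orte_process_info.num_procs < 2 || orte_daemon_died) {
        /* only one proc in the system - direct is the only option */
        rc = xcast_direct(job, buffer, tag);
    } else if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_linear_xover) {
        rc = xcast_direct(job, buffer, tag);
    } else if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_binomial_xover) {
        rc = xcast_linear(job, buffer, tag);
    } else {
        rc = xcast_binomial_tree(job, buffer, tag);
    }
    return rc;
}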
@@ -71,7 +71,6 @@ static int xcast(orte_jobid_t job,
 {
     int rc = ORTE_SUCCESS;
     struct timeval start, stop;
-    orte_vpid_t num_daemons;
     
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
                          "%s xcast sent to job %s tag %ld",
@@ -87,32 +86,15 @@ static int xcast(orte_jobid_t job,
         gettimeofday(&start, NULL);
     }
     
-    /* application procs do not know how many daemons are currently in
-     * the system. If we tell them that number at startup, then it might
-     * well be inaccurate if any dynamic spawns have occurred. To avoid
-     * the problem, have all application procs solely use the binomial
-     * xcast so the first message just goes to the HNP who then propagates
-     * it from there
-     */
-    if (!orte_process_info.hnp && !orte_process_info.daemon) {
-        rc = xcast_binomial_tree(job, buffer, tag);
-        goto DONE;
-    }
-    
-    /* if we are the HNP or a daemon, then the num_procs field in our
-     * process_info struct contains the active number of daemons in the
-     * system - use it to decide what xcast algo to use
-     */
-    num_daemons = orte_process_info.num_procs;
-    
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
-                         "%s xcast: num_daemons %ld linear xover: %ld binomial xover: %ld",
+                         "%s xcast: num_procs %ld linear xover: %ld binomial xover: %ld",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         (long)num_daemons, (long)orte_grpcomm_basic.xcast_linear_xover,
+                         (long)orte_process_info.num_procs,
+                         (long)orte_grpcomm_basic.xcast_linear_xover,
                          (long)orte_grpcomm_basic.xcast_binomial_xover));
     
-    if (num_daemons < 2 || orte_daemon_died) {
-        /* if there is only one daemon in the system, then we must
+    if (orte_process_info.num_procs < 2 || orte_daemon_died) {
+        /* if there is only one proc in the system, then we must
          * use the direct mode - there is no other option. Note that
          * since the HNP is the one that typically does xcast sends,
          * only one daemon means that the HNP is sending to
@@ -138,9 +120,9 @@ static int xcast(orte_jobid_t job,
      * they wish via MCA params
      */
     
-    if (num_daemons < orte_grpcomm_basic.xcast_linear_xover) {
+    if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_linear_xover) {
         rc = xcast_direct(job, buffer, tag);
-    } else if (num_daemons < orte_grpcomm_basic.xcast_binomial_xover) {
+    } else if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_binomial_xover) {
         rc = xcast_linear(job, buffer, tag);
     } else {
         rc = xcast_binomial_tree(job, buffer, tag);
@@ -170,11 +152,9 @@ static int xcast_binomial_tree(orte_jobid_t job,
                          "%s xcast_binomial",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
     
-    /* this is the HNP end, so it starts the procedure. Since the HNP is always the
-     * vpid=0 at this time, we take advantage of that fact to figure out who we
-     * should send this to on the first step
-     */
-    /* need to pack the msg for sending - be sure to include routing info so it
+    /* binomial xcast can only go through the daemons as app procs are
+     * not allowed to relay messages.
+     * first, need to pack the msg and be sure to include routing info so it
      * can properly be sent through the daemons
      */
     buf = OBJ_NEW(opal_buffer_t);
@@ -274,6 +254,7 @@ static int xcast_linear(orte_jobid_t job,
     orte_daemon_cmd_flag_t command;
     orte_vpid_t i, range;
+    orte_process_name_t dummy;
     orte_grpcomm_mode_t mode;
     
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
                          "%s xcast_linear",
@@ -286,6 +267,26 @@ static int xcast_linear(orte_jobid_t job,
      * daemon, plus the payload intended for the processes themselves
      */
     buf = OBJ_NEW(opal_buffer_t);
     
+    /* if we are an application proc, then send this to our HNP so
+     * we don't try to talk to every daemon directly ourselves. This
+     * is necessary since we don't know how many daemons there are!
+     */
+    if (!orte_process_info.hnp && !orte_process_info.daemon) {
+        /* we are an application proc */
+        /* tell the HNP to relay */
+        command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        /* tell the daemon the routing algorithm this xmission is using */
+        mode = ORTE_GRPCOMM_LINEAR;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+    }
+    
     /* if this isn't intended for the daemon command tag, then we better
      * tell the daemon to deliver it to the procs, and what job is supposed
@@ -326,7 +327,17 @@ static int xcast_linear(orte_jobid_t job,
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          (long)buf->bytes_used));
     
-    /* get the number of daemons out there */
+    /* if we are not a daemon or the HNP, then just send this to the HNP */
+    if (!orte_process_info.hnp && !orte_process_info.daemon) {
+        if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        rc = ORTE_SUCCESS;
+        goto CLEANUP;
+    }
+    
+    /* if we are a daemon or the HNP, get the number of daemons out there */
     range = orte_process_info.num_procs;
     
     /* we have to account for all of the messages we are about to send
@@ -358,9 +369,7 @@ static int xcast_linear(orte_jobid_t job,
     rc = ORTE_SUCCESS;
     
 CLEANUP:
-    /* need to release the buffer so that the reference count will be correct
-     * when the cb function releases it
-     */
+    /* release the buffer */
     OBJ_RELEASE(buf);
     return rc;
 }
@@ -371,34 +380,126 @@ static int xcast_direct(orte_jobid_t job,
 {
     int rc;
     orte_process_name_t peer;
-    orte_vpid_t i, vpid;
-    orte_job_t *jdata;
+    orte_vpid_t i;
+    opal_buffer_t *buf=NULL, *bfr=buffer;
+    orte_daemon_cmd_flag_t command;
+    orte_grpcomm_mode_t mode;
+    orte_rml_tag_t target=tag;
     
     OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
                          "%s xcast_direct",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
     
-    /* need to get the job peers so we know who to send the message to */
-    if (NULL == (jdata = orte_get_job_data_object(job))) {
-        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
-        rc = ORTE_ERR_NOT_FOUND;
+    /* if I am an application proc and this is going to some job other
+     * than my own, then we have to send it via the daemons as the proc would have
+     * no way of knowing how many procs are in the other job.
+     */
+    if (ORTE_PROC_MY_NAME->jobid != job &&
+        !orte_process_info.hnp && !orte_process_info.daemon) {
+        /* since we have to pack some additional info into the buffer
+         * for this case, we create a new buffer to contain all the
+         * info needed plus the payload
+         */
+        buf = OBJ_NEW(opal_buffer_t);
+        /* I have to send this to the HNP for handling as I have no idea
+         * how many recipients there are! start by telling the HNP to relay
+         */
+        command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        /* default to the LINEAR mode since this is equivalent to
+         * DIRECT for daemons
+         */
+        mode = ORTE_GRPCOMM_LINEAR;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        /* if the target isn't the daemon tag, then we have to add the proper
+         * command so the daemons know what to do
+         */
+        if (ORTE_RML_TAG_DAEMON != tag) {
+            command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
+                ORTE_ERROR_LOG(rc);
+                goto CLEANUP;
+            }
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
+                ORTE_ERROR_LOG(rc);
+                goto CLEANUP;
+            }
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
+                ORTE_ERROR_LOG(rc);
+                goto CLEANUP;
+            }
+        }
+        /* copy the payload into the new buffer - this is non-destructive, so our
+         * caller is still responsible for releasing any memory in the buffer they
+         * gave to us
+         */
+        if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        rc = ORTE_SUCCESS;
         goto CLEANUP;
     }
-    vpid = jdata->num_procs;
     
+    /* if I am a daemon or the HNP and this is going to the daemon job to
+     * someplace other than the daemon cmd processor, then I need to add
+     * a command to the buffer so the recipient daemons know what to do
+     */
+    if ((orte_process_info.hnp || orte_process_info.daemon) &&
+        ORTE_PROC_MY_NAME->jobid == job &&
+        ORTE_RML_TAG_DAEMON != tag) {
+        /* since we have to pack some additional info into the buffer
+         * for this case, we create a new buffer to contain all the
+         * info needed plus the payload
+         */
+        buf = OBJ_NEW(opal_buffer_t);
+        /* if the target isn't the daemon tag, then we have to add the proper
+         * command so the daemons know what to do
+         */
+        command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        /* copy the payload into the new buffer - this is non-destructive, so our
+         * caller is still responsible for releasing any memory in the buffer they
+         * gave to us
+         */
+        if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+        /* point to correct buffer to be sent */
+        bfr = buf;
+        /* send this to the daemon tag so it gets processed correctly */
+        target = ORTE_RML_TAG_DAEMON;
+    }
+    
     OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
                          "%s xcast_direct: buffer size %ld",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          (long)buffer->bytes_used));
     
     /* we have to account for all of the messages we are about to send
      * because the non-blocking send can come back almost immediately - before
      * we would get the chance to increment the num_active. This causes us
      * to not correctly wakeup and reset the xcast_in_progress flag
      */
     
     peer.jobid = job;
-    for(i=0; i<vpid; i++) {
+    for(i=0; i<orte_process_info.num_procs; i++) {
         peer.vpid = i;
         OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
                              "%s xcast_direct: %s => %s",
@@ -406,22 +507,32 @@ static int xcast_direct(orte_jobid_t job,
                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                              ORTE_NAME_PRINT(&peer)));
         
-        /* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
+        /* if I am the HNP, just set things up so the cmd processor gets called.
+         * We don't want to message ourselves as this can create circular logic
+         * in the RML. Instead, this macro will set a zero-time event which will
+         * cause the buffer to be processed by the cmd processor - probably will
+         * fire right away, but that's okay
+         * The macro makes a copy of the buffer, so it's okay to release it later
+         */
         if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
             peer.vpid == ORTE_PROC_MY_NAME->vpid &&
             orte_process_info.hnp) {
-            ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buffer, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
+            ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, bfr, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
         } else {
-            if (0 > (rc = orte_rml.send_buffer(&peer, buffer, tag, 0))) {
+            if (0 > (rc = orte_rml.send_buffer(&peer, bfr, target, 0))) {
                 ORTE_ERROR_LOG(rc);
                 goto CLEANUP;
             }
             rc = ORTE_SUCCESS;
         }
     }
     rc = ORTE_SUCCESS;
     
 CLEANUP:
-    /* nothing to cleanup here */
+    /* release buf if used */
+    if (NULL != buf) {
+        OBJ_RELEASE(buf);
+    }
     return rc;
 }
@@ -731,6 +842,32 @@ static int binomial_recips(opal_list_t *names)
     return ORTE_SUCCESS;
 }
 
+static int linear_recips(opal_list_t *names)
+{
+    orte_namelist_t *target;
+    orte_vpid_t i;
+    
+    /* if we are not the HNP, we just return - only
+     * the HNP sends in this mode
+     */
+    if (!orte_process_info.hnp) {
+        return ORTE_SUCCESS;
+    }
+    
+    /* if we are the HNP, then just add the names of
+     * all daemons to the list
+     */
+    for (i=1; i < orte_process_info.num_procs; i++) {
+        if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        target->name.jobid = ORTE_PROC_MY_NAME->jobid;
+        target->name.vpid = i;
+        opal_list_append(names, &target->item);
+    }
+    return ORTE_SUCCESS;
+}
+
 static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
 {
     int rc;
@@ -743,6 +880,9 @@ static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
         case ORTE_GRPCOMM_BINOMIAL:
             rc = binomial_recips(names);
             break;
+        case ORTE_GRPCOMM_LINEAR:
+            rc = linear_recips(names);
+            break;
         default:
             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
             rc = ORTE_ERR_NOT_FOUND;
@@ -51,6 +51,8 @@ typedef uint8_t orte_grpcomm_mode_t;
 #define ORTE_GRPCOMM_CHAIN      (orte_grpcomm_mode_t) 1
 /* binomial tree */
 #define ORTE_GRPCOMM_BINOMIAL   (orte_grpcomm_mode_t) 2
+/* linear - HNP sends direct to all daemons */
+#define ORTE_GRPCOMM_LINEAR     (orte_grpcomm_mode_t) 3
 
 
 END_C_DECLS
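The pattern shared by the new xcast_linear() and xcast_direct() paths above is the HNP relay: an application proc cannot enumerate the daemons, so it wraps the payload with relay instructions and sends a single message to the HNP, which propagates it using the indicated mode. A minimal sketch of that wrapping, assuming the usual ORTE headers; the helper name relay_via_hnp is hypothetical (in the diff this logic is inlined in each function) and the ORTE_ERROR_LOG calls are trimmed:

/* Hypothetical helper condensing the HNP-relay wrapping used by
 * xcast_linear/xcast_direct when called from an application proc. */
static int relay_via_hnp(orte_jobid_t job, opal_buffer_t *payload,
                         orte_rml_tag_t tag, orte_grpcomm_mode_t mode)
{
    int rc;
    opal_buffer_t *buf = OBJ_NEW(opal_buffer_t);
    orte_daemon_cmd_flag_t command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;

    /* tell the HNP to process this message and relay it onward */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
        goto cleanup;
    }
    /* tell the daemons which routing algorithm this xmission uses */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
        goto cleanup;
    }
    /* if the payload isn't a daemon command, add delivery instructions
     * so the daemons hand it to the right job/tag */
    if (ORTE_RML_TAG_DAEMON != tag) {
        command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD)) ||
            ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID)) ||
            ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
            goto cleanup;
        }
    }
    /* non-destructive copy - the caller still owns the original payload */
    if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, payload))) {
        goto cleanup;
    }
    /* one send to the HNP; it fans the message out from there */
    if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
        goto cleanup;
    }
    rc = ORTE_SUCCESS;

cleanup:
    OBJ_RELEASE(buf);
    return rc;
}

This is why an application proc never needs an accurate daemon count: it only ever addresses ORTE_PROC_MY_HNP, and the daemons (which do know num_procs) perform the fan-out.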