2004-11-22 00:37:56 +00:00
|
|
|
/*
|
2005-11-05 19:57:48 +00:00
|
|
|
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
|
|
|
* University Research and Technology
|
|
|
|
* Corporation. All rights reserved.
|
|
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
|
|
* of Tennessee Research Foundation. All rights
|
|
|
|
* reserved.
|
2006-02-07 03:32:36 +00:00
|
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
2004-11-28 20:09:25 +00:00
|
|
|
* University of Stuttgart. All rights reserved.
|
2005-03-24 12:43:37 +00:00
|
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
|
|
* All rights reserved.
|
2007-05-31 18:27:54 +00:00
|
|
|
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
|
2004-11-22 01:38:40 +00:00
|
|
|
* $COPYRIGHT$
|
2006-02-07 03:32:36 +00:00
|
|
|
*
|
2004-11-22 01:38:40 +00:00
|
|
|
* Additional copyrights may follow
|
2006-02-07 03:32:36 +00:00
|
|
|
*
|
2004-11-22 00:37:56 +00:00
|
|
|
* $HEADER$
|
|
|
|
*/
|
|
|
|
|
2006-02-12 01:33:29 +00:00
|
|
|
#include "orte_config.h"
|
2008-02-28 01:57:57 +00:00
|
|
|
#include "orte/constants.h"
|
|
|
|
#include "orte/types.h"
|
2006-11-03 18:55:05 +00:00
|
|
|
|
2004-11-20 19:12:43 +00:00
|
|
|
#include <string.h>
|
2008-12-09 23:49:02 +00:00
|
|
|
#include <fcntl.h>
|
2004-11-20 19:12:43 +00:00
|
|
|
|
2005-10-31 16:21:51 +00:00
|
|
|
#include "opal/threads/condition.h"
|
2006-11-28 00:06:25 +00:00
|
|
|
#include "opal/util/bit_ops.h"
|
2008-03-05 22:44:35 +00:00
|
|
|
#include "opal/class/opal_hash_table.h"
|
2008-02-28 01:57:57 +00:00
|
|
|
#include "opal/dss/dss.h"
|
2008-12-09 23:49:02 +00:00
|
|
|
#include "opal/runtime/opal.h"
|
2008-06-18 22:17:53 +00:00
|
|
|
|
2006-02-12 01:33:29 +00:00
|
|
|
#include "orte/mca/errmgr/errmgr.h"
|
2008-06-18 22:17:53 +00:00
|
|
|
#include "orte/mca/ess/ess.h"
|
2009-01-27 19:13:56 +00:00
|
|
|
#include "orte/mca/odls/base/base.h"
|
Bring in the code for routing xcast stage gate messages via the local orteds. This code is inactive unless you specifically request it via an mca param oob_xcast_mode (can be set to "linear" or "direct"). Direct mode is the old standard method where we send messages directly to each MPI process. Linear mode sends the xcast message via the orteds, with the HNP sending the message to each orted directly.
There is a binomial algorithm in the code (i.e., the HNP would send to a subset of the orteds, which then relay it on according to the typical log-2 algo), but that has a bug in it so the code won't let you select it even if you tried (and the mca param doesn't show, so you'd *really* have to try).
This also involved a slight change to the oob.xcast API, so propagated that as required.
Note: this has *only* been tested on rsh, SLURM, and Bproc environments (now that it has been transferred to the OMPI trunk, I'll need to re-test it [only done rsh so far]). It should work fine on any environment that uses the ORTE daemons - anywhere else, you are on your own... :-)
Also, correct a mistake where the orte_debug_flag was declared an int, but the mca param was set as a bool. Move the storage for that flag to the orte/runtime/params.c and orte/runtime/params.h files appropriately.
This commit was SVN r14475.
2007-04-23 18:41:04 +00:00
|
|
|
#include "orte/mca/rml/rml.h"
|
2009-01-27 19:13:56 +00:00
|
|
|
#include "orte/mca/routed/routed.h"
|
2008-02-28 01:57:57 +00:00
|
|
|
#include "orte/util/name_fns.h"
|
2008-06-18 22:17:53 +00:00
|
|
|
#include "orte/util/show_help.h"
|
|
|
|
#include "orte/util/proc_info.h"
|
2009-01-07 15:00:26 +00:00
|
|
|
#include "orte/util/nidmap.h"
|
2008-02-28 01:57:57 +00:00
|
|
|
#include "orte/orted/orted.h"
|
2008-02-28 19:58:32 +00:00
|
|
|
#include "orte/runtime/orte_wait.h"
|
2008-06-18 22:17:53 +00:00
|
|
|
#include "orte/runtime/orte_globals.h"
|
2004-11-20 19:12:43 +00:00
|
|
|
|
These changes were mostly captured in a prior RFC (except for #2 below) and are aimed specifically at improving startup performance and setting up the remaining modifications described in that RFC.
The commit has been tested for C/R and Cray operations, and on Odin (SLURM, rsh) and RoadRunner (TM). I tried to update all environments, but obviously could not test them. I know that Windows needs some work, and have highlighted what is know to be needed in the odls process component.
This represents a lot of work by Brian, Tim P, Josh, and myself, with much advice from Jeff and others. For posterity, I have appended a copy of the email describing the work that was done:
As we have repeatedly noted, the modex operation in MPI_Init is the single greatest consumer of time during startup. To-date, we have executed that operation as an ORTE stage gate that held the process until a startup message containing all required modex (and OOB contact info - see #3 below) info could be sent to it. Each process would send its data to the HNP's registry, which assembled and sent the message when all processes had reported in.
In addition, ORTE had taken responsibility for monitoring process status as it progressed through a series of "stage gates". The process reported its status at each gate, and ORTE would then send a "release" message once all procs had reported in.
The incoming changes revamp these procedures in three ways:
1. eliminating the ORTE stage gate system and cleanly delineating responsibility between the OMPI and ORTE layers for MPI init/finalize. The modex stage gate (STG1) has been replaced by a collective operation in the modex itself that performs an allgather on the required modex info. The allgather is implemented using the orte_grpcomm framework since the BTL's are not active at that point. At the moment, the grpcomm framework only has a "basic" component analogous to OMPI's "basic" coll framework - I would recommend that the MPI team create additional, more advanced components to improve performance of this step.
The other stage gates have been replaced by orte_grpcomm barrier functions. We tried to use MPI barriers instead (since the BTL's are active at that point), but - as we discussed on the telecon - these are not currently true barriers so the job would hang when we fell through while messages were still in process. Note that the grpcomm barrier doesn't actually resolve that problem, but Brian has pointed out that we are unlikely to ever see it violated. Again, you might want to spend a little time on an advanced barrier algorithm as the one in "basic" is very simplistic.
Summarizing this change: ORTE no longer tracks process state nor has direct responsibility for synchronizing jobs. This is now done via collective operations within the MPI layer, albeit using ORTE collective communication services. I -strongly- urge the MPI team to implement advanced collective algorithms to improve the performance of this critical procedure.
2. reducing the volume of data exchanged during modex. Data in the modex consisted of the process name, the name of the node where that process is located (expressed as a string), plus a string representation of all contact info. The nodename was required in order for the modex to determine if the process was local or not - in addition, some people like to have it to print pretty error messages when a connection failed.
The size of this data has been reduced in three ways:
(a) reducing the size of the process name itself. The process name consisted of two 32-bit fields for the jobid and vpid. This is far larger than any current system, or system likely to exist in the near future, can support. Accordingly, the default size of these fields has been reduced to 16-bits, which means you can have 32k procs in each of 32k jobs. Since the daemons must have a vpid, and we require one daemon/node, this also restricts the default configuration to 32k nodes.
To support any future "mega-clusters", a configuration option --enable-jumbo-apps has been added. This option increases the jobid and vpid field sizes to 32-bits. Someday, if necessary, someone can add yet another option to increase them to 64-bits, I suppose.
(b) replacing the string nodename with an integer nodeid. Since we have one daemon/node, the nodeid corresponds to the local daemon's vpid. This replaces an often lengthy string with only 2 (or at most 4) bytes, a substantial reduction.
(c) when the mca param requesting that nodenames be sent to support pretty error messages, a second mca param is now used to request FQDN - otherwise, the domain name is stripped (by default) from the message to save space. If someone wants to combine those into a single param somehow (perhaps with an argument?), they are welcome to do so - I didn't want to alter what people are already using.
While these may seem like small savings, they actually amount to a significant impact when aggregated across the entire modex operation. Since every proc must receive the modex data regardless of the collective used to send it, just reducing the size of the process name removes nearly 400MBytes of communication from a 32k proc job (admittedly, much of this comm may occur in parallel). So it does add up pretty quickly.
3. routing RML messages to reduce connections. The default messaging system remains point-to-point - i.e., each proc opens a socket to every proc it communicates with and sends its messages directly. A new option uses the orteds as routers - i.e., each proc only opens a single socket to its local orted. All messages are sent from the proc to the orted, which forwards the message to the orted on the node where the intended recipient proc is located - that orted then forwards the message to its local proc (the recipient). This greatly reduces the connection storm we have encountered during startup.
It also has the benefit of removing the sharing of every proc's OOB contact with every other proc. The orted routing tables are populated during launch since every orted gets a map of where every proc is being placed. Each proc, therefore, only needs to know the contact info for its local daemon, which is passed in via the environment when the proc is fork/exec'd by the daemon. This alone removes ~50 bytes/process of communication that was in the current STG1 startup message - so for our 32k proc job, this saves us roughly 32k*50 = 1.6MBytes sent to 32k procs = 51GBytes of messaging.
Note that you can use the new routing method by specifying -mca routed tree - if you so desire. This mode will become the default at some point in the future.
There are a few minor additional changes in the commit that I'll just note in passing:
* propagation of command line mca params to the orteds - fixes ticket #1073. See note there for details.
* requiring of "finalize" prior to "exit" for MPI procs - fixes ticket #1144. See note there for details.
* cleanup of some stale header files
This commit was SVN r16364.
2007-10-05 19:48:23 +00:00
|
|
|
#include "orte/mca/grpcomm/base/base.h"
|
2007-07-20 01:34:02 +00:00
|
|
|
#include "grpcomm_basic.h"
|
2004-11-20 19:12:43 +00:00
|
|
|
|
2008-02-28 01:57:57 +00:00
|
|
|
|
2008-03-24 20:50:31 +00:00
|
|
|
/* Static API's */
|
|
|
|
static int init(void);
|
|
|
|
static void finalize(void);
|
|
|
|
static int xcast(orte_jobid_t job,
|
|
|
|
opal_buffer_t *buffer,
|
|
|
|
orte_rml_tag_t tag);
|
2008-04-09 22:10:53 +00:00
|
|
|
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
|
|
|
static int barrier(void);
|
2008-06-18 22:17:53 +00:00
|
|
|
static int modex(opal_list_t *procs);
|
2008-12-09 23:49:02 +00:00
|
|
|
static int set_proc_attr(const char *attr_name, const void *data, size_t size);
|
2009-01-07 15:00:26 +00:00
|
|
|
static int get_proc_attr(const orte_process_name_t proc,
|
|
|
|
const char * attribute_name, void **val,
|
|
|
|
size_t *size);
|
2008-03-24 20:50:31 +00:00
|
|
|
|
|
|
|
/* Module def */
|
|
|
|
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
|
|
|
init,
|
|
|
|
finalize,
|
|
|
|
xcast,
|
2008-04-09 22:10:53 +00:00
|
|
|
allgather,
|
2008-03-24 20:50:31 +00:00
|
|
|
orte_grpcomm_base_allgather_list,
|
2008-04-09 22:10:53 +00:00
|
|
|
barrier,
|
2008-12-09 23:49:02 +00:00
|
|
|
set_proc_attr,
|
2009-01-07 15:00:26 +00:00
|
|
|
get_proc_attr,
|
2008-06-18 22:17:53 +00:00
|
|
|
modex,
|
2008-03-24 20:50:31 +00:00
|
|
|
orte_grpcomm_base_purge_proc_attrs
|
|
|
|
};
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
static bool recv_on;
|
|
|
|
static opal_buffer_t *profile_buf=NULL;
|
2009-01-27 19:13:56 +00:00
|
|
|
static int profile_fd = -1;
|
|
|
|
static void profile_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata);
|
|
|
|
static void daemon_coll_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata);
|
2008-12-09 23:49:02 +00:00
|
|
|
|
2008-03-24 20:50:31 +00:00
|
|
|
/**
|
|
|
|
* Initialize the module
|
|
|
|
*/
|
|
|
|
static int init(void)
|
|
|
|
{
|
|
|
|
int rc;
|
2008-12-09 23:49:02 +00:00
|
|
|
int value;
|
2008-03-24 20:50:31 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
mca_base_param_reg_int_name("orte", "grpcomm_recv_on",
|
|
|
|
"Turn on grpcomm recv for profile purposes",
|
|
|
|
true, false,
|
|
|
|
(int) false, &value);
|
|
|
|
recv_on = OPAL_INT_TO_BOOL(value);
|
|
|
|
|
2008-03-24 20:50:31 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
2008-12-09 23:49:02 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
if (opal_profile && orte_process_info.mpi_proc) {
|
|
|
|
/* if I am an MPI application proc, then create a buffer
|
|
|
|
* to pack all my attributes in */
|
|
|
|
profile_buf = OBJ_NEW(opal_buffer_t);
|
|
|
|
/* seed it with the node name */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
}
|
2008-12-09 23:49:02 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
if (orte_process_info.hnp && recv_on) {
|
2009-01-27 19:13:56 +00:00
|
|
|
/* open the profile file for writing */
|
|
|
|
if (NULL == opal_profile_file) {
|
|
|
|
/* no file specified - we will just ignore any incoming data */
|
|
|
|
profile_fd = -1;
|
|
|
|
} else {
|
|
|
|
profile_fd = open(opal_profile_file, O_CREAT|O_RDWR|O_TRUNC, 0644);
|
|
|
|
if (profile_fd < 0) {
|
|
|
|
/* couldn't be opened */
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
|
|
|
|
return ORTE_ERR_FILE_OPEN_FAILURE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
|
|
ORTE_RML_TAG_GRPCOMM_PROFILE,
|
|
|
|
ORTE_RML_NON_PERSISTENT,
|
|
|
|
profile_recv,
|
|
|
|
NULL))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
2009-01-27 19:13:56 +00:00
|
|
|
}
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
|
|
|
|
2009-01-27 19:13:56 +00:00
|
|
|
/* if we are a daemon or the hnp, we need to post a
|
|
|
|
* recv to catch any collective operations
|
|
|
|
*/
|
|
|
|
if (orte_process_info.daemon || orte_process_info.hnp) {
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
|
|
ORTE_RML_TAG_DAEMON_COLLECTIVE,
|
|
|
|
ORTE_RML_NON_PERSISTENT,
|
|
|
|
daemon_coll_recv,
|
|
|
|
NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return rc;
|
2008-03-24 20:50:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Finalize the module
|
|
|
|
*/
|
|
|
|
static void finalize(void)
|
|
|
|
{
|
2009-01-07 15:00:26 +00:00
|
|
|
opal_byte_object_t bo, *boptr;
|
|
|
|
opal_buffer_t profile;
|
|
|
|
|
2008-03-24 20:50:31 +00:00
|
|
|
orte_grpcomm_base_modex_finalize();
|
2008-12-09 23:49:02 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
if (opal_profile && orte_process_info.mpi_proc) {
|
|
|
|
/* if I am an MPI proc, send my buffer to the collector */
|
|
|
|
boptr = &bo;
|
|
|
|
opal_dss.unload(profile_buf, (void**)&boptr->bytes, &boptr->size);
|
|
|
|
OBJ_RELEASE(profile_buf);
|
|
|
|
/* store it as a single object */
|
|
|
|
OBJ_CONSTRUCT(&profile, opal_buffer_t);
|
|
|
|
opal_dss.pack(&profile, &boptr, 1, OPAL_BYTE_OBJECT);
|
|
|
|
/* send the buffer */
|
|
|
|
orte_rml.send_buffer(ORTE_PROC_MY_HNP, &profile, ORTE_RML_TAG_GRPCOMM_PROFILE, 0);
|
|
|
|
/* done with buffer */
|
|
|
|
OBJ_DESTRUCT(&profile);
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
|
|
|
|
if (orte_process_info.hnp && recv_on) {
|
|
|
|
/* if we are profiling and I am the HNP, then stop the
|
|
|
|
* profiling receive
|
|
|
|
*/
|
2009-01-27 19:13:56 +00:00
|
|
|
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_GRPCOMM_PROFILE);
|
|
|
|
if (0 <= profile_fd) {
|
|
|
|
close(profile_fd);
|
|
|
|
profile_fd = -1;
|
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
}
|
2009-01-27 19:13:56 +00:00
|
|
|
|
|
|
|
/* if we are a daemon or the hnp, we need to cancel the
|
|
|
|
* recv we posted
|
|
|
|
*/
|
|
|
|
if (orte_process_info.daemon || orte_process_info.hnp) {
|
|
|
|
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE);
|
|
|
|
}
|
2008-03-24 20:50:31 +00:00
|
|
|
}
|
|
|
|
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
/**
|
|
|
|
* A "broadcast-like" function to a job's processes.
|
|
|
|
* @param jobid The job whose processes are to receive the message
|
|
|
|
* @param buffer The data to broadcast
|
|
|
|
*/
|
2006-12-01 22:30:39 +00:00
|
|
|
|
2007-07-20 01:34:02 +00:00
|
|
|
static int xcast(orte_jobid_t job,
|
2008-02-28 01:57:57 +00:00
|
|
|
opal_buffer_t *buffer,
|
2007-07-20 01:34:02 +00:00
|
|
|
orte_rml_tag_t tag)
|
2006-12-01 22:30:39 +00:00
|
|
|
{
|
2006-12-03 00:19:11 +00:00
|
|
|
int rc = ORTE_SUCCESS;
|
2008-05-05 22:32:25 +00:00
|
|
|
opal_buffer_t buf;
|
|
|
|
orte_daemon_cmd_flag_t command;
|
2006-12-01 22:30:39 +00:00
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
2008-03-17 19:34:36 +00:00
|
|
|
"%s grpcomm:xcast sent to job %s tag %ld",
|
These changes were mostly captured in a prior RFC (except for #2 below) and are aimed specifically at improving startup performance and setting up the remaining modifications described in that RFC.
The commit has been tested for C/R and Cray operations, and on Odin (SLURM, rsh) and RoadRunner (TM). I tried to update all environments, but obviously could not test them. I know that Windows needs some work, and have highlighted what is know to be needed in the odls process component.
This represents a lot of work by Brian, Tim P, Josh, and myself, with much advice from Jeff and others. For posterity, I have appended a copy of the email describing the work that was done:
As we have repeatedly noted, the modex operation in MPI_Init is the single greatest consumer of time during startup. To-date, we have executed that operation as an ORTE stage gate that held the process until a startup message containing all required modex (and OOB contact info - see #3 below) info could be sent to it. Each process would send its data to the HNP's registry, which assembled and sent the message when all processes had reported in.
In addition, ORTE had taken responsibility for monitoring process status as it progressed through a series of "stage gates". The process reported its status at each gate, and ORTE would then send a "release" message once all procs had reported in.
The incoming changes revamp these procedures in three ways:
1. eliminating the ORTE stage gate system and cleanly delineating responsibility between the OMPI and ORTE layers for MPI init/finalize. The modex stage gate (STG1) has been replaced by a collective operation in the modex itself that performs an allgather on the required modex info. The allgather is implemented using the orte_grpcomm framework since the BTL's are not active at that point. At the moment, the grpcomm framework only has a "basic" component analogous to OMPI's "basic" coll framework - I would recommend that the MPI team create additional, more advanced components to improve performance of this step.
The other stage gates have been replaced by orte_grpcomm barrier functions. We tried to use MPI barriers instead (since the BTL's are active at that point), but - as we discussed on the telecon - these are not currently true barriers so the job would hang when we fell through while messages were still in process. Note that the grpcomm barrier doesn't actually resolve that problem, but Brian has pointed out that we are unlikely to ever see it violated. Again, you might want to spend a little time on an advanced barrier algorithm as the one in "basic" is very simplistic.
Summarizing this change: ORTE no longer tracks process state nor has direct responsibility for synchronizing jobs. This is now done via collective operations within the MPI layer, albeit using ORTE collective communication services. I -strongly- urge the MPI team to implement advanced collective algorithms to improve the performance of this critical procedure.
2. reducing the volume of data exchanged during modex. Data in the modex consisted of the process name, the name of the node where that process is located (expressed as a string), plus a string representation of all contact info. The nodename was required in order for the modex to determine if the process was local or not - in addition, some people like to have it to print pretty error messages when a connection failed.
The size of this data has been reduced in three ways:
(a) reducing the size of the process name itself. The process name consisted of two 32-bit fields for the jobid and vpid. This is far larger than any current system, or system likely to exist in the near future, can support. Accordingly, the default size of these fields has been reduced to 16-bits, which means you can have 32k procs in each of 32k jobs. Since the daemons must have a vpid, and we require one daemon/node, this also restricts the default configuration to 32k nodes.
To support any future "mega-clusters", a configuration option --enable-jumbo-apps has been added. This option increases the jobid and vpid field sizes to 32-bits. Someday, if necessary, someone can add yet another option to increase them to 64-bits, I suppose.
(b) replacing the string nodename with an integer nodeid. Since we have one daemon/node, the nodeid corresponds to the local daemon's vpid. This replaces an often lengthy string with only 2 (or at most 4) bytes, a substantial reduction.
(c) when the mca param requesting that nodenames be sent to support pretty error messages, a second mca param is now used to request FQDN - otherwise, the domain name is stripped (by default) from the message to save space. If someone wants to combine those into a single param somehow (perhaps with an argument?), they are welcome to do so - I didn't want to alter what people are already using.
While these may seem like small savings, they actually amount to a significant impact when aggregated across the entire modex operation. Since every proc must receive the modex data regardless of the collective used to send it, just reducing the size of the process name removes nearly 400MBytes of communication from a 32k proc job (admittedly, much of this comm may occur in parallel). So it does add up pretty quickly.
3. routing RML messages to reduce connections. The default messaging system remains point-to-point - i.e., each proc opens a socket to every proc it communicates with and sends its messages directly. A new option uses the orteds as routers - i.e., each proc only opens a single socket to its local orted. All messages are sent from the proc to the orted, which forwards the message to the orted on the node where the intended recipient proc is located - that orted then forwards the message to its local proc (the recipient). This greatly reduces the connection storm we have encountered during startup.
It also has the benefit of removing the sharing of every proc's OOB contact with every other proc. The orted routing tables are populated during launch since every orted gets a map of where every proc is being placed. Each proc, therefore, only needs to know the contact info for its local daemon, which is passed in via the environment when the proc is fork/exec'd by the daemon. This alone removes ~50 bytes/process of communication that was in the current STG1 startup message - so for our 32k proc job, this saves us roughly 32k*50 = 1.6MBytes sent to 32k procs = 51GBytes of messaging.
Note that you can use the new routing method by specifying -mca routed tree - if you so desire. This mode will become the default at some point in the future.
There are a few minor additional changes in the commit that I'll just note in passing:
* propagation of command line mca params to the orteds - fixes ticket #1073. See note there for details.
* requiring of "finalize" prior to "exit" for MPI procs - fixes ticket #1144. See note there for details.
* cleanup of some stale header files
This commit was SVN r16364.
2007-10-05 19:48:23 +00:00
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
2008-02-28 01:57:57 +00:00
|
|
|
ORTE_JOBID_PRINT(job), (long)tag));
|
2007-07-20 01:34:02 +00:00
|
|
|
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
/* if there is no message to send, then just return ok */
|
|
|
|
if (NULL == buffer) {
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
These changes were mostly captured in a prior RFC (except for #2 below) and are aimed specifically at improving startup performance and setting up the remaining modifications described in that RFC.
The commit has been tested for C/R and Cray operations, and on Odin (SLURM, rsh) and RoadRunner (TM). I tried to update all environments, but obviously could not test them. I know that Windows needs some work, and have highlighted what is know to be needed in the odls process component.
This represents a lot of work by Brian, Tim P, Josh, and myself, with much advice from Jeff and others. For posterity, I have appended a copy of the email describing the work that was done:
As we have repeatedly noted, the modex operation in MPI_Init is the single greatest consumer of time during startup. To-date, we have executed that operation as an ORTE stage gate that held the process until a startup message containing all required modex (and OOB contact info - see #3 below) info could be sent to it. Each process would send its data to the HNP's registry, which assembled and sent the message when all processes had reported in.
In addition, ORTE had taken responsibility for monitoring process status as it progressed through a series of "stage gates". The process reported its status at each gate, and ORTE would then send a "release" message once all procs had reported in.
The incoming changes revamp these procedures in three ways:
1. eliminating the ORTE stage gate system and cleanly delineating responsibility between the OMPI and ORTE layers for MPI init/finalize. The modex stage gate (STG1) has been replaced by a collective operation in the modex itself that performs an allgather on the required modex info. The allgather is implemented using the orte_grpcomm framework since the BTL's are not active at that point. At the moment, the grpcomm framework only has a "basic" component analogous to OMPI's "basic" coll framework - I would recommend that the MPI team create additional, more advanced components to improve performance of this step.
The other stage gates have been replaced by orte_grpcomm barrier functions. We tried to use MPI barriers instead (since the BTL's are active at that point), but - as we discussed on the telecon - these are not currently true barriers so the job would hang when we fell through while messages were still in process. Note that the grpcomm barrier doesn't actually resolve that problem, but Brian has pointed out that we are unlikely to ever see it violated. Again, you might want to spend a little time on an advanced barrier algorithm as the one in "basic" is very simplistic.
Summarizing this change: ORTE no longer tracks process state nor has direct responsibility for synchronizing jobs. This is now done via collective operations within the MPI layer, albeit using ORTE collective communication services. I -strongly- urge the MPI team to implement advanced collective algorithms to improve the performance of this critical procedure.
2. reducing the volume of data exchanged during modex. Data in the modex consisted of the process name, the name of the node where that process is located (expressed as a string), plus a string representation of all contact info. The nodename was required in order for the modex to determine if the process was local or not - in addition, some people like to have it to print pretty error messages when a connection failed.
The size of this data has been reduced in three ways:
(a) reducing the size of the process name itself. The process name consisted of two 32-bit fields for the jobid and vpid. This is far larger than any current system, or system likely to exist in the near future, can support. Accordingly, the default size of these fields has been reduced to 16-bits, which means you can have 32k procs in each of 32k jobs. Since the daemons must have a vpid, and we require one daemon/node, this also restricts the default configuration to 32k nodes.
To support any future "mega-clusters", a configuration option --enable-jumbo-apps has been added. This option increases the jobid and vpid field sizes to 32-bits. Someday, if necessary, someone can add yet another option to increase them to 64-bits, I suppose.
(b) replacing the string nodename with an integer nodeid. Since we have one daemon/node, the nodeid corresponds to the local daemon's vpid. This replaces an often lengthy string with only 2 (or at most 4) bytes, a substantial reduction.
(c) when the mca param requesting that nodenames be sent to support pretty error messages, a second mca param is now used to request FQDN - otherwise, the domain name is stripped (by default) from the message to save space. If someone wants to combine those into a single param somehow (perhaps with an argument?), they are welcome to do so - I didn't want to alter what people are already using.
While these may seem like small savings, they actually amount to a significant impact when aggregated across the entire modex operation. Since every proc must receive the modex data regardless of the collective used to send it, just reducing the size of the process name removes nearly 400MBytes of communication from a 32k proc job (admittedly, much of this comm may occur in parallel). So it does add up pretty quickly.
3. routing RML messages to reduce connections. The default messaging system remains point-to-point - i.e., each proc opens a socket to every proc it communicates with and sends its messages directly. A new option uses the orteds as routers - i.e., each proc only opens a single socket to its local orted. All messages are sent from the proc to the orted, which forwards the message to the orted on the node where the intended recipient proc is located - that orted then forwards the message to its local proc (the recipient). This greatly reduces the connection storm we have encountered during startup.
It also has the benefit of removing the sharing of every proc's OOB contact with every other proc. The orted routing tables are populated during launch since every orted gets a map of where every proc is being placed. Each proc, therefore, only needs to know the contact info for its local daemon, which is passed in via the environment when the proc is fork/exec'd by the daemon. This alone removes ~50 bytes/process of communication that was in the current STG1 startup message - so for our 32k proc job, this saves us roughly 32k*50 = 1.6MBytes sent to 32k procs = 51GBytes of messaging.
Note that you can use the new routing method by specifying -mca routed tree - if you so desire. This mode will become the default at some point in the future.
There are a few minor additional changes in the commit that I'll just note in passing:
* propagation of command line mca params to the orteds - fixes ticket #1073. See note there for details.
* requiring of "finalize" prior to "exit" for MPI procs - fixes ticket #1144. See note there for details.
* cleanup of some stale header files
This commit was SVN r16364.
2007-10-05 19:48:23 +00:00
|
|
|
|
2008-05-05 22:32:25 +00:00
|
|
|
/* setup a buffer to handle the xcast command */
|
|
|
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
|
|
/* all we need to do is send this to the HNP - the relay logic
|
|
|
|
* will ensure everyone else gets it! So tell the HNP to
|
|
|
|
* process and relay it. The HNP will use the routed.get_routing_tree
|
|
|
|
* to find out who it should relay the message to.
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
*/
|
2008-02-28 01:57:57 +00:00
|
|
|
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
2008-05-05 22:32:25 +00:00
|
|
|
/* pack the target jobid and tag for use in relay */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
2008-05-02 14:30:07 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
2008-03-03 20:07:02 +00:00
|
|
|
}
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
|
Bring in the generalized xcast communication system along with the correspondingly revised orted launch. I will send a message out to developers explaining the basic changes. In brief:
1. generalize orte_rml.xcast to become a general broadcast-like messaging system. Messages can now be sent to any tag on the daemons or processes. Note that any message sent via xcast will be delivered to ALL processes in the specified job - you don't get to pick and choose. At a later date, we will introduce an augmented capability that will use the daemons as relays, but will allow you to send to a specified array of process names.
2. extended orte_rml.xcast so it supports more scalable message routing methodologies. At the moment, we support three: (a) direct, which sends the message directly to all recipients; (b) linear, which sends the message to the local daemon on each node, which then relays it to its own local procs; and (b) binomial, which sends the message via a binomial algo across all the daemons, each of which then relays to its own local procs. The crossover points between the algos are adjustable via MCA param, or you can simply demand that a specific algo be used.
3. orteds no longer exhibit two types of behavior: bootproxy or VM. Orteds now always behave like they are part of a virtual machine - they simply launch a job if mpirun tells them to do so. This is another step towards creating an "orteboot" functionality, but also provided a clean system for supporting message relaying.
Note one major impact of this commit: multiple daemons on a node cannot be supported any longer! Only a single daemon/node is now allowed.
This commit is known to break support for the following environments: POE, Xgrid, Xcpu, Windows. It has been tested on rsh, SLURM, and Bproc. Modifications for TM support have been made but could not be verified due to machine problems at LANL. Modifications for SGE have been made but could not be verified. The developers for the non-verified environments will be separately notified along with suggestions on how to fix the problems.
This commit was SVN r15007.
2007-06-12 13:28:54 +00:00
|
|
|
/* if this isn't intended for the daemon command tag, then we better
|
|
|
|
* tell the daemon to deliver it to the procs, and what job is supposed
|
|
|
|
* to get it - this occurs when a caller just wants to send something
|
|
|
|
* to all the procs in a job. In that use-case, the caller doesn't know
|
|
|
|
* anything about inserting daemon commands or what routing algo might
|
|
|
|
* be used, so we have to help them out a little. Functions that are
|
|
|
|
* sending commands to the daemons themselves are smart enough to know
|
|
|
|
* what they need to do.
|
|
|
|
*/
|
|
|
|
if (ORTE_RML_TAG_DAEMON != tag) {
|
|
|
|
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
Bring in the generalized xcast communication system along with the correspondingly revised orted launch. I will send a message out to developers explaining the basic changes. In brief:
1. generalize orte_rml.xcast to become a general broadcast-like messaging system. Messages can now be sent to any tag on the daemons or processes. Note that any message sent via xcast will be delivered to ALL processes in the specified job - you don't get to pick and choose. At a later date, we will introduce an augmented capability that will use the daemons as relays, but will allow you to send to a specified array of process names.
2. extended orte_rml.xcast so it supports more scalable message routing methodologies. At the moment, we support three: (a) direct, which sends the message directly to all recipients; (b) linear, which sends the message to the local daemon on each node, which then relays it to its own local procs; and (b) binomial, which sends the message via a binomial algo across all the daemons, each of which then relays to its own local procs. The crossover points between the algos are adjustable via MCA param, or you can simply demand that a specific algo be used.
3. orteds no longer exhibit two types of behavior: bootproxy or VM. Orteds now always behave like they are part of a virtual machine - they simply launch a job if mpirun tells them to do so. This is another step towards creating an "orteboot" functionality, but also provided a clean system for supporting message relaying.
Note one major impact of this commit: multiple daemons on a node cannot be supported any longer! Only a single daemon/node is now allowed.
This commit is known to break support for the following environments: POE, Xgrid, Xcpu, Windows. It has been tested on rsh, SLURM, and Bproc. Modifications for TM support have been made but could not be verified due to machine problems at LANL. Modifications for SGE have been made but could not be verified. The developers for the non-verified environments will be separately notified along with suggestions on how to fix the problems.
This commit was SVN r15007.
2007-06-12 13:28:54 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
Bring in the generalized xcast communication system along with the correspondingly revised orted launch. I will send a message out to developers explaining the basic changes. In brief:
1. generalize orte_rml.xcast to become a general broadcast-like messaging system. Messages can now be sent to any tag on the daemons or processes. Note that any message sent via xcast will be delivered to ALL processes in the specified job - you don't get to pick and choose. At a later date, we will introduce an augmented capability that will use the daemons as relays, but will allow you to send to a specified array of process names.
2. extended orte_rml.xcast so it supports more scalable message routing methodologies. At the moment, we support three: (a) direct, which sends the message directly to all recipients; (b) linear, which sends the message to the local daemon on each node, which then relays it to its own local procs; and (b) binomial, which sends the message via a binomial algo across all the daemons, each of which then relays to its own local procs. The crossover points between the algos are adjustable via MCA param, or you can simply demand that a specific algo be used.
3. orteds no longer exhibit two types of behavior: bootproxy or VM. Orteds now always behave like they are part of a virtual machine - they simply launch a job if mpirun tells them to do so. This is another step towards creating an "orteboot" functionality, but also provided a clean system for supporting message relaying.
Note one major impact of this commit: multiple daemons on a node cannot be supported any longer! Only a single daemon/node is now allowed.
This commit is known to break support for the following environments: POE, Xgrid, Xcpu, Windows. It has been tested on rsh, SLURM, and Bproc. Modifications for TM support have been made but could not be verified due to machine problems at LANL. Modifications for SGE have been made but could not be verified. The developers for the non-verified environments will be separately notified along with suggestions on how to fix the problems.
This commit was SVN r15007.
2007-06-12 13:28:54 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
Bring in the generalized xcast communication system along with the correspondingly revised orted launch. I will send a message out to developers explaining the basic changes. In brief:
1. generalize orte_rml.xcast to become a general broadcast-like messaging system. Messages can now be sent to any tag on the daemons or processes. Note that any message sent via xcast will be delivered to ALL processes in the specified job - you don't get to pick and choose. At a later date, we will introduce an augmented capability that will use the daemons as relays, but will allow you to send to a specified array of process names.
2. extended orte_rml.xcast so it supports more scalable message routing methodologies. At the moment, we support three: (a) direct, which sends the message directly to all recipients; (b) linear, which sends the message to the local daemon on each node, which then relays it to its own local procs; and (b) binomial, which sends the message via a binomial algo across all the daemons, each of which then relays to its own local procs. The crossover points between the algos are adjustable via MCA param, or you can simply demand that a specific algo be used.
3. orteds no longer exhibit two types of behavior: bootproxy or VM. Orteds now always behave like they are part of a virtual machine - they simply launch a job if mpirun tells them to do so. This is another step towards creating an "orteboot" functionality, but also provided a clean system for supporting message relaying.
Note one major impact of this commit: multiple daemons on a node cannot be supported any longer! Only a single daemon/node is now allowed.
This commit is known to break support for the following environments: POE, Xgrid, Xcpu, Windows. It has been tested on rsh, SLURM, and Bproc. Modifications for TM support have been made but could not be verified due to machine problems at LANL. Modifications for SGE have been made but could not be verified. The developers for the non-verified environments will be separately notified along with suggestions on how to fix the problems.
This commit was SVN r15007.
2007-06-12 13:28:54 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/* copy the payload into the new buffer - this is non-destructive, so our
|
|
|
|
* caller is still responsible for releasing any memory in the buffer they
|
|
|
|
* gave to us
|
|
|
|
*/
|
2008-05-05 22:32:25 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, buffer))) {
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
|
2008-04-09 22:10:53 +00:00
|
|
|
/* if I am the HNP, just set things up so the cmd processor gets called.
|
|
|
|
* We don't want to message ourselves as this can create circular logic
|
|
|
|
* in the RML. Instead, this macro will set a zero-time event which will
|
|
|
|
* cause the buffer to be processed by the cmd processor - probably will
|
|
|
|
* fire right away, but that's okay
|
|
|
|
* The macro makes a copy of the buffer, so it's okay to release it here
|
|
|
|
*/
|
|
|
|
if (orte_process_info.hnp) {
|
2008-05-05 22:32:25 +00:00
|
|
|
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
2008-04-09 22:10:53 +00:00
|
|
|
} else {
|
|
|
|
/* otherwise, send it to the HNP for relay */
|
2008-05-05 22:32:25 +00:00
|
|
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
2008-03-03 20:07:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
rc = ORTE_SUCCESS;
|
|
|
|
}
|
2008-02-28 01:57:57 +00:00
|
|
|
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
CLEANUP:
|
2008-05-05 22:32:25 +00:00
|
|
|
OBJ_DESTRUCT(&buf);
|
Commit the orted-failed-to-start code. This correctly causes the system to detect the failure of an orted to start and allows the system to terminate all procs/orteds that *did* start.
The primary change that underlies all this is in the OOB. Specifically, the problem in the code until now has been that the OOB attempts to resolve an address when we call the "send" to an unknown recipient. The OOB would then wait forever if that recipient never actually started (and hence, never reported back its OOB contact info). In the case of an orted that failed to start, we would correctly detect that the orted hadn't started, but then we would attempt to order all orteds (including the one that failed to start) to die. This would cause the OOB to "hang" the system.
Unfortunately, revising how the OOB resolves addresses introduced a number of additional problems. Specifically, and most troublesome, was the fact that comm_spawn involved the immediate transmission of the rendezvous point from parent-to-child after the child was spawned. The current code used the OOB address resolution as a "barrier" - basically, the parent would attempt to send the info to the child, and then "hold" there until the child's contact info had arrived (meaning the child had started) and the send could be completed.
Note that this also caused comm_spawn to "hang" the entire system if the child never started... The app-failed-to-start helped improve that behavior - this code provides additional relief.
With this change, the OOB will return an ADDRESSEE_UNKNOWN error if you attempt to send to a recipient whose contact info isn't already in the OOB's hash tables. To resolve comm_spawn issues, we also now force the cross-sharing of connection info between parent and child jobs during spawn.
Finally, to aid in setting triggers to the right values, we introduce the "arith" API for the GPR. This function allows you to atomically change the value in a registry location (either divide, multiply, add, or subtract) by the provided operand. It is equivalent to first fetching the value using a "get", then modifying it, and then putting the result back into the registry via a "put".
This commit was SVN r14711.
2007-05-21 18:31:28 +00:00
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
2008-04-09 22:10:53 +00:00
|
|
|
|
|
|
|
static bool barrier_recvd;
|
|
|
|
|
|
|
|
static void barrier_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t *buffer,
|
|
|
|
orte_rml_tag_t tag, void *cbdata)
|
|
|
|
{
|
|
|
|
/* flag as recvd */
|
|
|
|
barrier_recvd = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
static int barrier(void)
|
|
|
|
{
|
|
|
|
opal_buffer_t buf;
|
|
|
|
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
|
|
|
|
int rc;
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic entering barrier",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* everyone sends barrier to local daemon */
|
|
|
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
|
|
/* tell the daemon we are doing a barrier */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
return rc;
|
2009-01-07 15:00:26 +00:00
|
|
|
}
|
2008-04-09 22:10:53 +00:00
|
|
|
/* send to local daemon */
|
2009-01-27 19:13:56 +00:00
|
|
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
|
2008-04-09 22:10:53 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic barrier sent",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* now receive the release. Be sure to do this in
|
|
|
|
* a manner that allows us to return without being in a recv!
|
|
|
|
*/
|
|
|
|
barrier_recvd = false;
|
|
|
|
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER,
|
|
|
|
ORTE_RML_NON_PERSISTENT, barrier_recv, NULL);
|
|
|
|
if (rc != ORTE_SUCCESS) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
ORTE_PROGRESSED_WAIT(barrier_recvd, 0, 1);
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic received barrier release",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
2008-02-28 01:57:57 +00:00
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
|
|
|
|
2008-04-09 22:10:53 +00:00
|
|
|
static opal_buffer_t *allgather_buf;
|
|
|
|
static orte_std_cntr_t allgather_complete;
|
|
|
|
|
|
|
|
static void allgather_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t *buffer,
|
|
|
|
orte_rml_tag_t tag, void *cbdata)
|
2008-03-03 20:07:02 +00:00
|
|
|
{
|
2008-04-09 22:10:53 +00:00
|
|
|
int rc;
|
2008-03-03 20:07:02 +00:00
|
|
|
|
2008-04-09 22:10:53 +00:00
|
|
|
/* xfer the data */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(allgather_buf, buffer))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
allgather_complete = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
opal_buffer_t coll;
|
|
|
|
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_ALLGATHER;
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic entering allgather",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* everyone sends data to their local daemon */
|
|
|
|
OBJ_CONSTRUCT(&coll, opal_buffer_t);
|
|
|
|
/* tell the daemon we are doing an allgather */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_DESTRUCT(&coll);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* add our data to it */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&coll, sbuf))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_DESTRUCT(&coll);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* send to local daemon */
|
2009-01-27 19:13:56 +00:00
|
|
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
|
2008-04-09 22:10:53 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_DESTRUCT(&coll);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
OBJ_DESTRUCT(&coll);
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic allgather buffer sent",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* setup the buffer that will recv the results */
|
|
|
|
allgather_buf = OBJ_NEW(opal_buffer_t);
|
|
|
|
|
|
|
|
/* now receive the final result. Be sure to do this in
|
|
|
|
* a manner that allows us to return without being in a recv!
|
|
|
|
*/
|
|
|
|
allgather_complete = false;
|
|
|
|
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
|
|
|
|
ORTE_RML_NON_PERSISTENT, allgather_recv, NULL);
|
|
|
|
if (rc != ORTE_SUCCESS) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
2008-03-03 20:07:02 +00:00
|
|
|
}
|
|
|
|
|
2008-04-09 22:10:53 +00:00
|
|
|
ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
|
|
|
|
|
|
|
|
/* copy payload to the caller's buffer */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(allgather_buf);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
OBJ_RELEASE(allgather_buf);
|
|
|
|
|
2008-06-09 14:53:58 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
2008-04-09 22:10:53 +00:00
|
|
|
"%s grpcomm:basic allgather completed",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
2008-03-03 20:07:02 +00:00
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
2008-06-18 22:17:53 +00:00
|
|
|
|
|
|
|
/*** MODEX SECTION ***/
|
2009-01-07 15:00:26 +00:00
|
|
|
static int do_modex(opal_list_t *procs)
|
2008-06-18 22:17:53 +00:00
|
|
|
{
|
|
|
|
opal_buffer_t buf, rbuf;
|
2009-02-04 22:26:35 +00:00
|
|
|
int32_t i, num_procs;
|
2009-01-07 15:00:26 +00:00
|
|
|
orte_std_cntr_t cnt, j, num_recvd_entries;
|
2008-06-18 22:17:53 +00:00
|
|
|
orte_process_name_t proc_name;
|
2008-12-09 23:49:02 +00:00
|
|
|
int rc=ORTE_SUCCESS;
|
2008-06-18 22:17:53 +00:00
|
|
|
int32_t arch;
|
2009-01-07 15:00:26 +00:00
|
|
|
bool modex_reqd;
|
|
|
|
orte_nid_t *nid;
|
2008-06-18 22:17:53 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex: performing modex",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* setup the buffer that will actually be sent */
|
|
|
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
|
|
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
|
|
|
|
|
|
|
/* put our process name in the buffer so it can be unpacked later */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* pack the entries we have received */
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex: executing allgather",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
|
|
|
if (NULL == procs) {
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex: processing modex info",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* process the results */
|
|
|
|
/* extract the number of procs that put data in the buffer */
|
|
|
|
cnt=1;
|
2009-02-04 22:26:35 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, OPAL_INT32))) {
|
2009-01-07 15:00:26 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex: received %ld data bytes from %ld procs",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
|
|
|
|
|
|
|
/* if the buffer doesn't have any more data, ignore it */
|
|
|
|
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* otherwise, process it */
|
|
|
|
for (i=0; i < num_procs; i++) {
|
|
|
|
/* unpack the process name */
|
|
|
|
cnt=1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* unpack its architecture */
|
|
|
|
cnt=1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* update the arch in the ESS
|
|
|
|
* RHC: DO NOT UPDATE ARCH IF THE PROC IS NOT IN OUR JOB. THIS IS A TEMPORARY
|
|
|
|
* FIX TO COMPENSATE FOR A PROBLEM IN THE CONNECT/ACCEPT CODE WHERE WE EXCHANGE
|
|
|
|
* INFO INCLUDING THE ARCH, BUT THEN DO A MODEX THAT ALSO INCLUDES THE ARCH. WE
|
|
|
|
* CANNOT UPDATE THE ARCH FOR JOBS OUTSIDE OUR OWN AS THE ESS HAS NO INFO ON
|
|
|
|
* THOSE PROCS/NODES - AND DOESN'T NEED IT AS THE MPI LAYER HAS ALREADY SET
|
|
|
|
* ITSELF UP AND DOES NOT NEED ESS SUPPORT FOR PROCS IN THE OTHER JOB
|
|
|
|
*
|
|
|
|
* EVENTUALLY, WE WILL SUPPORT THE ESS HAVING INFO ON OTHER JOBS FOR
|
|
|
|
* FAULT TOLERANCE PURPOSES - BUT NOT RIGHT NOW
|
|
|
|
*/
|
|
|
|
if (proc_name.jobid == ORTE_PROC_MY_NAME->jobid) {
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex: adding modex entry for proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&proc_name)));
|
|
|
|
|
|
|
|
/* unpack the number of entries for this proc */
|
|
|
|
cnt=1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex adding %d entries for proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
|
|
|
|
ORTE_NAME_PRINT(&proc_name)));
|
|
|
|
|
|
|
|
/* find this proc's node in the nidmap */
|
|
|
|
if (NULL == (nid = orte_util_lookup_nid(&proc_name))) {
|
|
|
|
/* proc wasn't found - return error */
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex no nidmap entry for proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&proc_name)));
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
rc = ORTE_ERR_NOT_FOUND;
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Extract the attribute names and values
|
|
|
|
*/
|
|
|
|
for (j = 0; j < num_recvd_entries; j++) {
|
|
|
|
size_t num_bytes;
|
|
|
|
orte_attr_t *attr;
|
|
|
|
|
|
|
|
attr = OBJ_NEW(orte_attr_t);
|
|
|
|
cnt = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &(attr->name), &cnt, OPAL_STRING))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
|
|
|
|
cnt = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
attr->size = num_bytes;
|
|
|
|
|
|
|
|
if (num_bytes != 0) {
|
|
|
|
if (NULL == (attr->bytes = malloc(num_bytes))) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
|
|
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
cnt = (orte_std_cntr_t) num_bytes;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, attr->bytes, &cnt, OPAL_BYTE))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* add this to the node's attribute list */
|
|
|
|
opal_list_append(&nid->attrs, &attr->super);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
cleanup:
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
OBJ_DESTRUCT(&rbuf);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
static int modex(opal_list_t *procs)
|
|
|
|
{
|
|
|
|
int rc=ORTE_SUCCESS;
|
|
|
|
int fd;
|
|
|
|
opal_byte_object_t bo, *boptr;
|
|
|
|
int32_t i, n;
|
|
|
|
char *nodename, *attr;
|
|
|
|
orte_nid_t **nd, *ndptr;
|
|
|
|
orte_attr_t *attrdata;
|
|
|
|
opal_buffer_t bobuf;
|
|
|
|
|
2008-06-18 22:17:53 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic: modex entered",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
2008-12-09 23:49:02 +00:00
|
|
|
/* if we were given a list of procs to modex with, then this is happening
|
|
|
|
* as part of a connect/accept operation. In this case, we -must- do the
|
|
|
|
* modex for two reasons:
|
|
|
|
*
|
|
|
|
* (a) the modex could involve procs from different mpiruns. In this case,
|
|
|
|
* there is no way for the two sets of procs to know which node the
|
|
|
|
* other procs are on, so we cannot use the profile_file to determine
|
|
|
|
* their contact info
|
|
|
|
*
|
|
|
|
* (b) in a comm_spawn, the parent job does not have a pidmap for the
|
|
|
|
* child job. Thus, it cannot know where the child procs are located,
|
|
|
|
* and cannot use the profile_file to determine their contact info
|
2009-01-07 15:00:26 +00:00
|
|
|
*
|
|
|
|
* We also do the modex if we are doing an opal_profile so that the
|
|
|
|
* HNP can collect our modex info.
|
2008-06-19 13:48:26 +00:00
|
|
|
*/
|
2009-01-07 15:00:26 +00:00
|
|
|
if (NULL != procs || opal_profile) {
|
|
|
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
return rc;
|
2008-12-09 23:49:02 +00:00
|
|
|
} else if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) {
|
|
|
|
/* decide if we need to add the architecture to the modex. Check
|
|
|
|
* first to see if hetero is enabled - if not, then we clearly
|
|
|
|
* don't need to exchange arch's as they are all identical
|
|
|
|
*/
|
2008-07-01 02:44:57 +00:00
|
|
|
/* Case 1: If different apps in this job were built differently - e.g., some
|
|
|
|
* are built 32-bit while others are built 64-bit - then we need to modex
|
|
|
|
* regardless of any other consideration. The user is reqd to tell us via a
|
|
|
|
* cmd line option if this situation exists, which will result in an mca param
|
|
|
|
* being set for us, so all we need to do is check for the global boolean
|
|
|
|
* that corresponds to that param
|
2008-11-03 21:48:52 +00:00
|
|
|
*
|
|
|
|
* Case 2: the nodes are hetero, but the app binaries were built
|
2008-07-01 02:44:57 +00:00
|
|
|
* the same - i.e., either they are both 32-bit, or they are both 64-bit, but
|
|
|
|
* no mixing of the two. In this case, we include the info in the modex
|
|
|
|
*/
|
2008-11-03 21:48:52 +00:00
|
|
|
if (orte_hetero_apps || !orte_homogeneous_nodes) {
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic: modex is required",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
return rc;
|
2008-07-01 02:44:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
/* no modex is required - see if the data was included in the launch message */
|
|
|
|
if (orte_send_profile) {
|
|
|
|
/* the info was provided in the nidmap - there is nothing more we have to do */
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex using nidmap",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* see if a profile file was given to us */
|
|
|
|
if (NULL == opal_profile_file) {
|
|
|
|
/* if we don't have any other way to do this, then let's default to doing the
|
|
|
|
* modex so we at least can function, even if it isn't as fast as we might like
|
|
|
|
*/
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic: modex is required",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
if (ORTE_SUCCESS != (rc = do_modex(procs))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
fd = open(opal_profile_file, O_RDONLY);
|
|
|
|
if (fd < 0) {
|
|
|
|
orte_show_help("help-orte-runtime.txt", "grpcomm-basic:file-cant-open", true, opal_profile_file);
|
|
|
|
return ORTE_ERR_NOT_FOUND;
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:modex reading %s file",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_profile_file));
|
|
|
|
|
|
|
|
/* loop through file until end */
|
|
|
|
boptr = &bo;
|
|
|
|
nd = (orte_nid_t**)orte_nidmap.addr;
|
|
|
|
while (0 < read(fd, &bo.size, sizeof(bo.size))) {
|
|
|
|
/* this is the number of bytes in the byte object */
|
|
|
|
bo.bytes = malloc(bo.size);
|
|
|
|
if (0 > read(fd, bo.bytes, bo.size)) {
|
|
|
|
orte_show_help("help-orte-runtime.txt", "orte_nidmap:unable-read-file", true, opal_profile_file);
|
|
|
|
close(fd);
|
|
|
|
return ORTE_ERR_FILE_READ_FAILURE;
|
|
|
|
}
|
|
|
|
/* load the byte object into a buffer for unpacking */
|
|
|
|
OBJ_CONSTRUCT(&bobuf, opal_buffer_t);
|
|
|
|
opal_dss.load(&bobuf, boptr->bytes, boptr->size);
|
|
|
|
/* unpack the nodename */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &nodename, &n, OPAL_STRING))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
2009-01-07 15:00:26 +00:00
|
|
|
return rc;
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
/* find this node in nidmap */
|
|
|
|
for (i=0, ndptr=NULL; i < orte_nidmap.size && NULL != nd[i]; i++) {
|
|
|
|
/* since we may not have kept fqdn hostnames, we can only check
|
|
|
|
* for equality to the length of the name in the nid
|
|
|
|
*/
|
|
|
|
if (0 == strncmp(nd[i]->name, nodename, strlen(nd[i]->name))) {
|
|
|
|
ndptr = nd[i];
|
|
|
|
break;
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
free(nodename); /* done with this */
|
|
|
|
if (NULL == ndptr) {
|
|
|
|
/* didn't find it! */
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return ORTE_ERR_NOT_FOUND;
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
/* loop through the rest of the object to unpack the attr's themselves */
|
|
|
|
n = 1;
|
|
|
|
while (ORTE_SUCCESS == opal_dss.unpack(&bobuf, &attr, &n, OPAL_STRING)) {
|
|
|
|
attrdata = OBJ_NEW(orte_attr_t);
|
|
|
|
attrdata->name = strdup(attr);
|
|
|
|
/* read the number of bytes in the blob */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, &attrdata->size, &n, OPAL_INT32))) {
|
2008-06-18 22:17:53 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
2009-01-07 15:00:26 +00:00
|
|
|
return rc;
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
/* unpack the bytes */
|
|
|
|
attrdata->bytes = malloc(attrdata->size);
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&bobuf, attrdata->bytes, &attrdata->size, OPAL_BYTE))) {
|
2008-06-18 22:17:53 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
2009-01-07 15:00:26 +00:00
|
|
|
return rc;
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
/* add to our list for this node */
|
|
|
|
opal_list_append(&ndptr->attrs, &attrdata->super);
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
OBJ_DESTRUCT(&bobuf);
|
2008-06-18 22:17:53 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
|
2008-06-18 22:17:53 +00:00
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic: modex completed",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
2008-12-09 23:49:02 +00:00
|
|
|
/* the HNP will -never- execute the following as it is NOT an MPI process */
|
|
|
|
static int set_proc_attr(const char *attr_name, const void *data, size_t size)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:set_proc_attr for attribute %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attr_name));
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
/* if we are doing a profile, pack this up */
|
2008-12-09 23:49:02 +00:00
|
|
|
if (opal_profile) {
|
|
|
|
int32_t isize;
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &attr_name, 1, OPAL_STRING))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
isize = size;
|
2009-01-07 15:00:26 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, &isize, 1, OPAL_INT32))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(profile_buf, data, isize, OPAL_BYTE))) {
|
2008-12-09 23:49:02 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
/* let it fall through so that the job doesn't hang! */
|
|
|
|
return orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* we always have to set our own attributes in case they are needed for
|
|
|
|
* a connect/accept at some later time
|
|
|
|
*/
|
2009-01-07 15:00:26 +00:00
|
|
|
cleanup:
|
|
|
|
return orte_grpcomm_base_set_proc_attr(attr_name, data, size);
|
|
|
|
}
|
|
|
|
|
|
|
|
static int get_proc_attr(const orte_process_name_t proc,
|
|
|
|
const char * attribute_name, void **val,
|
|
|
|
size_t *size)
|
|
|
|
{
|
|
|
|
orte_nid_t *nid;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
orte_attr_t *attr;
|
2008-12-09 23:49:02 +00:00
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
/* find this proc's node in the nidmap */
|
|
|
|
if (NULL == (nid = orte_util_lookup_nid((orte_process_name_t*)&proc))) {
|
|
|
|
/* proc wasn't found - return error */
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:get_proc_attr: no modex entry for proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&proc)));
|
2008-12-09 23:49:02 +00:00
|
|
|
return ORTE_ERR_NOT_FOUND;
|
2009-01-07 15:00:26 +00:00
|
|
|
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
|
|
|
|
2009-01-07 15:00:26 +00:00
|
|
|
/* look for this attribute */
|
|
|
|
for (item = opal_list_get_first(&nid->attrs);
|
|
|
|
item != opal_list_get_end(&nid->attrs);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
attr = (orte_attr_t*)item;
|
|
|
|
if (0 == strcmp(attr->name, attribute_name)) {
|
|
|
|
/* copy the data to the caller */
|
|
|
|
void *copy = malloc(attr->size);
|
|
|
|
|
|
|
|
if (copy == NULL) {
|
|
|
|
return ORTE_ERR_OUT_OF_RESOURCE;
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
memcpy(copy, attr->bytes, attr->size);
|
|
|
|
*val = copy;
|
|
|
|
*size = attr->size;
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:get_proc_attr: found %d bytes for attr %s on proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr->size,
|
|
|
|
attribute_name, ORTE_NAME_PRINT(&proc)));
|
|
|
|
return ORTE_SUCCESS;
|
2008-12-09 23:49:02 +00:00
|
|
|
}
|
|
|
|
}
|
2009-01-07 15:00:26 +00:00
|
|
|
|
|
|
|
/* get here if attribute isn't found */
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&proc), attribute_name));
|
|
|
|
*val = NULL;
|
|
|
|
*size = 0;
|
|
|
|
|
2008-12-09 23:49:02 +00:00
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
2009-01-27 19:13:56 +00:00
|
|
|
|
|
|
|
|
|
|
|
/* process incoming messages in order of receipt */
|
|
|
|
static void process_msg(int fd, short event, void *data)
|
|
|
|
{
|
|
|
|
orte_message_event_t *mev = (orte_message_event_t*)data;
|
|
|
|
int32_t rc, count;
|
|
|
|
opal_byte_object_t *bo;
|
|
|
|
|
|
|
|
/* save the info in the file */
|
|
|
|
if (0 <= profile_fd) {
|
|
|
|
/* extract the byte object holding the node's modex info */
|
|
|
|
count=1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &bo, &count, OPAL_BYTE_OBJECT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:receive:profile writing %d bytes of data from proc %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
bo->size, ORTE_NAME_PRINT(&mev->sender)));
|
|
|
|
|
|
|
|
write(profile_fd, &bo->size, sizeof(bo->size));
|
|
|
|
write(profile_fd, bo->bytes, bo->size);
|
|
|
|
free(bo->bytes);
|
|
|
|
free(bo);
|
|
|
|
}
|
|
|
|
|
|
|
|
CLEANUP:
|
|
|
|
/* release the message */
|
|
|
|
OBJ_RELEASE(mev);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
|
|
|
|
* DO NOT RELEASE THIS BUFFER IN THIS CODE
|
|
|
|
*/
|
|
|
|
|
|
|
|
static void profile_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:basic:receive got message from %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(sender)));
|
|
|
|
|
|
|
|
/* don't process this right away - we need to get out of the recv before
|
|
|
|
* we process the message as it may ask us to do something that involves
|
|
|
|
* more messaging! Instead, setup an event so that the message gets processed
|
|
|
|
* as soon as we leave the recv.
|
|
|
|
*
|
|
|
|
* The macro makes a copy of the buffer, which we release above - the incoming
|
|
|
|
* buffer, however, is NOT released here, although its payload IS transferred
|
|
|
|
* to the message buffer for later processing
|
|
|
|
*/
|
|
|
|
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_msg);
|
|
|
|
|
|
|
|
/* reissue the recv */
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
|
|
ORTE_RML_TAG_GRPCOMM_PROFILE,
|
|
|
|
ORTE_RML_NON_PERSISTENT,
|
|
|
|
profile_recv,
|
|
|
|
NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*************** COLLECTIVES FOR DAEMONS **************/
|
|
|
|
|
|
|
|
static bool all_children_participated(orte_jobid_t job)
|
|
|
|
{
|
|
|
|
opal_list_item_t *item;
|
|
|
|
orte_odls_child_t *child;
|
|
|
|
|
|
|
|
for (item = opal_list_get_first(&orte_local_children);
|
|
|
|
item != opal_list_get_end(&orte_local_children);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
child = (orte_odls_child_t*)item;
|
|
|
|
|
|
|
|
/* is this child part of the specified job? */
|
|
|
|
if (child->name->jobid == job && !child->coll_recvd) {
|
|
|
|
/* if this child has *not* participated yet, return false */
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if we get here, then everyone in the job has participated */
|
|
|
|
return true;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
static int daemon_collective(orte_process_name_t *sender, opal_buffer_t *data)
|
|
|
|
{
|
|
|
|
orte_jobid_t jobid;
|
|
|
|
orte_odls_job_t *jobdat;
|
|
|
|
orte_routed_tree_t *child;
|
|
|
|
orte_std_cntr_t n;
|
|
|
|
opal_list_t daemon_tree;
|
|
|
|
opal_list_item_t *item, *next;
|
|
|
|
int32_t num_contributors;
|
|
|
|
opal_buffer_t buf;
|
|
|
|
orte_process_name_t my_parent, proc;
|
|
|
|
orte_vpid_t daemonvpid;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s odls: daemon collective called",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* unpack the jobid using this collective */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* lookup the job record for it */
|
|
|
|
jobdat = NULL;
|
|
|
|
for (item = opal_list_get_first(&orte_local_jobdata);
|
|
|
|
item != opal_list_get_end(&orte_local_jobdata);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
jobdat = (orte_odls_job_t*)item;
|
|
|
|
|
|
|
|
/* is this the specified job? */
|
|
|
|
if (jobdat->jobid == jobid) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == jobdat) {
|
|
|
|
/* race condition - someone sent us a collective before we could
|
|
|
|
* parse the add_local_procs cmd. Just add the jobdat object
|
|
|
|
* and continue
|
|
|
|
*/
|
|
|
|
jobdat = OBJ_NEW(orte_odls_job_t);
|
|
|
|
jobdat->jobid = jobid;
|
|
|
|
opal_list_append(&orte_local_jobdata, &jobdat->super);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* it may be possible to get here prior to having actually finished processing our
|
|
|
|
* local launch msg due to the race condition between different nodes and when
|
|
|
|
* they start their individual procs. Hence, we have to first ensure that we
|
|
|
|
* -have- finished processing the launch msg, or else we won't know whether
|
|
|
|
* or not to wait before sending this on
|
|
|
|
*/
|
|
|
|
ORTE_PROGRESSED_WAIT(jobdat->launch_msg_processed, 0, 1);
|
|
|
|
|
|
|
|
/* unpack the collective type */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* unpack the number of contributors in this data bucket */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
jobdat->num_contributors += num_contributors;
|
|
|
|
|
|
|
|
/* xfer the data */
|
|
|
|
opal_dss.copy_payload(&jobdat->collection_bucket, data);
|
|
|
|
|
|
|
|
/* count the number of participants collected */
|
|
|
|
jobdat->num_collected++;
|
|
|
|
|
|
|
|
/* if we haven't already done so, figure out how many participants we
|
|
|
|
* should be expecting
|
|
|
|
*/
|
|
|
|
if (jobdat->num_participating < 0) {
|
|
|
|
if (0 < jobdat->num_local_procs) {
|
|
|
|
/* we have children, so account for our own participation */
|
|
|
|
jobdat->num_participating = 1;
|
|
|
|
} else {
|
|
|
|
jobdat->num_participating = 0;
|
|
|
|
}
|
|
|
|
/* now see if anyone else will be sending us something */
|
|
|
|
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
|
|
|
|
orte_routed.get_routing_tree(&daemon_tree);
|
|
|
|
/* unfortunately, there is no simple way to determine which of our "child"
|
|
|
|
* daemons in the routing tree will be sending us something. All we can do
|
|
|
|
* is brute force a search, though we attempt to keep it as short as possible
|
|
|
|
*/
|
|
|
|
proc.jobid = jobid;
|
|
|
|
proc.vpid = 0;
|
|
|
|
while (proc.vpid < jobdat->num_procs && 0 < opal_list_get_size(&daemon_tree)) {
|
|
|
|
/* get the daemon that hosts this proc */
|
|
|
|
daemonvpid = orte_ess.proc_get_daemon(&proc);
|
|
|
|
/* is this daemon one of our children, or at least its contribution
|
|
|
|
* will pass through one of our children
|
|
|
|
*/
|
|
|
|
item = opal_list_get_first(&daemon_tree);
|
|
|
|
while (item != opal_list_get_end(&daemon_tree)) {
|
|
|
|
next = opal_list_get_next(item);
|
|
|
|
child = (orte_routed_tree_t*)item;
|
|
|
|
if (child->vpid == daemonvpid || opal_bitmap_is_set_bit(&child->relatives, daemonvpid)) {
|
|
|
|
/* it does - add to num_participating */
|
|
|
|
jobdat->num_participating++;
|
|
|
|
/* remove this from the list so we don't double count it */
|
|
|
|
opal_list_remove_item(&daemon_tree, item);
|
|
|
|
/* done with search */
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
item = next;
|
|
|
|
}
|
|
|
|
proc.vpid++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: daemon collective for job %s from %s type %ld num_collected %d num_participating %d num_contributors %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jobid),
|
|
|
|
ORTE_NAME_PRINT(sender),
|
|
|
|
(long)jobdat->collective_type, jobdat->num_collected,
|
|
|
|
jobdat->num_participating, jobdat->num_contributors));
|
|
|
|
|
|
|
|
if (jobdat->num_collected == jobdat->num_participating) {
|
|
|
|
/* if I am the HNP, go process the results */
|
|
|
|
if (orte_process_info.hnp) {
|
|
|
|
goto hnp_process;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if I am not the HNP, send to my parent */
|
|
|
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
|
|
/* pack the jobid */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobid, 1, ORTE_JOBID))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* pack the collective type */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* pack the number of contributors */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jobdat->num_contributors, 1, OPAL_INT32))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
/* xfer the payload*/
|
|
|
|
opal_dss.copy_payload(&buf, &jobdat->collection_bucket);
|
|
|
|
/* reset everything for next collective */
|
|
|
|
jobdat->num_contributors = 0;
|
|
|
|
jobdat->num_collected = 0;
|
|
|
|
OBJ_DESTRUCT(&jobdat->collection_bucket);
|
|
|
|
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
|
|
|
|
/* send it */
|
|
|
|
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
|
|
|
|
my_parent.vpid = orte_routed.get_routing_tree(NULL);
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: daemon collective not the HNP - sending to parent %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&my_parent)));
|
|
|
|
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return rc;
|
|
|
|
}
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
}
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
|
|
|
|
hnp_process:
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: daemon collective HNP - xcasting to job %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_JOBID_PRINT(jobid)));
|
|
|
|
/* setup a buffer to send the results back to the job members */
|
|
|
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
|
|
|
|
|
|
|
if (ORTE_GRPCOMM_BARRIER == jobdat->collective_type) {
|
|
|
|
/* reset everything for next collective */
|
|
|
|
jobdat->num_contributors = 0;
|
|
|
|
jobdat->num_collected = 0;
|
|
|
|
OBJ_DESTRUCT(&jobdat->collection_bucket);
|
|
|
|
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
|
|
|
|
/* don't need anything in this for a barrier */
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
} else if (ORTE_GRPCOMM_ALLGATHER == jobdat->collective_type) {
|
2009-02-04 22:26:35 +00:00
|
|
|
int32_t numc;
|
2009-01-27 19:13:56 +00:00
|
|
|
/* add the data */
|
2009-02-04 22:26:35 +00:00
|
|
|
numc = jobdat->num_contributors;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &numc, 1, OPAL_INT32))) {
|
2009-01-27 19:13:56 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto cleanup;
|
|
|
|
}
|
|
|
|
/* reset everything for next collective */
|
|
|
|
jobdat->num_contributors = 0;
|
|
|
|
jobdat->num_collected = 0;
|
|
|
|
OBJ_DESTRUCT(&jobdat->collection_bucket);
|
|
|
|
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
|
|
|
|
/* send the buffer */
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
/* no other collectives currently supported! */
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
|
|
|
|
rc = ORTE_ERR_NOT_IMPLEMENTED;
|
|
|
|
}
|
|
|
|
|
|
|
|
cleanup:
|
|
|
|
OBJ_DESTRUCT(&buf);
|
|
|
|
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void reset_child_participation(orte_jobid_t job)
|
|
|
|
{
|
|
|
|
opal_list_item_t *item;
|
|
|
|
orte_odls_child_t *child;
|
|
|
|
|
|
|
|
for (item = opal_list_get_first(&orte_local_children);
|
|
|
|
item != opal_list_get_end(&orte_local_children);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
child = (orte_odls_child_t*)item;
|
|
|
|
|
|
|
|
/* is this child part of the specified job? */
|
|
|
|
if (child->name->jobid == job) {
|
|
|
|
/* clear flag */
|
|
|
|
child->coll_recvd = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void process_coll_msg(int fd, short event, void *data)
|
|
|
|
{
|
|
|
|
orte_message_event_t *mev = (orte_message_event_t*)data;
|
|
|
|
orte_process_name_t *proc;
|
|
|
|
opal_buffer_t *buf, relay;
|
|
|
|
int32_t rc, n;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
orte_odls_child_t *child;
|
|
|
|
bool found;
|
|
|
|
orte_odls_job_t *jobdat;
|
|
|
|
|
|
|
|
proc = &mev->sender;
|
|
|
|
buf = mev->buffer;
|
|
|
|
|
|
|
|
/* is the sender a local proc, or a daemon relaying the collective? */
|
|
|
|
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
|
|
|
|
/* this is a relay - call that code */
|
|
|
|
if (ORTE_SUCCESS != (rc = daemon_collective(proc, buf))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (item = opal_list_get_first(&orte_local_children);
|
|
|
|
item != opal_list_get_end(&orte_local_children);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
child = (orte_odls_child_t*)item;
|
|
|
|
|
|
|
|
/* find this child */
|
|
|
|
if (OPAL_EQUAL == opal_dss.compare(proc, child->name, ORTE_NAME)) {
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: collecting data from child %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(child->name)));
|
|
|
|
|
|
|
|
found = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if it wasn't found on the list, then we need to add it - must have
|
|
|
|
* come from a singleton
|
|
|
|
*/
|
|
|
|
if (!found) {
|
|
|
|
child = OBJ_NEW(orte_odls_child_t);
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy((void**)&child->name, proc, ORTE_NAME))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
opal_list_append(&orte_local_children, &child->super);
|
|
|
|
/* we don't know any other info about the child, so just indicate it's
|
|
|
|
* alive
|
|
|
|
*/
|
|
|
|
child->alive = true;
|
|
|
|
/* setup a jobdat for it */
|
|
|
|
orte_odls_base_setup_singleton_jobdat(proc->jobid);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* this was one of our local procs - find the jobdat for this job */
|
|
|
|
jobdat = NULL;
|
|
|
|
for (item = opal_list_get_first(&orte_local_jobdata);
|
|
|
|
item != opal_list_get_end(&orte_local_jobdata);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
jobdat = (orte_odls_job_t*)item;
|
|
|
|
|
|
|
|
/* is this the specified job? */
|
|
|
|
if (jobdat->jobid == proc->jobid) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == jobdat) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
rc = ORTE_ERR_NOT_FOUND;
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* unpack the collective type */
|
|
|
|
n = 1;
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto CLEANUP;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* collect the provided data */
|
|
|
|
opal_dss.copy_payload(&jobdat->local_collection, buf);
|
|
|
|
|
|
|
|
/* flag this proc as having participated */
|
|
|
|
child->coll_recvd = true;
|
|
|
|
|
|
|
|
/* now check to see if all local procs in this job have participated */
|
|
|
|
if (all_children_participated(proc->jobid)) {
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: executing collective",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
|
|
|
|
/* prep a buffer to pass it all along */
|
|
|
|
OBJ_CONSTRUCT(&relay, opal_buffer_t);
|
|
|
|
/* pack the jobid */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &proc->jobid, 1, ORTE_JOBID))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* pack the collective type */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->collective_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* pack the number of contributors */
|
|
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&relay, &jobdat->num_local_procs, 1, OPAL_INT32))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* xfer the payload*/
|
|
|
|
opal_dss.copy_payload(&relay, &jobdat->local_collection);
|
|
|
|
/* refresh the collection bucket for reuse */
|
|
|
|
OBJ_DESTRUCT(&jobdat->local_collection);
|
|
|
|
OBJ_CONSTRUCT(&jobdat->local_collection, opal_buffer_t);
|
|
|
|
reset_child_participation(proc->jobid);
|
|
|
|
/* pass this to the daemon collective operation */
|
|
|
|
daemon_collective(ORTE_PROC_MY_NAME, &relay);
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad: collective completed",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
}
|
|
|
|
|
|
|
|
CLEANUP:
|
|
|
|
/* release the message */
|
|
|
|
OBJ_RELEASE(mev);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void daemon_coll_recv(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata)
|
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
|
|
|
"%s grpcomm:bad:receive got message from %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(sender)));
|
|
|
|
|
|
|
|
/* don't process this right away - we need to get out of the recv before
|
|
|
|
* we process the message as it may ask us to do something that involves
|
|
|
|
* more messaging! Instead, setup an event so that the message gets processed
|
|
|
|
* as soon as we leave the recv.
|
|
|
|
*
|
|
|
|
* The macro makes a copy of the buffer, which we release above - the incoming
|
|
|
|
* buffer, however, is NOT released here, although its payload IS transferred
|
|
|
|
* to the message buffer for later processing
|
|
|
|
*/
|
|
|
|
ORTE_MESSAGE_EVENT(sender, buffer, tag, process_coll_msg);
|
|
|
|
|
|
|
|
/* reissue the recv */
|
|
|
|
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
|
|
ORTE_RML_TAG_DAEMON_COLLECTIVE,
|
|
|
|
ORTE_RML_NON_PERSISTENT,
|
|
|
|
daemon_coll_recv,
|
|
|
|
NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|