diff --git a/ompi/mca/dpm/dpm.h b/ompi/mca/dpm/dpm.h index 6ef25d5a41..d2d4a82290 100644 --- a/ompi/mca/dpm/dpm.h +++ b/ompi/mca/dpm/dpm.h @@ -49,12 +49,11 @@ BEGIN_C_DECLS #define OMPI_RML_TAG_COMM_CID_INTRA OMPI_RML_TAG_BASE+4 #define OMPI_RML_TAG_XOOB OMPI_RML_TAG_BASE+5 #define OMPI_RML_TAG_SM_BACK_FILE_CREATED OMPI_RML_TAG_BASE+6 -#define OMPI_RML_TAG_WIREUP OMPI_RML_TAG_BASE+7 -#define OMPI_CRCP_COORD_BOOKMARK_TAG OMPI_RML_TAG_BASE+8 -#define OMPI_COMM_JOIN_TAG OMPI_RML_TAG_BASE+9 +#define OMPI_CRCP_COORD_BOOKMARK_TAG OMPI_RML_TAG_BASE+7 +#define OMPI_COMM_JOIN_TAG OMPI_RML_TAG_BASE+8 /* support for shared memory collectives */ -#define OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED OMPI_RML_TAG_BASE+10 +#define OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED OMPI_RML_TAG_BASE+9 #define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200 diff --git a/ompi/runtime/mpiruntime.h b/ompi/runtime/mpiruntime.h index afcf5a58e6..3af6ccc27c 100644 --- a/ompi/runtime/mpiruntime.h +++ b/ompi/runtime/mpiruntime.h @@ -109,7 +109,6 @@ int ompi_mpi_abort(struct ompi_communicator_t* comm, * Do a preconnect of MPI connections (i.e., force connections to * be made if they will be made). */ -int ompi_init_preconnect_oob(void); int ompi_init_preconnect_mpi(void); END_C_DECLS diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index fecab90604..df3565a0f3 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -36,6 +36,7 @@ #include "opal/runtime/opal_progress.h" #include "opal/threads/threads.h" #include "opal/util/argv.h" +#include "opal/util/error.h" #include "opal/util/stacktrace.h" #include "opal/util/num_procs.h" #include "opal/util/show_help.h" @@ -43,15 +44,15 @@ #include "opal/event/event.h" #include "orte/util/proc_info.h" -#include "orte/util/session_dir.h" -#include "orte/util/name_fns.h" #include "orte/runtime/runtime.h" -#include "orte/mca/rml/rml.h" -#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/runtime/orte_globals.h" #include "orte/util/show_help.h" +#if !ORTE_DISABLE_FULL_SUPPORT +#include "orte/mca/routed/routed.h" +#endif + #include "ompi/constants.h" #include "ompi/mpi/f77/constants.h" #include "ompi/runtime/mpiruntime.h" @@ -294,7 +295,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) gettimeofday(&ompistart, NULL); } - /* Setup ORTE stage 1, note that we are not infrastructre */ + /* Setup ORTE - note that we are not a tool */ if (ORTE_SUCCESS != (ret = orte_init(ORTE_NON_TOOL))) { error = "ompi_mpi_init: orte_init failed"; @@ -302,6 +303,16 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) } orte_setup = true; + if (!ORTE_DISABLE_FULL_SUPPORT) { + /* warmup the OOB routes. Do this here because + it will go much faster before the event library is switched + into non-blocking mode */ + if (OMPI_SUCCESS != (ret = orte_routed.warmup_routes())) { + error = "orte_routed_warmup_routes() failed"; + goto error; + } + } + /* check for timing request - get stop time and report elapsed time if so */ if (timing && 0 == ORTE_PROC_MY_NAME->vpid) { gettimeofday(&ompistop, NULL); @@ -600,14 +611,6 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) orte_process_info.nodename); } - /* wire up the oob interface, if requested. Do this here because - it will go much faster before the event library is switched - into non-blocking mode */ - if (OMPI_SUCCESS != (ret = ompi_init_preconnect_oob())) { - error = "ompi_mpi_do_preconnect_oob() failed"; - goto error; - } - /* Do we need to wait for a debugger? */ ompi_wait_for_debugger(); diff --git a/ompi/runtime/ompi_mpi_preconnect.c b/ompi/runtime/ompi_mpi_preconnect.c index a68181cee5..e89f520f15 100644 --- a/ompi/runtime/ompi_mpi_preconnect.c +++ b/ompi/runtime/ompi_mpi_preconnect.c @@ -25,9 +25,6 @@ #include "ompi/runtime/mpiruntime.h" #include "ompi/mca/dpm/dpm.h" -#include "orte/mca/rml/rml.h" -#include "orte/mca/rml/rml_types.h" - int ompi_init_preconnect_mpi(void) { @@ -84,80 +81,3 @@ ompi_init_preconnect_mpi(void) return ret; } - - -int -ompi_init_preconnect_oob(void) -{ - int param, ret, value = 0; - size_t world_size, next, prev, i, j, world_rank, simultaneous; - ompi_proc_t **procs; - struct iovec inmsg[1], outmsg[1]; - - param = mca_base_param_find("mpi", NULL, "preconnect_oob"); - if (OMPI_ERROR == param) return OMPI_SUCCESS; - ret = mca_base_param_lookup_int(param, &value); - if (OMPI_SUCCESS != ret) return OMPI_SUCCESS; - if (0 == value) { - param = mca_base_param_find("mpi", NULL, "preconnect_all"); - if (OMPI_ERROR == param) return OMPI_SUCCESS; - ret = mca_base_param_lookup_int(param, &value); - if (OMPI_SUCCESS != ret) return OMPI_SUCCESS; - } - if (0 == value) return OMPI_SUCCESS; - - param = mca_base_param_find("mpi", NULL, "preconnect_oob_simultaneous"); - if (OMPI_ERROR == param) return OMPI_SUCCESS; - ret = mca_base_param_lookup_int(param, &value); - if (OMPI_SUCCESS != ret) return OMPI_SUCCESS; - simultaneous = (value < 1) ? 1 : value; - - procs = ompi_proc_world(&world_size); - - inmsg[0].iov_base = outmsg[0].iov_base = NULL; - inmsg[0].iov_len = outmsg[0].iov_len = 0; - - /* proc_world and ompi_comm_world should have the same proc list... */ - if ((int) world_size != ompi_comm_size(MPI_COMM_WORLD)) { - return OMPI_ERR_NOT_FOUND; - } else if (ompi_proc_local() != - procs[ompi_comm_rank(MPI_COMM_WORLD)]) { - return OMPI_ERR_NOT_FOUND; - } - world_rank = (size_t) ompi_comm_rank(MPI_COMM_WORLD); - - /* Each iteration, every process sends to its neighbor i hops to - the right and receives from its neighbor i hops to the left. - This limits any "flooding" effect that can occur with other - connection algorithms, which can overwhelm the out-of-band - connection system, leading to poor performance and hangs. */ - if (world_size < simultaneous) { - simultaneous = world_size; - } - for (i = 1 ; i <= world_size / 2 ; i += simultaneous) { - for (j = 0 ; j < simultaneous ; ++j) { - next = (world_rank + (i + j )) % world_size; - - /* sends do not wait for a match */ - ret = orte_rml.send(&procs[next]->proc_name, - outmsg, - 1, - OMPI_RML_TAG_WIREUP, - 0); - if (ret < 0) return ret; - } - for (j = 0 ; j < simultaneous ; ++j) { - prev = (world_rank - (i + j) + world_size) % world_size; - - ret = orte_rml.recv(&procs[prev]->proc_name, - inmsg, - 1, - OMPI_RML_TAG_WIREUP, - 0); - if (ret < 0) return ret; - } - } - - return OMPI_SUCCESS; -} - diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index f0e5752e56..de1b004514 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -476,10 +476,15 @@ static int modex(opal_list_t *procs) goto cleanup; } - /* decide if we need to add the architecture to the modex */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; + /* decide if we need to add the architecture to the modex. Check + * first to see if hetero is enabled - if not, then we clearly + * don't need to exchange arch's as they are all identical + */ + if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } } /* pack the entries we have received */ diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index a87d29cbc1..ea432e70e6 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -236,7 +236,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, goto REPORT_ERROR; } /* update the routing tree */ - orte_routed.update_routing_tree(); + if (ORTE_SUCCESS != (rc = orte_routed.update_routing_tree())) { + ORTE_ERROR_LOG(rc); + goto REPORT_ERROR; + } /* unpack the #bytes of daemon wireup info in the message */ cnt=1; diff --git a/orte/mca/routed/binomial/routed_binomial.c b/orte/mca/routed/binomial/routed_binomial.c index 5ffd5622a0..b5baeb3568 100644 --- a/orte/mca/routed/binomial/routed_binomial.c +++ b/orte/mca/routed/binomial/routed_binomial.c @@ -43,6 +43,7 @@ static bool route_is_defined(const orte_process_name_t *target); static int update_routing_tree(void); static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children); static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf); +static int warmup_routes(void); #if OPAL_ENABLE_FT == 1 static int binomial_ft_event(int state); @@ -59,6 +60,7 @@ orte_routed_module_t orte_routed_binomial_module = { update_route, get_route, init_routes, + warmup_routes, route_lost, route_is_defined, update_routing_tree, @@ -575,6 +577,24 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } +static int warmup_routes(void) +{ + opal_buffer_t buf; + orte_daemon_cmd_flag_t cmd=ORTE_DAEMON_NULL_CMD; + int rc; + + /* send a NULL command to my parent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD); + if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_DAEMON, 0))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return rc; + } + OBJ_DESTRUCT(&buf); + return ORTE_SUCCESS; +} + static int route_lost(const orte_process_name_t *route) { /* if we lose the connection to the lifeline and we are NOT already, diff --git a/orte/mca/routed/direct/routed_direct.c b/orte/mca/routed/direct/routed_direct.c index 2846432c9a..c837c9aa04 100644 --- a/orte/mca/routed/direct/routed_direct.c +++ b/orte/mca/routed/direct/routed_direct.c @@ -47,6 +47,7 @@ static bool route_is_defined(const orte_process_name_t *target); static int update_routing_tree(void); static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children); static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf); +static int warmup_routes(void); #if OPAL_ENABLE_FT == 1 static int direct_ft_event(int state); @@ -60,6 +61,7 @@ orte_routed_module_t orte_routed_direct_module = { update_route, get_route, init_routes, + warmup_routes, route_lost, route_is_defined, update_routing_tree, @@ -562,6 +564,60 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndata) } } +static int warmup_routes(void) +{ + struct iovec inmsg[1], outmsg[1]; + int i, world_size, world_rank, ret; + orte_process_name_t proc; + + /* if I am a daemon, tool, or HNP, do nothing */ + if (orte_process_info.daemon || + orte_process_info.hnp || + orte_process_info.tool) { + return ORTE_SUCCESS; + } + + /* I am an application process. In this case, we + * do a semi-intelligent messaging scheme to + * force the sockets to be opened + */ + world_size = orte_process_info.num_procs; + world_rank = ORTE_PROC_MY_NAME->vpid; + proc.jobid = ORTE_PROC_MY_NAME->jobid; + for (i = 1 ; i <= world_size / 2 ; i ++) { + proc.vpid = (world_rank + i) % world_size; + + OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, + "%s routed_direct_warmup: sending to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc))); + + /* sends do not wait for a match */ + ret = orte_rml.send(&proc, + outmsg, + 1, + ORTE_RML_TAG_WIREUP, + 0); + if (ret < 0) return ret; + + proc.vpid = (world_rank - i + world_size) % world_size; + + OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, + "%s routed_direct_warmup: recv from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc))); + + ret = orte_rml.recv(&proc, + inmsg, + 1, + ORTE_RML_TAG_WIREUP, + 0); + if (ret < 0) return ret; + } + + return ORTE_SUCCESS; +} + static int route_lost(const orte_process_name_t *route) { /* if we lose the connection to the lifeline and we are NOT already, diff --git a/orte/mca/routed/linear/routed_linear.c b/orte/mca/routed/linear/routed_linear.c index 6a55206b5b..da1b4c51c1 100644 --- a/orte/mca/routed/linear/routed_linear.c +++ b/orte/mca/routed/linear/routed_linear.c @@ -42,6 +42,7 @@ static bool route_is_defined(const orte_process_name_t *target); static int update_routing_tree(void); static orte_vpid_t get_routing_tree(orte_jobid_t job, opal_list_t *children); static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf); +static int warmup_routes(void); #if OPAL_ENABLE_FT == 1 static int linear_ft_event(int state); @@ -55,6 +56,7 @@ orte_routed_module_t orte_routed_linear_module = { update_route, get_route, init_routes, + warmup_routes, route_lost, route_is_defined, update_routing_tree, @@ -558,6 +560,11 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) } } +static int warmup_routes(void) +{ + return ORTE_SUCCESS; +} + static int route_lost(const orte_process_name_t *route) { /* if we lose the connection to the lifeline and we are NOT already, diff --git a/orte/mca/routed/routed.h b/orte/mca/routed/routed.h index 80d98baf10..b69c53b69e 100644 --- a/orte/mca/routed/routed.h +++ b/orte/mca/routed/routed.h @@ -195,6 +195,15 @@ typedef int (*orte_routed_module_update_routing_tree_fn_t)(void); typedef orte_vpid_t (*orte_routed_module_get_routing_tree_fn_t)(orte_jobid_t job, opal_list_t *children); +/* + * Warmup routes + * + * Preconnect the module's routes so that the sockets are created + * and ready for messaging. Sends 0-byte messages to those + * processes that are directly connected + */ +typedef int (*orte_routed_module_warmup_routes_fn_t)(void); + /** * Handle fault tolerance updates * @@ -223,6 +232,7 @@ struct orte_routed_module_t { orte_routed_module_update_route_fn_t update_route; orte_routed_module_get_route_fn_t get_route; orte_routed_module_init_routes_fn_t init_routes; + orte_routed_module_warmup_routes_fn_t warmup_routes; orte_routed_module_route_lost_fn_t route_lost; orte_routed_module_route_is_defined_fn_t route_is_defined; /* fns for daemons */ diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index a574ac9172..263dca7992 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -437,6 +437,11 @@ static int process_commands(orte_process_name_t* sender, /* now process the command locally */ switch(command) { + /**** NULL ****/ + case ORTE_DAEMON_NULL_CMD: + ret = ORTE_SUCCESS; + break; + /**** KILL_LOCAL_PROCS ****/ case ORTE_DAEMON_KILL_LOCAL_PROCS: /* unpack the jobid */ diff --git a/orte/util/nidmap.c b/orte/util/nidmap.c index 2e2185ed24..9bec82c843 100644 --- a/orte/util/nidmap.c +++ b/orte/util/nidmap.c @@ -47,9 +47,8 @@ int orte_util_encode_nodemap(opal_byte_object_t *boptr) char *nodename; opal_buffer_t buf; int step; -#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT int32_t *arch; -#endif + bool homo; /* setup a buffer for tmp use */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -229,17 +228,42 @@ int orte_util_encode_nodemap(opal_byte_object_t *boptr) opal_dss.pack(&buf, vpids, num_nodes, ORTE_VPID); free(vpids); -#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT - /* allocate space for the node arch */ - arch = (int32_t*)malloc(num_nodes * 4); - /* transfer the data from the nodes */ - for (i=0; i < num_nodes; i++) { - arch[i] = nodes[i]->arch; + if (OMPI_ENABLE_HETEROGENEOUS_SUPPORT) { + /* check to see if all reported archs are the same */ + homo = true; + for (i=0; i < num_nodes; i++) { + if (arch[i] != arch[0]) { + homo = false; + break; + } + } + if (homo) { + /* if everything is homo, just set that + * flag - no need to send everything + */ + num_digs = 0; + opal_dss.pack(&buf, &num_digs, 1, OPAL_UINT8); + } else { + /* it isn't homo, so we have to pass the + * archs to the daemons + */ + num_digs = 1; + opal_dss.pack(&buf, &num_digs, 1, OPAL_UINT8); + /* allocate space for the node arch */ + arch = (int32_t*)malloc(num_nodes * 4); + /* transfer the data from the nodes */ + for (i=0; i < num_nodes; i++) { + arch[i] = nodes[i]->arch; + } + /* pack the values */ + opal_dss.pack(&buf, arch, num_nodes, OPAL_INT32); + free(arch); + } + } else { + /* pack a flag indicating that the archs are the same */ + num_digs = 0; + opal_dss.pack(&buf, &num_digs, 1, OPAL_UINT8); } - /* pack the values */ - opal_dss.pack(&buf, arch, num_nodes, OPAL_INT32); - free(arch); -#endif /* transfer the payload to the byte object */ opal_dss.unload(&buf, (void**)&boptr->bytes, &boptr->size); @@ -259,9 +283,7 @@ int orte_util_decode_nodemap(opal_byte_object_t *bo, opal_pointer_array_t *nodes orte_nid_t **nd; uint8_t incdec; int32_t index, step; -#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT int32_t *arch; -#endif opal_buffer_t buf; OPAL_OUTPUT_VERBOSE((2, orte_debug_output, @@ -427,20 +449,25 @@ vpids: orte_process_info.num_procs = num_daemons; } - -#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT - /* allocate space for the node arch */ - arch = (int32_t*)malloc(num_nodes * 4); - /* unpack the values */ - n=num_nodes; - opal_dss.unpack(&buf, arch, &n, OPAL_INT32); - /* transfer the data to the nodes */ - nd = (orte_nid_t**)nodes->addr; - for (i=0; i < num_nodes; i++) { - nd[i]->arch = arch[i]; + /* unpack a flag to see if we are in a homogeneous + * scenario - could be that no hetero is supported, + * or could be that things just are homo anyway + */ + n=1; + opal_dss.unpack(&buf, &num_digs, &n, OPAL_UINT8); + if (0 != num_digs) { + /* hetero situation - get the archs */ + arch = (int32_t*)malloc(num_nodes * 4); + /* unpack the values */ + n=num_nodes; + opal_dss.unpack(&buf, arch, &n, OPAL_INT32); + /* transfer the data to the nodes */ + nd = (orte_nid_t**)nodes->addr; + for (i=0; i < num_nodes; i++) { + nd[i]->arch = arch[i]; + } + free(arch); } - free(arch); -#endif if (0 < opal_output_get_verbosity(orte_debug_output)) { nd = (orte_nid_t**)nodes->addr;