diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c index 6b6f424040..52280f225d 100644 --- a/src/communicator/comm_dyn.c +++ b/src/communicator/comm_dyn.c @@ -272,17 +272,24 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ -int ompi_comm_start_processes (char *command, char **argv, int maxprocs, - MPI_Info info, char *port_name ) +int +ompi_comm_start_processes(int count, char **array_of_commands, + char ***array_of_argv, + int *array_of_maxprocs, + MPI_Info *array_of_info, + char *port_name) { mca_ns_base_jobid_t new_jobid; ompi_rte_node_schedule_t *sched; ompi_rte_spawn_handle_t *spawn_handle; - ompi_list_t *nodelist=NULL; + ompi_list_t **nodelists = NULL; ompi_list_t schedlist; char *tmp, *envvarname, *segment, *my_contact_info; char cwd[MAXPATHLEN]; ompi_registry_notify_id_t rc_tag; + int i; + int total_start_procs = 0; + int requires; /* parse the info object */ /* check potentially for: @@ -298,83 +305,96 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs, new_jobid = ompi_name_server.create_jobid(); /* get the spawn handle to start spawning stuff */ - spawn_handle = - ompi_rte_get_spawn_handle(OMPI_RTE_SPAWN_FROM_MPI|OMPI_RTE_SPAWN_HIGH_QOS, - true); + requires = OMPI_RTE_SPAWN_FROM_MPI | OMPI_RTE_SPAWN_HIGH_QOS; + if (count > 1) requires |= OMPI_RTE_SPAWN_MPMD; + spawn_handle = ompi_rte_get_spawn_handle(requires, true); if (NULL == spawn_handle) { printf("show_help: get_spawn_handle failed\n"); - return -1; + return OMPI_ERROR; } - /* BWB - fix jobid, procs, and nodes */ - nodelist = ompi_rte_allocate_resources(spawn_handle, new_jobid, 0, maxprocs); - if (NULL == nodelist) { - /* BWB show_help */ - printf("show_help: ompi_rte_allocate_resources failed\n"); - return -1; - } - - /* - * Process mapping - */ + /* get our allocations and set them up, one by one */ OBJ_CONSTRUCT(&schedlist, ompi_list_t); - sched = OBJ_NEW(ompi_rte_node_schedule_t); - ompi_list_append(&schedlist, (ompi_list_item_t*) sched); - /* ompi_cmd_line_get_tail(cmd_line, &(sched->argc), &(sched->argv)); */ - ompi_argv_append (&(sched->argc), &(sched->argv), command); + nodelists = malloc(sizeof(ompi_list_t*) * count); + if (NULL == nodelists) { + return OMPI_ERROR; + } + + /* iterate through all the counts, creating an app schema entry + for each one */ + for (i = 0 ; i < count ; ++i) { + nodelists[i] = ompi_rte_allocate_resources(spawn_handle, + new_jobid, 0, + array_of_maxprocs[i]); + if (NULL == nodelists[i]) { + /* BWB - XXX - help - need to unwind what already done */ + return OMPI_ERROR; + } + total_start_procs += array_of_maxprocs[i]; + + + /* + * Process mapping + */ + sched = OBJ_NEW(ompi_rte_node_schedule_t); + ompi_argv_append (&(sched->argc), &(sched->argv), + array_of_commands[i]); - if (argv != MPI_ARGV_NULL ) { - int i=0; - char *arg=argv[i]; + if (array_of_argv != MPI_ARGVS_NULL && + array_of_argv[i] != MPI_ARGV_NULL ) { + int j = 0; + char *arg = array_of_argv[i][j]; - while ( arg!=NULL ) { - ompi_argv_append(&(sched->argc), &(sched->argv), arg); - arg = argv[++i]; - } - } + while (arg != NULL) { + ompi_argv_append(&(sched->argc), &(sched->argv), arg); + arg = array_of_argv[i][++j]; + } + } - /* - * build environment to be passed - */ - mca_pcm_base_build_base_env(environ, &(sched->envc), &(sched->env)); + /* + * build environment to be passed + */ + mca_pcm_base_build_base_env(environ, &(sched->envc), &(sched->env)); - /* set initial contact info */ - if (ompi_process_info.seed) { - my_contact_info = mca_oob_get_contact_info(); - } else { - my_contact_info = strdup(ompi_universe_info.ns_replica); - } + /* set initial contact info */ + if (ompi_process_info.seed) { + my_contact_info = mca_oob_get_contact_info(); + } else { + my_contact_info = strdup(ompi_universe_info.ns_replica); + } - asprintf(&tmp, "OMPI_MCA_ns_base_replica=%s", my_contact_info); - ompi_argv_append(&(sched->envc), &(sched->env), tmp); - free(tmp); + asprintf(&tmp, "OMPI_MCA_ns_base_replica=%s", my_contact_info); + ompi_argv_append(&(sched->envc), &(sched->env), tmp); + free(tmp); - asprintf(&tmp, "OMPI_MCA_gpr_base_replica=%s", my_contact_info); - ompi_argv_append(&(sched->envc), &(sched->env), tmp); - free(tmp); + asprintf(&tmp, "OMPI_MCA_gpr_base_replica=%s", my_contact_info); + ompi_argv_append(&(sched->envc), &(sched->env), tmp); + free(tmp); - if (NULL != ompi_universe_info.name) { - asprintf(&tmp, "OMPI_universe_name=%s", ompi_universe_info.name); - ompi_argv_append(&(sched->envc), &(sched->env), tmp); - free(tmp); - } + if (NULL != ompi_universe_info.name) { + asprintf(&tmp, "OMPI_universe_name=%s", ompi_universe_info.name); + ompi_argv_append(&(sched->envc), &(sched->env), tmp); + free(tmp); + } - /* Add environment variable with the contact information for the - child processes */ - asprintf(&envvarname, "OMPI_PARENT_PORT_%u", new_jobid); - asprintf(&tmp, "%s=%s", envvarname, port_name); - ompi_argv_append(&(sched->envc), &(sched->env), tmp); - free(tmp); - free(envvarname); - - getcwd(cwd, MAXPATHLEN); - sched->cwd = strdup(cwd); - sched->nodelist = nodelist; + /* Add environment variable with the contact information for the + child processes */ + asprintf(&envvarname, "OMPI_PARENT_PORT_%u", new_jobid); + asprintf(&tmp, "%s=%s", envvarname, port_name); + ompi_argv_append(&(sched->envc), &(sched->env), tmp); + free(tmp); + free(envvarname); - if (sched->argc == 0) { - printf("no app to start\n"); - return MPI_ERR_ARG; - } + getcwd(cwd, MAXPATHLEN); + sched->cwd = strdup(cwd); + sched->nodelist = nodelists[i]; + + if (sched->argc == 0) { + printf("no app to start\n"); + return MPI_ERR_ARG; + } + ompi_list_append(&schedlist, (ompi_list_item_t*) sched); + } /* for (i = 0 ; i < count ; ++i) */ /* @@ -391,7 +411,7 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs, OMPI_REGISTRY_OR, segment, NULL, - maxprocs, + total_start_procs, ompi_rte_all_procs_registered, NULL); free(segment); @@ -417,8 +437,15 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs, /* * Clean up */ - if (NULL != nodelist) ompi_rte_deallocate_resources(spawn_handle, - new_jobid, nodelist); + if (NULL != nodelists) { + for (i = 0 ; i < count ; ++i) { + if (NULL != nodelists[i]) { + ompi_rte_deallocate_resources(spawn_handle, + new_jobid, nodelists[i]); + } + } + free(nodelists); + } if (NULL != spawn_handle) OBJ_RELEASE(spawn_handle); OBJ_DESTRUCT(&schedlist); diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index 8351b4f18e..2247303b23 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -416,12 +416,14 @@ struct ompi_communicator_t { void ompi_comm_reg_init(void); void ompi_comm_reg_finalize(void); - - /* start the new processes from MPI_Comm_spawn. Initial version, - * a version for Comm_spawn_multiple still missing. + /* start the new processes from MPI_Comm_spawn_multiple. Initial + * version, very rough */ - int ompi_comm_start_processes (char *command, char **argv, int maxprocs, - MPI_Info info, char *port_name); + int ompi_comm_start_processes(int count, char **array_of_commands, + char ***array_of_argv, + int *array_of_maxprocs, + MPI_Info *array_of_info, + char *port_name); /* * This routine checks, whether an application has been spawned diff --git a/src/mca/llm/llm.h b/src/mca/llm/llm.h index 4517aedaa2..d86228bbee 100644 --- a/src/mca/llm/llm.h +++ b/src/mca/llm/llm.h @@ -103,11 +103,8 @@ typedef mca_llm_base_component_1_0_0_t mca_llm_base_component_t; /** * Allocate requested resources * - * Allocate the specified nodes / processes for use in a new job. - * Requires a jobid from the PCM interface. The allocation returned - * may be smaller than requested - it is up to the caller to proceed - * as appropriate should this occur. This function should only be - * called once per jobid. + * See notes for \c ompi_rte_allocate_resources() for complete + * description. * * @param jobid (IN) Jobid with which to associate the given resources. * @param nodes (IN) Number of ndoes to try to allocate. If 0, the diff --git a/src/mca/pcm/pcm.h b/src/mca/pcm/pcm.h index 695e55ce2f..cb728c60b4 100644 --- a/src/mca/pcm/pcm.h +++ b/src/mca/pcm/pcm.h @@ -177,11 +177,8 @@ typedef mca_pcm_base_component_1_0_0_t mca_pcm_base_component_t; /** * Allocate requested resources * - * Allocate the specified nodes / processes for use in a new job. - * Requires a jobid from the PCM interface. The allocation returned - * may be smaller than requested - it is up to the caller to proceed - * as appropriate should this occur. This function should only be - * called once per jobid. + * See notes for \c ompi_rte_allocate_resources() for complete + * description. * * @param me (IN) Pointer to the module struct * @param jobid (IN) Jobid with which to associate the given resources. diff --git a/src/mca/pcm/rms/src/pcm_rms_component.c b/src/mca/pcm/rms/src/pcm_rms_component.c index 0f7f823a7c..15396641d0 100644 --- a/src/mca/pcm/rms/src/pcm_rms_component.c +++ b/src/mca/pcm/rms/src/pcm_rms_component.c @@ -119,6 +119,8 @@ mca_pcm_rms_init(int *priority, if (0 != (constraints & OMPI_RTE_SPAWN_DAEMON)) return NULL; /* no MPI_COMM_SPAWN* */ if (0 != (constraints & OMPI_RTE_SPAWN_FROM_MPI)) return NULL; + /* we don't really have a way of doing MPMD with rms :( */ + if (0 != (constraints & OMPI_RTE_SPAWN_MPMD)) return NULL; /* see if we are an RMS system */ num_cpus = rms_numCpus(NULL); diff --git a/src/mca/pcm/slurm/src/pcm_slurm_component.c b/src/mca/pcm/slurm/src/pcm_slurm_component.c index 5bf4573c18..c737dab22a 100644 --- a/src/mca/pcm/slurm/src/pcm_slurm_component.c +++ b/src/mca/pcm/slurm/src/pcm_slurm_component.c @@ -133,6 +133,9 @@ mca_pcm_slurm_init(int *priority, if (0 != (constraints & OMPI_RTE_SPAWN_DAEMON)) return NULL; /* no MPI_COMM_SPAWN* */ if (0 != (constraints & OMPI_RTE_SPAWN_FROM_MPI)) return NULL; + /* BWB - XXX - Fix me. Disable MPMD with slurm for now. I think + we can make this work later on.... */ + if (0 != (constraints & OMPI_RTE_SPAWN_MPMD)) return NULL; srun = ompi_path_env_findv("srun", X_OK, environ, NULL); if (NULL == srun) return NULL; diff --git a/src/mpi/c/comm_spawn.c b/src/mpi/c/comm_spawn.c index 33be6f7115..2dc28636d0 100644 --- a/src/mpi/c/comm_spawn.c +++ b/src/mpi/c/comm_spawn.c @@ -89,7 +89,8 @@ int MPI_Comm_spawn(char *command, char **argv, int maxprocs, MPI_Info info, /* Open a port. The port_name is passed as an environment variable to the children. */ ompi_open_port (port_name); - ompi_comm_start_processes (command, argv, maxprocs, info, port_name); + ompi_comm_start_processes (1, &command, &argv, &maxprocs, + &info, port_name); tmp_port = ompi_parse_port (port_name, &tag); free(tmp_port); } diff --git a/src/mpi/c/comm_spawn_multiple.c b/src/mpi/c/comm_spawn_multiple.c index 31485680d7..047103c077 100644 --- a/src/mpi/c/comm_spawn_multiple.c +++ b/src/mpi/c/comm_spawn_multiple.c @@ -39,6 +39,8 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o int totalnumprocs=0; ompi_communicator_t *newcomp=NULL; int send_first=0; /* they are contacting us first */ + char port_name[MPI_MAX_PORT_NAME]; + char *tmp_port; if ( MPI_PARAM_CHECK ) { OMPI_ERR_INIT_FINALIZE(FUNC_NAME); @@ -87,10 +89,6 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME); } - if ( NULL == array_of_argv ) { - return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, - FUNC_NAME); - } if ( NULL == array_of_maxprocs ) { return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME); @@ -104,10 +102,6 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME); } - if ( NULL == array_of_argv[i] ) { - return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, - FUNC_NAME); - } if ( 0 > array_of_maxprocs[i] ) { return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, FUNC_NAME); @@ -117,35 +111,19 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o } if ( rank == root ) { - for ( i=0; i < count; i++ ) { - totalnumprocs += array_of_maxprocs[i]; - } - /* parse the info[i] */ - - /* check potentially for: - - "host": desired host where to spawn the processes - - "arch": desired architecture - - "wdir": directory, where executable can be found - - "path": list of directories where to look for the executable - - "file": filename, where additional information is provided. - - "soft": see page 92 of MPI-2. - */ - - /* map potentially array_of_argvs == MPI_ARGVS_NULL to a correct value */ - /* map potentially array_of_argvs[i] == MPI_ARGV_NULL to a correct value. - not required by the standard. */ - /* start processes */ - - /* publish name, which should be based on the jobid of the children */ - - /* rc = ompi_comm_namepublish (service_name, port_name ); */ + /* Open a port. The port_name is passed as an environment variable + to the children. */ + ompi_open_port (port_name); + ompi_comm_start_processes(count, array_of_commands, + array_of_argv, array_of_maxprocs, + array_of_info, port_name); + tmp_port = ompi_parse_port (port_name, &tag); + free(tmp_port); } rc = ompi_comm_connect_accept (comm, root, NULL, send_first, &newcomp, tag); - if ( rank == root ) { - /* unpublish name */ - } + /* close the port again. Nothing has to be done for that at the moment.*/ /* set array of errorcodes */ if (MPI_ERRCODES_IGNORE != array_of_errcodes) { diff --git a/src/runtime/runtime.h b/src/runtime/runtime.h index 6252932b0c..91977daacc 100644 --- a/src/runtime/runtime.h +++ b/src/runtime/runtime.h @@ -69,6 +69,10 @@ MPI_COMM_SPAWN and MPI_COMM_SPAWN_MULTIPLE. The calling process will follow the semantics of the MPI_COMM_SPAWN_* functions. */ #define OMPI_RTE_SPAWN_FROM_MPI 0x0008 +/** Spawn constraint - require ability to launch either MPMD (hence + the name) applications or applications with specific placement of + processes. */ +#define OMPI_RTE_SPAWN_MPMD 0x0010 #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -229,7 +233,10 @@ OMPI_DECLSPEC ompi_rte_spawn_handle_t* ompi_rte_get_spawn_handle(int criteria * * Allocate the specified nodes / processes for use in a new job. * This function should be called exactly once per call to \c - * ompi_rte_spawn_procs. + * ompi_rte_spawn_procs, unless \c OMPI_RTE_SPAWN_MPMD was + * specified as a constraint to \c ompi_rte_get_spawn_handle(), in + * which case this function can be called as many times as + * necessary. * * @param handle (IN) Handle from \c ompi_rte_get_spawn_handle * @param jobid (IN) Jobid with which to associate the given resources. @@ -252,11 +259,6 @@ OMPI_DECLSPEC ompi_rte_spawn_handle_t* ompi_rte_get_spawn_handle(int criteria * can be called again, but with a smaller * resource request. * - * @note In the future, a more complex resource allocation - * function may be added, which allows for complicated - * resource requests. This function will continue to exist - * as a special case of that function. - * * Some systems are not capable of providing a maximum * available resource count and there is an inherent race * condition to do so in many other systems. On these