
* Changes to make MPI_Comm_spawn_multiple work properly

This commit was SVN r3833.
This commit is contained in:
Brian Barrett 2004-12-16 15:42:02 +00:00
parent cfdeb89420
commit d3d52a4aed
9 changed files with 133 additions and 124 deletions

View file

@@ -272,17 +272,24 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
 /**********************************************************************/
 /**********************************************************************/
 /**********************************************************************/
-int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
-                               MPI_Info info, char *port_name )
+int
+ompi_comm_start_processes(int count, char **array_of_commands,
+                          char ***array_of_argv,
+                          int *array_of_maxprocs,
+                          MPI_Info *array_of_info,
+                          char *port_name)
 {
     mca_ns_base_jobid_t new_jobid;
     ompi_rte_node_schedule_t *sched;
     ompi_rte_spawn_handle_t *spawn_handle;
-    ompi_list_t *nodelist=NULL;
+    ompi_list_t **nodelists = NULL;
     ompi_list_t schedlist;
     char *tmp, *envvarname, *segment, *my_contact_info;
     char cwd[MAXPATHLEN];
     ompi_registry_notify_id_t rc_tag;
+    int i;
+    int total_start_procs = 0;
+    int requires;
 
     /* parse the info object */
     /* check potentially for:
@@ -298,83 +305,96 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
     new_jobid = ompi_name_server.create_jobid();
 
     /* get the spawn handle to start spawning stuff */
-    spawn_handle =
-        ompi_rte_get_spawn_handle(OMPI_RTE_SPAWN_FROM_MPI|OMPI_RTE_SPAWN_HIGH_QOS,
-                                  true);
+    requires = OMPI_RTE_SPAWN_FROM_MPI | OMPI_RTE_SPAWN_HIGH_QOS;
+    if (count > 1) requires |= OMPI_RTE_SPAWN_MPMD;
+    spawn_handle = ompi_rte_get_spawn_handle(requires, true);
     if (NULL == spawn_handle) {
         printf("show_help: get_spawn_handle failed\n");
-        return -1;
+        return OMPI_ERROR;
     }
 
-    /* BWB - fix jobid, procs, and nodes */
-    nodelist = ompi_rte_allocate_resources(spawn_handle, new_jobid, 0, maxprocs);
-    if (NULL == nodelist) {
-        /* BWB show_help */
-        printf("show_help: ompi_rte_allocate_resources failed\n");
-        return -1;
-    }
-
-    /*
-     * Process mapping
-     */
+    /* get our allocations and set them up, one by one */
     OBJ_CONSTRUCT(&schedlist, ompi_list_t);
-    sched = OBJ_NEW(ompi_rte_node_schedule_t);
-    ompi_list_append(&schedlist, (ompi_list_item_t*) sched);
-    /* ompi_cmd_line_get_tail(cmd_line, &(sched->argc), &(sched->argv)); */
-    ompi_argv_append (&(sched->argc), &(sched->argv), command);
+    nodelists = malloc(sizeof(ompi_list_t*) * count);
+    if (NULL == nodelists) {
+        return OMPI_ERROR;
+    }
+
+    /* iterate through all the counts, creating an app schema entry
+       for each one */
+    for (i = 0 ; i < count ; ++i) {
+        nodelists[i] = ompi_rte_allocate_resources(spawn_handle,
+                                                   new_jobid, 0,
+                                                   array_of_maxprocs[i]);
+        if (NULL == nodelists[i]) {
+            /* BWB - XXX - help - need to unwind what already done */
+            return OMPI_ERROR;
+        }
+        total_start_procs += array_of_maxprocs[i];
+
+        /*
+         * Process mapping
+         */
+        sched = OBJ_NEW(ompi_rte_node_schedule_t);
+        ompi_argv_append (&(sched->argc), &(sched->argv),
+                          array_of_commands[i]);
 
-    if (argv != MPI_ARGV_NULL ) {
-        int i=0;
-        char *arg=argv[i];
+        if (array_of_argv != MPI_ARGVS_NULL &&
+            array_of_argv[i] != MPI_ARGV_NULL ) {
+            int j = 0;
+            char *arg = array_of_argv[i][j];
 
-        while ( arg!=NULL ) {
-            ompi_argv_append(&(sched->argc), &(sched->argv), arg);
-            arg = argv[++i];
-        }
-    }
+            while (arg != NULL) {
+                ompi_argv_append(&(sched->argc), &(sched->argv), arg);
+                arg = array_of_argv[i][++j];
+            }
+        }
 
-    /*
-     * build environment to be passed
-     */
-    mca_pcm_base_build_base_env(environ, &(sched->envc), &(sched->env));
+        /*
+         * build environment to be passed
+         */
+        mca_pcm_base_build_base_env(environ, &(sched->envc), &(sched->env));
 
-    /* set initial contact info */
-    if (ompi_process_info.seed) {
-        my_contact_info = mca_oob_get_contact_info();
-    } else {
-        my_contact_info = strdup(ompi_universe_info.ns_replica);
-    }
+        /* set initial contact info */
+        if (ompi_process_info.seed) {
+            my_contact_info = mca_oob_get_contact_info();
+        } else {
+            my_contact_info = strdup(ompi_universe_info.ns_replica);
+        }
 
-    asprintf(&tmp, "OMPI_MCA_ns_base_replica=%s", my_contact_info);
-    ompi_argv_append(&(sched->envc), &(sched->env), tmp);
-    free(tmp);
+        asprintf(&tmp, "OMPI_MCA_ns_base_replica=%s", my_contact_info);
+        ompi_argv_append(&(sched->envc), &(sched->env), tmp);
+        free(tmp);
 
-    asprintf(&tmp, "OMPI_MCA_gpr_base_replica=%s", my_contact_info);
-    ompi_argv_append(&(sched->envc), &(sched->env), tmp);
-    free(tmp);
+        asprintf(&tmp, "OMPI_MCA_gpr_base_replica=%s", my_contact_info);
+        ompi_argv_append(&(sched->envc), &(sched->env), tmp);
+        free(tmp);
 
-    if (NULL != ompi_universe_info.name) {
-        asprintf(&tmp, "OMPI_universe_name=%s", ompi_universe_info.name);
-        ompi_argv_append(&(sched->envc), &(sched->env), tmp);
-        free(tmp);
-    }
+        if (NULL != ompi_universe_info.name) {
+            asprintf(&tmp, "OMPI_universe_name=%s", ompi_universe_info.name);
+            ompi_argv_append(&(sched->envc), &(sched->env), tmp);
+            free(tmp);
+        }
 
-    /* Add environment variable with the contact information for the
-       child processes */
-    asprintf(&envvarname, "OMPI_PARENT_PORT_%u", new_jobid);
-    asprintf(&tmp, "%s=%s", envvarname, port_name);
-    ompi_argv_append(&(sched->envc), &(sched->env), tmp);
-    free(tmp);
-    free(envvarname);
-
-    getcwd(cwd, MAXPATHLEN);
-    sched->cwd = strdup(cwd);
-    sched->nodelist = nodelist;
+        /* Add environment variable with the contact information for the
+           child processes */
+        asprintf(&envvarname, "OMPI_PARENT_PORT_%u", new_jobid);
+        asprintf(&tmp, "%s=%s", envvarname, port_name);
+        ompi_argv_append(&(sched->envc), &(sched->env), tmp);
+        free(tmp);
+        free(envvarname);
+
+        if (sched->argc == 0) {
+            printf("no app to start\n");
+            return MPI_ERR_ARG;
+        }
+
+        getcwd(cwd, MAXPATHLEN);
+        sched->cwd = strdup(cwd);
+        sched->nodelist = nodelists[i];
 
-    if (sched->argc == 0) {
-        printf("no app to start\n");
-        return MPI_ERR_ARG;
-    }
+        ompi_list_append(&schedlist, (ompi_list_item_t*) sched);
+    } /* for (i = 0 ; i < count ; ++i) */
 
     /*
@@ -391,7 +411,7 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
                                            OMPI_REGISTRY_OR,
                                            segment,
                                            NULL,
-                                           maxprocs,
+                                           total_start_procs,
                                            ompi_rte_all_procs_registered, NULL);
 
     free(segment);
@@ -417,8 +437,15 @@ int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
     /*
      * Clean up
      */
-    if (NULL != nodelist) ompi_rte_deallocate_resources(spawn_handle,
-                                                        new_jobid, nodelist);
+    if (NULL != nodelists) {
+        for (i = 0 ; i < count ; ++i) {
+            if (NULL != nodelists[i]) {
+                ompi_rte_deallocate_resources(spawn_handle,
+                                              new_jobid, nodelists[i]);
+            }
+        }
+        free(nodelists);
+    }
     if (NULL != spawn_handle) OBJ_RELEASE(spawn_handle);
     OBJ_DESTRUCT(&schedlist);
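
The interleaved hunks above are easier to follow as a control-flow outline. The following is only a condensed sketch of the rewritten function, distilled from the diff (error unwinding, argv/env setup, and registry synchronization elided); it is not compilable on its own:

    /* Sketch: one node allocation and one schedule entry per app context. */
    requires = OMPI_RTE_SPAWN_FROM_MPI | OMPI_RTE_SPAWN_HIGH_QOS;
    if (count > 1) requires |= OMPI_RTE_SPAWN_MPMD;  /* ask for MPMD only when needed */
    spawn_handle = ompi_rte_get_spawn_handle(requires, true);

    for (i = 0 ; i < count ; ++i) {
        /* allocation now happens once per command, which is why the
           handle must support OMPI_RTE_SPAWN_MPMD when count > 1 */
        nodelists[i] = ompi_rte_allocate_resources(spawn_handle, new_jobid,
                                                   0, array_of_maxprocs[i]);
        total_start_procs += array_of_maxprocs[i];

        sched = OBJ_NEW(ompi_rte_node_schedule_t);   /* argv, env, cwd, ... */
        sched->nodelist = nodelists[i];
        ompi_list_append(&schedlist, (ompi_list_item_t*) sched);
    }
    /* later: wait for total_start_procs registrations, then launch the
       whole schedule in one shot */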

View file

@@ -416,12 +416,14 @@ struct ompi_communicator_t {
 void ompi_comm_reg_init(void);
 void ompi_comm_reg_finalize(void);
 
-/* start the new processes from MPI_Comm_spawn. Initial version,
- * a version for Comm_spawn_multiple still missing.
+/* start the new processes from MPI_Comm_spawn_multiple. Initial
+ * version, very rough
  */
-int ompi_comm_start_processes (char *command, char **argv, int maxprocs,
-                               MPI_Info info, char *port_name);
+int ompi_comm_start_processes(int count, char **array_of_commands,
+                              char ***array_of_argv,
+                              int *array_of_maxprocs,
+                              MPI_Info *array_of_info,
+                              char *port_name);
 
 /*
  * This routine checks, whether an application has been spawned
View file

@@ -103,11 +103,8 @@ typedef mca_llm_base_component_1_0_0_t mca_llm_base_component_t;
 /**
  * Allocate requested resources
  *
- * Allocate the specified nodes / processes for use in a new job.
- * Requires a jobid from the PCM interface. The allocation returned
- * may be smaller than requested - it is up to the caller to proceed
- * as appropriate should this occur. This function should only be
- * called once per jobid.
+ * See notes for \c ompi_rte_allocate_resources() for complete
+ * description.
  *
  * @param jobid (IN) Jobid with which to associate the given resources.
  * @param nodes (IN) Number of ndoes to try to allocate. If 0, the

View file

@@ -177,11 +177,8 @@ typedef mca_pcm_base_component_1_0_0_t mca_pcm_base_component_t;
 /**
  * Allocate requested resources
  *
- * Allocate the specified nodes / processes for use in a new job.
- * Requires a jobid from the PCM interface. The allocation returned
- * may be smaller than requested - it is up to the caller to proceed
- * as appropriate should this occur. This function should only be
- * called once per jobid.
+ * See notes for \c ompi_rte_allocate_resources() for complete
+ * description.
  *
  * @param me (IN) Pointer to the module struct
 * @param jobid (IN) Jobid with which to associate the given resources.

View file

@@ -119,6 +119,8 @@ mca_pcm_rms_init(int *priority,
     if (0 != (constraints & OMPI_RTE_SPAWN_DAEMON)) return NULL;
     /* no MPI_COMM_SPAWN* */
     if (0 != (constraints & OMPI_RTE_SPAWN_FROM_MPI)) return NULL;
+    /* we don't really have a way of doing MPMD with rms :( */
+    if (0 != (constraints & OMPI_RTE_SPAWN_MPMD)) return NULL;
 
     /* see if we are an RMS system */
     num_cpus = rms_numCpus(NULL);

View file

@@ -133,6 +133,9 @@ mca_pcm_slurm_init(int *priority,
     if (0 != (constraints & OMPI_RTE_SPAWN_DAEMON)) return NULL;
     /* no MPI_COMM_SPAWN* */
     if (0 != (constraints & OMPI_RTE_SPAWN_FROM_MPI)) return NULL;
+    /* BWB - XXX - Fix me. Disable MPMD with slurm for now. I think
+       we can make this work later on.... */
+    if (0 != (constraints & OMPI_RTE_SPAWN_MPMD)) return NULL;
 
     srun = ompi_path_env_findv("srun", X_OK, environ, NULL);
     if (NULL == srun) return NULL;

View file

@@ -89,7 +89,8 @@ int MPI_Comm_spawn(char *command, char **argv, int maxprocs, MPI_Info info,
         /* Open a port. The port_name is passed as an environment variable
            to the children. */
         ompi_open_port (port_name);
-        ompi_comm_start_processes (command, argv, maxprocs, info, port_name);
+        ompi_comm_start_processes (1, &command, &argv, &maxprocs,
+                                   &info, port_name);
         tmp_port = ompi_parse_port (port_name, &tag);
         free(tmp_port);
     }
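
The wrapper keeps the user-visible MPI_Comm_spawn API unchanged; only the internal call changes shape. For orientation, a minimal caller might look like the sketch below ("./worker" is a hypothetical executable name; error handling is omitted):

    #include <mpi.h>

    int main(int argc, char *argv[])
    {
        MPI_Comm children;
        int errcodes[2];

        MPI_Init(&argc, &argv);

        /* Spawn two copies of a single command; internally this now
           becomes ompi_comm_start_processes(1, &command, ...). */
        MPI_Comm_spawn("./worker", MPI_ARGV_NULL, 2, MPI_INFO_NULL,
                       0, MPI_COMM_SELF, &children, errcodes);

        MPI_Comm_disconnect(&children);
        MPI_Finalize();
        return 0;
    }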

View file

@@ -39,6 +39,8 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
     int totalnumprocs=0;
     ompi_communicator_t *newcomp=NULL;
     int send_first=0; /* they are contacting us first */
+    char port_name[MPI_MAX_PORT_NAME];
+    char *tmp_port;
 
     if ( MPI_PARAM_CHECK ) {
         OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
@@ -87,10 +89,6 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
             return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
                                           FUNC_NAME);
         }
-        if ( NULL == array_of_argv ) {
-            return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
-                                          FUNC_NAME);
-        }
         if ( NULL == array_of_maxprocs ) {
             return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
                                           FUNC_NAME);
@@ -104,10 +102,6 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
                 return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
                                               FUNC_NAME);
             }
-            if ( NULL == array_of_argv[i] ) {
-                return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
-                                              FUNC_NAME);
-            }
             if ( 0 > array_of_maxprocs[i] ) {
                 return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
                                               FUNC_NAME);
@@ -117,35 +111,19 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
         }
     }
 
     if ( rank == root ) {
         for ( i=0; i < count; i++ ) {
             totalnumprocs += array_of_maxprocs[i];
         }
 
-        /* parse the info[i] */
-        /* check potentially for:
-           - "host": desired host where to spawn the processes
-           - "arch": desired architecture
-           - "wdir": directory, where executable can be found
-           - "path": list of directories where to look for the executable
-           - "file": filename, where additional information is provided.
-           - "soft": see page 92 of MPI-2.
-        */
-
-        /* map potentially array_of_argvs == MPI_ARGVS_NULL to a correct value */
-        /* map potentially array_of_argvs[i] == MPI_ARGV_NULL to a correct value.
-           not required by the standard. */
-        /* start processes */
-
-        /* publish name, which should be based on the jobid of the children */
-        /* rc = ompi_comm_namepublish (service_name, port_name ); */
+        /* Open a port. The port_name is passed as an environment variable
+           to the children. */
+        ompi_open_port (port_name);
+        ompi_comm_start_processes(count, array_of_commands,
+                                  array_of_argv, array_of_maxprocs,
+                                  array_of_info, port_name);
+        tmp_port = ompi_parse_port (port_name, &tag);
+        free(tmp_port);
     }
 
     rc = ompi_comm_connect_accept (comm, root, NULL, send_first, &newcomp, tag);
 
     if ( rank == root ) {
         /* unpublish name */
     }
 
     /* close the port again. Nothing has to be done for that at the moment.*/
 
     /* set array of errorcodes */
     if (MPI_ERRCODES_IGNORE != array_of_errcodes) {
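
Since this commit is what makes the multiple-command path functional, a user-level MPMD sketch may help ("./producer" and "./consumer" are hypothetical executables; error handling is omitted). All children launched by the one call share a single MPI_COMM_WORLD and are reached through the returned intercommunicator:

    #include <mpi.h>

    int main(int argc, char *argv[])
    {
        char *cmds[2] = { "./producer", "./consumer" };
        char *noargs[] = { NULL };               /* empty, NULL-terminated argv */
        char **argvs[2] = { noargs, noargs };
        int maxprocs[2] = { 2, 4 };
        MPI_Info infos[2] = { MPI_INFO_NULL, MPI_INFO_NULL };
        int errcodes[6];                         /* one slot per child (2 + 4) */
        MPI_Comm children;

        MPI_Init(&argc, &argv);

        /* One call launches both commands as a single job. */
        MPI_Comm_spawn_multiple(2, cmds, argvs, maxprocs, infos,
                                0, MPI_COMM_SELF, &children, errcodes);

        MPI_Comm_disconnect(&children);
        MPI_Finalize();
        return 0;
    }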

View file

@@ -69,6 +69,10 @@
     MPI_COMM_SPAWN and MPI_COMM_SPAWN_MULTIPLE. The calling process
     will follow the semantics of the MPI_COMM_SPAWN_* functions. */
 #define OMPI_RTE_SPAWN_FROM_MPI 0x0008
+/** Spawn constraint - require ability to launch either MPMD (hence
+    the name) applications or applications with specific placement of
+    processes. */
+#define OMPI_RTE_SPAWN_MPMD 0x0010
 
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
@@ -229,7 +233,10 @@ OMPI_DECLSPEC ompi_rte_spawn_handle_t* ompi_rte_get_spawn_handle(int criteria
  *
  * Allocate the specified nodes / processes for use in a new job.
  * This function should be called exactly once per call to \c
- * ompi_rte_spawn_procs.
+ * ompi_rte_spawn_procs, unless \c OMPI_RTE_SPAWN_MPMD was
+ * specified as a constraint to \c ompi_rte_get_spawn_handle(), in
+ * which case this function can be called as many times as
+ * necessary.
  *
  * @param handle (IN) Handle from \c ompi_rte_get_spawn_handle
  * @param jobid (IN) Jobid with which to associate the given resources.
@@ -252,11 +259,6 @@ OMPI_DECLSPEC ompi_rte_spawn_handle_t* ompi_rte_get_spawn_handle(int criteria
  *                   can be called again, but with a smaller
  *                   resource request.
  *
- * @note In the future, a more complex resource allocation
- * function may be added, which allows for complicated
- * resource requests. This function will continue to exist
- * as a special case of that function.
- *
  *                   Some systems are not capable of providing a maximum
  *                   available resource count and there is an inherent race
  *                   condition to do so in many other systems. On these