1
1

* Move to using a lazy selection for pcms so that we can have multiple PCM
  sets running at once - requires an additional step in spawning to get a
  handle (that will contain multiple pcms when we support multi-cell)
* change the selection logic of the pcms to not care about setting threads,
  but instead to select based on the selected thread level, since it
  would be a little late by the time we did the selection for pcms.
* started the long process of cleaning up the rsh pcm so that it
  actually kills processes and things.  Still doesn't do anything useful,
  but getting to the point where that might be possible

This commit was SVN r2794.
Этот коммит содержится в:
Brian Barrett 2004-09-21 20:27:41 +00:00
родитель 208b8410ba
Коммит 782d3af2b9
26 изменённых файлов: 380 добавлений и 177 удалений

Просмотреть файл

@ -26,8 +26,7 @@ extern "C" {
int mca_llm_base_open(void);
int mca_llm_base_select(const char *active_pcm,
mca_llm_base_module_t *selected,
bool *allow_multi_user_threads,
bool *have_hidden_threads);
bool have_threads);
int mca_llm_base_close(void);
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -39,12 +39,9 @@ typedef struct opened_component_t {
int
mca_llm_base_select(const char *active_pcm,
mca_llm_base_module_t *selected,
bool *allow_multi_user_threads,
bool *have_hidden_threads)
bool have_threads)
{
int priority, best_priority;
bool user_threads, hidden_threads;
bool best_user_threads, best_hidden_threads;
ompi_list_item_t *item;
mca_base_component_list_item_t *cli;
mca_llm_base_component_t *component, *best_component;
@ -76,8 +73,7 @@ mca_llm_base_select(const char *active_pcm,
"llm: base: select: "
"no init function; ignoring component");
} else {
module = component->llm_init(active_pcm, &priority, &user_threads,
&hidden_threads);
module = component->llm_init(active_pcm, have_threads, &priority);
if (NULL == module) {
ompi_output_verbose(10, mca_llm_base_output,
"llm: base: select: init returned failure");
@ -87,8 +83,6 @@ mca_llm_base_select(const char *active_pcm,
priority);
if (priority > best_priority) {
best_priority = priority;
best_user_threads = user_threads;
best_hidden_threads = hidden_threads;
best_component = component;
best_module = module;
}
@ -149,8 +143,6 @@ mca_llm_base_select(const char *active_pcm,
mca_llm_base_selected_component = *best_component;
*selected = *best_module;
*allow_multi_user_threads = best_user_threads;
*have_hidden_threads = best_hidden_threads;
ompi_output_verbose(5, mca_llm_base_output,
"llm: base: select: component %s selected",
component->llm_version.mca_component_name);

Просмотреть файл

@ -20,12 +20,9 @@ char *mca_llm_hostfile_filename = NULL;
struct mca_llm_base_module_1_0_0_t*
mca_llm_hostfile_component_init(const char *active_pcm,
int *priority,
bool *allow_multiple_user_threads,
bool *have_hidden_threads)
bool have_threads,
int *priority)
{
*allow_multiple_user_threads = true;
*have_hidden_threads = false;
*priority = 1;
return &mca_llm_hostfile_module;

Просмотреть файл

@ -32,9 +32,8 @@ extern "C" {
struct mca_llm_base_module_1_0_0_t*
mca_llm_hostfile_component_init(const char *active_pcm,
int *priority,
bool *allow_multiple_user_threads,
bool *have_hidden_threads);
bool have_threads,
int *priority);
int mca_llm_hostfile_component_finalize(void);

Просмотреть файл

@ -35,20 +35,18 @@
* @param active_pcm (IN) Name of the currently active PCM module,
* as it might be useful in determining
* usability.
* @param have_threads (IN) Whether the current running process is
* multi-threaded or not. true means there
* may be concurrent access into the
* underlying components *and* that the
* components may launch new threads.
* @param priority (OUT) Relative priority or ranking use by MCA to
* select a module.
* @param allow_multiple_user_threads (OUT) Whether this module can
* run with multiple threads making calls into
* the library (equivalent of MPI_THREAD_MULTIPLE
* from MPI-land).
* @param have_hidden_threads (OUT) Whether this module needs to start
* a background thread for operation.
*/
typedef struct mca_llm_base_module_1_0_0_t*
(*mca_llm_base_component_init_fn_t)(const char *active_pcm,
int *priority,
bool *allow_multiple_user_threads,
bool *have_hidden_threads);
bool have_threads,
int *priority);
/**

Просмотреть файл

@ -21,14 +21,14 @@ extern "C" {
#endif
int mca_pcm_base_open(void);
/* modules is a pointer to an array of pointers to mca_pcm_base_module_t structs */
int mca_pcm_base_select(bool *allow_multi_user_threads,
bool *have_hidden_threads,
int mca_pcm_base_select(bool have_threads,
int constraint,
mca_pcm_base_module_t ***modules,
int *modules_len);
size_t *modules_len);
int mca_pcm_base_close(void);
/* communicate the important parts of our structs around */
int mca_pcm_base_send_schedule(FILE *fd,
mca_ns_base_jobid_t jobid,
ompi_rte_node_schedule_t *sched,
@ -39,6 +39,13 @@ extern "C" {
ompi_rte_node_schedule_t *sched,
int *num_procs);
/**
* Create pushable environment
*
* Copy the parts of \c in_env that should be moved to remote
* nodes when spawning processes into \c out_envp. All variables
* starting with OMPI_ are copied.
*/
int mca_pcm_base_build_base_env(char **in_env, int *envc, char ***out_envp);
int mca_pcm_base_ioexecvp(char **cmdv, int showout, char *outbuff,
@ -46,6 +53,14 @@ extern "C" {
char* mca_pcm_base_get_username(mca_llm_base_hostfile_node_t *node);
/**
* Get unique id string available from PCM components
*
* PCM components may be able to provide some unique identifier
* that should be used in separating universes (the Batch ID in PBS,
* for example). This function provides an interface for
* retrieving that information from the components.
*/
char *mca_pcm_base_get_unique_id(void);
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -19,8 +19,6 @@ struct avail_module_t {
ompi_list_item_t super;
mca_pcm_base_module_t *module;
int priority;
bool user_threads;
bool hidden_threads;
};
typedef struct avail_module_t avail_module_t;
OBJ_CLASS_INSTANCE(avail_module_t, ompi_list_item_t, NULL, NULL);
@ -54,14 +52,12 @@ insert_module(ompi_list_t *avail_modules, avail_module_t *module)
* modules. Otherwise, return module with highest priority.
*/
int
mca_pcm_base_select(bool *allow_multi_user_threads,
bool *have_hidden_threads,
mca_pcm_base_select(bool have_threads,
int constraints,
mca_pcm_base_module_t ***modules,
int *modules_len)
size_t *modules_len)
{
int priority;
bool user_threads, hidden_threads;
avail_module_t *avail_module;
ompi_list_item_t *item;
mca_base_component_list_item_t *cli;
@ -85,7 +81,6 @@ mca_pcm_base_select(bool *allow_multi_user_threads,
cli = (mca_base_component_list_item_t *) item;
component = (mca_pcm_base_component_t *) cli->cli_component;
hidden_threads = user_threads = false;
priority = 0;
ompi_output_verbose(10, mca_pcm_base_output,
@ -97,8 +92,7 @@ mca_pcm_base_select(bool *allow_multi_user_threads,
"pcm: base: select: "
"no init function; ignoring component");
} else {
module = component->pcm_init(&priority, &user_threads,
&hidden_threads, constraints);
module = component->pcm_init(&priority, have_threads, constraints);
if (NULL == module) {
ompi_output_verbose(10, mca_pcm_base_output,
"pcm: base: select: init returned failure");
@ -108,8 +102,6 @@ mca_pcm_base_select(bool *allow_multi_user_threads,
priority);
avail_module = OBJ_NEW(avail_module_t);
avail_module->priority = priority;
avail_module->hidden_threads = hidden_threads;
avail_module->user_threads = user_threads;
avail_module->module = module;
insert_module(&avail_module_list, avail_module);

Просмотреть файл

@ -26,7 +26,6 @@ mca_pcm_base_get_unique_id(void)
ompi_list_item_t *item;
mca_base_component_list_item_t *cli;
mca_pcm_base_component_t *component;
mca_pcm_base_module_t *module;
int priority, top_priority;
char *id, *top_id;
int ret;

Просмотреть файл

@ -5,6 +5,7 @@
*/
#include "ompi_config.h"
#include "mca/pcm/pcm.h"
#include "runtime/runtime_types.h"
/*
* Module open / close
@ -29,8 +30,7 @@ OBJ_CLASS_DECLARATION(mca_pcm_ompid_node_t);
*/
struct mca_pcm_base_module_1_0_0_t* mca_pcm_ompid_init(
int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints);
int mca_pcm_ompid_finalize(struct mca_pcm_base_module_1_0_0_t* me);

Просмотреть файл

@ -67,8 +67,7 @@ int mca_pcm_ompid_close(void)
struct mca_pcm_base_module_1_0_0_t *
mca_pcm_ompid_init(
int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints)
{
return NULL;

Просмотреть файл

@ -65,7 +65,6 @@
#include "mca/mca.h"
#include "mca/ns/ns.h"
#include "include/types.h"
#include "runtime/runtime_types.h"
/*
* MCA component management functions
@ -98,8 +97,7 @@
*/
typedef struct mca_pcm_base_module_1_0_0_t*
(*mca_pcm_base_component_init_fn_t)(int *priority,
bool *allow_multiple_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints);

Просмотреть файл

@ -34,8 +34,7 @@ extern "C" {
* Startup / Shutdown
*/
struct mca_pcm_base_module_1_0_0_t* mca_pcm_rms_init(int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints);
int mca_pcm_rms_finalize(struct mca_pcm_base_module_1_0_0_t* me);

Просмотреть файл

@ -108,8 +108,7 @@ mca_pcm_rms_component_close(void)
mca_pcm_base_module_t*
mca_pcm_rms_init(int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints)
{
int debug;
@ -125,11 +124,6 @@ mca_pcm_rms_init(int *priority,
mca_base_param_lookup_int(mca_pcm_rms_param_priority, priority);
if (0 == priority) return NULL;
/* fill in params */
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* check constrains */
/* no daemon */
if (0 != (constraints & OMPI_RTE_SPAWN_DAEMON)) return NULL;

Просмотреть файл

@ -29,8 +29,7 @@ extern "C" {
* Startup / Shutdown
*/
struct mca_pcm_base_module_1_0_0_t* mca_pcm_rsh_init(int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints);
int mca_pcm_rsh_finalize(struct mca_pcm_base_module_1_0_0_t* me);

Просмотреть файл

@ -118,8 +118,7 @@ mca_pcm_rsh_component_close(void)
mca_pcm_base_module_t*
mca_pcm_rsh_init(int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads,
bool have_threads,
int constraints)
{
int debug;
@ -146,12 +145,8 @@ mca_pcm_rsh_init(int *priority,
mca_base_param_lookup_string(mca_pcm_rsh_param_agent,
&(me->rsh_agent));
*allow_multi_user_threads = true;
*have_hidden_threads = false;
ret = mca_llm_base_select("rsh", &(me->llm), have_threads);
ret = mca_llm_base_select("rsh", &(me->llm),
allow_multi_user_threads,
have_hidden_threads);
if (OMPI_SUCCESS != ret) {
/* well, that can't be good. guess we can't run */
ompi_output_verbose(5, mca_pcm_rsh_output, "init: no llm found");
@ -168,17 +163,6 @@ mca_pcm_rsh_init(int *priority,
me->super.pcm_deallocate_resources = mca_pcm_rsh_deallocate_resources;
me->super.pcm_finalize = mca_pcm_rsh_finalize;
/* DO SOME PARAM "FIXING" */
/* BWB - remove param fixing before 1.0 */
if (0 == me->no_profile) {
printf("WARNING: reseting mca_pcm_rsh_no_profile to 1\n");
me->no_profile = 1;
}
if (0 == me->fast_boot) {
printf("WARNING: reseting mca_pcm_rsh_fast to 1\n");
me->fast_boot = 1;
}
return (mca_pcm_base_module_t*) me;
}

Просмотреть файл

@ -30,9 +30,16 @@
#include "util/numtostr.h"
#include "mca/ns/base/base.h"
/*
* Internal constants
*/
#define BOOTAGENT "mca_pcm_rsh_bootproxy"
#define PRS_BUFSIZE 1024
/*
* Internal functions
*/
static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
mca_ns_base_jobid_t jobid, ompi_rte_node_schedule_t *sched,
ompi_list_t *hostlist,
@ -40,6 +47,16 @@ static int internal_spawn_proc(mca_pcm_rsh_module_t *me,
int num_procs);
/*
* This function just iterates through the schedule list, slicing it
* up into launchable pieces. While this infrastructure supports a
* tree-based or similar launching mechanism, the bootproxy used on
* the other side of the ssh tunnel does not yet support non-local
* booting. So some of this code (just the logic in the looping
* variable) is perhaps a bit overkill. The real work is done by
* internal_spawn_proc, which starts one process via RSH/SSH and then
* sends it the information passed from this function.
*/
int
mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
mca_ns_base_jobid_t jobid, ompi_list_t *schedlist)
@ -53,8 +70,11 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
ompi_list_t launch;
ompi_list_t done;
int ret, i;
int width = 1;
int local_start_vpid = 0;
int width = 1; /* number of procs to send down the tree
at a time. Currently must be 1.
bootproxy must be fixed before
this can be changed. */
int local_offset = 0;
int global_start_vpid = 0;
int num_procs = 0;
int tmp_count;
@ -62,6 +82,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
OBJ_CONSTRUCT(&launch, ompi_list_t);
OBJ_CONSTRUCT(&done, ompi_list_t);
/* figure out how many procs we have been allocated */
for (sched_item = ompi_list_get_first(schedlist) ;
sched_item != ompi_list_get_end(schedlist) ;
sched_item = ompi_list_get_next(sched_item)) {
@ -79,11 +100,10 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
}
}
/* BWB - make sure vpids are reserved */
local_start_vpid = 0;
/* reserve a chunk of vpids */
local_offset = 0;
global_start_vpid = (int) ompi_name_server.reserve_range(jobid, num_procs);
for (sched_item = ompi_list_get_first(schedlist) ;
sched_item != ompi_list_get_end(schedlist) ;
sched_item = ompi_list_get_next(sched_item)) {
@ -92,15 +112,9 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
for (node_item = ompi_list_get_first(sched->nodelist) ;
node_item != ompi_list_get_end(sched->nodelist) ;
node_item = ompi_list_get_next(node_item) ) {
node = (ompi_rte_node_allocation_t*) node_item;
data = (mca_llm_base_hostfile_data_t*) node->data;
/*
* make sure I'm the first node in the list and then start
* our deal. We rsh me just like everyone else so that we
* don't have any unexpected environment oddities...
*/
/* BWB - do front of list check! */
host_item = ompi_list_get_first(data->hostlist);
while (host_item != ompi_list_get_end(data->hostlist)) {
@ -127,7 +141,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
/* do the launch to the first node in the list, passing
him the rest of the list */
ret = internal_spawn_proc(me, jobid, sched, &launch,
local_start_vpid, global_start_vpid,
local_offset, global_start_vpid,
num_procs);
if (OMPI_SUCCESS != ret) {
/* well, crap! put ourselves back together, I guess.
@ -138,7 +152,7 @@ mca_pcm_rsh_spawn_procs(struct mca_pcm_base_module_1_0_0_t* me_super,
&done);
return ret;
}
local_start_vpid += tmp_count;
local_offset += tmp_count;
/* copy the list over to the done part */
ompi_list_join(&done, ompi_list_get_end(&done), &launch);
@ -325,7 +339,7 @@ internal_spawn_proc(mca_pcm_rsh_module_t *me,
/* starting vpid for launchee's procs */
tmp = ltostr(my_start_vpid);
ompi_argv_append(&cmdc, &cmdv, "--local_start_vpid");
ompi_argv_append(&cmdc, &cmdv, "--local_offset");
ompi_argv_append(&cmdc, &cmdv, tmp);
free(tmp);

Просмотреть файл

@ -8,6 +8,7 @@
#include "include/constants.h"
#include "runtime/runtime.h"
#include "runtime/runtime_internal.h"
#include "event/event.h"
#include "util/output.h"
#include "util/proc_info.h"
@ -37,6 +38,7 @@ int ompi_rte_finalize(void)
mca_gpr_base_close();
mca_oob_base_close();
ompi_rte_internal_fini_spawn();
ompi_session_dir_finalize();
/* All done */

Просмотреть файл

@ -26,6 +26,7 @@
#include "util/sys_info.h"
#include "runtime/runtime.h"
#include "runtime/runtime_internal.h"
/**
* Initialize and set up a process in the OMPI RTE.
@ -129,6 +130,16 @@ int ompi_rte_init_stage1(bool *allow_multi_user_threads, bool *have_hidden_threa
return ret;
}
/*
* Internal startup
*/
if (OMPI_SUCCESS != (ret = ompi_rte_internal_init_spawn())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in ompi_rte_internal_init_spawn\n");
return ret;
}
/*
* Out of Band Messaging
*/

Просмотреть файл

@ -33,8 +33,6 @@ int ompi_rte_init_stage2(bool *allow_multi_user_threads, bool *have_hidden_threa
{
int ret;
bool user_threads, hidden_threads;
mca_pcm_base_module_t **pcm_modules;
int pcm_modules_len;
/*
* Name Server - base already opened in stage1, so just complete the selection
@ -76,31 +74,13 @@ int ompi_rte_init_stage2(bool *allow_multi_user_threads, bool *have_hidden_threa
}
/*
* Process Control and Monitoring
* Process Control and Monitoring - lazy load
*/
if (OMPI_SUCCESS != (ret = mca_pcm_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in pcm_base_open\n");
return ret;
}
user_threads = true;
hidden_threads = false;
if (OMPI_SUCCESS != (ret = mca_pcm_base_select(&user_threads,
&hidden_threads, 0,
&pcm_modules,
&pcm_modules_len))) {
printf("show_help: ompi_rte_init failed in pcm_base_select\n");
/* JMS show_help */
return ret;
}
if (pcm_modules_len != 1) {
printf("show_help: unexpectedly high return from pcm_modules_len\n");
return -1;
}
mca_pcm = pcm_modules[0];
free(pcm_modules);
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
/*
* Registry

Просмотреть файл

@ -9,25 +9,61 @@
#include "runtime/runtime_types.h"
#include "mca/pcm/pcm.h"
extern mca_pcm_base_module_t *mca_pcm;
ompi_list_t*
ompi_rte_allocate_resources(mca_ns_base_jobid_t jobid, int nodes, int procs)
ompi_rte_allocate_resources(ompi_rte_spawn_handle_t *handle,
mca_ns_base_jobid_t jobid,
int nodes, int procs)
{
if (NULL == mca_pcm->pcm_allocate_resources) {
mca_pcm_base_module_t *active;
if (NULL == handle) {
errno = OMPI_ERR_BAD_PARAM;
return NULL;
}
/* check for invalid jobid */
if (nodes < 0) {
errno = OMPI_ERR_BAD_PARAM;
return NULL;
}
if (procs < 0) {
errno = OMPI_ERR_BAD_PARAM;
return NULL;
}
return mca_pcm->pcm_allocate_resources(mca_pcm, jobid, nodes, procs);
/* remove for multi-cell */
assert(1 == handle->modules_len);
active = handle->modules[0];
if (NULL == active->pcm_allocate_resources) {
errno = OMPI_ERR_NOT_IMPLEMENTED;
return NULL;
}
return active->pcm_allocate_resources(active, jobid, nodes, procs);
}
int
ompi_rte_deallocate_resources(mca_ns_base_jobid_t jobid, ompi_list_t *nodelist)
ompi_rte_deallocate_resources(ompi_rte_spawn_handle_t *handle,
mca_ns_base_jobid_t jobid,
ompi_list_t *nodelist)
{
if (NULL == mca_pcm->pcm_deallocate_resources) {
return OMPI_ERROR;
mca_pcm_base_module_t *active;
if (NULL == handle) return OMPI_ERR_BAD_PARAM;
/* check for invalid jobid */
if (NULL == nodelist) return OMPI_ERR_BAD_PARAM;
/* remove for multi-cell */
assert(1 == handle->modules_len);
active = handle->modules[0];
if (NULL == active->pcm_deallocate_resources) {
return OMPI_ERR_NOT_IMPLEMENTED;
}
return mca_pcm->pcm_deallocate_resources(mca_pcm, jobid, nodelist);
return active->pcm_deallocate_resources(active, jobid, nodelist);
}

Просмотреть файл

@ -5,34 +5,122 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "class/ompi_pointer_array.h"
#include "runtime/runtime.h"
#include "runtime/runtime_types.h"
#include "mca/pcm/pcm.h"
#include "mca/pcm/base/base.h"
#include "mca/pcmclient/pcmclient.h"
#include "mca/pcmclient/base/base.h"
extern mca_pcm_base_module_t *mca_pcm;
static void ompi_rte_spawn_handle_construct(ompi_object_t *);
static void ompi_rte_spawn_handle_destruct(ompi_object_t *);
bool
ompi_rte_can_spawn(void)
static ompi_pointer_array_t avail_handles;
OBJ_CLASS_INSTANCE(ompi_rte_spawn_handle_t, ompi_object_t,
ompi_rte_spawn_handle_construct, ompi_rte_spawn_handle_destruct);
int
ompi_rte_internal_init_spawn(void)
{
if (NULL == mca_pcm) {
return OMPI_ERROR;
OBJ_CONSTRUCT(&avail_handles, ompi_pointer_array_t);
return OMPI_SUCCESS;
}
int
ompi_rte_internal_fini_spawn(void)
{
/* BWB - figure out how to clean up... */
OBJ_DESTRUCT(&avail_handles);
return OMPI_SUCCESS;
}
ompi_rte_spawn_handle_t *
ompi_rte_get_spawn_handle(int criteria, bool have_threads)
{
size_t i;
ompi_rte_spawn_handle_t *ptr;
int ret;
/* BWB - long term, this has to go. But for now, here we are */
if (0 != (OMPI_RTE_SPAWN_MULTI_CELL & criteria)) {
printf("ompi_rte_get_spawn_handle: warning: multi-cell support "
"implemented. Removing criteria.\n");
criteria ^= OMPI_RTE_SPAWN_MULTI_CELL;
}
/* make sure we don't already have a matching criteria */
for (i = 0 ; i < ompi_pointer_array_get_size(&avail_handles) ; ++i) {
ptr = ompi_pointer_array_get_item(&avail_handles, i);
if (NULL == ptr) continue;
if (ptr->criteria == criteria) {
OBJ_RETAIN(ptr);
return ptr;
}
}
/* BWB - fix me, fix me, fix me */
return true;
/* no matching criteria. create a new set of pcms and we're good
to go */
ptr = OBJ_NEW(ompi_rte_spawn_handle_t);
if (NULL == ptr) return NULL;
ret = mca_pcm_base_select(have_threads, criteria,
&(ptr->modules), &(ptr->modules_len));
if (ret != OMPI_SUCCESS) {
errno = ret;
return NULL;
}
/* remove for multi-cell */
if (ptr->modules_len != 1) {
OBJ_RELEASE(ptr);
return NULL;
}
ompi_pointer_array_add(&avail_handles, ptr);
return ptr;
}
int
ompi_rte_spawn_procs(mca_ns_base_jobid_t jobid, ompi_list_t *schedule_list)
ompi_rte_spawn_procs(ompi_rte_spawn_handle_t *handle,
mca_ns_base_jobid_t jobid,
ompi_list_t *schedule_list)
{
if (NULL == mca_pcm->pcm_spawn_procs) {
return OMPI_ERROR;
mca_pcm_base_module_t *active;
if (NULL == handle) return OMPI_ERR_BAD_PARAM;
/* BWB - check for invalid jobid */
if (NULL == schedule_list) return OMPI_ERR_BAD_PARAM;
/* remove for multi-cell */
assert(1 == handle->modules_len);
active = handle->modules[0];
if (NULL == active->pcm_spawn_procs) {
return OMPI_ERR_NOT_IMPLEMENTED;
}
return mca_pcm->pcm_spawn_procs(mca_pcm, jobid, schedule_list);
return active->pcm_spawn_procs(active, jobid, schedule_list);
}
int
ompi_rte_kill_proc(ompi_process_name_t *name, int flags)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int
ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
@ -40,6 +128,7 @@ ompi_process_name_t*
ompi_rte_get_self(void)
{
if (NULL == mca_pcmclient.pcmclient_get_self) {
errno = OMPI_ERR_NOT_IMPLEMENTED;
return NULL;
}
@ -51,30 +140,38 @@ int
ompi_rte_get_peers(ompi_process_name_t **peers, size_t *npeers)
{
if (NULL == mca_pcmclient.pcmclient_get_peers) {
return OMPI_ERROR;
return OMPI_ERR_NOT_IMPLEMENTED;
}
return mca_pcmclient.pcmclient_get_peers(peers, npeers);
}
int
ompi_rte_kill_proc(ompi_process_name_t *name, int flags)
static void
ompi_rte_spawn_handle_construct(ompi_object_t *obj)
{
if (NULL == mca_pcm->pcm_kill_proc) {
return OMPI_ERROR;
}
return mca_pcm->pcm_kill_proc(mca_pcm, name, flags);
ompi_rte_spawn_handle_t *handle = (ompi_rte_spawn_handle_t*) obj;
handle->criteria = 0;
handle->modules = NULL;
handle->modules_len = 0;
}
int
ompi_rte_kill_job(mca_ns_base_jobid_t jobid, int flags)
static void
ompi_rte_spawn_handle_destruct(ompi_object_t *obj)
{
if (NULL == mca_pcm->pcm_kill_job) {
return OMPI_ERROR;
ompi_rte_spawn_handle_t *handle = (ompi_rte_spawn_handle_t*) obj;
size_t i;
handle->criteria = 0;
for (i = 0 ; i < handle->modules_len ; ++i) {
mca_pcm_base_module_t *pcm = handle->modules[i];
if (NULL == pcm) continue;
if (NULL == pcm->pcm_finalize) continue;
pcm->pcm_finalize(pcm);
}
return mca_pcm->pcm_kill_job(mca_pcm, jobid, flags);
if (NULL != handle->modules) free(handle->modules);
handle->modules_len = 0;
}

Просмотреть файл

@ -110,43 +110,80 @@ extern "C" {
*/
int ompi_rte_finalize(void);
/**
* Request a handle for spawning jobs
*
* Request a handle for allocating resources and spawning a job.
* This is the first step in starting a new set of processes. It
* will load the best available set of pcm components for starting
* a job according to the \c criteria provided.
*
* The returned job handle should be OBJ_RELEASE()'ed when no
* further use of the particular job handle is needed. It is
* possible that consecutive calls to this function with the same
* \c criteria will return a pointer to the same object. In these
* situations, the reference count on the object will be adjusted
* as appropriate.
*
* The returned handle can be used to call the process startup
* related functions multiple times, both in the same job and in
* different jobs.
*
* @param criteria (IN) Selection criteria. A bitmask of the
* constants defined in \c runtime.h starting
* with \c OMPI_RTE_SPAWN_*
* @param have_threads (IN) Whether the current running process is
* multi-threaded or not. true means there
* may be concurrent access into the
* underlying components *and* that the
* components may launch new threads.
* @return jobhandle (OUT) Pointer to an \c ompi_rte_jobhandle.
* If no available pcm components are capable
* of meeting criteria, \c NULL is returned.
*/
ompi_rte_spawn_handle_t* ompi_rte_get_spawn_handle(int criteria,
bool have_threads);
/**
* Allocate requested resources
*
* Allocate the specified nodes / processes for use in a new job.
* Requires a newly created jobid. The allocation returned may be
* smaller than requested - it is up to the caller to proceed as
* appropriate should this occur. This function should only be called
* once per jobid.
* This function should be called exactly once per call to \c
* ompi_rte_spawn_procs.
*
* @param handle (IN) Handle from \c ompi_rte_get_spawn_handle
* @param jobid (IN) Jobid with which to associate the given resources.
* @param nodes (IN) Number of nodes to try to allocate. If 0, the
* allocator will try to allocate \c procs processes
* on as many nodes as are needed. If non-zero,
* on as many nodes as are needed. If positive,
* will try to allocate \c procs process slots
* per node.
* per node. If both nodes and procs are 0,
* will attempt to return as many resources as
* possible
* @param procs (IN) Number of processors to try to allocate. See the note
* for <code>nodes</code> for usage.
* for \c nodes for usage.
* @return List of <code>ompi_rte_node_allocation_t</code>s
* describing the allocated resources.
* describing the allocated resources or NULL on
* error (error will be in errno)
*
* @note In the future, a more complex resource allocation
* function may be added, which allows for complicated
* resource requests. This function will continue to exist
* as a special case of that function.
*/
ompi_list_t* ompi_rte_allocate_resources(mca_ns_base_jobid_t jobid,
int nodes, int procs);
/**
* This tells you whether the runtime is capable of spawning new
* processes or not
*
* @return True/False
* Some systems are not capable of providing a maximum
* available resource count and there is an inherent race
* condition to do so in many other systems. On these
* systems, errno will be set to \c OMPI_ERR_NOT_SUPPORTED.
* This is not a fatal error - \c
* ompi_rte_allocate_resources can be called again, but
* without nodes = 0, procs = 0.
*/
bool ompi_rte_can_spawn(void);
ompi_list_t* ompi_rte_allocate_resources(ompi_rte_spawn_handle_t* handle,
mca_ns_base_jobid_t jobid,
int nodes, int procs);
/**
@ -157,8 +194,10 @@ extern "C" {
* of \c mca_pcm_base_schedule_t structures, which give both process
* and location information.
*
* @param handle (IN) Handle from \c ompi_rte_get_spawn_handle
*/
int ompi_rte_spawn_procs(mca_ns_base_jobid_t jobid,
int ompi_rte_spawn_procs(ompi_rte_spawn_handle_t* handle,
mca_ns_base_jobid_t jobid,
ompi_list_t *schedule_list);
@ -231,11 +270,13 @@ extern "C" {
*
* Return the resources for the given jobid to the system.
*
* @param handle (IN) Handle from \c ompi_rte_get_spawn_handle
* @param jobid (IN) Jobid associated with the resources to be freed.
* @param nodes (IN) Nodelist from associated allocate_resource call.
* All associated memory will be freed as appropriate.
*/
int ompi_rte_deallocate_resources(mca_ns_base_jobid_t jobid,
int ompi_rte_deallocate_resources(ompi_rte_spawn_handle_t *handle,
mca_ns_base_jobid_t jobid,
ompi_list_t *nodelist);

20
src/runtime/runtime_internal.h Обычный файл
Просмотреть файл

@ -0,0 +1,20 @@
/*
* $HEADER$
*/
/**
* @file
*
* Internal Run-Time interface functionality
*/
#ifndef OMPI_RUNTIME_INTERNAL_H
#define OMPI_RUNTIME_INTERNAL_H
#include "ompi_config.h"
#include "class/ompi_object.h"
int ompi_rte_internal_init_spawn(void);
int ompi_rte_internal_fini_spawn(void);
#endif

Просмотреть файл

@ -13,10 +13,32 @@
#include "class/ompi_list.h"
#include "mca/ns/ns.h"
#include "mca/pcm/pcm.h"
#include <sys/param.h>
#include <netdb.h>
/**
* Spawn Handle
*
* Private container for the Run-Time Environment to store mappings of
* spawn criteria to PCM components
*/
struct ompi_rte_spawn_handle_t {
/** make us an object instance */
ompi_object_t super;
/** criteria used to select this set of pcms */
int criteria;
/** pointers to the pcms */
mca_pcm_base_module_t **modules;
/** length of modules */
size_t modules_len;
};
/** shorten ompi_rte_spawn_handle_t declarations */
typedef struct ompi_rte_spawn_handle_t ompi_rte_spawn_handle_t;
/** create the required instance information */
OBJ_CLASS_DECLARATION(ompi_rte_spawn_handle_t);
/**
* Process startup description container

Просмотреть файл

@ -16,7 +16,7 @@
static void
show_usage(char *myname)
{
printf("usage: %s --local_start_vpid [vpid] --global_start_vpid [vpid]\n"
printf("usage: %s --local_offset [vpid] --global_start_vpid [vpid]\n"
" --num_procs [num]\n\n", myname);
}
@ -37,8 +37,12 @@ main(int argc, char *argv[])
char *env_buf;
ompi_init(argc, argv);
/*
* command line parsing
*/
cmd_line = OBJ_NEW(ompi_cmd_line_t);
ompi_cmd_line_make_opt(cmd_line, '\0', "local_start_vpid", 1,
ompi_cmd_line_make_opt(cmd_line, '\0', "local_offset", 1,
"starting vpid to use when launching");
ompi_cmd_line_make_opt(cmd_line, '\0', "global_start_vpid", 1,
"starting vpid to use when launching");
@ -50,12 +54,12 @@ main(int argc, char *argv[])
exit(1);
}
if (!ompi_cmd_line_is_taken(cmd_line, "local_start_vpid")) {
if (!ompi_cmd_line_is_taken(cmd_line, "local_offset")) {
show_usage(argv[0]);
exit(1);
}
local_vpid_start =
atoi(ompi_cmd_line_get_param(cmd_line, "local_start_vpid", 0, 0));
atoi(ompi_cmd_line_get_param(cmd_line, "local_offset", 0, 0));
if (!ompi_cmd_line_is_taken(cmd_line, "global_start_vpid")) {
show_usage(argv[0]);
@ -70,9 +74,15 @@ main(int argc, char *argv[])
}
total_num_procs = atoi(ompi_cmd_line_get_param(cmd_line, "num_procs", 0, 0));
/*
* Receive the startup schedule for here
*/
sched = OBJ_NEW(ompi_rte_node_schedule_t);
if (NULL == sched) {
printf("Error in OBJ_NEW. aborting\n");
exit(1);
}
/* recv_schedule wants an already initialized ompi_list_t */
ret = mca_pcm_base_recv_schedule(stdin, &jobid, sched,
&fork_num_procs);
if (ret != OMPI_SUCCESS) {

Просмотреть файл

@ -49,6 +49,7 @@ main(int argc, char *argv[])
char *my_contact_info, *tmp, *jobid_str, *procid_str;
char *contact_file, *filenm, *universe;
pid_t pid;
ompi_rte_spawn_handle_t *spawn_handle;
/*
* Intialize our Open MPI environment
@ -284,8 +285,11 @@ main(int argc, char *argv[])
/* get the jobid for the application */
new_jobid = ompi_name_server.create_jobid();
/* get the spawn handle to start spawning stuff */
spawn_handle = ompi_rte_get_spawn_handle(OMPI_RTE_SPAWN_HIGH_QOS, true);
/* BWB - fix jobid, procs, and nodes */
nodelist = ompi_rte_allocate_resources(new_jobid, 0, num_procs);
nodelist = ompi_rte_allocate_resources(spawn_handle, new_jobid, 0, num_procs);
if (NULL == nodelist) {
/* BWB show_help */
printf("show_help: ompi_rte_allocate_resources failed\n");
@ -342,10 +346,11 @@ main(int argc, char *argv[])
/*
* spawn procs
*/
if (OMPI_SUCCESS != ompi_rte_spawn_procs(new_jobid, &schedlist)) {
if (OMPI_SUCCESS != ompi_rte_spawn_procs(spawn_handle, new_jobid, &schedlist)) {
printf("show_help: woops! we didn't spawn :( \n");
return -1;
}
/*
*
@ -360,8 +365,9 @@ main(int argc, char *argv[])
/*
* Clean up
*/
if (NULL != nodelist) ompi_rte_deallocate_resources(new_jobid, nodelist);
if (NULL != nodelist) ompi_rte_deallocate_resources(spawn_handle, new_jobid, nodelist);
if (NULL != cmd_line) OBJ_RELEASE(cmd_line);
if (NULL != spawn_handle) OBJ_RELEASE(spawn_handle);
/* eventually, mpirun won't be the seed and so won't have to do this.
* for now, though, remove the universe-setup.txt file so the directories