1
1

minor clean up and treespawn support

This commit was SVN r13876.
Этот коммит содержится в:
Li-Ta Lo 2007-03-01 22:32:37 +00:00
родитель caa1522a22
Коммит a0e5b6a27c
4 изменённых файлов: 40 добавлений и 62 удалений

Просмотреть файл

@ -68,6 +68,7 @@
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include "orte/runtime/runtime.h" #include "orte/runtime/runtime.h"
#include <math.h>
#include "pls_xcpu.h" #include "pls_xcpu.h"
#include "spfs.h" #include "spfs.h"
#include "spclient.h" #include "spclient.h"
@ -76,9 +77,6 @@
extern char **environ; extern char **environ;
/** external variable defined in libspclient */
extern int spc_chatty;
/** /**
* Initialization of the xcpu module with all the needed function pointers * Initialization of the xcpu module with all the needed function pointers
*/ */
@ -96,7 +94,7 @@ orte_pls_base_module_t orte_pls_xcpu_module = {
/* array of *Xpcommand and Xpnodeset, each xcmd/nodeset correspond to one OMPI app_context */ /* array of *Xpcommand and Xpnodeset, each xcmd/nodeset correspond to one OMPI app_context */
Xpcommand **xcmd_sets; Xpcommand **xcmd_sets;
Xpnodeset **node_sets; Xpnodeset **node_sets;
int num_xcmds; int num_apps;
void void
pls_xcpu_stdout_cb(Xpsession *s, u8 *buf, u32 buflen) pls_xcpu_stdout_cb(Xpsession *s, u8 *buf, u32 buflen)
@ -242,16 +240,12 @@ pls_xcpu_setup_env(char ***e)
int int
orte_pls_xcpu_launch_job(orte_jobid_t jobid) orte_pls_xcpu_launch_job(orte_jobid_t jobid)
{ {
int i, n, rc; int i, fanout, rc;
int num_processes = 0; int num_processes = 0;
orte_cellid_t cellid; orte_cellid_t cellid;
opal_list_item_t *node_item, *proc_item; opal_list_item_t *node_item, *proc_item;
orte_job_map_t *map; orte_job_map_t *map;
orte_vpid_t vpid_start, vpid_range; orte_vpid_t vpid_start, vpid_range;
char **env;
if (mca_pls_xcpu_component.chatty)
spc_chatty = 1;
/* get the job map */ /* get the job map */
rc = orte_rmaps.get_job_map(&map, jobid); rc = orte_rmaps.get_job_map(&map, jobid);
@ -259,6 +253,7 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
num_apps = map->num_apps;
/* next, get the vpid_start and range */ /* next, get the vpid_start and range */
rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, &vpid_range); rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, &vpid_range);
@ -271,13 +266,11 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
cellid = orte_process_info.my_name->cellid; cellid = orte_process_info.my_name->cellid;
/* create num_apps of pointers to Xpnodeset and Xpcommand */ /* create num_apps of pointers to Xpnodeset and Xpcommand */
node_sets = (Xpnodeset **) malloc(map->num_apps * sizeof(Xpnodeset *)); node_sets = (Xpnodeset **) malloc(num_apps * sizeof(Xpnodeset *));
xcmd_sets = (Xpcommand **) malloc(map->num_apps * sizeof(Xpcommand *)); xcmd_sets = (Xpcommand **) malloc(num_apps * sizeof(Xpcommand *));
num_xcmds = map->num_apps;
/* create Xpnodeset for each app_context */ /* create Xpnodeset for each app_context */
for (i = 0; i < map->num_apps; i++) { for (i = 0; i < num_apps; i++) {
node_sets[i] = xp_nodeset_create(); node_sets[i] = xp_nodeset_create();
} }
@ -299,13 +292,13 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
} }
/* setup environment variables for each app context */ /* setup environment variables for each app context */
for (i = 0; i < map->num_apps; i++) { for (i = 0; i < num_apps; i++) {
/* FixME: how many layers of *? */ /* FixME: how many layers of *? */
pls_xcpu_setup_env(&map->apps[i]->env); pls_xcpu_setup_env(&map->apps[i]->env);
num_processes += map->apps[i]->num_procs; num_processes += map->apps[i]->num_procs;
} }
for (i = 0; i < map->num_apps; i++) { for (i = 0; i < num_apps; i++) {
rc = orte_ns_nds_xcpu_put(cellid, jobid, vpid_start, rc = orte_ns_nds_xcpu_put(cellid, jobid, vpid_start,
num_processes, &map->apps[i]->env); num_processes, &map->apps[i]->env);
if (rc != ORTE_SUCCESS) { if (rc != ORTE_SUCCESS) {
@ -315,9 +308,18 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
} }
/* create Xpcommand for each app_context from Xpnodeset */ /* create Xpcommand for each app_context from Xpnodeset */
for (i = 0; i < map->num_apps; i++) { for (i = 0; i < num_apps; i++) {
xcmd_sets[i] = xp_command_create(node_sets[i]); xcmd_sets[i] = xp_command_create(node_sets[i]);
/* calculate maximum fan out for tree spawn */
if (mca_pls_xcpu_component.maxsessions < 0) {
fanout = (int) sqrt(node_sets[i]->len);
if (fanout*fanout < node_sets[i]->len)
fanout++;
} else
fanout = mca_pls_xcpu_component.maxsessions;
xcmd_sets[i]->nspawn = fanout;
/* setup argc, argv and env in xcpu command */ /* setup argc, argv and env in xcpu command */
xcmd_sets[i]->cwd = strdup(map->apps[i]->cwd); xcmd_sets[i]->cwd = strdup(map->apps[i]->cwd);
xcmd_sets[i]->env = process_env(map->apps[i]->env); xcmd_sets[i]->env = process_env(map->apps[i]->env);
@ -341,7 +343,7 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
* FixME: we are blocked here so both success and failure cases * FixME: we are blocked here so both success and failure cases
* fall back to the error handler and all resources are freed. * fall back to the error handler and all resources are freed.
* this should be changed when we have non-blocking command_wait() */ * this should be changed when we have non-blocking command_wait() */
if (xp_commands_wait(map->num_apps, xcmd_sets) < 0) { if (xp_commands_wait(num_apps, xcmd_sets) < 0) {
rc = ORTE_ERROR; rc = ORTE_ERROR;
} else { } else {
rc = ORTE_SUCCESS; rc = ORTE_SUCCESS;
@ -349,7 +351,7 @@ orte_pls_xcpu_launch_job(orte_jobid_t jobid)
error: error:
/* error handling and clean up, kill all the processes */ /* error handling and clean up, kill all the processes */
for (i = 0; i < map->num_apps; i++) { for (i = 0; i < num_apps; i++) {
if (xcmd_sets[i] != NULL) { if (xcmd_sets[i] != NULL) {
xp_command_wipe(xcmd_sets[i]); xp_command_wipe(xcmd_sets[i]);
xp_command_destroy(xcmd_sets[i]); xp_command_destroy(xcmd_sets[i]);
@ -363,18 +365,9 @@ error:
int orte_pls_xcpu_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs) int orte_pls_xcpu_terminate_job(orte_jobid_t jobid, struct timeval *timeout, opal_list_t *attrs)
{ {
int i, rc; int i;
orte_job_map_t *map;
for (i = 0; i < num_apps; i++) {
/* get the job map */
rc = orte_rmaps.get_job_map(&map, jobid);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
for (i = 0; i < map->num_apps; i++) {
if (xcmd_sets[i] != NULL) { if (xcmd_sets[i] != NULL) {
xp_command_kill(xcmd_sets[i], SIGTERM); xp_command_kill(xcmd_sets[i], SIGTERM);
} }
@ -391,7 +384,7 @@ int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name)
{ {
fprintf(stderr, __FILE__ " terminate_proc\n"); fprintf(stderr, __FILE__ " terminate_proc\n");
/* libxcpu can not wipe individual process in an /* libxcpu can not kill individual process in an
* Xpcommand/Xpsessionset, only to the whole session set */ * Xpcommand/Xpsessionset, only to the whole session set */
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -399,19 +392,11 @@ int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name)
int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t sig, opal_list_t *attrs) int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t sig, opal_list_t *attrs)
{ {
int i, rc; int i;
orte_job_map_t *map;
fprintf(stderr, __FILE__ " signal_job, sig = %d\n", sig); fprintf(stderr, __FILE__ " signal_job, sig = %d\n", sig);
/* get the job map */ for (i = 0; i < num_apps; i++) {
rc = orte_rmaps.get_job_map(&map, jobid);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
for (i = 0; i < map->num_apps; i++) {
if (xcmd_sets[i] != NULL) if (xcmd_sets[i] != NULL)
xp_command_kill(xcmd_sets[i], sig); xp_command_kill(xcmd_sets[i], sig);
} }

Просмотреть файл

@ -84,13 +84,10 @@ struct orte_pls_xcpu_component_t {
/* The priority of this component. This will be returned if /* The priority of this component. This will be returned if
* we determine that xcpu is available and running on this node, * we determine that xcpu is available and running on this node,
*/ */
int terminate_sig;
/* The signal that gets sent to a process to kill it. */
opal_mutex_t lock;
/* Lock used to prevent some race conditions */
opal_condition_t condition;
/* Condition that is signaled when all the daemons have died */
int chatty; int chatty;
/* enable print out of 9P protocol */
int maxsessions;
/* maximum fan out for tree spawn */
}; };
typedef struct orte_pls_xcpu_component_t orte_pls_xcpu_component_t; typedef struct orte_pls_xcpu_component_t orte_pls_xcpu_component_t;

Просмотреть файл

@ -52,6 +52,9 @@ orte_pls_xcpu_component_t mca_pls_xcpu_component = {
} }
}; };
/** external variable defined in libspclient */
extern int spc_chatty;
/** /**
* Opens the pls_xcpu component, setting all the needed mca parameters and * Opens the pls_xcpu component, setting all the needed mca parameters and
* finishes setting up the component struct. * finishes setting up the component struct.
@ -69,10 +72,15 @@ int orte_pls_xcpu_component_open(void)
mca_base_param_reg_int(c, "debug", mca_base_param_reg_int(c, "debug",
"If > 0 prints library debugging information", "If > 0 prints library debugging information",
false, false, 0, &mca_pls_xcpu_component.debug); false, false, 0, &mca_pls_xcpu_component.debug);
mca_base_param_reg_int(c, "chatty", "Prints 9P protocol transactions", mca_base_param_reg_int(c, "chatty",
"Prints 9P protocol transactions",
false, false, 0, &mca_pls_xcpu_component.chatty); false, false, 0, &mca_pls_xcpu_component.chatty);
OBJ_CONSTRUCT(&mca_pls_xcpu_component.lock, opal_mutex_t); mca_base_param_reg_int(c, "maxsession",
OBJ_CONSTRUCT(&mca_pls_xcpu_component.condition, opal_condition_t); "Max fan out when using XCPUFS tree spawn",
false, false, -1, &mca_pls_xcpu_component.maxsessions);
if (mca_pls_xcpu_component.chatty)
spc_chatty = 1;
return rc; return rc;
} }
@ -82,11 +90,6 @@ int orte_pls_xcpu_component_open(void)
*/ */
int orte_pls_xcpu_component_close(void) int orte_pls_xcpu_component_close(void)
{ {
//fprintf(stderr, "orte_pls_xcpu_component_close\n");
OBJ_DESTRUCT(&mca_pls_xcpu_component.lock);
OBJ_DESTRUCT(&mca_pls_xcpu_component.condition);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -30,7 +30,6 @@
#include <assert.h> #include <assert.h>
#include <limits.h> #include <limits.h>
#include "spfs.h" #include "spfs.h"
//#include "spfsimpl.h"
#include "orte_config.h" #include "orte_config.h"
#include "opal/event/event.h" #include "opal/event/event.h"
#include "opal/runtime/opal_progress.h" #include "opal/runtime/opal_progress.h"
@ -97,7 +96,6 @@ spfd_add(int fd, void (*notify)(Spfd *, void *), void *aux)
if (!spfd) if (!spfd)
return NULL; return NULL;
// fprintf(stderr, "spfd_add spfd %p fd %d\n", spfd, fd);
fcntl(fd, F_SETFL, O_NONBLOCK); fcntl(fd, F_SETFL, O_NONBLOCK);
spfd->fd = fd; spfd->fd = fd;
spfd->flags = 0; spfd->flags = 0;
@ -116,7 +114,6 @@ spfd_add(int fd, void (*notify)(Spfd *, void *), void *aux)
void void
spfd_remove(Spfd *spfd) spfd_remove(Spfd *spfd)
{ {
// fprintf(stderr, "spfd_remove spfd %p\n", spfd);
if (spfd->prev) if (spfd->prev)
spfd->prev->next = spfd->next; spfd->prev->next = spfd->next;
else else
@ -213,7 +210,6 @@ spfd_handler(int fd, short event, void *aux)
spfd = aux; spfd = aux;
// fprintf(stderr, "spfd_handler spfd %p event %d events %d flags %d\n", spfd, event, spfd->events, spfd->flags);
flags = spfd->flags; flags = spfd->flags;
events = spfd->events; events = spfd->events;
@ -244,9 +240,6 @@ spfd_handler(int fd, short event, void *aux)
static void static void
sp_setup_event(Spfd *spfd) sp_setup_event(Spfd *spfd)
{ {
// fprintf(stderr, "sp_setup_event ");
// sp_printtime(stderr);
// fprintf(stderr, " spfd %p events %d\n", spfd, spfd->events);
opal_event_set(&spfd->opevent, spfd->fd, spfd->events, spfd_handler, spfd); opal_event_set(&spfd->opevent, spfd->fd, spfd->events, spfd_handler, spfd);
opal_event_add(&spfd->opevent, sptval); opal_event_add(&spfd->opevent, sptval);
} }