
Updated the xcpu launcher. Open MPI no longer needs the xcpu library; the launcher code has been moved into the xcpu component.

This commit was SVN r10342.
This commit is contained in:
Sushant Sharma 2006-06-13 23:21:56 +00:00
parent b9e5acfbe3
Commit b5a16b6515
6 changed files with 519 additions and 67 deletions

View file

@@ -7,7 +7,7 @@
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# Copyright (c) 2004-2006 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
@@ -16,8 +16,6 @@
# $HEADER$
#
#dist_pkgdata_DATA = help-pls-bproc.txt
AM_CPPFLAGS = $(pls_xcpu_CPPFLAGS)
# Make the output library in this directory, and name it either

View file

@@ -8,7 +8,7 @@
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# Copyright (c) 2004-2006 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
@@ -21,9 +21,9 @@
# -----------------------------------------------------------
AC_DEFUN([MCA_pls_xcpu_CONFIG],[
OMPI_CHECK_XCPU([pls_xcpu], [pls_xcpu_good=1], [pls_xcpu_good=0])
# if xcpu is present and working, pls_xcpu_good=1.
# if check worked, set wrapper flags.
# Evaluate succeed / fail
AS_IF([test "$pls_xcpu_good" = "1"],
[pls_xcpu_WRAPPER_EXTRA_LDFLAGS="$pls_xcpu_LDFLAGS"
pls_xcpu_WRAPPER_EXTRA_LIBS="$pls_xcpu_LIBS"

View file

@@ -8,7 +8,7 @@
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# Copyright (c) 2004-2006 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#

View file

@@ -8,7 +8,7 @@
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
@@ -67,11 +67,29 @@
#include "orte/runtime/runtime.h"
#include "pls_xcpu.h"
#include <regex.h>
#include <dirent.h>
/**
* Our current environment
*/
extern char **environ;
extern int errno;
char **g_environ;
int g_regexploc=1;
regex_t g_compiled_exp;
orte_pls_xcpu_mount_nodes *g_current_m=NULL;
orte_pls_xcpu_thread_info *g_thread_info;
orte_pls_xcpu_pthread_tindex t_info;
orte_pls_xcpu_stdio_thread_info *g_stdout_thread_info, *g_stderr_thread_info;
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
orte_pls_xcpu_pthread_tindex *orte_pls_xcpu_launch_procs(int, char **, char**, orte_process_name_t *);
int orte_pls_xcpu_cmd_check(int, char **);
void orte_pls_xcpu_cleanup();
void *orte_pls_xcpu_start_thread(void *);
void *orte_pls_xcpu_stdio_thread(void *);
int orte_pls_xcpu_check_exp(char *);
/**
* Initialization of the xcpu module with all the needed function pointers
@@ -80,14 +98,9 @@ orte_pls_base_module_t orte_pls_xcpu_module = {
orte_pls_xcpu_launch,
orte_pls_xcpu_terminate_job,
orte_pls_xcpu_terminate_proc,
orte_pls_xcpu_signal_job,
orte_pls_xcpu_signal_proc,
orte_pls_xcpu_finalize
};
/** include a prototype for the xcpu launch function */
int lrx(int argc, char **argv ,char **env);
/** LOCAL SUPPORT FUNCTIONS **/
/** provide a local function to release the function stack
@@ -100,6 +113,440 @@ static void orte_pls_xcpu_free_stack(orte_pls_xcpu_tid_stack *s){
}
}
/* for handling stdout/err */
void *orte_pls_xcpu_stdio_thread(void *info){
orte_pls_xcpu_stdio_thread_info *io_t_info;
char buf[100];int x, rc;
io_t_info = (orte_pls_xcpu_stdio_thread_info*)info;
if((x=open(io_t_info->stdio_path, O_RDONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}else{
while(1){
if((rc=read(x, buf, 100))>0){
write(io_t_info->outdes, buf, rc);
}else{
if(rc==-1){
ORTE_ERROR_LOG(ORTE_ERR_FILE_READ_FAILURE);
}
break;
}
}
}
x>=0?close(x):0;
free(io_t_info->stdio_path);
free(io_t_info);
pthread_exit(NULL);
}
/* used by orte_pls_xcpu_launch_procs to start a process
* on a remote compute node.
* one thread per process for the time being
*
* @info: contains all the information required by the thread
* to launch the process on the remote compute node.
*/
void *orte_pls_xcpu_start_thread(void *info){
orte_pls_xcpu_thread_info *t_info;
char *session_clone, session_dir[255], *session_dir_path;
int clone_des, rc=0, des1, des2/*, tdes*/, trc[2];
char *env_path, *exec_path, *argv_path, *ctl_path;
char character[8193];
int i;
orte_process_name_t *peers;
pthread_t tids[2];
trc[0]=trc[1]=0;
t_info=(orte_pls_xcpu_thread_info*)info;
session_clone=(char*)malloc(strlen(t_info->local_mounts.name)+7);
sprintf(session_clone, "%s/clone", t_info->local_mounts.name);
if((clone_des=open(session_clone, O_RDONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}
if((rc=read(clone_des, session_dir, 255))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_READ_FAILURE);
}
else{
session_dir[rc]='\0';
session_dir_path=(char*)malloc(strlen(t_info->local_mounts.name)+strlen(session_dir)+2);
sprintf(session_dir_path, "%s/%s", t_info->local_mounts.name, session_dir);
/* write environment if needed */
env_path=(char*)malloc(strlen(session_dir_path)+5);
sprintf(env_path, "%s/env", session_dir_path);
if(t_info->env){
if((des1=open(env_path, O_WRONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_WRITE_FAILURE);
}else{
i=0;
while(t_info->env[i]){
/*printf("from lrx: %s\n", t_info->env[i]);
*/if(write(des1, t_info->env[i], strlen(t_info->env[i])) == -1){
ORTE_ERROR_LOG(ORTE_ERR_FILE_WRITE_FAILURE);
break;
}else{
if(t_info->env[i+1]){
if(write(des1, "\n", 1) == -1){
ORTE_ERROR_LOG(ORTE_ERR_FILE_WRITE_FAILURE);
break;
}
}
}
i++;
}
close(des1);
}
}
free(env_path);
/*then copy binary*/
exec_path=(char*)malloc(strlen(session_dir_path)+6);
sprintf(exec_path, "%s/exec", session_dir_path);
if((des1=open(exec_path, O_WRONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}else
if((des2=open(t_info->binary, O_RDONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}else{
while(1){
if((rc=read(des2, character, 8192))<=0){
if(close(des1)!=0){ /*?????*/
/*no ORTE_ERR defined for FILE_CLOSE_FAILURE*/
}
if(close(des2)!=0){
/*no ORTE_ERR defined for FILE_CLOSE_FAILURE*/
}
break;
}else{
if(write(des1, character, rc)==-1){
ORTE_ERROR_LOG(ORTE_ERR_FILE_WRITE_FAILURE);
break;
}
}
}
}
/* then write args*/
argv_path=(char*)malloc(strlen(session_dir_path)+6);
sprintf(argv_path, "%s/argv", session_dir_path);
if((des1=open(argv_path, O_WRONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}else{
write(des1, t_info->argv, strlen(t_info->argv));
close(des1);
}
/* then write exec into ctl file to start remote execution*/
ctl_path=(char*)malloc(strlen(session_dir_path)+5);
sprintf(ctl_path, "%s/ctl", session_dir_path);
/*continuation of writing ctl*/
if((des1=open(ctl_path, O_WRONLY))<0){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}else{
if(write(des1, "exec\n", 5)==-1){
ORTE_ERROR_LOG(ORTE_ERR_FILE_WRITE_FAILURE);
}else
close(des1);
}
/*then spawn threads for stdout and stderr*/
g_stdout_thread_info=(orte_pls_xcpu_stdio_thread_info*)malloc(sizeof(orte_pls_xcpu_stdio_thread_info));
g_stdout_thread_info->stdio_path=(char*)malloc(strlen(session_dir_path)+8);
sprintf(g_stdout_thread_info->stdio_path, "%s/stdout", session_dir_path);
g_stdout_thread_info->outdes=1;
if((rc=pthread_create(&tids[0], NULL, orte_pls_xcpu_stdio_thread, (void*)g_stdout_thread_info))==0){
trc[0]=1;
}else ;
/*ORTE_ERR for thread_creation_failure not defined yet*/
/*fprintf(stderr, "\nstdout thread creation error\n");*/
g_stderr_thread_info=(orte_pls_xcpu_stdio_thread_info*)malloc(sizeof(orte_pls_xcpu_stdio_thread_info));
g_stderr_thread_info->stdio_path=(char*)malloc(strlen(session_dir_path)+8);
sprintf(g_stderr_thread_info->stdio_path, "%s/stderr", session_dir_path);
g_stderr_thread_info->outdes=2;
if((rc=pthread_create(&tids[1], NULL, orte_pls_xcpu_stdio_thread, (void*)g_stderr_thread_info))==0){
trc[1]=1;
}else ;
/*ORTE_ERR for thread_creation_failure not defined yet*/
/*fprintf(stderr, "stderr thread creation error\n");*/
free(session_dir_path);
free(exec_path);
free(argv_path);
free(ctl_path);
if(trc[0]){
pthread_join(tids[0], NULL);
}
if(trc[1]){
pthread_join(tids[1], NULL);
}
}
free(session_clone);
(clone_des>0)?close(clone_des):0;
/* make registry update thread-safe */
pthread_mutex_lock(&mymutex);
/*write into registry that you are done*/
if (ORTE_SUCCESS != (orte_soh_base_set_proc_soh(t_info->peers, ORTE_PROC_STATE_TERMINATED, 0)) ){
ORTE_ERROR_LOG(rc);
}
pthread_mutex_unlock(&mymutex);
/* free the allocated variables after you are done*/
free(t_info->local_mounts.name);
free(t_info->binary);
free(t_info->argv);
free(t_info);
pthread_exit(NULL);
}
/* xcpu launcher function.
* this function is called once for each process to be launched, or it may
* be called once for multiple processes if a regular expression is passed
* to it; for now regular expressions are not being passed.
*
* @argc: number of arguments, i.e. the number of elements in argv
* @argv: the name of the remote node as mounted under $XCPUBASE or /mnt/xcpu/
* @env: the environment that needs to be set up on the remote node before
* starting the process
* @peers: process info; this is passed on to the threads so they can write
* process completion information into the open-mpi registry.
*/
orte_pls_xcpu_pthread_tindex *orte_pls_xcpu_launch_procs(int argc, char **argv, char **env, orte_process_name_t *peers){
char *xcpu_base, *xcpu_argv;
struct dirent *d_entry;
DIR *dirp;
int temp_fd, rc=0, index=0, argvsize=0, ntids=0;
pthread_t *tids;
orte_pls_xcpu_mount_nodes *m_nodes, *local_mounts;
g_current_m=NULL;
m_nodes=NULL;
(!(xcpu_base=getenv("XCPUBASE")))?xcpu_base="/mnt/xcpu":0;
if(!(dirp=opendir(xcpu_base))){
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);/* it should be DIR_OPEN_ERROR */
return NULL;
}
/* this logic should be faster than the one commented out below */
m_nodes=(orte_pls_xcpu_mount_nodes*)malloc(sizeof(orte_pls_xcpu_mount_nodes));
m_nodes->next=g_current_m;
m_nodes->name=(char*)malloc(1+strlen(xcpu_base)+1+
strlen(argv[1])+1+strlen("xcpu")+1);
sprintf(m_nodes->name, "%s/%s/xcpu", xcpu_base, argv[1]);
if((temp_fd=open(m_nodes->name, O_RDONLY))<0){
fprintf(stderr, "Node %s/%s/xcpu does not exist\n",xcpu_base, argv[1]);
free(m_nodes->name);
}else{
close(temp_fd);
g_current_m=m_nodes;
}
/* logic ends */
/*
while((d_entry=readdir(dirp))!=NULL){
printf("comapring %s %s\n",d_entry->d_name, argv[1]);
if((strcmp(d_entry->d_name, ".")==0)||(strcmp(d_entry->d_name, "..")==0))
;else
if(regexec(&g_compiled_exp, d_entry->d_name, 0, NULL, 0)!=REG_NOMATCH){
printf("matched %s\n", argv[1]);
ntids++;
m_nodes=(orte_pls_xcpu_mount_nodes*)malloc(sizeof(orte_pls_xcpu_mount_nodes));
m_nodes->next=g_current_m;
m_nodes->name=(char*)malloc(1+strlen(xcpu_base)+1+
strlen(d_entry->d_name)+1+strlen("xcpu")+1);
sprintf(m_nodes->name, "%s/%s/xcpu", xcpu_base, d_entry->d_name);
g_current_m=m_nodes;
*/ /* we can break after finding the first one
* or if you want to give the user an option of
* specifying regular expressions in hostfiles
* then don't break here
*/
/* on second thought we should not be going through the mounted node list;
* just check if xcpu_base/d_entry->d_name/xcpu exists or not
*/
/* break;
}
}*/
if(g_current_m==NULL){ /* is that an error.... no?*/
return NULL;
}
closedir(dirp);
/* now combine argv's so that they could be passed on */
/* g_regexploc will have the proper value only if
* cmd_check has already been called in lrx,
* and the location of the first arg after the name of the binary will be
* argv[g_regexploc+2] because the usage is: ./o.lrx [-D xx] regexp binary args
*/
/* number of arguments = argc - g_regexploc - 2;*/
index=g_regexploc+2-1; /*argv[0] could be anything*/
while(argv[index]){
argvsize+=strlen(argv[index])+1;
index++;
}
xcpu_argv=(char*)malloc(argvsize+1);
index=g_regexploc+2-1;
while(argv[index]){
if(index==g_regexploc+2-1)
strcpy(xcpu_argv, argv[index]);/* strcpy the first time to initialize the buffer, strcat afterwards */
else
strcat(xcpu_argv, argv[index]);
strcat(xcpu_argv, " ");
index++;
}
xcpu_argv[argvsize]='\0';
local_mounts=g_current_m; /* this is a linked list of mounted directories
* where binaries need to run
*/
tids=(pthread_t*)malloc(ntids*sizeof(pthread_t));
index=0;
while(local_mounts){
/* don't use a shared copy;
* give every thread its own copy since we don't know
* when all the threads will exit and when to free a shared copy
*/
g_thread_info=(orte_pls_xcpu_thread_info*)malloc(sizeof(orte_pls_xcpu_thread_info));
/*copy name first*/
g_thread_info->local_mounts.name=(char*)malloc(strlen(local_mounts->name)+1);
strcpy(g_thread_info->local_mounts.name, local_mounts->name);
/*then copy binary*/
g_thread_info->binary=(char*)malloc(strlen(argv[g_regexploc+1])+1);
strcpy(g_thread_info->binary,argv[g_regexploc+1]);
/*then copy argv*/
g_thread_info->argv=(char*)malloc(strlen(xcpu_argv)+1);
strcpy(g_thread_info->argv, xcpu_argv);
/* for env and peers: we are not allocating space for these,
* and they will be freed after all the threads have completed at the
* end of mpirun (hopefully); otherwise we would have to copy them
* first and then pass them to the threads
*/
g_thread_info->env=env;
g_thread_info->peers=peers;
/*following thread will free the thread_info structure*/
rc=pthread_create(&tids[index], NULL, orte_pls_xcpu_start_thread, (void*)g_thread_info);
index++;
if(rc){
/*ORTE_ERR for thread_creation_failure not defined yet*/
/*fprintf(stderr, "pthread_create: error while creating thread %d\n", rc);*/
return NULL;
}
local_mounts=local_mounts->next;
}
/* use pthread_join here if you want to wait for threads
* to finish execution
*//*
while(1){
index--;
pthread_join(tids[index], NULL);
if(index==0)
break;
}
free(tids);*/
/* remember to free tids in calling function*/
free(xcpu_argv);
t_info.tids=tids;
t_info.index=index;
return &t_info;
}
/* this function checks whether argv is in the correct format.
* Some checks being done in this function (for -D) are not necessary
* and will be removed in the future.
*/
int orte_pls_xcpu_cmd_check(int argc, char **argv){
char *temp_exp;
int rc=0;
g_regexploc=1;
if(argc>=3){
if(argv[1][0]=='-'){
switch(argv[1][1]){
case 'D': /* for debugging*/
g_regexploc+=2;
if(argc<5){
/*fprintf(stderr, "usage: o.lrx [-D debuglevel"
"] nodes binary [argv0 argv1 ...]\n");
*/rc=1;
}
break;
default: /* unspecified option*/
/*fprintf(stderr, "usage: o.lrx [-D debuglevel"
"] nodes binary [argv0 argv1 ...]\n");
*/return 1;
break;
}
}
}else{
/*fprintf(stderr, "usage: o.lrx [-D debuglevel"
"] nodes binary [argv0 argv1 ...]\n");
*/rc=1;
}
if(!rc){/*check for regular expression*/
temp_exp=(char*)malloc(strlen(argv[g_regexploc])+3);
sprintf(temp_exp, "^%s$", argv[g_regexploc]);
rc=orte_pls_xcpu_check_exp(temp_exp);
free(temp_exp);
}
return rc;
}
void orte_pls_xcpu_free_mount(orte_pls_xcpu_mount_nodes *g_current_m){
if(g_current_m){
orte_pls_xcpu_free_mount(g_current_m->next);
free(g_current_m->name);
free(g_current_m);
}
}
void orte_pls_xcpu_cleanup(){
regfree(&g_compiled_exp);
orte_pls_xcpu_free_mount(g_current_m);
}
/* The launcher can accept regular expressions as the list of nodes where
* processes are going to be launched. This is just a helper function to check
* whether the regular expression is valid or not
*/
int orte_pls_xcpu_check_exp(char *exp){
if(regcomp(&g_compiled_exp, exp, REG_EXTENDED|REG_NOSUB)){
/*fprintf(stderr, "Invlid regular expression: %s\n", exp);*/
return 1;
}
/*regfree(&g_compiled_exp);*/
return 0; /* now don't forget to call regfree at the end*/
}
/* This is the main launcher function.
* It calls orte_pls_xcpu_launch_procs, which will
* start a thread for each process to be launched
*/
int lrx(int argc, char **argv, char **env, orte_process_name_t *peers){
int rc;
orte_pls_xcpu_pthread_tindex *t_info;
if((rc=orte_pls_xcpu_cmd_check(argc, argv))==1){
return 0;
}
if((t_info=orte_pls_xcpu_launch_procs(argc, argv, env, peers))==NULL){
/*fprintf(stderr, "lrx: 0 processes launched\n");*/
orte_pls_xcpu_cleanup();
return 0;
}
else{
orte_pls_xcpu_cleanup();
t_info->index--;
rc=t_info->tids[t_info->index];
free(t_info->tids);
return rc; /* no need to return the thread_id;
* the thread will record its completion
* in the registry itself
*/
}
/*
while(1){
t_info->index--;
pthread_join(t_info->tids[t_info->index], NULL);
if(t_info->index==0)
break;
}
*/
return 0;/* never reached */
}
/** provide a function to set up the environment for the remote
* processes. We need to ensure that the remote processes know
* their gpr and ns replicas, the universe
@@ -196,14 +643,14 @@ int orte_pls_xcpu_launch(orte_jobid_t jobid){
orte_rmaps_base_node_t *node;
orte_rmaps_base_proc_t *proc;
orte_vpid_t vpid_start, vpid_range;
orte_process_name_t *peers;
int peer_id, num_peers;
/** first get the mapping we are going to use to launch job. The head
* of the list is OBJ_CONSTRUCT'd since it is not dynamically allocated. The
* get_map function, however, will dynamically allocate the items in the
* list itself - these will be released when we OBJ_DESTRUCT the list at
* the end
*/
/*fprintf(stdout, "\nxcpu launch called, job id: %d\n", jobid);*/
OBJ_CONSTRUCT(&mapping, opal_list_t);
/** get the mapping from the registry. This will provide a linked list, one
* item for each mapping. Each item contains the full context of the application
@@ -230,6 +677,10 @@ int orte_pls_xcpu_launch(orte_jobid_t jobid){
/** Now loop through all the provided maps to launch their associated apps */
t_stack=NULL;
nprocs = 0;
peer_id=0;
if (ORTE_SUCCESS != (rc = orte_ns.get_job_peers(&peers, &num_peers, jobid))) {
ORTE_ERROR_LOG(rc);
}
for(item = opal_list_get_first(&mapping);
item != opal_list_get_end(&mapping);
item = opal_list_get_next(item)) {
@@ -303,24 +754,29 @@ int orte_pls_xcpu_launch(orte_jobid_t jobid){
t_stack=temp_stack;
/** launch the process */
t_stack->tid=lrx(argc, map->app->argv, env);
t_stack->tid=lrx(argc, map->app->argv, env, &peers[peer_id]);
if(t_stack->tid==0){
/* first kill all the processes started on remote nodes
*/
i=0;
while(i<num_peers){
if (ORTE_SUCCESS != (orte_soh_base_set_proc_soh(&peers[i], ORTE_PROC_STATE_TERMINATED, 0)) ){
ORTE_ERROR_LOG(rc);
}
i++;
}
break;
}
peer_id++;
}
}
/** wait for all threads that have launched processes on remote nodes */
/*temp_stack=t_stack;
while(t_stack){
pthread_join(t_stack->tid, NULL);
t_stack=t_stack->next;
}
orte_soh.begin_monitoring_job(jobid);
*/
/** cleanup local storage */
orte_pls_xcpu_free_stack(temp_stack);
OBJ_DESTRUCT(&mapping);
/** launch complete */
/*fprintf(stdout, "launch finished\n");*/
return ORTE_SUCCESS;
}
@@ -330,12 +786,6 @@ int orte_pls_xcpu_terminate_job(orte_jobid_t jobid){
int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name){
return ORTE_SUCCESS;
}
int orte_pls_xcpu_signal_job(orte_jobid_t jobid, int32_t signal){
return ORTE_SUCCESS;
}
int orte_pls_xcpu_signal_proc(const orte_process_name_t* proc_name, int32_t signal){
return ORTE_SUCCESS;
}
int orte_pls_xcpu_finalize(void){
return ORTE_SUCCESS;
}
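The launch thread above drives xcpu purely through file operations on the mounted node: read clone to get a session directory, write env/exec/argv, write "exec\n" into ctl, then stream stdout/stderr back. For reference, here is a minimal, hypothetical standalone sketch of that sequence for a single session, assuming the node's xcpu filesystem is mounted at a path like /mnt/xcpu/<node>/xcpu; error handling, the env and binary copying, and the stderr stream are omitted for brevity, and the helper name xcpu_run is made up for illustration.

/* Minimal sketch of the xcpu session protocol used by the launcher thread
 * above (not part of this commit). mount is e.g. "/mnt/xcpu/node1/xcpu". */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int xcpu_run(const char *mount, const char *argv_line)
{
    char path[1024], session[256], buf[4096];
    ssize_t n;
    int clone_fd, fd, out_fd;

    /* reading clone creates a new session directory and returns its name */
    snprintf(path, sizeof(path), "%s/clone", mount);
    if ((clone_fd = open(path, O_RDONLY)) < 0) return -1;
    if ((n = read(clone_fd, session, sizeof(session) - 1)) <= 0) {
        close(clone_fd);
        return -1;
    }
    session[n] = '\0';

    /* (the real launcher also writes <session>/env and copies the
     *  binary into <session>/exec at this point) */

    /* write the argument string into <session>/argv */
    snprintf(path, sizeof(path), "%s/%s/argv", mount, session);
    if ((fd = open(path, O_WRONLY)) >= 0) {
        write(fd, argv_line, strlen(argv_line));
        close(fd);
    }

    /* writing "exec\n" into <session>/ctl starts the remote process */
    snprintf(path, sizeof(path), "%s/%s/ctl", mount, session);
    if ((fd = open(path, O_WRONLY)) >= 0) {
        write(fd, "exec\n", 5);
        close(fd);
    }

    /* stream <session>/stdout to our own stdout until EOF */
    snprintf(path, sizeof(path), "%s/%s/stdout", mount, session);
    if ((out_fd = open(path, O_RDONLY)) >= 0) {
        while ((n = read(out_fd, buf, sizeof(buf))) > 0)
            write(STDOUT_FILENO, buf, n);
        close(out_fd);
    }

    /* the real launcher keeps clone open until the stdio threads finish */
    close(clone_fd);
    return 0;
}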

View file

@@ -8,7 +8,7 @@
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
@@ -22,9 +22,10 @@
* @file:
* Header file for the xcpu launcher. This will use xcpu to launch jobs on
* the list of nodes that it will get from RAS (resource allocation
* system (slurm??)
* -# pls_xcpu is called by orterun. It reads the ompi registry and launch
* the binary on the nodes specified in the registry.
* system
* -# pls_xcpu is called by orterun. It first sets up the environment for the
* process to be launched on the remote node, then reads the ompi registry and
* then launches the binary on the nodes specified in the registry.
*/
#ifndef ORTE_PLS_XCPU_H_
@@ -44,14 +45,13 @@ extern "C" {
/*
* Module open / close -- defined in component file
*/
int orte_pls_xcpu_component_open(void); /*probably do nothing*/
int orte_pls_xcpu_component_close(void); /*probably do nothing*/
int orte_pls_xcpu_component_open(void);
int orte_pls_xcpu_component_close(void);
/*
* Startup / Shutdown
*/
orte_pls_base_module_t* orte_pls_xcpu_init(int *priority); /* in component file */
/* int orte_pls_xcpu_finalize(void); */ /* should be with interface */
/*
* Interface
@@ -59,8 +59,6 @@ orte_pls_base_module_t* orte_pls_xcpu_init(int *priority); /* in component file
int orte_pls_xcpu_launch(orte_jobid_t);
int orte_pls_xcpu_terminate_job(orte_jobid_t);
int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name);
int orte_pls_xcpu_signal_job(orte_jobid_t, int32_t);
int orte_pls_xcpu_signal_proc(const orte_process_name_t* proc_name, int32_t);
int orte_pls_xcpu_finalize(void);
@@ -68,29 +66,20 @@ int orte_pls_xcpu_finalize(void);
* (P)rocess (L)aunch (S)ubsystem xcpu Component
*/
struct orte_pls_xcpu_component_t {
orte_pls_base_component_t super;/*base_class this is needed others below this are not*/
/* base class: this is needed; the others below may or may not be */
orte_pls_base_component_t super;
/* most of the members below are going to be removed from this structure,
* as are their registrations in the open() function
*/
bool done_launching; /* Is true if we are done launching the user's app. */
int debug; /* If greater than 0 print debugging information */
int num_procs; /* The number of processes that are running */
int priority; /* The priority of this component. This will be returned if
* we determine that xcpu is available and running on this node,
*/
int terminate_sig; /* The signal that gets sent to a process to kill it. */
size_t num_daemons; /* The number of daemons that are currently running. */
orte_pointer_array_t * daemon_names;
opal_mutex_t lock; /* Lock used to prevent some race conditions */
opal_condition_t condition; /* Condition that is signaled when all the daemons have died */
orte_pointer_array_t * daemon_names;
/* Array of the process names of all the daemons. This is used to send
* the daemons a termination signal when all the user processes are done */
orte_cellid_t cellid;
};
/**
* Convenience typedef
*/
typedef struct orte_pls_xcpu_component_t orte_pls_xcpu_component_t;
struct orte_pls_xcpu_tid_stack {
@@ -99,6 +88,33 @@ struct orte_pls_xcpu_tid_stack {
};
typedef struct orte_pls_xcpu_tid_stack orte_pls_xcpu_tid_stack;
struct orte_pls_xcpu_mount_nodes{
char *name;
struct orte_pls_xcpu_mount_nodes *next;
};
typedef struct orte_pls_xcpu_mount_nodes orte_pls_xcpu_mount_nodes;
struct orte_pls_xcpu_thread_info{
orte_pls_xcpu_mount_nodes local_mounts;/* can have only *name */
char *binary;
char *argv;
char **env;
orte_process_name_t *peers;
};
typedef struct orte_pls_xcpu_thread_info orte_pls_xcpu_thread_info;
struct orte_pls_xcpu_stdio_thread_info{
char *stdio_path;
int outdes;
};
typedef struct orte_pls_xcpu_stdio_thread_info orte_pls_xcpu_stdio_thread_info;
struct orte_pls_xcpu_pthread_tindex{
pthread_t *tids;
int index;
};
typedef struct orte_pls_xcpu_pthread_tindex orte_pls_xcpu_pthread_tindex;
ORTE_DECLSPEC extern orte_pls_xcpu_component_t mca_pls_xcpu_component;
ORTE_DECLSPEC extern orte_pls_base_module_t orte_pls_xcpu_module; /* this is defined in pls_xcpu.c file */
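The structures added above fit together as follows: launch_procs builds a linked list of orte_pls_xcpu_mount_nodes, and for each node fills a fresh orte_pls_xcpu_thread_info (deep-copying the mount name, binary, and argv, while sharing env and peers with the caller) that the worker thread frees when it is done. A rough, hypothetical sketch of that per-node hand-off, assuming this header and the ORTE headers it pulls in are on the include path; the helper name spawn_one is made up, and orte_pls_xcpu_start_thread is the thread entry point defined in pls_xcpu.c.

/* Hypothetical per-node hand-off, mirroring orte_pls_xcpu_launch_procs:
 * each worker thread owns (and frees) its own string copies, while env
 * and peers stay shared with the caller. */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "pls_xcpu.h"

extern void *orte_pls_xcpu_start_thread(void *);   /* defined in pls_xcpu.c */

static int spawn_one(orte_pls_xcpu_mount_nodes *node, const char *binary,
                     const char *argv_line, char **env,
                     orte_process_name_t *peers, pthread_t *tid)
{
    orte_pls_xcpu_thread_info *ti = malloc(sizeof(*ti));
    if (NULL == ti) return -1;

    ti->local_mounts.name = strdup(node->name);  /* freed by the thread */
    ti->local_mounts.next = NULL;
    ti->binary = strdup(binary);                 /* freed by the thread */
    ti->argv   = strdup(argv_line);              /* freed by the thread */
    ti->env    = env;    /* shared, not copied */
    ti->peers  = peers;  /* shared, not copied */

    /* the thread frees ti once the remote process has terminated */
    return pthread_create(tid, NULL, orte_pls_xcpu_start_thread, ti);
}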

View file

@@ -8,7 +8,7 @@
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
@@ -60,7 +60,7 @@ int orte_pls_xcpu_component_open(void) {
/* init parameters */
/*read trunk/opal/mca/base/mca_base_param.h for reg_int details*/
mca_base_component_t *c = &mca_pls_xcpu_component.super.pls_version;
mca_base_param_reg_int(c, "priority", NULL, false, false,0,
mca_base_param_reg_int(c, "priority", NULL, false, false,5,
&mca_pls_xcpu_component.priority);
mca_base_param_reg_int(c, "debug",
"If > 0 prints library debugging information",
@@ -68,12 +68,9 @@ int orte_pls_xcpu_component_open(void) {
mca_base_param_reg_int(c, "terminate_sig",
"Signal sent to processes to terminate them", false,
false, 9, &mca_pls_xcpu_component.terminate_sig);
mca_pls_xcpu_component.num_procs = 0;
mca_pls_xcpu_component.num_daemons = 0;
mca_pls_xcpu_component.done_launching = false;
OBJ_CONSTRUCT(&mca_pls_xcpu_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_pls_xcpu_component.condition, opal_condition_t);
/* init the list to hold the daemon names */
rc = orte_pointer_array_init(&mca_pls_xcpu_component.daemon_names, 8, 200000, 8);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
@@ -91,19 +88,10 @@ int orte_pls_xcpu_component_close(void) {
return ORTE_SUCCESS;
}
/**
* Initializes the module. We do not want to run unless xcpu
* is running and we are on the control node.
*/
/* What I think is that this function will be called from somewhere in the (R)esource (M)ana(G)e(R),
* and then it will return orte_pls_xcpu_module, which contains function pointers for launch,
* finalize etc., and then the resource manager can call these functions
*/
orte_pls_base_module_t* orte_pls_xcpu_init(int *priority) {
/* check if xcpu component should be loaded or not
* if not, then return NULL here
*/
/*return NULL; *//*for time being*/
*priority = mca_pls_xcpu_component.priority;
return &orte_pls_xcpu_module; /* this is defined in pls_xcpu.c and contains
* function pointers for launch, terminate_job