1
1
This commit was SVN r5745.
Этот коммит содержится в:
Thara Angskun 2005-05-18 15:55:34 +00:00
родитель 81d1d0322a
Коммит c71f3f7152
4 изменённых файлов: 285 добавлений и 21 удалений

Просмотреть файл

@ -1,2 +1,3 @@
angskun
zchen
thara

Просмотреть файл

@ -41,11 +41,14 @@ struct orte_pls_poe_component_t {
int priority;
int verbose;
char* path;
char* env;
char** argv;
int argc;
int debug;
char* orted;
char* class;
char* hostfile;
char* cmdfile;
int retry;
int retrycount;
};

Просмотреть файл

@ -131,7 +131,8 @@ int orte_pls_poe_component_open(void)
mca_pls_poe_component.class = orte_pls_poe_param_reg_string("class","interactive");
mca_pls_poe_component.retry = orte_pls_poe_param_reg_int("retry", 0);
mca_pls_poe_component.retrycount = orte_pls_poe_param_reg_int("retrycount", 0);
param = orte_pls_poe_param_reg_string("progname","poe");
mca_pls_poe_component.env = orte_pls_poe_param_reg_string("progenv","env");
param = orte_pls_poe_param_reg_string("progpoe","poe");
mca_pls_poe_component.argv = ompi_argv_split(param, ' ');
mca_pls_poe_component.argc = ompi_argv_count(mca_pls_poe_component.argv);
if (mca_pls_poe_component.argc > 0) {
@ -155,6 +156,10 @@ orte_pls_base_module_t *orte_pls_poe_component_init(int *priority)
if (NULL == mca_pls_poe_component.path) {
return NULL;
}
mca_pls_poe_component.env = ompi_path_findv(mca_pls_poe_component.env, 0, environ, NULL);
if (NULL == mca_pls_poe_component.env) {
return NULL;
}
*priority = mca_pls_poe_component.priority;
return &orte_pls_poe_module;
}

Просмотреть файл

@ -22,6 +22,7 @@
#include "ompi_config.h"
#include <fcntl.h>
#include <errno.h>
#include "include/orte_constants.h"
#include "mca/pls/pls.h"
@ -32,6 +33,7 @@
#include "mca/ns/ns.h"
#include "mca/rml/rml.h"
#include "mca/errmgr/errmgr.h"
#include "util/univ_info.h"
#include "util/argv.h"
#include "util/ompi_environ.h"
@ -71,7 +73,7 @@ int __poe_argv_append_int(int *argc, char ***argv, int varname, int min, char *a
return ORTE_SUCCESS;
}
int pls_poe_launch_interactive(orte_jobid_t jobid)
int pls_poe_launch_interactive_orted(orte_jobid_t jobid)
{
ompi_list_t nodes;
ompi_list_item_t* item;
@ -88,18 +90,19 @@ int pls_poe_launch_interactive(orte_jobid_t jobid)
int pid;
int rc;
int i;
char *hostfile, *cmdfile;
int status;
FILE *hfp, *cfp;
/* query the list of nodes allocated to the job - don't need the entire
* mapping - as the daemon/proxy is responsible for determining the apps
* to launch on each node.
*/
if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s:--- BEGIN ---\n", __FUNCTION__);
if((hostfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((cmdfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((hfp=fopen(hostfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((cfp=fopen(cmdfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((mca_pls_poe_component.hostfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((mca_pls_poe_component.cmdfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((hfp=fopen(mca_pls_poe_component.hostfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((cfp=fopen(mca_pls_poe_component.cmdfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
OBJ_CONSTRUCT(&nodes, ompi_list_t);
rc = orte_ras_base_node_query_alloc(&nodes, jobid);
@ -120,25 +123,34 @@ int pls_poe_launch_interactive(orte_jobid_t jobid)
goto cleanup;
}
/* need integer value for command line parameter - NOT hex */
asprintf(&tmp_string, "%lu", (unsigned long)jobid);
/* application */
argv = ompi_argv_copy(ompi_argv_split(mca_pls_poe_component.orted, ' '));
argc = ompi_argv_count(argv);
if (mca_pls_poe_component.debug) {
ompi_argv_append(&argc, &argv, "--debug");
}
ompi_argv_append(&argc, &argv, "--debug-daemons");
ompi_argv_append(&argc, &argv, "--no-daemonize");
ompi_argv_append(&argc, &argv, "--bootproxy");
/* need integer value for command line parameter - NOT hex */
asprintf(&tmp_string, "%lu", (unsigned long)jobid);
ompi_argv_append(&argc, &argv, tmp_string);
free(tmp_string);
ompi_argv_append(&argc, &argv, "--name");
proc_name_index = argc;
ompi_argv_append(&argc, &argv, "");
ompi_argv_append(&argc, &argv, "--nodename");
node_name_index2 = argc;
ompi_argv_append(&argc, &argv, "");
/* pass along the universe name and location info */
ompi_argv_append(&argc, &argv, "--universe");
asprintf(&tmp_string, "%s@%s:%s", orte_universe_info.uid,
orte_universe_info.host, orte_universe_info.name);
ompi_argv_append(&argc, &argv, tmp_string);
free(tmp_string);
/* setup ns contact info */
ompi_argv_append(&argc, &argv, "--nsreplica");
@ -193,13 +205,14 @@ int pls_poe_launch_interactive(orte_jobid_t jobid)
exit(-1);
}
argv[proc_name_index] = name_string;
for(i=0;i<argc;i++) {
printf("%s ",argv[i]);
fprintf(cfp,"%s ",argv[i]);
}
printf("\n"); fflush(stdout);
fprintf(cfp,"\n");
if (mca_pls_poe_component.verbose) {
ompi_output(0, "%s:cmdfile %s\n", __FUNCTION__, ompi_argv_join(argv, ' '));
}
vpid++;
free(name);
}
@ -210,21 +223,31 @@ int pls_poe_launch_interactive(orte_jobid_t jobid)
argv = ompi_argv_copy(mca_pls_poe_component.argv);
argc = mca_pls_poe_component.argc;
ompi_argv_append(&argc, &argv, "-hostfile");
ompi_argv_append(&argc, &argv, hostfile);
ompi_argv_append(&argc, &argv, mca_pls_poe_component.hostfile);
ompi_argv_append(&argc, &argv, "-cmdfile");
ompi_argv_append(&argc, &argv, cmdfile);
ompi_argv_append(&argc, &argv, mca_pls_poe_component.cmdfile);
ompi_argv_append(&argc, &argv, "-procs");
asprintf(&tmp_string, "%d", num_nodes);
ompi_argv_append(&argc, &argv, tmp_string);
free(tmp_string);
ompi_argv_append(&argc, &argv, "-pgmmodel");
ompi_argv_append(&argc, &argv, "mpmd");
ompi_argv_append(&argc, &argv, "-resd");
ompi_argv_append(&argc, &argv, "no");
ompi_argv_append(&argc, &argv, "-labelio");
ompi_argv_append(&argc, &argv, "yes");
ompi_argv_append(&argc, &argv, "-infolevel");
ompi_argv_append(&argc, &argv, "6");
ompi_argv_append(&argc, &argv, "-stdoutmode");
ompi_argv_append(&argc, &argv, "ordered");
rc=__poe_argv_append_int(&argc, &argv, mca_pls_poe_component.retry, 0, "-retry");
if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; }
rc=__poe_argv_append_int(&argc, &argv, mca_pls_poe_component.retrycount, 0, "-retrycount");
if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; }
ompi_output(0, "%s", ompi_argv_join(argv, ' '));
if (mca_pls_poe_component.verbose) {
ompi_output(0, "%s:cmdline %s\n", __FUNCTION__, ompi_argv_join(argv, ' '));
}
pid = fork();
if(pid < 0) {
@ -237,15 +260,243 @@ int pls_poe_launch_interactive(orte_jobid_t jobid)
execv(mca_pls_poe_component.path, argv);
ompi_output(0, "orte_pls_poe: execv failed with errno=%d\n", errno);
exit(-1);
} else {
/*
waitpid(pid,&status,0);
*/
}
cleanup:
while(NULL != (item = ompi_list_remove_first(&nodes))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&nodes);
if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s: --- END rc(%d) ---\n", __FUNCTION__, rc);
return rc;
}
/*
 * Append one line to a POE command file describing how to start a single
 * process.
 *
 * The line has the form
 *     <env-program> NAME=VALUE ... <app> <argv[1]> <argv[2]> ...
 * where <env-program> is mca_pls_poe_component.env and the NAME=VALUE pairs
 * carry the universe, name-service and registry contact info plus the
 * process name.
 *
 * cfp         open stream for the command file (written, not closed here)
 * context     application context; if context->argv is NULL a minimal
 *             argv ({app, NULL}) is created and stored back into it
 * proc        mapped process whose name is pushed into the environment
 * vpid_start  first vpid of the job, forwarded to orte_ns_nds_env_put()
 * vpid_range  vpid count of the job, forwarded to orte_ns_nds_env_put()
 *
 * Returns ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if an allocation fails.
 */
static int orte_pls_poe_launch_create_cmd_file(
    FILE *cfp,
    orte_app_context_t* context,
    orte_rmaps_base_proc_t* proc,
    orte_vpid_t vpid_start,
    orte_vpid_t vpid_range)
{
    int rc = ORTE_SUCCESS;
    int i;
    char* param;
    char* uri;
    char **environ_copy = NULL;

    /* setup base environment: make sure the bootproxy jobid is not passed on */
    param = mca_base_param_environ_variable("rmgr","bootproxy","jobid");
    ompi_unsetenv(param, &environ_copy);
    free(param);    /* released like every other name obtained below */

    /* setup universe info */
    if (NULL != orte_universe_info.name) {
        param = mca_base_param_environ_variable("universe", NULL, NULL);
        if (asprintf(&uri, "%s@%s:%s", orte_universe_info.uid,
                     orte_universe_info.host,
                     orte_universe_info.name) < 0) {
            /* uri is indeterminate when asprintf fails; do not use it */
            free(param);
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        ompi_setenv(param, uri, true, &environ_copy);
        free(param);
        free(uri);
    }

    /* setup ns contact info: use the known replica, else our own contact */
    if (NULL != orte_process_info.ns_replica_uri) {
        uri = strdup(orte_process_info.ns_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    param = mca_base_param_environ_variable("ns","replica","uri");
    ompi_setenv(param, uri, true, &environ_copy);
    free(param);
    free(uri);

    /* setup gpr contact info: same fallback rule as for the ns replica */
    if (NULL != orte_process_info.gpr_replica_uri) {
        uri = strdup(orte_process_info.gpr_replica_uri);
    } else {
        uri = orte_rml.get_uri();
    }
    param = mca_base_param_environ_variable("gpr","replica","uri");
    ompi_setenv(param, uri, true, &environ_copy);
    free(param);
    free(uri);

    /* push name into environment */
    orte_ns_nds_env_put(&proc->proc_name, vpid_start, vpid_range, &environ_copy);

    /* guarantee an argv so the argument loop below has a terminator */
    if (context->argv == NULL) {
        context->argv = malloc(2 * sizeof *context->argv);
        if (NULL == context->argv) {
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        context->argv[0] = strdup(context->app);
        context->argv[1] = NULL;
    }

    /* <env-program> followed by every NAME=VALUE entry collected above */
    fprintf(cfp,"%s",mca_pls_poe_component.env);
    if (NULL != environ_copy) {
        for (i = 0; environ_copy[i] != NULL; i++) {
            fprintf(cfp," %s",environ_copy[i]);
        }
    }

    /* the application itself, then its arguments; argv[0] is skipped
     * because context->app is printed in its place */
    fprintf(cfp," %s",context->app);
    for (i = 1; context->argv[i] != NULL; i++) {
        fprintf(cfp," %s",context->argv[i]);
    }

    /* every command-file entry must be newline-terminated for POE
     * (per the original author's note) */
    fprintf(cfp,"\n");

cleanup:
    if (NULL != environ_copy) {
        ompi_argv_free(environ_copy);
    }
    return rc;
}
/*
 * Launch a job through POE in interactive mode.
 *
 * Steps:
 *   1. create a temporary command file (one line per process, written by
 *      orte_pls_poe_launch_create_cmd_file());
 *   2. if nodes were allocated to the job, create a temporary host file
 *      listing them;
 *   3. build the poe command line and fork/exec it.
 *
 * The temporary file names are stored in mca_pls_poe_component.cmdfile and
 * mca_pls_poe_component.hostfile. The parent does not wait for the poe
 * child.
 *
 * jobid  job to launch
 *
 * Returns ORTE_SUCCESS once poe has been forked, ORTE_ERR_OUT_OF_RESOURCE
 * if a temporary file or the fork cannot be created, or the error code of
 * the failing ORTE call.
 *
 * NOTE(review): tempnam() followed by fopen() is open to a file-name race;
 * consider mkstemp() if the stored path contract allows it.
 */
int orte_pls_poe_launch_interactive(orte_jobid_t jobid)
{
    ompi_list_t map;
    ompi_list_item_t* item;
    orte_vpid_t vpid_start;
    orte_vpid_t vpid_range;
    size_t num_nodes, num_procs;
    ompi_list_t nodes;
    char *tmp_string;
    int rc;
    FILE *hfp;
    FILE *cfp;
    char** argv = NULL;
    int argc;
    int pid;

    if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s:--- BEGIN ---\n", __FUNCTION__);

    /* nothing to release yet, so these two failures may return directly */
    if((mca_pls_poe_component.cmdfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
    if((cfp=fopen(mca_pls_poe_component.cmdfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;

    /* from here on every failure goes through cleanup so that cfp and the
     * node list are released */
    OBJ_CONSTRUCT(&nodes, ompi_list_t);
    rc = orte_ras_base_node_query_alloc(&nodes, jobid);
    if(ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    num_nodes = ompi_list_get_size(&nodes);
    if(0 < num_nodes) {
        /* the user specified hosts: write them to a POE host file */
        if((mca_pls_poe_component.hostfile=tempnam(NULL,NULL))==NULL) {
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        if((hfp=fopen(mca_pls_poe_component.hostfile,"w"))==NULL) {
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        for(item = ompi_list_get_first(&nodes);
            item != ompi_list_get_end(&nodes);
            item = ompi_list_get_next(item)) {
            orte_ras_base_node_t* node = (orte_ras_base_node_t*)item;
            fprintf(hfp,"%s\n",node->node_name);
        }
        fclose(hfp);
    }

    rc = orte_rmgr_base_get_job_slots(jobid, &num_procs);
    if(ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* query allocation for the job */
    OBJ_CONSTRUCT(&map, ompi_list_t);
    rc = orte_rmaps_base_get_map(jobid,&map);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    rc = orte_rmaps_base_get_vpid_range(jobid, &vpid_start, &vpid_range);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* setup a POE command file: one line per mapped process */
    for(item = ompi_list_get_first(&map);
        item != ompi_list_get_end(&map);
        item = ompi_list_get_next(item)) {
        orte_rmaps_base_map_t* map2 = (orte_rmaps_base_map_t*)item;
        size_t i;
        for(i=0; i<map2->num_procs; i++) {
            rc = orte_pls_poe_launch_create_cmd_file(cfp,map2->app, map2->procs[i], vpid_start, vpid_range);
            if(ORTE_SUCCESS != rc) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
        }
    }
    fclose(cfp);
    cfp = NULL;     /* tells cleanup the stream is already closed */

    /********************/
    /* POE Command line */
    /********************/
    argv = ompi_argv_copy(mca_pls_poe_component.argv);
    argc = mca_pls_poe_component.argc;

    if(num_nodes > 0) {
        ompi_argv_append(&argc, &argv, "-hostfile");
        ompi_argv_append(&argc, &argv, mca_pls_poe_component.hostfile);
        ompi_argv_append(&argc, &argv, "-nodes");
        /* num_nodes is a size_t: format it as unsigned long, not %d */
        asprintf(&tmp_string, "%lu", (unsigned long)num_nodes);
        ompi_argv_append(&argc, &argv, tmp_string);
        free(tmp_string);
        ompi_argv_append(&argc, &argv, "-resd");
        ompi_argv_append(&argc, &argv, "no");
    }

    ompi_argv_append(&argc, &argv, "-cmdfile");
    ompi_argv_append(&argc, &argv, mca_pls_poe_component.cmdfile);
    ompi_argv_append(&argc, &argv, "-procs");
    /* num_procs is a size_t as well */
    asprintf(&tmp_string, "%lu", (unsigned long)num_procs);
    ompi_argv_append(&argc, &argv, tmp_string);
    free(tmp_string);
    ompi_argv_append(&argc, &argv, "-pgmmodel");
    ompi_argv_append(&argc, &argv, "mpmd");

    rc=__poe_argv_append_int(&argc, &argv, mca_pls_poe_component.retry, 0, "-retry");
    if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; }
    rc=__poe_argv_append_int(&argc, &argv, mca_pls_poe_component.retrycount, 0, "-retrycount");
    if(ORTE_SUCCESS!=rc) { ORTE_ERROR_LOG(rc); goto cleanup; }

    /* FIXME: Debugging only! */
    ompi_argv_append(&argc, &argv, "-labelio");
    ompi_argv_append(&argc, &argv, "yes");
    ompi_argv_append(&argc, &argv, "-infolevel");
    ompi_argv_append(&argc, &argv, "6");
    ompi_argv_append(&argc, &argv, "-stdoutmode");
    ompi_argv_append(&argc, &argv, "ordered");

    if (mca_pls_poe_component.verbose) {
        ompi_output(0, "%s:cmdline %s\n", __FUNCTION__, ompi_argv_join(argv, ' '));
    }

    pid = fork();
    if(pid < 0) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    if(pid == 0) {
        /* child: become poe; only reached again if execv itself fails */
        execv(mca_pls_poe_component.path, argv);
        ompi_output(0, "orte_pls_poe: execv failed with errno=%d\n", errno);
        exit(-1);
    }
    /* parent: deliberately does not wait for the poe child */

cleanup:
    if (NULL != cfp) {
        fclose(cfp);
    }
    if (NULL != argv) {
        ompi_argv_free(argv);
    }
    while(NULL != (item = ompi_list_remove_first(&nodes))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&nodes);
    /* NOTE(review): releasing `map` was disabled in the original and stays
     * disabled here; `map` is not constructed on every path that reaches
     * this label, so it cannot be destructed unconditionally. */
    if (mca_pls_poe_component.verbose>10) ompi_output(0, "%s: --- END rc(%d) ---\n", __FUNCTION__, rc);
    return rc;
}
@ -256,7 +507,7 @@ cleanup:
/*
 * Launch a job according to the configured POE class.
 *
 * Only classes whose name begins with "interactive" (the comparison covers
 * the first 11 characters) are handled; they are dispatched to
 * orte_pls_poe_launch_interactive(). Any other class yields
 * ORTE_ERR_NOT_IMPLEMENTED.
 *
 * jobid  job to launch
 */
static int pls_poe_launch(orte_jobid_t jobid)
{
    if(!strncmp(mca_pls_poe_component.class,"interactive",11)) {
        return orte_pls_poe_launch_interactive(jobid);
    }
    return ORTE_ERR_NOT_IMPLEMENTED;
}
@ -275,5 +526,9 @@ static int pls_poe_terminate_proc(const orte_process_name_t *name)
/*
 * Finalize the POE launcher: remove the temporary command and host files
 * recorded in mca_pls_poe_component by the launch functions.
 *
 * A file name that was never set is skipped; the host file in particular
 * is only created when nodes were allocated to the job.
 * NOTE(review): this assumes unset names are NULL - confirm the component
 * structure is zero-initialized.
 *
 * Always returns ORTE_SUCCESS; unlink() failures are ignored.
 */
static int pls_poe_finalize(void)
{
    if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s: --- BEGIN ---\n", __FUNCTION__);
    if (NULL != mca_pls_poe_component.cmdfile) {
        unlink(mca_pls_poe_component.cmdfile);
    }
    if (NULL != mca_pls_poe_component.hostfile) {
        unlink(mca_pls_poe_component.hostfile);
    }
    if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s: --- END ---\n", __FUNCTION__);
    return ORTE_SUCCESS;
}