diff --git a/orte/mca/pls/bproc_seed/Makefile.am b/orte/mca/pls/bproc_seed/Makefile.am new file mode 100644 index 0000000000..0de779092b --- /dev/null +++ b/orte/mca/pls/bproc_seed/Makefile.am @@ -0,0 +1,50 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + + + +AM_CPPFLAGS = -I$(top_ompi_builddir)/src/include $(pls_bproc_seed_CPPFLAGS) + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_pls_bproc_seed_DSO +component_noinst = +component_install = mca_pls_bproc_seed.la +else +component_noinst = libmca_pls_bproc_seed.la +component_install = +endif + +sources = \ + pls_bproc_seed.h \ + pls_bproc_seed.c \ + pls_bproc_seed_component.c + +mcacomponentdir = $(libdir)/openmpi +mcacomponent_LTLIBRARIES = $(component_install) +mca_pls_bproc_seed_la_SOURCES = $(sources) +mca_pls_bproc_seed_la_LIBADD = \ + $(pls_bproc_seed_LIBS) \ + $(top_ompi_builddir)/orte/liborte.la \ + $(top_ompi_builddir)/opal/libopal.la +mca_pls_bproc_seed_la_LDFLAGS = -module -avoid-version $(pls_bproc_seed_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_pls_bproc_seed_la_SOURCES = $(sources) +libmca_pls_bproc_seed_la_LIBADD = $(pls_bproc_seed_LIBS) +libmca_pls_bproc_seed_la_LDFLAGS = -module -avoid-version $(pls_bproc_seed_LDFLAGS) diff --git a/orte/mca/pls/bproc_seed/configure.m4 b/orte/mca/pls/bproc_seed/configure.m4 new file mode 100644 index 0000000000..1117e0a827 --- /dev/null +++ b/orte/mca/pls/bproc_seed/configure.m4 @@ -0,0 +1,39 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_pls_bproc_seed_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_pls_bproc_seed_CONFIG],[ + # only accept newer non-Scyld bproc + OMPI_CHECK_BPROC([pls_bproc_seed], [pls_bproc_seed_good=2], + [pls_bproc_seed_good=1], [pls_bproc_seed_good=0]) + + # if check worked, set wrapper flags if so. + # Evaluate succeed / fail + AS_IF([test "$pls_bproc_seed_good" = "2"], + [pls_bproc_seed_WRAPPER_EXTRA_LDFLAGS="$pls_bproc_seed_LDFLAGS" + pls_bproc_seed_WRAPPER_EXTRA_LIBS="$pls_bproc_seed_LIBS" + $1], + [$2]) + AS_IF([test "$pls_bproc_good" = "1"], + [AC_MSG_WARN([***Scyld bproc is not supported by this component***])]) + + # set build flags to use in makefile + AC_SUBST([pls_bproc_seed_CPPFLAGS]) + AC_SUBST([pls_bproc_seed_LDFLAGS]) + AC_SUBST([pls_bproc_seed_LIBS]) +])dnl diff --git a/orte/mca/pls/bproc_seed/configure.params b/orte/mca/pls/bproc_seed/configure.params new file mode 100644 index 0000000000..babf7d6d8d --- /dev/null +++ b/orte/mca/pls/bproc_seed/configure.params @@ -0,0 +1,22 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Specific to this module + +PARAM_INIT_FILE=pls_bproc_seed.c +PARAM_CONFIG_HEADER_FILE="pls_bproc_seed_config.h" +PARAM_CONFIG_FILES="Makefile" diff --git a/orte/mca/pls/bproc_seed/pls_bproc_seed.c b/orte/mca/pls/bproc_seed/pls_bproc_seed.c new file mode 100644 index 0000000000..ba3054b255 --- /dev/null +++ b/orte/mca/pls/bproc_seed/pls_bproc_seed.c @@ -0,0 +1,874 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "ompi_config.h" +#include +#include +#include +#include +#include +#include + +#include "opal/util/argv.h" +#include "opal/util/output.h" +#include "opal/util/opal_environ.h" +#include "util/proc_info.h" +#include "opal/event/event.h" +#include "runtime/orte_wait.h" +#include "runtime/runtime.h" +#include "mca/ns/base/base.h" +#include "mca/sds/base/base.h" +#include "mca/pls/base/base.h" +#include "mca/base/mca_base_param.h" +#include "mca/iof/iof.h" +#include "mca/rmgr/base/base.h" +#include "mca/rmaps/base/base.h" +#include "mca/rml/rml.h" +#include "mca/errmgr/errmgr.h" +#include "mca/soh/soh.h" +#include "mca/soh/base/base.h" +#include "mca/ras/base/base.h" +#include "mca/rmaps/base/rmaps_base_map.h" + +#include "pls_bproc_seed.h" + +#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS +int orte_pls_bproc_seed_launch_threaded(orte_jobid_t); +#endif + + +orte_pls_base_module_t orte_pls_bproc_seed_module = { +#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS + orte_pls_bproc_seed_launch_threaded, +#else + orte_pls_bproc_seed_launch, +#endif + orte_pls_bproc_seed_terminate_job, + orte_pls_bproc_seed_terminate_proc, + orte_pls_bproc_seed_finalize +}; + + + +static int orte_pls_bproc_nodelist(orte_rmaps_base_map_t* map, int** nodelist, size_t* num_nodes) +{ + opal_list_item_t* item; + size_t count = opal_list_get_size(&map->nodes); + size_t index = 0; + + /* build the node list */ + *nodelist = (int*)malloc(sizeof(int) * count); + if(NULL == *nodelist) + return OMPI_ERR_OUT_OF_RESOURCE; + + for(item = opal_list_get_first(&map->nodes); + item != opal_list_get_end(&map->nodes); + item = opal_list_get_next(item)) { + orte_rmaps_base_node_t* node = (orte_rmaps_base_node_t*)item; + (*nodelist)[index++] = atol(node->node_name); + } + *num_nodes = count; + return OMPI_SUCCESS; +} + +/* + * Execute/dump a process and read the image into memory. + */ + +static int orte_pls_bproc_dump(orte_app_context_t* app, uint8_t** image, size_t* image_len) +{ + pid_t pid; + int pfd[2]; + size_t cur_offset, tot_offset, num_buffers; + uint8_t *image_buffer; + int rc = ORTE_SUCCESS; + + if (pipe(pfd)) { + opal_output(0, "orte_pls_bproc_seed: pipe() failed errno=%d\n",errno); + return ORTE_ERROR; + } + + if ((pid = fork ()) < 0) { + opal_output(0, "orte_pls_bproc_seed: fork() failed errno=%d\n",errno); + return ORTE_ERROR; + } + + if (pid == 0) { + close(pfd[0]); /* close the read end - we are write only */ + chdir(app->cwd); + bproc_execdump(pfd[1], BPROC_DUMP_EXEC | BPROC_DUMP_OTHER, app->app, app->argv, app->env); + exit(0); + } + + /* this is the parent - I will read the + * info coming from the pipe + */ + + close(pfd[1]); /* close the sending end - we are read only */ + image_buffer = (uint8_t*)malloc(mca_pls_bproc_seed_component.image_frag_size); + if (!image_buffer) { + rc = ORTE_ERR_OUT_OF_RESOURCE; + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + tot_offset = 0; + cur_offset = 0; + num_buffers = 1; + while (1) { + int num_bytes = read(pfd[0], image_buffer + tot_offset, mca_pls_bproc_seed_component.image_frag_size - cur_offset); + if (0 > num_bytes) { /* got an error - abort process */ + free(image_buffer); + rc = ORTE_ERR_OUT_OF_RESOURCE; + ORTE_ERROR_LOG(rc); + goto cleanup; + } else if (0 == num_bytes) { + break; + } + + tot_offset += num_bytes; + cur_offset += num_bytes; + if (mca_pls_bproc_seed_component.image_frag_size == cur_offset) { /* filled the current buffer - need to realloc */ + num_buffers++; + image_buffer = (uint8_t*)realloc(image_buffer, num_buffers * mca_pls_bproc_seed_component.image_frag_size); + if(NULL == image_buffer) { + rc = ORTE_ERR_OUT_OF_RESOURCE; + ORTE_ERROR_LOG(rc); + goto cleanup; + } + cur_offset = 0; + } + } + *image = image_buffer; + *image_len = tot_offset; + + if(tot_offset == 0) { + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + +cleanup: + close(pfd[0]); + waitpid(pid,0,0); + return rc; +} + +/* + * Spew out a new child based on the in-memory process image. + */ + +static int orte_pls_bproc_undump( + orte_rmaps_base_proc_t* proc, + orte_vpid_t vpid_start, + orte_vpid_t vpid_range, + uint8_t* image, + size_t image_len, + pid_t* pid) +{ + int p_name[2]; + int p_stdout[2]; + int p_stderr[2]; + int p_image[2]; + int rc; + size_t bytes_writen = 0; + + if(pipe(p_name) < 0 || + pipe(p_stdout) < 0 || + pipe(p_stderr) < 0 || + pipe(p_image) < 0) { + ORTE_ERROR_LOG(rc); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* fork a child process which is overwritten with the process image */ + *pid = fork(); + if (*pid == 0) { + + /* child is read-only */ + close(p_image[1]); + close(p_name[1]); + + /* child is write-only */ + close(p_stdout[0]); + close(p_stderr[0]); + + /* setup stdout/stderr */ + dup2(p_stdout[1], 1); + close(p_stdout[1]); + dup2(p_stderr[1], 2); + close(p_stderr[1]); + + /* verify that the name file descriptor is free */ + if(p_image[0] == mca_pls_bproc_seed_component.name_fd) { + int fd = dup(p_image[0]); + close(p_image[0]); + p_image[0] = fd; + } + if(p_name[0] != mca_pls_bproc_seed_component.name_fd) { + dup2(p_name[0], mca_pls_bproc_seed_component.name_fd); + close(p_name[0]); + } + + bproc_undump(p_image[0]); /* child is now executing */ + opal_output(0, "orte_pls_bproc: bproc_undump(%d) failed errno=%d\n", p_image[0], errno); + exit(1); + } + + if (*pid < 0) { + close(p_image[0]); + close(p_image[1]); + ORTE_ERROR_LOG(rc); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* parent is write-only */ + close(p_image[0]); + close(p_name[0]); + + /* parent is read-only */ + close(p_stdout[1]); + close(p_stderr[1]); + + if(*pid < 0) { + close(p_image[1]); + close(p_name[1]); + close(p_stdout[0]); + close(p_stderr[0]); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* connect the app to the IOF framework */ + rc = orte_iof.iof_publish(&proc->proc_name, ORTE_IOF_SOURCE, ORTE_IOF_STDOUT, p_stdout[0]); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + rc = orte_iof.iof_publish(&proc->proc_name, ORTE_IOF_SOURCE, ORTE_IOF_STDERR, p_stderr[0]); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* set the process status in the registery - child is not running yet */ + rc = orte_pls_base_set_proc_pid(&proc->proc_name, *pid); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* write the process image to the app */ + while(bytes_writen < image_len) { + rc = write(p_image[1], image+bytes_writen, image_len-bytes_writen); + if(rc < 0) { + opal_output(0, "orte_pls_bproc_undump: write failed errno=%d\n", errno); + return ORTE_ERROR; + } + bytes_writen += rc; + } + close(p_image[1]); + + /* write the process name */ + orte_ns_nds_pipe_put(&proc->proc_name, vpid_start, vpid_range, p_name[1]); + close(p_name[1]); + return ORTE_SUCCESS; +} + + +/* + * Wait for a callback indicating the child has completed. + */ + +static void orte_pls_bproc_wait_proc(pid_t pid, int status, void* cbdata) +{ + orte_rmaps_base_proc_t* proc = (orte_rmaps_base_proc_t*)cbdata; + + /* set the state of this process */ + if(NULL != proc) { + int rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_TERMINATED, status); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(proc); + } + + /* release any waiting threads */ + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + mca_pls_bproc_seed_component.num_children--; + opal_condition_signal(&mca_pls_bproc_seed_component.condition); + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); +} + + +/* + * Wait for a callback indicating the daemon has exited. + */ +static void orte_pls_bproc_wait_node(pid_t pid, int status, void* cbdata) +{ + orte_rmaps_base_node_t* node = (orte_rmaps_base_node_t*)cbdata; + opal_list_item_t* item; + + /* set state of all processes associated with the daemon as terminated */ + for(item = opal_list_get_first(&node->node_procs); + item != opal_list_get_end(&node->node_procs); + item = opal_list_get_next(item)) { + orte_rmaps_base_proc_t* proc = (orte_rmaps_base_proc_t*)item; + + int rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_TERMINATED, 0); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + } + } + OBJ_RELEASE(node); + + /* release any waiting threads */ + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + mca_pls_bproc_seed_component.num_children--; + opal_condition_signal(&mca_pls_bproc_seed_component.condition); + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); +} + + +/* + * (1) Execute/dump the process image and read into memory. + * (2) Fork a daemon across the allocated set of nodes. + * (3) Fork/undump the required number of copies of the process + * on each of the nodes. + */ + +static int orte_pls_bproc_launch_app( + orte_jobid_t jobid, + orte_rmaps_base_map_t* map, + orte_vpid_t vpid_start, + orte_vpid_t vpid_range) +{ + uint8_t* image = NULL; + size_t image_len; + int* node_list = NULL; + int* daemon_pids = NULL; + size_t num_nodes; + orte_vpid_t daemon_vpid_start = 0; + int rc, index; + char* uri; + char *var; + int num_env; + + /* convert node names to bproc nodelist */ + if(ORTE_SUCCESS != (rc = orte_pls_bproc_nodelist(map, &node_list, &num_nodes))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + if(NULL == (daemon_pids = (int*)malloc(sizeof(int) * num_nodes))) { + goto cleanup; + } + + /* append mca parameters to our environment */ + num_env = map->app->num_env; + if(ORTE_SUCCESS != (rc = mca_base_param_build_env(&map->app->env, &num_env, true))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + map->app->num_env = num_env; + + /* overwrite seed setting */ + var = mca_base_param_environ_variable("seed",NULL,NULL); + opal_setenv(var, "0", true, &map->app->env); + + /* set name discovery mode */ + var = mca_base_param_environ_variable("ns","nds",NULL); + opal_setenv(var, "pipe", true, &map->app->env); + free(var); + + /* ns replica contact info */ + if(NULL == orte_process_info.ns_replica) { + rc = orte_ns.copy_process_name(&orte_process_info.ns_replica,orte_process_info.my_name); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + orte_process_info.ns_replica_uri = orte_rml.get_uri(); + } + var = mca_base_param_environ_variable("ns","replica","uri"); + opal_setenv(var,orte_process_info.ns_replica_uri, true, &map->app->env); + free(var); + + /* gpr replica contact info */ + if(NULL == orte_process_info.gpr_replica) { + rc = orte_ns.copy_process_name(&orte_process_info.gpr_replica,orte_process_info.my_name); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + orte_process_info.gpr_replica_uri = orte_rml.get_uri(); + } + var = mca_base_param_environ_variable("gpr","replica","uri"); + opal_setenv(var,orte_process_info.gpr_replica_uri, true, &map->app->env); + free(var); + + /* overwrite previously specified values with the above settings */ + map->app->num_env = opal_argv_count(map->app->env); + + /* read process image */ + if(ORTE_SUCCESS != (rc = orte_pls_bproc_dump(map->app, &image, &image_len))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* allocate a range of vpids for the daemons */ + if(ORTE_SUCCESS != (rc = orte_ns.reserve_range(0, num_nodes, &daemon_vpid_start))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* save our contact information - push out to daemons */ + if(NULL == (uri = orte_rml.get_uri())) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* replicate the process image to all nodes */ + rc = bproc_vrfork(num_nodes, node_list, daemon_pids); + if(rc < 0) { + ORTE_ERROR_LOG(rc); + return OMPI_ERROR; + } + + /* return is the rank of the child or number of nodes in the parent */ + if(rc < (int)num_nodes) { + + opal_list_item_t* item; + orte_rmaps_base_node_t* node = NULL; + orte_process_name_t* daemon_name; + int fd; + int rank = rc; + + /* connect stdin to /dev/null */ + fd = open("/dev/null", O_RDONLY); + if(fd >= 0) { + if(fd != 0) { + dup2(fd, 0); + close(fd); + } + } + + /* connect stdout/stderr to a file */ + fd = open("/dev/null", O_CREAT|O_WRONLY|O_TRUNC, 0666); + if(fd >= 0) { + if(fd != 1) { + dup2(fd,1); + } + if(fd != 2) { + dup2(fd,2); + } + if(fd > 2) { + close(fd); + } + } else { + _exit(-1); + } + + if(mca_pls_bproc_seed_component.debug) { + opal_output(0, "orte_pls_bproc: rank=%d\n", rank); + } + + /* find this node */ + index = 0; + for(item = opal_list_get_first(&map->nodes); + item != opal_list_get_end(&map->nodes); + item = opal_list_get_next(item)) { + if(index++ == rank) { + node = (orte_rmaps_base_node_t*)item; + break; + } + } + if(NULL == node) { + rc = ORTE_ERR_NOT_FOUND; + ORTE_ERROR_LOG(rc); + _exit(-1); + } + + /* setup the daemons process name */ + rc = orte_ns.create_process_name( + &daemon_name, orte_process_info.my_name->cellid, 0, daemon_vpid_start + rank); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + _exit(-1); + } + if(mca_pls_bproc_seed_component.debug) { + opal_output(0, "orte_pls_bproc: node=%s name=%d.%d.%d procs=%d\n", + node->node_name, + orte_process_info.my_name->cellid, 0, + daemon_vpid_start+rank, + opal_list_get_size(&node->node_procs)); + } + + /* restart the daemon w/ the new process name */ + rc = orte_restart(daemon_name, uri); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + _exit(-1); + } + + /* save the daemons pid in the registry */ + rc = orte_pls_base_set_node_pid(node->node_cellid, node->node_name, jobid, getpid()); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + _exit(-1); + } + + /* start the required number of copies of the application */ + index = 0; + for(item = opal_list_get_first(&node->node_procs); + item != opal_list_get_end(&node->node_procs); + item = opal_list_get_next(item)) { + orte_rmaps_base_proc_t* proc = (orte_rmaps_base_proc_t*)item; + pid_t pid; + + if(mca_pls_bproc_seed_component.debug) { + opal_output(0, "orte_pls_bproc: starting: [%lu,%lu,%lu]\n", ORTE_NAME_ARGS(&proc->proc_name)); + } + rc = orte_pls_bproc_undump(proc, vpid_start, vpid_range, image, image_len, &pid); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + _exit(1); + } + + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + mca_pls_bproc_seed_component.num_children++; + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); + OBJ_RETAIN(proc); + orte_wait_cb(pid, orte_pls_bproc_wait_proc, proc); + + if(mca_pls_bproc_seed_component.debug) { + opal_output(0, "orte_pls_bproc: started: [%lu,%lu,%lu]\n", ORTE_NAME_ARGS(&proc->proc_name)); + } + } + + /* free memory associated with the process image */ + free(image); + + /* wait for all children to complete */ + if(mca_pls_bproc_seed_component.debug) { + opal_output(0, "orte_pls_bproc: waiting for %d children", mca_pls_bproc_seed_component.num_children); + } + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + while(mca_pls_bproc_seed_component.num_children > 0) { + opal_condition_wait( + &mca_pls_bproc_seed_component.condition, + &mca_pls_bproc_seed_component.lock); + } + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); + + /* daemon is done when all children have completed */ + orte_finalize(); + _exit(0); + + } else { + opal_list_item_t* item; + + /* post wait callback the daemons to complete */ + index = 0; + while(NULL != (item = opal_list_remove_first(&map->nodes))) { + orte_rmaps_base_node_t* node = (orte_rmaps_base_node_t*)item; + + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + mca_pls_bproc_seed_component.num_children++; + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); + orte_wait_cb(daemon_pids[index++], orte_pls_bproc_wait_node, node); + } + + /* release resources */ + rc = ORTE_SUCCESS; + } + +cleanup: + if(NULL != image) + free(image); + if(NULL != node_list) + free(node_list); + if(NULL != daemon_pids) + free(daemon_pids); + return rc; +} + + +/* + * Query for the default mapping. Launch each application context + * w/ a distinct set of daemons. + */ + +int orte_pls_bproc_seed_launch(orte_jobid_t jobid) +{ + opal_list_item_t* item; + opal_list_t mapping; + orte_vpid_t vpid_start; + orte_vpid_t vpid_range; + int rc; + + /* query for the application context and allocated nodes */ + OBJ_CONSTRUCT(&mapping, opal_list_t); + if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_map(jobid, &mapping))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_vpid_range(jobid, &vpid_start, &vpid_range))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* for each application context - launch across the first n nodes required */ + for(item = opal_list_get_first(&mapping); + item != opal_list_get_end(&mapping); + item = opal_list_get_next(item)) { + orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)item; + rc = orte_pls_bproc_launch_app(jobid, map, vpid_start, vpid_range); + if(rc != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + +cleanup: + while(NULL != (item = opal_list_remove_first(&mapping))) + OBJ_RELEASE(item); + OBJ_DESTRUCT(&mapping); + return rc; +} + + +/** + * Terminate all processes associated with this job - including + * daemons. + */ + +int orte_pls_bproc_seed_terminate_job(orte_jobid_t jobid) +{ + pid_t* pids; + pid_t my_pid = getpid(); + size_t i, num_pids; + int rc; + + /* kill application process */ + if(ORTE_SUCCESS != (rc = orte_pls_base_get_proc_pids(jobid, &pids, &num_pids))) + return rc; + for(i=0; imutex, opal_mutex_t); + OBJ_CONSTRUCT(&stack->cond, opal_condition_t); + stack->rc = 0; + stack->complete = false; +} + +static void orte_pls_bproc_stack_destruct(orte_pls_bproc_stack_t* stack) +{ + OBJ_DESTRUCT(&stack->mutex); + OBJ_DESTRUCT(&stack->cond); +} + +static OBJ_CLASS_INSTANCE( + orte_pls_bproc_stack_t, + opal_object_t, + orte_pls_bproc_stack_construct, + orte_pls_bproc_stack_destruct); + + +static void orte_pls_bproc_seed_launch_cb(int fd, short event, void* args) +{ + orte_pls_bproc_stack_t *stack = (orte_pls_bproc_stack_t*)args; + orte_vpid_t child_vpid; + orte_process_name_t* child_name; + char* uri; + int pid; + int rc; + + /* setup the daemons process name */ + rc = orte_ns.reserve_range(orte_process_info.my_name->jobid,1,&child_vpid); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + stack->rc = rc; + goto complete; + } + rc = orte_ns.create_process_name( + &child_name, orte_process_info.my_name->cellid, + orte_process_info.my_name->jobid, + child_vpid); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + stack->rc = rc; + goto complete; + } + uri = orte_rml.get_uri(); + + /* fork the child */ + pid = fork(); + if(pid < 0) { + opal_output(0, "orte_pls_bproc: fork failed with errno=%d\n", errno); + stack->rc = ORTE_ERR_OUT_OF_RESOURCE; + + } else if (pid == 0) { + + pthread_kill_other_threads_np(); + opal_set_using_threads(false); + if(NULL == orte_process_info.ns_replica) { + rc = orte_ns.copy_process_name(&orte_process_info.ns_replica,orte_process_info.my_name); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + exit(rc); + } + orte_process_info.ns_replica_uri = orte_rml.get_uri(); + } + + if(NULL == orte_process_info.gpr_replica) { + rc = orte_ns.copy_process_name(&orte_process_info.gpr_replica,orte_process_info.my_name); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + exit(rc); + } + orte_process_info.gpr_replica_uri = orte_rml.get_uri(); + } + + /* restart the daemon w/ the new process name */ + rc = orte_restart(child_name, uri); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + exit(rc); + } + + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + exit(rc); + } + rc = orte_pls_bproc_seed_launch(stack->jobid); + orte_finalize(); + exit(rc); + + } else { + + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + mca_pls_bproc_seed_component.num_children++; + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); + orte_wait_cb(pid, orte_pls_bproc_wait_proc, NULL); + + stack->rc = ORTE_SUCCESS; + } + +complete: + OPAL_THREAD_LOCK(&stack->mutex); + stack->complete = true; + opal_condition_signal(&stack->cond); + OPAL_THREAD_UNLOCK(&stack->mutex); +} + +int orte_pls_bproc_seed_launch_threaded(orte_jobid_t jobid) +{ + struct timeval tv = { 0, 0 }; + struct opal_event event; + struct orte_pls_bproc_stack_t stack; + + OBJ_CONSTRUCT(&stack, orte_pls_bproc_stack_t); + + stack.jobid = jobid; + opal_evtimer_set(&event, orte_pls_bproc_seed_launch_cb, &stack); + opal_evtimer_add(&event, &tv); + + OPAL_THREAD_LOCK(&stack.mutex); + while(stack.complete == false) + opal_condition_wait(&stack.cond, &stack.mutex); + OPAL_THREAD_UNLOCK(&stack.mutex); + OBJ_DESTRUCT(&stack); + return stack.rc; +} + +#endif + + diff --git a/orte/mca/pls/bproc_seed/pls_bproc_seed.h b/orte/mca/pls/bproc_seed/pls_bproc_seed.h new file mode 100644 index 0000000000..7f8f507a16 --- /dev/null +++ b/orte/mca/pls/bproc_seed/pls_bproc_seed.h @@ -0,0 +1,76 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * + */ + +#ifndef ORTE_PLS_BPROC_SEED_H_ +#define ORTE_PLS_BPROC_SEED_H_ + +#include "ompi_config.h" +#include "mca/pls/base/base.h" +#include "opal/threads/condition.h" +#include + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + +/* + * Module open / close + */ +int orte_pls_bproc_seed_component_open(void); +int orte_pls_bproc_seed_component_close(void); + +/* + * Startup / Shutdown + */ +orte_pls_base_module_t* orte_pls_bproc_seed_init(int *priority); + +int orte_pls_bproc_seed_finalize(void); + +/* + * Interface + */ +int orte_pls_bproc_seed_launch(orte_jobid_t); +int orte_pls_bproc_seed_terminate_job(orte_jobid_t); +int orte_pls_bproc_seed_terminate_proc(const orte_process_name_t* proc_name); + + +/** + * PLS Component + */ +struct orte_pls_bproc_seed_component_t { + orte_pls_base_component_t super; + int debug; + int name_fd; + int priority; + int reap; + int terminate_sig; + size_t image_frag_size; + size_t num_children; + opal_mutex_t lock; + opal_condition_t condition; +}; +typedef struct orte_pls_bproc_seed_component_t orte_pls_bproc_seed_component_t; + +ORTE_DECLSPEC extern orte_pls_bproc_seed_component_t mca_pls_bproc_seed_component; +ORTE_DECLSPEC extern orte_pls_base_module_t orte_pls_bproc_seed_module; + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif /* MCA_PCM_BPROCx_H_ */ diff --git a/orte/mca/pls/bproc_seed/pls_bproc_seed_component.c b/orte/mca/pls/bproc_seed/pls_bproc_seed_component.c new file mode 100644 index 0000000000..7543f7eccb --- /dev/null +++ b/orte/mca/pls/bproc_seed/pls_bproc_seed_component.c @@ -0,0 +1,125 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "ompi_config.h" + +#include "include/orte_constants.h" +#include "include/types.h" +#include "opal/class/opal_list.h" +#include "util/proc_info.h" +#include "mca/mca.h" +#include "mca/base/mca_base_param.h" +#include "mca/pls/base/base.h" + +#include +#include "pls_bproc_seed.h" + +/* + * Struct of function pointers and all that to let us be initialized + */ +orte_pls_bproc_seed_component_t mca_pls_bproc_seed_component = { + { + { + ORTE_PLS_BASE_VERSION_1_0_0, + + "bproc_seed", /* MCA component name */ + ORTE_MAJOR_VERSION, /* MCA component major version */ + ORTE_MINOR_VERSION, /* MCA component minor version */ + ORTE_RELEASE_VERSION, /* MCA component release version */ + orte_pls_bproc_seed_component_open, /* component open */ + orte_pls_bproc_seed_component_close /* component close */ + }, + { + false /* checkpoint / restart */ + }, + orte_pls_bproc_seed_init /* component init */ + } +}; + + +int orte_pls_bproc_seed_component_open(void) +{ + int id; + mca_base_component_t *c = &mca_pls_bproc_seed_component.super.pls_version; + /* init globals */ + OBJ_CONSTRUCT(&mca_pls_bproc_seed_component.lock, opal_mutex_t); + OBJ_CONSTRUCT(&mca_pls_bproc_seed_component.condition, opal_condition_t); + mca_pls_bproc_seed_component.num_children = 0; + + /* init parameters */ + mca_base_param_reg_int(c, "debug", NULL, false, false, 0, + &mca_pls_bproc_seed_component.debug); + mca_base_param_reg_int(c, "reap", NULL, false, false, 1, + &mca_pls_bproc_seed_component.reap); + mca_base_param_reg_int(c, "image_frag_size", NULL, false, false, 1*1024*1024, + (int *)&mca_pls_bproc_seed_component.image_frag_size); + mca_base_param_reg_int(c, "priority", NULL, false, false, 75, + &mca_pls_bproc_seed_component.priority); + mca_base_param_reg_int(c, "terminate_sig", NULL, false, false, 9, + &mca_pls_bproc_seed_component.terminate_sig); + + id = mca_base_param_find("nds", "pipe", "fd"); + if(id > 0) { + mca_base_param_lookup_int(id, &mca_pls_bproc_seed_component.name_fd); + } else { + mca_pls_bproc_seed_component.name_fd = 3; + } + return ORTE_SUCCESS; +} + + +int orte_pls_bproc_seed_component_close(void) +{ + if(mca_pls_bproc_seed_component.reap) { + OPAL_THREAD_LOCK(&mca_pls_bproc_seed_component.lock); + while(mca_pls_bproc_seed_component.num_children > 0) { + opal_condition_wait(&mca_pls_bproc_seed_component.condition, + &mca_pls_bproc_seed_component.lock); + } + } + OPAL_THREAD_UNLOCK(&mca_pls_bproc_seed_component.lock); + return ORTE_SUCCESS; +} + + +orte_pls_base_module_t* orte_pls_bproc_seed_init( + int *priority) +{ + int ret; + struct bproc_version_t version; + + + /* are we the seed */ + if(orte_process_info.seed == false) + return NULL; + + /* okay, we are in a daemon - now check to see if BProc is running here */ + ret = bproc_version(&version); + if (ret != 0) { + return NULL; + } + + /* only launch from the master node */ + if (bproc_currnode() != BPROC_NODE_MASTER) { + return NULL; + } + + *priority = mca_pls_bproc_seed_component.priority; + return &orte_pls_bproc_seed_module; +} +