- properly kill off daemons
- code cleanup

This commit was SVN r6601.
Этот коммит содержится в:
родитель
e33a8205e8
Коммит
6aceaf81b7
@ -32,14 +32,11 @@
|
|||||||
#include "opal/util/opal_environ.h"
|
#include "opal/util/opal_environ.h"
|
||||||
#include "opal/util/path.h"
|
#include "opal/util/path.h"
|
||||||
#include "opal/util/sys_info.h"
|
#include "opal/util/sys_info.h"
|
||||||
#include "orte/class/orte_pointer_array.h"
|
|
||||||
#include "orte/util/proc_info.h"
|
|
||||||
#include "orte/mca/errmgr/errmgr.h"
|
#include "orte/mca/errmgr/errmgr.h"
|
||||||
#include "orte/mca/iof/iof.h"
|
#include "orte/mca/iof/iof.h"
|
||||||
#include "orte/mca/ns/base/base.h"
|
#include "orte/mca/ns/base/base.h"
|
||||||
#include "orte/mca/ns/base/ns_base_nds.h"
|
#include "orte/mca/ns/base/ns_base_nds.h"
|
||||||
#include "orte/mca/oob/base/base.h"
|
#include "orte/mca/oob/base/base.h"
|
||||||
#include "orte/mca/pls/base/base.h"
|
|
||||||
#include "orte/mca/ras/base/base.h"
|
#include "orte/mca/ras/base/base.h"
|
||||||
#include "orte/mca/rmgr/base/base.h"
|
#include "orte/mca/rmgr/base/base.h"
|
||||||
#include "orte/mca/rmaps/base/base.h"
|
#include "orte/mca/rmaps/base/base.h"
|
||||||
@ -59,8 +56,6 @@ orte_pls_base_module_t orte_pls_bproc_module = {
|
|||||||
orte_pls_bproc_finalize
|
orte_pls_bproc_finalize
|
||||||
};
|
};
|
||||||
|
|
||||||
static orte_pointer_array_t * orte_pls_bproc_daemon_names;
|
|
||||||
static size_t orte_pls_bproc_num_daemons = 0;
|
|
||||||
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
|
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
|
||||||
int ** node_array, int * node_array_len);
|
int ** node_array, int * node_array_len);
|
||||||
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
|
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
|
||||||
@ -68,7 +63,7 @@ static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
|
|||||||
int num_procs);
|
int num_procs);
|
||||||
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
|
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
|
||||||
int node_rank, size_t app_context);
|
int node_rank, size_t app_context);
|
||||||
static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
|
||||||
orte_rmaps_base_map_t* map,
|
orte_rmaps_base_map_t* map,
|
||||||
orte_vpid_t vpid_start,
|
orte_vpid_t vpid_start,
|
||||||
orte_vpid_t vpid_range, size_t app_context);
|
orte_vpid_t vpid_range, size_t app_context);
|
||||||
@ -229,7 +224,6 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
|
|||||||
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
|
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
|
||||||
orte_process_name_t * proc = (orte_process_name_t*) data;
|
orte_process_name_t * proc = (orte_process_name_t*) data;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
/* set the state of this process */
|
/* set the state of this process */
|
||||||
if(WIFEXITED(status)) {
|
if(WIFEXITED(status)) {
|
||||||
rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_TERMINATED, status);
|
rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_TERMINATED, status);
|
||||||
@ -246,7 +240,8 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
|
|||||||
if(0 < mca_pls_bproc_component.debug) {
|
if(0 < mca_pls_bproc_component.debug) {
|
||||||
opal_output(0, "in orte_pls_bproc_waitpid_cb, %d processes left\n",
|
opal_output(0, "in orte_pls_bproc_waitpid_cb, %d processes left\n",
|
||||||
mca_pls_bproc_component.num_procs);
|
mca_pls_bproc_component.num_procs);
|
||||||
} else if(0 == mca_pls_bproc_component.num_procs &&
|
}
|
||||||
|
if(0 == mca_pls_bproc_component.num_procs &&
|
||||||
mca_pls_bproc_component.done_launching) {
|
mca_pls_bproc_component.done_launching) {
|
||||||
orte_buffer_t ack;
|
orte_buffer_t ack;
|
||||||
orte_process_name_t * proc;
|
orte_process_name_t * proc;
|
||||||
@ -258,8 +253,8 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(i = 0; i < orte_pls_bproc_num_daemons; i++) {
|
for(i = 0; i < mca_pls_bproc_component.num_daemons; i++) {
|
||||||
proc = orte_pointer_array_get_item(orte_pls_bproc_daemon_names, i);
|
proc = orte_pointer_array_get_item(mca_pls_bproc_component.daemon_names, i);
|
||||||
if(NULL == proc) {
|
if(NULL == proc) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
continue;
|
continue;
|
||||||
@ -271,8 +266,8 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
|
|||||||
free(proc);
|
free(proc);
|
||||||
}
|
}
|
||||||
OBJ_DESTRUCT(&ack);
|
OBJ_DESTRUCT(&ack);
|
||||||
OBJ_RELEASE(orte_pls_bproc_daemon_names);
|
OBJ_RELEASE(mca_pls_bproc_component.daemon_names);
|
||||||
while(0 < orte_pls_bproc_num_daemons) {
|
while(0 < mca_pls_bproc_component.num_daemons) {
|
||||||
opal_condition_wait(&mca_pls_bproc_component.condition,
|
opal_condition_wait(&mca_pls_bproc_component.condition,
|
||||||
&mca_pls_bproc_component.lock);
|
&mca_pls_bproc_component.lock);
|
||||||
}
|
}
|
||||||
@ -298,11 +293,15 @@ static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
|
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
|
||||||
if(0 < orte_pls_bproc_num_daemons) {
|
if(0 < mca_pls_bproc_component.num_daemons) {
|
||||||
orte_pls_bproc_num_daemons--;
|
mca_pls_bproc_component.num_daemons--;
|
||||||
}
|
}
|
||||||
opal_condition_signal(&mca_pls_bproc_component.condition);
|
opal_condition_signal(&mca_pls_bproc_component.condition);
|
||||||
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
|
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
|
||||||
|
if(0 < mca_pls_bproc_component.debug) {
|
||||||
|
opal_output(0, "in orte_pls_bproc_waitpid_daemon_cb, %d daemons left\n",
|
||||||
|
mca_pls_bproc_component.num_daemons);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -311,7 +310,7 @@ static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data)
|
|||||||
* tells us they are ready for the actual apps.
|
* tells us they are ready for the actual apps.
|
||||||
* 3. Launch the apps on the backend nodes
|
* 3. Launch the apps on the backend nodes
|
||||||
*/
|
*/
|
||||||
static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
|
||||||
orte_rmaps_base_map_t* map,
|
orte_rmaps_base_map_t* map,
|
||||||
orte_vpid_t vpid_start,
|
orte_vpid_t vpid_start,
|
||||||
orte_vpid_t vpid_range, size_t app_context) {
|
orte_vpid_t vpid_range, size_t app_context) {
|
||||||
@ -330,7 +329,6 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
|||||||
char * orted_path;
|
char * orted_path;
|
||||||
orte_buffer_t ack;
|
orte_buffer_t ack;
|
||||||
orte_jobid_t daemon_jobid;
|
orte_jobid_t daemon_jobid;
|
||||||
orte_cellid_t cellid;
|
|
||||||
orte_process_name_t * proc_name;
|
orte_process_name_t * proc_name;
|
||||||
orte_vpid_t daemon_vpid_start = 0;
|
orte_vpid_t daemon_vpid_start = 0;
|
||||||
orte_vpid_t global_vpid_start = vpid_start;
|
orte_vpid_t global_vpid_start = vpid_start;
|
||||||
@ -446,11 +444,6 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
|||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = orte_ns_base_get_cellid(&cellid, orte_process_info.my_name);
|
|
||||||
if(ORTE_SUCCESS != rc) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
/* set up the environment so the daemons can get their names once launched */
|
/* set up the environment so the daemons can get their names once launched */
|
||||||
rc = orte_ns_nds_bproc_put(cellid, daemon_jobid, daemon_vpid_start,
|
rc = orte_ns_nds_bproc_put(cellid, daemon_jobid, daemon_vpid_start,
|
||||||
global_vpid_start, num_processes, &map->app->env);
|
global_vpid_start, num_processes, &map->app->env);
|
||||||
@ -525,7 +518,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
rc = orte_pointer_array_add(&idx, orte_pls_bproc_daemon_names,
|
rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.daemon_names,
|
||||||
proc_name);
|
proc_name);
|
||||||
if(ORTE_SUCCESS != rc) {
|
if(ORTE_SUCCESS != rc) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
@ -548,7 +541,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
orte_pls_bproc_num_daemons += num_daemons;
|
mca_pls_bproc_component.num_daemons += num_daemons;
|
||||||
|
|
||||||
if(0 < mca_pls_bproc_component.debug) {
|
if(0 < mca_pls_bproc_component.debug) {
|
||||||
opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n",
|
opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n",
|
||||||
@ -679,6 +672,7 @@ cleanup:
|
|||||||
int orte_pls_bproc_launch(orte_jobid_t jobid) {
|
int orte_pls_bproc_launch(orte_jobid_t jobid) {
|
||||||
opal_list_item_t* item;
|
opal_list_item_t* item;
|
||||||
opal_list_t mapping;
|
opal_list_t mapping;
|
||||||
|
orte_cellid_t cellid;
|
||||||
orte_vpid_t vpid_start;
|
orte_vpid_t vpid_start;
|
||||||
orte_vpid_t vpid_range;
|
orte_vpid_t vpid_range;
|
||||||
int rc;
|
int rc;
|
||||||
@ -696,8 +690,15 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* init the list to hold the daemon names */
|
/* init the list to hold the daemon names */
|
||||||
if(ORTE_SUCCESS != (rc = orte_pointer_array_init(&orte_pls_bproc_daemon_names,
|
rc = orte_pointer_array_init(&mca_pls_bproc_component.daemon_names, 8, 200000, 8);
|
||||||
8, 200000, 8))) {
|
if(ORTE_SUCCESS != rc) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* get the cellid */
|
||||||
|
rc = orte_ns_base_get_cellid(&cellid, orte_process_info.my_name);
|
||||||
|
if(ORTE_SUCCESS != rc) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
@ -711,7 +712,7 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
|
|||||||
item != opal_list_get_end(&mapping);
|
item != opal_list_get_end(&mapping);
|
||||||
item = opal_list_get_next(item)) {
|
item = opal_list_get_next(item)) {
|
||||||
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)item;
|
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)item;
|
||||||
rc = orte_pls_bproc_launch_app(jobid, map, vpid_start, vpid_range,
|
rc = orte_pls_bproc_launch_app(cellid, jobid, map, vpid_start, vpid_range,
|
||||||
map->app->idx);
|
map->app->idx);
|
||||||
if(rc != ORTE_SUCCESS) {
|
if(rc != ORTE_SUCCESS) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
|
@ -36,7 +36,10 @@
|
|||||||
#define ORTE_PLS_BPROC_H_
|
#define ORTE_PLS_BPROC_H_
|
||||||
|
|
||||||
#include "orte_config.h"
|
#include "orte_config.h"
|
||||||
|
#include "orte/class/orte_pointer_array.h"
|
||||||
|
#include "orte/include/orte_constants.h"
|
||||||
#include "orte/mca/pls/base/base.h"
|
#include "orte/mca/pls/base/base.h"
|
||||||
|
#include "orte/util/proc_info.h"
|
||||||
#include "opal/threads/condition.h"
|
#include "opal/threads/condition.h"
|
||||||
#include <sys/bproc.h>
|
#include <sys/bproc.h>
|
||||||
|
|
||||||
@ -72,10 +75,12 @@ struct orte_pls_bproc_component_t {
|
|||||||
char * orted;
|
char * orted;
|
||||||
int debug;
|
int debug;
|
||||||
int num_procs;
|
int num_procs;
|
||||||
|
size_t num_daemons;
|
||||||
int priority;
|
int priority;
|
||||||
int terminate_sig;
|
int terminate_sig;
|
||||||
opal_mutex_t lock;
|
opal_mutex_t lock;
|
||||||
opal_condition_t condition;
|
opal_condition_t condition;
|
||||||
|
orte_pointer_array_t * daemon_names;
|
||||||
};
|
};
|
||||||
typedef struct orte_pls_bproc_component_t orte_pls_bproc_component_t;
|
typedef struct orte_pls_bproc_component_t orte_pls_bproc_component_t;
|
||||||
|
|
||||||
|
@ -17,15 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "orte_config.h"
|
#include "orte_config.h"
|
||||||
#include <sys/bproc.h>
|
|
||||||
|
|
||||||
#include "opal/class/opal_list.h"
|
|
||||||
#include "opal/mca/mca.h"
|
#include "opal/mca/mca.h"
|
||||||
#include "opal/mca/base/mca_base_param.h"
|
#include "opal/mca/base/mca_base_param.h"
|
||||||
#include "orte/include/orte_constants.h"
|
|
||||||
#include "orte/util/proc_info.h"
|
|
||||||
#include "orte/mca/pls/base/base.h"
|
|
||||||
|
|
||||||
#include "pls_bproc.h"
|
#include "pls_bproc.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -35,7 +28,6 @@ orte_pls_bproc_component_t mca_pls_bproc_component = {
|
|||||||
{
|
{
|
||||||
{
|
{
|
||||||
ORTE_PLS_BASE_VERSION_1_0_0,
|
ORTE_PLS_BASE_VERSION_1_0_0,
|
||||||
|
|
||||||
"bproc", /* MCA component name */
|
"bproc", /* MCA component name */
|
||||||
ORTE_MAJOR_VERSION, /* MCA component major version */
|
ORTE_MAJOR_VERSION, /* MCA component major version */
|
||||||
ORTE_MINOR_VERSION, /* MCA component minor version */
|
ORTE_MINOR_VERSION, /* MCA component minor version */
|
||||||
@ -82,10 +74,10 @@ int orte_pls_bproc_component_open(void) {
|
|||||||
orte_pls_bproc_param_register_int("terminate_sig", 9);
|
orte_pls_bproc_param_register_int("terminate_sig", 9);
|
||||||
|
|
||||||
mca_pls_bproc_component.num_procs = 0;
|
mca_pls_bproc_component.num_procs = 0;
|
||||||
|
mca_pls_bproc_component.num_daemons = 0;
|
||||||
mca_pls_bproc_component.done_launching = false;
|
mca_pls_bproc_component.done_launching = false;
|
||||||
OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t);
|
OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t);
|
||||||
OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t);
|
OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t);
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user