1
1

Continue beating on comm_spawn. Setup to debug bproc.

This commit was SVN r11932.
Этот коммит содержится в:
Ralph Castain 2006-10-02 14:58:22 +00:00
родитель 65593cd67e
Коммит 559b9b0ae8
10 изменённых файлов: 77 добавлений и 16 удалений

Просмотреть файл

@ -41,8 +41,8 @@ typedef uint8_t orte_daemon_cmd_flag_t;
#define ORTE_DAEMON_KILL_LOCAL_PROCS (orte_daemon_cmd_flag_t) 4
#define ORTE_DAEMON_SIGNAL_LOCAL_PROCS (orte_daemon_cmd_flag_t) 5
#define ORTE_DAEMON_ADD_LOCAL_PROCS (orte_daemon_cmd_flag_t) 6
#define ORTE_DAEMON_HEARTBEAT_CMD (orte_daemon_cmd_flag_t) 254
#define ORTE_DAEMON_EXIT_CMD (orte_daemon_cmd_flag_t) 255
#define ORTE_DAEMON_HEARTBEAT_CMD (orte_daemon_cmd_flag_t) 7
#define ORTE_DAEMON_EXIT_CMD (orte_daemon_cmd_flag_t) 8
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -43,6 +43,8 @@ int orte_pls_base_orted_exit(opal_list_t *daemons)
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
opal_output(0, "pls_base_orted_exit: called with %ld daemons", (long)opal_list_get_size(daemons));
/* pack the command */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
@ -55,6 +57,8 @@ int orte_pls_base_orted_exit(opal_list_t *daemons)
item = opal_list_get_next(item)) {
dmn = (orte_pls_daemon_info_t*)item;
opal_output(0, "pls_base_orted_exit: sending cmd to [%ld,%ld,%ld]", ORTE_NAME_ARGS(dmn->name));
if (0 > orte_rml.send_buffer(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&cmd);
@ -104,6 +108,8 @@ int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job)
item = opal_list_get_next(item)) {
dmn = (orte_pls_daemon_info_t*)item;
opal_output(0, "pls_base_orted_kill_local: sending cmd to [%ld,%ld,%ld]", ORTE_NAME_ARGS(dmn->name));
if (0 > orte_rml.send_buffer(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&cmd);

Просмотреть файл

@ -125,11 +125,13 @@ void orte_pls_base_recv(int status, orte_process_name_t* sender,
break;
case ORTE_PLS_TERMINATE_JOB_CMD:
opal_output(0, "pls_base_recv: terminate job");
count = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto SEND_ANSWER;
}
opal_output(0, "pls_base_recv: terminate job with jobid %ld", (long)job);
if (ORTE_SUCCESS != (rc = orte_pls.terminate_job(job))) {
ORTE_ERROR_LOG(rc);
@ -143,6 +145,8 @@ void orte_pls_base_recv(int status, orte_process_name_t* sender,
goto SEND_ANSWER;
}
opal_output(0, "pls_base_recv: terminate orteds with jobid %ld", (long)job);
if (ORTE_SUCCESS != (rc = orte_pls.terminate_orteds(job))) {
ORTE_ERROR_LOG(rc);
}

Просмотреть файл

@ -50,6 +50,7 @@
#include "opal/util/path.h"
#include "opal/util/os_path.h"
#include "opal/util/show_help.h"
#include "opal/util/trace.h"
#include "orte/dss/dss.h"
#include "orte/util/sys_info.h"
@ -149,6 +150,8 @@ static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
int num_procs = 0;
int num_on_node;
OPAL_TRACE(1);
*node_array_len = 0;
for(item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
@ -191,6 +194,9 @@ static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
int ** node_list, int * num_nodes,
int num_procs) {
int node;
OPAL_TRACE(1);
*num_nodes = 0;
*node_list = (int*)malloc(sizeof(int) * node_array_len);
if(NULL == *node_list) {
@ -222,6 +228,8 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
char *frontend = NULL, *path = NULL, *job = NULL;
int rc, i;
OPAL_TRACE(1);
/* ensure that system info is set */
orte_sys_info();
if (NULL == orte_system_info.user) { /* error condition */
@ -296,6 +304,8 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
orte_process_name_t * proc = (orte_process_name_t*) data;
int rc;
OPAL_TRACE(1);
/* set the state of this process */
if(WIFEXITED(status)) {
rc = orte_smr.set_proc_state(proc, ORTE_PROC_STATE_TERMINATED, status);
@ -316,6 +326,9 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
* @param data a pointer to the node the daemon was on
*/
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data) {
OPAL_TRACE(1);
if(!mca_pls_bproc_component.done_launching) {
/* if a daemon exits before we are done launching the user apps we send a
* message to ourself so we will break out of the receive loop and exit */
@ -359,6 +372,9 @@ static int bproc_vexecmove_io(int nnodes, int *nodes, int *pids,
char * const argv[], char * envp[]) {
int i;
char * rank;
OPAL_TRACE(1);
for(i = 0; i < nnodes; i++) {
pids[i] = fork();
if(0 == pids[i]) {
@ -406,6 +422,8 @@ static void orte_pls_bproc_setup_env(char *** env)
int rc;
int num_env;
OPAL_TRACE(1);
num_env = opal_argv_count(*env);
/* append mca parameters to our environment */
if(ORTE_SUCCESS != (rc = mca_base_param_build_env(env, &num_env, false))) {
@ -499,6 +517,8 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
orte_pls_daemon_info_t *dmn;
opal_list_item_t *item;
OPAL_TRACE(1);
/* setup a list that will contain the info for all the daemons
* so we can store it on the registry when done
*/
@ -708,8 +728,8 @@ orte_pls_bproc_check_node_state(orte_gpr_notify_data_t *notify_data,
char *dead_node_name;
orte_std_cntr_t i, j;
printf("inside check node state... \n");
OPAL_TRACE(1);
/* first see if node is in
ORTE_NODE_STATE_DOWN or
ORTE_NODE_STATE_REBOOT */
@ -805,6 +825,9 @@ static int
orte_pls_bproc_monitor_nodes(void)
{
orte_gpr_subscription_id_t id;
OPAL_TRACE(1);
return orte_gpr.subscribe_1(&id,
NULL,
NULL,
@ -848,6 +871,8 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
struct bproc_io_t bproc_io[3];
orte_rmaps_base_node_t *node;
OPAL_TRACE(1);
if(NULL == (pids = (int*)malloc(sizeof(int) * node_array_len))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
@ -1049,6 +1074,8 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
orte_std_cntr_t idx;
char cwd_save[OMPI_PATH_MAX + 1];
OPAL_TRACE(1);
/* make sure the pls_bproc receive function has been started */
if (ORTE_SUCCESS != (rc = orte_pls_bproc_comm_start())) {
ORTE_ERROR_LOG(rc);
@ -1256,6 +1283,8 @@ int orte_pls_bproc_terminate_job(orte_jobid_t jobid) {
orte_std_cntr_t i, num_pids;
int rc;
OPAL_TRACE(1);
if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "orte_pls_bproc: terminating job %ld", jobid);
}
@ -1285,6 +1314,8 @@ int orte_pls_bproc_terminate_orteds(orte_jobid_t jobid)
opal_list_t daemons;
opal_list_item_t *item;
OPAL_TRACE(1);
/* construct the list of active daemons on this job */
OBJ_CONSTRUCT(&daemons, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid))) {
@ -1311,6 +1342,9 @@ CLEANUP:
int orte_pls_bproc_terminate_proc(const orte_process_name_t* proc_name) {
int rc;
pid_t pid;
OPAL_TRACE(1);
if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pid(proc_name, &pid)))
return rc;
if(kill(pid, mca_pls_bproc_component.terminate_sig) != 0) {
@ -1336,6 +1370,8 @@ int orte_pls_bproc_signal_job(orte_jobid_t jobid, int32_t signal) {
orte_std_cntr_t i, num_pids;
int rc;
OPAL_TRACE(1);
/* signal application process */
if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pids(jobid, &pids, &num_pids)))
return rc;
@ -1359,6 +1395,8 @@ int orte_pls_bproc_signal_proc(const orte_process_name_t* proc_name, int32_t sig
int rc;
pid_t pid;
OPAL_TRACE(1);
if(ORTE_SUCCESS != (rc = orte_pls_bproc_get_proc_pid(proc_name, &pid)))
return rc;
if(kill(pid, (int)signal) != 0) {

Просмотреть файл

@ -26,6 +26,8 @@
#include "orte/orte_constants.h"
#include "orte/orte_types.h"
#include "opal/util/output.h"
#include "orte/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
@ -133,6 +135,8 @@ int orte_pls_proxy_terminate_job(orte_jobid_t job)
return rc;
}
opal_output(0, "pls_proxy_terminate_job: sending for job %ld", (long)job);
if (0 > orte_rml.send_buffer(orte_pls_proxy_replica, cmd, ORTE_RML_TAG_PLS, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(cmd);
@ -177,6 +181,8 @@ int orte_pls_proxy_terminate_orteds(orte_jobid_t job)
orte_std_cntr_t count;
int rc;
opal_output(0, "pls_proxy_terminate_orteds: sending for job %ld", (long)job);
command = ORTE_PLS_TERMINATE_ORTEDS_CMD;
cmd = OBJ_NEW(orte_buffer_t);

Просмотреть файл

@ -1081,6 +1081,8 @@ int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
goto CLEANUP;
}
opal_output(0, "pls_rsh_terminate_job: called for job %ld with %ld daemons", jobid, (long)opal_list_get_size(&daemons));
/* order them to kill their local procs for this job */
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_kill_local_procs(&daemons, jobid))) {
ORTE_ERROR_LOG(rc);
@ -1111,6 +1113,8 @@ int orte_pls_rsh_terminate_orteds(orte_jobid_t jobid)
goto CLEANUP;
}
opal_output(0, "pls_rsh_terminate_orteds: called for job %ld with %ld daemons", jobid, (long)opal_list_get_size(&daemons));
/* now tell them to die! */
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_exit(&daemons))) {
ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -287,7 +287,7 @@ static int orte_rmgr_proxy_spawn_job(
orte_proc_state_t cb_conditions)
{
int rc;
orte_process_name_t* name;
orte_process_name_t name = {0, ORTE_JOBID_INVALID, 0};
OPAL_TRACE(1);
@ -320,15 +320,13 @@ static int orte_rmgr_proxy_spawn_job(
* setup I/O forwarding
*/
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) {
name.jobid = *jobid;
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(&name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDOUT, 1))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDOUT, 1))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDERR, 2))) {
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(&name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDERR, 2))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -372,7 +370,6 @@ static int orte_rmgr_proxy_spawn_job(
return rc;
}
free(&name);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -68,6 +68,7 @@ typedef uint32_t orte_rml_tag_t;
* size for the receive and return the allocated buffer and size in the first
* element of the iovec array. */
#define ORTE_RML_PERSISTENT 0x08 /**< posted non-blocking recv is persistent */
#define ORTE_RML_NON_PERSISTENT 0x00
/**
* The wildcard for receives from any peer.

Просмотреть файл

@ -399,12 +399,12 @@ int main(int argc, char *argv[])
OBJ_CONSTRUCT(&orted_globals.condition, opal_condition_t);
/* register the daemon main receive functions */
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PLS_ORTED, 0, orte_daemon_recv_pls, NULL);
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PLS_ORTED, ORTE_RML_NON_PERSISTENT, orte_daemon_recv_pls, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret);
return ret;
}
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, 0, orte_daemon_recv, NULL);
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret);
return ret;
@ -635,6 +635,8 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender,
* we should kill all local procs. Otherwise, only kill those within
* the specified jobid
*/
opal_output(0, "orted_daemon_recv_pls: kill_local_procs");
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
@ -685,6 +687,8 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender,
/**** EXIT COMMAND ****/
case ORTE_DAEMON_EXIT_CMD:
opal_output(0, "orted_daemon_recv_pls: exit");
/* send the response before we wakeup because otherwise
* we'll depart before it gets out!
*/
@ -711,7 +715,7 @@ DONE:
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
/* reissue the non-blocking receive */
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PLS_ORTED, 0, orte_daemon_recv_pls, NULL);
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PLS_ORTED, ORTE_RML_NON_PERSISTENT, orte_daemon_recv_pls, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret);
}
@ -800,7 +804,7 @@ DONE:
OPAL_THREAD_UNLOCK(&orted_globals.mutex);
/* reissue the non-blocking receive */
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, 0, orte_daemon_recv, NULL);
ret = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL);
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -756,6 +756,7 @@ static void abort_signal_callback(int fd, short flags, void *arg)
* it can kill all the orteds
*/
if (jobid != ORTE_JOBID_INVALID) {
fprintf(stderr, "terminating job %ld", (long)jobid);
ret = orte_pls.terminate_job(jobid);
if (ORTE_SUCCESS != ret) {
jobid = ORTE_JOBID_INVALID;