1
1
- code cleanups
- improved cleanup, but still needs work

This commit was SVN r6569.
Этот коммит содержится в:
Tim Prins 2005-07-20 20:39:06 +00:00
родитель daf3ee8172
Коммит 35041a0f01
2 изменённых файлов: 66 добавлений и 25 удалений

Просмотреть файл

@ -48,7 +48,6 @@
#include "orte/mca/soh/base/base.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/runtime.h"
#include "orte/util/session_dir.h"
#include "pls_bproc.h"
extern char **environ;
@ -74,6 +73,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
orte_vpid_t vpid_start,
orte_vpid_t vpid_range, size_t app_context);
static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data);
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data);
/**
* creates an array that is indexed by the node number and each entry contains the
@ -83,7 +83,7 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data);
* @param node_array a pointer to put the node array into
* @param node_array_len returns the length of the array
* @retval >=0 the number of processes
* @retval <0 ompi err
* @retval <0 orte err
*/
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
int ** node_array, int * node_array_len) {
@ -103,8 +103,8 @@ static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
/* build the node array */
*node_array = (int*)malloc(sizeof(int) * *node_array_len);
if(NULL == *node_array) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
return OMPI_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(*node_array, 0, sizeof(int) * *node_array_len);
@ -136,8 +136,8 @@ static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
*num_nodes = 0;
*node_list = (int*)malloc(sizeof(int) * node_array_len);
if(NULL == *node_list) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
return OMPI_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* build the node list */
@ -146,7 +146,7 @@ static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
(*node_list)[(*num_nodes)++] = node;
}
}
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
/**
@ -163,12 +163,11 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
/* ensure that system info is set */
orte_sys_info();
if (NULL == orte_system_info.user) { /* error condition */
return OMPI_ERROR;
return ORTE_ERROR;
}
if (NULL == orte_universe_info.name) { /* error condition */
return OMPI_ERROR;
return ORTE_ERROR;
}
rc = orte_ns_base_convert_jobid_to_string(&job, jobid);
@ -182,15 +181,15 @@ static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
orte_system_info.user, orte_system_info.path_sep,
orte_universe_info.name, orte_system_info.path_sep, job,
(int) app_context, orte_system_info.path_sep, node_rank)) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
for(i = 0; i < 3; i++) {
if(0 > asprintf(&path, "%s%s%d", frontend, orte_system_info.path_sep, i)) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
if (mca_pls_bproc_component.debug) {
@ -254,7 +253,6 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
size_t i;
OBJ_CONSTRUCT(&ack, orte_buffer_t);
rc = orte_dps.pack(&ack, &i, 1, ORTE_BYTE);
if(rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
@ -278,6 +276,33 @@ static void orte_pls_bproc_waitpid_cb(pid_t wpid, int status, void *data) {
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
}
/**
 * Callback for orte_wait_cb for the daemons. If a daemon unexpectedly dies
 * before we are done launching, we abort the job.
 *
 * @param wpid   pid of the daemon that exited (not used by this callback)
 * @param status wait status reported for the daemon; forwarded unchanged to
 *               orte_soh.set_proc_soh
 * @param data   pointer to the orte_process_name_t of the daemon
 */
static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data) {
    orte_process_name_t * proc = (orte_process_name_t*) data;
    int rc;

    if(!mca_pls_bproc_component.done_launching) {
        /* if a daemon exits before we are done launching the user apps we
         * send a message to ourselves so we will break out of the receive
         * loop and exit. A source value of -1 marks the message as coming
         * from this callback. */
        int32_t src = -1;
        orte_buffer_t ack;

        OBJ_CONSTRUCT(&ack, orte_buffer_t);
        rc = orte_dps.pack(&ack, &src, 1, ORTE_INT32);
        if(ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
        }
        /* still attempt the send after a pack failure so that the waiting
         * receive is woken up, as before.
         * NOTE(review): assumes mca_oob_send_packed reports failure as a
         * negative value and otherwise returns a byte count -- confirm. */
        rc = mca_oob_send_packed(MCA_OOB_NAME_SELF, &ack, MCA_OOB_TAG_BPROC, 0);
        if(0 > rc) {
            ORTE_ERROR_LOG(rc);
        }
        /* release the buffer contents; it was previously leaked */
        OBJ_DESTRUCT(&ack);

        rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_ABORTED, status);
    } else if (WIFEXITED(status)) {
        /* launch already finished and the daemon exited normally */
        rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_TERMINATED, status);
    } else {
        /* launch already finished but the daemon did not exit normally */
        rc = orte_soh.set_proc_soh(proc, ORTE_PROC_STATE_ABORTED, status);
    }
    if(ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }
}
/**
* 1. Launch the daemons on the backend nodes.
* 2. The daemons setup files for io forwarding then connect back to us to
@ -298,6 +323,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
int rc, i, j;
int * pids = NULL;
int argc;
int32_t src;
char ** argv = NULL;
char * var, * param;
char * orted_path;
@ -329,7 +355,7 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
goto cleanup;
}
if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
@ -505,7 +531,17 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_pls_base_set_proc_pid(proc_name, pids[i]);
if (0 > asprintf(&var, "%d", node_list[i])) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup;
}
rc = orte_pls_base_set_node_pid(cellid, var, daemon_jobid, pids[i]);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_wait_cb(pids[i], orte_pls_bproc_waitpid_daemon_cb,proc_name);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
@ -526,6 +562,14 @@ static int orte_pls_bproc_launch_app(orte_jobid_t jobid,
ORTE_ERROR_LOG(rc);
goto cleanup;
}
idx = 1;
orte_dps.unpack(&ack, &src, &idx, ORTE_INT32);
if(-1 == src) {
opal_output(0, "pls_bproc: daemon exited unexpectedly\n");
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* launch the processes */
@ -691,7 +735,6 @@ int orte_pls_bproc_terminate_job(orte_jobid_t jobid) {
pid_t* pids;
size_t i, num_pids;
int rc;
/* kill application process */
if(ORTE_SUCCESS != (rc = orte_pls_base_get_proc_pids(jobid, &pids, &num_pids)))
return rc;
@ -703,7 +746,6 @@ int orte_pls_bproc_terminate_job(orte_jobid_t jobid) {
}
if(NULL != pids)
free(pids);
/* kill daemons */
if(ORTE_SUCCESS != (rc = orte_pls_base_get_node_pids(jobid, &pids, &num_pids)))
return rc;

Просмотреть файл

@ -40,6 +40,7 @@
#include "orte/mca/oob/base/base.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/rmaps/base/rmaps_base_map.h"
#include "orte/util/session_dir.h"
#include "orte/util/univ_info.h"
#include "pls_bproc_orted.h"
@ -480,16 +481,13 @@ cleanup:
return rc;
}
/**
 * Terminate-job entry point of the bproc orted component.
 *
 * As visible here, this does not query or signal any processes: it only
 * flushes the I/O forwarding subsystem via orte_iof.iof_flush() and
 * reports success.
 * NOTE(review): the earlier description ("query for all processes allocated
 * to the job and terminate those on the current node") does not match the
 * body shown -- confirm whether termination is handled elsewhere.
 *
 * @param jobid the job to terminate (not used by the visible body)
 * @retval ORTE_SUCCESS always
 */
int orte_pls_bproc_orted_terminate_job(orte_jobid_t jobid) {
orte_iof.iof_flush();
return ORTE_SUCCESS;
}
/**
 * Terminate-process entry point of the bproc orted component.
 *
 * As visible here, this only flushes the I/O forwarding subsystem via
 * orte_iof.iof_flush() and reports success; the process name is not used.
 *
 * @param proc name of the process to terminate (not used by the visible body)
 * @retval ORTE_SUCCESS always
 */
int orte_pls_bproc_orted_terminate_proc(const orte_process_name_t* proc) {
orte_iof.iof_flush();
return ORTE_SUCCESS;
}
@ -497,9 +495,10 @@ int orte_pls_bproc_orted_finalize(void) {
OPAL_THREAD_LOCK(&mca_pls_bproc_orted_component.lock);
opal_condition_wait(&mca_pls_bproc_orted_component.condition,
&mca_pls_bproc_orted_component.lock);
OPAL_THREAD_UNLOCK(&mca_pls_bproc_orted_component.lock);
orte_iof.iof_flush();
pls_bproc_orted_remove_dir();
orte_session_dir_finalize(orte_process_info.my_name);
OPAL_THREAD_UNLOCK(&mca_pls_bproc_orted_component.lock);
return ORTE_SUCCESS;
}