1
1
openmpi/orte/mca/pls/base/pls_base_proxy.c

419 строки
11 KiB
C
Исходник Обычный вид История

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/mca/pls/base/base.h"
#include "orte/orte_constants.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/pls/pls.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ras/base/ras_base_node.h"
#include "orte/mca/rmgr/base/base.h"
/**
 * Store a process name in the registry entry of a node, keyed by job.
 *
 * A single registry value is created on ORTE_NODE_SEGMENT (overwrite mode),
 * addressed by the tokens that orte_schema.get_node_tokens() derives from the
 * node's cell id and name.  The value carries one keyval whose key is
 * "<ORTE_NODE_BOOTPROXY_KEY>-<jobid string>" and whose payload is @p name.
 *
 * @param node   node whose cell id / name select the registry container
 * @param jobid  job id that is converted to a string and appended to the key
 * @param name   process name stored as the keyval's ORTE_NAME payload
 *
 * @return ORTE_SUCCESS on success; otherwise the error returned by the
 *         failing registry / name-service call, or ORTE_ERR_OUT_OF_RESOURCE
 *         if the key string could not be allocated.
 */
int
orte_pls_base_proxy_set_node_name(orte_ras_node_t* node,
                                  orte_jobid_t jobid,
                                  orte_process_name_t* name)
{
    orte_gpr_value_t* values[1];
    char *jobid_string = NULL;
    char *key = NULL;
    int rc;

    /* Report the real failure code instead of masking it as out-of-resource. */
    if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&values[0],
                                                    ORTE_GPR_OVERWRITE,
                                                    ORTE_NODE_SEGMENT,
                                                    1, 0))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
        ORTE_ERROR_LOG(rc);
        jobid_string = NULL;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&(values[0]->tokens),
                                                          &(values[0]->num_tokens),
                                                          node->node_cellid,
                                                          node->node_name))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* asprintf leaves the output pointer undefined on failure, so reset it
     * before the shared cleanup path frees it. */
    if (0 > asprintf(&key, "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string)) {
        key = NULL;
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[0]),
                                                     key, ORTE_NAME, name))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    rc = orte_gpr.put(1, values);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }

cleanup:
    OBJ_RELEASE(values[0]);
    free(jobid_string);
    free(key);

    return rc;
}
/**
 * Append a command-line token to an argv when an integer MCA parameter is set.
 *
 * The parameter identified by (a, b, c) is looked up; if it is not yet known
 * it is registered with @p default_val.  When the resulting value is non-zero,
 * @p token is appended to the argument vector.
 *
 * @param a, b, c      the three name components passed to the MCA parameter calls
 * @param default_val  value used when registering the parameter, and used as
 *                     the effective value if the parameter cannot be read
 * @param token        option string appended when the value is non-zero
 * @param argc, argv   argument vector that receives the token
 *
 * @return always ORTE_SUCCESS
 */
static int lookup_set(char *a, char *b, char *c, int default_val,
                      char *token, int *argc, char ***argv)
{
    int id;
    /* Start from the default so the value is never read uninitialized. */
    int enabled = default_val;

    id = mca_base_param_find(a, b, c);
    if (id < 0) {
        id = mca_base_param_register_int(a, b, c, NULL, default_val);
    }

    /* If the parameter could not be registered or read, fall back to the
     * caller-supplied default rather than whatever the lookup left behind. */
    if (id < 0 || ORTE_SUCCESS != mca_base_param_lookup_int(id, &enabled)) {
        enabled = default_val;
    }

    if (enabled) {
        opal_argv_append(argc, argv, token);
    }

    return ORTE_SUCCESS;
}
/**
 * Forward the ORTE debug settings to a command line under construction.
 *
 * For each of the "orte debug" MCA parameters listed below, lookup_set()
 * appends the matching option to the argument vector when the parameter
 * is non-zero.
 *
 * @param argc, argv  argument vector that receives the options
 * @return always ORTE_SUCCESS
 */
int orte_pls_base_proxy_mca_argv(int *argc, char ***argv)
{
    /* Third parameter-name component and the option it maps to,
     * in the order they are emitted. */
    static const struct {
        char *param_name;
        char *option;
    } debug_flags[] = {
        { NULL,           "--debug" },
        { "daemons",      "--debug-daemons" },
        { "daemons_file", "--debug-daemons-file" }
    };
    const size_t num_flags = sizeof(debug_flags) / sizeof(debug_flags[0]);
    size_t idx;

    for (idx = 0; idx < num_flags; ++idx) {
        lookup_set("orte", "debug", debug_flags[idx].param_name, 0,
                   debug_flags[idx].option, argc, argv);
    }

    return ORTE_SUCCESS;
}
/**
 * Receive callback for the response to a terminate-job command.
 *
 * The response buffer is handed to orte_rmgr_base_unpack_rsp(); a failure
 * reported by that call is logged.  The status, peer, tag and cbdata
 * arguments are not used.
 */
static void orte_pls_rsh_terminate_job_rsp(
    int status,
    orte_process_name_t* peer,
    orte_buffer_t* rsp,
    orte_rml_tag_t tag,
    void* cbdata)
{
    const int unpack_rc = orte_rmgr_base_unpack_rsp(rsp);

    if (ORTE_SUCCESS == unpack_rc) {
        return;
    }
    ORTE_ERROR_LOG(unpack_rc);
}
/**
 * Send-completion callback for a terminate-job command.
 *
 * A negative status is logged.  Otherwise a non-blocking receive is posted
 * on ORTE_RML_TAG_RMGR_CLNT from the same peer so that
 * orte_pls_rsh_terminate_job_rsp() runs when the response arrives.
 * In both cases the request buffer, if any, is released here.
 */
static void orte_pls_rsh_terminate_job_cb(
    int status,
    orte_process_name_t* peer,
    orte_buffer_t* req,
    orte_rml_tag_t tag,
    void* cbdata)
{
    if (status < 0) {
        /* the send itself failed: nothing to wait for */
        ORTE_ERROR_LOG(status);
    } else {
        /* wait for response */
        int recv_rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0,
                                              orte_pls_rsh_terminate_job_rsp, NULL);
        if (recv_rc < 0) {
            ORTE_ERROR_LOG(recv_rc);
        }
    }

    if (NULL != req) {
        OBJ_RELEASE(req);
    }
}
/**
 * Send a terminate-job command to the boot proxy recorded for each node.
 *
 * The node segment of the registry is queried for the key
 * "<ORTE_NODE_BOOTPROXY_KEY>-<jobid string>".  For every returned keyval with
 * that key, the stored process name is extracted and an
 * ORTE_RMGR_CMD_TERM_JOB command is sent to it (non-blocking) on
 * ORTE_RML_TAG_RMGR_SVC; orte_pls_rsh_terminate_job_cb() is the send
 * completion callback and releases the command buffer.
 *
 * Delivery is best-effort: a failure for one entry is logged and the
 * remaining entries are still processed.
 *
 * @param jobid  job to terminate
 *
 * @return ORTE_SUCCESS if every command was queued; ORTE_ERR_NOT_FOUND if
 *         the registry returned no values; ORTE_ERR_OUT_OF_RESOURCE on
 *         allocation failure; otherwise the most recent error encountered.
 */
int
orte_pls_base_proxy_terminate_job(orte_jobid_t jobid)
{
    char *keys[2] = { NULL, NULL };
    char *jobid_string = NULL;
    orte_gpr_value_t** values = NULL;
    size_t i, j, num_values = 0;
    orte_process_name_t proc, *pnptr;
    int rc;

    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* asprintf leaves the output pointer undefined on failure */
    if (0 > asprintf(&keys[0], "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string)) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        free(jobid_string);
        return rc;
    }
    keys[1] = NULL;

    rc = orte_gpr.get(
        ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
        ORTE_NODE_SEGMENT,
        NULL,
        keys,
        &num_values,
        &values
        );
    if (rc != ORTE_SUCCESS) {
        /* release both strings; the key used to leak on this path */
        free(keys[0]);
        free(jobid_string);
        return rc;
    }
    if (0 == num_values) {
        rc = ORTE_ERR_NOT_FOUND;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* rc is ORTE_SUCCESS here and is only overwritten when something fails,
     * so a later success cannot hide an earlier error. */
    for (i = 0; i < num_values; i++) {
        orte_gpr_value_t* value = values[i];

        for (j = 0; j < value->cnt; j++) {
            orte_gpr_keyval_t* keyval = value->keyvals[j];
            orte_buffer_t *cmd;
            int ret;

            /* filter first so no buffer is allocated (and leaked) for
             * keyvals that are not ours */
            if (strcmp(keyval->key, keys[0]) != 0) {
                continue;
            }

            cmd = OBJ_NEW(orte_buffer_t);
            if (cmd == NULL) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }

            /* construct command */
            ret = orte_rmgr_base_pack_cmd(cmd, ORTE_RMGR_CMD_TERM_JOB, jobid);
            if (ORTE_SUCCESS != ret) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }

            /* get the process name from the keyval that matched */
            ret = orte_dss.get((void**)&pnptr, keyval->value, ORTE_NAME);
            if (ORTE_SUCCESS != ret) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }
            proc = *pnptr;

            /* send a terminate message to the bootproxy on each node;
             * on success the completion callback releases cmd */
            if (0 > (ret = orte_rml.send_buffer_nb(
                         &proc,
                         cmd,
                         ORTE_RML_TAG_RMGR_SVC,
                         0,
                         orte_pls_rsh_terminate_job_cb,
                         NULL))) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }
        }
    }

cleanup:
    free(jobid_string);
    free(keys[0]);
    if (NULL != values) {
        for (i = 0; i < num_values; i++) {
            if (NULL != values[i]) {
                OBJ_RELEASE(values[i]);
            }
        }
        free(values);
    }
    return rc;
}
/**
 * Terminate a single process -- not supported by the proxy.
 *
 * @param proc  ignored
 * @return always ORTE_ERR_NOT_IMPLEMENTED
 */
int
orte_pls_base_proxy_terminate_proc(const orte_process_name_t *proc)
{
    (void)proc;  /* placeholder: argument intentionally unused */

    return ORTE_ERR_NOT_IMPLEMENTED;
}
Add ability to trap and propagate SIGUSR1/2 to remote processes. There are a number of small changes that hit a bunch of files: 1. Changed the RMGR and PLS APIs to add "signal_job" and "signal_proc" entry points. Only the "signal_job" entries are implemented - none of the components have implementations for "signal_proc" at this time. Thus, you can signal all of the procs in a job, but cannot currently signal only one specific proc. 2. Implemented those new API functions in all components except xgrid (Brian will do so very soon). Only the rsh/ssh and fork modules have been tested, however, and only under OS-X. 3. Added signal traps and callback functions for SIGUSR1/2 to orterun/mpirun that catch those signals and call the appropriate commands to propagate them out to all processes in the job. 4. Added a new test directory under the orte branch to (eventually) hold unit and system level tests for just the run-time. Since our test branch of the repository is under restricted access, people working on the RTE were continually developing their own system-level tests - thus making it hard to help diagnose problems. I have moved the more commonly-used functions here, and added one specifically for testing the SIGUSR1/2 functionality. I will be contacting people directly to seek help with testing the changes on more environments. Other than compile issues, you should see absolutely no change in behavior on any of your systems - this additional functionality is transparent to anyone who does not issue a SIGUSR1/2 to mpirun. Ralph This commit was SVN r10258.
2006-06-08 18:27:17 +00:00
/**
 * Receive callback invoked when the remote node reports that it has
 * delivered the signal to its child processes.
 *
 * The response buffer is handed to orte_rmgr_base_unpack_rsp(); a failure
 * reported by that call is logged.  The status, peer, tag and cbdata
 * arguments are not used.
 */
static void orte_pls_rsh_signal_job_rsp(
    int status,
    orte_process_name_t* peer,
    orte_buffer_t* rsp,
    orte_rml_tag_t tag,
    void* cbdata)
{
    const int unpack_rc = orte_rmgr_base_unpack_rsp(rsp);

    if (ORTE_SUCCESS == unpack_rc) {
        return;
    }
    ORTE_ERROR_LOG(unpack_rc);
}
/**
 * Send-completion callback for a signal-job command.
 *
 * Called when the corresponding send completes.  A negative status is
 * logged.  Otherwise a non-blocking receive is posted on
 * ORTE_RML_TAG_RMGR_CLNT from the same peer so that
 * orte_pls_rsh_signal_job_rsp() is notified when the action has actually
 * been completed on the remote node.
 *
 * The request buffer is released here in either case.  It is checked
 * against NULL first, matching orte_pls_rsh_terminate_job_cb(), so a
 * completion delivered without a buffer does not dereference NULL.
 */
static void orte_pls_rsh_signal_job_cb(
    int status,
    orte_process_name_t* peer,
    orte_buffer_t* req,
    orte_rml_tag_t tag,
    void* cbdata)
{
    int rc;

    if (status < 0) {
        ORTE_ERROR_LOG(status);
        if (NULL != req) {
            OBJ_RELEASE(req);
        }
        return;
    }

    /* wait for response */
    if (0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT, 0,
                                          orte_pls_rsh_signal_job_rsp, NULL))) {
        ORTE_ERROR_LOG(rc);
    }

    if (NULL != req) {
        OBJ_RELEASE(req);
    }
}
/**
 * Send a signal-job command to the boot proxy recorded for each node.
 *
 * The node segment of the registry is queried for the key
 * "<ORTE_NODE_BOOTPROXY_KEY>-<jobid string>".  For every returned keyval with
 * that key, the stored process name is extracted and a command built by
 * orte_rmgr_base_pack_signal_job_cmd() is sent to it (non-blocking) on
 * ORTE_RML_TAG_RMGR_SVC; orte_pls_rsh_signal_job_cb() is the send
 * completion callback and releases the command buffer.
 *
 * Delivery is best-effort: a failure for one entry is logged and the
 * remaining entries are still processed.
 *
 * @param jobid   job whose processes are to be signalled
 * @param signal  signal number packed into the command
 *
 * @return ORTE_SUCCESS if every command was queued; ORTE_ERR_NOT_FOUND if
 *         the registry returned no values; ORTE_ERR_OUT_OF_RESOURCE on
 *         allocation failure; otherwise the most recent error encountered.
 */
int
orte_pls_base_proxy_signal_job(orte_jobid_t jobid, int32_t signal)
{
    char *keys[2] = { NULL, NULL };
    char *jobid_string = NULL;
    orte_gpr_value_t** values = NULL;
    size_t i, j, num_values = 0;
    orte_process_name_t proc, *pnptr;
    int rc;

    if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* asprintf leaves the output pointer undefined on failure */
    if (0 > asprintf(&keys[0], "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string)) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        free(jobid_string);
        return rc;
    }
    keys[1] = NULL;

    rc = orte_gpr.get(
        ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
        ORTE_NODE_SEGMENT,
        NULL,
        keys,
        &num_values,
        &values
        );
    if (rc != ORTE_SUCCESS) {
        /* release both strings; the key used to leak on this path */
        free(keys[0]);
        free(jobid_string);
        return rc;
    }
    if (0 == num_values) {
        rc = ORTE_ERR_NOT_FOUND;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* rc is ORTE_SUCCESS here and is only overwritten when something fails,
     * so a later success cannot hide an earlier error. */
    for (i = 0; i < num_values; i++) {
        orte_gpr_value_t* value = values[i];

        for (j = 0; j < value->cnt; j++) {
            orte_gpr_keyval_t* keyval = value->keyvals[j];
            orte_buffer_t *cmd;
            int ret;

            /* filter first so no buffer is allocated (and leaked) for
             * keyvals that are not ours */
            if (strcmp(keyval->key, keys[0]) != 0) {
                continue;
            }

            cmd = OBJ_NEW(orte_buffer_t);
            if (cmd == NULL) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }

            /** construct command */
            ret = orte_rmgr_base_pack_signal_job_cmd(cmd, jobid, signal);
            if (ORTE_SUCCESS != ret) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }

            /** get the process name from the keyval that matched */
            ret = orte_dss.get((void**)&pnptr, keyval->value, ORTE_NAME);
            if (ORTE_SUCCESS != ret) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }
            proc = *pnptr;

            /** send a signal message to the bootproxy on each node;
             *  on success the completion callback releases cmd */
            if (0 > (ret = orte_rml.send_buffer_nb(
                         &proc,
                         cmd,
                         ORTE_RML_TAG_RMGR_SVC,
                         0,
                         orte_pls_rsh_signal_job_cb,
                         NULL))) {
                ORTE_ERROR_LOG(ret);
                OBJ_RELEASE(cmd);
                rc = ret;
                continue;
            }
        }
    }

cleanup:
    free(jobid_string);
    free(keys[0]);
    if (NULL != values) {
        for (i = 0; i < num_values; i++) {
            if (NULL != values[i]) {
                OBJ_RELEASE(values[i]);
            }
        }
        free(values);
    }
    return rc;
}
/**
 * Signal a single process -- not supported by the proxy.
 *
 * @param proc    ignored
 * @param signal  ignored
 * @return always ORTE_ERR_NOT_IMPLEMENTED
 */
int
orte_pls_base_proxy_signal_proc(const orte_process_name_t *proc, int32_t signal)
{
    /* placeholder: arguments intentionally unused */
    (void)proc;
    (void)signal;

    return ORTE_ERR_NOT_IMPLEMENTED;
}