1
1

- added proxy resource manager which is loaded when not the seed

- added support to pls fork/rsh modules for terminate_job

This commit was SVN r5110.
Этот коммит содержится в:
Tim Woodall 2005-03-31 15:47:37 +00:00
родитель 8686c60223
Коммит 447d370905
23 изменённых файлов: 1323 добавлений и 77 удалений

Просмотреть файл

@ -75,6 +75,7 @@ extern char *orte_error_strings[];
#define ORTE_NODE_SLOTS_ALLOC_KEY "orte-node-slots-alloc"
#define ORTE_NODE_SLOTS_MAX_KEY "orte-node-slots-max"
#define ORTE_NODE_ALLOC_KEY "orte-node-alloc"
#define ORTE_NODE_BOOTPROXY_KEY "orte-node-bootproxy"
#define ORTE_JOB_APP_CONTEXT_KEY "orte-job-app-context"
#define ORTE_JOB_SLOTS_KEY "orte-job-slots" /**< number of procs in job */
#define ORTE_JOB_VPID_START_KEY "orte-job-vpid-start"

Просмотреть файл

@ -42,6 +42,7 @@
#include "mca/pls/pls.h"
#include "mca/pls/base/base.h"
#include "mca/rml/rml.h"
#include "mca/gpr/gpr.h"
#include "mca/rmaps/base/base.h"
#include "mca/rmaps/base/rmaps_base_map.h"
#include "mca/soh/soh.h"
@ -95,6 +96,9 @@ static void orte_pls_fork_wait_proc(pid_t pid, int status, void* cbdata)
OMPI_THREAD_UNLOCK(&mca_pls_fork_component.lock);
}
/**
* Fork/exec the specified processes
*/
static int orte_pls_fork_proc(
orte_app_context_t* context,
@ -213,6 +217,10 @@ static int orte_pls_fork_proc(
}
/**
* Launch all processes allocated to the current node.
*/
int orte_pls_fork_launch(orte_jobid_t jobid)
{
ompi_list_t map;
@ -259,11 +267,68 @@ cleanup:
return rc;
}
/**
* Query for all processes allocated to the job and terminate
* those on the current node.
*/
int orte_pls_fork_terminate_job(orte_jobid_t jobid)
{
/* query for the pids allocated on this node */
char *segment;
char *keys[2];
orte_gpr_value_t** values = NULL;
int i, k, num_values = 0;
int rc;
/* query the job segment on the registry */
if(ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
keys[0] = ORTE_NODE_NAME_KEY;
keys[1] = ORTE_PROC_PID_KEY;
keys[2] = NULL;
rc = orte_gpr.get(
ORTE_GPR_KEYS_AND|ORTE_GPR_TOKENS_OR,
segment,
NULL,
keys,
&num_values,
&values
);
if(rc != ORTE_SUCCESS) {
free(segment);
return rc;
}
for(i=0; i<num_values; i++) {
orte_gpr_value_t* value = values[i];
pid_t pid = 0;
for(k=0; k<value->cnt; k++) {
orte_gpr_keyval_t* keyval = value->keyvals[k];
if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) {
if(strcmp(keyval->value.strptr, orte_system_info.nodename) != 0) {
break;
}
} else if (strcmp(keyval->key, ORTE_PROC_PID_KEY) == 0) {
pid = keyval->value.ui32;
}
}
if(pid != 0) {
kill(pid, SIGKILL);
}
OBJ_RELEASE(value);
}
if(NULL != values)
free(values);
free(segment);
return ORTE_ERR_NOT_IMPLEMENTED;
}
int orte_pls_fork_terminate_proc(const orte_process_name_t* proc)
{
return ORTE_ERR_NOT_IMPLEMENTED;

Просмотреть файл

@ -38,9 +38,11 @@
#include "mca/ns/ns.h"
#include "mca/pls/pls.h"
#include "mca/rml/rml.h"
#include "mca/gpr/gpr.h"
#include "mca/errmgr/errmgr.h"
#include "mca/ras/base/ras_base_node.h"
#include "mca/rmaps/base/rmaps_base_map.h"
#include "mca/rmgr/base/base.h"
#include "mca/soh/soh.h"
#include "mca/soh/base/base.h"
#include "pls_rsh.h"
@ -158,6 +160,58 @@ static void orte_pls_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
OBJ_RELEASE(info);
}
/**
* Set the daemons name in the registry.
*/
static int orte_pls_rsh_set_node_name(orte_ras_base_node_t* node, orte_jobid_t jobid, orte_process_name_t* name)
{
orte_gpr_value_t* values[1];
orte_gpr_value_t value;
orte_gpr_keyval_t kv_name = {{OBJ_CLASS(orte_gpr_keyval_t),0},ORTE_NODE_BOOTPROXY_KEY,ORTE_NAME};
orte_gpr_keyval_t* keyvals[1];
char* jobid_string;
int i, rc;
if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
node->node_cellid, node->node_name))) {
ORTE_ERROR_LOG(rc);
free(jobid_string);
return rc;
}
asprintf(&kv_name.key, "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string);
kv_name.value.proc = *name;
keyvals[0] = &kv_name;
value.keyvals = keyvals;
value.cnt = 1;
value.addr_mode = ORTE_GPR_OVERWRITE;
value.segment = ORTE_NODE_SEGMENT;
values[0] = &value;
rc = orte_gpr.put(1, values);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
free(kv_name.key);
free(jobid_string);
for(i=0; i<value.num_tokens; i++)
free(value.tokens[i]);
free(value.tokens);
return rc;
}
/**
* Launch a daemon (bootproxy) on each node. The daemon will be responsible
* for launching the application.
*/
int orte_pls_rsh_launch(orte_jobid_t jobid)
{
@ -253,12 +307,20 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
item != ompi_list_get_end(&nodes);
item = ompi_list_get_next(item)) {
orte_ras_base_node_t* node = (orte_ras_base_node_t*)item;
orte_process_name_t* name;
pid_t pid;
/* setup node name */
argv[node_name_index1] = node->node_name;
argv[node_name_index2] = node->node_name;
/* initialize daemons process name */
rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* rsh a child to exec the rsh/ssh session */
pid = fork();
if(pid < 0) {
@ -268,16 +330,9 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
/* child */
if(pid == 0) {
orte_process_name_t* name;
char* name_string;
/* setup process name */
rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
if(ORTE_SUCCESS != rc) {
ompi_output(0, "orte_pls_rsh: unable to create process name");
exit(-1);
}
rc = orte_ns.get_proc_name_string(&name_string, name);
if(ORTE_SUCCESS != rc) {
ompi_output(0, "orte_pls_rsh: unable to create process name");
@ -311,20 +366,29 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
ompi_condition_wait(&mca_pls_rsh_component.cond, &mca_pls_rsh_component.lock);
OMPI_THREAD_UNLOCK(&mca_pls_rsh_component.lock);
/* setup callback on sigchild */
daemon_info = OBJ_NEW(rsh_daemon_info_t);
OBJ_RETAIN(node);
daemon_info->node = node;
daemon_info->jobid = jobid;
orte_wait_cb(pid, orte_pls_rsh_wait_daemon, daemon_info);
vpid++;
/* save the daemons name on the node */
if (ORTE_SUCCESS != (rc = orte_pls_rsh_set_node_name(node,jobid,name))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if required - add delay to avoid problems w/ X11 authentication */
if (mca_pls_rsh_component.debug && mca_pls_rsh_component.delay) {
sleep(mca_pls_rsh_component.delay);
}
vpid++;
}
free(name);
}
cleanup:
while(NULL != (item = ompi_list_remove_first(&nodes))) {
OBJ_RELEASE(item);
@ -333,9 +397,128 @@ cleanup:
return rc;
}
/**
* Wait for a pending job to complete.
*/
static void orte_pls_rsh_terminate_job_cb(
int status,
orte_process_name_t* peer,
orte_buffer_t* req,
orte_rml_tag_t tag,
void* cbdata)
{
/* wait for response */
orte_buffer_t rsp;
int rc;
OBJ_CONSTRUCT(&rsp, orte_buffer_t);
if(0 > (rc = orte_rml.recv_buffer(peer, &rsp, ORTE_RML_TAG_RMGR_CLNT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return;
}
if(ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(&rsp))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&rsp);
OBJ_RELEASE(req);
}
/**
* Query the registry for all nodes participating in the job
*/
int orte_pls_rsh_terminate_job(orte_jobid_t jobid)
{
return ORTE_ERR_NOT_IMPLEMENTED;
char *keys[2];
char *jobid_string;
orte_gpr_value_t** values = NULL;
int i, j, num_values = 0;
int rc;
if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
asprintf(&keys[0], "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string);
keys[1] = NULL;
rc = orte_gpr.get(
ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
ORTE_NODE_SEGMENT,
NULL,
keys,
&num_values,
&values
);
if(rc != ORTE_SUCCESS) {
free(jobid_string);
return rc;
}
if(0 == num_values) {
rc = ORTE_ERR_NOT_FOUND;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
for(i=0; i<num_values; i++) {
orte_gpr_value_t* value = values[i];
for(j=0; j<value->cnt; j++) {
orte_gpr_keyval_t* keyval = value->keyvals[j];
orte_buffer_t *cmd = OBJ_NEW(orte_buffer_t);
int ret;
if(cmd == NULL) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if(strcmp(keyval->key, keys[0]) != 0)
continue;
/* construct command */
ret = orte_rmgr_base_pack_cmd(cmd, ORTE_RMGR_CMD_TERM_JOB, jobid);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
rc = ret;
continue;
}
/* send a terminate message to the bootproxy on each node */
if(0 > (ret = orte_rml.send_buffer_nb(
&keyval->value.proc,
cmd,
ORTE_RML_TAG_RMGR_SVC,
0,
orte_pls_rsh_terminate_job_cb,
NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cmd);
rc = ret;
continue;
}
}
}
cleanup:
free(jobid_string);
free(keys[0]);
if(NULL != values) {
for(i=0; i<num_values; i++) {
if(NULL != values[i]) {
OBJ_RELEASE(values[i]);
}
}
free(values);
}
return rc;
}
int orte_pls_rsh_terminate_proc(const orte_process_name_t* proc)

Просмотреть файл

@ -34,6 +34,8 @@ libmca_orte_rmgr_base_la_SOURCES = \
rmgr_base_context.c \
rmgr_base_close.c \
rmgr_base_open.c \
rmgr_base_pack.c \
rmgr_base_unpack.c \
rmgr_base_select.c \
rmgr_base_stage_gate.c \
rmgr_base_stubs.c

Просмотреть файл

@ -27,6 +27,7 @@
#include "include/orte_types.h"
#include "class/ompi_list.h"
#include "dps/dps.h"
#include "mca/mca.h"
#include "mca/gpr/gpr_types.h"
#include "mca/rmgr/rmgr.h"
@ -68,6 +69,36 @@ OMPI_DECLSPEC int orte_rmgr_base_set_job_slots(
orte_jobid_t jobid,
size_t num_slots);
/*
* Pack/unpack
*/
OMPI_DECLSPEC int orte_rmgr_base_pack_cmd(
orte_buffer_t* buffer,
orte_rmgr_cmd_t cmd,
orte_jobid_t jobid);
OMPI_DECLSPEC int orte_rmgr_base_pack_create_cmd(
orte_buffer_t* buffer,
orte_app_context_t** context,
size_t num_context);
OMPI_DECLSPEC int orte_rmgr_base_pack_terminate_proc_cmd(
orte_buffer_t* buffer,
const orte_process_name_t* name);
OMPI_DECLSPEC int orte_rmgr_base_unpack_rsp(
orte_buffer_t* buffer);
OMPI_DECLSPEC int orte_rmgr_base_unpack_create_rsp(
orte_buffer_t* buffer,
orte_jobid_t*);
OMPI_DECLSPEC int orte_rmgr_base_cmd_dispatch(
orte_buffer_t* req,
orte_buffer_t* rsp);
/*
* Base functions that are common to all implementations - can be overridden
*/

132
src/mca/rmgr/base/rmgr_base_pack.c Обычный файл
Просмотреть файл

@ -0,0 +1,132 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "include/constants.h"
#include "dps/dps.h"
#include "mca/rmgr/base/base.h"
#include "mca/errmgr/errmgr.h"
/*
*
*/
int orte_rmgr_base_pack_cmd(orte_buffer_t* buffer, orte_rmgr_cmd_t cmd, orte_jobid_t jobid)
{
int rc;
rc = orte_dps.pack(buffer, &cmd, 1, ORTE_RMGR_CMD);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &jobid, 1, ORTE_JOBID);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
/*
*
*/
int orte_rmgr_base_pack_create_cmd(
orte_buffer_t* buffer,
orte_app_context_t** context,
size_t num_context)
{
int rc;
orte_rmgr_cmd_t cmd = ORTE_RMGR_CMD_CREATE;
rc = orte_dps.pack(buffer, &cmd, 1, ORTE_RMGR_CMD);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, &num_context, 1, ORTE_UINT32);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, context, num_context, ORTE_APP_CONTEXT);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
int orte_rmgr_base_pack_terminate_proc_cmd(
orte_buffer_t* buffer,
const orte_process_name_t* name)
{
int rc;
orte_rmgr_cmd_t cmd = ORTE_RMGR_CMD_CREATE;
rc = orte_dps.pack(buffer, &cmd, 1, ORTE_RMGR_CMD);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_dps.pack(buffer, (void*)name, 1, ORTE_NAME);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
int orte_rmgr_base_unpack_rsp(
orte_buffer_t* buffer)
{
int32_t rc;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(buffer,&rc,&cnt,ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}
int orte_rmgr_base_unpack_create_rsp(
orte_buffer_t* buffer,
orte_jobid_t* jobid)
{
int32_t rc;
size_t cnt;
cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(buffer,jobid,&cnt,ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(buffer,&rc,&cnt,ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}

203
src/mca/rmgr/base/rmgr_base_unpack.c Обычный файл
Просмотреть файл

@ -0,0 +1,203 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "include/constants.h"
#include "dps/dps.h"
#include "mca/rmgr/base/base.h"
#include "mca/errmgr/errmgr.h"
/*
*
*/
static int orte_rmgr_base_cmd_query(orte_buffer_t* req, orte_buffer_t* rsp)
{
int32_t rc = orte_rmgr.query();
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_create(orte_buffer_t* req, orte_buffer_t* rsp)
{
int rc;
int32_t ret;
orte_app_context_t** context;
orte_jobid_t jobid;
size_t i, cnt, num_context;
cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &num_context, &cnt, ORTE_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(NULL == (context = malloc(sizeof(orte_app_context_t*)*num_context))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
cnt = num_context;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, context, &cnt, ORTE_APP_CONTEXT))) {
ORTE_ERROR_LOG(rc);
free(context);
return rc;
}
ret = orte_rmgr.create(context, num_context, &jobid);
if(ORTE_SUCCESS != (rc = orte_dps.pack(rsp, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if(ORTE_SUCCESS != (rc = orte_dps.pack(rsp, &ret, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
cleanup:
for(i=0; i<num_context; i++) {
OBJ_RELEASE(context[i]);
}
free(context);
return ret;
}
static int orte_rmgr_base_cmd_allocate(orte_buffer_t* req, orte_buffer_t* rsp)
{
int32_t rc;
orte_jobid_t jobid;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.allocate(jobid);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_deallocate(orte_buffer_t* req, orte_buffer_t* rsp)
{
int32_t rc;
orte_jobid_t jobid;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.deallocate(jobid);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_map(orte_buffer_t* req, orte_buffer_t* rsp)
{
int rc;
orte_jobid_t jobid;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.map(jobid);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_launch(orte_buffer_t* req, orte_buffer_t* rsp)
{
int rc;
orte_jobid_t jobid;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.launch(jobid);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_term_job(orte_buffer_t* req, orte_buffer_t* rsp)
{
int rc;
orte_jobid_t jobid;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.terminate_job(jobid);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
static int orte_rmgr_base_cmd_term_proc(orte_buffer_t* req, orte_buffer_t* rsp)
{
int rc;
orte_process_name_t name;
size_t cnt = 1;
if(ORTE_SUCCESS != (rc = orte_dps.unpack(req, &name, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
} else {
rc = orte_rmgr.terminate_proc(&name);
}
return orte_dps.pack(rsp, &rc, 1, ORTE_INT32);
}
int orte_rmgr_base_cmd_dispatch(orte_buffer_t* req, orte_buffer_t* rsp)
{
orte_rmgr_cmd_t cmd;
size_t cnt = 1;
int rc;
rc = orte_dps.unpack(req, &cmd, &cnt, ORTE_RMGR_CMD);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
switch(cmd) {
case ORTE_RMGR_CMD_QUERY:
return orte_rmgr_base_cmd_query(req,rsp);
case ORTE_RMGR_CMD_CREATE:
return orte_rmgr_base_cmd_create(req,rsp);
case ORTE_RMGR_CMD_ALLOCATE:
return orte_rmgr_base_cmd_allocate(req,rsp);
case ORTE_RMGR_CMD_DEALLOCATE:
return orte_rmgr_base_cmd_deallocate(req,rsp);
case ORTE_RMGR_CMD_MAP:
return orte_rmgr_base_cmd_map(req,rsp);
case ORTE_RMGR_CMD_LAUNCH:
return orte_rmgr_base_cmd_launch(req,rsp);
case ORTE_RMGR_CMD_TERM_JOB:
return orte_rmgr_base_cmd_term_job(req,rsp);
case ORTE_RMGR_CMD_TERM_PROC:
return orte_rmgr_base_cmd_term_proc(req,rsp);
default:
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
}

45
src/mca/rmgr/proxy/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,45 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Use the top-level Makefile.options
include $(top_ompi_srcdir)/config/Makefile.options
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_rmgr_proxy_DSO
component_noinst =
component_install = mca_rmgr_proxy.la
else
component_noinst = libmca_rmgr_proxy.la
component_install =
endif
proxy_SOURCES = \
rmgr_proxy.c \
rmgr_proxy.h \
rmgr_proxy_component.c
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)
mca_rmgr_proxy_la_SOURCES = $(proxy_SOURCES)
mca_rmgr_proxy_la_LIBADD =
mca_rmgr_proxy_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_rmgr_proxy_la_SOURCES = $(proxy_SOURCES)
libmca_rmgr_proxy_la_LIBADD =
libmca_rmgr_proxy_la_LDFLAGS = -module -avoid-version

20
src/mca/rmgr/proxy/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,20 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_INIT_FILE=rmgr_proxy.c
PARAM_CONFIG_HEADER_FILE="proxy_config.h"
PARAM_CONFIG_FILES="Makefile"

366
src/mca/rmgr/proxy/rmgr_proxy.c Обычный файл
Просмотреть файл

@ -0,0 +1,366 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "include/constants.h"
#include "util/output.h"
#include "mca/errmgr/errmgr.h"
#include "mca/rmgr/base/base.h"
#include "mca/rml/rml.h"
#include "mca/iof/iof.h"
#include "rmgr_proxy.h"
static int orte_rmgr_proxy_query(void);
static int orte_rmgr_proxy_create(
orte_app_context_t** app_context,
size_t num_context,
orte_jobid_t* jobid);
static int orte_rmgr_proxy_allocate(
orte_jobid_t jobid);
static int orte_rmgr_proxy_deallocate(
orte_jobid_t jobid);
static int orte_rmgr_proxy_map(
orte_jobid_t jobid);
static int orte_rmgr_proxy_launch(
orte_jobid_t jobid);
static int orte_rmgr_proxy_terminate_job(
orte_jobid_t jobid);
static int orte_rmgr_proxy_terminate_proc(
const orte_process_name_t* proc_name);
static int orte_rmgr_proxy_spawn(
orte_app_context_t** app_context,
size_t num_context,
orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfn);
orte_rmgr_base_module_t orte_rmgr_proxy_module = {
orte_rmgr_proxy_query,
orte_rmgr_proxy_create,
orte_rmgr_proxy_allocate,
orte_rmgr_proxy_deallocate,
orte_rmgr_proxy_map,
orte_rmgr_proxy_launch,
orte_rmgr_proxy_terminate_job,
orte_rmgr_proxy_terminate_proc,
orte_rmgr_proxy_spawn,
orte_rmgr_base_proc_stage_gate_init,
orte_rmgr_base_proc_stage_gate_mgr,
NULL, /* finalize */
};
/*
* Create the job segment and initialize the application context. Could
* do this in the proxy - but allowing the seed to do this moves responsibility
* for the stage gates to the seed. This allows the client to disconnect.
*/
static int orte_rmgr_proxy_create(
orte_app_context_t** app_context,
size_t num_context,
orte_jobid_t* jobid)
{
orte_buffer_t cmd;
orte_buffer_t rsp;
int rc;
/* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_create_cmd(&cmd, app_context, num_context);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
if(0 > (rc = orte_rml.send_buffer(ORTE_RML_NAME_SEED, &cmd, ORTE_RML_TAG_RMGR_SVC, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
OBJ_DESTRUCT(&cmd);
/* wait for response */
OBJ_CONSTRUCT(&rsp, orte_buffer_t);
if(0 > (rc = orte_rml.recv_buffer(ORTE_RML_NAME_SEED, &rsp, ORTE_RML_TAG_RMGR_CLNT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
rc = orte_rmgr_base_unpack_create_rsp(&rsp, jobid);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
OBJ_DESTRUCT(&rsp);
return rc;
}
static int orte_rmgr_proxy_cmd(orte_rmgr_cmd_t cmd_id, orte_jobid_t jobid)
{
orte_buffer_t cmd;
orte_buffer_t rsp;
int rc;
/* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_cmd(&cmd, cmd_id, jobid);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
if(0 > (rc = orte_rml.send_buffer(ORTE_RML_NAME_SEED, &cmd, ORTE_RML_TAG_RMGR_SVC, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
OBJ_DESTRUCT(&cmd);
/* wait for response */
OBJ_CONSTRUCT(&rsp, orte_buffer_t);
if(0 > (rc = orte_rml.recv_buffer(ORTE_RML_NAME_SEED, &rsp, ORTE_RML_TAG_RMGR_CLNT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
rc = orte_rmgr_base_unpack_rsp(&rsp);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
OBJ_DESTRUCT(&rsp);
return rc;
}
static int orte_rmgr_proxy_query(void)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_QUERY, 0);
}
static int orte_rmgr_proxy_allocate(orte_jobid_t jobid)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_ALLOCATE, jobid);
}
static int orte_rmgr_proxy_deallocate(orte_jobid_t jobid)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_DEALLOCATE, jobid);
}
static int orte_rmgr_proxy_map(orte_jobid_t jobid)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_MAP, jobid);
}
static int orte_rmgr_proxy_launch(orte_jobid_t jobid)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_LAUNCH, jobid);
}
static int orte_rmgr_proxy_terminate_job(orte_jobid_t jobid)
{
return orte_rmgr_proxy_cmd(ORTE_RMGR_CMD_TERM_JOB, jobid);
}
static int orte_rmgr_proxy_terminate_proc(const orte_process_name_t* proc_name)
{
orte_buffer_t cmd;
orte_buffer_t rsp;
int rc;
/* construct command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
rc = orte_rmgr_base_pack_terminate_proc_cmd(&cmd, proc_name);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
if(0 > (rc = orte_rml.send_buffer(ORTE_RML_NAME_SEED, &cmd, ORTE_RML_TAG_RMGR_SVC, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&cmd);
return rc;
}
OBJ_DESTRUCT(&cmd);
/* wait for response */
OBJ_CONSTRUCT(&rsp, orte_buffer_t);
if(0 > (rc = orte_rml.recv_buffer(ORTE_RML_NAME_SEED, &rsp, ORTE_RML_TAG_RMGR_CLNT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
rc = orte_rmgr_base_unpack_rsp(&rsp);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rsp);
return rc;
}
OBJ_DESTRUCT(&rsp);
return ORTE_SUCCESS;
}
static void orte_rmgr_proxy_callback(orte_gpr_notify_data_t *data, void *cbdata)
{
orte_rmgr_cb_fn_t cbfunc = (orte_rmgr_cb_fn_t)cbdata;
orte_gpr_keyval_t** keyvals;
orte_jobid_t jobid;
int i, j, rc;
/* get the jobid from the segment name */
if (ORTE_SUCCESS != (rc = orte_schema.extract_jobid_from_segment_name(&jobid, data->segment))) {
ORTE_ERROR_LOG(rc);
ompi_output(0, "[%d,%d,%d] orte_rmgr_proxy_callback: %s\n",
ORTE_NAME_ARGS(orte_process_info.my_name), data->segment);
return;
}
/* determine the state change */
for(i=0; i<data->cnt; i++) {
orte_gpr_value_t* value = data->values[i];
keyvals = value->keyvals;
for(j=0; j<value->cnt; j++) {
orte_gpr_keyval_t* keyval = keyvals[j];
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG1) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG2) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG1);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_AT_STG3) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_AT_STG3);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_FINALIZED) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_FINALIZED);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_TERMINATED) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_TERMINATED);
continue;
}
if(strcmp(keyval->key, ORTE_PROC_NUM_ABORTED) == 0) {
(*cbfunc)(jobid,ORTE_PROC_STATE_ABORTED);
continue;
}
}
}
}
/*
* Shortcut for the multiple steps involved in spawning a new job.
*/
static int orte_rmgr_proxy_spawn(
orte_app_context_t** app_context,
size_t num_context,
orte_jobid_t* jobid,
orte_rmgr_cb_fn_t cbfunc)
{
int rc;
orte_process_name_t* name;
/*
* Perform resource discovery.
*/
if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_query())) {
ORTE_ERROR_LOG(rc);
return rc;
}
/*
* Initialize job segment and allocate resources
*/
if (ORTE_SUCCESS !=
(rc = orte_rmgr_proxy_create(app_context,num_context,jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_allocate(*jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_map(*jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/*
* setup I/O forwarding
*/
if (ORTE_SUCCESS != (rc = orte_ns.create_process_name(&name, 0, *jobid, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDOUT, 1))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_iof.iof_pull(name, ORTE_NS_CMP_JOBID, ORTE_IOF_STDERR, 2))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/*
* setup callback
*/
if(NULL != cbfunc) {
rc = orte_rmgr_base_proc_stage_gate_subscribe(*jobid, orte_rmgr_proxy_callback, (void*)cbfunc);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/*
* launch the job
*/
if (ORTE_SUCCESS != (rc = orte_rmgr_proxy_launch(*jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_ns.free_name(&name);
return ORTE_SUCCESS;
}

48
src/mca/rmgr/proxy/rmgr_proxy.h Обычный файл
Просмотреть файл

@ -0,0 +1,48 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Universal Resource Manager (URM)
*/
#ifndef ORTE_RMGR_URM_H
#define ORTE_RMGR_URM_H
#include "mca/rmgr/rmgr.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* URM component structure -- add some stuff beyond what is in the
* normal rmgr component.
*/
struct orte_rmgr_proxy_component_t {
/** Base rmgr component */
orte_rmgr_base_component_t super;
};
/** Convenience typedef */
typedef struct orte_rmgr_proxy_component_t orte_rmgr_proxy_component_t;
/** Global URM component */
OMPI_COMP_EXPORT extern orte_rmgr_proxy_component_t mca_rmgr_proxy_component;
/** Global URM module */
OMPI_COMP_EXPORT extern orte_rmgr_base_module_t orte_rmgr_proxy_module;
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

88
src/mca/rmgr/proxy/rmgr_proxy_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,88 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "include/orte_constants.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "mca/errmgr/errmgr.h"
#include "mca/rds/base/base.h"
#include "mca/ras/base/base.h"
#include "mca/rmaps/base/base.h"
#include "mca/pls/base/base.h"
#include "rmgr_proxy.h"
/*
* Local functions
*/
static int orte_rmgr_proxy_open(void);
static int orte_rmgr_proxy_close(void);
static orte_rmgr_base_module_t* orte_rmgr_proxy_init(int *priority);
orte_rmgr_proxy_component_t mca_rmgr_proxy_component = {
{
/* First, the mca_base_component_t struct containing meta
information about the component itself */
{
/* Indicate that we are a iof v1.0.0 component (which also
implies a specific MCA version) */
ORTE_RMGR_BASE_VERSION_1_0_0,
"proxy", /* MCA component name */
1, /* MCA component major version */
0, /* MCA component minor version */
0, /* MCA component release version */
orte_rmgr_proxy_open, /* component open */
orte_rmgr_proxy_close /* component close */
},
/* Next the MCA v1.0.0 component meta data */
{
/* Whether the component is checkpointable or not */
false
},
orte_rmgr_proxy_init
}
};
/**
* component open/close/init function
*/
static int orte_rmgr_proxy_open(void)
{
return ORTE_SUCCESS;
}
static orte_rmgr_base_module_t *orte_rmgr_proxy_init(int* priority)
{
*priority = 1;
return &orte_rmgr_proxy_module;
}
/**
* Close all subsystems.
*/
static int orte_rmgr_proxy_close(void)
{
return ORTE_SUCCESS;
}

Просмотреть файл

@ -20,8 +20,23 @@
/*
* REGISTRY KEY NAMES FOR COMMON DATA
*/
#define ORTE_RMGR_APP_LOC "orte-rmgr-app-location"
#define ORTE_RMGR_LAUNCHER "orte-rmgr-launcher"
/*
* Constants for command values
*/
#define ORTE_RMGR_CMD_QUERY 1
#define ORTE_RMGR_CMD_CREATE 2
#define ORTE_RMGR_CMD_ALLOCATE 3
#define ORTE_RMGR_CMD_DEALLOCATE 4
#define ORTE_RMGR_CMD_MAP 5
#define ORTE_RMGR_CMD_LAUNCH 6
#define ORTE_RMGR_CMD_TERM_JOB 7
#define ORTE_RMGR_CMD_TERM_PROC 8
#define ORTE_RMGR_CMD_SPAWN 9
#define ORTE_RMGR_CMD ORTE_UINT32
typedef uint32_t orte_rmgr_cmd_t;
#endif

Просмотреть файл

@ -31,6 +31,8 @@
#include "rmgr_urm.h"
static int orte_rmgr_urm_query(void);
static int orte_rmgr_urm_create(
orte_app_context_t** app_context,
size_t num_context,
@ -61,7 +63,7 @@ static int orte_rmgr_urm_spawn(
orte_rmgr_cb_fn_t cbfn);
orte_rmgr_base_module_t orte_rmgr_urm_module = {
orte_rds_base_query,
orte_rmgr_urm_query,
orte_rmgr_urm_create,
orte_rmgr_urm_allocate,
orte_rmgr_urm_deallocate,
@ -76,6 +78,21 @@ orte_rmgr_base_module_t orte_rmgr_urm_module = {
};
/*
* Resource discovery
*/
static int orte_rmgr_urm_query(void)
{
int rc;
if(ORTE_SUCCESS != (rc = orte_rds_base_query())) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
/*
* Create the job segment and initialize the application context.
*/
@ -89,6 +106,7 @@ static int orte_rmgr_urm_create(
/* allocate a jobid */
if (ORTE_SUCCESS != (rc = orte_ns.create_jobid(jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -96,12 +114,14 @@ static int orte_rmgr_urm_create(
if (ORTE_SUCCESS !=
(rc = orte_rmgr_base_put_app_context(*jobid, app_context,
num_context))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* setup the launch stage gate counters and subscriptions */
if (ORTE_SUCCESS !=
(rc = orte_rmgr_base_proc_stage_gate_init(*jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -18,12 +18,16 @@
#include "include/orte_constants.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "dps/dps.h"
#include "mca/errmgr/errmgr.h"
#include "mca/rds/base/base.h"
#include "mca/rml/rml.h"
#include "mca/base/mca_base_param.h"
#include "mca/ras/base/base.h"
#include "mca/rmaps/base/base.h"
#include "mca/pls/base/base.h"
#include "mca/rmgr/base/base.h"
#include "rmgr_urm.h"
/*
@ -108,10 +112,61 @@ static int orte_rmgr_urm_open(void)
}
static void orte_rmgr_urm_recv(
int status,
orte_process_name_t* peer,
orte_buffer_t* req,
orte_rml_tag_t tag,
void* cbdata)
{
int rc;
orte_buffer_t rsp;
OBJ_CONSTRUCT(&rsp, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rmgr_base_cmd_dispatch(req,&rsp))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_rml.send_buffer(peer, &rsp, ORTE_RML_TAG_RMGR_CLNT, 0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
cleanup:
rc = orte_rml.recv_buffer_nb(
ORTE_RML_NAME_ANY,
ORTE_RML_TAG_RMGR_SVC,
0,
orte_rmgr_urm_recv,
NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&rsp);
}
static orte_rmgr_base_module_t *orte_rmgr_urm_init(int* priority)
{
int rc;
*priority = 1;
char* pls = NULL;
if(orte_process_info.seed == false) {
/* if we are bootproxy - need to be selected */
int id = mca_base_param_register_int("rmgr","bootproxy","jobid",NULL,0);
int jobid = 0;
mca_base_param_lookup_int(id,&jobid);
if(jobid == 0) {
return NULL;
}
/* use fork pls for bootproxy */
id = mca_base_param_register_string("rmgr","bootproxy","pls",NULL,"fork");
mca_base_param_lookup_string(id,&pls);
}
/**
* Select RDS components.
@ -141,11 +196,25 @@ static orte_rmgr_base_module_t *orte_rmgr_urm_init(int* priority)
/**
* Select PLS component
*/
if (NULL == (mca_rmgr_urm_component.urm_pls = orte_pls_base_select(NULL))) {
if (NULL == (mca_rmgr_urm_component.urm_pls = orte_pls_base_select(pls))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return NULL;
}
/**
* Post non-blocking receive
*/
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(
ORTE_RML_NAME_ANY,
ORTE_RML_TAG_RMGR_SVC,
0,
orte_rmgr_urm_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
return NULL;
}
*priority = 100;
return &orte_rmgr_urm_module;
}
@ -161,6 +230,7 @@ static int orte_rmgr_urm_close(void)
* Close Process Launch Subsystem (PLS)
*/
if (ORTE_SUCCESS != (rc = orte_pls_base_close())) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -168,6 +238,7 @@ static int orte_rmgr_urm_close(void)
* Close Resource Mapping Subsystem (RMAPS)
*/
if (ORTE_SUCCESS != (rc = orte_rmaps_base_close())) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -175,6 +246,7 @@ static int orte_rmgr_urm_close(void)
* Close Resource Allocation Subsystem (RAS)
*/
if (ORTE_SUCCESS != (rc = orte_ras_base_close())) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -182,8 +254,14 @@ static int orte_rmgr_urm_close(void)
* Close Resource Discovery Subsystem (RDS)
*/
if (ORTE_SUCCESS != (rc = orte_rds_base_close())) {
ORTE_ERROR_LOG(rc);
return rc;
}
/**
* Cancel pending receive.
*/
orte_rml.recv_cancel(ORTE_RML_NAME_ANY, ORTE_RML_TAG_RMGR_SVC);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -255,7 +255,7 @@ typedef int (*orte_rml_module_send_nb_fn_t)(
typedef int (*orte_rml_module_send_buffer_nb_fn_t)(
orte_process_name_t* peer,
orte_buffer_t buffer,
orte_buffer_t* buffer,
orte_rml_tag_t tag,
int flags,
orte_rml_buffer_callback_fn_t cbfunc,

Просмотреть файл

@ -39,7 +39,8 @@ typedef uint32_t orte_rml_tag_t;
#define ORTE_RML_TAG_IOF_SVC 5
#define ORTE_RML_TAG_IOF_CLNT 6
#define ORTE_RML_TAG_XCAST 7
#define ORTE_RML_TAG_RMGR 8
#define ORTE_RML_TAG_RMGR_SVC 8
#define ORTE_RML_TAG_RMGR_CLNT 9
#define ORTE_RML_TAG_DYNAMIC 2000
#define ORTE_RML_TAG_MAX UINT32_MAX

Просмотреть файл

@ -28,7 +28,7 @@
#include <string.h>
#include "include/orte_constants.h"
#include "util/output.h"
#include "mca/ns/ns.h"
#include "mca/errmgr/errmgr.h"
@ -159,22 +159,19 @@ int orte_schema_get_job_segment_name(char **name, orte_jobid_t jobid)
int orte_schema_extract_jobid_from_segment_name(orte_jobid_t *jobid, char *name)
{
char *jobstring, *tmp;
char *jobstring;
orte_jobid_t job;
int rc;
tmp = strrchr(name, '-');
if (NULL == tmp) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
jobstring = strpbrk(tmp, "0123456789");
jobstring = strrchr(name, '-');
if (NULL == jobstring) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
jobstring++;
if (ORTE_SUCCESS != (rc = orte_ns.convert_string_to_jobid(&job, jobstring))) {
ORTE_ERROR_LOG(rc);
ompi_output(0, "[%d,%d,%d] %s\n", ORTE_NAME_ARGS(orte_process_info.my_name), jobstring);
return rc;
}
*jobid = job;

Просмотреть файл

@ -40,8 +40,7 @@ libs = $(top_builddir)/src/libmpi.la
bin_PROGRAMS = orted
orted_SOURCES = \
orted.h \
orted.c \
orted_bootproxy.c
orted.c
orted_LDADD = $(libs) $(LIBMPI_EXTRA_LIBS)
orted_LDFLAGS = $(LIBMPI_EXTRA_LDFLAGS)

Просмотреть файл

@ -55,6 +55,7 @@
#include "mca/gpr/gpr.h"
#include "mca/rml/rml.h"
#include "mca/soh/soh.h"
#include "mca/rmgr/rmgr.h"
#include "mca/soh/base/base.h"
#include "runtime/runtime.h"
@ -82,7 +83,7 @@ ompi_cmd_line_init_t orte_cmd_line_opts[] = {
{ NULL, NULL, NULL, 'd', NULL, "debug", 0,
&orted_globals.debug, OMPI_CMD_LINE_TYPE_BOOL,
"Run in debug mode (not generally intended for users)" },
{ NULL, NULL, NULL, '\0', NULL, "bootproxy", 1,
{ "rmgr", "bootproxy", "jobid", '\0', NULL, "bootproxy", 1,
&orted_globals.bootproxy, OMPI_CMD_LINE_TYPE_INT,
"Run as boot proxy for <job-id>" },
{ NULL, NULL, NULL, '\0', NULL, "name", 1,
@ -197,7 +198,7 @@ int main(int argc, char *argv[])
fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0666);
if(fd < 0) {
fd = open("/tmp/orted.log", O_RDWR|O_CREAT|O_TRUNC, 0666);
fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666);
}
if(fd >= 0) {
dup2(fd, STDOUT_FILENO);
@ -210,7 +211,7 @@ int main(int argc, char *argv[])
/* check to see if I'm a bootproxy */
if (orted_globals.bootproxy) { /* perform bootproxy-specific things */
if (ORTE_SUCCESS != (ret = orte_daemon_bootproxy())) {
if (ORTE_SUCCESS != (ret = orte_rmgr.launch(orted_globals.bootproxy))) {
ORTE_ERROR_LOG(ret);
}
if (ORTE_SUCCESS != (ret = orte_finalize())) {

Просмотреть файл

@ -1,47 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file **/
#include "orte_config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "include/orte_constants.h"
#include "runtime/runtime.h"
#include "util/daemon_init.h"
#include "util/univ_info.h"
#include "mca/pls/pls.h"
#include "mca/pls/base/base.h"
#include "orted.h"
int orte_daemon_bootproxy(void)
{
orte_pls_base_module_t* pls;
/* lookup launcher */
pls = orte_pls_base_select("fork");
if(NULL == pls) {
return ORTE_ERR_NOT_AVAILABLE;
}
/* launch the requested procs */
return pls->launch(orted_globals.bootproxy);
}

Просмотреть файл

@ -38,7 +38,6 @@ orte_proc_info_t orte_process_info = {
/* .pid = */ 0,
/* .seed = */ false,
/* .daemon = */ false,
/* .singleton = */ false,
/* .ns_replica_uri = */ NULL,
/* .gpr_replica_uri = */ NULL,
/* .ns_replica = */ NULL,

Просмотреть файл

@ -51,7 +51,6 @@ struct orte_proc_info_t {
pid_t pid; /**< Local process ID for this process */
bool seed; /**< Indicate whether or not this is seed daemon */
bool daemon; /**< Indicate whether or not I am a daemon */
bool singleton; /**< Indicate whether or not I am a singleton */
char *ns_replica_uri; /**< contact info for name services replica */
char *gpr_replica_uri; /**< contact info for registry replica */
orte_process_name_t *ns_replica; /**< Name of my name server replica (NULL=>me) */