Update XGrid RAS and PLS to the new infrastructure. Not yet super well
tested, but starting to get there... This commit was SVN r11810.
Этот коммит содержится в:
родитель
9111f74627
Коммит
9733c8e3bd
@ -34,6 +34,6 @@ struct orte_pls_xgrid_component_t {
|
|||||||
};
|
};
|
||||||
typedef struct orte_pls_xgrid_component_t orte_pls_xgrid_component_t;
|
typedef struct orte_pls_xgrid_component_t orte_pls_xgrid_component_t;
|
||||||
extern orte_pls_xgrid_component_t mca_pls_xgrid_component;
|
extern orte_pls_xgrid_component_t mca_pls_xgrid_component;
|
||||||
extern orte_pls_base_module_1_0_0_t orte_pls_xgrid_module;
|
extern orte_pls_base_module_1_3_0_t orte_pls_xgrid_module;
|
||||||
|
|
||||||
int orte_pls_xgrid_progress(void);
|
int orte_pls_xgrid_progress(void);
|
||||||
|
@ -20,96 +20,23 @@
|
|||||||
|
|
||||||
#import <stdio.h>
|
#import <stdio.h>
|
||||||
|
|
||||||
#import "orte/mca/pls/base/base.h"
|
|
||||||
#import "orte/orte_constants.h"
|
|
||||||
#import "orte/mca/ns/ns.h"
|
|
||||||
#import "orte/mca/ras/base/ras_base_node.h"
|
|
||||||
#import "orte/mca/gpr/gpr.h"
|
|
||||||
#import "orte/mca/rml/rml.h"
|
|
||||||
#import "opal/util/path.h"
|
#import "opal/util/path.h"
|
||||||
|
|
||||||
|
#import "orte/orte_constants.h"
|
||||||
|
#import "orte/mca/rml/rml.h"
|
||||||
|
#import "orte/mca/ns/ns.h"
|
||||||
|
#import "orte/mca/pls/base/base.h"
|
||||||
|
#import "orte/mca/pls/base/pls_private.h"
|
||||||
|
#import "orte/mca/pls/pls.h"
|
||||||
|
#import "orte/mca/errmgr/errmgr.h"
|
||||||
|
#import "orte/mca/ras/ras_types.h"
|
||||||
|
#import "orte/mca/rmaps/base/rmaps_private.h"
|
||||||
|
#import "orte/mca/smr/smr.h"
|
||||||
|
|
||||||
#import "pls_xgrid_client.h"
|
#import "pls_xgrid_client.h"
|
||||||
|
|
||||||
char **environ;
|
char **environ;
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the daemons name in the registry.
|
|
||||||
*/
|
|
||||||
|
|
||||||
static int
|
|
||||||
mca_pls_xgrid_set_node_name(orte_ras_node_t* node,
|
|
||||||
orte_jobid_t jobid,
|
|
||||||
orte_process_name_t* name)
|
|
||||||
{
|
|
||||||
orte_gpr_value_t *values[1], *value;
|
|
||||||
orte_gpr_keyval_t *kv;
|
|
||||||
char* jobid_string;
|
|
||||||
size_t i;
|
|
||||||
int rc;
|
|
||||||
|
|
||||||
values[0] = OBJ_NEW(orte_gpr_value_t);
|
|
||||||
if (NULL == values[0]) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
value = values[0];
|
|
||||||
value->cnt = 1;
|
|
||||||
value->addr_mode = ORTE_GPR_OVERWRITE;
|
|
||||||
value->segment = strdup(ORTE_NODE_SEGMENT);
|
|
||||||
value->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*));
|
|
||||||
if (NULL == value->keyvals) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
|
||||||
if (NULL == value->keyvals[0]) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
kv = value->keyvals[0];
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS !=
|
|
||||||
(rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS !=
|
|
||||||
(rc = orte_schema.get_node_tokens(&(value->tokens), &(value->num_tokens),
|
|
||||||
node->node_cellid, node->node_name))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
free(jobid_string);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
asprintf(&(kv->key), "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string);
|
|
||||||
kv->value = OBJ_NEW(orte_data_value_t);
|
|
||||||
if (NULL == kv->value) {
|
|
||||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
kv->value->type = ORTE_NAME;
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_dss.copy(&(kv->value->data), name, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = orte_gpr.put(1, values);
|
|
||||||
if(ORTE_SUCCESS != rc) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
|
|
||||||
OBJ_RELEASE(value);
|
|
||||||
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@implementation PlsXGridClient
|
@implementation PlsXGridClient
|
||||||
|
|
||||||
@ -243,6 +170,8 @@ mca_pls_xgrid_set_node_name(orte_ras_node_t* node,
|
|||||||
"pls: xgrid: connection name: %s", [[connection name] cString]);
|
"pls: xgrid: connection name: %s", [[connection name] cString]);
|
||||||
|
|
||||||
controller = [[XGController alloc] initWithConnection:connection];
|
controller = [[XGController alloc] initWithConnection:connection];
|
||||||
|
/* need to call progress exactly once for some reason to get the
|
||||||
|
controller happy enough to allow us to assign the grid */
|
||||||
opal_progress();
|
opal_progress();
|
||||||
grid = [controller defaultGrid];
|
grid = [controller defaultGrid];
|
||||||
#if 0 /* gives a warning - need to figure out "right way" */
|
#if 0 /* gives a warning - need to figure out "right way" */
|
||||||
@ -256,83 +185,152 @@ mca_pls_xgrid_set_node_name(orte_ras_node_t* node,
|
|||||||
|
|
||||||
-(int) launchJob:(orte_jobid_t) jobid
|
-(int) launchJob:(orte_jobid_t) jobid
|
||||||
{
|
{
|
||||||
opal_list_t nodes, mapping_list;
|
opal_list_t mapping;
|
||||||
opal_list_item_t *item;
|
opal_list_item_t *m_item, *n_item;
|
||||||
int ret;
|
|
||||||
size_t num_nodes;
|
size_t num_nodes;
|
||||||
orte_vpid_t vpid;
|
orte_vpid_t vpid;
|
||||||
int i = 0;
|
int rc, i = 0;
|
||||||
|
opal_list_t daemons;
|
||||||
|
orte_pls_daemon_info_t *dmn;
|
||||||
char *orted_path;
|
char *orted_path;
|
||||||
|
char *nsuri = NULL, *gpruri = NULL;
|
||||||
|
|
||||||
|
/* Query the list of nodes allocated and mapped to this job.
|
||||||
|
* We need the entire mapping for a couple of reasons:
|
||||||
|
* - need the prefix to start with.
|
||||||
|
* - need to know if we are launching on a subset of the allocated nodes
|
||||||
|
*/
|
||||||
|
OBJ_CONSTRUCT(&mapping, opal_list_t);
|
||||||
|
rc = orte_rmaps_base_get_map(jobid, &mapping);
|
||||||
|
if (ORTE_SUCCESS != rc) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
num_nodes = 0;
|
||||||
|
for(m_item = opal_list_get_first(&mapping);
|
||||||
|
m_item != opal_list_get_end(&mapping);
|
||||||
|
m_item = opal_list_get_next(m_item)) {
|
||||||
|
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)m_item;
|
||||||
|
num_nodes += opal_list_get_size(&map->nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocate a range of vpids for the daemons.
|
||||||
|
*/
|
||||||
|
if (0 == num_nodes) {
|
||||||
|
return ORTE_ERR_BAD_PARAM;
|
||||||
|
}
|
||||||
|
rc = orte_ns.reserve_range(0, num_nodes, &vpid);
|
||||||
|
if (ORTE_SUCCESS != rc) {
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* setup the orted triggers for passing their launch info */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_nodes, NULL, NULL))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* setup a list that will contain the info for all the daemons
|
||||||
|
* so we can store it on the registry when done
|
||||||
|
*/
|
||||||
|
OBJ_CONSTRUCT(&daemons, opal_list_t);
|
||||||
|
|
||||||
/* find orted */
|
/* find orted */
|
||||||
orted_path = opal_path_findv((char*) [orted cString], 0, environ, NULL);
|
orted_path = opal_path_findv((char*) [orted cString], 0, environ, NULL);
|
||||||
|
|
||||||
|
/* setup ns contact info */
|
||||||
|
if (NULL != orte_process_info.ns_replica_uri) {
|
||||||
|
nsuri = strdup(orte_process_info.ns_replica_uri);
|
||||||
|
} else {
|
||||||
|
nsuri = orte_rml.get_uri();
|
||||||
|
}
|
||||||
|
|
||||||
/* query the list of nodes allocated to the job */
|
/* setup gpr contact info */
|
||||||
OBJ_CONSTRUCT(&nodes, opal_list_t);
|
if (NULL != orte_process_info.gpr_replica_uri) {
|
||||||
OBJ_CONSTRUCT(&mapping_list, opal_list_t);
|
gpruri = strdup(orte_process_info.gpr_replica_uri);
|
||||||
ret = orte_rmaps_base_mapped_node_query(&mapping_list, &nodes, jobid);
|
} else {
|
||||||
if (ORTE_SUCCESS != ret) goto cleanup;
|
gpruri = orte_rml.get_uri();
|
||||||
|
}
|
||||||
/* allocate vpids for the daemons */
|
|
||||||
num_nodes = opal_list_get_size(&nodes);
|
|
||||||
if (num_nodes == 0) return ORTE_ERR_BAD_PARAM;
|
|
||||||
ret = orte_ns.reserve_range(0, num_nodes, &vpid);
|
|
||||||
if (ORTE_SUCCESS != ret) goto cleanup;
|
|
||||||
|
|
||||||
/* build up the array of task specifications */
|
/* build up the array of task specifications */
|
||||||
NSMutableDictionary *taskSpecifications = [NSMutableDictionary dictionary];
|
NSMutableDictionary *taskSpecifications = [NSMutableDictionary dictionary];
|
||||||
|
|
||||||
for (item = opal_list_get_first(&nodes);
|
|
||||||
item != opal_list_get_end(&nodes);
|
|
||||||
item = opal_list_get_next(item)) {
|
|
||||||
orte_ras_node_t* node = (orte_ras_node_t*)item;
|
|
||||||
orte_process_name_t* name;
|
|
||||||
char *name_str, *nsuri, *gpruri;
|
|
||||||
|
|
||||||
ret = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
|
/*
|
||||||
if(ORTE_SUCCESS != ret) {
|
* iterate through each of the contexts
|
||||||
ORTE_ERROR_LOG(ret);
|
*/
|
||||||
goto cleanup;
|
for (m_item = opal_list_get_first(&mapping);
|
||||||
}
|
m_item != opal_list_get_end(&mapping);
|
||||||
ret = orte_ns.get_proc_name_string(&name_str, name);
|
m_item = opal_list_get_next(m_item)) {
|
||||||
if (ORTE_SUCCESS != ret) {
|
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)m_item;
|
||||||
ORTE_ERROR_LOG(ret);
|
|
||||||
goto cleanup;
|
/* Iterate through each of the nodes and spin
|
||||||
|
* up a daemon.
|
||||||
|
*/
|
||||||
|
for (n_item = opal_list_get_first(&map->nodes);
|
||||||
|
n_item != opal_list_get_end(&map->nodes);
|
||||||
|
n_item = opal_list_get_next(n_item)) {
|
||||||
|
orte_rmaps_base_node_t* rmaps_node = (orte_rmaps_base_node_t*)n_item;
|
||||||
|
orte_ras_node_t* node = rmaps_node->node;
|
||||||
|
orte_process_name_t* name;
|
||||||
|
char* name_string;
|
||||||
|
|
||||||
|
/* already launched on this node */
|
||||||
|
if (0 != node->node_launched++) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* new daemon - setup to record its info */
|
||||||
|
dmn = OBJ_NEW(orte_pls_daemon_info_t);
|
||||||
|
dmn->active_job = jobid;
|
||||||
|
opal_list_append(&daemons, &dmn->super);
|
||||||
|
|
||||||
|
/* record the node name in the daemon struct */
|
||||||
|
dmn->cell = node->node_cellid;
|
||||||
|
dmn->nodename = strdup(node->node_name);
|
||||||
|
|
||||||
|
/* initialize daemons process name */
|
||||||
|
rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
|
||||||
|
if (ORTE_SUCCESS != rc) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* save it in the daemon struct */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(dmn->name), name, ORTE_NAME))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* setup per-node options */
|
||||||
|
opal_output(0, "pls:xgrid: launching on node %s",
|
||||||
|
node->node_name);
|
||||||
|
|
||||||
|
/* setup process name */
|
||||||
|
rc = orte_ns.get_proc_name_string(&name_string, name);
|
||||||
|
if (ORTE_SUCCESS != rc) {
|
||||||
|
opal_output(0, "pls:xgrid: unable to create process name");
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
NSMutableDictionary *task = [NSMutableDictionary dictionary];
|
||||||
|
[task setObject: [NSString stringWithCString: orted_path]
|
||||||
|
forKey: XGJobSpecificationCommandKey];
|
||||||
|
NSArray *taskArguments =
|
||||||
|
[NSArray arrayWithObjects: @"--no-daemonize",
|
||||||
|
@"--bootproxy", [NSString stringWithFormat: @"%d", jobid],
|
||||||
|
@"--name", [NSString stringWithCString: name_string],
|
||||||
|
@"--nodename", [NSString stringWithCString: node->node_name],
|
||||||
|
@"--nsreplica", [NSString stringWithCString: nsuri],
|
||||||
|
@"--gprreplica", [NSString stringWithCString: gpruri],
|
||||||
|
nil];
|
||||||
|
[task setObject: taskArguments forKey: XGJobSpecificationArgumentsKey];
|
||||||
|
|
||||||
|
[taskSpecifications setObject: task
|
||||||
|
forKey: [NSString stringWithFormat: @"%d", i]];
|
||||||
|
|
||||||
|
vpid++; i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != orte_process_info.ns_replica_uri) {
|
|
||||||
nsuri = strdup(orte_process_info.ns_replica_uri);
|
|
||||||
} else {
|
|
||||||
nsuri = orte_rml.get_uri();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL != orte_process_info.gpr_replica_uri) {
|
|
||||||
gpruri = strdup(orte_process_info.gpr_replica_uri);
|
|
||||||
} else {
|
|
||||||
gpruri = orte_rml.get_uri();
|
|
||||||
}
|
|
||||||
|
|
||||||
NSMutableDictionary *task = [NSMutableDictionary dictionary];
|
|
||||||
[task setObject: [NSString stringWithCString: orted_path]
|
|
||||||
forKey: XGJobSpecificationCommandKey];
|
|
||||||
NSArray *taskArguments =
|
|
||||||
[NSArray arrayWithObjects: @"--no-daemonize",
|
|
||||||
@"--bootproxy", [NSString stringWithFormat: @"%d", jobid],
|
|
||||||
@"--name", [NSString stringWithCString: name_str],
|
|
||||||
@"--nodename", [NSString stringWithCString: node->node_name],
|
|
||||||
@"--nsreplica", [NSString stringWithCString: nsuri],
|
|
||||||
@"--gprreplica", [NSString stringWithCString: gpruri],
|
|
||||||
nil];
|
|
||||||
[task setObject: taskArguments forKey: XGJobSpecificationArgumentsKey];
|
|
||||||
|
|
||||||
[taskSpecifications setObject: task
|
|
||||||
forKey: [NSString stringWithFormat: @"%d", i]];
|
|
||||||
|
|
||||||
/* add the node name into the registery */
|
|
||||||
mca_pls_xgrid_set_node_name(node, jobid, name);
|
|
||||||
|
|
||||||
free(name_str); free(nsuri); free(gpruri);
|
|
||||||
vpid++; i++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* job specification */
|
/* job specification */
|
||||||
@ -359,30 +357,42 @@ mca_pls_xgrid_set_node_name(orte_ras_node_t* node,
|
|||||||
|
|
||||||
/* we should have a result - find out if it worked */
|
/* we should have a result - find out if it worked */
|
||||||
if (XGActionMonitorOutcomeSuccess == [actionMonitor outcome]) {
|
if (XGActionMonitorOutcomeSuccess == [actionMonitor outcome]) {
|
||||||
ret = ORTE_SUCCESS;
|
rc = ORTE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
NSError *err = [actionMonitor error];
|
NSError *err = [actionMonitor error];
|
||||||
fprintf(stderr, "orte:pls:xgrid: launch failed: %s\n",
|
fprintf(stderr, "orte:pls:xgrid: launch failed: %s\n",
|
||||||
[[err localizedFailureReason] cString]);
|
[[err localizedFailureReason] cString]);
|
||||||
ret = ORTE_ERROR;
|
rc = ORTE_ERROR;
|
||||||
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* save the XGJob identifier somewhere we can get to it */
|
/* save the XGJob identifier somewhere we can get to it */
|
||||||
[active_jobs setObject: [[actionMonitor results] objectForKey: @"jobIdentifier"]
|
[active_jobs setObject: [[actionMonitor results] objectForKey: @"jobIdentifier"]
|
||||||
forKey: [NSString stringWithFormat: @"%d", jobid]];
|
forKey: [NSString stringWithFormat: @"%d", jobid]];
|
||||||
|
|
||||||
|
/* all done, so store the daemon info on the registry */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons, jobid))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
while(NULL != (item = opal_list_remove_first(&nodes))) {
|
if (NULL != nsuri) free(nsuri);
|
||||||
OBJ_RELEASE(item);
|
if (NULL != gpruri) free(gpruri);
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&nodes);
|
|
||||||
|
|
||||||
while(NULL != (item = opal_list_remove_first(&mapping_list))) {
|
while (NULL != (m_item = opal_list_remove_first(&mapping))) {
|
||||||
OBJ_RELEASE(item);
|
OBJ_RELEASE(m_item);
|
||||||
}
|
}
|
||||||
OBJ_DESTRUCT(&mapping_list);
|
OBJ_DESTRUCT(&mapping);
|
||||||
|
|
||||||
return ret;
|
/* deconstruct the daemon list */
|
||||||
|
while (NULL != (m_item = opal_list_remove_first(&daemons))) {
|
||||||
|
OBJ_RELEASE(m_item);
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
|
||||||
|
opal_output(0, "pls:xgrid:launch: finished\n");
|
||||||
|
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,7 +36,6 @@
|
|||||||
#import "orte/mca/pls/pls.h"
|
#import "orte/mca/pls/pls.h"
|
||||||
#import "orte/mca/pls/base/base.h"
|
#import "orte/mca/pls/base/base.h"
|
||||||
#import "opal/mca/base/mca_base_param.h"
|
#import "opal/mca/base/mca_base_param.h"
|
||||||
#import "orte/mca/rml/rml.h"
|
|
||||||
|
|
||||||
#import "pls_xgrid.h"
|
#import "pls_xgrid.h"
|
||||||
#import "pls_xgrid_client.h"
|
#import "pls_xgrid_client.h"
|
||||||
@ -65,10 +64,10 @@ orte_pls_xgrid_component_t mca_pls_xgrid_component = {
|
|||||||
about the component itself */
|
about the component itself */
|
||||||
|
|
||||||
{
|
{
|
||||||
/* Indicate that we are a pls v1.0.0 component (which also
|
/* Indicate that we are a pls v1.3.0 component (which also
|
||||||
implies a specific MCA version) */
|
implies a specific MCA version) */
|
||||||
|
|
||||||
ORTE_PLS_BASE_VERSION_1_0_0,
|
ORTE_PLS_BASE_VERSION_1_3_0,
|
||||||
|
|
||||||
/* Component name and version */
|
/* Component name and version */
|
||||||
|
|
||||||
@ -101,9 +100,20 @@ orte_pls_xgrid_component_t mca_pls_xgrid_component = {
|
|||||||
int
|
int
|
||||||
orte_pls_xgrid_component_open(void)
|
orte_pls_xgrid_component_open(void)
|
||||||
{
|
{
|
||||||
mca_base_param_register_string("pls", "xgrid", "orted", NULL, "orted");
|
mca_base_param_reg_string(&mca_pls_xgrid_component.super.pls_version,
|
||||||
mca_base_param_register_int("pls", "xgrid", "priority", NULL, 20);
|
"orted",
|
||||||
mca_base_param_register_int("pls", "xgrid", "delete_job", NULL, 1);
|
"The command name that the component will invoke for the ORTE daemon",
|
||||||
|
false, false, "orted", NULL);
|
||||||
|
|
||||||
|
mca_base_param_reg_int(&mca_pls_xgrid_component.super.pls_version,
|
||||||
|
"priority",
|
||||||
|
"Priority of the xgrid pls component",
|
||||||
|
false, false, 20, NULL);
|
||||||
|
|
||||||
|
mca_base_param_reg_int(&mca_pls_xgrid_component.super.pls_version,
|
||||||
|
"delete_job",
|
||||||
|
"Delete job from XGrid controller's database on job completion",
|
||||||
|
false, false, 1, NULL);
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -39,8 +39,6 @@
|
|||||||
#import "orte/mca/rml/rml.h"
|
#import "orte/mca/rml/rml.h"
|
||||||
#import "orte/mca/gpr/gpr.h"
|
#import "orte/mca/gpr/gpr.h"
|
||||||
#import "orte/mca/errmgr/errmgr.h"
|
#import "orte/mca/errmgr/errmgr.h"
|
||||||
#import "orte/mca/ras/base/ras_base_node.h"
|
|
||||||
#import "orte/mca/rmaps/base/rmaps_base_map.h"
|
|
||||||
#import "orte/mca/rmgr/base/base.h"
|
#import "orte/mca/rmgr/base/base.h"
|
||||||
#import "orte/mca/smr/smr.h"
|
#import "orte/mca/smr/smr.h"
|
||||||
#import "orte/mca/smr/base/base.h"
|
#import "orte/mca/smr/base/base.h"
|
||||||
@ -48,21 +46,24 @@
|
|||||||
|
|
||||||
int orte_pls_xgrid_launch(orte_jobid_t jobid);
|
int orte_pls_xgrid_launch(orte_jobid_t jobid);
|
||||||
int orte_pls_xgrid_terminate_job(orte_jobid_t jobid);
|
int orte_pls_xgrid_terminate_job(orte_jobid_t jobid);
|
||||||
|
int orte_pls_xgrid_terminate_orteds(orte_jobid_t jobid);
|
||||||
int orte_pls_xgrid_terminate_proc(const orte_process_name_t* proc);
|
int orte_pls_xgrid_terminate_proc(const orte_process_name_t* proc);
|
||||||
int orte_pls_xgrid_signal_job(orte_jobid_t, int32_t);
|
int orte_pls_xgrid_signal_job(orte_jobid_t job, int32_t signal);
|
||||||
int orte_pls_xgrid_signal_proc(const orte_process_name_t* proc_name, int32_t);
|
int orte_pls_xgrid_signal_proc(const orte_process_name_t* proc_name, int32_t signal);
|
||||||
int orte_pls_xgrid_finalize(void);
|
int orte_pls_xgrid_finalize(void);
|
||||||
|
|
||||||
|
|
||||||
orte_pls_base_module_1_0_0_t orte_pls_xgrid_module = {
|
orte_pls_base_module_1_3_0_t orte_pls_xgrid_module = {
|
||||||
orte_pls_xgrid_launch,
|
orte_pls_xgrid_launch,
|
||||||
orte_pls_xgrid_terminate_job,
|
orte_pls_xgrid_terminate_job,
|
||||||
|
orte_pls_xgrid_terminate_orteds,
|
||||||
orte_pls_xgrid_terminate_proc,
|
orte_pls_xgrid_terminate_proc,
|
||||||
orte_pls_xgrid_signal_job,
|
orte_pls_xgrid_signal_job,
|
||||||
orte_pls_xgrid_signal_proc,
|
orte_pls_xgrid_signal_proc,
|
||||||
orte_pls_xgrid_finalize
|
orte_pls_xgrid_finalize
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Launch a daemon (bootproxy) on each node. The daemon will be responsible
|
* Launch a daemon (bootproxy) on each node. The daemon will be responsible
|
||||||
* for launching the application.
|
* for launching the application.
|
||||||
@ -70,176 +71,125 @@ orte_pls_base_module_1_0_0_t orte_pls_xgrid_module = {
|
|||||||
int
|
int
|
||||||
orte_pls_xgrid_launch(orte_jobid_t jobid)
|
orte_pls_xgrid_launch(orte_jobid_t jobid)
|
||||||
{
|
{
|
||||||
|
/* handled entirely within the client. Can't just call directly
|
||||||
|
because the client has to be an ObjC class */
|
||||||
return [mca_pls_xgrid_component.client launchJob:jobid];
|
return [mca_pls_xgrid_component.client launchJob:jobid];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for a pending job to complete.
|
* Terminate all processes for a given job
|
||||||
*/
|
|
||||||
|
|
||||||
static void
|
|
||||||
orte_pls_xgrid_terminate_job_rsp(int status,
|
|
||||||
orte_process_name_t* peer,
|
|
||||||
orte_buffer_t* rsp,
|
|
||||||
orte_rml_tag_t tag,
|
|
||||||
void* cbdata)
|
|
||||||
{
|
|
||||||
int rc;
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_rmgr_base_unpack_rsp(rsp))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
orte_pls_xgrid_terminate_job_cb(int status,
|
|
||||||
orte_process_name_t* peer,
|
|
||||||
orte_buffer_t* req,
|
|
||||||
orte_rml_tag_t tag,
|
|
||||||
void* cbdata)
|
|
||||||
{
|
|
||||||
/* wait for response */
|
|
||||||
int rc;
|
|
||||||
if(status < 0) {
|
|
||||||
ORTE_ERROR_LOG(status);
|
|
||||||
OBJ_RELEASE(req);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(0 > (rc = orte_rml.recv_buffer_nb(peer, ORTE_RML_TAG_RMGR_CLNT,
|
|
||||||
0, orte_pls_xgrid_terminate_job_rsp, NULL))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
OBJ_RELEASE(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Query the registry for all nodes participating in the job
|
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
orte_pls_xgrid_terminate_job(orte_jobid_t jobid)
|
orte_pls_xgrid_terminate_job(orte_jobid_t jobid)
|
||||||
{
|
{
|
||||||
char *keys[2];
|
|
||||||
char *jobid_string;
|
|
||||||
orte_gpr_value_t** values = NULL;
|
|
||||||
orte_process_name_t *name;
|
|
||||||
orte_std_cntr_t i, j, num_values = 0;
|
|
||||||
int rc;
|
int rc;
|
||||||
|
opal_list_t daemons;
|
||||||
if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
|
opal_list_item_t *item;
|
||||||
|
|
||||||
|
/* construct the list of active daemons on this job */
|
||||||
|
OBJ_CONSTRUCT(&daemons, opal_list_t);
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return rc;
|
goto CLEANUP;
|
||||||
}
|
}
|
||||||
|
|
||||||
asprintf(&keys[0], "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string);
|
/* order them to kill their local procs for this job */
|
||||||
keys[1] = NULL;
|
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_kill_local_procs(&daemons, jobid))) {
|
||||||
|
|
||||||
rc = orte_gpr.get(
|
|
||||||
ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
|
|
||||||
ORTE_NODE_SEGMENT,
|
|
||||||
NULL,
|
|
||||||
keys,
|
|
||||||
&num_values,
|
|
||||||
&values
|
|
||||||
);
|
|
||||||
if(rc != ORTE_SUCCESS) {
|
|
||||||
free(jobid_string);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
if(0 == num_values) {
|
|
||||||
rc = ORTE_ERR_NOT_FOUND;
|
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto cleanup;
|
goto CLEANUP;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(i=0; i<num_values; i++) {
|
CLEANUP:
|
||||||
orte_gpr_value_t* value = values[i];
|
while (NULL != (item = opal_list_remove_first(&daemons))) {
|
||||||
for(j=0; j<value->cnt; j++) {
|
OBJ_RELEASE(item);
|
||||||
orte_gpr_keyval_t* keyval = value->keyvals[j];
|
|
||||||
orte_buffer_t *cmd = OBJ_NEW(orte_buffer_t);
|
|
||||||
int ret;
|
|
||||||
if(cmd == NULL) {
|
|
||||||
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
if(strcmp(keyval->key, keys[0]) != 0)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
/* construct command */
|
|
||||||
ret = orte_rmgr_base_pack_cmd(cmd, ORTE_RMGR_CMD_TERM_JOB, jobid);
|
|
||||||
if(ORTE_SUCCESS != ret) {
|
|
||||||
ORTE_ERROR_LOG(ret);
|
|
||||||
OBJ_RELEASE(cmd);
|
|
||||||
rc = ret;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* send a terminate message to the bootproxy on each node
|
|
||||||
*/
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&name, keyval->value, ORTE_NAME))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
OBJ_RELEASE(cmd);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if(0 > (ret = orte_rml.send_buffer_nb(
|
|
||||||
name,
|
|
||||||
cmd,
|
|
||||||
ORTE_RML_TAG_RMGR_SVC,
|
|
||||||
0,
|
|
||||||
orte_pls_xgrid_terminate_job_cb,
|
|
||||||
NULL))) {
|
|
||||||
|
|
||||||
ORTE_ERROR_LOG(ret);
|
|
||||||
OBJ_RELEASE(cmd);
|
|
||||||
rc = ret;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
|
|
||||||
free(jobid_string);
|
|
||||||
free(keys[0]);
|
|
||||||
|
|
||||||
if(NULL != values) {
|
|
||||||
for(i=0; i<num_values; i++) {
|
|
||||||
if(NULL != values[i]) {
|
|
||||||
OBJ_RELEASE(values[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
free(values);
|
|
||||||
}
|
}
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
|
||||||
if (ORTE_SUCCESS != rc) {
|
if (ORTE_SUCCESS != rc) {
|
||||||
/* ok, now that we've given the orted a chance to clean everything
|
rc = [mca_pls_xgrid_component.client terminateJob:jobid];
|
||||||
up nicely, kill everything not so nicely */
|
|
||||||
return [mca_pls_xgrid_component.client terminateJob: jobid];
|
|
||||||
} else {
|
|
||||||
return rc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Terminate the orteds for a given job
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
orte_pls_xgrid_terminate_orteds(orte_jobid_t jobid)
|
||||||
|
{
|
||||||
|
int rc;
|
||||||
|
opal_list_t daemons;
|
||||||
|
opal_list_item_t *item;
|
||||||
|
|
||||||
|
/* construct the list of active daemons on this job */
|
||||||
|
OBJ_CONSTRUCT(&daemons, opal_list_t);
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto CLEANUP;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* now tell them to die! */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_exit(&daemons))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
CLEANUP:
|
||||||
|
while (NULL != (item = opal_list_remove_first(&daemons))) {
|
||||||
|
OBJ_RELEASE(item);
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
|
||||||
|
if (ORTE_SUCCESS != rc) {
|
||||||
|
rc = [mca_pls_xgrid_component.client terminateJob:jobid];
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Terminate a specific process
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
orte_pls_xgrid_terminate_proc(const orte_process_name_t* proc)
|
orte_pls_xgrid_terminate_proc(const orte_process_name_t* proc)
|
||||||
{
|
{
|
||||||
return ORTE_ERR_NOT_IMPLEMENTED;
|
return ORTE_ERR_NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
orte_pls_xgrid_signal_job(orte_jobid_t jobid, int32_t sig)
|
orte_pls_xgrid_signal_job(orte_jobid_t jobid, int32_t signal)
|
||||||
{
|
{
|
||||||
return ORTE_ERR_NOT_IMPLEMENTED;
|
int rc;
|
||||||
|
opal_list_t daemons;
|
||||||
|
opal_list_item_t *item;
|
||||||
|
|
||||||
|
/* construct the list of active daemons on this job */
|
||||||
|
OBJ_CONSTRUCT(&daemons, opal_list_t);
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(&daemons, jobid))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* order them to pass this signal to their local procs */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_signal_local_procs(&daemons, signal))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (NULL != (item = opal_list_remove_first(&daemons))) {
|
||||||
|
OBJ_RELEASE(item);
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
orte_pls_xgrid_signal_proc(const orte_process_name_t* proc_name,
|
orte_pls_xgrid_signal_proc(const orte_process_name_t* proc, int32_t signal)
|
||||||
int32_t sig)
|
|
||||||
{
|
{
|
||||||
return ORTE_ERR_NOT_IMPLEMENTED;
|
return ORTE_ERR_NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
@ -253,8 +203,6 @@ orte_pls_xgrid_finalize(void)
|
|||||||
|
|
||||||
opal_progress_unregister(orte_pls_xgrid_progress);
|
opal_progress_unregister(orte_pls_xgrid_progress);
|
||||||
|
|
||||||
/* cleanup any pending recvs */
|
|
||||||
orte_rml.recv_cancel(ORTE_RML_NAME_ANY, ORTE_RML_TAG_RMGR_CLNT);
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,14 +83,14 @@ orte_ras_xgrid_component_close(void)
|
|||||||
|
|
||||||
static orte_ras_base_module_t *orte_ras_xgrid_init(int* priority)
|
static orte_ras_base_module_t *orte_ras_xgrid_init(int* priority)
|
||||||
{
|
{
|
||||||
|
/* Are we running under a xgrid job? */
|
||||||
|
int id = mca_base_param_find("ras", "xgrid", "priority");
|
||||||
|
mca_base_param_lookup_int(id,priority);
|
||||||
|
|
||||||
/* if we are not an HNP, then we must not be selected */
|
/* if we are not an HNP, then we must not be selected */
|
||||||
if (!orte_process_info.seed) {
|
if (!orte_process_info.seed) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Are we running under a xgrid job? */
|
|
||||||
int id = mca_base_param_find("ras", "xgrid", "priority");
|
|
||||||
mca_base_param_lookup_int(id,priority);
|
|
||||||
|
|
||||||
if (NULL != getenv("XGRID_CONTROLLER_HOSTNAME") &&
|
if (NULL != getenv("XGRID_CONTROLLER_HOSTNAME") &&
|
||||||
NULL != getenv("XGRID_CONTROLLER_PASSWORD")) {
|
NULL != getenv("XGRID_CONTROLLER_PASSWORD")) {
|
||||||
|
@ -27,6 +27,8 @@
|
|||||||
#include "opal/util/output.h"
|
#include "opal/util/output.h"
|
||||||
#include "orte/mca/ras/base/ras_private.h"
|
#include "orte/mca/ras/base/ras_private.h"
|
||||||
#include "orte/mca/rmgr/rmgr.h"
|
#include "orte/mca/rmgr/rmgr.h"
|
||||||
|
#include "orte/mca/rmgr/base/rmgr_private.h"
|
||||||
|
#include "orte/mca/gpr/gpr.h"
|
||||||
#include "ras_xgrid.h"
|
#include "ras_xgrid.h"
|
||||||
|
|
||||||
|
|
||||||
@ -59,7 +61,6 @@ orte_ras_base_module_t orte_ras_xgrid_module = {
|
|||||||
* requested number of nodes/process slots to the job.
|
* requested number of nodes/process slots to the job.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
#include "orte/mca/gpr/gpr.h"
|
|
||||||
static int allocate(orte_jobid_t jobid)
|
static int allocate(orte_jobid_t jobid)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
@ -119,44 +120,33 @@ static int discover(orte_jobid_t jobid, opal_list_t* nodelist)
|
|||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
orte_ras_node_t *node;
|
orte_ras_node_t *node;
|
||||||
opal_list_item_t* item;
|
|
||||||
opal_list_t new_nodes;
|
|
||||||
orte_std_cntr_t num_requested = 0;
|
orte_std_cntr_t num_requested = 0;
|
||||||
orte_std_cntr_t i;
|
orte_std_cntr_t i;
|
||||||
char *hostname;
|
char *hostname;
|
||||||
|
|
||||||
/* how many slots do we need? */
|
/* how many slots do we need? */
|
||||||
if(ORTE_SUCCESS != (ret = orte_rmgr.get_job_slots(jobid, &num_requested))) {
|
if(ORTE_SUCCESS != (ret = orte_rmgr_base_get_job_slots(jobid, &num_requested))) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create a "node" for each slot */
|
/* create a "node" for each slot */
|
||||||
OBJ_CONSTRUCT(&new_nodes, opal_list_t);
|
|
||||||
for (i = 0 ; i < num_requested ; ++i) {
|
for (i = 0 ; i < num_requested ; ++i) {
|
||||||
asprintf(&hostname, "xgrid-node-%d", (int) i);
|
asprintf(&hostname, "xgrid-node-%d", (int) i);
|
||||||
node = OBJ_NEW(orte_ras_node_t);
|
node = OBJ_NEW(orte_ras_node_t);
|
||||||
node->node_name = hostname;
|
node->node_name = hostname;
|
||||||
node->node_arch = strdup("unknown");
|
node->node_arch = NULL;
|
||||||
node->node_state = ORTE_NODE_STATE_UP;
|
node->node_state = ORTE_NODE_STATE_UP;
|
||||||
node->node_cellid = 0;
|
node->node_cellid = 0;
|
||||||
node->node_slots_inuse = 0;
|
node->node_slots_inuse = 0;
|
||||||
node->node_slots_max = 0;
|
node->node_slots_max = 0;
|
||||||
node->node_slots = 1;
|
node->node_slots = 1;
|
||||||
opal_list_append(&new_nodes, &node->super);
|
opal_list_append(nodelist, &node->super);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add these nodes to the registry, and return all the values */
|
/* Add these nodes to the registry, and return all the values */
|
||||||
opal_output(orte_ras_base.ras_output,
|
opal_output(orte_ras_base.ras_output,
|
||||||
"ras:xgrid:allocate:discover: done -- adding to registry");
|
"ras:xgrid:allocate:discover: done -- adding to registry");
|
||||||
ret = orte_ras_base_node_insert(&new_nodes);
|
ret = orte_ras_base_node_insert(nodelist);
|
||||||
for (item = opal_list_remove_first(&new_nodes);
|
|
||||||
NULL != item; item = opal_list_remove_first(&new_nodes)) {
|
|
||||||
if (ORTE_SUCCESS == ret) {
|
|
||||||
opal_list_append(nodelist, item);
|
|
||||||
} else {
|
|
||||||
OBJ_RELEASE(item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* All done */
|
/* All done */
|
||||||
if (ORTE_SUCCESS == ret) {
|
if (ORTE_SUCCESS == ret) {
|
||||||
@ -166,6 +156,6 @@ static int discover(orte_jobid_t jobid, opal_list_t* nodelist)
|
|||||||
opal_output(orte_ras_base.ras_output,
|
opal_output(orte_ras_base.ras_output,
|
||||||
"ras:xgrid:allocate:discover: failed (rc=%d)", ret);
|
"ras:xgrid:allocate:discover: failed (rc=%d)", ret);
|
||||||
}
|
}
|
||||||
OBJ_DESTRUCT(&new_nodes);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user