* Fix incorrect logic in orted so that --no-daemonize works as intended
* Minor formatting fixes in XGrid RAS component
* Code cleanup in XGrid PLS component:
  - If we can't get daemon contact information, kill the job at the XGrid level
  - Add MCA parameter pls_xgrid_delete_job that will delete the job from XGrid when complete (this seems like standard behavior, so it's the default)
  - Remove compiler warning about getting the name of an XGGrid object
  - Properly populate the daemon information for the killing code

This commit was SVN r5697.
Этот коммит содержится в:
родитель
4b60235383
Коммит
189a536685
@ -27,7 +27,7 @@
|
||||
*/
|
||||
struct orte_pls_xgrid_component_t {
|
||||
orte_pls_base_component_t super;
|
||||
PlsXgridClient *client;
|
||||
PlsXGridClient *client;
|
||||
NSAutoreleasePool *pool;
|
||||
};
|
||||
typedef struct orte_pls_xgrid_component_t orte_pls_xgrid_component_t;
|
||||
|
@ -28,7 +28,7 @@
|
||||
#import "threads/condition.h"
|
||||
#include "mca/ns/ns_types.h"
|
||||
|
||||
@interface PlsXgridClient : NSObject
|
||||
@interface PlsXGridClient : NSObject
|
||||
{
|
||||
NSString *orted;
|
||||
NSString *controller_hostname;
|
||||
@ -45,10 +45,17 @@
|
||||
XGTwoWayRandomAuthenticator *authenticator;
|
||||
XGController *controller;
|
||||
XGGrid *grid;
|
||||
int cleanup;
|
||||
|
||||
NSMutableDictionary *active_jobs;
|
||||
}
|
||||
|
||||
/* init / finalize */
|
||||
-(id) init;
|
||||
-(id) initWithControllerHostname: (char*) hostnam
|
||||
AndControllerPassword: (char*) password
|
||||
AndOrted: (char*) ortedname
|
||||
AndCleanup: (int) val;
|
||||
-(void) dealloc;
|
||||
|
||||
/* accessors */
|
||||
@ -57,11 +64,12 @@
|
||||
-(void) setOrtedAsCString: (char*) name;
|
||||
-(void) setControllerPasswordAsCString: (char*) name;
|
||||
-(void) setControllerHostnameAsCString: (char*) password;
|
||||
|
||||
-(void) setCleanUp: (int) val;
|
||||
|
||||
/* interface for launch */
|
||||
-(int) connect;
|
||||
-(int) launchJob:(orte_jobid_t) jobid;
|
||||
-(int) terminateJob: (orte_jobid_t) jobid;
|
||||
|
||||
/* delegate for changes */
|
||||
-(void) connectionDidOpen:(XGConnection*) connection;
|
||||
|
@ -30,6 +30,7 @@
|
||||
#import "include/constants.h"
|
||||
#import "mca/ns/ns.h"
|
||||
#import "mca/ras/base/ras_base_node.h"
|
||||
#import "mca/gpr/gpr.h"
|
||||
#import "mca/rml/rml.h"
|
||||
#import "util/path.h"
|
||||
|
||||
@ -37,15 +38,94 @@
|
||||
|
||||
char **environ;
|
||||
|
||||
@implementation PlsXgridClient
|
||||
/**
 * Set the daemon's name in the registry.
 *
 * Stores *name on the node segment under the key
 * "<ORTE_NODE_BOOTPROXY_KEY>-<jobid>", addressed by the tokens that
 * orte_schema.get_node_tokens() returns for the node's cell id and
 * node name.  The entry is written in overwrite mode.
 *
 * @param node   node whose registry entry is written
 * @param jobid  job id that is appended to the key
 * @param name   process name stored as the value
 *
 * @return ORTE_SUCCESS on success; otherwise the error code of the
 *         step that failed (the error is logged before returning).
 */
static int
mca_pls_xgrid_set_node_name(orte_ras_base_node_t* node,
                            orte_jobid_t jobid,
                            orte_process_name_t* name)
{
    orte_gpr_value_t* values[1];
    orte_gpr_value_t value;
    orte_gpr_keyval_t kv_name = { { OBJ_CLASS(orte_gpr_keyval_t),0 },
                                  ORTE_NODE_BOOTPROXY_KEY,ORTE_NAME };
    orte_gpr_keyval_t* keyvals[1];
    char* jobid_string;
    char* key = NULL;
    size_t i;
    int rc;

    if (ORTE_SUCCESS !=
        (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (ORTE_SUCCESS !=
        (rc = orte_schema.get_node_tokens(&value.tokens, &value.num_tokens,
                                          node->node_cellid, node->node_name))) {
        ORTE_ERROR_LOG(rc);
        free(jobid_string);
        return rc;
    }

    /* asprintf() leaves the output pointer undefined on failure on some
       platforms, so check the result and never hand a bogus key to the
       registry or to free() */
    if (0 > asprintf(&key, "%s-%s", ORTE_NODE_BOOTPROXY_KEY, jobid_string)) {
        key = NULL;
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    kv_name.key = key;
    kv_name.value.proc = *name;
    keyvals[0] = &kv_name;
    value.keyvals = keyvals;
    value.cnt = 1;
    value.addr_mode = ORTE_GPR_OVERWRITE;
    value.segment = ORTE_NODE_SEGMENT;
    values[0] = &value;

    rc = orte_gpr.put(1, values);
    if(ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }

cleanup:
    /* key is NULL if it was never allocated; free(NULL) is a no-op */
    free(key);
    free(jobid_string);
    for (i=0; i<value.num_tokens; i++) free(value.tokens[i]);
    free(value.tokens);

    return rc;
}
|
||||
|
||||
|
||||
@implementation PlsXGridClient
|
||||
|
||||
/* init / finalize */
|
||||
-(id) init
{
    /* Forward to the full initializer: no hostname, password or orted
       name, and the cleanup flag set to 1. */
    id client = [self initWithControllerHostname: NULL
                           AndControllerPassword: NULL
                                        AndOrted: NULL
                                      AndCleanup: 1];
    return client;
}
|
||||
|
||||
/*
 * Full initializer.
 *
 * hostname / password / ortedname may each be NULL, in which case the
 * corresponding instance variable is left untouched.  val is stored
 * as the cleanup flag.
 *
 * NOTE(review): the strings and the dictionary below come from
 * convenience constructors and are not retained here - confirm that
 * an autorelease pool outlives this object.
 */
-(id) initWithControllerHostname: (char*) hostname
           AndControllerPassword: (char*) password
                        AndOrted: (char*) ortedname
                      AndCleanup: (int) val
{
    self = [super init];
    if (nil == self) {
        return self;
    }

    /* synchronization objects used by the connection callbacks */
    OBJ_CONSTRUCT(&state_cond, ompi_condition_t);
    OBJ_CONSTRUCT(&state_mutex, ompi_mutex_t);

    /* only convert the C strings that were actually supplied */
    if (NULL != password) {
        controller_password = [NSString stringWithCString: password];
    }
    if (NULL != hostname) {
        controller_hostname = [NSString stringWithCString: hostname];
    }
    cleanup = val;
    if (NULL != ortedname) {
        orted = [NSString stringWithCString: ortedname];
    }

    /* entries are added by launchJob: (jobid string -> XGrid job identifier) */
    active_jobs = [NSMutableDictionary dictionary];

    return self;
}
|
||||
@ -53,11 +133,37 @@ char **environ;
|
||||
|
||||
/*
 * Tear down the client.
 *
 * When the cleanup flag is set, asks XGrid to delete every job
 * recorded in active_jobs and spins on ompi_progress() until each
 * delete action reports an outcome.  Failures are reported on stderr
 * and do not stop the remaining cleanup.  Afterwards the connection
 * is finalized and the condition / mutex are destructed.
 */
-(void) dealloc
{
    /* if supposed to clean up jobs, do so */
    if (cleanup) {
        NSEnumerator *enumerator = [[active_jobs allKeys] objectEnumerator];
        NSString *key;

        while (nil != (key = [enumerator nextObject])) {
            XGJob *job = [grid jobForIdentifier: [active_jobs objectForKey: key]];
            XGActionMonitor *actionMonitor = [job performDeleteAction];

            /* A message to nil returns 0.  If the job lookup failed
               there is no monitor, and polling [nil outcome] could
               keep the wait loop below spinning forever - skip the
               job instead. */
            if (nil == actionMonitor) {
                fprintf(stderr,
                        "orte:pls:xgrid: cleanup failed: no XGrid job for jobid %s\n",
                        [key cString]);
                continue;
            }

            while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
                ompi_progress();
            }

            /* we should have a result - find out if it worked */
            if (XGActionMonitorOutcomeSuccess != [actionMonitor outcome]) {
                NSError *err = [actionMonitor error];
                /* never hand a NULL pointer to %s */
                const char *reason = [[err localizedFailureReason] cString];
                fprintf(stderr, "orte:pls:xgrid: cleanup failed: %s\n",
                        (NULL != reason) ? reason : "(no reason given)");
            }
        }
    }

    /* need to shut down connection */
    [connection finalize];

    OBJ_DESTRUCT(&state_mutex);
    OBJ_DESTRUCT(&state_cond);

    [super dealloc];
}
|
||||
|
||||
@ -87,6 +193,12 @@ char **environ;
|
||||
}
|
||||
|
||||
|
||||
/* Store the cleanup flag; dealloc reads it to decide whether the
   jobs recorded in active_jobs are deleted from XGrid. */
-(void) setCleanUp: (int) val
{
    self->cleanup = val;
}
|
||||
|
||||
|
||||
/* interface for launch */
|
||||
-(int) connect
|
||||
{
|
||||
@ -115,8 +227,10 @@ char **environ;
|
||||
controller = [[XGController alloc] initWithConnection:connection];
|
||||
ompi_progress();
|
||||
grid = [controller defaultGrid];
|
||||
#if 0 /* gives a warning - need to figure out "right way" */
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
"pls: xgrid: grid name: %s", [[grid name] cString]);
|
||||
#endif
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -186,7 +300,7 @@ char **environ;
|
||||
[NSArray arrayWithObjects: @"--no-daemonize",
|
||||
@"--bootproxy", [NSString stringWithFormat: @"%d", jobid],
|
||||
@"--name", [NSString stringWithCString: name_str],
|
||||
@"--nodename", [NSString stringWithFormat: @"xgrid-node-%d", i],
|
||||
@"--nodename", [NSString stringWithCString: node->node_name],
|
||||
@"--nsreplica", [NSString stringWithCString: nsuri],
|
||||
@"--gprreplica", [NSString stringWithCString: gpruri],
|
||||
nil];
|
||||
@ -195,8 +309,10 @@ char **environ;
|
||||
[taskSpecifications setObject: task
|
||||
forKey: [NSString stringWithFormat: @"%d", i]];
|
||||
|
||||
free(name_str); free(nsuri); free(gpruri);
|
||||
/* add the node name into the registery */
|
||||
mca_pls_xgrid_set_node_name(node, jobid, name);
|
||||
|
||||
free(name_str); free(nsuri); free(gpruri);
|
||||
vpid++; i++;
|
||||
}
|
||||
|
||||
@ -215,7 +331,7 @@ char **environ;
|
||||
/* Submit the request and get our monitor */
|
||||
XGActionMonitor *actionMonitor =
|
||||
[controller performSubmitJobActionWithJobSpecification: jobSpecification
|
||||
gridIdentifier: nil];
|
||||
gridIdentifier: [grid identifier]];
|
||||
|
||||
/* wait until we have some idea if job succeeded or not */
|
||||
while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
|
||||
@ -227,10 +343,15 @@ char **environ;
|
||||
ret = OMPI_SUCCESS;
|
||||
} else {
|
||||
NSError *err = [actionMonitor error];
|
||||
printf("launch failed: %s\n", [[err localizedFailureReason] cString]);
|
||||
fprintf(stderr, "orte:pls:xgrid: launch failed: %s\n",
|
||||
[[err localizedFailureReason] cString]);
|
||||
ret = OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* save the XGJob identifier somewhere we can get to it */
|
||||
[active_jobs setObject: [[actionMonitor results] objectForKey: @"jobIdentifier"]
|
||||
forKey: [NSString stringWithFormat: @"%d", jobid]];
|
||||
|
||||
cleanup:
|
||||
while(NULL != (item = ompi_list_remove_first(&nodes))) {
|
||||
OBJ_RELEASE(item);
|
||||
@ -240,14 +361,40 @@ cleanup:
|
||||
}
|
||||
|
||||
|
||||
/*
 * Stop the XGrid job that was recorded for jobid.
 *
 * Looks the job up through active_jobs (keyed by the jobid formatted
 * with "%d"), asks XGrid to stop it and spins on ompi_progress()
 * until the action reports an outcome.
 *
 * Returns OMPI_SUCCESS if the stop action succeeded, OMPI_ERROR if it
 * failed or if no job / action monitor could be obtained for jobid.
 */
-(int) terminateJob: (orte_jobid_t) jobid
{
    NSString *key = [NSString stringWithFormat: @"%d", jobid];

    /* get our job from the grid */
    XGJob *job = [grid jobForIdentifier: [active_jobs objectForKey: key]];
    XGActionMonitor *actionMonitor = [job performStopAction];

    /* A message to nil returns 0.  Without a monitor, polling
       [nil outcome] could keep the wait loop below spinning forever,
       so report the failure right away. */
    if (nil == actionMonitor) {
        fprintf(stderr,
                "orte:pls:xgrid: terminate failed: no XGrid job for jobid %s\n",
                [key cString]);
        return OMPI_ERROR;
    }

    while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
        ompi_progress();
    }

    /* we should have a result - find out if it worked */
    if (XGActionMonitorOutcomeSuccess == [actionMonitor outcome]) {
        return OMPI_SUCCESS;
    }

    {
        NSError *err = [actionMonitor error];
        /* never hand a NULL pointer to %s */
        const char *reason = [[err localizedFailureReason] cString];
        fprintf(stderr, "orte:pls:xgrid: terminate failed: %s\n",
                (NULL != reason) ? reason : "(no reason given)");
    }

    return OMPI_ERROR;
}
|
||||
|
||||
|
||||
/* delegate for changes */
|
||||
/* Connection callback: logs that the open notification arrived and
   then wakes every waiter on state_cond. */
-(void) connectionDidOpen:(XGConnection*) connection
{
    ompi_output(orte_pls_base.pls_output, "pls: xgrid: got connectionDidOpen message");

    /* release anyone blocked waiting for the connection state to change */
    ompi_condition_broadcast(&state_cond);
}
|
||||
|
||||
|
||||
-(void) connectionDidNotOpen:(XGConnection*) connection withError: (NSError*) error
|
||||
{
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
@ -255,6 +402,7 @@ cleanup:
|
||||
ompi_condition_broadcast(&state_cond);
|
||||
}
|
||||
|
||||
|
||||
-(void) connectionDidClose:(XGConnection*) connection;
|
||||
{
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
|
@ -100,6 +100,7 @@ orte_pls_xgrid_component_open(void)
|
||||
{
|
||||
mca_base_param_register_string("pls", "xgrid", "orted", NULL, "orted");
|
||||
mca_base_param_register_int("pls", "xgrid", "priority", NULL, 20);
|
||||
mca_base_param_register_int("pls", "xgrid", "delete_job", NULL, 1);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -115,21 +116,20 @@ orte_pls_xgrid_component_close(void)
|
||||
orte_pls_base_module_t *
|
||||
orte_pls_xgrid_component_init(int *priority)
|
||||
{
|
||||
int param;
|
||||
char *string;
|
||||
int ret;
|
||||
int ret, val, param;
|
||||
|
||||
if (NULL == getenv("XGRID_CONTROLLER_HOSTNAME") ||
|
||||
NULL == getenv("XGRID_CONTROLLER_PASSWORD")) {
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
"pls: xgrid: controller info not set");
|
||||
"orte:pls:xgrid: not available: controller info not set");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
"pls: xgrid: initializing PlsXgridClient");
|
||||
"orte:pls:xgrid: initializing PlsXGridClient");
|
||||
mca_pls_xgrid_component.pool = [[NSAutoreleasePool alloc] init];
|
||||
mca_pls_xgrid_component.client = [[PlsXgridClient alloc] init];
|
||||
mca_pls_xgrid_component.client = [[PlsXGridClient alloc] init];
|
||||
|
||||
/* setup daemon name */
|
||||
param = mca_base_param_find("pls", "xgrid", "orted");
|
||||
@ -147,14 +147,18 @@ orte_pls_xgrid_component_init(int *priority)
|
||||
param = mca_base_param_find("pls", "xgrid", "priority");
|
||||
mca_base_param_lookup_int(param, priority);
|
||||
|
||||
param = mca_base_param_find("pls", "xgrid", "delete_job");
|
||||
mca_base_param_lookup_int(param, &val);
|
||||
[mca_pls_xgrid_component.client setCleanUp: val];
|
||||
|
||||
ompi_progress_register(orte_pls_xgrid_progress);
|
||||
|
||||
ompi_output(orte_pls_base.pls_output, "pls: xgrid: initialized");
|
||||
ompi_output(orte_pls_base.pls_output, "orte:pls:xgrid: initialized");
|
||||
|
||||
ret = [mca_pls_xgrid_component.client connect];
|
||||
if (ret != ORTE_SUCCESS) {
|
||||
ompi_output(orte_pls_base.pls_output,
|
||||
"pls: xgrid: connection failed");
|
||||
"orte:pls:xgrid: connection failed");
|
||||
orte_pls_xgrid_finalize();
|
||||
}
|
||||
|
||||
|
@ -124,7 +124,7 @@ orte_pls_xgrid_terminate_job(orte_jobid_t jobid)
|
||||
orte_gpr_value_t** values = NULL;
|
||||
size_t i, j, num_values = 0;
|
||||
int rc;
|
||||
|
||||
|
||||
if(ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&jobid_string, jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
@ -174,7 +174,8 @@ orte_pls_xgrid_terminate_job(orte_jobid_t jobid)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* send a terminate message to the bootproxy on each node */
|
||||
/* send a terminate message to the bootproxy on each node
|
||||
*/
|
||||
if(0 > (ret = orte_rml.send_buffer_nb(
|
||||
&keyval->value.proc,
|
||||
cmd,
|
||||
@ -204,7 +205,14 @@ cleanup:
|
||||
}
|
||||
free(values);
|
||||
}
|
||||
return rc;
|
||||
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
/* ok, now that we've given the orted a chance to clean everything
|
||||
up nicely, kill everything not so nicely */
|
||||
return [mca_pls_xgrid_component.client terminateJob: jobid];
|
||||
} else {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -87,11 +87,13 @@ static orte_ras_base_module_t *orte_ras_xgrid_init(int* priority)
|
||||
|
||||
if (NULL != getenv("XGRID_CONTROLLER_HOSTNAME") &&
|
||||
NULL != getenv("XGRID_CONTROLLER_PASSWORD")) {
|
||||
ompi_output(orte_ras_base.ras_output, "ras:xgrid: available for selection");
|
||||
ompi_output(orte_ras_base.ras_output,
|
||||
"orte:ras:xgrid: available for selection");
|
||||
return &orte_ras_xgrid_module;
|
||||
}
|
||||
|
||||
/* Sadly, no */
|
||||
ompi_output(orte_ras_base.ras_output, "ras:xgrid: NOT available for selection");
|
||||
ompi_output(orte_ras_base.ras_output,
|
||||
"orte:ras:xgrid: NOT available for selection");
|
||||
return NULL;
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ int main(int argc, char *argv[])
|
||||
|
||||
|
||||
/* detach from controlling terminal */
|
||||
if(orted_globals.debug == false || orted_globals.no_daemonize == true) {
|
||||
if(orted_globals.debug == false && orted_globals.no_daemonize == false) {
|
||||
orte_daemon_init(NULL);
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user