1
1
openmpi/orte/mca/rmgr/base/rmgr_base_context.c

300 строки
7.2 KiB
C
Исходник Обычный вид История

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <string.h>
#include "opal/util/trace.h"
#include "orte/include/orte_constants.h"
#include "orte/mca/rmgr/base/base.h"
#include "orte/mca/rds/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"
/*
 * Store the application contexts of a job on the job's registry segment and
 * add their process counts to the slot total recorded for the job.
 *
 * jobid        job whose segment receives the contexts
 * app_context  array of num_context application contexts; each entry has its
 *              idx field set to its position in the array
 * num_context  number of entries in app_context
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE when an allocation fails, or
 * the error code returned by the failing schema/registry/slot call.
 */
int orte_rmgr_base_put_app_context(
    orte_jobid_t jobid,
    orte_app_context_t** app_context,
    size_t num_context)
{
    orte_gpr_value_t* value;
    size_t i;
    size_t job_slots;
    int rc;

    OPAL_TRACE(2);

    /* start from the slot count already recorded for this job */
    rc = orte_rmgr_base_get_job_slots(jobid, &job_slots);
    if (ORTE_SUCCESS != rc) {
        return rc;
    }

    value = OBJ_NEW(orte_gpr_value_t);
    if (NULL == value) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    value->addr_mode = ORTE_GPR_OVERWRITE;

    /* put context info on the job segment of the registry */
    rc = orte_schema.get_job_segment_name(&(value->segment), jobid);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* a single token, stored in a NULL-terminated array */
    value->num_tokens = 1;
    value->tokens = (char**)malloc(sizeof(char*) * 2);
    if (NULL == value->tokens) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    value->tokens[0] = strdup(ORTE_JOB_GLOBALS);
    value->tokens[1] = NULL;
    if (NULL == value->tokens[0]) {
        /* do not hand a NULL token to the registry */
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /* one keyval per application context */
    value->cnt = num_context;
    value->keyvals = (orte_gpr_keyval_t**)malloc(num_context * sizeof(orte_gpr_keyval_t*));
    if (NULL == value->keyvals) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    /* keep unfilled entries NULL so a partially built array can be released */
    memset(value->keyvals, 0, num_context * sizeof(orte_gpr_keyval_t*));

    for (i = 0; i < num_context; i++) {
        orte_app_context_t* app = app_context[i];
        orte_gpr_keyval_t* keyval = OBJ_NEW(orte_gpr_keyval_t);

        value->keyvals[i] = keyval;
        if (NULL == keyval) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }

        keyval->key = strdup(ORTE_JOB_APP_CONTEXT_KEY);
        keyval->type = ORTE_APP_CONTEXT;
        keyval->value.app_context = app;
        /* the keyval now references the caller's context; take a reference
         * for it (presumably dropped again when value is released below) */
        OBJ_RETAIN(app);

        if (NULL == keyval->key) {
            /* do not hand a NULL key to the registry */
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }

        app->idx = i;
        job_slots += app->num_procs;
    }

    rc = orte_gpr.put(1, &value);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* record the updated slot total for the job */
    rc = orte_rmgr_base_set_job_slots(jobid, job_slots);

cleanup:
    OBJ_RELEASE(value);
    return rc;
}
/*
* Comparison function for sorting context by index.
*/
static int orte_rmgr_base_cmp_app_context(
orte_app_context_t** app1,
orte_app_context_t** app2)
{
OPAL_TRACE(3);
if ((*app1)->idx < (*app2)->idx) {
return -1;
} else if((*app1)->idx > (*app2)->idx) {
return 1;
}
return 0;
}
/*
 * Retrieve the application contexts stored for a job.
 *
 * jobid        job whose segment is queried
 * app_context  out: malloc'ed array of context pointers, sorted by idx
 * num_context  out: number of entries in *app_context
 *
 * The context pointers are taken over from the registry values returned by
 * orte_gpr.get(): each keyval's pointer is cleared after it has been copied
 * into the result array.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE when the result array cannot
 * be allocated (*num_context is then 0 and *app_context NULL), or the error
 * code of the failing schema/registry call (outputs untouched).
 */
int orte_rmgr_base_get_app_context(
    orte_jobid_t jobid,
    orte_app_context_t*** app_context,
    size_t* num_context)
{
    char *segment;
    char *tokens[2];
    char *keys[2];
    orte_gpr_value_t** values = NULL;
    size_t i, num_values = 0, index = 0;
    int rc;

    OPAL_TRACE(2);

    /* look up the name of the job segment on the registry */
    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
        return rc;
    }

    tokens[0] = ORTE_JOB_GLOBALS;
    tokens[1] = NULL;
    keys[0] = ORTE_JOB_APP_CONTEXT_KEY;
    keys[1] = NULL;

    rc = orte_gpr.get(
        ORTE_GPR_TOKENS_OR,
        segment,
        tokens,
        keys,
        &num_values,
        &values
        );
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* total number of contexts over all returned values */
    *num_context = 0;
    for (i = 0; i < num_values; i++) {
        *num_context += values[i]->cnt;
    }

    *app_context = (orte_app_context_t**)malloc(*num_context * sizeof(orte_app_context_t*));
    if (NULL == *app_context && 0 != *num_context) {
        /* a NULL result is only an error when entries were requested;
         * malloc(0) is allowed to return NULL */
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        *num_context = 0;
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    for (i = 0; i < num_values; i++) {
        orte_gpr_value_t* value = values[i];
        orte_gpr_keyval_t** keyvals = value->keyvals;
        size_t k;

        for (k = 0; k < value->cnt; k++) {
            orte_gpr_keyval_t* keyval = keyvals[k];

            (*app_context)[index++] = keyval->value.app_context;
            /* detach the context from the keyval before the values are
             * released below */
            keyval->value.app_context = NULL;
        }
    }

    /* nothing to order with fewer than two entries */
    if (1 < *num_context) {
        qsort(*app_context, *num_context, sizeof(orte_app_context_t*),
              (int (*)(const void*,const void*))orte_rmgr_base_cmp_app_context);
    }

cleanup:
    for (i = 0; i < num_values; i++) {
        OBJ_RELEASE(values[i]);
    }
    free(values);
    free(segment);
    return rc;
}
/*
 * Query for the total number of process slots requested for the job.
 *
 * jobid       job whose segment is queried
 * proc_slots  out: value stored under ORTE_JOB_SLOTS_KEY, or 0 when the
 *             registry returns no value for that key
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when the registry returns anything
 * other than a single value holding a single keyval (*proc_slots is then left
 * untouched), or the error code of the failing schema/registry call.
 */
int orte_rmgr_base_get_job_slots(orte_jobid_t jobid, size_t* proc_slots)
{
    char *segment;
    char *tokens[2];
    char *keys[2];
    orte_gpr_value_t** values = NULL;
    size_t i, num_values = 0;
    int rc;

    OPAL_TRACE(2);

    /* query the job segment on the registry */
    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
        return rc;
    }

    tokens[0] = ORTE_JOB_GLOBALS;
    tokens[1] = NULL;
    keys[0] = ORTE_JOB_SLOTS_KEY;
    keys[1] = NULL;

    rc = orte_gpr.get(
        ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
        segment,
        tokens,
        keys,
        &num_values,
        &values
        );
    if (ORTE_SUCCESS != rc) {
        free(segment);
        return rc;
    }

    if (0 == num_values) {
        /* nothing stored yet: report zero slots */
        *proc_slots = 0;
    } else if (1 != num_values || 1 != values[0]->cnt) {
        /* unexpected shape of the registry answer */
        rc = ORTE_ERR_NOT_FOUND;
    } else {
        *proc_slots = values[0]->keyvals[0]->value.size;
    }

    /* release whatever the registry returned, on every outcome */
    for (i = 0; i < num_values; i++) {
        if (NULL != values[i]) {
            OBJ_RELEASE(values[i]);
        }
    }
    free(values);
    free(segment);
    return rc;
}
/*
 * Set the total number of process slots requested for the job.
 *
 * jobid       job whose segment is updated
 * proc_slots  value stored under ORTE_JOB_SLOTS_KEY (type ORTE_SIZE) with the
 *             ORTE_JOB_GLOBALS token, using ORTE_GPR_OVERWRITE addressing
 *
 * Returns the result of orte_gpr.put(), or the error code of the failing
 * schema call.
 */
int orte_rmgr_base_set_job_slots(orte_jobid_t jobid, size_t proc_slots)
{
    char *segment;
    char *tokens[2];
    orte_gpr_value_t* values[1];
    orte_gpr_value_t value;
    orte_gpr_keyval_t keyval;
    orte_gpr_keyval_t* keyvals[1];
    int rc;

    OPAL_TRACE(2);

    /* look up the name of the job segment on the registry */
    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
        return rc;
    }

    /* The value and keyval live on the stack and are never constructed, so
     * zero them first: no field the registry may look at is left
     * indeterminate. */
    memset(&value, 0, sizeof(value));
    memset(&keyval, 0, sizeof(keyval));

    tokens[0] = ORTE_JOB_GLOBALS;
    tokens[1] = NULL;

    keyval.type = ORTE_SIZE;
    keyval.key = ORTE_JOB_SLOTS_KEY;
    keyval.value.size = proc_slots;
    keyvals[0] = &keyval;

    value.addr_mode = ORTE_GPR_OVERWRITE;
    value.segment = segment;
    value.keyvals = keyvals;
    value.cnt = 1;
    value.tokens = tokens;
    value.num_tokens = 1;
    values[0] = &value;

    rc = orte_gpr.put(1, values);

    /* only the segment name is heap-allocated; everything else is stack or
     * constant data and must not be freed */
    free(segment);
    return rc;
}