1
1
openmpi/orte/mca/db/daemon/db_daemon.c

432 строки
13 KiB
C

/*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <time.h>
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss_types.h"
#include "opal/util/output.h"
#include "orte/util/show_help.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmcast/rmcast.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/db/base/base.h"
#include "db_daemon.h"
static int init(void);
static int finalize(void);
static int store(char *key, void *object, opal_data_type_t type);
static int set_source(orte_process_name_t *name);
static int fetch(char *key, void *object, opal_data_type_t type);
static int update(char *key, void *object, opal_data_type_t type);
static int remove_data(char *key);
orte_db_base_module_t orte_db_daemon_module = {
init,
finalize,
store,
set_source,
fetch,
update,
remove_data
};
/* local types */
typedef struct {
opal_object_t super;
orte_process_name_t name;
char *key;
int32_t size;
uint8_t *bytes;
} orte_db_data_t;
static void dtconstructor(orte_db_data_t *dt)
{
dt->key = NULL;
dt->bytes = NULL;
dt->size = 0;
}
static void dtdestructor(orte_db_data_t *dt)
{
if (NULL != dt->key) {
free(dt->key);
}
if (NULL != dt->bytes) {
free(dt->bytes);
}
}
OBJ_CLASS_INSTANCE(orte_db_data_t,
opal_object_t,
dtconstructor,
dtdestructor);
/* local variables */
static orte_vpid_t num_recvd;
static bool ack_reqd;
static opal_pointer_array_t datastore;
static orte_rmcast_channel_t my_group_channel;
/* local functions */
static void callback_fn(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
static void recv_cmd(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
static void recv_ack(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
#include MCA_timer_IMPLEMENTATION_HEADER
static inline double gettime(void) __opal_attribute_always_inline__;
static inline double gettime(void)
{
double wtime;
#if OPAL_TIMER_USEC_NATIVE
wtime = ((double) opal_timer_base_get_usec()) / 1000000.0;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
wtime = tv.tv_sec;
wtime += (double)tv.tv_usec / 1000000.0;
#endif
return wtime;
}
#define TIMER_START(x) (x) = gettime();
#define TIMER_STOP(y,x) (y) = (gettime() - (x));
static int init(void)
{
int rc;
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
/* daemons recv data server cmds */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_DATA_SERVER_CHANNEL,
ORTE_RMCAST_TAG_DATA,
ORTE_RMCAST_PERSISTENT,
recv_cmd, NULL))) {
ORTE_ERROR_LOG(rc);
}
OBJ_CONSTRUCT(&datastore, opal_pointer_array_t);
opal_pointer_array_init(&datastore, 16, INT_MAX, 16);
} else if (ORTE_PROC_IS_APP) {
/* get my multicast group */
my_group_channel = orte_rmcast.query_channel();
/* recv responses */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_CMD_ACK,
ORTE_RMCAST_PERSISTENT,
recv_ack, NULL))) {
ORTE_ERROR_LOG(rc);
}
}
return ORTE_SUCCESS;
}
static int finalize(void)
{
int i;
orte_db_data_t *dat;
/* cancel the callbacks */
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
orte_rmcast.cancel_recv(ORTE_RMCAST_DATA_SERVER_CHANNEL, ORTE_RMCAST_TAG_DATA);
for (i=0; i < datastore.size; i++) {
if (NULL != (dat = (orte_db_data_t*)opal_pointer_array_get_item(&datastore, i))) {
OBJ_RELEASE(dat);
}
}
OBJ_DESTRUCT(&datastore);
} else if (ORTE_PROC_IS_APP) {
orte_rmcast.cancel_recv(ORTE_RMCAST_GROUP_CHANNEL, ORTE_RMCAST_TAG_DATA);
orte_rmcast.cancel_recv(ORTE_RMCAST_GROUP_CHANNEL, ORTE_RMCAST_TAG_CMD_ACK);
}
return ORTE_SUCCESS;
}
static int send_data(orte_db_cmd_t cmd, char *key, void *object, opal_data_type_t type)
{
opal_buffer_t *buf, dat;
orte_job_t *jdata;
orte_proc_t *proc;
orte_job_state_t *job_state;
orte_proc_state_t *proc_state;
int rc;
bool got_response;
opal_byte_object_t bo;
/* construct the buffer we will use for packing the data */
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &cmd, 1, ORTE_DB_CMD_T); /* add cmd */
opal_dss.pack(buf, &my_group_channel, 1, ORTE_RMCAST_CHANNEL_T); /* tell the server my channel */
opal_dss.pack(buf, &key, 1, OPAL_STRING); /* pack the key */
if (NULL != object) {
OBJ_CONSTRUCT(&dat, opal_buffer_t);
/* pack the data */
switch (type) {
case ORTE_JOB:
jdata = (orte_job_t*)object;
opal_dss.pack(&dat, &jdata, 1, ORTE_JOB);
break;
case ORTE_JOB_STATE:
job_state = (orte_job_state_t*)object;
opal_dss.pack(&dat, job_state, 1, ORTE_JOB_STATE);
break;
case ORTE_PROC:
proc = (orte_proc_t*)object;
opal_dss.pack(&dat, &proc, 1, ORTE_PROC);
break;
case ORTE_PROC_STATE:
proc_state = (orte_proc_state_t*)object;
opal_dss.pack(&dat, proc_state, 1, ORTE_PROC_STATE);
break;
default:
orte_show_help("help-db-base.txt", "unrecognized-type", true, type);
rc = ORTE_ERR_BAD_PARAM;
goto cleanup;
break;
}
opal_dss.unload(&dat, (void**)&bo.bytes, &bo.size);
opal_dss.pack(buf, &bo.size, 1, OPAL_INT32);
opal_dss.pack(buf, &bo.bytes, bo.size, OPAL_UINT8);
OBJ_DESTRUCT(&dat);
free(bo.bytes);
}
got_response = false;
num_recvd = 0;
ack_reqd = true;
/* send the data to all the daemons */
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_DATA_SERVER_CHANNEL,
ORTE_RMCAST_TAG_DATA, buf,
callback_fn, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* wait for all daemons to ack the request */
ORTE_PROGRESSED_WAIT(got_response, num_recvd, orte_process_info.num_daemons);
ack_reqd = false;
cleanup:
return rc;
}
static int store(char *key, void *object, opal_data_type_t type)
{
int rc;
double start;
double cpu_time_used;
TIMER_START(start);
if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_STORE_CMD, key, object, type))) {
ORTE_ERROR_LOG(rc);
}
TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO STORE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cpu_time_used * 1000000.0);
return rc;
}
static int set_source(orte_process_name_t *name)
{
return ORTE_SUCCESS;
}
static int fetch(char *key, void *object, opal_data_type_t type)
{
int rc;
double cpu_time_used, start;
TIMER_START(start);
if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_FETCH_CMD, key, NULL, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
}
TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO FETCH",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cpu_time_used * 1000000.0);
return ORTE_SUCCESS;
}
static int update(char *key, void *object, opal_data_type_t type)
{
int rc;
double start;
double cpu_time_used;
TIMER_START(start);
if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_UPDATE_CMD, key, object, type))) {
ORTE_ERROR_LOG(rc);
}
TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO UPDATE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cpu_time_used * 1000000.0);
return rc;
}
static int remove_data(char *key)
{
int rc;
double start;
double cpu_time_used;
TIMER_START(start);
if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_REMOVE_CMD, key, NULL, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
}
TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO REMOVE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cpu_time_used * 1000000.0);
return rc;
}
static void callback_fn(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
OBJ_RELEASE(buf);
}
static void recv_ack(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
if (ack_reqd) {
num_recvd++;
}
}
static void recv_cmd(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
orte_db_cmd_t cmd;
opal_buffer_t *ans;
int count, i;
int32_t rc;
char *key;
orte_db_data_t *dat;
orte_rmcast_channel_t ch;
char *ch_name;
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: cmd recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
count=1;
opal_dss.unpack(buf, &cmd, &count, ORTE_DB_CMD_T);
count=1;
opal_dss.unpack(buf, &ch, &count, ORTE_RMCAST_CHANNEL_T);
count=1;
opal_dss.unpack(buf, &ch_name, &count, OPAL_STRING);
count=1;
opal_dss.unpack(buf, &key, &count, OPAL_STRING);
ans = OBJ_NEW(opal_buffer_t);
opal_dss.pack(ans, &cmd, 1, ORTE_DB_CMD_T);
switch (cmd) {
case ORTE_DB_STORE_CMD:
dat = OBJ_NEW(orte_db_data_t);
dat->name.jobid = sender->jobid;
dat->name.vpid = sender->vpid;
dat->key = key;
count=1;
opal_dss.unpack(buf, &dat->size, &count, OPAL_INT32);
dat->bytes = (uint8_t*)malloc(dat->size);
opal_dss.unpack(buf, dat->bytes, &dat->size, OPAL_UINT8);
opal_pointer_array_add(&datastore, dat);
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: data from %s stored: key %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), key));
rc = ORTE_SUCCESS;
opal_dss.pack(ans, &rc, 1, OPAL_INT32);
break;
case ORTE_DB_FETCH_CMD:
/* find the key */
for (i=0; i < datastore.size; i++) {
if (NULL == (dat = (orte_db_data_t*)opal_pointer_array_get_item(&datastore, i))) {
continue;
}
if (0 != strcmp(key, dat->key)) {
continue;
}
/* found the data - return it */
rc = ORTE_SUCCESS;
opal_dss.pack(ans, &rc, 1, OPAL_INT32);
opal_dss.pack(ans, &dat->size, 1, OPAL_INT32);
opal_dss.pack(ans, dat->bytes, dat->size, OPAL_UINT8);
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: data fetched for %s: key %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), key));
break;
}
rc = ORTE_ERR_NOT_FOUND;
opal_dss.pack(ans, &rc, 1, OPAL_INT32);
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: data fetch request from %s not found: key %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), key));
break;
default:
rc = ORTE_ERR_NOT_FOUND;
break;
}
/* ensure the return channel is open */
orte_rmcast.open_channel(ch, ch_name, NULL, -1, NULL, ORTE_RMCAST_XMIT);
orte_rmcast.send_buffer_nb(ch, ORTE_RMCAST_TAG_CMD_ACK, ans, callback_fn, NULL);
}