1
1

Add a new test for the db framework, fix some minor bugs in the daemon module

This commit was SVN r23085.
Этот коммит содержится в:
Ralph Castain 2010-05-04 02:38:11 +00:00
родитель 9dfb5c7c62
Коммит 8e7faf9119
2 изменённых файла: 79 добавлений и 107 удалений

Просмотреть файл

@ -48,34 +48,36 @@ orte_db_base_module_t orte_db_daemon_module = {
/* local types */ /* local types */
typedef struct { typedef struct {
opal_object_t super; opal_object_t super;
orte_process_name_t name;
char *key; char *key;
char *dptr; int32_t size;
int size; uint8_t *bytes;
} orte_db_data_t; } orte_db_data_t;
static void constructor(orte_db_data_t *dt) static void dtconstructor(orte_db_data_t *dt)
{ {
dt->key = NULL; dt->key = NULL;
dt->dptr = NULL; dt->bytes = NULL;
dt->size = 0; dt->size = 0;
} }
static void destructor(orte_db_data_t *dt) static void dtdestructor(orte_db_data_t *dt)
{ {
if (NULL != dt->key) { if (NULL != dt->key) {
free(dt->key); free(dt->key);
} }
if (NULL != dt->dptr) { if (NULL != dt->bytes) {
free(dt->dptr); free(dt->bytes);
} }
} }
OBJ_CLASS_INSTANCE(orte_db_data_t, OBJ_CLASS_INSTANCE(orte_db_data_t,
opal_object_t, opal_object_t,
constructor, dtconstructor,
destructor); dtdestructor);
/* local variables */ /* local variables */
static orte_vpid_t num_recvd; static orte_vpid_t num_recvd;
static bool ack_reqd; static bool ack_reqd;
static opal_pointer_array_t datastore; static opal_pointer_array_t datastore;
static orte_rmcast_channel_t my_group_channel;
/* local functions */ /* local functions */
static void callback_fn(int status, static void callback_fn(int status,
@ -90,12 +92,6 @@ static void recv_cmd(int status,
orte_process_name_t *sender, orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata); opal_buffer_t *buf, void* cbdata);
static void recv_data(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
static void recv_ack(int status, static void recv_ack(int status,
orte_rmcast_channel_t channel, orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
@ -135,15 +131,10 @@ static int init(void)
OBJ_CONSTRUCT(&datastore, opal_pointer_array_t); OBJ_CONSTRUCT(&datastore, opal_pointer_array_t);
opal_pointer_array_init(&datastore, 16, INT_MAX, 16); opal_pointer_array_init(&datastore, 16, INT_MAX, 16);
} else if (ORTE_PROC_IS_APP) { } else if (ORTE_PROC_IS_APP) {
/* recv data back */ /* get my multicast group */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL, my_group_channel = orte_rmcast.query_channel();
ORTE_RMCAST_TAG_DATA,
ORTE_RMCAST_PERSISTENT,
recv_data, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* recv cmd acks */ /* recv responses */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL, if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_GROUP_CHANNEL,
ORTE_RMCAST_TAG_CMD_ACK, ORTE_RMCAST_TAG_CMD_ACK,
ORTE_RMCAST_PERSISTENT, ORTE_RMCAST_PERSISTENT,
@ -178,37 +169,41 @@ static int finalize(void)
static int send_data(orte_db_cmd_t cmd, char *key, void *object, opal_data_type_t type) static int send_data(orte_db_cmd_t cmd, char *key, void *object, opal_data_type_t type)
{ {
opal_buffer_t *buf; opal_buffer_t *buf, dat;
orte_job_t *jdata; orte_job_t *jdata;
orte_proc_t *proc; orte_proc_t *proc;
orte_job_state_t *job_state; orte_job_state_t *job_state;
orte_proc_state_t *proc_state; orte_proc_state_t *proc_state;
int rc; int rc;
bool got_response; bool got_response;
opal_byte_object_t bo;
/* construct the buffer we will use for packing the data */ /* construct the buffer we will use for packing the data */
buf = OBJ_NEW(opal_buffer_t); buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &cmd, 1, ORTE_DB_CMD_T); /* add cmd */ opal_dss.pack(buf, &cmd, 1, ORTE_DB_CMD_T); /* add cmd */
opal_dss.pack(buf, &my_group_channel, 1, ORTE_RMCAST_CHANNEL_T); /* tell the server my channel */
opal_dss.pack(buf, &key, 1, OPAL_STRING); /* pack the key */ opal_dss.pack(buf, &key, 1, OPAL_STRING); /* pack the key */
if (NULL != object) {
OBJ_CONSTRUCT(&dat, opal_buffer_t);
/* pack the data */ /* pack the data */
switch (type) { switch (type) {
case ORTE_JOB: case ORTE_JOB:
jdata = (orte_job_t*)object; jdata = (orte_job_t*)object;
opal_dss.pack(buf, &jdata, 1, ORTE_JOB); opal_dss.pack(&dat, &jdata, 1, ORTE_JOB);
break; break;
case ORTE_JOB_STATE: case ORTE_JOB_STATE:
job_state = (orte_job_state_t*)object; job_state = (orte_job_state_t*)object;
opal_dss.pack(buf, job_state, 1, ORTE_JOB_STATE); opal_dss.pack(&dat, job_state, 1, ORTE_JOB_STATE);
break; break;
case ORTE_PROC: case ORTE_PROC:
proc = (orte_proc_t*)object; proc = (orte_proc_t*)object;
opal_dss.pack(buf, &proc, 1, ORTE_PROC); opal_dss.pack(&dat, &proc, 1, ORTE_PROC);
break; break;
case ORTE_PROC_STATE: case ORTE_PROC_STATE:
proc_state = (orte_proc_state_t*)object; proc_state = (orte_proc_state_t*)object;
opal_dss.pack(buf, proc_state, 1, ORTE_PROC_STATE); opal_dss.pack(&dat, proc_state, 1, ORTE_PROC_STATE);
break; break;
default: default:
@ -217,6 +212,12 @@ static int send_data(orte_db_cmd_t cmd, char *key, void *object, opal_data_type_
goto cleanup; goto cleanup;
break; break;
} }
opal_dss.unload(&dat, (void**)&bo.bytes, &bo.size);
opal_dss.pack(buf, &bo.size, 1, OPAL_INT32);
opal_dss.pack(buf, &bo.bytes, bo.size, OPAL_UINT8);
OBJ_DESTRUCT(&dat);
free(bo.bytes);
}
got_response = false; got_response = false;
num_recvd = 0; num_recvd = 0;
@ -229,7 +230,7 @@ static int send_data(orte_db_cmd_t cmd, char *key, void *object, opal_data_type_
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
/* wait for all daemons to ack the request */ /* wait for all daemons to ack the request */
ORTE_PROGRESSED_WAIT(got_response, num_recvd, orte_process_info.num_procs-1); ORTE_PROGRESSED_WAIT(got_response, num_recvd, orte_process_info.num_daemons);
ack_reqd = false; ack_reqd = false;
cleanup: cleanup:
@ -263,34 +264,15 @@ static int set_source(orte_process_name_t *name)
static int fetch(char *key, void *object, opal_data_type_t type) static int fetch(char *key, void *object, opal_data_type_t type)
{ {
opal_buffer_t *buf;
int rc; int rc;
bool got_response;
double cpu_time_used, start; double cpu_time_used, start;
orte_db_cmd_t cmd=ORTE_DB_FETCH_CMD;
TIMER_START(start); TIMER_START(start);
/* construct the buffer we will use for packing the data */ if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_FETCH_CMD, key, NULL, OPAL_INT32))) {
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &cmd, 1, ORTE_DB_CMD_T); /* add cmd */
opal_dss.pack(buf, &key, 1, OPAL_STRING); /* pack the key */
got_response = false;
num_recvd = 0;
ack_reqd = true;
/* send the cmd to all the daemons */
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_DATA_SERVER_CHANNEL,
ORTE_RMCAST_TAG_DATA, buf,
callback_fn, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
/* wait for all daemons to respond */
ORTE_PROGRESSED_WAIT(got_response, num_recvd, orte_process_info.num_procs-1);
ack_reqd = false;
TIMER_STOP(cpu_time_used, start); TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO FETCH", opal_output(0, "%s TOOK %g usecs TO FETCH",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -321,33 +303,15 @@ static int update(char *key, void *object, opal_data_type_t type)
static int remove_data(char *key) static int remove_data(char *key)
{ {
opal_buffer_t *buf;
int rc; int rc;
bool got_response;
double start; double start;
double cpu_time_used; double cpu_time_used;
orte_db_cmd_t cmd=ORTE_DB_REMOVE_CMD;
TIMER_START(start); TIMER_START(start);
/* construct the buffer we will use for packing the data */ if (ORTE_SUCCESS != (rc = send_data(ORTE_DB_REMOVE_CMD, key, NULL, OPAL_INT32))) {
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &cmd, 1, ORTE_DB_CMD_T); /* add cmd */
opal_dss.pack(buf, &key, 1, OPAL_STRING); /* pack the key */
got_response = false;
num_recvd = 0;
ack_reqd = true;
/* send the data to all the daemons */
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(ORTE_RMCAST_DATA_SERVER_CHANNEL,
ORTE_RMCAST_TAG_DATA, buf,
callback_fn, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
/* wait for all daemons to ack the request */
ORTE_PROGRESSED_WAIT(got_response, num_recvd, orte_process_info.num_procs-1);
ack_reqd = false;
TIMER_STOP(cpu_time_used, start); TIMER_STOP(cpu_time_used, start);
opal_output(0, "%s TOOK %g usecs TO REMOVE", opal_output(0, "%s TOOK %g usecs TO REMOVE",
@ -384,11 +348,12 @@ static void recv_cmd(int status,
opal_buffer_t *buf, void* cbdata) opal_buffer_t *buf, void* cbdata)
{ {
orte_db_cmd_t cmd; orte_db_cmd_t cmd;
opal_buffer_t *ans, xfer; opal_buffer_t *ans;
int count, i; int count, i;
int32_t rc; int32_t rc, ret;
char *key; char *key;
orte_db_data_t *dat; orte_db_data_t *dat;
orte_rmcast_channel_t ch;
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output, OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: cmd recvd from %s", "%s db:daemon: cmd recvd from %s",
@ -398,6 +363,8 @@ static void recv_cmd(int status,
count=1; count=1;
opal_dss.unpack(buf, &cmd, &count, ORTE_DB_CMD_T); opal_dss.unpack(buf, &cmd, &count, ORTE_DB_CMD_T);
count=1; count=1;
opal_dss.unpack(buf, &ch, &count, ORTE_RMCAST_CHANNEL_T);
count=1;
opal_dss.unpack(buf, &key, &count, OPAL_STRING); opal_dss.unpack(buf, &key, &count, OPAL_STRING);
ans = OBJ_NEW(opal_buffer_t); ans = OBJ_NEW(opal_buffer_t);
@ -406,8 +373,13 @@ static void recv_cmd(int status,
switch (cmd) { switch (cmd) {
case ORTE_DB_STORE_CMD: case ORTE_DB_STORE_CMD:
dat = OBJ_NEW(orte_db_data_t); dat = OBJ_NEW(orte_db_data_t);
dat->name.jobid = sender->jobid;
dat->name.vpid = sender->vpid;
dat->key = key; dat->key = key;
opal_dss.unload(buf, (void**)&dat->dptr, &dat->size); count=1;
opal_dss.unpack(buf, &dat->size, &count, OPAL_INT32);
dat->bytes = (uint8_t*)malloc(dat->size);
opal_dss.unpack(buf, dat->bytes, &dat->size, OPAL_UINT8);
opal_pointer_array_add(&datastore, dat); opal_pointer_array_add(&datastore, dat);
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output, OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: data from %s stored: key %s", "%s db:daemon: data from %s stored: key %s",
@ -428,10 +400,8 @@ static void recv_cmd(int status,
/* found the data - return it */ /* found the data - return it */
rc = ORTE_SUCCESS; rc = ORTE_SUCCESS;
opal_dss.pack(ans, &rc, 1, OPAL_INT32); opal_dss.pack(ans, &rc, 1, OPAL_INT32);
OBJ_CONSTRUCT(&xfer, opal_buffer_t); opal_dss.pack(ans, &dat->size, 1, OPAL_INT32);
opal_dss.load(&xfer, dat->dptr, dat->size); opal_dss.pack(ans, dat->bytes, dat->size, OPAL_UINT8);
opal_dss.copy_payload(ans, &xfer);
OBJ_DESTRUCT(&xfer);
OPAL_OUTPUT_VERBOSE((2, orte_db_base_output, OPAL_OUTPUT_VERBOSE((2, orte_db_base_output,
"%s db:daemon: data fetched for %s: key %s", "%s db:daemon: data fetched for %s: key %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -450,15 +420,11 @@ static void recv_cmd(int status,
rc = ORTE_ERR_NOT_FOUND; rc = ORTE_ERR_NOT_FOUND;
break; break;
} }
/* open a channel back to the sender */
orte_rmcast.send_buffer(channel, ORTE_RMCAST_TAG_CMD_ACK, ans); if (ORTE_SUCCESS != (ret = orte_rmcast.open_channel(&ch, ORTE_NAME_PRINT(sender),
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(ret);
return;
} }
orte_rmcast.send_buffer_nb(ch, ORTE_RMCAST_TAG_CMD_ACK, ans, callback_fn, NULL);
static void recv_data(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata)
{
} }

Просмотреть файл

@ -41,6 +41,12 @@ int main(int argc, char* argv[])
if (ORTE_SUCCESS != (rc = orte_db.fetch("test-insert", &proc, ORTE_PROC))) { if (ORTE_SUCCESS != (rc = orte_db.fetch("test-insert", &proc, ORTE_PROC))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
if (ORTE_SUCCESS != (rc = orte_db.store("test-insert2", &proc, ORTE_PROC))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_db.fetch("test-insert2", &proc, ORTE_PROC))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&proc); OBJ_DESTRUCT(&proc);
orte_finalize(); orte_finalize();