1
1

Finish simplified interfaces for put and subscribe - more details to come.

This commit was SVN r6713.
Этот коммит содержится в:
Ralph Castain 2005-08-02 19:43:29 +00:00
родитель ef9e06451c
Коммит 4e1837687b
11 изменённых файлов: 422 добавлений и 164 удалений

Просмотреть файл

@ -42,6 +42,7 @@ int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode,
orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0},
NULL,
0 };
size_t i;
int rc;
value.addr_mode = addr_mode;
@ -54,6 +55,14 @@ int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode,
keyval.value = data_value;
value.tokens = tokens;
/* must count the number of tokens */
if (NULL == tokens) {
value.num_tokens = 0;
} else {
for (i=0; NULL != tokens[i]; i++) {
(value.num_tokens)++;
}
}
values = &value;
/* put the value on the registry */
@ -102,6 +111,14 @@ int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode,
}
value.tokens = tokens;
/* must count the number of tokens */
if (NULL == tokens) {
value.num_tokens = 0;
} else {
for (i=0; NULL != tokens[i]; i++) {
(value.num_tokens)++;
}
}
values = &value;
/* put the value on the registry */

Просмотреть файл

@ -52,6 +52,7 @@ int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id,
orte_gpr_trigger_t *trigs;
orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
size_t i;
int rc;
/* assemble the subscription object */
@ -68,17 +69,32 @@ int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id,
value.cnt = 1;
keyvals = &keyval;
value.keyvals = &keyvals;
value.tokens = tokens;
/* must count the number of tokens */
if (NULL == tokens) {
value.num_tokens = 0;
} else {
for (i=0; NULL != tokens[i]; i++) {
(value.num_tokens)++;
}
}
keyval.key = key;
/* assemble the trigger object - all we have here is the name*/
trigs = &trig;
trig.name = trig_name;
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
if (NULL == trig_name) { /* no trigger provided */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 0, NULL))) {
ORTE_ERROR_LOG(rc);
}
} else {
trigs = &trig;
trig.name = trig_name;
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
}
/* no memory to cleanup since we didn't allocate anything */
@ -146,14 +162,28 @@ int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id,
}
value.tokens = tokens;
/* must count the number of tokens */
if (NULL == tokens) {
value.num_tokens = 0;
} else {
for (i=0; NULL != tokens[i]; i++) {
(value.num_tokens)++;
}
}
/* assemble the trigger object - all we have here is the name */
trigs = &trig;
trig.name = trig_name;
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
if (NULL == trig_name) { /* no trigger provided */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 0, NULL))) {
ORTE_ERROR_LOG(rc);
}
} else {
trigs = &trig;
trig.name = trig_name;
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
}
/* clean up memory - very carefully!
@ -193,6 +223,7 @@ int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id,
int rc;
/* assemble the trigger object */
trigs = &trig;
trig.name = trig_name;
trig.action = action;
trig.cnt = 1;
@ -222,6 +253,14 @@ int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id,
}
value.tokens = tokens;
/* must count the number of tokens */
if (NULL == tokens) {
value.num_tokens = 0;
} else {
for (i=0; NULL != tokens[i]; i++) {
(value.num_tokens)++;
}
}
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(0, NULL, 1, &trigs))) {

Просмотреть файл

@ -42,11 +42,13 @@ int orte_gpr_base_pack_put(orte_buffer_t *cmd,
command = ORTE_GPR_PUT_CMD;
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, values, cnt, ORTE_GPR_VALUE))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
@ -65,15 +67,18 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd,
command = ORTE_GPR_GET_CMD;
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &mode, 1, ORTE_GPR_ADDR_MODE))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &segment, 1, ORTE_STRING))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
/* compute number of tokens */
@ -87,12 +92,14 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd,
/* pack number of tokens */
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &n, 1, ORTE_SIZE))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
if (n > 0) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, tokens, n, ORTE_STRING))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
}
@ -107,12 +114,14 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd,
/* pack number of keys */
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &n, 1, ORTE_SIZE))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
if (n > 0) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, keys, n, ORTE_STRING))) {
return rc;
ORTE_ERROR_LOG(rc);
return rc;
}
}

Просмотреть файл

@ -44,17 +44,28 @@ int orte_gpr_base_pack_subscribe(orte_buffer_t *cmd,
command = ORTE_GPR_SUBSCRIBE_CMD;
/* can't be both NULL */
if (NULL == subscriptions && NULL == trigs) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* there MUST be some subscriptions, so don't have to check that here - it was checked on the API
* itself.
*/
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, subscriptions, num_subs, ORTE_GPR_SUBSCRIPTION))) {
ORTE_ERROR_LOG(rc);
return rc;
/* see if there are subscriptions - if so, pack them */
if (NULL != subscriptions) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, subscriptions, num_subs, ORTE_GPR_SUBSCRIPTION))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else {
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &zero, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* the API DOES allow there to be no triggers - if that happens, then trigs will be NULL and num_trigs

Просмотреть файл

@ -49,7 +49,7 @@ orte_gpr_proxy_subscribe(size_t num_subs,
size_t i;
/* need to protect against errors */
if (NULL == subscriptions) {
if (NULL == subscriptions && NULL == trigs) { /* need at least one */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
@ -60,19 +60,23 @@ orte_gpr_proxy_subscribe(size_t num_subs,
* generate id_tag to send to replica to identify lookup entry
* for each subscription
*/
if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_subscription(
num_subs, subscriptions))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
return rc;
if (NULL != subscriptions) {
if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_subscription(
num_subs, subscriptions))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
return rc;
}
}
/* if any triggers were provided, get id tags for them */
if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_trigger(
num_trigs, trigs))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
return rc;
if (NULL != trigs) {
if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_trigger(
num_trigs, trigs))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
return rc;
}
}
/* check for compound cmd mode - if on, just pack the info into the

Просмотреть файл

@ -41,7 +41,7 @@ orte_gpr_replica_subscribe(size_t num_subs,
int rc;
/* protect against errors */
if (NULL == subscriptions) {
if (NULL == subscriptions && NULL == trigs) { /* need at least one */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
@ -53,21 +53,25 @@ orte_gpr_replica_subscribe(size_t num_subs,
* for each subscription - the subscription id is returned
* inside the subscription objects
*/
if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_subscription(
num_subs, subscriptions))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
if (NULL != subscriptions) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_subscription(
num_subs, subscriptions))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
}
}
/* if any triggers were provided, get id tags for them - the
* idtags are returned inside the trigger objects
*/
if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_trigger(
num_trigs, trigs))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
if (NULL != trigs) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_trigger(
num_trigs, trigs))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
}
}
/* register subscriptions */

Просмотреть файл

@ -53,7 +53,28 @@ int orte_gpr_replica_recv_subscribe_cmd(orte_process_name_t* sender,
goto RETURN_ERROR;
}
if (0 < n) {
/* if the original command did not provide any subscriptions, then we put a size_t value in the buffer of "zero"
* to avoid causing buffer problems. thus, we need to check to see if the type is size_t vs subscription vs
* something else. if it is trigger, then we need the number to be greater than 0, which should always
* be true (we check it just to be safe). if it is size_t, then the value should be zero - anything else
* generates an error.
*/
if (ORTE_SIZE == type) {
/* this case means that there were no subscriptions, so we need to clear the value from the buffer
* and continue on
*/
n=1;
if (ORTE_SUCCESS != orte_dps.unpack(input_buffer, &num_subs, &n, ORTE_SIZE)) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
/* if the returned number of subscriptions isn't zero, then we have a problem */
if (0 != num_subs) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
rc = ORTE_ERR_COMM_FAILURE;
goto RETURN_ERROR;
}
} else if (ORTE_GPR_SUBSCRIPTION == type && 0 < n) {
/* create the space for the subscriptions */
subscriptions = (orte_gpr_subscription_t**)malloc(n * sizeof(orte_gpr_subscription_t*));
if (NULL == subscriptions) {
@ -66,8 +87,15 @@ int orte_gpr_replica_recv_subscribe_cmd(orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
num_subs = n;
} else {
/* we must have an error condition - it either wasn't the right type, or we had the type okay
* but don't have a good number of elements. report the error and move on
*/
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
rc = ORTE_ERR_COMM_FAILURE;
goto RETURN_ERROR;
}
num_subs = n;
if (ORTE_SUCCESS != (rc = orte_dps.peek(input_buffer, &type, &n))) {
ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -419,9 +419,14 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, size_t cnt,
if (NULL == trig->master) {
sprintf(tmp_out, "\tNO MASTER registered");
} else {
sprintf(tmp_out, "\tTRIGGER MASTER: [%lu,%lu,%lu]@idtag %lu",
ORTE_NAME_ARGS(trig->master->requestor),
(unsigned long)trig->master->idtag);
if (NULL == trig->master->requestor) {
sprintf(tmp_out, "\tTRIGGER MASTER: LOCAL@idtag %lu",
(unsigned long)trig->master->idtag);
} else {
sprintf(tmp_out, "\tTRIGGER MASTER: [%lu,%lu,%lu]@idtag %lu",
ORTE_NAME_ARGS(trig->master->requestor),
(unsigned long)trig->master->idtag);
}
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
@ -547,10 +552,10 @@ static void orte_gpr_replica_dump_subscription(orte_buffer_t *buffer,
tmp = tmp_out;
if (NULL == sub->name) {
sprintf(tmp, "\t\tSubscription %lu: UNNAMED",
sprintf(tmp, "\nSubscription %lu: UNNAMED",
(unsigned long) sub->index);
} else {
sprintf(tmp, "\t\tSubscription %lu name %s",
sprintf(tmp, "\nSubscription %lu name %s",
(unsigned long) sub->index,
sub->name);
}

Просмотреть файл

@ -78,6 +78,9 @@ orte_gpr_replica_enter_local_trigger(size_t cnt, orte_gpr_trigger_t **trigs)
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (NULL != trigs[i]->name) {
trig->name = strdup(trigs[i]->name);
}
/* ensure that the proper routing flag is set
* in the action field to match the trigger callback
* function

Просмотреть файл

@ -56,8 +56,6 @@
/* output files needed by the test */
static FILE *test_out=NULL;
static char *cmd_str="diff ./test_gpr_replica_out ./test_gpr_replica_out_std";
int main(int argc, char **argv)
{
int rc;
@ -67,7 +65,7 @@ int main(int argc, char **argv)
orte_gpr_value_union_t value, values[5];
if (getenv("TEST_WRITE_TO_FILE") != NULL) {
test_out = fopen( "test_gpr_replica_out", "w+" );
test_out = fopen( "test_gpr_quick_put", "w+" );
} else {
test_out = stderr;
}
@ -77,104 +75,8 @@ int main(int argc, char **argv)
exit(1);
}
/* ENSURE THE REPLICA IS ISOLATED */
setenv("OMPI_MCA_gpr_replica_isolate", "1", 1);
orte_init();
/* Open up the output streams */
if (!opal_output_init()) {
return OMPI_ERROR;
}
/*
* If threads are supported - assume that we are using threads -
* and reset otherwise.
*/
opal_set_using_threads(OMPI_HAVE_THREAD_SUPPORT);
/* For malloc debugging */
opal_malloc_init();
/* Ensure the system_info structure is instantiated and initialized */
if (ORTE_SUCCESS != (rc = orte_sys_info())) {
return rc;
}
/* Ensure the process info structure is instantiated and initialized */
if (ORTE_SUCCESS != (rc = orte_proc_info())) {
return rc;
}
orte_process_info.seed = true;
orte_process_info.my_name = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
orte_process_info.my_name->cellid = 0;
orte_process_info.my_name->jobid = 0;
orte_process_info.my_name->vpid = 0;
/* startup the MCA */
if (OMPI_SUCCESS == mca_base_open()) {
fprintf(test_out, "MCA started\n");
} else {
fprintf(test_out, "MCA could not start\n");
exit (1);
}
/* startup the dps */
if (OMPI_SUCCESS == orte_dps_open()) {
fprintf(test_out, "DPS started\n");
} else {
fprintf(test_out, "DPS could not start\n");
exit (1);
}
/* startup the name services */
if (OMPI_SUCCESS == orte_ns_base_open()) {
fprintf(test_out, "NS started\n");
} else {
fprintf(test_out, "NS could not start\n");
exit (1);
}
/* startup the soh */
if (OMPI_SUCCESS == orte_soh_base_open()) {
fprintf(test_out, "SOH started\n");
} else {
fprintf(test_out, "SOH could not start\n");
exit (1);
}
/* startup the rmgr */
if (OMPI_SUCCESS == orte_rmgr_base_open()) {
fprintf(test_out, "RMGR started\n");
} else {
fprintf(test_out, "RMGR could not start\n");
exit (1);
}
/* startup the schema */
if (OMPI_SUCCESS == orte_schema_base_open()) {
fprintf(test_out, "SCHEMA started\n");
} else {
fprintf(test_out, "SCHEMA could not start\n");
exit (1);
}
/* startup the registry */
if (OMPI_SUCCESS == orte_gpr_base_open()) {
fprintf(test_out, "GPR started\n");
} else {
fprintf(test_out, "GPR could not start\n");
exit (1);
}
/* do a select on the registry components */
if (OMPI_SUCCESS == orte_gpr_base_select()) {
fprintf(test_out, "GPR selected\n");
} else {
fprintf(test_out, "GPR could not select\n");
exit (1);
}
tokens[0] = strdup("test-token-1");
tokens[1] = strdup("test-token-2");
tokens[2] = NULL;
@ -214,13 +116,7 @@ int main(int argc, char **argv)
orte_gpr.dump_segment(NULL, 0);
fprintf(stderr, "now finalize and see if all memory cleared\n");
orte_dps_close();
orte_sys_info_finalize();
orte_proc_info_finalize();
mca_base_close();
opal_malloc_finalize();
opal_output_finalize();
opal_class_finalize();
orte_finalize();
fclose( test_out );
/* result = system( cmd_str );
@ -231,7 +127,7 @@ int main(int argc, char **argv)
test_failure( "test_gpr_replica failed");
}
*/
unlink("test_gpr_replica_out");
unlink("test_gpr_quick_put");
return(0);
}

242
test/mca/gpr/gpr_quick_triggers.c Обычный файл
Просмотреть файл

@ -0,0 +1,242 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open MPI general purpose registry - unit test
*
*/
/*
* includes
*/
#include "orte_config.h"
#include <stdio.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "include/orte_constants.h"
#include "mca/schema/schema.h"
#include "mca/schema/base/base.h"
#include "mca/ns/base/base.h"
#include "mca/soh/base/base.h"
#include "mca/rmgr/base/base.h"
#include "support.h"
#include "class/orte_pointer_array.h"
#include "dps/dps.h"
#include "runtime/runtime.h"
#include "util/proc_info.h"
#include "util/sys_info.h"
#include "opal/util/malloc.h"
#include "opal/util/output.h"
#include "mca/gpr/base/base.h"
#include "mca/gpr/replica/api_layer/gpr_replica_api.h"
#include "mca/gpr/replica/functional_layer/gpr_replica_fn.h"
#include "mca/gpr/replica/communications/gpr_replica_comm.h"
#include "mca/gpr/replica/transition_layer/gpr_replica_tl.h"
/* output files needed by the test */
static FILE *test_out=NULL;
static void test_cbfunc1(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc2(orte_gpr_notify_message_t *msg, void *user_tag);
int main(int argc, char **argv)
{
int rc;
orte_gpr_subscription_id_t id;
orte_gpr_trigger_id_t id2;
size_t i;
char *tokens[5], *keys[5];
orte_data_type_t types[5];
orte_gpr_value_union_t value, values[5];
if (getenv("TEST_WRITE_TO_FILE") != NULL) {
test_out = fopen( "test_gpr_quick_triggers", "w+" );
} else {
test_out = stderr;
}
if( test_out == NULL ) {
test_failure("gpr_test couldn't open test file failed");
test_finalize();
exit(1);
}
/* start things up */
orte_init();
tokens[0] = strdup("test-token-1");
tokens[1] = strdup("test-token-2");
tokens[2] = NULL;
value.i32 = 123456;
fprintf(stderr, "quick-subscribe one value with single key\n");
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id,
NULL,
NULL,
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
"test-key", test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of 1 key no names failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of 1 key no names passed\n");
}
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id,
NULL,
"orte-std-subscription-single1",
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
"test-key", test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of 1 key sub name failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of 1 key sub name passed\n");
}
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id,
"orte-std-trigger-single",
"orte-std-subscription-single2",
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
"test-key", test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of 1 key trig and sub names failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of 1 key trig and sub names passed\n");
}
free(tokens[0]);
free(tokens[1]);
for (i=0; i < 4; i++) {
asprintf(&tokens[i], "test-token-%lu", (unsigned long)i);
asprintf(&keys[i], "test-keys-%lu", (unsigned long)i);
types[i] = ORTE_INT16;
values[i].i16 = i * 1000;
}
tokens[4] = NULL;
keys[4] = NULL;
fprintf(stderr, "quick-subscribe one value with multiple keyvals\n");
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id,
NULL,
NULL,
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
4, keys, test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of multi key no names failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of multi key no names passed\n");
}
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id,
NULL,
"orte-std-subscription-multi",
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
4, keys, test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of multi key sub name failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of multi key sub name passed\n");
}
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id,
"orte-std-trigger-multi",
"orte-std-subscription-multi2",
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
4, keys, test_cbfunc1, NULL))) {
fprintf(test_out, "gpr_test: subscribe of multi key trig and sub names failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of multi key trig and sub names passed\n");
}
fprintf(stderr, "quick-define-trigger\n");
if (ORTE_SUCCESS != (rc = orte_gpr.define_trigger(&id2,
NULL,
ORTE_GPR_TRIG_INCLUDE_TRIG_CNTRS,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
4, keys, test_cbfunc2, NULL))) {
fprintf(test_out, "gpr_test: define_trigger of multi key no names failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: subscribe of multi key no names passed\n");
}
if (ORTE_SUCCESS != (rc = orte_gpr.define_trigger(&id,
"orte-std-trigger",
ORTE_GPR_TRIG_INCLUDE_TRIG_CNTRS,
ORTE_GPR_TOKENS_AND,
"test-put-segment", tokens,
4, keys, test_cbfunc2, NULL))) {
fprintf(test_out, "gpr_test: define_trigger of multi key sub name failed with error code %s\n",
ORTE_ERROR_NAME(rc));
return rc;
} else {
fprintf(test_out, "gpr_test: define_trigger of multi key sub name passed\n");
}
orte_gpr.dump_subscriptions(0);
orte_gpr.dump_triggers(0);
orte_gpr.dump_local_subscriptions(0);
orte_gpr.dump_local_triggers(0);
fprintf(stderr, "now finalize and see if all memory cleared\n");
orte_finalize();
fclose( test_out );
/* result = system( cmd_str );
if( result == 0 ) {
test_success();
}
else {
test_failure( "test_gpr_replica failed");
}
*/
unlink("test_gpr_quick_triggers");
return(0);
}
void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n");
orte_gpr.dump_notify_data(data, 0);
}
void test_cbfunc2(orte_gpr_notify_message_t *msg, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 2\n");
orte_gpr.dump_notify_msg(msg, 0);
}