From 4e1837687be0a7c90f615b4b048bc1969e823270 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 2 Aug 2005 19:43:29 +0000 Subject: [PATCH] Finish simplified interfaces for put and subscribe - more details to come. This commit was SVN r6713. --- orte/mca/gpr/base/gpr_base_simplified_put.c | 17 ++ .../gpr/base/gpr_base_simplified_subscribe.c | 63 ++++- .../base/pack_api_cmd/gpr_base_pack_put_get.c | 27 +- .../pack_api_cmd/gpr_base_pack_subscribe.c | 23 +- orte/mca/gpr/proxy/gpr_proxy_subscribe.c | 26 +- .../api_layer/gpr_replica_subscribe_api.c | 26 +- .../communications/gpr_replica_subscribe_cm.c | 32 ++- .../functional_layer/gpr_replica_dump_fn.c | 15 +- .../gpr_replica_local_trig_ops_fn.c | 3 + test/mca/gpr/gpr_quick_put.c | 112 +------- test/mca/gpr/gpr_quick_triggers.c | 242 ++++++++++++++++++ 11 files changed, 422 insertions(+), 164 deletions(-) create mode 100644 test/mca/gpr/gpr_quick_triggers.c diff --git a/orte/mca/gpr/base/gpr_base_simplified_put.c b/orte/mca/gpr/base/gpr_base_simplified_put.c index 6f1fb940d1..d87459e7c4 100644 --- a/orte/mca/gpr/base/gpr_base_simplified_put.c +++ b/orte/mca/gpr/base/gpr_base_simplified_put.c @@ -42,6 +42,7 @@ int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode, orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0}, NULL, 0 }; + size_t i; int rc; value.addr_mode = addr_mode; @@ -54,6 +55,14 @@ int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode, keyval.value = data_value; value.tokens = tokens; + /* must count the number of tokens */ + if (NULL == tokens) { + value.num_tokens = 0; + } else { + for (i=0; NULL != tokens[i]; i++) { + (value.num_tokens)++; + } + } values = &value; /* put the value on the registry */ @@ -102,6 +111,14 @@ int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode, } value.tokens = tokens; + /* must count the number of tokens */ + if (NULL == tokens) { + value.num_tokens = 0; + } else { + for (i=0; NULL != tokens[i]; i++) { + (value.num_tokens)++; + } + } values = &value; /* put the value on the registry */ diff --git a/orte/mca/gpr/base/gpr_base_simplified_subscribe.c b/orte/mca/gpr/base/gpr_base_simplified_subscribe.c index 360de6a87e..20eda52f5e 100644 --- a/orte/mca/gpr/base/gpr_base_simplified_subscribe.c +++ b/orte/mca/gpr/base/gpr_base_simplified_subscribe.c @@ -52,6 +52,7 @@ int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id, orte_gpr_trigger_t *trigs; orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0}, NULL, 0, 0, 0, NULL, 0, NULL }; + size_t i; int rc; /* assemble the subscription object */ @@ -68,17 +69,32 @@ int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id, value.cnt = 1; keyvals = &keyval; value.keyvals = &keyvals; + value.tokens = tokens; + /* must count the number of tokens */ + if (NULL == tokens) { + value.num_tokens = 0; + } else { + for (i=0; NULL != tokens[i]; i++) { + (value.num_tokens)++; + } + } keyval.key = key; - /* assemble the trigger object - all we have here is the name*/ - trigs = &trig; - trig.name = trig_name; - /* send the subscription */ - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { - ORTE_ERROR_LOG(rc); + if (NULL == trig_name) { /* no trigger provided */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 0, NULL))) { + ORTE_ERROR_LOG(rc); + } + + } else { + trigs = &trig; + trig.name = trig_name; + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + } /* no memory to cleanup since we didn't allocate anything */ @@ -146,14 +162,28 @@ int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id, } value.tokens = tokens; + /* must count the number of tokens */ + if (NULL == tokens) { + value.num_tokens = 0; + } else { + for (i=0; NULL != tokens[i]; i++) { + (value.num_tokens)++; + } + } - /* assemble the trigger object - all we have here is the name */ - trigs = &trig; - trig.name = trig_name; - /* send the subscription */ - if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { - ORTE_ERROR_LOG(rc); + if (NULL == trig_name) { /* no trigger provided */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 0, NULL))) { + ORTE_ERROR_LOG(rc); + } + + } else { + trigs = &trig; + trig.name = trig_name; + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + } /* clean up memory - very carefully! @@ -193,6 +223,7 @@ int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id, int rc; /* assemble the trigger object */ + trigs = &trig; trig.name = trig_name; trig.action = action; trig.cnt = 1; @@ -222,6 +253,14 @@ int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id, } value.tokens = tokens; + /* must count the number of tokens */ + if (NULL == tokens) { + value.num_tokens = 0; + } else { + for (i=0; NULL != tokens[i]; i++) { + (value.num_tokens)++; + } + } /* send the subscription */ if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(0, NULL, 1, &trigs))) { diff --git a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_put_get.c b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_put_get.c index 45338032b0..84ca33cfc9 100644 --- a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_put_get.c +++ b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_put_get.c @@ -42,11 +42,13 @@ int orte_gpr_base_pack_put(orte_buffer_t *cmd, command = ORTE_GPR_PUT_CMD; if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, values, cnt, ORTE_GPR_VALUE))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } return ORTE_SUCCESS; @@ -65,15 +67,18 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd, command = ORTE_GPR_GET_CMD; if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &mode, 1, ORTE_GPR_ADDR_MODE))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &segment, 1, ORTE_STRING))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } /* compute number of tokens */ @@ -87,12 +92,14 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd, /* pack number of tokens */ if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &n, 1, ORTE_SIZE))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } if (n > 0) { if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, tokens, n, ORTE_STRING))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } } @@ -107,12 +114,14 @@ int orte_gpr_base_pack_get(orte_buffer_t *cmd, /* pack number of keys */ if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &n, 1, ORTE_SIZE))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } if (n > 0) { if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, keys, n, ORTE_STRING))) { - return rc; + ORTE_ERROR_LOG(rc); + return rc; } } diff --git a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_subscribe.c b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_subscribe.c index b3304b1344..c543401384 100644 --- a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_subscribe.c +++ b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_subscribe.c @@ -44,17 +44,28 @@ int orte_gpr_base_pack_subscribe(orte_buffer_t *cmd, command = ORTE_GPR_SUBSCRIBE_CMD; + /* can't be both NULL */ + if (NULL == subscriptions && NULL == trigs) { + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + return ORTE_ERR_BAD_PARAM; + } + if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) { ORTE_ERROR_LOG(rc); return rc; } - /* there MUST be some subscriptions, so don't have to check that here - it was checked on the API - * itself. - */ - if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, subscriptions, num_subs, ORTE_GPR_SUBSCRIPTION))) { - ORTE_ERROR_LOG(rc); - return rc; + /* see if there are subscriptions - if so, pack them */ + if (NULL != subscriptions) { + if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, subscriptions, num_subs, ORTE_GPR_SUBSCRIPTION))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } else { + if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &zero, 1, ORTE_SIZE))) { + ORTE_ERROR_LOG(rc); + return rc; + } } /* the API DOES allow there to be no triggers - if that happens, then trigs will be NULL and num_trigs diff --git a/orte/mca/gpr/proxy/gpr_proxy_subscribe.c b/orte/mca/gpr/proxy/gpr_proxy_subscribe.c index 83f4fb14ec..9956a95764 100644 --- a/orte/mca/gpr/proxy/gpr_proxy_subscribe.c +++ b/orte/mca/gpr/proxy/gpr_proxy_subscribe.c @@ -49,7 +49,7 @@ orte_gpr_proxy_subscribe(size_t num_subs, size_t i; /* need to protect against errors */ - if (NULL == subscriptions) { + if (NULL == subscriptions && NULL == trigs) { /* need at least one */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return ORTE_ERR_BAD_PARAM; } @@ -60,19 +60,23 @@ orte_gpr_proxy_subscribe(size_t num_subs, * generate id_tag to send to replica to identify lookup entry * for each subscription */ - if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_subscription( - num_subs, subscriptions))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex); - return rc; + if (NULL != subscriptions) { + if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_subscription( + num_subs, subscriptions))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex); + return rc; + } } /* if any triggers were provided, get id tags for them */ - if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_trigger( - num_trigs, trigs))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex); - return rc; + if (NULL != trigs) { + if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_trigger( + num_trigs, trigs))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex); + return rc; + } } /* check for compound cmd mode - if on, just pack the info into the diff --git a/orte/mca/gpr/replica/api_layer/gpr_replica_subscribe_api.c b/orte/mca/gpr/replica/api_layer/gpr_replica_subscribe_api.c index bb00624013..a2f011b0b1 100644 --- a/orte/mca/gpr/replica/api_layer/gpr_replica_subscribe_api.c +++ b/orte/mca/gpr/replica/api_layer/gpr_replica_subscribe_api.c @@ -41,7 +41,7 @@ orte_gpr_replica_subscribe(size_t num_subs, int rc; /* protect against errors */ - if (NULL == subscriptions) { + if (NULL == subscriptions && NULL == trigs) { /* need at least one */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return ORTE_ERR_BAD_PARAM; } @@ -53,21 +53,25 @@ orte_gpr_replica_subscribe(size_t num_subs, * for each subscription - the subscription id is returned * inside the subscription objects */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_subscription( - num_subs, subscriptions))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); - return rc; + if (NULL != subscriptions) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_subscription( + num_subs, subscriptions))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); + return rc; + } } /* if any triggers were provided, get id tags for them - the * idtags are returned inside the trigger objects */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_trigger( - num_trigs, trigs))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); - return rc; + if (NULL != trigs) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_enter_local_trigger( + num_trigs, trigs))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); + return rc; + } } /* register subscriptions */ diff --git a/orte/mca/gpr/replica/communications/gpr_replica_subscribe_cm.c b/orte/mca/gpr/replica/communications/gpr_replica_subscribe_cm.c index 13defee553..5400d1f11f 100644 --- a/orte/mca/gpr/replica/communications/gpr_replica_subscribe_cm.c +++ b/orte/mca/gpr/replica/communications/gpr_replica_subscribe_cm.c @@ -53,7 +53,28 @@ int orte_gpr_replica_recv_subscribe_cmd(orte_process_name_t* sender, goto RETURN_ERROR; } - if (0 < n) { + /* if the original command did not provide any subscriptions, then we put a size_t value in the buffer of "zero" + * to avoid causing buffer problems. thus, we need to check to see if the type is size_t vs subscription vs + * something else. if it is trigger, then we need the number to be greater than 0, which should always + * be true (we check it just to be safe). if it is size_t, then the value should be zero - anything else + * generates an error. + */ + if (ORTE_SIZE == type) { + /* this case means that there were no subscriptions, so we need to clear the value from the buffer + * and continue on + */ + n=1; + if (ORTE_SUCCESS != orte_dps.unpack(input_buffer, &num_subs, &n, ORTE_SIZE)) { + ORTE_ERROR_LOG(rc); + goto RETURN_ERROR; + } + /* if the returned number of subscriptions isn't zero, then we have a problem */ + if (0 != num_subs) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + rc = ORTE_ERR_COMM_FAILURE; + goto RETURN_ERROR; + } + } else if (ORTE_GPR_SUBSCRIPTION == type && 0 < n) { /* create the space for the subscriptions */ subscriptions = (orte_gpr_subscription_t**)malloc(n * sizeof(orte_gpr_subscription_t*)); if (NULL == subscriptions) { @@ -66,8 +87,15 @@ int orte_gpr_replica_recv_subscribe_cmd(orte_process_name_t* sender, ORTE_ERROR_LOG(rc); goto RETURN_ERROR; } + num_subs = n; + } else { + /* we must have an error condition - it either wasn't the right type, or we had the type okay + * but don't have a good number of elements. report the error and move on + */ + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + rc = ORTE_ERR_COMM_FAILURE; + goto RETURN_ERROR; } - num_subs = n; if (ORTE_SUCCESS != (rc = orte_dps.peek(input_buffer, &type, &n))) { ORTE_ERROR_LOG(rc); diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c index 71a767b17e..f46853401c 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c @@ -419,9 +419,14 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, size_t cnt, if (NULL == trig->master) { sprintf(tmp_out, "\tNO MASTER registered"); } else { - sprintf(tmp_out, "\tTRIGGER MASTER: [%lu,%lu,%lu]@idtag %lu", - ORTE_NAME_ARGS(trig->master->requestor), - (unsigned long)trig->master->idtag); + if (NULL == trig->master->requestor) { + sprintf(tmp_out, "\tTRIGGER MASTER: LOCAL@idtag %lu", + (unsigned long)trig->master->idtag); + } else { + sprintf(tmp_out, "\tTRIGGER MASTER: [%lu,%lu,%lu]@idtag %lu", + ORTE_NAME_ARGS(trig->master->requestor), + (unsigned long)trig->master->idtag); + } } orte_gpr_replica_dump_load_string(buffer, &tmp_out); @@ -547,10 +552,10 @@ static void orte_gpr_replica_dump_subscription(orte_buffer_t *buffer, tmp = tmp_out; if (NULL == sub->name) { - sprintf(tmp, "\t\tSubscription %lu: UNNAMED", + sprintf(tmp, "\nSubscription %lu: UNNAMED", (unsigned long) sub->index); } else { - sprintf(tmp, "\t\tSubscription %lu name %s", + sprintf(tmp, "\nSubscription %lu name %s", (unsigned long) sub->index, sub->name); } diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c index 0cab0c45a4..77330c09fc 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c @@ -78,6 +78,9 @@ orte_gpr_replica_enter_local_trigger(size_t cnt, orte_gpr_trigger_t **trigs) ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } + if (NULL != trigs[i]->name) { + trig->name = strdup(trigs[i]->name); + } /* ensure that the proper routing flag is set * in the action field to match the trigger callback * function diff --git a/test/mca/gpr/gpr_quick_put.c b/test/mca/gpr/gpr_quick_put.c index 3d82dc290e..91c33c813b 100644 --- a/test/mca/gpr/gpr_quick_put.c +++ b/test/mca/gpr/gpr_quick_put.c @@ -56,8 +56,6 @@ /* output files needed by the test */ static FILE *test_out=NULL; -static char *cmd_str="diff ./test_gpr_replica_out ./test_gpr_replica_out_std"; - int main(int argc, char **argv) { int rc; @@ -67,7 +65,7 @@ int main(int argc, char **argv) orte_gpr_value_union_t value, values[5]; if (getenv("TEST_WRITE_TO_FILE") != NULL) { - test_out = fopen( "test_gpr_replica_out", "w+" ); + test_out = fopen( "test_gpr_quick_put", "w+" ); } else { test_out = stderr; } @@ -77,104 +75,8 @@ int main(int argc, char **argv) exit(1); } - /* ENSURE THE REPLICA IS ISOLATED */ - setenv("OMPI_MCA_gpr_replica_isolate", "1", 1); + orte_init(); - /* Open up the output streams */ - if (!opal_output_init()) { - return OMPI_ERROR; - } - - /* - * If threads are supported - assume that we are using threads - - * and reset otherwise. - */ - opal_set_using_threads(OMPI_HAVE_THREAD_SUPPORT); - - /* For malloc debugging */ - opal_malloc_init(); - - /* Ensure the system_info structure is instantiated and initialized */ - if (ORTE_SUCCESS != (rc = orte_sys_info())) { - return rc; - } - - /* Ensure the process info structure is instantiated and initialized */ - if (ORTE_SUCCESS != (rc = orte_proc_info())) { - return rc; - } - - orte_process_info.seed = true; - orte_process_info.my_name = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); - orte_process_info.my_name->cellid = 0; - orte_process_info.my_name->jobid = 0; - orte_process_info.my_name->vpid = 0; - - /* startup the MCA */ - if (OMPI_SUCCESS == mca_base_open()) { - fprintf(test_out, "MCA started\n"); - } else { - fprintf(test_out, "MCA could not start\n"); - exit (1); - } - - /* startup the dps */ - if (OMPI_SUCCESS == orte_dps_open()) { - fprintf(test_out, "DPS started\n"); - } else { - fprintf(test_out, "DPS could not start\n"); - exit (1); - } - - /* startup the name services */ - if (OMPI_SUCCESS == orte_ns_base_open()) { - fprintf(test_out, "NS started\n"); - } else { - fprintf(test_out, "NS could not start\n"); - exit (1); - } - - /* startup the soh */ - if (OMPI_SUCCESS == orte_soh_base_open()) { - fprintf(test_out, "SOH started\n"); - } else { - fprintf(test_out, "SOH could not start\n"); - exit (1); - } - - /* startup the rmgr */ - if (OMPI_SUCCESS == orte_rmgr_base_open()) { - fprintf(test_out, "RMGR started\n"); - } else { - fprintf(test_out, "RMGR could not start\n"); - exit (1); - } - - /* startup the schema */ - if (OMPI_SUCCESS == orte_schema_base_open()) { - fprintf(test_out, "SCHEMA started\n"); - } else { - fprintf(test_out, "SCHEMA could not start\n"); - exit (1); - } - - /* startup the registry */ - if (OMPI_SUCCESS == orte_gpr_base_open()) { - fprintf(test_out, "GPR started\n"); - } else { - fprintf(test_out, "GPR could not start\n"); - exit (1); - } - - /* do a select on the registry components */ - if (OMPI_SUCCESS == orte_gpr_base_select()) { - fprintf(test_out, "GPR selected\n"); - } else { - fprintf(test_out, "GPR could not select\n"); - exit (1); - } - - tokens[0] = strdup("test-token-1"); tokens[1] = strdup("test-token-2"); tokens[2] = NULL; @@ -214,13 +116,7 @@ int main(int argc, char **argv) orte_gpr.dump_segment(NULL, 0); fprintf(stderr, "now finalize and see if all memory cleared\n"); - orte_dps_close(); - orte_sys_info_finalize(); - orte_proc_info_finalize(); - mca_base_close(); - opal_malloc_finalize(); - opal_output_finalize(); - opal_class_finalize(); + orte_finalize(); fclose( test_out ); /* result = system( cmd_str ); @@ -231,7 +127,7 @@ int main(int argc, char **argv) test_failure( "test_gpr_replica failed"); } */ - unlink("test_gpr_replica_out"); + unlink("test_gpr_quick_put"); return(0); } diff --git a/test/mca/gpr/gpr_quick_triggers.c b/test/mca/gpr/gpr_quick_triggers.c new file mode 100644 index 0000000000..bba849eeb3 --- /dev/null +++ b/test/mca/gpr/gpr_quick_triggers.c @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + * + * The Open MPI general purpose registry - unit test + * + */ + +/* + * includes + */ + +#include "orte_config.h" +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif + +#include "include/orte_constants.h" +#include "mca/schema/schema.h" +#include "mca/schema/base/base.h" +#include "mca/ns/base/base.h" +#include "mca/soh/base/base.h" +#include "mca/rmgr/base/base.h" + +#include "support.h" + +#include "class/orte_pointer_array.h" +#include "dps/dps.h" +#include "runtime/runtime.h" +#include "util/proc_info.h" +#include "util/sys_info.h" +#include "opal/util/malloc.h" +#include "opal/util/output.h" + +#include "mca/gpr/base/base.h" +#include "mca/gpr/replica/api_layer/gpr_replica_api.h" +#include "mca/gpr/replica/functional_layer/gpr_replica_fn.h" +#include "mca/gpr/replica/communications/gpr_replica_comm.h" +#include "mca/gpr/replica/transition_layer/gpr_replica_tl.h" + +/* output files needed by the test */ +static FILE *test_out=NULL; + +static void test_cbfunc1(orte_gpr_notify_data_t *data, void *user_tag); +static void test_cbfunc2(orte_gpr_notify_message_t *msg, void *user_tag); + +int main(int argc, char **argv) +{ + int rc; + orte_gpr_subscription_id_t id; + orte_gpr_trigger_id_t id2; + size_t i; + char *tokens[5], *keys[5]; + orte_data_type_t types[5]; + orte_gpr_value_union_t value, values[5]; + + if (getenv("TEST_WRITE_TO_FILE") != NULL) { + test_out = fopen( "test_gpr_quick_triggers", "w+" ); + } else { + test_out = stderr; + } + if( test_out == NULL ) { + test_failure("gpr_test couldn't open test file failed"); + test_finalize(); + exit(1); + } + + /* start things up */ + orte_init(); + + tokens[0] = strdup("test-token-1"); + tokens[1] = strdup("test-token-2"); + tokens[2] = NULL; + value.i32 = 123456; + fprintf(stderr, "quick-subscribe one value with single key\n"); + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, + NULL, + NULL, + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + "test-key", test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of 1 key no names failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of 1 key no names passed\n"); + } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, + NULL, + "orte-std-subscription-single1", + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + "test-key", test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of 1 key sub name failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of 1 key sub name passed\n"); + } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_1(&id, + "orte-std-trigger-single", + "orte-std-subscription-single2", + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + "test-key", test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of 1 key trig and sub names failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of 1 key trig and sub names passed\n"); + } + free(tokens[0]); + free(tokens[1]); + + for (i=0; i < 4; i++) { + asprintf(&tokens[i], "test-token-%lu", (unsigned long)i); + asprintf(&keys[i], "test-keys-%lu", (unsigned long)i); + types[i] = ORTE_INT16; + values[i].i16 = i * 1000; + } + tokens[4] = NULL; + keys[4] = NULL; + fprintf(stderr, "quick-subscribe one value with multiple keyvals\n"); + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id, + NULL, + NULL, + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + 4, keys, test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of multi key no names failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of multi key no names passed\n"); + } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id, + NULL, + "orte-std-subscription-multi", + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + 4, keys, test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of multi key sub name failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of multi key sub name passed\n"); + } + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&id, + "orte-std-trigger-multi", + "orte-std-subscription-multi2", + ORTE_GPR_NOTIFY_VALUE_CHG, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + 4, keys, test_cbfunc1, NULL))) { + fprintf(test_out, "gpr_test: subscribe of multi key trig and sub names failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of multi key trig and sub names passed\n"); + } + + fprintf(stderr, "quick-define-trigger\n"); + if (ORTE_SUCCESS != (rc = orte_gpr.define_trigger(&id2, + NULL, + ORTE_GPR_TRIG_INCLUDE_TRIG_CNTRS, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + 4, keys, test_cbfunc2, NULL))) { + fprintf(test_out, "gpr_test: define_trigger of multi key no names failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: subscribe of multi key no names passed\n"); + } + if (ORTE_SUCCESS != (rc = orte_gpr.define_trigger(&id, + "orte-std-trigger", + ORTE_GPR_TRIG_INCLUDE_TRIG_CNTRS, + ORTE_GPR_TOKENS_AND, + "test-put-segment", tokens, + 4, keys, test_cbfunc2, NULL))) { + fprintf(test_out, "gpr_test: define_trigger of multi key sub name failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + return rc; + } else { + fprintf(test_out, "gpr_test: define_trigger of multi key sub name passed\n"); + } + orte_gpr.dump_subscriptions(0); + orte_gpr.dump_triggers(0); + orte_gpr.dump_local_subscriptions(0); + orte_gpr.dump_local_triggers(0); + + fprintf(stderr, "now finalize and see if all memory cleared\n"); + orte_finalize(); + + fclose( test_out ); +/* result = system( cmd_str ); + if( result == 0 ) { + test_success(); + } + else { + test_failure( "test_gpr_replica failed"); + } +*/ + unlink("test_gpr_quick_triggers"); + + return(0); +} + +void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag) +{ + fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n"); + + orte_gpr.dump_notify_data(data, 0); +} + +void test_cbfunc2(orte_gpr_notify_message_t *msg, void *tag) +{ + fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 2\n"); + + orte_gpr.dump_notify_msg(msg, 0); +} +