diff --git a/orte/mca/gpr/base/Makefile.am b/orte/mca/gpr/base/Makefile.am index ab52961552..6ce836e2f5 100644 --- a/orte/mca/gpr/base/Makefile.am +++ b/orte/mca/gpr/base/Makefile.am @@ -32,8 +32,10 @@ libmca_gpr_base_la_SOURCES = \ gpr_base_open.c \ gpr_base_close.c \ gpr_base_select.c \ + gpr_base_simplified_put.c \ + gpr_base_simplified_subscribe.c \ gpr_base_xfer_payload.c \ - data_type_support/gpr_data_type_packing_fns.c \ + data_type_support/gpr_data_type_packing_fns.c \ data_type_support/gpr_data_type_unpacking_fns.c \ unpack_api_response/gpr_base_unpack_cleanup.c \ unpack_api_response/gpr_base_unpack_del_index.c \ diff --git a/orte/mca/gpr/base/base.h b/orte/mca/gpr/base/base.h index d02f1ebf8f..87ce0767b1 100644 --- a/orte/mca/gpr/base/base.h +++ b/orte/mca/gpr/base/base.h @@ -114,6 +114,54 @@ typedef uint8_t orte_gpr_cmd_flag_t; OMPI_DECLSPEC int orte_gpr_base_select(void); OMPI_DECLSPEC int orte_gpr_base_close(void); + OMPI_DECLSPEC int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + char *key, orte_data_type_t type, + orte_gpr_value_union_t value); + + + OMPI_DECLSPEC int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + size_t n, char **keys, + orte_data_type_t *types, + orte_gpr_value_union_t *data_values); + + OMPI_DECLSPEC int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + char *key, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag); + + + OMPI_DECLSPEC int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag); + + + OMPI_DECLSPEC int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id, + char *trig_name, + orte_gpr_trigger_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag); + /* general usage functions */ OMPI_DECLSPEC int orte_gpr_base_pack_delete_segment(orte_buffer_t *cmd, char *segment); @@ -153,7 +201,7 @@ typedef uint8_t orte_gpr_cmd_flag_t; size_t *cnt, orte_gpr_value_t ***values); OMPI_DECLSPEC int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd); - OMPI_DECLSPEC int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd); + OMPI_DECLSPEC int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd, char *segment); OMPI_DECLSPEC int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_subscriptions(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_callbacks(orte_buffer_t *cmd); diff --git a/orte/mca/gpr/base/gpr_base_simplified_put.c b/orte/mca/gpr/base/gpr_base_simplified_put.c new file mode 100644 index 0000000000..6f1fb940d1 --- /dev/null +++ b/orte/mca/gpr/base/gpr_base_simplified_put.c @@ -0,0 +1,122 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file + * + */ + +#include "orte_config.h" +#include "orte/include/orte_constants.h" + +#include "opal/util/output.h" + +#include "orte/dps/dps.h" +#include "orte/mca/errmgr/errmgr.h" + +#include "orte/mca/gpr/base/base.h" + + +int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + char *key, orte_data_type_t type, + orte_gpr_value_union_t data_value) +{ + orte_gpr_value_t *values; + orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0}, + ORTE_GPR_TOKENS_AND, + NULL, 0, NULL, 0, NULL }; + orte_gpr_keyval_t *keyvals; + orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0}, + NULL, + 0 }; + int rc; + + value.addr_mode = addr_mode; + value.segment = segment; + value.cnt = 1; + keyvals = &keyval; + value.keyvals = &keyvals; + keyval.key = key; + keyval.type = type; + keyval.value = data_value; + + value.tokens = tokens; + values = &value; + + /* put the value on the registry */ + if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* no memory to clean up since we didn't allocate any */ + return ORTE_SUCCESS; +} + + +int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + size_t n, char **keys, + orte_data_type_t *types, + orte_gpr_value_union_t *data_values) +{ + orte_gpr_value_t *values; + orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0}, + ORTE_GPR_TOKENS_AND, + NULL, 0, NULL, 0, NULL }; + size_t i, j; + int rc; + + value.addr_mode = addr_mode; + value.segment = segment; + value.cnt = n; + value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + for (i=0; i < n; i++) { + value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[i]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]); + free(value.keyvals); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[i]->key = keys[i]; + value.keyvals[i]->type = types[i]; + value.keyvals[i]->value = data_values[i]; + } + + value.tokens = tokens; + values = &value; + + /* put the value on the registry */ + if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) { + ORTE_ERROR_LOG(rc); + } + + /* clean up memory - very carefully! + * We can't use the object destructors because we didn't + * copy input data fields into the objects. Thus, only + * release the data that we explicitly allocated + */ + for (i=0; i < n; i++) free(value.keyvals[i]); + free(value.keyvals); + + return rc; +} + diff --git a/orte/mca/gpr/base/gpr_base_simplified_subscribe.c b/orte/mca/gpr/base/gpr_base_simplified_subscribe.c new file mode 100644 index 0000000000..360de6a87e --- /dev/null +++ b/orte/mca/gpr/base/gpr_base_simplified_subscribe.c @@ -0,0 +1,244 @@ +/* -*- C -*- + * + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file + * + */ + +#include "orte_config.h" +#include "orte/include/orte_constants.h" + +#include "opal/util/output.h" + +#include "orte/dps/dps.h" +#include "orte/mca/errmgr/errmgr.h" + +#include "orte/mca/gpr/base/base.h" + +int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + char *key, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag) +{ + orte_gpr_value_t *values; + orte_gpr_keyval_t *keyvals; + orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0}, + NULL, ORTE_NULL }; + orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0}, + ORTE_GPR_TOKENS_AND, + NULL, 0, NULL, 0, NULL }; + orte_gpr_subscription_t *subs; + orte_gpr_subscription_t sub = { {OBJ_CLASS(opal_object_t),0}, + NULL, 0, 0, 0, NULL, 0, NULL }; + orte_gpr_trigger_t *trigs; + orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0}, + NULL, 0, 0, 0, NULL, 0, NULL }; + int rc; + + /* assemble the subscription object */ + subs = ⊂ + sub.action = action; + sub.cnt = 1; + values = &value; + sub.values = &values; + sub.cbfunc = cbfunc; + sub.user_tag = user_tag; + + value.addr_mode = addr_mode; + value.segment = segment; + value.cnt = 1; + keyvals = &keyval; + value.keyvals = &keyvals; + value.tokens = tokens; + + keyval.key = key; + + /* assemble the trigger object - all we have here is the name*/ + trigs = &trig; + trig.name = trig_name; + + /* send the subscription */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + + /* no memory to cleanup since we didn't allocate anything */ + + /* return the subscription id */ + *id = sub.id; + + return rc; +} + + +int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag) +{ + orte_gpr_value_t *values; + orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0}, + ORTE_GPR_TOKENS_AND, + NULL, 0, NULL, 0, NULL }; + orte_gpr_subscription_t *subs; + orte_gpr_subscription_t sub = { {OBJ_CLASS(opal_object_t),0}, + NULL, 0, 0, 0, NULL, 0, NULL }; + orte_gpr_trigger_t *trigs; + orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0}, + NULL, 0, 0, 0, NULL, 0, NULL }; + size_t i, j; + int rc; + + /* assemble the subscription object */ + subs = ⊂ + sub.name = sub_name; + sub.action = action; + sub.cnt = 1; + values = &value; + sub.values = &values; + sub.cbfunc = cbfunc; + sub.user_tag = user_tag; + + value.addr_mode = addr_mode; + value.segment = segment; + value.cnt = n; + + value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + for (i=0; i < n; i++) { + value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[i]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]); + free(value.keyvals); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[i]->key = keys[i]; + } + + value.tokens = tokens; + + /* assemble the trigger object - all we have here is the name */ + trigs = &trig; + trig.name = trig_name; + + /* send the subscription */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + + /* clean up memory - very carefully! + * We can't use the object destructors because we didn't + * copy input data fields into the objects. Thus, only + * release the data that we explicitly allocated + */ + for (i=0; i < n; i++) free(value.keyvals[i]); + free(value.keyvals); + + /* return the subscription id */ + *id = sub.id; + + return rc; +} + + +int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id, + char *trig_name, + orte_gpr_trigger_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag) +{ + orte_gpr_value_t *values; + orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0}, + ORTE_GPR_TOKENS_AND, + NULL, 0, NULL, 0, NULL }; + orte_gpr_trigger_t *trigs; + orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0}, + NULL, 0, 0, 0, NULL, 0, NULL }; + size_t i, j; + int rc; + + /* assemble the trigger object */ + trig.name = trig_name; + trig.action = action; + trig.cnt = 1; + values = &value; + trig.values = &values; + trig.cbfunc = cbfunc; + trig.user_tag = user_tag; + + value.addr_mode = addr_mode; + value.segment = segment; + value.cnt = n; + + value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + for (i=0; i < n; i++) { + value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[i]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]); + free(value.keyvals); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[i]->key = keys[i]; + } + + value.tokens = tokens; + + /* send the subscription */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(0, NULL, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + + /* clean up memory - very carefully! + * We can't use the object destructors because we didn't + * copy input data fields into the objects. Thus, only + * release the data that we explicitly allocated + */ + for (i=0; i < n; i++) free(value.keyvals[i]); + free(value.keyvals); + + /* return the subscription id */ + *id = trig.id; + + return rc; +} + diff --git a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_dump.c b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_dump.c index 0ebdeb5864..1c76bb8a4d 100644 --- a/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_dump.c +++ b/orte/mca/gpr/base/pack_api_cmd/gpr_base_pack_dump.c @@ -25,8 +25,9 @@ #include "orte_config.h" -#include "include/orte_constants.h" -#include "dps/dps.h" +#include "orte/include/orte_constants.h" +#include "orte/dps/dps.h" +#include "orte/mca/errmgr/errmgr.h" #include "mca/gpr/base/base.h" @@ -39,13 +40,24 @@ int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd) return orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD); } -int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd) +int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd, char *segment) { orte_gpr_cmd_flag_t command; + int rc; command = ORTE_GPR_DUMP_SEGMENTS_CMD; - return orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD); + if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &segment, 1, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + return ORTE_SUCCESS; } int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd) diff --git a/orte/mca/gpr/gpr.h b/orte/mca/gpr/gpr.h index f93e4d9ec4..39e7d15633 100644 --- a/orte/mca/gpr/gpr.h +++ b/orte/mca/gpr/gpr.h @@ -246,6 +246,19 @@ typedef int (*orte_gpr_base_module_delete_segment_nb_fn_t)(char *segment, */ typedef int (*orte_gpr_base_module_put_fn_t)(size_t cnt, orte_gpr_value_t **values); +/* simplified version of the put command */ +typedef int (*orte_gpr_base_module_put_1_fn_t)(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + char *key, orte_data_type_t type, + orte_gpr_value_union_t value); + +typedef int (*orte_gpr_base_module_put_N_fn_t)(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + size_t n, char **keys, + orte_data_type_t *types, + orte_gpr_value_union_t *data_values); + + /* * Put data on the registry (NON-BLOCKING) * A non-blocking version of put. @@ -450,6 +463,41 @@ typedef int (*orte_gpr_base_module_subscribe_fn_t)( size_t num_trigs, orte_gpr_trigger_t **triggers); +/* simplified subscription functions */ +typedef int (*orte_gpr_base_module_subscribe_1_fn_t)(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + char *key, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag); + +typedef int (*orte_gpr_base_module_subscribe_N_fn_t)(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag); + +typedef int (*orte_gpr_base_module_define_trigger_fn_t)(orte_gpr_trigger_id_t *id, + char *trig_name, + orte_gpr_trigger_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag); + /* * Cancel a subscription. * Once a subscription has been entered on the registry, a caller may choose to permanently @@ -503,12 +551,16 @@ typedef int (*orte_gpr_base_module_cancel_trigger_fn_t)(orte_gpr_trigger_id_t tr */ typedef int (*orte_gpr_base_module_dump_all_fn_t)(int output_id); -typedef int (*orte_gpr_base_module_dump_segments_fn_t)(int output_id); +typedef int (*orte_gpr_base_module_dump_segment_fn_t)(char *segment, int output_id); typedef int (*orte_gpr_base_module_dump_triggers_fn_t)(int output_id); typedef int (*orte_gpr_base_module_dump_subscriptions_fn_t)(int output_id); +typedef int (*orte_gpr_base_module_dump_local_triggers_fn_t)(int output_id); + +typedef int (*orte_gpr_base_module_dump_local_subscriptions_fn_t)(int output_id); + typedef int (*orte_gpr_base_module_dump_callbacks_fn_t) (int output_id); typedef int (*orte_gpr_base_module_dump_notify_msg_fn_t)(orte_gpr_notify_message_t *msg, int output_id); @@ -556,6 +608,8 @@ struct orte_gpr_base_module_1_0_0_t { /* BLOCKING OPERATIONS */ orte_gpr_base_module_get_fn_t get; orte_gpr_base_module_put_fn_t put; + orte_gpr_base_module_put_1_fn_t put_1; + orte_gpr_base_module_put_N_fn_t put_N; orte_gpr_base_module_delete_entries_fn_t delete_entries; orte_gpr_base_module_delete_segment_fn_t delete_segment; orte_gpr_base_module_index_fn_t index; @@ -573,6 +627,9 @@ struct orte_gpr_base_module_1_0_0_t { orte_gpr_base_module_decrement_value_fn_t decrement_value; /* SUBSCRIBE OPERATIONS */ orte_gpr_base_module_subscribe_fn_t subscribe; + orte_gpr_base_module_subscribe_1_fn_t subscribe_1; + orte_gpr_base_module_subscribe_N_fn_t subscribe_N; + orte_gpr_base_module_define_trigger_fn_t define_trigger; orte_gpr_base_module_unsubscribe_fn_t unsubscribe; orte_gpr_base_module_cancel_trigger_fn_t cancel_trigger; /* COMPOUND COMMANDS */ @@ -581,9 +638,11 @@ struct orte_gpr_base_module_1_0_0_t { orte_gpr_base_module_exec_compound_cmd_fn_t exec_compound_cmd; /* DIAGNOSTIC OPERATIONS */ orte_gpr_base_module_dump_all_fn_t dump_all; - orte_gpr_base_module_dump_segments_fn_t dump_segments; + orte_gpr_base_module_dump_segment_fn_t dump_segment; orte_gpr_base_module_dump_triggers_fn_t dump_triggers; orte_gpr_base_module_dump_subscriptions_fn_t dump_subscriptions; + orte_gpr_base_module_dump_local_triggers_fn_t dump_local_triggers; + orte_gpr_base_module_dump_local_subscriptions_fn_t dump_local_subscriptions; orte_gpr_base_module_dump_callbacks_fn_t dump_callbacks; orte_gpr_base_module_dump_notify_msg_fn_t dump_notify_msg; orte_gpr_base_module_dump_notify_data_fn_t dump_notify_data; diff --git a/orte/mca/gpr/null/gpr_null.c b/orte/mca/gpr/null/gpr_null.c index 7905683f03..83940d607a 100644 --- a/orte/mca/gpr/null/gpr_null.c +++ b/orte/mca/gpr/null/gpr_null.c @@ -168,7 +168,7 @@ orte_gpr_null_dump_all(int output_id) } static int -orte_gpr_null_dump_segments(int output_id) +orte_gpr_null_dump_segments(char *segment, int output_id) { return ORTE_SUCCESS; } @@ -185,6 +185,18 @@ orte_gpr_null_dump_subscriptions(int output_id) return ORTE_SUCCESS; } +static int +orte_gpr_null_dump_local_triggers(int output_id) +{ + return ORTE_SUCCESS; +} + +static int +orte_gpr_null_dump_local_subscriptions(int output_id) +{ + return ORTE_SUCCESS; +} + static int orte_gpr_null_dump_callbacks(int output_id) { @@ -231,6 +243,69 @@ orte_gpr_null_xfer_payload(orte_gpr_value_union_t * dest, return ORTE_SUCCESS; } +static int orte_gpr_null_put_1(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + char *key, orte_data_type_t type, + orte_gpr_value_union_t value) +{ + return ORTE_SUCCESS; +} + + +static int orte_gpr_null_put_N(orte_gpr_addr_mode_t addr_mode, + char *segment, char **tokens, + size_t n, char **keys, + orte_data_type_t *types, + orte_gpr_value_union_t *data_values) +{ + return ORTE_SUCCESS; +} + +static int orte_gpr_null_subscribe_1(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + char *key, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag) +{ + return ORTE_SUCCESS; +} + + +static int orte_gpr_null_subscribe_N(orte_gpr_subscription_id_t *id, + char *trig_name, + char *sub_name, + orte_gpr_notify_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_notify_cb_fn_t cbfunc, + void *user_tag) +{ + return ORTE_SUCCESS; +} + + +static int orte_gpr_null_define_trigger(orte_gpr_trigger_id_t *id, + char *trig_name, + orte_gpr_trigger_action_t action, + orte_gpr_addr_mode_t addr_mode, + char *segment, + char **tokens, + size_t n, + char **keys, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag) +{ + return ORTE_SUCCESS; +} + /* @@ -242,6 +317,8 @@ orte_gpr_base_module_t orte_gpr_null_module = { /* BLOCKING OPERATIONS */ orte_gpr_null_get, orte_gpr_null_put, + orte_gpr_null_put_1, + orte_gpr_null_put_N, orte_gpr_null_delete_entries, orte_gpr_null_delete_segment, orte_gpr_null_index, @@ -259,6 +336,9 @@ orte_gpr_base_module_t orte_gpr_null_module = { orte_gpr_null_decrement_value, /* SUBSCRIBE OPERATIONS */ orte_gpr_null_subscribe, + orte_gpr_null_subscribe_1, + orte_gpr_null_subscribe_N, + orte_gpr_null_define_trigger, orte_gpr_null_unsubscribe, orte_gpr_null_cancel_trigger, /* COMPOUND COMMANDS */ @@ -270,6 +350,8 @@ orte_gpr_base_module_t orte_gpr_null_module = { orte_gpr_null_dump_segments, orte_gpr_null_dump_triggers, orte_gpr_null_dump_subscriptions, + orte_gpr_null_dump_local_triggers, + orte_gpr_null_dump_local_subscriptions, orte_gpr_null_dump_callbacks, orte_gpr_null_dump_notify_msg, orte_gpr_null_dump_notify_data, diff --git a/orte/mca/gpr/proxy/Makefile.am b/orte/mca/gpr/proxy/Makefile.am index 59c42355b5..b943a86a42 100644 --- a/orte/mca/gpr/proxy/Makefile.am +++ b/orte/mca/gpr/proxy/Makefile.am @@ -24,6 +24,7 @@ sources = \ gpr_proxy_del_index.c \ gpr_proxy_cleanup.c \ gpr_proxy_dump.c \ + gpr_proxy_dump_local_trigs_subs.c \ gpr_proxy_internals.c \ gpr_proxy_put_get.c \ gpr_proxy_general_operations.c \ diff --git a/orte/mca/gpr/proxy/gpr_proxy.h b/orte/mca/gpr/proxy/gpr_proxy.h index e7d3ab8a4d..357e644d4e 100644 --- a/orte/mca/gpr/proxy/gpr_proxy.h +++ b/orte/mca/gpr/proxy/gpr_proxy.h @@ -55,6 +55,7 @@ int orte_gpr_proxy_finalize(void); typedef struct { opal_object_t super; /**< Allows this to be an object */ orte_gpr_subscription_id_t id; /**< id of this subscription */ + char *name; orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */ void *user_tag; /**< User-provided tag for callback function */ } orte_gpr_proxy_subscriber_t; @@ -65,6 +66,7 @@ OBJ_CLASS_DECLARATION(orte_gpr_proxy_subscriber_t); typedef struct { opal_object_t super; /**< Allows this to be an object */ orte_gpr_trigger_id_t id; /**< id of this trigger */ + char *name; orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */ void *user_tag; /**< User-provided tag for callback function */ } orte_gpr_proxy_trigger_t; @@ -173,12 +175,16 @@ int orte_gpr_proxy_cancel_trigger(orte_gpr_trigger_id_t trig); */ int orte_gpr_proxy_dump_all(int output_id); -int orte_gpr_proxy_dump_segments(int output_id); +int orte_gpr_proxy_dump_segments(char *segment, int output_id); int orte_gpr_proxy_dump_triggers(int output_id); int orte_gpr_proxy_dump_subscriptions(int output_id); +int orte_gpr_proxy_dump_local_triggers(int output_id); + +int orte_gpr_proxy_dump_local_subscriptions(int output_id); + int orte_gpr_proxy_dump_callbacks(int output_id); int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id); diff --git a/orte/mca/gpr/proxy/gpr_proxy_component.c b/orte/mca/gpr/proxy/gpr_proxy_component.c index aeb05d29e5..fbd6961ad7 100644 --- a/orte/mca/gpr/proxy/gpr_proxy_component.c +++ b/orte/mca/gpr/proxy/gpr_proxy_component.c @@ -70,6 +70,8 @@ static orte_gpr_base_module_t orte_gpr_proxy = { /* BLOCKING OPERATIONS */ orte_gpr_proxy_get, orte_gpr_proxy_put, + orte_gpr_base_put_1, + orte_gpr_base_put_N, orte_gpr_proxy_delete_entries, orte_gpr_proxy_delete_segment, orte_gpr_proxy_index, @@ -87,6 +89,9 @@ static orte_gpr_base_module_t orte_gpr_proxy = { orte_gpr_proxy_decrement_value, /* SUBSCRIBE OPERATIONS */ orte_gpr_proxy_subscribe, + orte_gpr_base_subscribe_1, + orte_gpr_base_subscribe_N, + orte_gpr_base_define_trigger, orte_gpr_proxy_unsubscribe, orte_gpr_proxy_cancel_trigger, /* COMPOUND COMMANDS */ @@ -98,6 +103,8 @@ static orte_gpr_base_module_t orte_gpr_proxy = { orte_gpr_proxy_dump_segments, orte_gpr_proxy_dump_triggers, orte_gpr_proxy_dump_subscriptions, + orte_gpr_proxy_dump_local_triggers, + orte_gpr_proxy_dump_local_subscriptions, orte_gpr_proxy_dump_callbacks, orte_gpr_proxy_dump_notify_msg, orte_gpr_proxy_dump_notify_data, @@ -125,11 +132,13 @@ static void orte_gpr_proxy_subscriber_construct(orte_gpr_proxy_subscriber_t* req req->callback = NULL; req->user_tag = NULL; req->id = 0; + req->name = NULL; } /* destructor - used to free any resources held by instance */ static void orte_gpr_proxy_subscriber_destructor(orte_gpr_proxy_subscriber_t* req) { + if (NULL != req->name) free(req->name); } /* define instance of opal_class_t */ @@ -147,11 +156,13 @@ static void orte_gpr_proxy_trigger_construct(orte_gpr_proxy_trigger_t* req) req->callback = NULL; req->user_tag = NULL; req->id = 0; + req->name = NULL; } /* destructor - used to free any resources held by instance */ static void orte_gpr_proxy_trigger_destructor(orte_gpr_proxy_trigger_t* req) { + if (NULL != req->name) free(req->name); } /* define instance of opal_class_t */ diff --git a/orte/mca/gpr/proxy/gpr_proxy_dump.c b/orte/mca/gpr/proxy/gpr_proxy_dump.c index 807db6a80a..dd797f3d13 100644 --- a/orte/mca/gpr/proxy/gpr_proxy_dump.c +++ b/orte/mca/gpr/proxy/gpr_proxy_dump.c @@ -106,7 +106,7 @@ int orte_gpr_proxy_dump_all(int output_id) return rc; } -int orte_gpr_proxy_dump_segments(int output_id) +int orte_gpr_proxy_dump_segments(char *segment, int output_id) { orte_gpr_cmd_flag_t command; orte_buffer_t *cmd; @@ -115,7 +115,7 @@ int orte_gpr_proxy_dump_segments(int output_id) size_t n; if (orte_gpr_proxy_globals.compound_cmd_mode) { - return orte_gpr_base_pack_dump_segments(orte_gpr_proxy_globals.compound_cmd); + return orte_gpr_base_pack_dump_segments(orte_gpr_proxy_globals.compound_cmd, segment); } cmd = OBJ_NEW(orte_buffer_t); @@ -124,7 +124,7 @@ int orte_gpr_proxy_dump_segments(int output_id) return ORTE_ERR_OUT_OF_RESOURCE; } - if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_segments(cmd))) { + if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_segments(cmd, segment))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(cmd); return rc; diff --git a/orte/mca/gpr/proxy/gpr_proxy_dump_local_trigs_subs.c b/orte/mca/gpr/proxy/gpr_proxy_dump_local_trigs_subs.c new file mode 100644 index 0000000000..45bd522a46 --- /dev/null +++ b/orte/mca/gpr/proxy/gpr_proxy_dump_local_trigs_subs.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + * + * The Open MPI general purpose registry - implementation. + * + */ + +/* + * includes + */ + +#include "orte_config.h" + +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_LIBGEN_H +#include +#endif + +#include "orte/include/orte_constants.h" + +#include "orte/mca/errmgr/errmgr.h" + +#include "opal/util/output.h" + +#include "gpr_proxy.h" + +int orte_gpr_proxy_dump_local_triggers(int output_id) +{ + orte_gpr_proxy_trigger_t **trigs; + size_t j, k; + + opal_output(output_id, "DUMP OF LOCAL TRIGGERS for [%lu,%lu,%lu]\n", + ORTE_NAME_ARGS(orte_process_info.my_name)); + opal_output(output_id, "Number of triggers: %lu\n", (unsigned long) orte_gpr_proxy_globals.num_trigs); + + trigs = (orte_gpr_proxy_trigger_t**)(orte_gpr_proxy_globals.triggers)->addr; + for (j=0, k=0; k < orte_gpr_proxy_globals.num_trigs && + j < (orte_gpr_proxy_globals.triggers)->size; j++) { + if (NULL != trigs[j]) { + k++; + opal_output(output_id, "Data for trigger %lu", (unsigned long) trigs[j]->id); + if (NULL == trigs[j]->name) { + opal_output(output_id, "\tNOT a named trigger"); + } else { + opal_output(output_id, "\ttrigger name: %s", trigs[j]->name); + } + } + } + return ORTE_SUCCESS; +} + +int orte_gpr_proxy_dump_local_subscriptions(int output_id) +{ + orte_gpr_proxy_subscriber_t **subs; + size_t j, k; + + opal_output(output_id, "DUMP OF LOCAL SUBSCRIPTIONS for [%lu,%lu,%lu]\n", + ORTE_NAME_ARGS(orte_process_info.my_name)); + opal_output(output_id, "Number of subscriptions: %lu\n", (unsigned long) orte_gpr_proxy_globals.num_subs); + + subs = (orte_gpr_proxy_subscriber_t**)(orte_gpr_proxy_globals.subscriptions)->addr; + for (j=0, k=0; k < orte_gpr_proxy_globals.num_subs && + j < (orte_gpr_proxy_globals.subscriptions)->size; j++) { + if (NULL != subs[j]) { + k++; + opal_output(output_id, "Data for subscription %lu", (unsigned long) subs[j]->id); + if (NULL == subs[j]->name) { + opal_output(output_id, "\tNOT a named subscription"); + } else { + opal_output(output_id, "\tsubscription name: %s", subs[j]->name); + } + } + } + return ORTE_SUCCESS; +} diff --git a/orte/mca/gpr/proxy/gpr_proxy_internals.c b/orte/mca/gpr/proxy/gpr_proxy_internals.c index 8550fd600a..cb17d91664 100644 --- a/orte/mca/gpr/proxy/gpr_proxy_internals.c +++ b/orte/mca/gpr/proxy/gpr_proxy_internals.c @@ -25,16 +25,11 @@ #include "orte_config.h" -#include "include/orte_constants.h" -#include "opal/util/output.h" -#include "util/proc_info.h" +#include "orte/include/orte_constants.h" -#include "mca/errmgr/errmgr.h" -#include "mca/ns/ns_types.h" -#include "mca/oob/oob_types.h" -#include "mca/rml/rml.h" - -#include "gpr_proxy.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/class/orte_pointer_array.h" +#include "orte/mca/gpr/proxy/gpr_proxy.h" int orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscriptions) @@ -48,6 +43,9 @@ orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscrip ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } + if (NULL != subscriptions[i]->name) { + sub->name = strdup(subscriptions[i]->name); + } sub->callback = subscriptions[i]->cbfunc; sub->user_tag = subscriptions[i]->user_tag; if (0 > orte_pointer_array_add(&id, orte_gpr_proxy_globals.subscriptions, sub)) { @@ -75,6 +73,9 @@ orte_gpr_proxy_enter_trigger(size_t cnt, orte_gpr_trigger_t **trigs) ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } + if (NULL != trigs[i]->name) { + trig->name = strdup(trigs[i]->name); + } /* ensure that the proper routing flag is set * in the action field to match the trigger callback * function diff --git a/orte/mca/gpr/replica/api_layer/Makefile.am b/orte/mca/gpr/replica/api_layer/Makefile.am index 26d9ed8149..624aca0006 100644 --- a/orte/mca/gpr/replica/api_layer/Makefile.am +++ b/orte/mca/gpr/replica/api_layer/Makefile.am @@ -24,6 +24,7 @@ libmca_gpr_replica_api_la_SOURCES = \ gpr_replica_compound_cmd_api.c \ gpr_replica_del_index_api.c \ gpr_replica_dump_api.c \ + gpr_replica_dump_local_trigs_subs_api.c \ gpr_replica_arithmetic_ops_api.c \ gpr_replica_put_get_api.c \ gpr_replica_subscribe_api.c diff --git a/orte/mca/gpr/replica/api_layer/gpr_replica_api.h b/orte/mca/gpr/replica/api_layer/gpr_replica_api.h index 504d82cead..a961310592 100644 --- a/orte/mca/gpr/replica/api_layer/gpr_replica_api.h +++ b/orte/mca/gpr/replica/api_layer/gpr_replica_api.h @@ -127,12 +127,16 @@ int orte_gpr_replica_cancel_trigger(orte_gpr_trigger_id_t trig); */ int orte_gpr_replica_dump_all(int output_id); -int orte_gpr_replica_dump_segments(int output_id); +int orte_gpr_replica_dump_segments(char *segment, int output_id); int orte_gpr_replica_dump_triggers(int output_id); int orte_gpr_replica_dump_subscriptions(int output_id); +int orte_gpr_replica_dump_local_triggers(int output_id); + +int orte_gpr_replica_dump_local_subscriptions(int output_id); + int orte_gpr_replica_dump_callbacks(int output_id); int orte_gpr_replica_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id); diff --git a/orte/mca/gpr/replica/api_layer/gpr_replica_dump_api.c b/orte/mca/gpr/replica/api_layer/gpr_replica_dump_api.c index 83f7f0acfa..28a0aece95 100644 --- a/orte/mca/gpr/replica/api_layer/gpr_replica_dump_api.c +++ b/orte/mca/gpr/replica/api_layer/gpr_replica_dump_api.c @@ -66,7 +66,7 @@ int orte_gpr_replica_dump_all(int output_id) return rc; } -int orte_gpr_replica_dump_segments(int output_id) +int orte_gpr_replica_dump_segments(char *segment, int output_id) { orte_buffer_t *buffer; int rc; @@ -84,7 +84,7 @@ int orte_gpr_replica_dump_segments(int output_id) return ORTE_ERR_OUT_OF_RESOURCE; } - if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_segments_fn(buffer))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_segments_fn(buffer, segment))) { ORTE_ERROR_LOG(rc); } diff --git a/orte/mca/gpr/replica/api_layer/gpr_replica_dump_local_trigs_subs_api.c b/orte/mca/gpr/replica/api_layer/gpr_replica_dump_local_trigs_subs_api.c new file mode 100644 index 0000000000..03e94fe334 --- /dev/null +++ b/orte/mca/gpr/replica/api_layer/gpr_replica_dump_local_trigs_subs_api.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + * + * The Open MPI general purpose registry - implementation. + * + */ + +/* + * includes + */ + +#include "orte_config.h" + +#include "opal/util/output.h" +#include "util/proc_info.h" + +#include "mca/ns/ns_types.h" +#include "mca/errmgr/errmgr.h" + +#include "mca/gpr/replica/api_layer/gpr_replica_api.h" + +int orte_gpr_replica_dump_local_triggers(int output_id) +{ + orte_gpr_replica_local_trigger_t **trigs; + size_t j, k; + + opal_output(output_id, "DUMP OF LOCAL TRIGGERS for [%lu,%lu,%lu]\n", + ORTE_NAME_ARGS(orte_process_info.my_name)); + opal_output(output_id, "Number of triggers: %lu\n", (unsigned long) orte_gpr_replica_globals.num_local_trigs); + + trigs = (orte_gpr_replica_local_trigger_t**)(orte_gpr_replica_globals.local_triggers)->addr; + for (j=0, k=0; k < orte_gpr_replica_globals.num_local_trigs && + j < (orte_gpr_replica_globals.local_triggers)->size; j++) { + if (NULL != trigs[j]) { + k++; + opal_output(output_id, "Data for trigger %lu", (unsigned long) trigs[j]->id); + if (NULL == trigs[j]->name) { + opal_output(output_id, "\tNOT a named trigger"); + } else { + opal_output(output_id, "\ttrigger name: %s", trigs[j]->name); + } + } + } + return ORTE_SUCCESS; +} + +int orte_gpr_replica_dump_local_subscriptions(int output_id) +{ + orte_gpr_replica_local_subscriber_t **subs; + size_t j, k; + + opal_output(output_id, "DUMP OF LOCAL SUBSCRIPTIONS for [%lu,%lu,%lu]\n", + ORTE_NAME_ARGS(orte_process_info.my_name)); + opal_output(output_id, "Number of subscriptions: %lu\n", (unsigned long) orte_gpr_replica_globals.num_local_subs); + + subs = (orte_gpr_replica_local_subscriber_t**)(orte_gpr_replica_globals.local_subscriptions)->addr; + for (j=0, k=0; k < orte_gpr_replica_globals.num_local_subs && + j < (orte_gpr_replica_globals.local_subscriptions)->size; j++) { + if (NULL != subs[j]) { + k++; + opal_output(output_id, "Data for subscription %lu", (unsigned long) subs[j]->id); + if (NULL == subs[j]->name) { + opal_output(output_id, "\tNOT a named subscription"); + } else { + opal_output(output_id, "\tsubscription name: %s", subs[j]->name); + } + } + } + return ORTE_SUCCESS; +} diff --git a/orte/mca/gpr/replica/communications/gpr_replica_cmd_processor.c b/orte/mca/gpr/replica/communications/gpr_replica_cmd_processor.c index 69f4b46a6e..f2d2307123 100644 --- a/orte/mca/gpr/replica/communications/gpr_replica_cmd_processor.c +++ b/orte/mca/gpr/replica/communications/gpr_replica_cmd_processor.c @@ -204,7 +204,7 @@ int orte_gpr_replica_process_command_buffer(orte_buffer_t *input_buffer, opal_output(0, "\tdump segments cmd"); } - if (ORTE_SUCCESS != (ret = orte_gpr_replica_recv_dump_segments_cmd(answer))) { + if (ORTE_SUCCESS != (ret = orte_gpr_replica_recv_dump_segments_cmd(input_buffer, answer))) { ORTE_ERROR_LOG(ret); goto RETURN_ERROR; } diff --git a/orte/mca/gpr/replica/communications/gpr_replica_comm.h b/orte/mca/gpr/replica/communications/gpr_replica_comm.h index 36f25eb17e..52664f249e 100644 --- a/orte/mca/gpr/replica/communications/gpr_replica_comm.h +++ b/orte/mca/gpr/replica/communications/gpr_replica_comm.h @@ -109,7 +109,7 @@ int orte_gpr_replica_recv_cancel_trigger_cmd(orte_process_name_t* sender, int orte_gpr_replica_recv_dump_all_cmd(orte_buffer_t *answer); -int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer); +int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *input_buffer, orte_buffer_t *answer); int orte_gpr_replica_recv_dump_triggers_cmd(orte_buffer_t *answer); diff --git a/orte/mca/gpr/replica/communications/gpr_replica_dump_cm.c b/orte/mca/gpr/replica/communications/gpr_replica_dump_cm.c index 428aa79f14..f9f91dbd33 100644 --- a/orte/mca/gpr/replica/communications/gpr_replica_dump_cm.c +++ b/orte/mca/gpr/replica/communications/gpr_replica_dump_cm.c @@ -49,9 +49,11 @@ int orte_gpr_replica_recv_dump_all_cmd(orte_buffer_t *answer) return rc; } -int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer) +int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *input_buffer, orte_buffer_t *answer) { orte_gpr_cmd_flag_t command=ORTE_GPR_DUMP_SEGMENTS_CMD; + char *segment; + size_t n; int rc; if (ORTE_SUCCESS != (rc = orte_dps.pack(answer, &command, 1, ORTE_GPR_CMD))) { @@ -59,7 +61,12 @@ int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer) return rc; } - rc = orte_gpr_replica_dump_segments_fn(answer); + n=1; + if (ORTE_SUCCESS != (rc = orte_dps.unpack(input_buffer, &segment, &n, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + return rc; + } + rc = orte_gpr_replica_dump_segments_fn(answer, segment); if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c index a90c36db16..71a767b17e 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c @@ -70,20 +70,53 @@ int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer) return rc; } - rc = orte_gpr_replica_dump_segments_fn(buffer); + rc = orte_gpr_replica_dump_segments_fn(buffer, NULL); return rc; } -int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer) +int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer, char *segment) { - orte_gpr_replica_segment_t **seg; + orte_gpr_replica_segment_t **seg, *segptr; + size_t i, m; + int rc; + + /* if segment = NULL, loop through all segments */ + if (NULL == segment) { + seg = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments)->addr; + for (i=0, m=0; m < orte_gpr_replica.num_segs && + i < (orte_gpr_replica.segments)->size; i++) { + if (NULL != seg[i]) { + m++; + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_a_segment_fn(buffer, seg[i]))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } + } + return ORTE_SUCCESS; + } + + /* otherwise, dump just the one specified */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&segptr, false, segment))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_a_segment_fn(buffer, segptr))) { + ORTE_ERROR_LOG(rc); + return rc; + } + return ORTE_SUCCESS; + } + + int orte_gpr_replica_dump_a_segment_fn(orte_buffer_t *buffer, orte_gpr_replica_segment_t *seg) + { orte_gpr_replica_container_t **cptr; orte_gpr_replica_itag_t *itaglist; orte_gpr_replica_itagval_t **iptr; char *token; size_t num_objects; - size_t i, j, k, m, n, p; + size_t j, k, n, p; char *tmp_out; tmp_out = (char*)malloc(1000); @@ -92,84 +125,73 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer) return ORTE_ERR_OUT_OF_RESOURCE; } - sprintf(tmp_out, "\nDUMP OF GPR SEGMENTS"); + sprintf(tmp_out, "\nDUMP OF GPR SEGMENT %s", seg->name); orte_gpr_replica_dump_load_string(buffer, &tmp_out); - - /* loop through all segments */ - seg = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments)->addr; - for (i=0, m=0; m < orte_gpr_replica.num_segs && - i < (orte_gpr_replica.segments)->size; i++) { - if (NULL != seg[i]) { - m++; - sprintf(tmp_out, "\nGPR Dump for Segment: %s", seg[i]->name); + + num_objects = (seg->containers)->size - (seg->containers)->number_free; + + sprintf(tmp_out, "\tNumber of containers: %lu\n", + (unsigned long) num_objects); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + /* loop through all containers and print their info and contents */ + cptr = (orte_gpr_replica_container_t**)(seg->containers)->addr; + for (j=0, n=0; n < seg->num_containers && + j < (seg->containers)->size; j++) { + if (NULL != cptr[j]) { + n++; + sprintf(tmp_out, "\n\tInfo for container %lu" + "\tNumber of keyvals: %lu" + "\n\tTokens:\n", + (unsigned long) j, + ((unsigned long) (cptr[j]->itagvals)->size - (cptr[j]->itagvals)->number_free)); orte_gpr_replica_dump_load_string(buffer, &tmp_out); - - num_objects = (seg[i]->containers)->size - (seg[i]->containers)->number_free; - - sprintf(tmp_out, "\tNumber of containers: %lu\n", - (unsigned long) num_objects); + + /* reverse lookup tokens and print them */ + itaglist = cptr[j]->itags; + for (k=0; k < cptr[j]->num_itags; k++) { + if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( + &token, seg, itaglist[k])) { + sprintf(tmp_out, "\t\titag num %lu" + ": No entry found for itag %lu", + (unsigned long) k, + (unsigned long) itaglist[k]); + } else { + sprintf(tmp_out, "\t\titag num %lu: itag %lu\tToken: %s", + (unsigned long) k, + (unsigned long) itaglist[k], token); + free(token); + } + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + + sprintf(tmp_out, "\n\tKeyval info:"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); - - /* loop through all containers and print their info and contents */ - cptr = (orte_gpr_replica_container_t**)(seg[i]->containers)->addr; - for (j=0, n=0; n < seg[i]->num_containers && - j < (seg[i]->containers)->size; j++) { - if (NULL != cptr[j]) { - n++; - sprintf(tmp_out, "\n\tInfo for container %lu" - "\tNumber of keyvals: %lu" - "\n\tTokens:\n", - (unsigned long) j, - ((unsigned long) (cptr[j]->itagvals)->size - (cptr[j]->itagvals)->number_free)); - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - - /* reverse lookup tokens and print them */ - itaglist = cptr[j]->itags; - for (k=0; k < cptr[j]->num_itags; k++) { - if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( - &token, seg[i], itaglist[k])) { - sprintf(tmp_out, "\t\titag num %lu" - ": No entry found for itag %lu", - (unsigned long) k, - (unsigned long) itaglist[k]); - } else { - sprintf(tmp_out, "\t\titag num %lu: itag %lu\tToken: %s", - (unsigned long) k, - (unsigned long) itaglist[k], token); - free(token); - } - orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + /* loop through all itagvals and print their info */ + iptr = (orte_gpr_replica_itagval_t**)(cptr[j]->itagvals)->addr; + for (k=0, p=0; p < cptr[j]->num_itagvals && + k < (cptr[j]->itagvals)->size; k++) { + if (NULL != iptr[k]) { + p++; + if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( + &token, seg, iptr[k]->itag)) { + sprintf(tmp_out, "\n\t\titag num %lu: No entry found for itag %lu", + (unsigned long) k, + (unsigned long) iptr[k]->itag); + } else { + sprintf(tmp_out, "\n\t\tEntry %lu: itag %lu\tKey: %s", + (unsigned long) k, + (unsigned long) iptr[k]->itag, token); + free(token); } - - sprintf(tmp_out, "\n\tKeyval info:"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); - - /* loop through all itagvals and print their info */ - iptr = (orte_gpr_replica_itagval_t**)(cptr[j]->itagvals)->addr; - for (k=0, p=0; p < cptr[j]->num_itagvals && - k < (cptr[j]->itagvals)->size; k++) { - if (NULL != iptr[k]) { - p++; - if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( - &token, seg[i], iptr[k]->itag)) { - sprintf(tmp_out, "\n\t\titag num %lu: No entry found for itag %lu", - (unsigned long) k, - (unsigned long) iptr[k]->itag); - } else { - sprintf(tmp_out, "\n\t\tEntry %lu: itag %lu\tKey: %s", - (unsigned long) k, - (unsigned long) iptr[k]->itag, token); - free(token); - } - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - orte_gpr_replica_dump_itagval_value(buffer, iptr[k]); - } - } + orte_gpr_replica_dump_itagval_value(buffer, iptr[k]); } } } - } - + } + free(tmp_out); return ORTE_SUCCESS; } diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h b/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h index baf7cf3533..7125e4942d 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h @@ -122,7 +122,9 @@ int orte_gpr_replica_subscribe_fn(orte_process_name_t *requestor, */ int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer); -int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer); +int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer, char *segment); + +int orte_gpr_replica_dump_a_segment_fn(orte_buffer_t *buffer, orte_gpr_replica_segment_t *seg); int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer); diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c index 115800d462..0cab0c45a4 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_local_trig_ops_fn.c @@ -48,6 +48,9 @@ orte_gpr_replica_enter_local_subscription(size_t cnt, orte_gpr_subscription_t ** ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } + if (NULL != subscriptions[i]->name) { + sub->name = strdup(subscriptions[i]->name); + } sub->callback = subscriptions[i]->cbfunc; sub->user_tag = subscriptions[i]->user_tag; if (0 > orte_pointer_array_add(&id, orte_gpr_replica_globals.local_subscriptions, sub)) { diff --git a/orte/mca/gpr/replica/gpr_replica.h b/orte/mca/gpr/replica/gpr_replica.h index 4645a66469..3f460f6958 100644 --- a/orte/mca/gpr/replica/gpr_replica.h +++ b/orte/mca/gpr/replica/gpr_replica.h @@ -77,6 +77,7 @@ typedef uint8_t orte_gpr_replica_action_t; typedef struct { opal_object_t super; /**< Allows this to be an object */ orte_gpr_subscription_id_t id; /**< id of this subscription */ + char *name; orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */ void *user_tag; /**< User-provided tag for callback function */ } orte_gpr_replica_local_subscriber_t; @@ -91,6 +92,7 @@ OBJ_CLASS_DECLARATION(orte_gpr_replica_local_subscriber_t); typedef struct { opal_object_t super; /**< Allows this to be an object */ orte_gpr_trigger_id_t id; /**< id of this trigger */ + char *name; orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */ void *user_tag; /**< User-provided tag for callback function */ } orte_gpr_replica_local_trigger_t; diff --git a/orte/mca/gpr/replica/gpr_replica_class_instances.h b/orte/mca/gpr/replica/gpr_replica_class_instances.h index ec8d7ca709..60a61d3d80 100644 --- a/orte/mca/gpr/replica/gpr_replica_class_instances.h +++ b/orte/mca/gpr/replica/gpr_replica_class_instances.h @@ -34,23 +34,40 @@ */ /* LOCAL_SUBSCRIBER */ -/* no constructor or destructor needed, so just - * define instance */ +static void orte_gpr_replica_local_subscriber_constructor(orte_gpr_replica_local_subscriber_t *ptr) +{ + ptr->name = NULL; +} + +static void orte_gpr_replica_local_subscriber_destructor(orte_gpr_replica_local_subscriber_t *ptr) +{ + if (NULL != ptr->name) free(ptr->name); +} + OBJ_CLASS_INSTANCE( orte_gpr_replica_local_subscriber_t, /* type name */ opal_object_t, /* parent "class" name */ - NULL, /* constructor */ - NULL); /* destructor */ + orte_gpr_replica_local_subscriber_constructor, /* constructor */ + orte_gpr_replica_local_subscriber_destructor); /* destructor */ /* LOCAL_TRIGGER */ -/* no constructor or destructor needed, so just - * define instance */ +static void orte_gpr_replica_local_trigger_constructor(orte_gpr_replica_local_trigger_t *ptr) +{ + ptr->name = NULL; +} + +static void orte_gpr_replica_local_trigger_destructor(orte_gpr_replica_local_trigger_t *ptr) +{ + if (NULL != ptr->name) free(ptr->name); +} + +/* define instance */ OBJ_CLASS_INSTANCE( orte_gpr_replica_local_trigger_t, /* type name */ opal_object_t, /* parent "class" name */ - NULL, /* constructor */ - NULL); /* destructor */ + orte_gpr_replica_local_trigger_constructor, /* constructor */ + orte_gpr_replica_local_trigger_destructor); /* destructor */ /* SEGMENT */ diff --git a/orte/mca/gpr/replica/gpr_replica_component.c b/orte/mca/gpr/replica/gpr_replica_component.c index 8dc1d21f43..ecc114d5b7 100644 --- a/orte/mca/gpr/replica/gpr_replica_component.c +++ b/orte/mca/gpr/replica/gpr_replica_component.c @@ -68,6 +68,8 @@ static orte_gpr_base_module_t orte_gpr_replica_module = { /* BLOCKING OPERATIONS */ orte_gpr_replica_get, orte_gpr_replica_put, + orte_gpr_base_put_1, + orte_gpr_base_put_N, orte_gpr_replica_delete_entries, orte_gpr_replica_delete_segment, orte_gpr_replica_index, @@ -85,6 +87,9 @@ static orte_gpr_base_module_t orte_gpr_replica_module = { orte_gpr_replica_decrement_value, /* SUBSCRIBE OPERATIONS */ orte_gpr_replica_subscribe, + orte_gpr_base_subscribe_1, + orte_gpr_base_subscribe_N, + orte_gpr_base_define_trigger, orte_gpr_replica_unsubscribe, orte_gpr_replica_cancel_trigger, /* COMPOUND COMMANDS */ @@ -96,6 +101,8 @@ static orte_gpr_base_module_t orte_gpr_replica_module = { orte_gpr_replica_dump_segments, orte_gpr_replica_dump_triggers, orte_gpr_replica_dump_subscriptions, + orte_gpr_replica_dump_local_triggers, + orte_gpr_replica_dump_local_subscriptions, orte_gpr_replica_dump_callbacks, orte_gpr_replica_dump_notify_msg, orte_gpr_replica_dump_notify_data, diff --git a/orte/runtime/orte_setup_hnp.c b/orte/runtime/orte_setup_hnp.c index 2562b65548..ac5a3113c7 100644 --- a/orte/runtime/orte_setup_hnp.c +++ b/orte/runtime/orte_setup_hnp.c @@ -292,7 +292,7 @@ MOVEON: can_launch = true; } - orte_gpr.dump_segments(0); + orte_gpr.dump_segment(NULL, 0); if (!can_launch || ORTE_CELLID_MAX == cellid) { return ORTE_ERR_UNREACH;