1
1

Add a few new functions that were requested last week - not tested yet, so please don't use them! I will test them this afternoon on a different computer. For now, they won't cause any problems since they aren't being called.

This commit was SVN r6689.
Этот коммит содержится в:
Ralph Castain 2005-08-01 16:38:15 +00:00
родитель 41f7bb3a2a
Коммит 8c6c78c47a
27 изменённых файлов: 944 добавлений и 113 удалений

Просмотреть файл

@ -32,6 +32,8 @@ libmca_gpr_base_la_SOURCES = \
gpr_base_open.c \ gpr_base_open.c \
gpr_base_close.c \ gpr_base_close.c \
gpr_base_select.c \ gpr_base_select.c \
gpr_base_simplified_put.c \
gpr_base_simplified_subscribe.c \
gpr_base_xfer_payload.c \ gpr_base_xfer_payload.c \
data_type_support/gpr_data_type_packing_fns.c \ data_type_support/gpr_data_type_packing_fns.c \
data_type_support/gpr_data_type_unpacking_fns.c \ data_type_support/gpr_data_type_unpacking_fns.c \

Просмотреть файл

@ -114,6 +114,54 @@ typedef uint8_t orte_gpr_cmd_flag_t;
OMPI_DECLSPEC int orte_gpr_base_select(void); OMPI_DECLSPEC int orte_gpr_base_select(void);
OMPI_DECLSPEC int orte_gpr_base_close(void); OMPI_DECLSPEC int orte_gpr_base_close(void);
OMPI_DECLSPEC int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
char *key, orte_data_type_t type,
orte_gpr_value_union_t value);
OMPI_DECLSPEC int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
size_t n, char **keys,
orte_data_type_t *types,
orte_gpr_value_union_t *data_values);
OMPI_DECLSPEC int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
char *key,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag);
OMPI_DECLSPEC int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag);
OMPI_DECLSPEC int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id,
char *trig_name,
orte_gpr_trigger_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
/* general usage functions */ /* general usage functions */
OMPI_DECLSPEC int orte_gpr_base_pack_delete_segment(orte_buffer_t *cmd, OMPI_DECLSPEC int orte_gpr_base_pack_delete_segment(orte_buffer_t *cmd,
char *segment); char *segment);
@ -153,7 +201,7 @@ typedef uint8_t orte_gpr_cmd_flag_t;
size_t *cnt, orte_gpr_value_t ***values); size_t *cnt, orte_gpr_value_t ***values);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd, char *segment);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_subscriptions(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_subscriptions(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_callbacks(orte_buffer_t *cmd); OMPI_DECLSPEC int orte_gpr_base_pack_dump_callbacks(orte_buffer_t *cmd);

122
orte/mca/gpr/base/gpr_base_simplified_put.c Обычный файл
Просмотреть файл

@ -0,0 +1,122 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file
*
*/
#include "orte_config.h"
#include "orte/include/orte_constants.h"
#include "opal/util/output.h"
#include "orte/dps/dps.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/base/base.h"
int orte_gpr_base_put_1(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
char *key, orte_data_type_t type,
orte_gpr_value_union_t data_value)
{
orte_gpr_value_t *values;
orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0},
ORTE_GPR_TOKENS_AND,
NULL, 0, NULL, 0, NULL };
orte_gpr_keyval_t *keyvals;
orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0},
NULL,
0 };
int rc;
value.addr_mode = addr_mode;
value.segment = segment;
value.cnt = 1;
keyvals = &keyval;
value.keyvals = &keyvals;
keyval.key = key;
keyval.type = type;
keyval.value = data_value;
value.tokens = tokens;
values = &value;
/* put the value on the registry */
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* no memory to clean up since we didn't allocate any */
return ORTE_SUCCESS;
}
int orte_gpr_base_put_N(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
size_t n, char **keys,
orte_data_type_t *types,
orte_gpr_value_union_t *data_values)
{
orte_gpr_value_t *values;
orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0},
ORTE_GPR_TOKENS_AND,
NULL, 0, NULL, 0, NULL };
size_t i, j;
int rc;
value.addr_mode = addr_mode;
value.segment = segment;
value.cnt = n;
value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i=0; i < n; i++) {
value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]);
free(value.keyvals);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[i]->key = keys[i];
value.keyvals[i]->type = types[i];
value.keyvals[i]->value = data_values[i];
}
value.tokens = tokens;
values = &value;
/* put the value on the registry */
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) {
ORTE_ERROR_LOG(rc);
}
/* clean up memory - very carefully!
* We can't use the object destructors because we didn't
* copy input data fields into the objects. Thus, only
* release the data that we explicitly allocated
*/
for (i=0; i < n; i++) free(value.keyvals[i]);
free(value.keyvals);
return rc;
}

Просмотреть файл

@ -0,0 +1,244 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file
*
*/
#include "orte_config.h"
#include "orte/include/orte_constants.h"
#include "opal/util/output.h"
#include "orte/dps/dps.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/base/base.h"
int orte_gpr_base_subscribe_1(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
char *key,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag)
{
orte_gpr_value_t *values;
orte_gpr_keyval_t *keyvals;
orte_gpr_keyval_t keyval = { {OBJ_CLASS(opal_object_t),0},
NULL, ORTE_NULL };
orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0},
ORTE_GPR_TOKENS_AND,
NULL, 0, NULL, 0, NULL };
orte_gpr_subscription_t *subs;
orte_gpr_subscription_t sub = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
orte_gpr_trigger_t *trigs;
orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
int rc;
/* assemble the subscription object */
subs = &sub;
sub.action = action;
sub.cnt = 1;
values = &value;
sub.values = &values;
sub.cbfunc = cbfunc;
sub.user_tag = user_tag;
value.addr_mode = addr_mode;
value.segment = segment;
value.cnt = 1;
keyvals = &keyval;
value.keyvals = &keyvals;
value.tokens = tokens;
keyval.key = key;
/* assemble the trigger object - all we have here is the name*/
trigs = &trig;
trig.name = trig_name;
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
/* no memory to cleanup since we didn't allocate anything */
/* return the subscription id */
*id = sub.id;
return rc;
}
int orte_gpr_base_subscribe_N(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag)
{
orte_gpr_value_t *values;
orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0},
ORTE_GPR_TOKENS_AND,
NULL, 0, NULL, 0, NULL };
orte_gpr_subscription_t *subs;
orte_gpr_subscription_t sub = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
orte_gpr_trigger_t *trigs;
orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
size_t i, j;
int rc;
/* assemble the subscription object */
subs = &sub;
sub.name = sub_name;
sub.action = action;
sub.cnt = 1;
values = &value;
sub.values = &values;
sub.cbfunc = cbfunc;
sub.user_tag = user_tag;
value.addr_mode = addr_mode;
value.segment = segment;
value.cnt = n;
value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i=0; i < n; i++) {
value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]);
free(value.keyvals);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[i]->key = keys[i];
}
value.tokens = tokens;
/* assemble the trigger object - all we have here is the name */
trigs = &trig;
trig.name = trig_name;
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
/* clean up memory - very carefully!
* We can't use the object destructors because we didn't
* copy input data fields into the objects. Thus, only
* release the data that we explicitly allocated
*/
for (i=0; i < n; i++) free(value.keyvals[i]);
free(value.keyvals);
/* return the subscription id */
*id = sub.id;
return rc;
}
int orte_gpr_base_define_trigger(orte_gpr_trigger_id_t *id,
char *trig_name,
orte_gpr_trigger_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag)
{
orte_gpr_value_t *values;
orte_gpr_value_t value = { {OBJ_CLASS(opal_object_t),0},
ORTE_GPR_TOKENS_AND,
NULL, 0, NULL, 0, NULL };
orte_gpr_trigger_t *trigs;
orte_gpr_trigger_t trig = { {OBJ_CLASS(opal_object_t),0},
NULL, 0, 0, 0, NULL, 0, NULL };
size_t i, j;
int rc;
/* assemble the trigger object */
trig.name = trig_name;
trig.action = action;
trig.cnt = 1;
values = &value;
trig.values = &values;
trig.cbfunc = cbfunc;
trig.user_tag = user_tag;
value.addr_mode = addr_mode;
value.segment = segment;
value.cnt = n;
value.keyvals = (orte_gpr_keyval_t**)malloc(n * sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i=0; i < n; i++) {
value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
for (j=0; j < i; j++) OBJ_RELEASE(value.keyvals[j]);
free(value.keyvals);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[i]->key = keys[i];
}
value.tokens = tokens;
/* send the subscription */
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(0, NULL, 1, &trigs))) {
ORTE_ERROR_LOG(rc);
}
/* clean up memory - very carefully!
* We can't use the object destructors because we didn't
* copy input data fields into the objects. Thus, only
* release the data that we explicitly allocated
*/
for (i=0; i < n; i++) free(value.keyvals[i]);
free(value.keyvals);
/* return the subscription id */
*id = trig.id;
return rc;
}

Просмотреть файл

@ -25,8 +25,9 @@
#include "orte_config.h" #include "orte_config.h"
#include "include/orte_constants.h" #include "orte/include/orte_constants.h"
#include "dps/dps.h" #include "orte/dps/dps.h"
#include "orte/mca/errmgr/errmgr.h"
#include "mca/gpr/base/base.h" #include "mca/gpr/base/base.h"
@ -39,13 +40,24 @@ int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd)
return orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD); return orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD);
} }
int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd) int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd, char *segment)
{ {
orte_gpr_cmd_flag_t command; orte_gpr_cmd_flag_t command;
int rc;
command = ORTE_GPR_DUMP_SEGMENTS_CMD; command = ORTE_GPR_DUMP_SEGMENTS_CMD;
return orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD); if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &command, 1, ORTE_GPR_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(cmd, &segment, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
} }
int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd) int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd)

Просмотреть файл

@ -246,6 +246,19 @@ typedef int (*orte_gpr_base_module_delete_segment_nb_fn_t)(char *segment,
*/ */
typedef int (*orte_gpr_base_module_put_fn_t)(size_t cnt, orte_gpr_value_t **values); typedef int (*orte_gpr_base_module_put_fn_t)(size_t cnt, orte_gpr_value_t **values);
/* simplified version of the put command */
typedef int (*orte_gpr_base_module_put_1_fn_t)(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
char *key, orte_data_type_t type,
orte_gpr_value_union_t value);
typedef int (*orte_gpr_base_module_put_N_fn_t)(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
size_t n, char **keys,
orte_data_type_t *types,
orte_gpr_value_union_t *data_values);
/* /*
* Put data on the registry (NON-BLOCKING) * Put data on the registry (NON-BLOCKING)
* A non-blocking version of put. * A non-blocking version of put.
@ -450,6 +463,41 @@ typedef int (*orte_gpr_base_module_subscribe_fn_t)(
size_t num_trigs, size_t num_trigs,
orte_gpr_trigger_t **triggers); orte_gpr_trigger_t **triggers);
/* simplified subscription functions */
typedef int (*orte_gpr_base_module_subscribe_1_fn_t)(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
char *key,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag);
typedef int (*orte_gpr_base_module_subscribe_N_fn_t)(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag);
typedef int (*orte_gpr_base_module_define_trigger_fn_t)(orte_gpr_trigger_id_t *id,
char *trig_name,
orte_gpr_trigger_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
/* /*
* Cancel a subscription. * Cancel a subscription.
* Once a subscription has been entered on the registry, a caller may choose to permanently * Once a subscription has been entered on the registry, a caller may choose to permanently
@ -503,12 +551,16 @@ typedef int (*orte_gpr_base_module_cancel_trigger_fn_t)(orte_gpr_trigger_id_t tr
*/ */
typedef int (*orte_gpr_base_module_dump_all_fn_t)(int output_id); typedef int (*orte_gpr_base_module_dump_all_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_segments_fn_t)(int output_id); typedef int (*orte_gpr_base_module_dump_segment_fn_t)(char *segment, int output_id);
typedef int (*orte_gpr_base_module_dump_triggers_fn_t)(int output_id); typedef int (*orte_gpr_base_module_dump_triggers_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_subscriptions_fn_t)(int output_id); typedef int (*orte_gpr_base_module_dump_subscriptions_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_local_triggers_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_local_subscriptions_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_callbacks_fn_t) (int output_id); typedef int (*orte_gpr_base_module_dump_callbacks_fn_t) (int output_id);
typedef int (*orte_gpr_base_module_dump_notify_msg_fn_t)(orte_gpr_notify_message_t *msg, int output_id); typedef int (*orte_gpr_base_module_dump_notify_msg_fn_t)(orte_gpr_notify_message_t *msg, int output_id);
@ -556,6 +608,8 @@ struct orte_gpr_base_module_1_0_0_t {
/* BLOCKING OPERATIONS */ /* BLOCKING OPERATIONS */
orte_gpr_base_module_get_fn_t get; orte_gpr_base_module_get_fn_t get;
orte_gpr_base_module_put_fn_t put; orte_gpr_base_module_put_fn_t put;
orte_gpr_base_module_put_1_fn_t put_1;
orte_gpr_base_module_put_N_fn_t put_N;
orte_gpr_base_module_delete_entries_fn_t delete_entries; orte_gpr_base_module_delete_entries_fn_t delete_entries;
orte_gpr_base_module_delete_segment_fn_t delete_segment; orte_gpr_base_module_delete_segment_fn_t delete_segment;
orte_gpr_base_module_index_fn_t index; orte_gpr_base_module_index_fn_t index;
@ -573,6 +627,9 @@ struct orte_gpr_base_module_1_0_0_t {
orte_gpr_base_module_decrement_value_fn_t decrement_value; orte_gpr_base_module_decrement_value_fn_t decrement_value;
/* SUBSCRIBE OPERATIONS */ /* SUBSCRIBE OPERATIONS */
orte_gpr_base_module_subscribe_fn_t subscribe; orte_gpr_base_module_subscribe_fn_t subscribe;
orte_gpr_base_module_subscribe_1_fn_t subscribe_1;
orte_gpr_base_module_subscribe_N_fn_t subscribe_N;
orte_gpr_base_module_define_trigger_fn_t define_trigger;
orte_gpr_base_module_unsubscribe_fn_t unsubscribe; orte_gpr_base_module_unsubscribe_fn_t unsubscribe;
orte_gpr_base_module_cancel_trigger_fn_t cancel_trigger; orte_gpr_base_module_cancel_trigger_fn_t cancel_trigger;
/* COMPOUND COMMANDS */ /* COMPOUND COMMANDS */
@ -581,9 +638,11 @@ struct orte_gpr_base_module_1_0_0_t {
orte_gpr_base_module_exec_compound_cmd_fn_t exec_compound_cmd; orte_gpr_base_module_exec_compound_cmd_fn_t exec_compound_cmd;
/* DIAGNOSTIC OPERATIONS */ /* DIAGNOSTIC OPERATIONS */
orte_gpr_base_module_dump_all_fn_t dump_all; orte_gpr_base_module_dump_all_fn_t dump_all;
orte_gpr_base_module_dump_segments_fn_t dump_segments; orte_gpr_base_module_dump_segment_fn_t dump_segment;
orte_gpr_base_module_dump_triggers_fn_t dump_triggers; orte_gpr_base_module_dump_triggers_fn_t dump_triggers;
orte_gpr_base_module_dump_subscriptions_fn_t dump_subscriptions; orte_gpr_base_module_dump_subscriptions_fn_t dump_subscriptions;
orte_gpr_base_module_dump_local_triggers_fn_t dump_local_triggers;
orte_gpr_base_module_dump_local_subscriptions_fn_t dump_local_subscriptions;
orte_gpr_base_module_dump_callbacks_fn_t dump_callbacks; orte_gpr_base_module_dump_callbacks_fn_t dump_callbacks;
orte_gpr_base_module_dump_notify_msg_fn_t dump_notify_msg; orte_gpr_base_module_dump_notify_msg_fn_t dump_notify_msg;
orte_gpr_base_module_dump_notify_data_fn_t dump_notify_data; orte_gpr_base_module_dump_notify_data_fn_t dump_notify_data;

Просмотреть файл

@ -168,7 +168,7 @@ orte_gpr_null_dump_all(int output_id)
} }
static int static int
orte_gpr_null_dump_segments(int output_id) orte_gpr_null_dump_segments(char *segment, int output_id)
{ {
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -185,6 +185,18 @@ orte_gpr_null_dump_subscriptions(int output_id)
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int
orte_gpr_null_dump_local_triggers(int output_id)
{
return ORTE_SUCCESS;
}
static int
orte_gpr_null_dump_local_subscriptions(int output_id)
{
return ORTE_SUCCESS;
}
static int static int
orte_gpr_null_dump_callbacks(int output_id) orte_gpr_null_dump_callbacks(int output_id)
{ {
@ -231,6 +243,69 @@ orte_gpr_null_xfer_payload(orte_gpr_value_union_t * dest,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int orte_gpr_null_put_1(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
char *key, orte_data_type_t type,
orte_gpr_value_union_t value)
{
return ORTE_SUCCESS;
}
static int orte_gpr_null_put_N(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens,
size_t n, char **keys,
orte_data_type_t *types,
orte_gpr_value_union_t *data_values)
{
return ORTE_SUCCESS;
}
static int orte_gpr_null_subscribe_1(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
char *key,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag)
{
return ORTE_SUCCESS;
}
static int orte_gpr_null_subscribe_N(orte_gpr_subscription_id_t *id,
char *trig_name,
char *sub_name,
orte_gpr_notify_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_notify_cb_fn_t cbfunc,
void *user_tag)
{
return ORTE_SUCCESS;
}
static int orte_gpr_null_define_trigger(orte_gpr_trigger_id_t *id,
char *trig_name,
orte_gpr_trigger_action_t action,
orte_gpr_addr_mode_t addr_mode,
char *segment,
char **tokens,
size_t n,
char **keys,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag)
{
return ORTE_SUCCESS;
}
/* /*
@ -242,6 +317,8 @@ orte_gpr_base_module_t orte_gpr_null_module = {
/* BLOCKING OPERATIONS */ /* BLOCKING OPERATIONS */
orte_gpr_null_get, orte_gpr_null_get,
orte_gpr_null_put, orte_gpr_null_put,
orte_gpr_null_put_1,
orte_gpr_null_put_N,
orte_gpr_null_delete_entries, orte_gpr_null_delete_entries,
orte_gpr_null_delete_segment, orte_gpr_null_delete_segment,
orte_gpr_null_index, orte_gpr_null_index,
@ -259,6 +336,9 @@ orte_gpr_base_module_t orte_gpr_null_module = {
orte_gpr_null_decrement_value, orte_gpr_null_decrement_value,
/* SUBSCRIBE OPERATIONS */ /* SUBSCRIBE OPERATIONS */
orte_gpr_null_subscribe, orte_gpr_null_subscribe,
orte_gpr_null_subscribe_1,
orte_gpr_null_subscribe_N,
orte_gpr_null_define_trigger,
orte_gpr_null_unsubscribe, orte_gpr_null_unsubscribe,
orte_gpr_null_cancel_trigger, orte_gpr_null_cancel_trigger,
/* COMPOUND COMMANDS */ /* COMPOUND COMMANDS */
@ -270,6 +350,8 @@ orte_gpr_base_module_t orte_gpr_null_module = {
orte_gpr_null_dump_segments, orte_gpr_null_dump_segments,
orte_gpr_null_dump_triggers, orte_gpr_null_dump_triggers,
orte_gpr_null_dump_subscriptions, orte_gpr_null_dump_subscriptions,
orte_gpr_null_dump_local_triggers,
orte_gpr_null_dump_local_subscriptions,
orte_gpr_null_dump_callbacks, orte_gpr_null_dump_callbacks,
orte_gpr_null_dump_notify_msg, orte_gpr_null_dump_notify_msg,
orte_gpr_null_dump_notify_data, orte_gpr_null_dump_notify_data,

Просмотреть файл

@ -24,6 +24,7 @@ sources = \
gpr_proxy_del_index.c \ gpr_proxy_del_index.c \
gpr_proxy_cleanup.c \ gpr_proxy_cleanup.c \
gpr_proxy_dump.c \ gpr_proxy_dump.c \
gpr_proxy_dump_local_trigs_subs.c \
gpr_proxy_internals.c \ gpr_proxy_internals.c \
gpr_proxy_put_get.c \ gpr_proxy_put_get.c \
gpr_proxy_general_operations.c \ gpr_proxy_general_operations.c \

Просмотреть файл

@ -55,6 +55,7 @@ int orte_gpr_proxy_finalize(void);
typedef struct { typedef struct {
opal_object_t super; /**< Allows this to be an object */ opal_object_t super; /**< Allows this to be an object */
orte_gpr_subscription_id_t id; /**< id of this subscription */ orte_gpr_subscription_id_t id; /**< id of this subscription */
char *name;
orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */ orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */
void *user_tag; /**< User-provided tag for callback function */ void *user_tag; /**< User-provided tag for callback function */
} orte_gpr_proxy_subscriber_t; } orte_gpr_proxy_subscriber_t;
@ -65,6 +66,7 @@ OBJ_CLASS_DECLARATION(orte_gpr_proxy_subscriber_t);
typedef struct { typedef struct {
opal_object_t super; /**< Allows this to be an object */ opal_object_t super; /**< Allows this to be an object */
orte_gpr_trigger_id_t id; /**< id of this trigger */ orte_gpr_trigger_id_t id; /**< id of this trigger */
char *name;
orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */ orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */
void *user_tag; /**< User-provided tag for callback function */ void *user_tag; /**< User-provided tag for callback function */
} orte_gpr_proxy_trigger_t; } orte_gpr_proxy_trigger_t;
@ -173,12 +175,16 @@ int orte_gpr_proxy_cancel_trigger(orte_gpr_trigger_id_t trig);
*/ */
int orte_gpr_proxy_dump_all(int output_id); int orte_gpr_proxy_dump_all(int output_id);
int orte_gpr_proxy_dump_segments(int output_id); int orte_gpr_proxy_dump_segments(char *segment, int output_id);
int orte_gpr_proxy_dump_triggers(int output_id); int orte_gpr_proxy_dump_triggers(int output_id);
int orte_gpr_proxy_dump_subscriptions(int output_id); int orte_gpr_proxy_dump_subscriptions(int output_id);
int orte_gpr_proxy_dump_local_triggers(int output_id);
int orte_gpr_proxy_dump_local_subscriptions(int output_id);
int orte_gpr_proxy_dump_callbacks(int output_id); int orte_gpr_proxy_dump_callbacks(int output_id);
int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id); int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);

Просмотреть файл

@ -70,6 +70,8 @@ static orte_gpr_base_module_t orte_gpr_proxy = {
/* BLOCKING OPERATIONS */ /* BLOCKING OPERATIONS */
orte_gpr_proxy_get, orte_gpr_proxy_get,
orte_gpr_proxy_put, orte_gpr_proxy_put,
orte_gpr_base_put_1,
orte_gpr_base_put_N,
orte_gpr_proxy_delete_entries, orte_gpr_proxy_delete_entries,
orte_gpr_proxy_delete_segment, orte_gpr_proxy_delete_segment,
orte_gpr_proxy_index, orte_gpr_proxy_index,
@ -87,6 +89,9 @@ static orte_gpr_base_module_t orte_gpr_proxy = {
orte_gpr_proxy_decrement_value, orte_gpr_proxy_decrement_value,
/* SUBSCRIBE OPERATIONS */ /* SUBSCRIBE OPERATIONS */
orte_gpr_proxy_subscribe, orte_gpr_proxy_subscribe,
orte_gpr_base_subscribe_1,
orte_gpr_base_subscribe_N,
orte_gpr_base_define_trigger,
orte_gpr_proxy_unsubscribe, orte_gpr_proxy_unsubscribe,
orte_gpr_proxy_cancel_trigger, orte_gpr_proxy_cancel_trigger,
/* COMPOUND COMMANDS */ /* COMPOUND COMMANDS */
@ -98,6 +103,8 @@ static orte_gpr_base_module_t orte_gpr_proxy = {
orte_gpr_proxy_dump_segments, orte_gpr_proxy_dump_segments,
orte_gpr_proxy_dump_triggers, orte_gpr_proxy_dump_triggers,
orte_gpr_proxy_dump_subscriptions, orte_gpr_proxy_dump_subscriptions,
orte_gpr_proxy_dump_local_triggers,
orte_gpr_proxy_dump_local_subscriptions,
orte_gpr_proxy_dump_callbacks, orte_gpr_proxy_dump_callbacks,
orte_gpr_proxy_dump_notify_msg, orte_gpr_proxy_dump_notify_msg,
orte_gpr_proxy_dump_notify_data, orte_gpr_proxy_dump_notify_data,
@ -125,11 +132,13 @@ static void orte_gpr_proxy_subscriber_construct(orte_gpr_proxy_subscriber_t* req
req->callback = NULL; req->callback = NULL;
req->user_tag = NULL; req->user_tag = NULL;
req->id = 0; req->id = 0;
req->name = NULL;
} }
/* destructor - used to free any resources held by instance */ /* destructor - used to free any resources held by instance */
static void orte_gpr_proxy_subscriber_destructor(orte_gpr_proxy_subscriber_t* req) static void orte_gpr_proxy_subscriber_destructor(orte_gpr_proxy_subscriber_t* req)
{ {
if (NULL != req->name) free(req->name);
} }
/* define instance of opal_class_t */ /* define instance of opal_class_t */
@ -147,11 +156,13 @@ static void orte_gpr_proxy_trigger_construct(orte_gpr_proxy_trigger_t* req)
req->callback = NULL; req->callback = NULL;
req->user_tag = NULL; req->user_tag = NULL;
req->id = 0; req->id = 0;
req->name = NULL;
} }
/* destructor - used to free any resources held by instance */ /* destructor - used to free any resources held by instance */
static void orte_gpr_proxy_trigger_destructor(orte_gpr_proxy_trigger_t* req) static void orte_gpr_proxy_trigger_destructor(orte_gpr_proxy_trigger_t* req)
{ {
if (NULL != req->name) free(req->name);
} }
/* define instance of opal_class_t */ /* define instance of opal_class_t */

Просмотреть файл

@ -106,7 +106,7 @@ int orte_gpr_proxy_dump_all(int output_id)
return rc; return rc;
} }
int orte_gpr_proxy_dump_segments(int output_id) int orte_gpr_proxy_dump_segments(char *segment, int output_id)
{ {
orte_gpr_cmd_flag_t command; orte_gpr_cmd_flag_t command;
orte_buffer_t *cmd; orte_buffer_t *cmd;
@ -115,7 +115,7 @@ int orte_gpr_proxy_dump_segments(int output_id)
size_t n; size_t n;
if (orte_gpr_proxy_globals.compound_cmd_mode) { if (orte_gpr_proxy_globals.compound_cmd_mode) {
return orte_gpr_base_pack_dump_segments(orte_gpr_proxy_globals.compound_cmd); return orte_gpr_base_pack_dump_segments(orte_gpr_proxy_globals.compound_cmd, segment);
} }
cmd = OBJ_NEW(orte_buffer_t); cmd = OBJ_NEW(orte_buffer_t);
@ -124,7 +124,7 @@ int orte_gpr_proxy_dump_segments(int output_id)
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_segments(cmd))) { if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_segments(cmd, segment))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(cmd); OBJ_RELEASE(cmd);
return rc; return rc;

Просмотреть файл

@ -0,0 +1,94 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open MPI general purpose registry - implementation.
*
*/
/*
* includes
*/
#include "orte_config.h"
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_LIBGEN_H
#include <libgen.h>
#endif
#include "orte/include/orte_constants.h"
#include "orte/mca/errmgr/errmgr.h"
#include "opal/util/output.h"
#include "gpr_proxy.h"
int orte_gpr_proxy_dump_local_triggers(int output_id)
{
orte_gpr_proxy_trigger_t **trigs;
size_t j, k;
opal_output(output_id, "DUMP OF LOCAL TRIGGERS for [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
opal_output(output_id, "Number of triggers: %lu\n", (unsigned long) orte_gpr_proxy_globals.num_trigs);
trigs = (orte_gpr_proxy_trigger_t**)(orte_gpr_proxy_globals.triggers)->addr;
for (j=0, k=0; k < orte_gpr_proxy_globals.num_trigs &&
j < (orte_gpr_proxy_globals.triggers)->size; j++) {
if (NULL != trigs[j]) {
k++;
opal_output(output_id, "Data for trigger %lu", (unsigned long) trigs[j]->id);
if (NULL == trigs[j]->name) {
opal_output(output_id, "\tNOT a named trigger");
} else {
opal_output(output_id, "\ttrigger name: %s", trigs[j]->name);
}
}
}
return ORTE_SUCCESS;
}
int orte_gpr_proxy_dump_local_subscriptions(int output_id)
{
orte_gpr_proxy_subscriber_t **subs;
size_t j, k;
opal_output(output_id, "DUMP OF LOCAL SUBSCRIPTIONS for [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
opal_output(output_id, "Number of subscriptions: %lu\n", (unsigned long) orte_gpr_proxy_globals.num_subs);
subs = (orte_gpr_proxy_subscriber_t**)(orte_gpr_proxy_globals.subscriptions)->addr;
for (j=0, k=0; k < orte_gpr_proxy_globals.num_subs &&
j < (orte_gpr_proxy_globals.subscriptions)->size; j++) {
if (NULL != subs[j]) {
k++;
opal_output(output_id, "Data for subscription %lu", (unsigned long) subs[j]->id);
if (NULL == subs[j]->name) {
opal_output(output_id, "\tNOT a named subscription");
} else {
opal_output(output_id, "\tsubscription name: %s", subs[j]->name);
}
}
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -25,16 +25,11 @@
#include "orte_config.h" #include "orte_config.h"
#include "include/orte_constants.h" #include "orte/include/orte_constants.h"
#include "opal/util/output.h"
#include "util/proc_info.h"
#include "mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "mca/ns/ns_types.h" #include "orte/class/orte_pointer_array.h"
#include "mca/oob/oob_types.h" #include "orte/mca/gpr/proxy/gpr_proxy.h"
#include "mca/rml/rml.h"
#include "gpr_proxy.h"
int int
orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscriptions) orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscriptions)
@ -48,6 +43,9 @@ orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscrip
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
if (NULL != subscriptions[i]->name) {
sub->name = strdup(subscriptions[i]->name);
}
sub->callback = subscriptions[i]->cbfunc; sub->callback = subscriptions[i]->cbfunc;
sub->user_tag = subscriptions[i]->user_tag; sub->user_tag = subscriptions[i]->user_tag;
if (0 > orte_pointer_array_add(&id, orte_gpr_proxy_globals.subscriptions, sub)) { if (0 > orte_pointer_array_add(&id, orte_gpr_proxy_globals.subscriptions, sub)) {
@ -75,6 +73,9 @@ orte_gpr_proxy_enter_trigger(size_t cnt, orte_gpr_trigger_t **trigs)
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
if (NULL != trigs[i]->name) {
trig->name = strdup(trigs[i]->name);
}
/* ensure that the proper routing flag is set /* ensure that the proper routing flag is set
* in the action field to match the trigger callback * in the action field to match the trigger callback
* function * function

Просмотреть файл

@ -24,6 +24,7 @@ libmca_gpr_replica_api_la_SOURCES = \
gpr_replica_compound_cmd_api.c \ gpr_replica_compound_cmd_api.c \
gpr_replica_del_index_api.c \ gpr_replica_del_index_api.c \
gpr_replica_dump_api.c \ gpr_replica_dump_api.c \
gpr_replica_dump_local_trigs_subs_api.c \
gpr_replica_arithmetic_ops_api.c \ gpr_replica_arithmetic_ops_api.c \
gpr_replica_put_get_api.c \ gpr_replica_put_get_api.c \
gpr_replica_subscribe_api.c gpr_replica_subscribe_api.c

Просмотреть файл

@ -127,12 +127,16 @@ int orte_gpr_replica_cancel_trigger(orte_gpr_trigger_id_t trig);
*/ */
int orte_gpr_replica_dump_all(int output_id); int orte_gpr_replica_dump_all(int output_id);
int orte_gpr_replica_dump_segments(int output_id); int orte_gpr_replica_dump_segments(char *segment, int output_id);
int orte_gpr_replica_dump_triggers(int output_id); int orte_gpr_replica_dump_triggers(int output_id);
int orte_gpr_replica_dump_subscriptions(int output_id); int orte_gpr_replica_dump_subscriptions(int output_id);
int orte_gpr_replica_dump_local_triggers(int output_id);
int orte_gpr_replica_dump_local_subscriptions(int output_id);
int orte_gpr_replica_dump_callbacks(int output_id); int orte_gpr_replica_dump_callbacks(int output_id);
int orte_gpr_replica_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id); int orte_gpr_replica_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);

Просмотреть файл

@ -66,7 +66,7 @@ int orte_gpr_replica_dump_all(int output_id)
return rc; return rc;
} }
int orte_gpr_replica_dump_segments(int output_id) int orte_gpr_replica_dump_segments(char *segment, int output_id)
{ {
orte_buffer_t *buffer; orte_buffer_t *buffer;
int rc; int rc;
@ -84,7 +84,7 @@ int orte_gpr_replica_dump_segments(int output_id)
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_segments_fn(buffer))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_segments_fn(buffer, segment))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }

Просмотреть файл

@ -0,0 +1,84 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open MPI general purpose registry - implementation.
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "opal/util/output.h"
#include "util/proc_info.h"
#include "mca/ns/ns_types.h"
#include "mca/errmgr/errmgr.h"
#include "mca/gpr/replica/api_layer/gpr_replica_api.h"
int orte_gpr_replica_dump_local_triggers(int output_id)
{
orte_gpr_replica_local_trigger_t **trigs;
size_t j, k;
opal_output(output_id, "DUMP OF LOCAL TRIGGERS for [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
opal_output(output_id, "Number of triggers: %lu\n", (unsigned long) orte_gpr_replica_globals.num_local_trigs);
trigs = (orte_gpr_replica_local_trigger_t**)(orte_gpr_replica_globals.local_triggers)->addr;
for (j=0, k=0; k < orte_gpr_replica_globals.num_local_trigs &&
j < (orte_gpr_replica_globals.local_triggers)->size; j++) {
if (NULL != trigs[j]) {
k++;
opal_output(output_id, "Data for trigger %lu", (unsigned long) trigs[j]->id);
if (NULL == trigs[j]->name) {
opal_output(output_id, "\tNOT a named trigger");
} else {
opal_output(output_id, "\ttrigger name: %s", trigs[j]->name);
}
}
}
return ORTE_SUCCESS;
}
int orte_gpr_replica_dump_local_subscriptions(int output_id)
{
orte_gpr_replica_local_subscriber_t **subs;
size_t j, k;
opal_output(output_id, "DUMP OF LOCAL SUBSCRIPTIONS for [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
opal_output(output_id, "Number of subscriptions: %lu\n", (unsigned long) orte_gpr_replica_globals.num_local_subs);
subs = (orte_gpr_replica_local_subscriber_t**)(orte_gpr_replica_globals.local_subscriptions)->addr;
for (j=0, k=0; k < orte_gpr_replica_globals.num_local_subs &&
j < (orte_gpr_replica_globals.local_subscriptions)->size; j++) {
if (NULL != subs[j]) {
k++;
opal_output(output_id, "Data for subscription %lu", (unsigned long) subs[j]->id);
if (NULL == subs[j]->name) {
opal_output(output_id, "\tNOT a named subscription");
} else {
opal_output(output_id, "\tsubscription name: %s", subs[j]->name);
}
}
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -204,7 +204,7 @@ int orte_gpr_replica_process_command_buffer(orte_buffer_t *input_buffer,
opal_output(0, "\tdump segments cmd"); opal_output(0, "\tdump segments cmd");
} }
if (ORTE_SUCCESS != (ret = orte_gpr_replica_recv_dump_segments_cmd(answer))) { if (ORTE_SUCCESS != (ret = orte_gpr_replica_recv_dump_segments_cmd(input_buffer, answer))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto RETURN_ERROR; goto RETURN_ERROR;
} }

Просмотреть файл

@ -109,7 +109,7 @@ int orte_gpr_replica_recv_cancel_trigger_cmd(orte_process_name_t* sender,
int orte_gpr_replica_recv_dump_all_cmd(orte_buffer_t *answer); int orte_gpr_replica_recv_dump_all_cmd(orte_buffer_t *answer);
int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer); int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *input_buffer, orte_buffer_t *answer);
int orte_gpr_replica_recv_dump_triggers_cmd(orte_buffer_t *answer); int orte_gpr_replica_recv_dump_triggers_cmd(orte_buffer_t *answer);

Просмотреть файл

@ -49,9 +49,11 @@ int orte_gpr_replica_recv_dump_all_cmd(orte_buffer_t *answer)
return rc; return rc;
} }
int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer) int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *input_buffer, orte_buffer_t *answer)
{ {
orte_gpr_cmd_flag_t command=ORTE_GPR_DUMP_SEGMENTS_CMD; orte_gpr_cmd_flag_t command=ORTE_GPR_DUMP_SEGMENTS_CMD;
char *segment;
size_t n;
int rc; int rc;
if (ORTE_SUCCESS != (rc = orte_dps.pack(answer, &command, 1, ORTE_GPR_CMD))) { if (ORTE_SUCCESS != (rc = orte_dps.pack(answer, &command, 1, ORTE_GPR_CMD))) {
@ -59,7 +61,12 @@ int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer)
return rc; return rc;
} }
rc = orte_gpr_replica_dump_segments_fn(answer); n=1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(input_buffer, &segment, &n, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_gpr_replica_dump_segments_fn(answer, segment);
if (ORTE_SUCCESS != rc) { if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -70,20 +70,53 @@ int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer)
return rc; return rc;
} }
rc = orte_gpr_replica_dump_segments_fn(buffer); rc = orte_gpr_replica_dump_segments_fn(buffer, NULL);
return rc; return rc;
} }
int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer) int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer, char *segment)
{
orte_gpr_replica_segment_t **seg, *segptr;
size_t i, m;
int rc;
/* if segment = NULL, loop through all segments */
if (NULL == segment) {
seg = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments)->addr;
for (i=0, m=0; m < orte_gpr_replica.num_segs &&
i < (orte_gpr_replica.segments)->size; i++) {
if (NULL != seg[i]) {
m++;
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_a_segment_fn(buffer, seg[i]))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
return ORTE_SUCCESS;
}
/* otherwise, dump just the one specified */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&segptr, false, segment))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_a_segment_fn(buffer, segptr))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
int orte_gpr_replica_dump_a_segment_fn(orte_buffer_t *buffer, orte_gpr_replica_segment_t *seg)
{ {
orte_gpr_replica_segment_t **seg;
orte_gpr_replica_container_t **cptr; orte_gpr_replica_container_t **cptr;
orte_gpr_replica_itag_t *itaglist; orte_gpr_replica_itag_t *itaglist;
orte_gpr_replica_itagval_t **iptr; orte_gpr_replica_itagval_t **iptr;
char *token; char *token;
size_t num_objects; size_t num_objects;
size_t i, j, k, m, n, p; size_t j, k, n, p;
char *tmp_out; char *tmp_out;
tmp_out = (char*)malloc(1000); tmp_out = (char*)malloc(1000);
@ -92,28 +125,19 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer)
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
sprintf(tmp_out, "\nDUMP OF GPR SEGMENTS"); sprintf(tmp_out, "\nDUMP OF GPR SEGMENT %s", seg->name);
orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_load_string(buffer, &tmp_out);
/* loop through all segments */ num_objects = (seg->containers)->size - (seg->containers)->number_free;
seg = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments)->addr;
for (i=0, m=0; m < orte_gpr_replica.num_segs &&
i < (orte_gpr_replica.segments)->size; i++) {
if (NULL != seg[i]) {
m++;
sprintf(tmp_out, "\nGPR Dump for Segment: %s", seg[i]->name);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
num_objects = (seg[i]->containers)->size - (seg[i]->containers)->number_free;
sprintf(tmp_out, "\tNumber of containers: %lu\n", sprintf(tmp_out, "\tNumber of containers: %lu\n",
(unsigned long) num_objects); (unsigned long) num_objects);
orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_load_string(buffer, &tmp_out);
/* loop through all containers and print their info and contents */ /* loop through all containers and print their info and contents */
cptr = (orte_gpr_replica_container_t**)(seg[i]->containers)->addr; cptr = (orte_gpr_replica_container_t**)(seg->containers)->addr;
for (j=0, n=0; n < seg[i]->num_containers && for (j=0, n=0; n < seg->num_containers &&
j < (seg[i]->containers)->size; j++) { j < (seg->containers)->size; j++) {
if (NULL != cptr[j]) { if (NULL != cptr[j]) {
n++; n++;
sprintf(tmp_out, "\n\tInfo for container %lu" sprintf(tmp_out, "\n\tInfo for container %lu"
@ -127,7 +151,7 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer)
itaglist = cptr[j]->itags; itaglist = cptr[j]->itags;
for (k=0; k < cptr[j]->num_itags; k++) { for (k=0; k < cptr[j]->num_itags; k++) {
if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup(
&token, seg[i], itaglist[k])) { &token, seg, itaglist[k])) {
sprintf(tmp_out, "\t\titag num %lu" sprintf(tmp_out, "\t\titag num %lu"
": No entry found for itag %lu", ": No entry found for itag %lu",
(unsigned long) k, (unsigned long) k,
@ -151,7 +175,7 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer)
if (NULL != iptr[k]) { if (NULL != iptr[k]) {
p++; p++;
if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup(
&token, seg[i], iptr[k]->itag)) { &token, seg, iptr[k]->itag)) {
sprintf(tmp_out, "\n\t\titag num %lu: No entry found for itag %lu", sprintf(tmp_out, "\n\t\titag num %lu: No entry found for itag %lu",
(unsigned long) k, (unsigned long) k,
(unsigned long) iptr[k]->itag); (unsigned long) iptr[k]->itag);
@ -167,8 +191,6 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer)
} }
} }
} }
}
}
free(tmp_out); free(tmp_out);
return ORTE_SUCCESS; return ORTE_SUCCESS;

Просмотреть файл

@ -122,7 +122,9 @@ int orte_gpr_replica_subscribe_fn(orte_process_name_t *requestor,
*/ */
int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer); int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer);
int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer); int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer, char *segment);
int orte_gpr_replica_dump_a_segment_fn(orte_buffer_t *buffer, orte_gpr_replica_segment_t *seg);
int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer); int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer);

Просмотреть файл

@ -48,6 +48,9 @@ orte_gpr_replica_enter_local_subscription(size_t cnt, orte_gpr_subscription_t **
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
if (NULL != subscriptions[i]->name) {
sub->name = strdup(subscriptions[i]->name);
}
sub->callback = subscriptions[i]->cbfunc; sub->callback = subscriptions[i]->cbfunc;
sub->user_tag = subscriptions[i]->user_tag; sub->user_tag = subscriptions[i]->user_tag;
if (0 > orte_pointer_array_add(&id, orte_gpr_replica_globals.local_subscriptions, sub)) { if (0 > orte_pointer_array_add(&id, orte_gpr_replica_globals.local_subscriptions, sub)) {

Просмотреть файл

@ -77,6 +77,7 @@ typedef uint8_t orte_gpr_replica_action_t;
typedef struct { typedef struct {
opal_object_t super; /**< Allows this to be an object */ opal_object_t super; /**< Allows this to be an object */
orte_gpr_subscription_id_t id; /**< id of this subscription */ orte_gpr_subscription_id_t id; /**< id of this subscription */
char *name;
orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */ orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notificaiton */
void *user_tag; /**< User-provided tag for callback function */ void *user_tag; /**< User-provided tag for callback function */
} orte_gpr_replica_local_subscriber_t; } orte_gpr_replica_local_subscriber_t;
@ -91,6 +92,7 @@ OBJ_CLASS_DECLARATION(orte_gpr_replica_local_subscriber_t);
typedef struct { typedef struct {
opal_object_t super; /**< Allows this to be an object */ opal_object_t super; /**< Allows this to be an object */
orte_gpr_trigger_id_t id; /**< id of this trigger */ orte_gpr_trigger_id_t id; /**< id of this trigger */
char *name;
orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */ orte_gpr_trigger_cb_fn_t callback; /**< Function to be called for notification */
void *user_tag; /**< User-provided tag for callback function */ void *user_tag; /**< User-provided tag for callback function */
} orte_gpr_replica_local_trigger_t; } orte_gpr_replica_local_trigger_t;

Просмотреть файл

@ -34,23 +34,40 @@
*/ */
/* LOCAL_SUBSCRIBER */ /* LOCAL_SUBSCRIBER */
/* no constructor or destructor needed, so just static void orte_gpr_replica_local_subscriber_constructor(orte_gpr_replica_local_subscriber_t *ptr)
* define instance */ {
ptr->name = NULL;
}
static void orte_gpr_replica_local_subscriber_destructor(orte_gpr_replica_local_subscriber_t *ptr)
{
if (NULL != ptr->name) free(ptr->name);
}
OBJ_CLASS_INSTANCE( OBJ_CLASS_INSTANCE(
orte_gpr_replica_local_subscriber_t, /* type name */ orte_gpr_replica_local_subscriber_t, /* type name */
opal_object_t, /* parent "class" name */ opal_object_t, /* parent "class" name */
NULL, /* constructor */ orte_gpr_replica_local_subscriber_constructor, /* constructor */
NULL); /* destructor */ orte_gpr_replica_local_subscriber_destructor); /* destructor */
/* LOCAL_TRIGGER */ /* LOCAL_TRIGGER */
/* no constructor or destructor needed, so just static void orte_gpr_replica_local_trigger_constructor(orte_gpr_replica_local_trigger_t *ptr)
* define instance */ {
ptr->name = NULL;
}
static void orte_gpr_replica_local_trigger_destructor(orte_gpr_replica_local_trigger_t *ptr)
{
if (NULL != ptr->name) free(ptr->name);
}
/* define instance */
OBJ_CLASS_INSTANCE( OBJ_CLASS_INSTANCE(
orte_gpr_replica_local_trigger_t, /* type name */ orte_gpr_replica_local_trigger_t, /* type name */
opal_object_t, /* parent "class" name */ opal_object_t, /* parent "class" name */
NULL, /* constructor */ orte_gpr_replica_local_trigger_constructor, /* constructor */
NULL); /* destructor */ orte_gpr_replica_local_trigger_destructor); /* destructor */
/* SEGMENT */ /* SEGMENT */

Просмотреть файл

@ -68,6 +68,8 @@ static orte_gpr_base_module_t orte_gpr_replica_module = {
/* BLOCKING OPERATIONS */ /* BLOCKING OPERATIONS */
orte_gpr_replica_get, orte_gpr_replica_get,
orte_gpr_replica_put, orte_gpr_replica_put,
orte_gpr_base_put_1,
orte_gpr_base_put_N,
orte_gpr_replica_delete_entries, orte_gpr_replica_delete_entries,
orte_gpr_replica_delete_segment, orte_gpr_replica_delete_segment,
orte_gpr_replica_index, orte_gpr_replica_index,
@ -85,6 +87,9 @@ static orte_gpr_base_module_t orte_gpr_replica_module = {
orte_gpr_replica_decrement_value, orte_gpr_replica_decrement_value,
/* SUBSCRIBE OPERATIONS */ /* SUBSCRIBE OPERATIONS */
orte_gpr_replica_subscribe, orte_gpr_replica_subscribe,
orte_gpr_base_subscribe_1,
orte_gpr_base_subscribe_N,
orte_gpr_base_define_trigger,
orte_gpr_replica_unsubscribe, orte_gpr_replica_unsubscribe,
orte_gpr_replica_cancel_trigger, orte_gpr_replica_cancel_trigger,
/* COMPOUND COMMANDS */ /* COMPOUND COMMANDS */
@ -96,6 +101,8 @@ static orte_gpr_base_module_t orte_gpr_replica_module = {
orte_gpr_replica_dump_segments, orte_gpr_replica_dump_segments,
orte_gpr_replica_dump_triggers, orte_gpr_replica_dump_triggers,
orte_gpr_replica_dump_subscriptions, orte_gpr_replica_dump_subscriptions,
orte_gpr_replica_dump_local_triggers,
orte_gpr_replica_dump_local_subscriptions,
orte_gpr_replica_dump_callbacks, orte_gpr_replica_dump_callbacks,
orte_gpr_replica_dump_notify_msg, orte_gpr_replica_dump_notify_msg,
orte_gpr_replica_dump_notify_data, orte_gpr_replica_dump_notify_data,

Просмотреть файл

@ -292,7 +292,7 @@ MOVEON:
can_launch = true; can_launch = true;
} }
orte_gpr.dump_segments(0); orte_gpr.dump_segment(NULL, 0);
if (!can_launch || ORTE_CELLID_MAX == cellid) { if (!can_launch || ORTE_CELLID_MAX == cellid) {
return ORTE_ERR_UNREACH; return ORTE_ERR_UNREACH;