1
1

Fix a subtle bug in the registry callback system that was manifesting itself in the singleton case and (randomly) in the multiprocess case.

Update the unit-test-status matrix to include priority.

Add several new registry diagnostics that helped track down the above bug.
M    test/mca/gpr/gpr_triggers.c
M    test/Unit-Test-Status.xls
M    test/Unit-Test-Status.pdf
M    src/mpi/runtime/ompi_mpi_init.c
M    src/mca/oob/base/oob_base_xcast.c
M    src/mca/ns/base/ns_base_nds_env.c
M    src/mca/gpr/replica/api_layer/gpr_replica_dump_api.c
M    src/mca/gpr/replica/api_layer/gpr_replica_api.h
M    src/mca/gpr/replica/communications/gpr_replica_comm.h
M    src/mca/gpr/replica/communications/gpr_replica_remote_msg.c
M    src/mca/gpr/replica/communications/gpr_replica_cmd_processor.c
M    src/mca/gpr/replica/communications/gpr_replica_dump_cm.c
M    src/mca/gpr/replica/gpr_replica_component.c
M    src/mca/gpr/replica/gpr_replica.h
M    src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c
M    src/mca/gpr/replica/functional_layer/gpr_replica_fn.h
M    src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c
M    src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c
M    src/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c
M    src/mca/gpr/proxy/gpr_proxy_dump.c
M    src/mca/gpr/proxy/gpr_proxy.h
M    src/mca/gpr/proxy/gpr_proxy_component.c
M    src/mca/gpr/gpr_types.h
M    src/mca/gpr/base/base.h
M    src/mca/gpr/base/unpack_api_response/gpr_base_dump_notify.c
M    src/mca/gpr/base/pack_api_cmd/gpr_base_pack_dump.c
M    src/mca/gpr/gpr.h

This commit was SVN r5080.
Этот коммит содержится в:
Ralph Castain 2005-03-28 22:37:54 +00:00
родитель 28c01fd07e
Коммит dfe49d0fd2
27 изменённых файлов: 949 добавлений и 335 удалений

Просмотреть файл

@ -142,11 +142,15 @@ extern "C" {
OMPI_DECLSPEC int orte_gpr_base_pack_dump_all(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_segments(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_pack_dump_callbacks(orte_buffer_t *cmd);
OMPI_DECLSPEC int orte_gpr_base_print_dump(orte_buffer_t *buffer, int output_id);
OMPI_DECLSPEC void orte_gpr_base_dump_keyval_value(orte_gpr_keyval_t *iptr, int output_id);
OMPI_DECLSPEC void orte_gpr_base_dump_keyval_value(orte_buffer_t *buffer,
orte_gpr_keyval_t *iptr);
OMPI_DECLSPEC int orte_gpr_base_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);
OMPI_DECLSPEC int orte_gpr_base_dump_notify_data(orte_gpr_notify_data_t *data, int output_id);
OMPI_DECLSPEC int orte_gpr_base_dump_notify_msg(orte_buffer_t *buffer,
orte_gpr_notify_message_t *msg);
OMPI_DECLSPEC int orte_gpr_base_dump_notify_data(orte_buffer_t *buffer,
orte_gpr_notify_data_t *data);
OMPI_DECLSPEC int orte_gpr_base_pack_cleanup_job(orte_buffer_t *buffer,
orte_jobid_t jobid);

Просмотреть файл

@ -56,3 +56,12 @@ int orte_gpr_base_pack_dump_triggers(orte_buffer_t *cmd)
return orte_dps.pack(cmd, &command, 1, ORTE_GPR_PACK_CMD);
}
int orte_gpr_base_pack_dump_callbacks(orte_buffer_t *cmd)
{
orte_gpr_cmd_flag_t command;
command = ORTE_GPR_DUMP_CALLBACKS_CMD;
return orte_dps.pack(cmd, &command, 1, ORTE_GPR_PACK_CMD);
}

Просмотреть файл

@ -32,112 +32,152 @@
#include "mca/gpr/base/base.h"
static void orte_gpr_base_dump_data(orte_gpr_notify_data_t *data, int output_id);
static void orte_gpr_base_dump_data(orte_buffer_t *buffer, orte_gpr_notify_data_t *data);
int orte_gpr_base_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id)
static void orte_gpr_base_dump_load_string(orte_buffer_t *buffer, char **tmp);
int orte_gpr_base_dump_notify_msg(orte_buffer_t *buffer,
orte_gpr_notify_message_t *msg)
{
char *tmp_out;
int i;
ompi_output(output_id, "\n\nDUMP OF NOTIFY MESSAGE STRUCTURE\n");
asprintf(&tmp_out, "\nDUMP OF NOTIFY MESSAGE STRUCTURE");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (NULL == msg) {
ompi_output(output_id, "NULL msg pointer");
asprintf(&tmp_out, "NULL msg pointer");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
ompi_output(output_id, "%d Notify data structures in message", msg->cnt);
asprintf(&tmp_out, "%d Notify data structures in message going to trigger %d",
msg->cnt, msg->idtag);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (0 < msg->cnt && NULL != msg->data) {
for (i=0; i < msg->cnt; i++) {
ompi_output(output_id, "\n\nDump of data structure %d", i);
orte_gpr_base_dump_data(msg->data[i], output_id);
asprintf(&tmp_out, "\nDump of data structure %d", i);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
orte_gpr_base_dump_data(buffer, msg->data[i]);
}
}
return ORTE_SUCCESS;
}
int orte_gpr_base_dump_notify_data(orte_gpr_notify_data_t *data, int output_id)
int orte_gpr_base_dump_notify_data(orte_buffer_t *buffer,
orte_gpr_notify_data_t *data)
{
ompi_output(output_id, "\n\nDUMP OF NOTIFY DATA STRUCTURE\n");
char *tmp_out;
asprintf(&tmp_out, "\nDUMP OF NOTIFY DATA STRUCTURE");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (NULL == data) {
ompi_output(output_id, "NULL data pointer");
asprintf(&tmp_out, "NULL data pointer");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
orte_gpr_base_dump_data(data, output_id);
orte_gpr_base_dump_data(buffer, data);
return ORTE_SUCCESS;
}
static void orte_gpr_base_dump_data(orte_gpr_notify_data_t *data, int output_id)
static void orte_gpr_base_dump_data(orte_buffer_t *buffer,
orte_gpr_notify_data_t *data)
{
char *tmp_out;
orte_gpr_addr_mode_t addr;
orte_gpr_value_t **values;
int i, j;
ompi_output(output_id, "%d Values from segment %s", data->cnt, data->segment);
asprintf(&tmp_out, "%d Values from segment %s", data->cnt, data->segment);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (0 < data->cnt && NULL != data->values) {
values = data->values;
for (i=0; i < data->cnt; i++) {
ompi_output(output_id, "\nData for value %d", i);
asprintf(&tmp_out, "\nData for value %d going to callback num %d",
i, data->cb_num);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (NULL == values[i]) {
ompi_output(output_id, "\tError encountered: NULL value pointer");
asprintf(&tmp_out, "\tError encountered: NULL value pointer");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
} else {
addr = values[i]->addr_mode;
if (NULL == values[i]->tokens) {
ompi_output(output_id, "\tNULL tokens (wildcard)");
asprintf(&tmp_out, "\tNULL tokens (wildcard)");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
} else {
ompi_output(output_id, "\t%d Tokens returned", values[i]->num_tokens);
asprintf(&tmp_out, "\t%d Tokens returned", values[i]->num_tokens);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
for (j=0; j < values[i]->num_tokens; j++) {
ompi_output(output_id, "\tToken %d: %s", j, values[i]->tokens[j]);
asprintf(&tmp_out, "\tToken %d: %s", j, values[i]->tokens[j]);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
}
ompi_output(output_id, "\tToken addressing mode:");
asprintf(&tmp_out, "\tToken addressing mode:");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (ORTE_GPR_TOKENS_AND & addr) {
ompi_output(output_id, "\t\tORTE_GPR_TOKENS_AND");
asprintf(&tmp_out, "\t\tORTE_GPR_TOKENS_AND");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_TOKENS_OR & addr) {
ompi_output(output_id, "\t\tORTE_GPR_TOKENS_OR");
asprintf(&tmp_out, "\t\tORTE_GPR_TOKENS_OR");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_TOKENS_XAND & addr) {
ompi_output(output_id, "\t\tORTE_GPR_TOKENS_XAND");
asprintf(&tmp_out, "\t\tORTE_GPR_TOKENS_XAND");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_TOKENS_XOR & addr) {
ompi_output(output_id, "\t\tORTE_GPR_TOKENS_XOR");
asprintf(&tmp_out, "\t\tORTE_GPR_TOKENS_XOR");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_TOKENS_NOT & addr) {
ompi_output(output_id, "\t\tORTE_GPR_TOKENS_NOT");
asprintf(&tmp_out, "\t\tORTE_GPR_TOKENS_NOT");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
ompi_output(output_id, "\n\tKey addressing mode:");
asprintf(&tmp_out, "\n\tKey addressing mode:");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
if (0x0000 == addr) {
ompi_output(output_id, "\t\tNONE");
asprintf(&tmp_out, "\t\tNONE");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_KEYS_AND & addr) {
ompi_output(output_id, "\t\tORTE_GPR_KEYS_AND");
asprintf(&tmp_out, "\t\tORTE_GPR_KEYS_AND");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_KEYS_OR & addr) {
ompi_output(output_id, "\t\tORTE_GPR_KEYS_OR");
asprintf(&tmp_out, "\t\tORTE_GPR_KEYS_OR");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_KEYS_XAND & addr) {
ompi_output(output_id, "\t\tORTE_GPR_KEYS_XAND");
asprintf(&tmp_out, "\t\tORTE_GPR_KEYS_XAND");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_KEYS_XOR & addr) {
ompi_output(output_id, "\t\tORTE_GPR_KEYS_XOR");
asprintf(&tmp_out, "\t\tORTE_GPR_KEYS_XOR");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (ORTE_GPR_KEYS_NOT & addr) {
ompi_output(output_id, "\t\tORTE_GPR_KEYS_NOT");
asprintf(&tmp_out, "\t\tORTE_GPR_KEYS_NOT");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
}
if (NULL == values[i]->keyvals) {
ompi_output(output_id, "\tNo keyvals returned");
asprintf(&tmp_out, "\tNo keyvals returned");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
} else {
ompi_output(output_id, "\t%d Keyvals returned", values[i]->cnt);
asprintf(&tmp_out, "\t%d Keyvals returned", values[i]->cnt);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
for (j=0; j < values[i]->cnt; j++) {
ompi_output(output_id, "\t\tData for keyval %d: Key: %s", j,
asprintf(&tmp_out, "\t\tData for keyval %d: Key: %s", j,
(values[i]->keyvals[j])->key);
orte_gpr_base_dump_keyval_value(values[i]->keyvals[j], output_id);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
orte_gpr_base_dump_keyval_value(buffer, values[i]->keyvals[j]);
}
}
}
@ -146,108 +186,141 @@ static void orte_gpr_base_dump_data(orte_gpr_notify_data_t *data, int output_id)
}
void orte_gpr_base_dump_keyval_value(orte_gpr_keyval_t *iptr, int output_id)
void orte_gpr_base_dump_keyval_value(orte_buffer_t *buffer, orte_gpr_keyval_t *iptr)
{
char *tmp_out;
switch(iptr->type) {
case ORTE_BYTE:
ompi_output(output_id, "\t\t\tData type: ORTE_BYTE");
asprintf(&tmp_out, "\t\t\tData type: ORTE_BYTE: no value field");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_BOOL:
ompi_output(output_id, "\t\t\tData type: ORTE_BOOL");
asprintf(&tmp_out, "\t\t\tData type: ORTE_BOOL: no value field");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_STRING:
ompi_output(output_id, "\t\t\tData type: ORTE_STRING\tValue: %s", iptr->value.strptr);
asprintf(&tmp_out, "\t\t\tData type: ORTE_STRING\tValue: %s", iptr->value.strptr);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_SIZE:
ompi_output(output_id, "\t\t\tData type: ORTE_SIZE");
asprintf(&tmp_out, "\t\t\tData type: ORTE_SIZE: no value field");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_INT:
ompi_output(output_id, "\t\t\tData type: ORTE_INT\tValue: %d", (int)iptr->value.i32);
asprintf(&tmp_out, "\t\t\tData type: ORTE_INT: no value field");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_UINT8:
ompi_output(output_id, "\t\t\tData type: ORTE_UINT8\tValue: %d", (int)iptr->value.ui8);
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT8\tValue: %d", (int)iptr->value.ui8);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_UINT16:
ompi_output(output_id, "\t\t\tData type: ORTE_UINT16\tValue: %d", (int)iptr->value.ui16);
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT16\tValue: %d", (int)iptr->value.ui16);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_UINT32:
ompi_output(output_id, "\t\t\tData type: ORTE_UINT32\tValue: %d", (int)iptr->value.ui32);
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT32\tValue: %d", (int)iptr->value.ui32);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
#ifdef HAVE_I64
case ORTE_UINT64:
ompi_output(output_id, "\t\t\tData type: ORTE_UINT64\tValue: %d", (int)iptr->value.ui64);
asprintf(&tmp_out, "\t\t\tData type: ORTE_UINT64\tValue: %d", (int)iptr->value.ui64);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
#endif
case ORTE_INT8:
ompi_output(output_id, "\t\t\tData type: ORTE_INT8\tValue: %d", (int)iptr->value.i8);
asprintf(&tmp_out, "\t\t\tData type: ORTE_INT8\tValue: %d", (int)iptr->value.i8);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_INT16:
ompi_output(output_id, "\t\t\tData type: ORTE_INT16\tValue: %d", (int)iptr->value.i16);
asprintf(&tmp_out, "\t\t\tData type: ORTE_INT16\tValue: %d", (int)iptr->value.i16);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_INT32:
ompi_output(output_id, "\t\t\tData type: ORTE_INT32\tValue: %d", (int)iptr->value.i32);
asprintf(&tmp_out, "\t\t\tData type: ORTE_INT32\tValue: %d", (int)iptr->value.i32);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
#ifdef HAVE_I64
case ORTE_INT64:
ompi_output(output_id, "\t\t\tData type: ORTE_INT64\tValue: %d", (int)iptr->value.i64);
asprintf(&tmp_out, "\t\t\tData type: ORTE_INT64\tValue: %d", (int)iptr->value.i64);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
#endif
case ORTE_BYTE_OBJECT:
ompi_output(output_id, "\t\t\tData type: ORTE_BYTE_OBJECT\tSize: %d", (int)(iptr->value.byteobject).size);
asprintf(&tmp_out, "\t\t\tData type: ORTE_BYTE_OBJECT\tSize: %d", (int)(iptr->value.byteobject).size);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_NAME:
ompi_output(output_id, "\t\t\tData type: ORTE_NAME\tValue: [%d,%d,%d]", ORTE_NAME_ARGS(&(iptr->value.proc)));
asprintf(&tmp_out, "\t\t\tData type: ORTE_NAME\tValue: [%d,%d,%d]", ORTE_NAME_ARGS(&(iptr->value.proc)));
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_VPID:
ompi_output(output_id, "\t\t\tData type: ORTE_VPID\tValue: %d", (int)iptr->value.vpid);
asprintf(&tmp_out, "\t\t\tData type: ORTE_VPID\tValue: %d", (int)iptr->value.vpid);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_JOBID:
ompi_output(output_id, "\t\t\tData type: ORTE_JOBID\tValue: %d", (int)iptr->value.jobid);
asprintf(&tmp_out, "\t\t\tData type: ORTE_JOBID\tValue: %d", (int)iptr->value.jobid);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_CELLID:
ompi_output(output_id, "\t\t\tData type: ORTE_CELLID\tValue: %d", (int)iptr->value.cellid);
asprintf(&tmp_out, "\t\t\tData type: ORTE_CELLID\tValue: %d", (int)iptr->value.cellid);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_NODE_STATE:
ompi_output(output_id, "\t\t\tData type: ORTE_NODE_STATE\tValue: %d", (int)iptr->value.node_state);
asprintf(&tmp_out, "\t\t\tData type: ORTE_NODE_STATE\tValue: %d", (int)iptr->value.node_state);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_PROC_STATE:
ompi_output(output_id, "\t\t\tData type: ORTE_PROC_STATE\tValue: %d", (int)iptr->value.proc_state);
asprintf(&tmp_out, "\t\t\tData type: ORTE_PROC_STATE\tValue: %d", (int)iptr->value.proc_state);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_EXIT_CODE:
ompi_output(output_id, "\t\t\tData type: ORTE_EXIT_CODE\tValue: %d", (int)iptr->value.exit_code);
asprintf(&tmp_out, "\t\t\tData type: ORTE_EXIT_CODE\tValue: %d", (int)iptr->value.exit_code);
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_NULL:
ompi_output(output_id, "\t\t\tData type: ORTE_NULL");
asprintf(&tmp_out, "\t\t\tData type: ORTE_NULL");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
case ORTE_APP_CONTEXT:
ompi_output(output_id, "\t\t\tData type: ORTE_APP_CONTEXT");
asprintf(&tmp_out, "\t\t\tData type: ORTE_APP_CONTEXT");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
default:
ompi_output(output_id, "\t\t\tData type: UNKNOWN");
asprintf(&tmp_out, "\t\t\tData type: UNKNOWN");
orte_gpr_base_dump_load_string(buffer, &tmp_out);
break;
}
}
static void orte_gpr_base_dump_load_string(orte_buffer_t *buffer, char **tmp)
{
orte_dps.pack(buffer, tmp, 1, ORTE_STRING);
free(*tmp);
}

Просмотреть файл

@ -490,6 +490,8 @@ typedef int (*orte_gpr_base_module_dump_segments_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_triggers_fn_t)(int output_id);
typedef int (*orte_gpr_base_module_dump_callbacks_fn_t) (int output_id);
typedef int (*orte_gpr_base_module_dump_notify_msg_fn_t)(orte_gpr_notify_message_t *msg, int output_id);
typedef int (*orte_gpr_base_module_dump_notify_data_fn_t)(orte_gpr_notify_data_t *data, int output_id);
@ -575,6 +577,7 @@ struct orte_gpr_base_module_1_0_0_t {
orte_gpr_base_module_dump_all_fn_t dump_all;
orte_gpr_base_module_dump_segments_fn_t dump_segments;
orte_gpr_base_module_dump_triggers_fn_t dump_triggers;
orte_gpr_base_module_dump_callbacks_fn_t dump_callbacks;
orte_gpr_base_module_dump_notify_msg_fn_t dump_notify_msg;
orte_gpr_base_module_dump_notify_data_fn_t dump_notify_data;
/* CLEANUP OPERATIONS */

Просмотреть файл

@ -80,6 +80,7 @@ typedef int32_t orte_gpr_notify_id_t;
#define ORTE_GPR_DUMP_ALL_CMD (uint16_t)0x0800
#define ORTE_GPR_DUMP_SEGMENTS_CMD (uint16_t)0x0810
#define ORTE_GPR_DUMP_TRIGGERS_CMD (uint16_t)0x0820
#define ORTE_GPR_DUMP_CALLBACKS_CMD (uint16_t)0x0830
#define ORTE_GPR_INCREMENT_VALUE_CMD (uint16_t)0x2000
#define ORTE_GPR_DECREMENT_VALUE_CMD (uint16_t)0x4000
#define ORTE_GPR_COMPOUND_CMD (uint16_t)0x8000

Просмотреть файл

@ -175,6 +175,12 @@ int orte_gpr_proxy_dump_segments(int output_id);
int orte_gpr_proxy_dump_triggers(int output_id);
int orte_gpr_proxy_dump_callbacks(int output_id);
int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);
int orte_gpr_proxy_dump_notify_data(orte_gpr_notify_data_t *data, int output_id);
/*
* General operations

Просмотреть файл

@ -96,8 +96,9 @@ static orte_gpr_base_module_t orte_gpr_proxy = {
orte_gpr_proxy_dump_all,
orte_gpr_proxy_dump_segments,
orte_gpr_proxy_dump_triggers,
orte_gpr_base_dump_notify_msg,
orte_gpr_base_dump_notify_data,
orte_gpr_proxy_dump_callbacks,
orte_gpr_proxy_dump_notify_msg,
orte_gpr_proxy_dump_notify_data,
/* CLEANUP OPERATIONS */
orte_gpr_proxy_cleanup_job,
orte_gpr_proxy_cleanup_proc
@ -294,7 +295,7 @@ void orte_gpr_proxy_notify_recv(int status, orte_process_name_t* sender,
orte_gpr_proxy_subscriber_t **subs;
size_t n;
int rc;
int32_t cnt, i, j;
int32_t num_msgs, cnt, i, j, k;
if (orte_gpr_proxy_globals.debug) {
ompi_output(0, "[%d,%d,%d] gpr proxy: received trigger message",
@ -313,52 +314,59 @@ void orte_gpr_proxy_notify_recv(int status, orte_process_name_t* sender,
}
n = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, &id_tag, &n, ORTE_GPR_NOTIFY_ID))) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
/* locate request corresponding to this message */
trackptr = (orte_gpr_proxy_notify_tracker_t*)((orte_gpr_proxy_globals.notify_tracker)->addr[id_tag]);
if (NULL == trackptr) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
goto RETURN_ERROR;
}
n = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, &cnt, &n, ORTE_INT32))) {
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, &num_msgs, &n, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
if(cnt > 0) {
/* allocate space for the array */
data = (orte_gpr_notify_data_t**)malloc(cnt * sizeof(orte_gpr_notify_data_t*));
if (NULL == data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
for (k=0; k < num_msgs; k++) {
n = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, &id_tag, &n, ORTE_GPR_NOTIFY_ID))) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
/* locate request corresponding to this message */
trackptr = (orte_gpr_proxy_notify_tracker_t*)((orte_gpr_proxy_globals.notify_tracker)->addr[id_tag]);
if (NULL == trackptr) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
goto RETURN_ERROR;
}
n = cnt;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, data, &n, ORTE_GPR_NOTIFY_DATA))) {
n = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, &cnt, &n, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
for (i=0; i < cnt; i++) {
/* locate the data callback */
subs = (orte_gpr_proxy_subscriber_t**)((trackptr->callbacks)->addr);
for (j=0; j < (trackptr->callbacks)->size; j++) {
if (NULL != subs[j] && subs[j]->index == data[i]->cb_num) {
/* process request */
subs[j]->callback(data[i], subs[j]->user_tag);
break;
if(cnt > 0) {
/* allocate space for the array */
data = (orte_gpr_notify_data_t**)malloc(cnt * sizeof(orte_gpr_notify_data_t*));
if (NULL == data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto RETURN_ERROR;
}
n = cnt;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(buffer, data, &n, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(rc);
goto RETURN_ERROR;
}
for (i=0; i < cnt; i++) {
/* locate the data callback */
subs = (orte_gpr_proxy_subscriber_t**)((trackptr->callbacks)->addr);
for (j=0; j < (trackptr->callbacks)->size; j++) {
if (NULL != subs[j] && subs[j]->index == data[i]->cb_num) {
/* process request */
subs[j]->callback(data[i], subs[j]->user_tag);
break;
}
}
}
}
}
}
/* dismantle message and free memory */
RETURN_ERROR:

Просмотреть файл

@ -227,3 +227,115 @@ int orte_gpr_proxy_dump_triggers(int output_id)
OBJ_RELEASE(answer);
return rc;
}
int orte_gpr_proxy_dump_callbacks(int output_id)
{
orte_gpr_cmd_flag_t command;
orte_buffer_t *cmd;
orte_buffer_t *answer;
int rc;
size_t n;
if (orte_gpr_proxy_globals.compound_cmd_mode) {
return orte_gpr_base_pack_dump_callbacks(orte_gpr_proxy_globals.compound_cmd);
}
cmd = OBJ_NEW(orte_buffer_t);
if (NULL == cmd) { /* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_callbacks(cmd))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(cmd);
return rc;
}
if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, cmd, ORTE_RML_TAG_GPR, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
answer = OBJ_NEW(orte_buffer_t);
if (NULL == answer) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (0 > orte_rml.recv_buffer(orte_process_info.gpr_replica, answer, ORTE_RML_TAG_GPR)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
n = 1;
if (ORTE_SUCCESS != (rc = orte_dps.unpack(answer, &command, &n, ORTE_GPR_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return rc;
}
if (ORTE_GPR_DUMP_CALLBACKS_CMD != command) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(answer);
return ORTE_ERR_COMM_FAILURE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_print_dump(answer, output_id))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(answer);
return rc;
}
int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id)
{
orte_buffer_t *answer;
int rc;
answer = OBJ_NEW(orte_buffer_t);
if (NULL == answer) { /* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_dump_notify_msg(answer, msg))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_print_dump(answer, output_id))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(answer);
return rc;
}
int orte_gpr_proxy_dump_notify_data(orte_gpr_notify_data_t *data, int output_id)
{
orte_buffer_t *answer;
int rc;
answer = OBJ_NEW(orte_buffer_t);
if (NULL == answer) { /* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_dump_notify_data(answer, data))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_print_dump(answer, output_id))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(answer);
return rc;
}

Просмотреть файл

@ -127,6 +127,12 @@ int orte_gpr_replica_dump_segments(int output_id);
int orte_gpr_replica_dump_triggers(int output_id);
int orte_gpr_replica_dump_callbacks(int output_id);
int orte_gpr_replica_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);
int orte_gpr_replica_dump_notify_data(orte_gpr_notify_data_t *data, int output_id);
/*
* General functions

Просмотреть файл

@ -153,3 +153,94 @@ int orte_gpr_replica_dump_triggers(int output_id)
return rc;
}
int orte_gpr_replica_dump_callbacks(int output_id)
{
orte_buffer_t *buffer;
int rc;
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "[%d,%d,%d] gpr_replica_dump_callbacks: entered for output on %d",
ORTE_NAME_ARGS(orte_process_info.my_name), output_id);
}
OMPI_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
if (orte_gpr_replica_globals.compound_cmd_mode) {
if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_dump_callbacks(orte_gpr_replica_globals.compound_cmd))) {
ORTE_ERROR_LOG(rc);
}
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
}
buffer = OBJ_NEW(orte_buffer_t);
if (NULL == buffer) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_callbacks_fn(buffer))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS == rc) {
orte_gpr_base_print_dump(buffer, output_id);
}
OBJ_RELEASE(buffer);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
}
int orte_gpr_replica_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id)
{
orte_buffer_t *answer;
int rc;
answer = OBJ_NEW(orte_buffer_t);
if (NULL == answer) { /* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_dump_notify_msg(answer, msg))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_print_dump(answer, output_id))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(answer);
return rc;
}
int orte_gpr_replica_dump_notify_data(orte_gpr_notify_data_t *data, int output_id)
{
orte_buffer_t *answer;
int rc;
answer = OBJ_NEW(orte_buffer_t);
if (NULL == answer) { /* got a problem */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_dump_notify_data(answer, data))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_base_print_dump(answer, output_id))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(answer);
return rc;
}

Просмотреть файл

@ -208,6 +208,20 @@ int orte_gpr_replica_process_command_buffer(orte_buffer_t *input_buffer,
case ORTE_GPR_DUMP_CALLBACKS_CMD: /***** DUMP *****/
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "\tdump cmd");
}
if (ORTE_SUCCESS != (ret = orte_gpr_replica_recv_dump_callbacks_cmd(answer))) {
ORTE_ERROR_LOG(ret);
goto RETURN_ERROR;
}
break;
case ORTE_GPR_INCREMENT_VALUE_CMD: /***** INCREMENT_VALUE *****/
if (orte_gpr_replica_globals.debug) {

Просмотреть файл

@ -68,8 +68,7 @@ int orte_gpr_replica_process_command_buffer(orte_buffer_t *input_buffer,
/*
* Messaging functions
*/
int orte_gpr_replica_remote_notify(orte_process_name_t *recipient, int recipient_tag,
orte_gpr_notify_message_t *message);
int orte_gpr_replica_remote_notify(orte_process_name_t *recipient, ompi_list_t *messages);
/*
* define the local functions for processing commands
@ -104,6 +103,8 @@ int orte_gpr_replica_recv_dump_segments_cmd(orte_buffer_t *answer);
int orte_gpr_replica_recv_dump_triggers_cmd(orte_buffer_t *answer);
int orte_gpr_replica_recv_dump_callbacks_cmd(orte_buffer_t *answer);
int orte_gpr_replica_recv_get_startup_msg_cmd(orte_buffer_t *input_buffer,
orte_buffer_t *answer);

Просмотреть файл

@ -90,3 +90,23 @@ int orte_gpr_replica_recv_dump_triggers_cmd(orte_buffer_t *answer)
}
return rc;
}
int orte_gpr_replica_recv_dump_callbacks_cmd(orte_buffer_t *answer)
{
orte_gpr_cmd_flag_t command=ORTE_GPR_DUMP_CALLBACKS_CMD;
int rc;
if (ORTE_SUCCESS != (rc = orte_dps.pack(answer, &command, 1, ORTE_GPR_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OMPI_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
rc = orte_gpr_replica_dump_callbacks_fn(answer);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}

Просмотреть файл

@ -33,11 +33,13 @@
#include "gpr_replica_comm.h"
int orte_gpr_replica_remote_notify(orte_process_name_t *recipient, orte_gpr_notify_id_t remote_idtag,
orte_gpr_notify_message_t *message)
int orte_gpr_replica_remote_notify(orte_process_name_t *recipient, ompi_list_t *messages)
{
orte_buffer_t msg;
orte_buffer_t buffer;
orte_gpr_replica_notify_msg_list_t *msg;
orte_gpr_notify_message_t *message;
orte_gpr_cmd_flag_t command;
int32_t count;
int rc;
if (orte_gpr_replica_globals.debug) {
@ -46,36 +48,47 @@ int orte_gpr_replica_remote_notify(orte_process_name_t *recipient, orte_gpr_noti
command = ORTE_GPR_NOTIFY_CMD;
OBJ_CONSTRUCT(&msg, orte_buffer_t);
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dps.pack(&msg, &command, 1, ORTE_GPR_CMD))) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, &command, 1, ORTE_GPR_CMD))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(&msg, &remote_idtag, 1, ORTE_GPR_NOTIFY_ID))) {
count = (int32_t)ompi_list_get_size(messages);
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, &count, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_dps.pack(&msg, &message->cnt, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(message->cnt > 0) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(&msg, message->data, message->cnt, ORTE_GPR_NOTIFY_DATA))) {
while (NULL != (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_remove_first(messages))) {
message = msg->message;
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, &(message->idtag), 1, ORTE_GPR_NOTIFY_ID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 > orte_rml.send_buffer(recipient, &msg, ORTE_RML_TAG_GPR_NOTIFY, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, &(message->cnt), 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(message->cnt > 0) {
if (ORTE_SUCCESS != (rc = orte_dps.pack(&buffer, message->data, message->cnt, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
OBJ_RELEASE(msg);
}
OBJ_DESTRUCT(&msg);
if (0 > orte_rml.send_buffer(recipient, &buffer, ORTE_RML_TAG_GPR_NOTIFY, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -141,40 +141,73 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer)
return ORTE_SUCCESS;
}
int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer)
int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer)
{
orte_gpr_replica_triggers_t **trig;
ompi_list_item_t *item;
orte_gpr_replica_callbacks_t *cb;
orte_gpr_replica_notify_msg_list_t *msg;
char *tmp_out;
int i, j, k;
asprintf(&tmp_out, "\nDUMP OF GPR TRIGGERS\n");
asprintf(&tmp_out, "\nDUMP OF GPR REGISTERED CALLBACKS\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
if (0 < ompi_list_get_size(&(orte_gpr_replica.callbacks))) {
asprintf(&tmp_out, "Registered Callbacks");
if (0 >= (k= ompi_list_get_size(&(orte_gpr_replica.callbacks)))) {
asprintf(&tmp_out, "--- None registered at this time ---");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
for (i=0, item=ompi_list_get_first(&(orte_gpr_replica.callbacks));
item != ompi_list_get_end(&(orte_gpr_replica.callbacks));
i++, item=ompi_list_get_next(item)) {
cb = (orte_gpr_replica_callbacks_t*)item;
asprintf(&tmp_out, "\tInfo for callback %d", i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
if (NULL == cb->requestor) {
asprintf(&tmp_out, "\t\tLocal requestor - local notify idtag %d", (cb->message)->idtag);
} else {
asprintf(&tmp_out, "\t\tRequestor: [%d,%d,%d] - remote notify idtag %d",
ORTE_NAME_ARGS(cb->requestor), cb->remote_idtag);
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
asprintf(&tmp_out, "\t\tNum values: %d", (cb->message)->cnt);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
asprintf(&tmp_out, "\n");
return ORTE_SUCCESS;
} else {
asprintf(&tmp_out, "--- %d callback(s) registered at this time", k);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
i=0;
for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks));
cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks));
cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) {
if (NULL == cb) {
asprintf(&tmp_out, "\n\t--- BAD CALLBACK POINTER %d ---", i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
asprintf(&tmp_out, "\nInfo for callback %d", i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
if (NULL == cb->requestor) {
asprintf(&tmp_out, "Local requestor");
} else {
asprintf(&tmp_out, "Requestor: [%d,%d,%d]", ORTE_NAME_ARGS(cb->requestor));
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
j = ompi_list_get_size(&(cb->messages));
asprintf(&tmp_out, "Num messages: %d", j);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
for (k=0, item = ompi_list_get_first(&(cb->messages));
item != ompi_list_get_end(&(cb->messages));
item = ompi_list_get_next(item), k++) {
msg = (orte_gpr_replica_notify_msg_list_t*)item;
asprintf(&tmp_out, "\n\nInfo for message %d sending %d data objects to notifier id %d",
k, (msg->message)->cnt, (msg->message)->idtag);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
orte_gpr_base_dump_notify_msg(buffer, msg->message);
}
i++;
}
asprintf(&tmp_out, "\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer)
{
orte_gpr_replica_triggers_t **trig;
char *tmp_out;
int j, k;
asprintf(&tmp_out, "\nDUMP OF GPR TRIGGERS\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
k = 0;
@ -262,13 +295,13 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, int cnt,
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
asprintf(&tmp_out, "\tData covered by this subscription");
asprintf(&tmp_out, "\n\tData covered by this subscription");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
data = (orte_gpr_replica_subscribed_data_t**)((trig->subscribed_data)->addr);
for (i=0; i < (trig->subscribed_data)->size; i++) {
if (NULL != data[i]) {
asprintf(&tmp_out, "\t\tData on segment %s", (data[i]->seg)->name);
asprintf(&tmp_out, "\n\t\tData on segment %s", (data[i]->seg)->name);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
k = (int)orte_value_array_get_size(&(data[i]->tokentags));

Просмотреть файл

@ -126,6 +126,7 @@ int orte_gpr_replica_dump_segments_fn(orte_buffer_t *buffer);
int orte_gpr_replica_dump_triggers_fn(orte_buffer_t *buffer);
int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer);
/*
* ********* INTERNAL UTILITY FUNCTIONS **********

Просмотреть файл

@ -42,9 +42,10 @@
int orte_gpr_replica_process_callbacks(void)
{
orte_gpr_replica_callbacks_t *cb;
orte_gpr_replica_notify_msg_list_t *msg;
orte_gpr_notify_data_t **data;
orte_gpr_replica_subscribed_data_t **sdata;
orte_gpr_replica_triggers_t *trig;
orte_gpr_replica_triggers_t *trig, **trigs;
bool processed;
int i, k, rc;
@ -52,51 +53,64 @@ int orte_gpr_replica_process_callbacks(void)
ompi_output(0, "gpr replica: process_callbacks entered");
}
while (NULL != (cb = (orte_gpr_replica_callbacks_t*)ompi_list_remove_first(&orte_gpr_replica.callbacks))) {
/* get this request off of the local notify request tracker */
trig = (orte_gpr_replica_triggers_t*)((orte_gpr_replica.triggers)->addr[(cb->message)->idtag]);
if (NULL == trig) {
ORTE_ERROR_LOG(ORTE_ERR_GPR_DATA_CORRUPT);
goto CLEANUP;
}
if (NULL == cb->requestor) { /* local callback */
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "process_callbacks: local");
}
data = (cb->message)->data;
sdata = (orte_gpr_replica_subscribed_data_t**)((trig->subscribed_data)->addr);
for (i=0; i < (cb->message)->cnt; i++) {
processed = false;
for (k=0; k < (trig->subscribed_data)->size && !processed; k++) {
if (NULL != sdata[k] && sdata[k]->index == data[i]->cb_num) {
sdata[k]->callback(data[i], sdata[k]->user_tag);
processed = true;
/* each callback corresponds to a specific requestor, each of whom can be
* slated to receive multiple messages.
* each message in the callback corresponds to a specific subscription request entered
* on the system. Within each subscription request, we can have multiple callback functions
* specified. Hence, we have to loop through all the callback functions that were logged
* for the subscription request to find the one that is to receive each block of data
* in the message
*/
while (NULL != (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_remove_first(&(cb->messages)))) {
trig = (orte_gpr_replica_triggers_t*)((orte_gpr_replica.triggers)->addr[(msg->message)->idtag]);
if (NULL == trig) {
ORTE_ERROR_LOG(ORTE_ERR_GPR_DATA_CORRUPT);
goto CLEANUP;
}
data = (msg->message)->data;
sdata = (orte_gpr_replica_subscribed_data_t**)((trig->subscribed_data)->addr);
for (i=0; i < (msg->message)->cnt; i++) {
processed = false;
for (k=0; k < (trig->subscribed_data)->size && !processed; k++) {
if (NULL != sdata[k] && sdata[k]->index == data[i]->cb_num) {
sdata[k]->callback(data[i], sdata[k]->user_tag);
processed = true;
}
}
}
}
} else { /* remote request - send message back */
}
} else { /* remote request - send messages back */
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "process_callbacks: remote to [%d,%d,%d]",
ORTE_NAME_ARGS(cb->requestor));
}
orte_gpr_replica_remote_notify(cb->requestor, cb->remote_idtag, cb->message);
orte_gpr_replica_remote_notify(cb->requestor, &(cb->messages));
}
CLEANUP:
/* if one_shot, remove trigger action */
if (ORTE_GPR_TRIG_ONE_SHOT & trig->action) {
if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers,
trig->index, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_RELEASE(trig);
}
OBJ_RELEASE(cb);
}
/* cleanup any one-shot triggers that fired */
trigs = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) {
if (NULL != trigs[i] && trigs[i]->one_shot_fired) {
k = trigs[i]->index;
OBJ_RELEASE(trigs[i]);
if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers,
k, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
return ORTE_SUCCESS;
}
@ -105,30 +119,62 @@ CLEANUP:
int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
{
orte_gpr_replica_callbacks_t *cb;
orte_gpr_replica_notify_msg_list_t *msg;
int rc;
/* see if a callback has already been requested for this requestor */
for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks));
cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks));
cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) {
if (trig->requestor == cb->requestor) { /* same destination - add to existing callback */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(cb->message), trig))) {
ORTE_ERROR_LOG(rc);
if (trig->requestor == cb->requestor) { /* same requestor - add to existing callback */
/* check to see if we already have something for this trigger - if so, add to it */
for (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_first(&(cb->messages));
msg != (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_end(&(cb->messages));
msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_next(msg)) {
if ((msg->message)->idtag == trig->index) { /* same trigger - add to it */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
}
return rc;
/* same requestor, different trigger - add another message to callback */
msg = OBJ_NEW(orte_gpr_replica_notify_msg_list_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
ompi_list_append(&cb->messages, &msg->item);
/* construct the message */
msg->message = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg->message) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (NULL == trig->requestor) {
(msg->message)->idtag = trig->index;
} else {
(msg->message)->idtag = trig->remote_idtag;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
}
/* got a new callback, generate the request */
/* this is going to somebody new - create a new callback for this requestor */
cb = OBJ_NEW(orte_gpr_replica_callbacks_t);
if (NULL == cb) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* queue the callback */
if (NULL == trig->requestor) { /* local request - queue local callback */
cb->requestor = NULL;
cb->remote_idtag = ORTE_GPR_NOTIFY_ID_MAX;
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "[%d,%d,%d] process_trig: queueing local message\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
@ -140,25 +186,37 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
OBJ_RELEASE(cb);
return rc;
}
cb->remote_idtag = trig->remote_idtag;
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "[%d,%d,%d] process_trig: queueing message for [%d,%d,%d] with idtag %d using remoteid %d\n",
ompi_output(0, "[%d,%d,%d] process_trig: queueing message for [%d,%d,%d] using remoteid %d\n",
ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(cb->requestor),
(int)cb->remote_idtag, (int)trig->remote_idtag);
(int)trig->remote_idtag);
}
}
ompi_list_append(&orte_gpr_replica.callbacks, &cb->item);
/* construct the message */
cb->message = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == cb->message) {
msg = OBJ_NEW(orte_gpr_replica_notify_msg_list_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
cb->message->idtag = trig->index;
/* construct the message */
msg->message = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg->message) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (NULL == trig->requestor) {
(msg->message)->idtag = trig->index;
} else {
(msg->message)->idtag = trig->remote_idtag;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(cb->message), trig))) {
ompi_list_append(&cb->messages, &msg->item);
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(cb);
return rc;
@ -280,10 +338,11 @@ int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data,
ompi_output(0, "\ttoken num: %d\tToken: %s", j, values[i]->tokens[j]);
}
ompi_output(0, "\tGot %d keyals:", values[i]->cnt);
for (j=0; j < values[i]->cnt; j++) {
/* for (j=0; j < values[i]->cnt; j++) {
ompi_output(0, "\tValue num: %d\tKey: %s", j, (values[i]->keyvals[j])->key);
orte_gpr_base_dump_keyval_value(values[i]->keyvals[j], 0);
}
*/
}
}

Просмотреть файл

@ -109,8 +109,9 @@ int orte_gpr_replica_release_container(orte_gpr_replica_segment_t *seg,
}
/* remove container from segment and release it */
orte_pointer_array_set_item(seg->containers, cptr->index, NULL);
i = cptr->index;
OBJ_RELEASE(cptr);
orte_pointer_array_set_item(seg->containers, i, NULL);
return ORTE_SUCCESS;
}
@ -167,6 +168,8 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg,
orte_gpr_replica_container_t *cptr,
orte_gpr_replica_itagval_t *iptr)
{
int i;
/* see if anyone cares that this value is deleted */
/* trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
@ -184,12 +187,13 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg,
*/
/* remove the entry from the container's itagval array */
orte_pointer_array_set_item(cptr->itagvals, iptr->index, NULL);
/* release the data storage */
i = iptr->index;
OBJ_RELEASE(iptr);
/* remove the entry from the container's itagval array */
orte_pointer_array_set_item(cptr->itagvals, i, NULL);
return ORTE_SUCCESS;
}
@ -490,13 +494,14 @@ int orte_gpr_replica_xfer_payload(orte_gpr_value_union_t *dest,
int orte_gpr_replica_release_segment(orte_gpr_replica_segment_t **seg)
{
int rc;
int rc, i;
if (0 > (rc = orte_pointer_array_set_item(orte_gpr_replica.segments, (*seg)->itag, NULL))) {
return rc;
}
i = (*seg)->itag;
OBJ_RELEASE(*seg);
if (0 > (rc = orte_pointer_array_set_item(orte_gpr_replica.segments, i, NULL))) {
return rc;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -150,7 +150,7 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg,
orte_gpr_replica_action_t action_taken)
{
orte_gpr_replica_triggers_t **trig;
int i, rc;
int i, rc;
trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) {
@ -239,6 +239,13 @@ FIRED:
trig->action = trig->action & ~ORTE_GPR_TRIG_NOTIFY_START;
}
/* if one-shot, set flag to indicate it has fired so it can be cleaned
* up later
*/
if (ORTE_GPR_TRIG_ONE_SHOT & trig->action) {
trig->one_shot_fired = true;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -217,6 +217,10 @@ struct orte_gpr_replica_triggers_t {
ompi_object_t super; /**< Make this an object */
/* index of this trigger in the triggers array */
int index;
/* flag that indicates this trigger is a one-shot, has fired and
* now should be cleaned up
*/
bool one_shot_fired;
/* the action that causes a notification message to be sent out */
orte_gpr_notify_action_t action;
/* to whom the notification messages go - set to NULL if local */
@ -244,14 +248,24 @@ typedef struct orte_gpr_replica_triggers_t orte_gpr_replica_triggers_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_triggers_t);
/*
* Notify message list objects - used to track individual messages going to
* the same recipient
*/
typedef struct {
ompi_list_item_t item;
orte_gpr_notify_message_t *message;
} orte_gpr_replica_notify_msg_list_t;
OBJ_CLASS_DECLARATION(orte_gpr_replica_notify_msg_list_t);
/*
* Callback list objects
*/
struct orte_gpr_replica_callbacks_t {
ompi_list_item_t item;
orte_gpr_notify_message_t *message;
ompi_list_t messages;
orte_process_name_t *requestor;
orte_gpr_notify_id_t remote_idtag;
};
typedef struct orte_gpr_replica_callbacks_t orte_gpr_replica_callbacks_t;

Просмотреть файл

@ -93,8 +93,9 @@ static orte_gpr_base_module_t orte_gpr_replica_module = {
orte_gpr_replica_dump_all,
orte_gpr_replica_dump_segments,
orte_gpr_replica_dump_triggers,
orte_gpr_base_dump_notify_msg,
orte_gpr_base_dump_notify_data,
orte_gpr_replica_dump_callbacks,
orte_gpr_replica_dump_notify_msg,
orte_gpr_replica_dump_notify_data,
/* CLEANUP OPERATIONS */
orte_gpr_replica_cleanup_job,
orte_gpr_replica_cleanup_proc
@ -321,6 +322,7 @@ static void orte_gpr_replica_trigger_construct(orte_gpr_replica_triggers_t* trig
{
trig->index = 0;
trig->action = 0;
trig->one_shot_fired = false;
trig->requestor = NULL;
trig->remote_idtag = ORTE_GPR_NOTIFY_ID_MAX;
@ -369,26 +371,54 @@ OBJ_CLASS_INSTANCE(
orte_gpr_replica_trigger_destructor); /* destructor */
/* NOTIFY MSG LIST */
/* constructor - used to initialize state of notify msg list instance */
static void orte_gpr_replica_notify_msg_list_construct(orte_gpr_replica_notify_msg_list_t* msg)
{
msg->message = NULL;
}
/* destructor - used to free any resources held by instance */
static void orte_gpr_replica_notify_msg_list_destructor(orte_gpr_replica_notify_msg_list_t* msg)
{
if (NULL != msg->message) {
OBJ_RELEASE(msg->message);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
orte_gpr_replica_notify_msg_list_t, /* type name */
ompi_list_item_t, /* parent "class" name */
orte_gpr_replica_notify_msg_list_construct, /* constructor */
orte_gpr_replica_notify_msg_list_destructor); /* destructor */
/* CALLBACKS */
/* constructor - used to initialize state of callback list instance */
static void orte_gpr_replica_callbacks_construct(orte_gpr_replica_callbacks_t* cb)
{
cb->message = NULL;
OBJ_CONSTRUCT(&(cb->messages), ompi_list_t);
cb->requestor = NULL;
cb->remote_idtag = 0;
}
/* destructor - used to free any resources held by instance */
static void orte_gpr_replica_callbacks_destructor(orte_gpr_replica_callbacks_t* cb)
{
orte_gpr_replica_notify_msg_list_t *msg;
if (NULL != cb->requestor) {
free(cb->requestor);
cb->requestor = NULL;
}
if (NULL != cb->message) {
OBJ_RELEASE(cb->message);
if (0 < ompi_list_get_size(&(cb->messages))) {
while (NULL != (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_remove_first(&(cb->messages)))) {
OBJ_RELEASE(msg);
}
}
OBJ_DESTRUCT(&(cb->messages));
}
/* define instance of ompi_class_t */

Просмотреть файл

@ -194,7 +194,7 @@ int orte_ns_nds_env_put(const orte_process_name_t* name,
free(param);
free(value);
asprintf(&value, "%ld", num_procs);
asprintf(&value, "%d", (int)num_procs);
if(NULL == (param = mca_base_param_environ_variable("ns","nds","num_procs"))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;

Просмотреть файл

@ -18,6 +18,7 @@
#include <string.h>
#include "include/constants.h"
#include "util/output.h"
#include "util/proc_info.h"
#include "mca/oob/oob.h"
#include "mca/oob/base/base.h"

Просмотреть файл

@ -307,20 +307,13 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
}
}
/* First barrier -- wait for message from RMGR_PROC_STAGE_GATE_MGR
to arrive */
/* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE */
if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL, NULL))) {
ORTE_ERROR_LOG(ret);
error = "ompi_mpi_init: failed to see all procs register\n";
goto error;
}
if (orte_debug_flag) {
ompi_output(0, "[%d,%d,%d] process startup completed",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
/* add all ompi_proc_t's to PML */
if (NULL == (procs = ompi_proc_world(&nprocs))) {
error = "ompi_proc_world() failed";

Двоичные данные
test/Unit-Test-Status.pdf

Двоичный файл не отображается.

Двоичные данные
test/Unit-Test-Status.xls

Двоичный файл не отображается.

Просмотреть файл

@ -55,24 +55,24 @@ static FILE *test_out=NULL;
static char *cmd_str="diff ./test_gpr_replica_out ./test_gpr_replica_out_std";
static void test_cbfunc(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc1(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc2(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc3(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc4(orte_gpr_notify_data_t *data, void *user_tag);
static void test_cbfunc5(orte_gpr_notify_data_t *data, void *user_tag);
int main(int argc, char **argv)
{
int rc, num_names, num_found, num_counters=6;
int rc, num_names, num_found;
int i, j, cnt, ret;
orte_gpr_value_t *values, value, trig, *trigs;
orte_gpr_subscription_t *subscription;
orte_gpr_notify_id_t sub;
orte_gpr_subscription_t *subscriptions[5];
orte_gpr_notify_id_t sub[5];
char* keys[] = {
/* changes to this ordering need to be reflected in code below */
ORTE_PROC_NUM_AT_STG1,
ORTE_PROC_NUM_AT_STG2,
ORTE_PROC_NUM_AT_STG3,
ORTE_PROC_NUM_FINALIZED,
ORTE_PROC_NUM_ABORTED,
ORTE_PROC_NUM_TERMINATED
"setpoint",
"counter",
};
test_init("test_gpr_replica_trigs");
@ -152,34 +152,10 @@ int main(int argc, char **argv)
exit (1);
}
subscription = OBJ_NEW(orte_gpr_subscription_t);
subscription->addr_mode = ORTE_GPR_TOKENS_OR;
subscription->segment = strdup("test-segment");
subscription->num_tokens = 0;
subscription->tokens = NULL;
subscription->num_keys = 0;
subscription->keys = NULL;
subscription->cbfunc = test_cbfunc;
subscription->user_tag = NULL;
fprintf(test_out, "register subscription on segment\n");
if (ORTE_SUCCESS != (rc = orte_gpr_replica_subscribe(
ORTE_GPR_NOTIFY_ADD_ENTRY,
1, &subscription,
0, NULL,
&sub))) {
fprintf(test_out, "gpr_test_trigs: subscribe on seg failed with error %s\n",
ORTE_ERROR_NAME(rc));
test_failure("gpr_test_trigs: subscribe on seg failed");
test_finalize();
return rc;
} else {
fprintf(test_out, "gpr_test_trigs: subscribe on seg registered\n");
}
orte_gpr.dump_all(0);
/* setup some test counters */
/* setup a pair of counters on the registry - one is the actual
* counter, and the other will hold the end condition when the
* trigger(s) should fire
*/
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
value.segment = strdup("test-segment");
@ -189,16 +165,16 @@ int main(int argc, char **argv)
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.tokens[0] = strdup(ORTE_JOB_GLOBALS); /* put counters in the segment's globals container */
value.tokens[0] = strdup("test-container");
value.num_tokens = 1;
value.cnt = num_counters;
value.keyvals = (orte_gpr_keyval_t**)malloc(num_counters * sizeof(orte_gpr_keyval_t*));
value.cnt = 2;
value.keyvals = (orte_gpr_keyval_t**)malloc(2 * sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i=0; i < num_counters; i++) {
for (i=0; i < 2; i++) {
value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@ -222,46 +198,150 @@ int main(int argc, char **argv)
OBJ_DESTRUCT(&value);
return rc;
}
OBJ_DESTRUCT(&value); /* clean up */
orte_gpr.dump_segments(0);
fprintf(test_out, "incrementing all counters\n");
/* put some data on the registry that the subscriptions can return
* we'll first put several keyvals in one container that have different
* keys, plus a few that have the same key.
*/
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.addr_mode = ORTE_GPR_NO_OVERWRITE |
ORTE_GPR_TOKENS_XAND |
ORTE_GPR_KEYS_OR;
value.segment = strdup("test-segment");
value.num_tokens = 2;
value.tokens = (char**)malloc(value.num_tokens * sizeof(char*));
for (i=0; i < 2; i++) {
asprintf(&(value.tokens[i]), "dummy%d", i);
}
value.cnt = 5;
value.keyvals = (orte_gpr_keyval_t**)malloc(5*sizeof(orte_gpr_keyval_t*));
for (i=0; i < 5; i++) value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
(value.keyvals[0])->key = strdup("stupid-value-one");
(value.keyvals[0])->type = ORTE_INT32;
(value.keyvals[0])->value.i32 = 654321;
(value.keyvals[1])->key = strdup("stupid-value-two");
(value.keyvals[1])->type = ORTE_INT16;
(value.keyvals[1])->value.i16 = 128;
for (i=2; i < 5; i++) {
(value.keyvals[i])->key = strdup("stupid-value-multi");
(value.keyvals[i])->type = ORTE_INT32;
(value.keyvals[i])->value.i32 = i * 10;
}
values = &value;
/* increment the counters */
if (ORTE_SUCCESS != (rc = orte_gpr.increment_value(&value))) {
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&value);
return rc;
}
OBJ_DESTRUCT(&value); /* clean up */
orte_gpr.dump_segments(0);
/* now put some data in a second container, some of it with
* matching keys
*/
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.addr_mode = ORTE_GPR_NO_OVERWRITE |
ORTE_GPR_TOKENS_XAND |
ORTE_GPR_KEYS_OR;
value.segment = strdup("test-segment");
value.num_tokens = 2;
value.tokens = (char**)malloc(value.num_tokens * sizeof(char*));
for (i=0; i < 2; i++) {
asprintf(&(value.tokens[i]), "dummy%d", i+5);
}
value.cnt = 3;
value.keyvals = (orte_gpr_keyval_t**)malloc(5*sizeof(orte_gpr_keyval_t*));
for (i=0; i < 3; i++) value.keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
(value.keyvals[0])->key = strdup("stupid-value-one");
(value.keyvals[0])->type = ORTE_INT32;
(value.keyvals[0])->value.i32 = 123456;
(value.keyvals[1])->key = strdup("stupid-value-three");
(value.keyvals[1])->type = ORTE_INT16;
(value.keyvals[1])->value.i16 = 821;
(value.keyvals[2])->key = strdup("stupid-value-multi");
(value.keyvals[2])->type = ORTE_INT32;
(value.keyvals[2])->value.i32 = 2348;
values = &value;
fprintf(test_out, "decrementing all counters\n");
/* decrement the counters */
if (ORTE_SUCCESS != (rc = orte_gpr.decrement_value(&value))) {
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&value);
return rc;
}
OBJ_DESTRUCT(&value);
OBJ_DESTRUCT(&value); /* clean up */
orte_gpr.dump_segments(0);
/* setup the subscriptions, each defining a set of data that is to be
* returned to the corresponding callback function
*/
for (i=0; i < 5; i++) {
subscriptions[i] = OBJ_NEW(orte_gpr_subscription_t);
subscriptions[i]->addr_mode = ORTE_GPR_TOKENS_OR;
subscriptions[i]->segment = strdup("test-segment");
subscriptions[i]->user_tag = NULL;
}
/* sub-0 asks for the stupid-value-one data from the first
* container ONLY
*/
subscriptions[0]->num_tokens = 2;
subscriptions[0]->tokens = (char**)malloc(2*sizeof(char*));
for (i=0; i < 2; i++) {
asprintf(&(subscriptions[0]->tokens[i]), "dummy%d", i);
}
subscriptions[0]->num_keys = 1;
subscriptions[0]->keys =(char**)malloc(sizeof(char*));
subscriptions[0]->keys[0] = strdup("stupid-value-one");
subscriptions[0]->cbfunc = test_cbfunc1;
/* sub-1 asks for the stupid-value-one data from ALL containers
*/
subscriptions[1]->num_tokens = 0;
subscriptions[1]->tokens = NULL;
subscriptions[1]->num_keys = 1;
subscriptions[1]->keys =(char**)malloc(sizeof(char*));
subscriptions[1]->keys[0] = strdup("stupid-value-one");
subscriptions[1]->cbfunc = test_cbfunc2;
/* for testing the trigger, we'll just use the prior subscription setup.
* setup the trigger information - initialize the common elements */
/* sub-2 asks for the stupid-value-multi data from the first
* container ONLY
*/
subscriptions[2]->num_tokens = 2;
subscriptions[2]->tokens = (char**)malloc(2*sizeof(char*));
for (i=0; i < 2; i++) {
asprintf(&(subscriptions[2]->tokens[i]), "dummy%d", i);
}
subscriptions[2]->num_keys = 1;
subscriptions[2]->keys =(char**)malloc(sizeof(char*));
subscriptions[2]->keys[0] = strdup("stupid-value-multi");
subscriptions[2]->cbfunc = test_cbfunc3;
/* sub-3 asks for the stupid-value-three data from ALL containers */
subscriptions[3]->num_tokens = 0;
subscriptions[3]->tokens = NULL;
subscriptions[3]->num_keys = 1;
subscriptions[3]->keys =(char**)malloc(sizeof(char*));
subscriptions[3]->keys[0] = strdup("stupid-value-three");
subscriptions[3]->cbfunc = test_cbfunc4;
/* sub-4 asks for ALL data from ALL containers */
subscriptions[4]->num_tokens = 0;
subscriptions[4]->tokens = NULL;
subscriptions[4]->num_keys = 0;
subscriptions[4]->keys = NULL;
subscriptions[4]->cbfunc = test_cbfunc5;
/* setup the trigger information - initialize the common elements */
OBJ_CONSTRUCT(&trig, orte_gpr_value_t);
trig.addr_mode = ORTE_GPR_TOKENS_XAND;
trig.segment = strdup("test-segment");
trig.tokens = (char**)malloc(sizeof(char*));
if (NULL == trig.tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
for (i=0; i < 5; i++) OBJ_RELEASE(subscriptions[i]);
OBJ_DESTRUCT(&trig);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.tokens[0] = strdup(ORTE_JOB_GLOBALS);
trig.tokens[0] = strdup("test-container");
trig.num_tokens = 1;
trig.cnt = 2;
trig.keyvals = (orte_gpr_keyval_t**)malloc(2*sizeof(orte_gpr_keyval_t*));
@ -276,75 +356,77 @@ int main(int argc, char **argv)
fprintf(test_out, "setting trigger\n");
trigs = &trig;
/* enter things as three different subscriptions */
rc = orte_gpr.subscribe(
ORTE_GPR_TRIG_CMP_LEVELS | ORTE_GPR_TRIG_MONITOR_ONLY,
1, &subscription,
2, subscriptions,
1, &trigs,
&sub);
sub);
OBJ_RELEASE(subscription);
OBJ_DESTRUCT(&trig);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_gpr.subscribe(
ORTE_GPR_TRIG_CMP_LEVELS | ORTE_GPR_TRIG_MONITOR_ONLY,
2, &(subscriptions[2]),
1, &trigs,
sub);
rc = orte_gpr.subscribe(
ORTE_GPR_TRIG_CMP_LEVELS | ORTE_GPR_TRIG_MONITOR_ONLY,
1, &(subscriptions[4]),
1, &trigs,
sub);
for (i=0; i < 5; i++) OBJ_RELEASE(subscriptions[i]);
OBJ_DESTRUCT(&trig);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_gpr.dump_triggers(0);
for (j=0; j < 2; j++) {
fprintf(test_out, "incrementing until trigger\n");
/* increment the value in keys[1] until the trig fires */
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
value.segment = strdup("test-segment");
value.tokens = (char**)malloc(sizeof(char*));
if (NULL == value.tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.tokens[0] = strdup(ORTE_JOB_GLOBALS); /* put counters in the segment's globals container */
value.num_tokens = 1;
value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.cnt = 1;
value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[0]->key = strdup(keys[1]);
value.keyvals[0]->type = ORTE_NULL;
for (i=0; i < 10; i++) {
fprintf(test_out, "\tincrement %s\n", keys[1]);
if (ORTE_SUCCESS != (rc = orte_gpr.increment_value(&value))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&value);
return rc;
}
}
orte_gpr.dump_all(0);
if (j == 0) {
fprintf(test_out, "updating value in keys[1] to test update_storage_locations\n");
/* now update the value in keys[1] and do it again */
value.addr_mode = value.addr_mode | ORTE_GPR_OVERWRITE;
value.keyvals[0]->type = ORTE_UINT32;
value.keyvals[0]->value.ui32 = 0;
values = &value;
orte_gpr.put(1, &values);
}
fprintf(test_out, "incrementing until trigger\n");
/* increment the value in keys[1] until the trig fires */
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
value.segment = strdup("test-segment");
value.tokens = (char**)malloc(sizeof(char*));
if (NULL == value.tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.tokens[0] = strdup("test-container");
value.num_tokens = 1;
value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.cnt = 1;
value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[0]->key = strdup(keys[1]);
value.keyvals[0]->type = ORTE_NULL;
for (i=0; i < 10; i++) {
fprintf(test_out, "\tincrement %s\n", keys[1]);
if (ORTE_SUCCESS != (rc = orte_gpr.increment_value(&value))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&value);
return rc;
}
}
orte_gpr.dump_all(0);
OBJ_DESTRUCT(&value);
orte_dps_close();
orte_gpr_base_close();
@ -370,9 +452,37 @@ int main(int argc, char **argv)
return(0);
}
void test_cbfunc(orte_gpr_notify_data_t *data, void *tag)
void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "TRIGGER FIRED AND RECEIVED\n");
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n");
orte_gpr.dump_notify_data(data, 0);
}
void test_cbfunc2(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 2\n");
orte_gpr.dump_notify_data(data, 0);
}
void test_cbfunc3(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 3\n");
orte_gpr.dump_notify_data(data, 0);
}
void test_cbfunc4(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 4\n");
orte_gpr.dump_notify_data(data, 0);
}
void test_cbfunc5(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 5\n");
orte_gpr.dump_notify_data(data, 0);
}