diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c index f4fb2b0e9b..d05c91e84c 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c @@ -278,6 +278,10 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, int cnt, tmp_out = strdup("\t\tORTE_GPR_TRIG_ONE_SHOT"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); } + if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { + tmp_out = strdup("\t\tORTE_GPR_TRIG_AT_LEVEL"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } if (ORTE_GPR_TRIG_CMP_LEVELS & trig->action) { tmp_out = strdup("\t\tORTE_GPR_TRIG_CMP_LEVELS"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); @@ -392,7 +396,11 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, int cnt, } /* for i */ if (0 < trig->num_counters) { - asprintf(&tmp_out, "\tTrigger monitoring %d counters", trig->num_counters); + if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { + asprintf(&tmp_out, "\tTrigger monitoring %d counters for level", trig->num_counters); + } else { + asprintf(&tmp_out, "\tTrigger monitoring %d counters for compare", trig->num_counters); + } orte_gpr_replica_dump_load_string(buffer, &tmp_out); cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); for (i=0; i < (trig->counters)->size; i++) { @@ -400,9 +408,16 @@ static void orte_gpr_replica_dump_trigger(orte_buffer_t *buffer, int cnt, ORTE_SUCCESS == orte_gpr_replica_dict_reverse_lookup(&token, cntr[i]->seg, (cntr[i]->iptr)->itag)) { asprintf(&tmp_out, "\t\tCounter: %d\tSegment: %s\tName: %s", i, - (cntr[i]->seg)->name, token); + (cntr[i]->seg)->name, token); free(token); orte_gpr_replica_dump_load_string(buffer, &tmp_out); + if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { + asprintf(&tmp_out, "\t\tTrigger Level:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + 
orte_gpr_replica_dump_itagval_value(buffer, &(cntr[i]->trigger_level)); + } + asprintf(&tmp_out, "\t\tCurrent Value:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_itagval_value(buffer, cntr[i]->iptr); } } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c index 967f92fad0..59383c341a 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c @@ -337,7 +337,7 @@ int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data, for (j=0; j < values[i]->num_tokens; j++) { ompi_output(0, "\ttoken num: %d\tToken: %s", j, values[i]->tokens[j]); } - ompi_output(0, "\tGot %d keyals:", values[i]->cnt); + ompi_output(0, "\tGot %d keyvals:", values[i]->cnt); /* for (j=0; j < values[i]->cnt; j++) { ompi_output(0, "\tValue num: %d\tKey: %s", j, (values[i]->keyvals[j])->key); orte_gpr_base_dump_keyval_value(values[i]->keyvals[j], 0); diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c index 8e707266e4..fad1d37f5e 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c @@ -208,7 +208,9 @@ int orte_gpr_replica_subscribe_fn(orte_gpr_notify_action_t action, int num_subs, cntr->seg = seg; cntr->cptr = cptr2; cntr->iptr = iptr; - if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_value((void*)(&(cntr->trigger_level)), iptr))) { + cntr->trigger_level.type = (trigs[j]->keyvals[i])->type; + if (ORTE_SUCCESS != (rc = orte_gpr_replica_xfer_payload(&(cntr->trigger_level.value), + &((trigs[j]->keyvals[i])->value), (trigs[j]->keyvals[i])->type))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } @@ -251,7 +253,9 @@ int orte_gpr_replica_subscribe_fn(orte_gpr_notify_action_t action, int num_subs, cntr->seg = seg; cntr->cptr 
= cptr[k]; cntr->iptr = iptr; - if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_value((void*)(&(cntr->trigger_level)), iptr))) { + cntr->trigger_level.type = (trigs[j]->keyvals[i])->type; + if (ORTE_SUCCESS != (rc = orte_gpr_replica_xfer_payload(&(cntr->trigger_level.value), + &((trigs[j]->keyvals[i])->value), (trigs[j]->keyvals[i])->type))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c index 5943f56da9..8be554ff6e 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c @@ -28,6 +28,7 @@ #include "mca/errmgr/errmgr.h" #include "mca/ns/ns.h" #include "util/output.h" +#include "mca/gpr/replica/api_layer/gpr_replica_api.h" #include "mca/gpr/replica/transition_layer/gpr_replica_tl.h" #include "gpr_replica_fn.h" @@ -207,31 +208,37 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) } goto FIRED; } - } - return ORTE_SUCCESS; - - /* not comparing levels - check instead to see if counters are at a level */ - cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); - fire = true; - for (i=0; i < (trig->counters)->size && fire; i++) { - if (NULL != cntr[i]) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_value(&level, cntr[i]->iptr))) { + return ORTE_SUCCESS; + + } else if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { /* fire only if every counter equals its stored trigger level */ + cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); + fire = true; + for (i=0; i < (trig->counters)->size && fire; i++) { + if (NULL != cntr[i]) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_value(&level, cntr[i]->iptr))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_value(&level2, &(cntr[i]->trigger_level)))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if (level2 != level) { + fire = false; + } + } + } + if (fire) { /* all 
counters at specified trigger level */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) { ORTE_ERROR_LOG(rc); return rc; } - if (cntr[i]->trigger_level != level) { - fire = false; - } + goto FIRED; } + return ORTE_SUCCESS; } - if (fire) { /* all counters at specified trigger level */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) { - ORTE_ERROR_LOG(rc); - return rc; - } - goto FIRED; - } - return ORTE_SUCCESS; + + return ORTE_SUCCESS; /* neither cmp nor at level set */ FIRED: /* if notify_at_start set, unset it to indicate that trigger fired */ diff --git a/src/mca/gpr/replica/gpr_replica.h b/src/mca/gpr/replica/gpr_replica.h index bb2675b893..34a5d02a91 100644 --- a/src/mca/gpr/replica/gpr_replica.h +++ b/src/mca/gpr/replica/gpr_replica.h @@ -189,7 +189,7 @@ typedef struct { orte_gpr_replica_segment_t *seg; orte_gpr_replica_container_t *cptr; orte_gpr_replica_itagval_t *iptr; - int trigger_level; + orte_gpr_replica_itagval_t trigger_level; } orte_gpr_replica_counter_t; OBJ_CLASS_DECLARATION(orte_gpr_replica_counter_t); diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index b250d2e3b3..470191903c 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -265,12 +265,13 @@ static void orte_gpr_replica_counter_construct(orte_gpr_replica_counter_t* cntr) cntr->seg = NULL; cntr->cptr = NULL; cntr->iptr = NULL; - cntr->trigger_level = 0; + OBJ_CONSTRUCT(&(cntr->trigger_level), orte_gpr_replica_itagval_t); } /* destructor - used to free any resources held by instance */ -static void orte_gpr_replica_counter_destructor(orte_gpr_replica_counter_t* targ) +static void orte_gpr_replica_counter_destructor(orte_gpr_replica_counter_t* cntr) { + OBJ_DESTRUCT(&(cntr->trigger_level)); } /* define instance of ompi_class_t */ diff --git a/src/mca/rmgr/base/rmgr_base_stage_gate.c b/src/mca/rmgr/base/rmgr_base_stage_gate.c index 
e7dccf3a74..964109c934 100644 --- a/src/mca/rmgr/base/rmgr_base_stage_gate.c +++ b/src/mca/rmgr/base/rmgr_base_stage_gate.c @@ -26,6 +26,8 @@ #include "include/orte_constants.h" #include "include/orte_types.h" +#include "util/output.h" + #include "dps/dps.h" #include "mca/gpr/gpr.h" #include "mca/errmgr/errmgr.h" @@ -81,7 +83,7 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job) } value.keyvals[i]->key = strdup(keys[i]); value.keyvals[i]->type = ORTE_UINT32; - value.keyvals[i]->value.i32 = 0; + value.keyvals[i]->value.ui32 = 0; } values = &value; @@ -275,6 +277,7 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job) OBJ_DESTRUCT(&sub); OBJ_DESTRUCT(&trig); + return ORTE_SUCCESS; } diff --git a/test/mca/gpr/gpr_triggers.c b/test/mca/gpr/gpr_triggers.c index e660f7221c..18eb458908 100644 --- a/test/mca/gpr/gpr_triggers.c +++ b/test/mca/gpr/gpr_triggers.c @@ -60,21 +60,16 @@ static void test_cbfunc2(orte_gpr_notify_data_t *data, void *user_tag); static void test_cbfunc3(orte_gpr_notify_data_t *data, void *user_tag); static void test_cbfunc4(orte_gpr_notify_data_t *data, void *user_tag); static void test_cbfunc5(orte_gpr_notify_data_t *data, void *user_tag); +static void test_cbfunc6(orte_gpr_notify_data_t *data, void *user_tag); + +static int test1(void); +static int test2(void); int main(int argc, char **argv) { - int rc, num_names, num_found; - int i, j, cnt, ret; - orte_gpr_value_t *values, value, trig, *trigs; - orte_gpr_subscription_t *subscriptions[5]; - orte_gpr_notify_id_t sub[5]; - char* keys[] = { - /* changes to this ordering need to be reflected in code below */ - "setpoint", - "counter", - }; - + int rc=0; + test_init("test_gpr_replica_trigs"); if (getenv("TEST_WRITE_TO_FILE") != NULL) { @@ -93,7 +88,7 @@ int main(int argc, char **argv) /* Open up the output streams */ if (!ompi_output_init()) { - return OMPI_ERROR; + exit(1); } /* @@ -105,13 +100,13 @@ int main(int argc, char **argv) ompi_malloc_init(); /* Ensure the system_info structure 
is instantiated and initialized */ - if (ORTE_SUCCESS != (ret = orte_sys_info())) { - return ret; + if (ORTE_SUCCESS != orte_sys_info()) { + exit(1); } /* Ensure the process info structure is instantiated and initialized */ - if (ORTE_SUCCESS != (ret = orte_proc_info())) { - return ret; + if (ORTE_SUCCESS != orte_proc_info()) { + exit(1); } orte_process_info.seed = true; @@ -152,6 +147,59 @@ int main(int argc, char **argv) exit (1); } + /* test triggers that compare two counters to each other */ + if (ORTE_SUCCESS == test1()) { + fprintf(test_out, "triggers: compare two counters successful\n"); + } else { + fprintf(test_out, "triggers: compare two counters failed\n"); + rc = 1; + } + + /* test triggers that fire at a level */ + if (ORTE_SUCCESS == test2()) { + fprintf(test_out, "triggers: trigger at level successful\n"); + } else { + fprintf(test_out, "triggers: trigger at level failed\n"); + rc = 1; + } + + /* cleanup and finalize */ + orte_dps_close(); + orte_gpr_base_close(); + orte_sys_info_finalize(); + orte_proc_info_finalize(); + mca_base_close(); + ompi_malloc_finalize(); + ompi_output_finalize(); + + + fclose( test_out ); +/* result = system( cmd_str ); + if( result == 0 ) { + test_success(); + } + else { + test_failure( "test_gpr_replica failed"); + } +*/ + test_finalize(); + unlink("test_gpr_replica_out"); + + return rc; +} + +static int test1(void) +{ + int rc, i; + orte_gpr_value_t *values, value, trig, *trigs; + orte_gpr_subscription_t *subscriptions[5]; + orte_gpr_notify_id_t sub[5]; + char* keys[] = { + /* changes to this ordering need to be reflected in code below */ + "setpoint", + "counter", + }; + /* setup a pair of counters on the registry - one is the actual * counter, and the other will hold the end condition when the * trigger(s) should fire @@ -428,30 +476,163 @@ int main(int argc, char **argv) orte_gpr.dump_all(0); OBJ_DESTRUCT(&value); - orte_dps_close(); - orte_gpr_base_close(); - orte_sys_info_finalize(); - orte_proc_info_finalize(); - 
mca_base_close(); - ompi_malloc_finalize(); - ompi_output_finalize(); - - - fclose( test_out ); -/* result = system( cmd_str ); - if( result == 0 ) { - test_success(); - } - else { - test_failure( "test_gpr_replica failed"); - } -*/ - test_finalize(); - unlink("test_gpr_replica_out"); - - return(0); + return ORTE_SUCCESS; } +static int test2(void) +{ + int rc, i; + orte_gpr_value_t *values, value, trig, *trigs; + orte_gpr_subscription_t *subscription; + orte_gpr_notify_id_t sub; + + /* setup a counter on the registry that the trigger will later refer + * to when defining the level at which to fire + */ + OBJ_CONSTRUCT(&value, orte_gpr_value_t); + value.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR; + value.segment = strdup("test-segment"); + value.tokens = (char**)malloc(sizeof(char*)); + if (NULL == value.tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.tokens[0] = strdup("test-level-trigger"); + value.num_tokens = 1; + value.cnt = 1; + value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[0]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[0]->key = strdup("level-counter"); + value.keyvals[0]->type = ORTE_UINT32; + value.keyvals[0]->value.ui32 = 0; + + values = &value; + + fprintf(test_out, "putting level test counter on registry\n"); + + /* put the single level-counter on the registry */ + if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &values))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&value); + return rc; + } + OBJ_DESTRUCT(&value); /* clean up */ + + /* setup a subscription that defines a set of data that is to be + * returned to the corresponding callback function + */ + subscription = 
OBJ_NEW(orte_gpr_subscription_t); + subscription->addr_mode = ORTE_GPR_TOKENS_OR; + subscription->segment = strdup("test-segment"); + subscription->user_tag = NULL; + /* ask for the stupid-value-one data, addressed with the + * two dummy tokens built below + */ + subscription->num_tokens = 2; + subscription->tokens = (char**)malloc(2*sizeof(char*)); + for (i=0; i < 2; i++) { + asprintf(&(subscription->tokens[i]), "dummy%d", i); + } + subscription->num_keys = 1; + subscription->keys =(char**)malloc(sizeof(char*)); + subscription->keys[0] = strdup("stupid-value-one"); + subscription->cbfunc = test_cbfunc6; + + /* setup the trigger information - want trigger to fire when + * a specific counter reaches a specified value + */ + OBJ_CONSTRUCT(&trig, orte_gpr_value_t); + trig.addr_mode = ORTE_GPR_TOKENS_XAND; + trig.segment = strdup("test-segment"); + trig.tokens = (char**)malloc(sizeof(char*)); + if (NULL == trig.tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_RELEASE(subscription); + OBJ_DESTRUCT(&trig); + return ORTE_ERR_OUT_OF_RESOURCE; + } + trig.tokens[0] = strdup("test-level-trigger"); + trig.num_tokens = 1; + trig.cnt = 1; + trig.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); + trig.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); + trig.keyvals[0]->key = strdup("level-counter"); + trig.keyvals[0]->type = ORTE_INT32; + trig.keyvals[0]->value.i32 = 2; + + fprintf(test_out, "setting level trigger\n"); + + trigs = &trig; + + /* enter subscription */ + rc = orte_gpr.subscribe(ORTE_GPR_TRIG_AT_LEVEL | ORTE_GPR_TRIG_MONITOR_ONLY, + 1, &subscription, 1, &trigs, &sub); + + orte_gpr.dump_triggers(0); + /* cleanup - release our objects before acting on the subscribe result */ + OBJ_RELEASE(subscription); + OBJ_DESTRUCT(&trig); + if (ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + return rc; + } + + fprintf(test_out, "incrementing until level trigger\n"); + /* increment the value in the counter until the trig fires */ + OBJ_CONSTRUCT(&value, orte_gpr_value_t); + value.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR; + value.segment = strdup("test-segment"); + value.tokens = 
(char**)malloc(sizeof(char*)); + if (NULL == value.tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.tokens[0] = strdup("test-level-trigger"); + value.num_tokens = 1; + value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.cnt = 1; + value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[0]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[0]->key = strdup("level-counter"); + value.keyvals[0]->type = ORTE_NULL; + + for (i=0; i < 10; i++) { + fprintf(test_out, "\tincrement level-counter\n"); + if (ORTE_SUCCESS != (rc = orte_gpr.increment_value(&value))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&value); + return rc; + } + } + + orte_gpr.dump_all(0); + OBJ_DESTRUCT(&value); + + return ORTE_SUCCESS; +} + + void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag) { fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n"); @@ -486,3 +667,10 @@ void test_cbfunc5(orte_gpr_notify_data_t *data, void *tag) orte_gpr.dump_notify_data(data, 0); } + +void test_cbfunc6(orte_gpr_notify_data_t *data, void *tag) +{ + fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 6\n"); + + orte_gpr.dump_notify_data(data, 0); +}