Bring the rest of the notification modes online. Update the unit test to cover notify-on-change.
This commit was SVN r6043.
Этот коммит содержится в:
родитель
e580c98365
Коммит
098cc8cf3a
@ -375,9 +375,24 @@ int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
|
||||
i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
|
||||
if (NULL != ptr[i]) {
|
||||
cntr++;
|
||||
if ((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED) &&
|
||||
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
|
||||
if (
|
||||
(((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) ||
|
||||
|
||||
((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)) ||
|
||||
|
||||
((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) ||
|
||||
|
||||
((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) ||
|
||||
|
||||
((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED)))
|
||||
|
||||
&& orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
|
||||
|
||||
/* need to send back the tokens from the container that is being addressed! */
|
||||
value.num_tokens = ptr[i]->cptr->num_itags;
|
||||
value.tokens = (char **)malloc(value.num_tokens * sizeof(char*));
|
||||
@ -392,7 +407,7 @@ int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
/* send back the added entry */
|
||||
/* send back the recorded data */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(
|
||||
&((value.keyvals[0])->key), sub->seg,
|
||||
ptr[i]->iptr->itag))) {
|
||||
@ -411,27 +426,6 @@ int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
} else if ((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) &
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED) &&
|
||||
orte_gpr_replica_check_notify_matches(sub, ptr[i])){
|
||||
/* send back the deleted entry */
|
||||
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO) &&
|
||||
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
|
||||
/* ptr contains the "new" data - check to see if it matches
|
||||
* the subscription. if so, send back the new data
|
||||
*/
|
||||
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM) &&
|
||||
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
|
||||
/* ptr contains the "old" data - check to see if it matches
|
||||
* the subscription. if so, send back the new data
|
||||
*/
|
||||
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
|
||||
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED) &&
|
||||
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
|
||||
/* see if the acted_upon data was the target of the subscription */
|
||||
/* send back the new data */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -76,6 +76,7 @@ static void test_cbfunc6(orte_gpr_notify_data_t *data, void *user_tag);
|
||||
static int test1(void);
|
||||
static int test2(void);
|
||||
static int test3(void);
|
||||
static int test4(void);
|
||||
|
||||
|
||||
int main(int argc, char **argv)
|
||||
@ -228,6 +229,14 @@ int main(int argc, char **argv)
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
/* test notification on value changed */
|
||||
if (ORTE_SUCCESS == test4()) {
|
||||
fprintf(test_out, "triggers: notify upon value changed successful\n");
|
||||
} else {
|
||||
fprintf(test_out, "triggers: notify upon value changed failed\n");
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
/* cleanup and finalize */
|
||||
orte_dps_close();
|
||||
orte_gpr_base_close();
|
||||
@ -809,6 +818,193 @@ int test3(void)
|
||||
gpr_module->dump_all(0);
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
fprintf(test_out, "adding something - should NOT trigger\n");
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
|
||||
val->segment = strdup("test-segment");
|
||||
val->tokens = (char**)malloc(sizeof(char*));
|
||||
if (NULL == val->tokens) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->tokens[0] = strdup("dummy-sub-10");
|
||||
val->num_tokens = 1;
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
if (NULL == val->keyvals) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->cnt = 1;
|
||||
val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
if (NULL == val->keyvals[0]) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->keyvals[0]->key = strdup("test-notify-add-no-fire");
|
||||
val->keyvals[0]->type = ORTE_NULL;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(val);
|
||||
return rc;
|
||||
}
|
||||
|
||||
gpr_module->dump_all(0);
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int test4(void)
|
||||
{
|
||||
int rc;
|
||||
size_t i;
|
||||
orte_gpr_value_t value, *val;
|
||||
orte_gpr_subscription_t *subscription;
|
||||
orte_gpr_notify_id_t sub;
|
||||
|
||||
/* put something on the registry to start */
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_NO_OVERWRITE | ORTE_GPR_TOKENS_XAND;
|
||||
val->segment = strdup("test-segment-2");
|
||||
val->num_tokens = 10;
|
||||
val->tokens = (char**)malloc(val->num_tokens * sizeof(char*));
|
||||
for (i=0; i < val->num_tokens; i++) {
|
||||
asprintf(&(val->tokens[i]), "dummy-sub-%lu", (unsigned long) i);
|
||||
}
|
||||
val->cnt = 20;
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(val->cnt * sizeof(orte_gpr_keyval_t*));
|
||||
for (i=0; i<val->cnt; i++) {
|
||||
val->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
asprintf(&((val->keyvals[i])->key), "stupid-test-%lu",
|
||||
(unsigned long) i);
|
||||
(val->keyvals[i])->type = ORTE_UINT32;
|
||||
(val->keyvals[i])->value.ui32 = (uint32_t)i;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
fprintf(test_out, "put failed with error code %s\n",
|
||||
ORTE_ERROR_NAME(rc));
|
||||
test_failure("put failed");
|
||||
test_finalize();
|
||||
return rc;
|
||||
}
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
/* setup a subscription on one of the containers
|
||||
* that notifies callback 1 if something is changed
|
||||
*/
|
||||
subscription = OBJ_NEW(orte_gpr_subscription_t);
|
||||
subscription->addr_mode = ORTE_GPR_TOKENS_OR;
|
||||
subscription->segment = strdup("test-segment-2");
|
||||
subscription->user_tag = NULL;
|
||||
/* monitor the dummy-sub-xx container only
|
||||
*/
|
||||
subscription->num_tokens = 2;
|
||||
subscription->tokens = (char**)malloc(2*sizeof(char*));
|
||||
for (i=0; i < 2; i++) {
|
||||
asprintf(&(subscription->tokens[i]), "dummy-sub-%lu", (unsigned long) i);
|
||||
}
|
||||
/* get notified when anything is added */
|
||||
subscription->num_keys = 0;
|
||||
subscription->keys = NULL;
|
||||
|
||||
/* send notification to callback 1 */
|
||||
subscription->cbfunc = test_cbfunc1;
|
||||
|
||||
/* enter subscription */
|
||||
rc = gpr_module->subscribe(
|
||||
ORTE_GPR_NOTIFY_VALUE_CHG,
|
||||
1, &subscription,
|
||||
0, NULL,
|
||||
&sub);
|
||||
|
||||
gpr_module->dump_triggers(0);
|
||||
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(subscription);
|
||||
|
||||
|
||||
/* add something to the container */
|
||||
|
||||
fprintf(test_out, "changing something - should trigger\n");
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
|
||||
val->segment = strdup("test-segment-2");
|
||||
val->tokens = (char**)malloc(sizeof(char*));
|
||||
if (NULL == val->tokens) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->tokens[0] = strdup("dummy-sub-0");
|
||||
val->num_tokens = 1;
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
if (NULL == val->keyvals) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->cnt = 1;
|
||||
val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
if (NULL == val->keyvals[0]) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->keyvals[0]->key = strdup("stupid-test-0");
|
||||
val->keyvals[0]->type = ORTE_UINT32;
|
||||
val->keyvals[0]->value.ui32 = 12345;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(val);
|
||||
return rc;
|
||||
}
|
||||
|
||||
gpr_module->dump_all(0);
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
fprintf(test_out, "adding something - should NOT trigger\n");
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
|
||||
val->segment = strdup("test-segment-2");
|
||||
val->tokens = (char**)malloc(sizeof(char*));
|
||||
if (NULL == val->tokens) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->tokens[0] = strdup("dummy-sub-10");
|
||||
val->num_tokens = 1;
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
if (NULL == val->keyvals) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->cnt = 1;
|
||||
val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
if (NULL == val->keyvals[0]) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->keyvals[0]->key = strdup("test-notify-add-no-fire");
|
||||
val->keyvals[0]->type = ORTE_NULL;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(val);
|
||||
return rc;
|
||||
}
|
||||
|
||||
gpr_module->dump_all(0);
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user