Checkpoint. Fixed a logic problem that removed one-shot subscriptions even though the notifiers were supposed to stay.
This commit was SVN r6052.
Этот коммит содержится в:
родитель
729dc65935
Коммит
83cba7f7cf
@ -113,6 +113,13 @@ CLEANUP:
|
||||
trigs = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
|
||||
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) {
|
||||
if (NULL != trigs[i] && trigs[i]->one_shot_fired) {
|
||||
/* if no notify actions were specified to stick around after
|
||||
* the trigger, then simply remove the trigger. Otherwise,
|
||||
* clear the trigger flags and leave the notify flags alone
|
||||
*/
|
||||
if (ORTE_GPR_NOTIFY_ANY & trigs[i]->action) {
|
||||
trigs[i]->action = trigs[i]->action & ORTE_GPR_NOTIFY_ANY;
|
||||
} else {
|
||||
k = trigs[i]->index;
|
||||
OBJ_RELEASE(trigs[i]);
|
||||
if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers,
|
||||
@ -122,6 +129,7 @@ CLEANUP:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -140,12 +148,18 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
|
||||
for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks));
|
||||
cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks));
|
||||
cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) {
|
||||
if (0 == orte_ns.compare(ORTE_NS_CMP_ALL, trig->requestor, cb->requestor)) { /* same requestor - add to existing callback */
|
||||
if ((NULL == trig->requestor && NULL == cb->requestor) /* both local */
|
||||
|| ((NULL != trig->requestor && NULL != cb->requestor) &&
|
||||
0 == orte_ns.compare(ORTE_NS_CMP_ALL, trig->requestor, cb->requestor))) { /* same remote requestor - add to existing callback */
|
||||
/* check to see if we already have something for this trigger - if so, add to it */
|
||||
for (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_first(&(cb->messages));
|
||||
msg != (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_end(&(cb->messages));
|
||||
msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_next(msg)) {
|
||||
if ((msg->message)->idtag == trig->index) { /* same trigger - add to it */
|
||||
if (orte_gpr_replica_globals.debug) {
|
||||
ompi_output(0, "[%lu,%lu,%lu] process_trig: adding message for requestor [%lu,%lu,%lu] for idtag %lu\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(cb->requestor), (unsigned long)trig->index);
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc =
|
||||
orte_gpr_replica_construct_notify_message(msg->message,
|
||||
trig, sub, value))) {
|
||||
@ -194,8 +208,8 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
|
||||
if (NULL == trig->requestor) { /* local request - queue local callback */
|
||||
cb->requestor = NULL;
|
||||
if (orte_gpr_replica_globals.debug) {
|
||||
ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing local message\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name));
|
||||
ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing local callback for idtag %lu\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name), (unsigned long)trig->index);
|
||||
}
|
||||
|
||||
} else { /* remote request - queue remote callback */
|
||||
@ -205,14 +219,14 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
|
||||
return rc;
|
||||
}
|
||||
if (orte_gpr_replica_globals.debug) {
|
||||
ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing message for [%lu,%lu,%lu] using remoteid %d\n",
|
||||
ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing callback for [%lu,%lu,%lu] using remoteid %lu\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(cb->requestor),
|
||||
(int)trig->remote_idtag);
|
||||
(unsigned long)trig->remote_idtag);
|
||||
}
|
||||
}
|
||||
ompi_list_append(&orte_gpr_replica.callbacks, &cb->item);
|
||||
|
||||
/* construct the message */
|
||||
/* construct the message list item */
|
||||
msg = OBJ_NEW(orte_gpr_replica_notify_msg_list_t);
|
||||
if (NULL == msg) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
|
@ -909,8 +909,9 @@ int test4(void)
|
||||
asprintf(&(subscription->tokens[i]), "dummy-sub-%lu", (unsigned long) i);
|
||||
}
|
||||
/* get notified when anything is added */
|
||||
subscription->num_keys = 0;
|
||||
subscription->keys = NULL;
|
||||
subscription->num_keys = 1;
|
||||
subscription->keys = (char**)malloc(sizeof(char*));
|
||||
subscription->keys[0] = strdup("stupid-test-0");
|
||||
|
||||
/* send notification to callback 1 */
|
||||
subscription->cbfunc = test_cbfunc1;
|
||||
@ -968,9 +969,9 @@ int test4(void)
|
||||
gpr_module->dump_all(0);
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
fprintf(test_out, "adding something - should NOT trigger\n");
|
||||
fprintf(test_out, "changing something else - should NOT trigger\n");
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
|
||||
val->addr_mode = ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
|
||||
val->segment = strdup("test-segment-2");
|
||||
val->tokens = (char**)malloc(sizeof(char*));
|
||||
if (NULL == val->tokens) {
|
||||
@ -978,7 +979,7 @@ int test4(void)
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->tokens[0] = strdup("dummy-sub-10");
|
||||
val->tokens[0] = strdup("dummy-sub-0");
|
||||
val->num_tokens = 1;
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
if (NULL == val->keyvals) {
|
||||
@ -993,8 +994,9 @@ int test4(void)
|
||||
OBJ_DESTRUCT(&value);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
val->keyvals[0]->key = strdup("test-notify-add-no-fire");
|
||||
val->keyvals[0]->type = ORTE_NULL;
|
||||
val->keyvals[0]->key = strdup("stupid-test-1");
|
||||
val->keyvals[0]->type = ORTE_UINT32;
|
||||
val->keyvals[0]->value.ui32 = 6789;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user