Checkpoint the continuing re-enablement of the notifiers.
Also added a check to protect the callback system from an error being seen by Tim P. - should help with debugging. This commit was SVN r6010.
Этот коммит содержится в:
родитель
4d1b9d3f1d
Коммит
51380eba13
@ -240,10 +240,10 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer)
|
||||
for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
|
||||
if (NULL != action[i]) {
|
||||
if (NULL != action[i]->seg) {
|
||||
sprintf(tmp_out, "Action Taken on Segment: %s", action[i]->seg->name);
|
||||
sprintf(tmp_out, "\nAction Taken on Segment: %s", action[i]->seg->name);
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
} else {
|
||||
sprintf(tmp_out, "Action Taken on NULL Segment");
|
||||
sprintf(tmp_out, "\nAction Taken on NULL Segment");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
}
|
||||
if (NULL != action[i]->cptr) {
|
||||
@ -272,36 +272,40 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer)
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
}
|
||||
if (NULL != action[i]->iptr) {
|
||||
if (ORTE_GPR_REPLICA_ENTRY_ADDED == action[i]->action) {
|
||||
if (ORTE_GPR_REPLICA_ENTRY_ADDED & action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval ADDED:");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
} else if (ORTE_GPR_REPLICA_ENTRY_DELETED == action[i]->action) {
|
||||
}
|
||||
if (ORTE_GPR_REPLICA_ENTRY_DELETED & action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval DELETED:");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
} else if (ORTE_GPR_REPLICA_ENTRY_CHANGED == action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval CHANGED:");
|
||||
}
|
||||
if (ORTE_GPR_REPLICA_ENTRY_CHANGED & action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval CHANGED");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
} else if (ORTE_GPR_REPLICA_ENTRY_CHG_TO == action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval CHANGED TO:");
|
||||
}
|
||||
if (ORTE_GPR_REPLICA_ENTRY_CHG_TO & action[i]->action) {
|
||||
sprintf(tmp_out, "\t\tKeyval CHANGED TO:");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
} else if (ORTE_GPR_REPLICA_ENTRY_CHG_FRM == action[i]->action) {
|
||||
sprintf(tmp_out, "\n\tKeyval CHANGED FROM:");
|
||||
}
|
||||
if (ORTE_GPR_REPLICA_ENTRY_CHG_FRM & action[i]->action) {
|
||||
sprintf(tmp_out, "\t\tKeyval CHANGED FROM:");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup(
|
||||
&token, action[i]->seg, action[i]->iptr->itag)) {
|
||||
sprintf(tmp_out, "\n\t\tNo entry found for itag %lu",
|
||||
sprintf(tmp_out, "\t\tNo entry found for itag %lu",
|
||||
(unsigned long) action[i]->iptr->itag);
|
||||
} else {
|
||||
sprintf(tmp_out, "\n\t\titag %lu\tKey: %s",
|
||||
sprintf(tmp_out, "\t\titag %lu\tKey: %s",
|
||||
(unsigned long) action[i]->iptr->itag, token);
|
||||
free(token);
|
||||
}
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
orte_gpr_replica_dump_itagval_value(buffer, action[i]->iptr);
|
||||
} else {
|
||||
sprintf(tmp_out, "\n\tNULL Keyval");
|
||||
sprintf(tmp_out, "\tNULL Keyval");
|
||||
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ int orte_gpr_replica_process_callbacks(void)
|
||||
*/
|
||||
while (NULL != (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_remove_first(&(cb->messages)))) {
|
||||
trig = (orte_gpr_replica_triggers_t*)((orte_gpr_replica.triggers)->addr[(msg->message)->idtag]);
|
||||
if (NULL == trig) {
|
||||
if (NULL == trig || NULL == trig->subscribed_data) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_GPR_DATA_CORRUPT);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
@ -212,10 +212,21 @@ int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg,
|
||||
for (i=0; i < ptr->size; i++) {
|
||||
if (NULL != ptr->addr[i]) {
|
||||
iptr = (orte_gpr_replica_itagval_t*)ptr->addr[i];
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_delete_itagval(seg, cptr, iptr))) {
|
||||
/* release the data storage */
|
||||
i = iptr->index;
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr,
|
||||
ORTE_GPR_REPLICA_ENTRY_CHANGED |
|
||||
ORTE_GPR_REPLICA_ENTRY_CHG_FRM))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* MUST DO THE RELEASE AFTER RECORDING THE ACTION SO THAT
|
||||
* THE OBJECT DOESN'T ACTUALLY LEAVE UNTIL THE ACTION IS PROCESSED
|
||||
*/
|
||||
OBJ_RELEASE(iptr);
|
||||
/* remove the entry from the container's itagval array */
|
||||
orte_pointer_array_set_item(cptr->itagvals, i, NULL);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,7 +237,9 @@ int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg,
|
||||
}
|
||||
|
||||
/* record that we did this */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr,
|
||||
ORTE_GPR_REPLICA_ENTRY_CHANGED |
|
||||
ORTE_GPR_REPLICA_ENTRY_CHG_TO))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
@ -202,8 +202,8 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg)
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
/* check if notifier is on this subscription - if so, check it,
|
||||
* but ONLY if NOTIFY_START is NOT set
|
||||
/* check if notifier is on this subscription - if so, check to see
|
||||
* if it has fired, but ONLY if NOTIFY_START is NOT set
|
||||
*/
|
||||
if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) &
|
||||
(ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) {
|
||||
|
@ -67,12 +67,12 @@ typedef uint8_t orte_gpr_replica_addr_mode_t;
|
||||
|
||||
/* define a few action flags for trigger evaluation
|
||||
*/
|
||||
#define ORTE_GPR_REPLICA_NO_ACTION (int8_t) 0
|
||||
#define ORTE_GPR_REPLICA_ENTRY_ADDED (int8_t) 1
|
||||
#define ORTE_GPR_REPLICA_ENTRY_DELETED (int8_t) 2
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHANGED (int8_t) 3
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHG_TO (int8_t) 4
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHG_FRM (int8_t) 5
|
||||
#define ORTE_GPR_REPLICA_NO_ACTION (int8_t) 0x00
|
||||
#define ORTE_GPR_REPLICA_ENTRY_ADDED (int8_t) 0x01
|
||||
#define ORTE_GPR_REPLICA_ENTRY_DELETED (int8_t) 0x02
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHANGED (int8_t) 0x04
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHG_TO (int8_t) 0x08
|
||||
#define ORTE_GPR_REPLICA_ENTRY_CHG_FRM (int8_t) 0x10
|
||||
|
||||
|
||||
typedef int8_t orte_gpr_replica_action_t;
|
||||
|
@ -483,6 +483,32 @@ int main(int argc, char **argv)
|
||||
}
|
||||
free(values);
|
||||
|
||||
fprintf(stderr, "overwrite a bunch of values with one\n");
|
||||
val = OBJ_NEW(orte_gpr_value_t);
|
||||
val->addr_mode = ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_XAND;
|
||||
val->cnt = 1;
|
||||
val->segment = strdup("test-put-segment");
|
||||
val->num_tokens = 5;
|
||||
val->tokens = (char**)malloc(val->num_tokens * sizeof(char*));
|
||||
for (i=0; i < 5; i++) {
|
||||
asprintf(&(val->tokens[i]), "multi-dum-dum-%lu", (unsigned long) i);
|
||||
}
|
||||
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
|
||||
val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
(val->keyvals[0])->key = strdup("stupid-value-next-one");
|
||||
(val->keyvals[0])->type = ORTE_STRING;
|
||||
(val->keyvals[0])->value.strptr = strdup("try-string-value");
|
||||
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
|
||||
fprintf(test_out, "gpr_test: put multiple copies of one keyval in a container failed with error code %s\n",
|
||||
ORTE_ERROR_NAME(rc));
|
||||
test_failure("gpr_test: put multiple copies of one keyval in a container failed");
|
||||
test_finalize();
|
||||
return rc;
|
||||
}
|
||||
OBJ_RELEASE(val);
|
||||
|
||||
gpr_module->dump_all(0);
|
||||
|
||||
fprintf(stderr, "now finalize and see if all memory cleared\n");
|
||||
test_component_close(&handle);
|
||||
orte_dps_close();
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user