From ba7673a83f682ba9bc1d052832ec3e4f4be3ec8b Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 8 Jun 2005 19:40:38 +0000 Subject: [PATCH] Checkpoint the first step in re-enabling the notification for subscriptions that monitor value changes. Added a new array that stores the actions each time the registry is called via a function that modifies its values. Updated the dump function to output the action records. This commit was SVN r5995. --- .../gpr_replica_arithmetic_ops_api.c | 4 +- .../api_layer/gpr_replica_del_index_api.c | 2 +- .../api_layer/gpr_replica_put_get_api.c | 8 +- .../gpr_replica_arithmetic_ops_cm.c | 4 +- .../communications/gpr_replica_del_index_cm.c | 2 +- .../communications/gpr_replica_put_get_cm.c | 5 +- .../functional_layer/gpr_replica_dump_fn.c | 162 ++++++++++++++---- .../replica/functional_layer/gpr_replica_fn.h | 13 +- .../functional_layer/gpr_replica_put_get_fn.c | 34 +++- .../functional_layer/gpr_replica_segment_fn.c | 35 ++-- .../gpr_replica_subscribe_fn.c | 2 +- .../gpr_replica_trig_ops_fn.c | 110 +++++++++++- src/mca/gpr/replica/gpr_replica.h | 23 ++- src/mca/gpr/replica/gpr_replica_component.c | 40 ++++- 14 files changed, 358 insertions(+), 86 deletions(-) diff --git a/src/mca/gpr/replica/api_layer/gpr_replica_arithmetic_ops_api.c b/src/mca/gpr/replica/api_layer/gpr_replica_arithmetic_ops_api.c index 0e3dac9d79..290a6d8488 100644 --- a/src/mca/gpr/replica/api_layer/gpr_replica_arithmetic_ops_api.c +++ b/src/mca/gpr/replica/api_layer/gpr_replica_arithmetic_ops_api.c @@ -81,7 +81,7 @@ int orte_gpr_replica_increment_value(orte_gpr_value_t *value) if (ORTE_SUCCESS == rc) { if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_INCREMENTED))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return rc; @@ -143,7 +143,7 @@ int orte_gpr_replica_decrement_value(orte_gpr_value_t *value) if (ORTE_SUCCESS == rc) { if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_DECREMENTED))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return rc; diff --git a/src/mca/gpr/replica/api_layer/gpr_replica_del_index_api.c b/src/mca/gpr/replica/api_layer/gpr_replica_del_index_api.c index ca15158a65..44a5a1266a 100644 --- a/src/mca/gpr/replica/api_layer/gpr_replica_del_index_api.c +++ b/src/mca/gpr/replica/api_layer/gpr_replica_del_index_api.c @@ -114,7 +114,7 @@ int orte_gpr_replica_delete_entries(orte_gpr_addr_mode_t addr_mode, key_itags, num_keys); if (ORTE_SUCCESS == rc) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_ENTRY_DELETED))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); } } diff --git a/src/mca/gpr/replica/api_layer/gpr_replica_put_get_api.c b/src/mca/gpr/replica/api_layer/gpr_replica_put_get_api.c index 95c37fb907..6606f7727d 100644 --- a/src/mca/gpr/replica/api_layer/gpr_replica_put_get_api.c +++ b/src/mca/gpr/replica/api_layer/gpr_replica_put_get_api.c @@ -36,7 +36,6 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values) { int rc = ORTE_SUCCESS; size_t i, j; - int8_t action_taken; orte_gpr_value_t *val; orte_gpr_replica_segment_t *seg=NULL; orte_gpr_replica_itag_t *itags=NULL; @@ -63,12 +62,14 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values) for (j=0; j < val->cnt; j++) { if (NULL == (val->keyvals[j])->key) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return ORTE_ERR_BAD_PARAM; } } /* find the segment */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&seg, true, val->segment))) { + ORTE_ERROR_LOG(rc); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return rc; } @@ -76,16 +77,17 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values) /* convert tokens to array of itags */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_itag_list(&itags, seg, val->tokens, &(val->num_tokens)))) { + ORTE_ERROR_LOG(rc); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); return rc; } if (ORTE_SUCCESS != (rc = orte_gpr_replica_put_fn(val->addr_mode, seg, itags, val->num_tokens, - val->cnt, val->keyvals, &action_taken))) { + val->cnt, val->keyvals))) { goto CLEANUP; } - if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg, action_taken))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); goto CLEANUP; } diff --git a/src/mca/gpr/replica/communications/gpr_replica_arithmetic_ops_cm.c b/src/mca/gpr/replica/communications/gpr_replica_arithmetic_ops_cm.c index a3d9b82ba9..f37d39b269 100644 --- a/src/mca/gpr/replica/communications/gpr_replica_arithmetic_ops_cm.c +++ b/src/mca/gpr/replica/communications/gpr_replica_arithmetic_ops_cm.c @@ -78,7 +78,7 @@ int orte_gpr_replica_recv_increment_value_cmd(orte_buffer_t *cmd, orte_buffer_t if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_INCREMENTED))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); return rc; } @@ -141,7 +141,7 @@ int orte_gpr_replica_recv_decrement_value_cmd(orte_buffer_t *cmd, orte_buffer_t if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_DECREMENTED))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/src/mca/gpr/replica/communications/gpr_replica_del_index_cm.c b/src/mca/gpr/replica/communications/gpr_replica_del_index_cm.c index 8f6bbee13b..894698e47d 100644 --- a/src/mca/gpr/replica/communications/gpr_replica_del_index_cm.c +++ b/src/mca/gpr/replica/communications/gpr_replica_del_index_cm.c @@ -157,7 +157,7 @@ int orte_gpr_replica_recv_delete_entries_cmd(orte_buffer_t *buffer, orte_buffer_ key_itags, num_keys); if (ORTE_SUCCESS == ret) { - orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_ENTRY_DELETED); + orte_gpr_replica_check_subscriptions(seg); } diff --git a/src/mca/gpr/replica/communications/gpr_replica_put_get_cm.c b/src/mca/gpr/replica/communications/gpr_replica_put_get_cm.c index c05d2df25f..12aea5e500 100644 --- a/src/mca/gpr/replica/communications/gpr_replica_put_get_cm.c +++ b/src/mca/gpr/replica/communications/gpr_replica_put_get_cm.c @@ -37,7 +37,6 @@ int orte_gpr_replica_recv_put_cmd(orte_buffer_t *buffer, orte_buffer_t *answer) orte_gpr_replica_segment_t *seg=NULL; orte_gpr_replica_itag_t *itags=NULL; orte_data_type_t type; - int8_t action_taken=0; int rc, ret; size_t i=0, cnt; @@ -92,14 +91,14 @@ int orte_gpr_replica_recv_put_cmd(orte_buffer_t *buffer, orte_buffer_t *answer) } if (ORTE_SUCCESS != (ret = orte_gpr_replica_put_fn(val->addr_mode, seg, itags, - val->num_tokens, val->cnt, val->keyvals, &action_taken))) { + val->num_tokens, val->cnt, val->keyvals))) { ORTE_ERROR_LOG(ret); goto RETURN_ERROR; } if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, action_taken))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c index 280bc0a0f1..b9fdd5ac6c 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_dump_fn.c @@ -60,6 +60,10 @@ int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer) return rc; } + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_callbacks_fn(buffer))) { + return rc; + } + rc = orte_gpr_replica_dump_segments_fn(buffer); return rc; @@ -165,7 +169,9 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer) ompi_list_item_t *item; orte_gpr_replica_callbacks_t *cb; orte_gpr_replica_notify_msg_list_t *msg; - char *tmp_out; + orte_gpr_replica_action_taken_t **action; + orte_gpr_replica_itag_t *itaglist; + char *tmp_out, *token; size_t i, j, k; tmp_out = (char*)malloc(1000); @@ -180,51 +186,131 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer) if (0 >= (k= ompi_list_get_size(&(orte_gpr_replica.callbacks)))) { sprintf(tmp_out, "--- None registered at this time ---"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); - return ORTE_SUCCESS; } else { sprintf(tmp_out, "--- %lu callback(s) registered at this time", (unsigned long) k); orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + i=0; + for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks)); + cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks)); + cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) { + if (NULL == cb) { + sprintf(tmp_out, "\n\t--- BAD CALLBACK POINTER %lu ---", + (unsigned long) i); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + return ORTE_SUCCESS; + } + sprintf(tmp_out, "\nInfo for callback %lu", (unsigned long) i); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + if (NULL == cb->requestor) { + sprintf(tmp_out, "Local requestor"); + } else { + sprintf(tmp_out, "Requestor: [%lu,%lu,%lu]", + ORTE_NAME_ARGS(cb->requestor)); + } + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + j = ompi_list_get_size(&(cb->messages)); + sprintf(tmp_out, "Num messages: %lu", (unsigned long) j); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + for (k=0, item = ompi_list_get_first(&(cb->messages)); + item != ompi_list_get_end(&(cb->messages)); + item = ompi_list_get_next(item), k++) { + msg = (orte_gpr_replica_notify_msg_list_t*)item; + sprintf(tmp_out, "\n\nInfo for message %lu sending %lu " + " data objects to notifier id %lu", + (unsigned long) k, (unsigned long) (msg->message)->cnt, + (unsigned long) (msg->message)->idtag); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + orte_gpr_base_dump_notify_msg(buffer, msg->message); + } + i++; + } } - - i=0; - for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks)); - cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks)); - cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) { - if (NULL == cb) { - sprintf(tmp_out, "\n\t--- BAD CALLBACK POINTER %lu ---", - (unsigned long) i); - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - return ORTE_SUCCESS; - } - sprintf(tmp_out, "\nInfo for callback %lu", (unsigned long) i); - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - if (NULL == cb->requestor) { - sprintf(tmp_out, "Local requestor"); - } else { - sprintf(tmp_out, "Requestor: [%lu,%lu,%lu]", - ORTE_NAME_ARGS(cb->requestor)); - } - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - j = ompi_list_get_size(&(cb->messages)); - sprintf(tmp_out, "Num messages: %lu", (unsigned long) j); - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - for (k=0, item = ompi_list_get_first(&(cb->messages)); - item != ompi_list_get_end(&(cb->messages)); - item = ompi_list_get_next(item), k++) { - msg = (orte_gpr_replica_notify_msg_list_t*)item; - sprintf(tmp_out, "\n\nInfo for message %lu sending %lu " - " data objects to notifier id %lu", - (unsigned long) k, (unsigned long) (msg->message)->cnt, - (unsigned long) (msg->message)->idtag); - orte_gpr_replica_dump_load_string(buffer, &tmp_out); - orte_gpr_base_dump_notify_msg(buffer, msg->message); - } - i++; - } sprintf(tmp_out, "\n"); orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + i = (orte_gpr_replica_globals.acted_upon)->size - (orte_gpr_replica_globals.acted_upon)->number_free; + if (0 < i) { + sprintf(tmp_out, "\nDUMP OF GPR ACTION RECORDS\n"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + action = (orte_gpr_replica_action_taken_t**)orte_gpr_replica_globals.acted_upon->addr; + for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) { + if (NULL != action[i]) { + if (NULL != action[i]->seg) { + sprintf(tmp_out, "Action Taken on Segment: %s", action[i]->seg->name); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } else { + sprintf(tmp_out, "Action Taken on NULL Segment"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + if (NULL != action[i]->cptr) { + sprintf(tmp_out, "\tContainer Tokens:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + + /* reverse lookup tokens and print them */ + itaglist = action[i]->cptr->itags; + for (k=0; k < action[i]->cptr->num_itags; k++) { + if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( + &token, action[i]->seg, itaglist[k])) { + sprintf(tmp_out, "\t\titag num %lu" + ": No entry found for itag %lu", + (unsigned long) k, + (unsigned long) itaglist[k]); + } else { + sprintf(tmp_out, "\t\titag num %lu: itag %lu\tToken: %s", + (unsigned long) k, + (unsigned long) itaglist[k], token); + free(token); + } + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + } else { + sprintf(tmp_out, "\tNULL Container"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + if (NULL != action[i]->iptr) { + if (ORTE_GPR_REPLICA_ENTRY_ADDED == action[i]->action) { + sprintf(tmp_out, "\n\tKeyval ADDED:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } else if (ORTE_GPR_REPLICA_ENTRY_DELETED == action[i]->action) { + sprintf(tmp_out, "\n\tKeyval DELETED:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } else if (ORTE_GPR_REPLICA_ENTRY_CHANGED == action[i]->action) { + sprintf(tmp_out, "\n\tKeyval CHANGED:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } else if (ORTE_GPR_REPLICA_ENTRY_CHG_TO == action[i]->action) { + sprintf(tmp_out, "\n\tKeyval CHANGED TO:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } else if (ORTE_GPR_REPLICA_ENTRY_CHG_FRM == action[i]->action) { + sprintf(tmp_out, "\n\tKeyval CHANGED FROM:"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + + if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup( + &token, action[i]->seg, action[i]->iptr->itag)) { + sprintf(tmp_out, "\n\t\tNo entry found for itag %lu", + (unsigned long) action[i]->iptr->itag); + } else { + sprintf(tmp_out, "\n\t\titag %lu\tKey: %s", + (unsigned long) action[i]->iptr->itag, token); + free(token); + } + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + orte_gpr_replica_dump_itagval_value(buffer, action[i]->iptr); + } else { + sprintf(tmp_out, "\n\tNULL Keyval"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + } + } + } else { + sprintf(tmp_out, "\nNO GPR ACTION RECORDS STORED\n"); + orte_gpr_replica_dump_load_string(buffer, &tmp_out); + } + free(tmp_out); return ORTE_SUCCESS; } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h b/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h index 882ccae731..cfafde95b9 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h @@ -87,8 +87,7 @@ int orte_gpr_replica_cleanup_proc_fn(orte_process_name_t *proc); int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_segment_t *seg, orte_gpr_replica_itag_t *token_itags, size_t num_tokens, - size_t cnt, orte_gpr_keyval_t **keyvals, - int8_t *action_taken); + size_t cnt, orte_gpr_keyval_t **keyvals); int orte_gpr_replica_put_nb_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_segment_t *seg, @@ -257,7 +256,15 @@ void orte_gpr_replica_dump_itagval_value(orte_buffer_t *buffer, /* * Trigger Operations */ -int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg, int8_t action_taken); +int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg, + orte_gpr_replica_container_t *cptr, + orte_gpr_replica_itagval_t *iptr, + orte_gpr_replica_action_t action); + +int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg); + +int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub); int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig); diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c index 71989c9711..6e7b88473f 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c @@ -124,8 +124,7 @@ OBJ_CLASS_INSTANCE( int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_segment_t *seg, orte_gpr_replica_itag_t *token_itags, size_t num_tokens, - size_t cnt, orte_gpr_keyval_t **keyvals, - int8_t *action_taken) + size_t cnt, orte_gpr_keyval_t **keyvals) { orte_gpr_replica_container_t **cptr, *cptr2; orte_gpr_replica_itag_t itag; @@ -150,8 +149,8 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, } } - /* initialize action */ - *action_taken = 0; + /* initialize storage for actions taken */ + orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon); /* extract the token address mode and overwrite permissions */ overwrite = false; @@ -189,14 +188,19 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, ORTE_ERROR_LOG(rc); return rc; } + /* record that we did this */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr2, iptr, ORTE_GPR_REPLICA_ENTRY_ADDED))) { + ORTE_ERROR_LOG(rc); + return rc; + } } - *action_taken = ORTE_GPR_REPLICA_ENTRY_ADDED; + } else { /* otherwise, go through list of containers. For each one, see if entry already exists in container - overwrite if allowed */ cptr = (orte_gpr_replica_container_t**)(orte_gpr_replica_globals.srch_cptr)->addr; for (j=0; j < (orte_gpr_replica_globals.srch_cptr)->size; j++) { if (NULL != cptr[j]) { - for (i=0; i < cnt; i++) { + for (i=0; i < cnt; i++) { /* for each provided keyval */ if (ORTE_SUCCESS == orte_gpr_replica_create_itag(&itag, seg, keyvals[i]->key) && ORTE_SUCCESS == orte_gpr_replica_search_container(&num_found, ORTE_GPR_REPLICA_OR, @@ -209,18 +213,30 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_keyval(seg, cptr[j], keyvals[i]))) { return rc; } - *action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_CHANGED; + /* action is recorded in update function - don't do it here */ + /* turn off the overwrite flag so that any subsequent entries are + * added - otherwise, only the last value provided would be retained! + */ + overwrite = false; } else { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { return rc; } - *action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_ADDED; + /* record that we did this */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr[j], iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) { + ORTE_ERROR_LOG(rc); + return rc; + } } } else { /* new key - add to container */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { return rc; } - *action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_ADDED; + /* record that we did this */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr[j], iptr, ORTE_GPR_REPLICA_ENTRY_ADDED))) { + ORTE_ERROR_LOG(rc); + return rc; + } } } } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c index e21101312c..3a88d10329 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c @@ -172,24 +172,20 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg, orte_gpr_replica_itagval_t *iptr) { size_t i; + int rc; - /* see if anyone cares that this value is deleted */ -/* trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); - - for (i=0; i < (orte_gpr_replica.triggers)->size; i++) { - if (NULL != trig[i] && - ORTE_GPR_REPLICA_ENTRY_DELETED & trig[i]->action) { - sptr = (orte_gpr_replica_subscribed_data_t**)((trig[i]->subscribed_data)->addr); - for (k=0; k < (trig[i]->subscribed_data)->size; k++) { - if (NULL != sptr[k]) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig[i]))) { - ORTE_ERROR_LOG(rc); - return rc; - } - } + /* record that we are going to do this + * NOTE: it is important that we make the record BEFORE doing the release. + * The record_action function will do a RETAIN on the object so it + * doesn't actually get released until we check subscriptions to see + * if someone wanted to be notified if/when this object was released + */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr, + ORTE_GPR_REPLICA_ENTRY_DELETED))) { + ORTE_ERROR_LOG(rc); + return rc; + } -*/ - /* release the data storage */ i = iptr->index; OBJ_RELEASE(iptr); @@ -229,6 +225,13 @@ int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg, return rc; } + /* record that we did this */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* update any storage locations that were pointing to these items */ if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_storage_locations(iptr))) { ORTE_ERROR_LOG(rc); diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c index 13b40ee9a4..4d4c482f4f 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_subscribe_fn.c @@ -289,7 +289,7 @@ int orte_gpr_replica_subscribe_fn(orte_gpr_notify_action_t action, size_t num_su /* check the triggers on this segment before leaving to see if they are already fired */ if (ORTE_SUCCESS != - (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_NO_ACTION))) { + (rc = orte_gpr_replica_check_subscriptions(seg))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c index f890984b03..2967f2a056 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c @@ -112,6 +112,44 @@ orte_gpr_replica_remove_notify_request(orte_gpr_notify_id_t local_idtag, } +int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg, + orte_gpr_replica_container_t *cptr, + orte_gpr_replica_itagval_t *iptr, + orte_gpr_replica_action_t action) +{ + orte_gpr_replica_action_taken_t *new; + size_t index; + int rc; + + new = OBJ_NEW(orte_gpr_replica_action_taken_t); + if (NULL == new) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + new->action = action; + + /* store pointers to the affected itagval */ + new->seg = seg; + new->cptr = cptr; + new->iptr = iptr; + + /* "retain" ALL of the respective objects so they can't disappear until + * after we process the actions + */ + OBJ_RETAIN(seg); + OBJ_RETAIN(cptr); + OBJ_RETAIN(iptr); + + /* add the new action record to the array */ + if (0 > (rc = orte_pointer_array_add(&index, orte_gpr_replica_globals.acted_upon, new))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + return ORTE_SUCCESS; +} + + int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr) { orte_gpr_replica_triggers_t **trig; @@ -147,11 +185,11 @@ int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_ip } -int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg, - orte_gpr_replica_action_t action_taken) +int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg) { orte_gpr_replica_triggers_t **trig; - size_t i; + orte_gpr_replica_subscribed_data_t **sub; + size_t i, j; int rc; trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); @@ -164,6 +202,28 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg, return rc; } } + /* check if notifier is on this subscription - if so, check it, + * but ONLY if NOTIFY_START is NOT set + */ + if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) & + (ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) { + /* for notifier subscriptions, the data structures + * in the trigger define the data being monitored. First, + * check to see if the segment that was modified matches + * any of the data being monitored. If so, then we call the + * check_notify function to see if we should fire + */ + sub = (orte_gpr_replica_subscribed_data_t**) + (trig[i]->subscribed_data)->addr; + for (j=0; j < (trig[i]->subscribed_data)->size; j++) { + if ((NULL != sub[j]) && (seg == sub[j]->seg)) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } + } + } /* if notify */ } /* if trig not NULL */ } return ORTE_SUCCESS; @@ -257,6 +317,50 @@ FIRED: return ORTE_SUCCESS; } +/* + * When entering this function, we know two things: (a) something was modified + * on the segment specified in the subscription, and (b) notifications are + * active. What we now need to determine is whether or not any of the data + * objects pointed to by the subscription were involved in the change. These + * objects could just be a container - e.g., the subscriber might want to know + * if anything gets added to a container - or could be a container plus one or + * more keys when the subscriber wants to know if a specific value gets changed. + */ +int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub) +{ + orte_gpr_replica_action_taken_t **ptr; + size_t i; + + ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr); + for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) { + if (NULL != ptr[i]) { + if ((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) && + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) { + /* send back the added entry */ + } else if ((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) & + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)){ + /* send back the deleted entry */ + } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED)) { + /* see if the acted_upon data was the target of the subscription */ + /* send back the new data */ + } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) { + /* ptr contains the "new" data - check to see if it matches + * the subscription. if so, send back the new data + */ + } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) { + /* ptr contains the "old" data - check to see if it matches + * the subscription. if so, send back the new data + */ + } + } + } + return ORTE_SUCCESS; +} + int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc) { diff --git a/src/mca/gpr/replica/gpr_replica.h b/src/mca/gpr/replica/gpr_replica.h index 00c7f89f0e..f601479b18 100644 --- a/src/mca/gpr/replica/gpr_replica.h +++ b/src/mca/gpr/replica/gpr_replica.h @@ -73,9 +73,7 @@ typedef uint8_t orte_gpr_replica_addr_mode_t; #define ORTE_GPR_REPLICA_ENTRY_CHANGED (int8_t) 3 #define ORTE_GPR_REPLICA_ENTRY_CHG_TO (int8_t) 4 #define ORTE_GPR_REPLICA_ENTRY_CHG_FRM (int8_t) 5 -#define ORTE_GPR_REPLICA_VALUE_INCREMENTED (int8_t) 6 -#define ORTE_GPR_REPLICA_VALUE_DECREMENTED (int8_t) 7 -#define ORTE_GPR_REPLICA_ENTRY_UPDATED (int8_t) 8 + typedef int8_t orte_gpr_replica_action_t; @@ -93,6 +91,7 @@ typedef struct { int compound_cmd_waiting; orte_pointer_array_t *srch_cptr; orte_pointer_array_t *srch_ival; + orte_pointer_array_t *acted_upon; orte_bitmap_t srch_itag; } orte_gpr_replica_globals_t; @@ -257,6 +256,24 @@ typedef struct orte_gpr_replica_triggers_t orte_gpr_replica_triggers_t; OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_triggers_t); +/* + * Action taken object - used to track what action was taken against what + * registry object during the course of a registry request. For example, if + * a PUT modifies an existing registry entry, then we store a pointer to that + * entry and a flag indicating that it was modified. This info is required for + * processing notification subscriptions. + */ +typedef struct { + ompi_object_t super; /**< Make this an object */ + orte_gpr_replica_action_t action; + orte_gpr_replica_segment_t *seg; + orte_gpr_replica_container_t *cptr; + orte_gpr_replica_itagval_t *iptr; +} orte_gpr_replica_action_taken_t; + +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_action_taken_t); + + /* * Notify message list objects - used to track individual messages going to * the same recipient diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index a733e29b4c..17c324d6b0 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -386,6 +386,35 @@ OBJ_CLASS_INSTANCE( orte_gpr_replica_trigger_destructor); /* destructor */ +/* ACTION_TAKEN */ +/* constructor - used to initialize state of action_take instance */ +static void orte_gpr_replica_action_taken_construct(orte_gpr_replica_action_taken_t* ptr) +{ + ptr->action = ORTE_GPR_REPLICA_NO_ACTION; + ptr->seg = NULL; + ptr->cptr = NULL; + ptr->iptr = NULL; +} + +/* destructor - used to free any resources held by instance */ +static void orte_gpr_replica_action_taken_destructor(orte_gpr_replica_action_taken_t* ptr) +{ + /* since we did a "RETAIN" on the objects pointed to by this object, + * we need to "RELEASE" them to indicate we are done with them + */ + if (NULL != ptr->seg) OBJ_RELEASE(ptr->seg); + if (NULL != ptr->cptr) OBJ_RELEASE(ptr->cptr); + if (NULL != ptr->iptr) OBJ_RELEASE(ptr->iptr); +} + +/* define instance of ompi_class_t */ +OBJ_CLASS_INSTANCE( + orte_gpr_replica_action_taken_t, /* type name */ + ompi_object_t, /* parent "class" name */ + orte_gpr_replica_action_taken_construct, /* constructor */ + orte_gpr_replica_action_taken_destructor); /* destructor */ + + /* NOTIFY MSG LIST */ /* constructor - used to initialize state of notify msg list instance */ static void orte_gpr_replica_notify_msg_list_construct(orte_gpr_replica_notify_msg_list_t* msg) @@ -582,8 +611,13 @@ orte_gpr_base_module_t *orte_gpr_replica_init(bool *allow_multi_user_threads, bo ORTE_ERROR_LOG(rc); return NULL; } + if (ORTE_SUCCESS != (rc = orte_pointer_array_init(&(orte_gpr_replica_globals.acted_upon), + 100, orte_gpr_replica_globals.max_size, 100))) { + ORTE_ERROR_LOG(rc); + return NULL; + } - if (ORTE_SUCCESS != (rc = orte_bitmap_init (&(orte_gpr_replica_globals.srch_itag), 64))) { + if (ORTE_SUCCESS != (rc = orte_bitmap_init(&(orte_gpr_replica_globals.srch_itag), 64))) { ORTE_ERROR_LOG(rc); return NULL; } @@ -669,6 +703,10 @@ int orte_gpr_replica_finalize(void) OBJ_RELEASE(orte_gpr_replica_globals.srch_ival); } + if (NULL != orte_gpr_replica_globals.acted_upon) { + OBJ_RELEASE(orte_gpr_replica_globals.acted_upon); + } + /* All done */ if (orte_gpr_replica_globals.isolate) { return ORTE_SUCCESS;