1
1

Checkpoint the first step in re-enabling the notification for subscriptions that monitor value changes. Added a new array that stores the actions each time the registry is called via a function that modifies its values. Updated the dump function to output the action records.

This commit was SVN r5995.
Этот коммит содержится в:
Ralph Castain 2005-06-08 19:40:38 +00:00
родитель 63b382ec6c
Коммит ba7673a83f
14 изменённых файлов: 358 добавлений и 86 удалений

Просмотреть файл

@ -81,7 +81,7 @@ int orte_gpr_replica_increment_value(orte_gpr_value_t *value)
if (ORTE_SUCCESS == rc) { if (ORTE_SUCCESS == rc) {
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_INCREMENTED))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc; return rc;
@ -143,7 +143,7 @@ int orte_gpr_replica_decrement_value(orte_gpr_value_t *value)
if (ORTE_SUCCESS == rc) { if (ORTE_SUCCESS == rc) {
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_DECREMENTED))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc; return rc;

Просмотреть файл

@ -114,7 +114,7 @@ int orte_gpr_replica_delete_entries(orte_gpr_addr_mode_t addr_mode,
key_itags, num_keys); key_itags, num_keys);
if (ORTE_SUCCESS == rc) { if (ORTE_SUCCESS == rc) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_ENTRY_DELETED))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
} }

Просмотреть файл

@ -36,7 +36,6 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values)
{ {
int rc = ORTE_SUCCESS; int rc = ORTE_SUCCESS;
size_t i, j; size_t i, j;
int8_t action_taken;
orte_gpr_value_t *val; orte_gpr_value_t *val;
orte_gpr_replica_segment_t *seg=NULL; orte_gpr_replica_segment_t *seg=NULL;
orte_gpr_replica_itag_t *itags=NULL; orte_gpr_replica_itag_t *itags=NULL;
@ -63,12 +62,14 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values)
for (j=0; j < val->cnt; j++) { for (j=0; j < val->cnt; j++) {
if (NULL == (val->keyvals[j])->key) { if (NULL == (val->keyvals[j])->key) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
} }
/* find the segment */ /* find the segment */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&seg, true, val->segment))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_find_seg(&seg, true, val->segment))) {
ORTE_ERROR_LOG(rc);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc; return rc;
} }
@ -76,16 +77,17 @@ int orte_gpr_replica_put(size_t cnt, orte_gpr_value_t **values)
/* convert tokens to array of itags */ /* convert tokens to array of itags */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_itag_list(&itags, seg, if (ORTE_SUCCESS != (rc = orte_gpr_replica_get_itag_list(&itags, seg,
val->tokens, &(val->num_tokens)))) { val->tokens, &(val->num_tokens)))) {
ORTE_ERROR_LOG(rc);
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex); OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc; return rc;
} }
if (ORTE_SUCCESS != (rc = orte_gpr_replica_put_fn(val->addr_mode, seg, itags, val->num_tokens, if (ORTE_SUCCESS != (rc = orte_gpr_replica_put_fn(val->addr_mode, seg, itags, val->num_tokens,
val->cnt, val->keyvals, &action_taken))) { val->cnt, val->keyvals))) {
goto CLEANUP; goto CLEANUP;
} }
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg, action_taken))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto CLEANUP; goto CLEANUP;
} }

Просмотреть файл

@ -78,7 +78,7 @@ int orte_gpr_replica_recv_increment_value_cmd(orte_buffer_t *cmd, orte_buffer_t
if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS == ret) {
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_INCREMENTED))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
@ -141,7 +141,7 @@ int orte_gpr_replica_recv_decrement_value_cmd(orte_buffer_t *cmd, orte_buffer_t
if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS == ret) {
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_VALUE_DECREMENTED))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }

Просмотреть файл

@ -157,7 +157,7 @@ int orte_gpr_replica_recv_delete_entries_cmd(orte_buffer_t *buffer, orte_buffer_
key_itags, num_keys); key_itags, num_keys);
if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS == ret) {
orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_ENTRY_DELETED); orte_gpr_replica_check_subscriptions(seg);
} }

Просмотреть файл

@ -37,7 +37,6 @@ int orte_gpr_replica_recv_put_cmd(orte_buffer_t *buffer, orte_buffer_t *answer)
orte_gpr_replica_segment_t *seg=NULL; orte_gpr_replica_segment_t *seg=NULL;
orte_gpr_replica_itag_t *itags=NULL; orte_gpr_replica_itag_t *itags=NULL;
orte_data_type_t type; orte_data_type_t type;
int8_t action_taken=0;
int rc, ret; int rc, ret;
size_t i=0, cnt; size_t i=0, cnt;
@ -92,14 +91,14 @@ int orte_gpr_replica_recv_put_cmd(orte_buffer_t *buffer, orte_buffer_t *answer)
} }
if (ORTE_SUCCESS != (ret = orte_gpr_replica_put_fn(val->addr_mode, seg, itags, if (ORTE_SUCCESS != (ret = orte_gpr_replica_put_fn(val->addr_mode, seg, itags,
val->num_tokens, val->cnt, val->keyvals, &action_taken))) { val->num_tokens, val->cnt, val->keyvals))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto RETURN_ERROR; goto RETURN_ERROR;
} }
if (ORTE_SUCCESS == ret) { if (ORTE_SUCCESS == ret) {
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, action_taken))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }

Просмотреть файл

@ -60,6 +60,10 @@ int orte_gpr_replica_dump_all_fn(orte_buffer_t *buffer)
return rc; return rc;
} }
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dump_callbacks_fn(buffer))) {
return rc;
}
rc = orte_gpr_replica_dump_segments_fn(buffer); rc = orte_gpr_replica_dump_segments_fn(buffer);
return rc; return rc;
@ -165,7 +169,9 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer)
ompi_list_item_t *item; ompi_list_item_t *item;
orte_gpr_replica_callbacks_t *cb; orte_gpr_replica_callbacks_t *cb;
orte_gpr_replica_notify_msg_list_t *msg; orte_gpr_replica_notify_msg_list_t *msg;
char *tmp_out; orte_gpr_replica_action_taken_t **action;
orte_gpr_replica_itag_t *itaglist;
char *tmp_out, *token;
size_t i, j, k; size_t i, j, k;
tmp_out = (char*)malloc(1000); tmp_out = (char*)malloc(1000);
@ -180,51 +186,131 @@ int orte_gpr_replica_dump_callbacks_fn(orte_buffer_t *buffer)
if (0 >= (k= ompi_list_get_size(&(orte_gpr_replica.callbacks)))) { if (0 >= (k= ompi_list_get_size(&(orte_gpr_replica.callbacks)))) {
sprintf(tmp_out, "--- None registered at this time ---"); sprintf(tmp_out, "--- None registered at this time ---");
orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
} else { } else {
sprintf(tmp_out, "--- %lu callback(s) registered at this time", sprintf(tmp_out, "--- %lu callback(s) registered at this time",
(unsigned long) k); (unsigned long) k);
orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_load_string(buffer, &tmp_out);
i=0;
for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks));
cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks));
cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) {
if (NULL == cb) {
sprintf(tmp_out, "\n\t--- BAD CALLBACK POINTER %lu ---",
(unsigned long) i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
sprintf(tmp_out, "\nInfo for callback %lu", (unsigned long) i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
if (NULL == cb->requestor) {
sprintf(tmp_out, "Local requestor");
} else {
sprintf(tmp_out, "Requestor: [%lu,%lu,%lu]",
ORTE_NAME_ARGS(cb->requestor));
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
j = ompi_list_get_size(&(cb->messages));
sprintf(tmp_out, "Num messages: %lu", (unsigned long) j);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
for (k=0, item = ompi_list_get_first(&(cb->messages));
item != ompi_list_get_end(&(cb->messages));
item = ompi_list_get_next(item), k++) {
msg = (orte_gpr_replica_notify_msg_list_t*)item;
sprintf(tmp_out, "\n\nInfo for message %lu sending %lu "
" data objects to notifier id %lu",
(unsigned long) k, (unsigned long) (msg->message)->cnt,
(unsigned long) (msg->message)->idtag);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
orte_gpr_base_dump_notify_msg(buffer, msg->message);
}
i++;
}
} }
i=0;
for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks));
cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks));
cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) {
if (NULL == cb) {
sprintf(tmp_out, "\n\t--- BAD CALLBACK POINTER %lu ---",
(unsigned long) i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
return ORTE_SUCCESS;
}
sprintf(tmp_out, "\nInfo for callback %lu", (unsigned long) i);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
if (NULL == cb->requestor) {
sprintf(tmp_out, "Local requestor");
} else {
sprintf(tmp_out, "Requestor: [%lu,%lu,%lu]",
ORTE_NAME_ARGS(cb->requestor));
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
j = ompi_list_get_size(&(cb->messages));
sprintf(tmp_out, "Num messages: %lu", (unsigned long) j);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
for (k=0, item = ompi_list_get_first(&(cb->messages));
item != ompi_list_get_end(&(cb->messages));
item = ompi_list_get_next(item), k++) {
msg = (orte_gpr_replica_notify_msg_list_t*)item;
sprintf(tmp_out, "\n\nInfo for message %lu sending %lu "
" data objects to notifier id %lu",
(unsigned long) k, (unsigned long) (msg->message)->cnt,
(unsigned long) (msg->message)->idtag);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
orte_gpr_base_dump_notify_msg(buffer, msg->message);
}
i++;
}
sprintf(tmp_out, "\n"); sprintf(tmp_out, "\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out); orte_gpr_replica_dump_load_string(buffer, &tmp_out);
i = (orte_gpr_replica_globals.acted_upon)->size - (orte_gpr_replica_globals.acted_upon)->number_free;
if (0 < i) {
sprintf(tmp_out, "\nDUMP OF GPR ACTION RECORDS\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
action = (orte_gpr_replica_action_taken_t**)orte_gpr_replica_globals.acted_upon->addr;
for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
if (NULL != action[i]) {
if (NULL != action[i]->seg) {
sprintf(tmp_out, "Action Taken on Segment: %s", action[i]->seg->name);
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
} else {
sprintf(tmp_out, "Action Taken on NULL Segment");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
if (NULL != action[i]->cptr) {
sprintf(tmp_out, "\tContainer Tokens:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
/* reverse lookup tokens and print them */
itaglist = action[i]->cptr->itags;
for (k=0; k < action[i]->cptr->num_itags; k++) {
if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup(
&token, action[i]->seg, itaglist[k])) {
sprintf(tmp_out, "\t\titag num %lu"
": No entry found for itag %lu",
(unsigned long) k,
(unsigned long) itaglist[k]);
} else {
sprintf(tmp_out, "\t\titag num %lu: itag %lu\tToken: %s",
(unsigned long) k,
(unsigned long) itaglist[k], token);
free(token);
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
} else {
sprintf(tmp_out, "\tNULL Container");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
if (NULL != action[i]->iptr) {
if (ORTE_GPR_REPLICA_ENTRY_ADDED == action[i]->action) {
sprintf(tmp_out, "\n\tKeyval ADDED:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
} else if (ORTE_GPR_REPLICA_ENTRY_DELETED == action[i]->action) {
sprintf(tmp_out, "\n\tKeyval DELETED:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
} else if (ORTE_GPR_REPLICA_ENTRY_CHANGED == action[i]->action) {
sprintf(tmp_out, "\n\tKeyval CHANGED:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
} else if (ORTE_GPR_REPLICA_ENTRY_CHG_TO == action[i]->action) {
sprintf(tmp_out, "\n\tKeyval CHANGED TO:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
} else if (ORTE_GPR_REPLICA_ENTRY_CHG_FRM == action[i]->action) {
sprintf(tmp_out, "\n\tKeyval CHANGED FROM:");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
if (ORTE_SUCCESS != orte_gpr_replica_dict_reverse_lookup(
&token, action[i]->seg, action[i]->iptr->itag)) {
sprintf(tmp_out, "\n\t\tNo entry found for itag %lu",
(unsigned long) action[i]->iptr->itag);
} else {
sprintf(tmp_out, "\n\t\titag %lu\tKey: %s",
(unsigned long) action[i]->iptr->itag, token);
free(token);
}
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
orte_gpr_replica_dump_itagval_value(buffer, action[i]->iptr);
} else {
sprintf(tmp_out, "\n\tNULL Keyval");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
}
}
} else {
sprintf(tmp_out, "\nNO GPR ACTION RECORDS STORED\n");
orte_gpr_replica_dump_load_string(buffer, &tmp_out);
}
free(tmp_out); free(tmp_out);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -87,8 +87,7 @@ int orte_gpr_replica_cleanup_proc_fn(orte_process_name_t *proc);
int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
orte_gpr_replica_segment_t *seg, orte_gpr_replica_segment_t *seg,
orte_gpr_replica_itag_t *token_itags, size_t num_tokens, orte_gpr_replica_itag_t *token_itags, size_t num_tokens,
size_t cnt, orte_gpr_keyval_t **keyvals, size_t cnt, orte_gpr_keyval_t **keyvals);
int8_t *action_taken);
int orte_gpr_replica_put_nb_fn(orte_gpr_addr_mode_t addr_mode, int orte_gpr_replica_put_nb_fn(orte_gpr_addr_mode_t addr_mode,
orte_gpr_replica_segment_t *seg, orte_gpr_replica_segment_t *seg,
@ -257,7 +256,15 @@ void orte_gpr_replica_dump_itagval_value(orte_buffer_t *buffer,
/* /*
* Trigger Operations * Trigger Operations
*/ */
int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg, int8_t action_taken); int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg,
orte_gpr_replica_container_t *cptr,
orte_gpr_replica_itagval_t *iptr,
orte_gpr_replica_action_t action);
int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg);
int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub);
int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig); int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig);

Просмотреть файл

@ -124,8 +124,7 @@ OBJ_CLASS_INSTANCE(
int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
orte_gpr_replica_segment_t *seg, orte_gpr_replica_segment_t *seg,
orte_gpr_replica_itag_t *token_itags, size_t num_tokens, orte_gpr_replica_itag_t *token_itags, size_t num_tokens,
size_t cnt, orte_gpr_keyval_t **keyvals, size_t cnt, orte_gpr_keyval_t **keyvals)
int8_t *action_taken)
{ {
orte_gpr_replica_container_t **cptr, *cptr2; orte_gpr_replica_container_t **cptr, *cptr2;
orte_gpr_replica_itag_t itag; orte_gpr_replica_itag_t itag;
@ -150,8 +149,8 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
} }
} }
/* initialize action */ /* initialize storage for actions taken */
*action_taken = 0; orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon);
/* extract the token address mode and overwrite permissions */ /* extract the token address mode and overwrite permissions */
overwrite = false; overwrite = false;
@ -189,14 +188,19 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* record that we did this */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr2, iptr, ORTE_GPR_REPLICA_ENTRY_ADDED))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} }
*action_taken = ORTE_GPR_REPLICA_ENTRY_ADDED;
} else { /* otherwise, go through list of containers. For each one, } else { /* otherwise, go through list of containers. For each one,
see if entry already exists in container - overwrite if allowed */ see if entry already exists in container - overwrite if allowed */
cptr = (orte_gpr_replica_container_t**)(orte_gpr_replica_globals.srch_cptr)->addr; cptr = (orte_gpr_replica_container_t**)(orte_gpr_replica_globals.srch_cptr)->addr;
for (j=0; j < (orte_gpr_replica_globals.srch_cptr)->size; j++) { for (j=0; j < (orte_gpr_replica_globals.srch_cptr)->size; j++) {
if (NULL != cptr[j]) { if (NULL != cptr[j]) {
for (i=0; i < cnt; i++) { for (i=0; i < cnt; i++) { /* for each provided keyval */
if (ORTE_SUCCESS == orte_gpr_replica_create_itag(&itag, seg, keyvals[i]->key) && if (ORTE_SUCCESS == orte_gpr_replica_create_itag(&itag, seg, keyvals[i]->key) &&
ORTE_SUCCESS == orte_gpr_replica_search_container(&num_found, ORTE_SUCCESS == orte_gpr_replica_search_container(&num_found,
ORTE_GPR_REPLICA_OR, ORTE_GPR_REPLICA_OR,
@ -209,18 +213,30 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_keyval(seg, cptr[j], keyvals[i]))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_keyval(seg, cptr[j], keyvals[i]))) {
return rc; return rc;
} }
*action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_CHANGED; /* action is recorded in update function - don't do it here */
/* turn off the overwrite flag so that any subsequent entries are
* added - otherwise, only the last value provided would be retained!
*/
overwrite = false;
} else { } else {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) {
return rc; return rc;
} }
*action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_ADDED; /* record that we did this */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr[j], iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} }
} else { /* new key - add to container */ } else { /* new key - add to container */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) {
return rc; return rc;
} }
*action_taken = *action_taken | ORTE_GPR_REPLICA_ENTRY_ADDED; /* record that we did this */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr[j], iptr, ORTE_GPR_REPLICA_ENTRY_ADDED))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} }
} }
} }

Просмотреть файл

@ -172,24 +172,20 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg,
orte_gpr_replica_itagval_t *iptr) orte_gpr_replica_itagval_t *iptr)
{ {
size_t i; size_t i;
int rc;
/* see if anyone cares that this value is deleted */ /* record that we are going to do this
/* trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); * NOTE: it is important that we make the record BEFORE doing the release.
* The record_action function will do a RETAIN on the object so it
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) { * doesn't actually get released until we check subscriptions to see
if (NULL != trig[i] && * if someone wanted to be notified if/when this object was released
ORTE_GPR_REPLICA_ENTRY_DELETED & trig[i]->action) { */
sptr = (orte_gpr_replica_subscribed_data_t**)((trig[i]->subscribed_data)->addr); if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr,
for (k=0; k < (trig[i]->subscribed_data)->size; k++) { ORTE_GPR_REPLICA_ENTRY_DELETED))) {
if (NULL != sptr[k]) { ORTE_ERROR_LOG(rc);
if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig[i]))) { return rc;
ORTE_ERROR_LOG(rc); }
return rc;
}
}
*/
/* release the data storage */ /* release the data storage */
i = iptr->index; i = iptr->index;
OBJ_RELEASE(iptr); OBJ_RELEASE(iptr);
@ -229,6 +225,13 @@ int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg,
return rc; return rc;
} }
/* record that we did this */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* update any storage locations that were pointing to these items */ /* update any storage locations that were pointing to these items */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_storage_locations(iptr))) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_storage_locations(iptr))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -289,7 +289,7 @@ int orte_gpr_replica_subscribe_fn(orte_gpr_notify_action_t action, size_t num_su
/* check the triggers on this segment before leaving to see if they are already fired */ /* check the triggers on this segment before leaving to see if they are already fired */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=
(rc = orte_gpr_replica_check_subscriptions(seg, ORTE_GPR_REPLICA_NO_ACTION))) { (rc = orte_gpr_replica_check_subscriptions(seg))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }

Просмотреть файл

@ -112,6 +112,44 @@ orte_gpr_replica_remove_notify_request(orte_gpr_notify_id_t local_idtag,
} }
int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg,
orte_gpr_replica_container_t *cptr,
orte_gpr_replica_itagval_t *iptr,
orte_gpr_replica_action_t action)
{
orte_gpr_replica_action_taken_t *new;
size_t index;
int rc;
new = OBJ_NEW(orte_gpr_replica_action_taken_t);
if (NULL == new) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
new->action = action;
/* store pointers to the affected itagval */
new->seg = seg;
new->cptr = cptr;
new->iptr = iptr;
/* "retain" ALL of the respective objects so they can't disappear until
* after we process the actions
*/
OBJ_RETAIN(seg);
OBJ_RETAIN(cptr);
OBJ_RETAIN(iptr);
/* add the new action record to the array */
if (0 > (rc = orte_pointer_array_add(&index, orte_gpr_replica_globals.acted_upon, new))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
return ORTE_SUCCESS;
}
int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr) int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr)
{ {
orte_gpr_replica_triggers_t **trig; orte_gpr_replica_triggers_t **trig;
@ -147,11 +185,11 @@ int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_ip
} }
int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg, int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg)
orte_gpr_replica_action_t action_taken)
{ {
orte_gpr_replica_triggers_t **trig; orte_gpr_replica_triggers_t **trig;
size_t i; orte_gpr_replica_subscribed_data_t **sub;
size_t i, j;
int rc; int rc;
trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
@ -164,6 +202,28 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg,
return rc; return rc;
} }
} }
/* check if notifier is on this subscription - if so, check it,
* but ONLY if NOTIFY_START is NOT set
*/
if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) &
(ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) {
/* for notifier subscriptions, the data structures
* in the trigger define the data being monitored. First,
* check to see if the segment that was modified matches
* any of the data being monitored. If so, then we call the
* check_notify function to see if we should fire
*/
sub = (orte_gpr_replica_subscribed_data_t**)
(trig[i]->subscribed_data)->addr;
for (j=0; j < (trig[i]->subscribed_data)->size; j++) {
if ((NULL != sub[j]) && (seg == sub[j]->seg)) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
} /* if notify */
} /* if trig not NULL */ } /* if trig not NULL */
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -257,6 +317,50 @@ FIRED:
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
/*
* When entering this function, we know two things: (a) something was modified
* on the segment specified in the subscription, and (b) notifications are
* active. What we now need to determine is whether or not any of the data
* objects pointed to by the subscription were involved in the change. These
* objects could just be a container - e.g., the subscriber might want to know
* if anything gets added to a container - or could be a container plus one or
* more keys when the subscriber wants to know if a specific value gets changed.
*/
int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub)
{
orte_gpr_replica_action_taken_t **ptr;
size_t i;
ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr);
for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
if (NULL != ptr[i]) {
if ((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) {
/* send back the added entry */
} else if ((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) &
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)){
/* send back the deleted entry */
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED)) {
/* see if the acted_upon data was the target of the subscription */
/* send back the new data */
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) {
/* ptr contains the "new" data - check to see if it matches
* the subscription. if so, send back the new data
*/
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) {
/* ptr contains the "old" data - check to see if it matches
* the subscription. if so, send back the new data
*/
}
}
}
return ORTE_SUCCESS;
}
int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc) int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc)
{ {

Просмотреть файл

@ -73,9 +73,7 @@ typedef uint8_t orte_gpr_replica_addr_mode_t;
#define ORTE_GPR_REPLICA_ENTRY_CHANGED (int8_t) 3 #define ORTE_GPR_REPLICA_ENTRY_CHANGED (int8_t) 3
#define ORTE_GPR_REPLICA_ENTRY_CHG_TO (int8_t) 4 #define ORTE_GPR_REPLICA_ENTRY_CHG_TO (int8_t) 4
#define ORTE_GPR_REPLICA_ENTRY_CHG_FRM (int8_t) 5 #define ORTE_GPR_REPLICA_ENTRY_CHG_FRM (int8_t) 5
#define ORTE_GPR_REPLICA_VALUE_INCREMENTED (int8_t) 6
#define ORTE_GPR_REPLICA_VALUE_DECREMENTED (int8_t) 7
#define ORTE_GPR_REPLICA_ENTRY_UPDATED (int8_t) 8
typedef int8_t orte_gpr_replica_action_t; typedef int8_t orte_gpr_replica_action_t;
@ -93,6 +91,7 @@ typedef struct {
int compound_cmd_waiting; int compound_cmd_waiting;
orte_pointer_array_t *srch_cptr; orte_pointer_array_t *srch_cptr;
orte_pointer_array_t *srch_ival; orte_pointer_array_t *srch_ival;
orte_pointer_array_t *acted_upon;
orte_bitmap_t srch_itag; orte_bitmap_t srch_itag;
} orte_gpr_replica_globals_t; } orte_gpr_replica_globals_t;
@ -257,6 +256,24 @@ typedef struct orte_gpr_replica_triggers_t orte_gpr_replica_triggers_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_triggers_t); OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_triggers_t);
/*
* Action taken object - used to track what action was taken against what
* registry object during the course of a registry request. For example, if
* a PUT modifies an existing registry entry, then we store a pointer to that
* entry and a flag indicating that it was modified. This info is required for
* processing notification subscriptions.
*/
typedef struct {
ompi_object_t super; /**< Make this an object */
orte_gpr_replica_action_t action;
orte_gpr_replica_segment_t *seg;
orte_gpr_replica_container_t *cptr;
orte_gpr_replica_itagval_t *iptr;
} orte_gpr_replica_action_taken_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(orte_gpr_replica_action_taken_t);
/* /*
* Notify message list objects - used to track individual messages going to * Notify message list objects - used to track individual messages going to
* the same recipient * the same recipient

Просмотреть файл

@ -386,6 +386,35 @@ OBJ_CLASS_INSTANCE(
orte_gpr_replica_trigger_destructor); /* destructor */ orte_gpr_replica_trigger_destructor); /* destructor */
/* ACTION_TAKEN */
/* constructor - used to initialize state of action_take instance */
static void orte_gpr_replica_action_taken_construct(orte_gpr_replica_action_taken_t* ptr)
{
ptr->action = ORTE_GPR_REPLICA_NO_ACTION;
ptr->seg = NULL;
ptr->cptr = NULL;
ptr->iptr = NULL;
}
/* destructor - used to free any resources held by instance */
static void orte_gpr_replica_action_taken_destructor(orte_gpr_replica_action_taken_t* ptr)
{
/* since we did a "RETAIN" on the objects pointed to by this object,
* we need to "RELEASE" them to indicate we are done with them
*/
if (NULL != ptr->seg) OBJ_RELEASE(ptr->seg);
if (NULL != ptr->cptr) OBJ_RELEASE(ptr->cptr);
if (NULL != ptr->iptr) OBJ_RELEASE(ptr->iptr);
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
orte_gpr_replica_action_taken_t, /* type name */
ompi_object_t, /* parent "class" name */
orte_gpr_replica_action_taken_construct, /* constructor */
orte_gpr_replica_action_taken_destructor); /* destructor */
/* NOTIFY MSG LIST */ /* NOTIFY MSG LIST */
/* constructor - used to initialize state of notify msg list instance */ /* constructor - used to initialize state of notify msg list instance */
static void orte_gpr_replica_notify_msg_list_construct(orte_gpr_replica_notify_msg_list_t* msg) static void orte_gpr_replica_notify_msg_list_construct(orte_gpr_replica_notify_msg_list_t* msg)
@ -582,8 +611,13 @@ orte_gpr_base_module_t *orte_gpr_replica_init(bool *allow_multi_user_threads, bo
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return NULL; return NULL;
} }
if (ORTE_SUCCESS != (rc = orte_pointer_array_init(&(orte_gpr_replica_globals.acted_upon),
100, orte_gpr_replica_globals.max_size, 100))) {
ORTE_ERROR_LOG(rc);
return NULL;
}
if (ORTE_SUCCESS != (rc = orte_bitmap_init (&(orte_gpr_replica_globals.srch_itag), 64))) { if (ORTE_SUCCESS != (rc = orte_bitmap_init(&(orte_gpr_replica_globals.srch_itag), 64))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return NULL; return NULL;
} }
@ -669,6 +703,10 @@ int orte_gpr_replica_finalize(void)
OBJ_RELEASE(orte_gpr_replica_globals.srch_ival); OBJ_RELEASE(orte_gpr_replica_globals.srch_ival);
} }
if (NULL != orte_gpr_replica_globals.acted_upon) {
OBJ_RELEASE(orte_gpr_replica_globals.acted_upon);
}
/* All done */ /* All done */
if (orte_gpr_replica_globals.isolate) { if (orte_gpr_replica_globals.isolate) {
return ORTE_SUCCESS; return ORTE_SUCCESS;