1
1

Checkpoint the notifier work - notify when something is added now works, need to simply turn on the other checks.

Existing code shouldn't see any impacts. Tested on up to 125 processes.

This commit was SVN r6020.
Этот коммит содержится в:
Ralph Castain 2005-06-09 20:37:25 +00:00
родитель c848b9fb90
Коммит 1c57ae20b0
7 изменённых файлов: 519 добавлений и 219 удалений

Просмотреть файл

@ -266,12 +266,17 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg);
int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub);
bool orte_gpr_replica_check_notify_matches(orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_replica_action_taken_t *ptr);
int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig);
int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr);
int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t **msg,
orte_gpr_replica_triggers_t *trig);
int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t *msg,
orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_value_t *value);
int
orte_gpr_replica_enter_notify_request(orte_gpr_notify_id_t *local_idtag,
@ -283,14 +288,20 @@ int
orte_gpr_replica_remove_notify_request(orte_gpr_notify_id_t local_idtag,
orte_gpr_notify_id_t *remote_idtag);
int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig);
int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_value_t *value);
int orte_gpr_replica_process_callbacks(void);
int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc);
int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data,
orte_gpr_replica_subscribed_data_t *sptr);
int orte_gpr_replica_add_values_from_registry(orte_gpr_notify_message_t *msg,
orte_gpr_replica_subscribed_data_t *sptr);
int orte_gpr_replica_store_value_in_msg(orte_gpr_notify_id_t cb_num,
orte_gpr_notify_message_t *msg,
orte_gpr_value_t *value);
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -121,7 +121,9 @@ CLEANUP:
int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_value_t *value)
{
orte_gpr_replica_callbacks_t *cb;
orte_gpr_replica_notify_msg_list_t *msg;
@ -137,7 +139,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
msg != (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_end(&(cb->messages));
msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_next(msg)) {
if ((msg->message)->idtag == trig->index) { /* same trigger - add to it */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
if (ORTE_SUCCESS != (rc =
orte_gpr_replica_construct_notify_message(msg->message,
trig, sub, value))) {
ORTE_ERROR_LOG(rc);
}
return rc;
@ -149,7 +153,7 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
ompi_list_append(&cb->messages, &msg->item);
ompi_list_append(&cb->messages, &msg->item);
/* construct the message */
msg->message = OBJ_NEW(orte_gpr_notify_message_t);
@ -164,7 +168,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
(msg->message)->idtag = trig->remote_idtag;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
if (ORTE_SUCCESS != (rc =
orte_gpr_replica_construct_notify_message(msg->message,
trig, sub, value))) {
ORTE_ERROR_LOG(rc);
}
return rc;
@ -221,7 +227,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
ompi_list_append(&cb->messages, &msg->item);
if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) {
if (ORTE_SUCCESS != (rc =
orte_gpr_replica_construct_notify_message(msg->message,
trig, sub, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(cb);
return rc;
@ -236,70 +244,34 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig)
}
int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t **msg,
orte_gpr_replica_triggers_t *trig)
int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t *msg,
orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_value_t *value)
{
int rc=ORTE_SUCCESS;
orte_gpr_notify_data_t **data;
orte_gpr_replica_subscribed_data_t **sptr;
size_t i, k;
size_t i;
/* if we don't have data, just return */
if (0 >= trig->num_subscribed_data) {
if (0 >= trig->num_subscribed_data && NULL == value) {
return ORTE_SUCCESS;
}
/* check to see if value provided - if so, use it */
if (NULL != value) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_msg(sub->index,
msg, value))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* otherwise, go get values off of registry and add them to the data object */
sptr = (orte_gpr_replica_subscribed_data_t**)((trig->subscribed_data)->addr);
for (i=0; i < (trig->subscribed_data)->size; i++) {
if (NULL != sptr[i]) {
if (NULL == (*msg)->data) { /* first data item on the message */
(*msg)->data = (orte_gpr_notify_data_t**)malloc(sizeof(orte_gpr_notify_data_t*));
if (NULL == (*msg)->data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data = &((*msg)->data[0]); /* need to assign location */
(*msg)->cnt = 1;
} else {
/* check to see if this data is going to the same callback as
* any prior data on the message. if so, then we add those keyvals
* to the existing data structure. if not, then we realloc to
* establish a new data structure and store the data there
*/
for (k=0; k < (*msg)->cnt; k++) {
if ((*msg)->data[k]->cb_num == sptr[i]->index) { /* going to the same place */
data = &((*msg)->data[k]);
goto MOVEON;
}
}
/* no prior matching data found, so add another data location to the message */
(*msg)->data = (orte_gpr_notify_data_t **) realloc((*msg)->data, ((*msg)->cnt + 1)*sizeof(orte_gpr_notify_data_t*));
if (NULL == (*msg)->data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data = &((*msg)->data[(*msg)->cnt]);
((*msg)->cnt)++;
}
*data = OBJ_NEW(orte_gpr_notify_data_t);
if (NULL == *data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* for each data object, store the callback_number, addressing mode, and name
* of the segment this data came from
*/
(*data)->cb_num = sptr[i]->index;
(*data)->addr_mode = sptr[i]->addr_mode;
(*data)->segment = strdup((sptr[i]->seg)->name);
if (NULL == (*data)->segment) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
MOVEON:
/* add the values to the data object */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_values(data, sptr[i]))) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_values_from_registry(msg, sptr[i]))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -310,13 +282,12 @@ MOVEON:
}
int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data,
orte_gpr_replica_subscribed_data_t *sptr)
int orte_gpr_replica_add_values_from_registry(orte_gpr_notify_message_t *msg,
orte_gpr_replica_subscribed_data_t *sptr)
{
size_t i, j, k, n, m, matches, num_tokens, num_keys, cnt;
size_t i, j, num_tokens, num_keys, cnt;
int rc;
orte_gpr_value_t **values = NULL, **data_values;
orte_gpr_keyval_t **kptr;
orte_gpr_value_t **values = NULL;
/* get the data off the registry */
num_tokens = orte_value_array_get_size(&(sptr->tokentags));
@ -356,115 +327,12 @@ int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data,
* where containers match
*/
for (i=0; i < cnt; i++) {
if (NULL == (*data)->values) { /* first value on the structure */
(*data)->values = (orte_gpr_value_t**)malloc(sizeof(orte_gpr_value_t*));
if (NULL == (*data)->values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data_values = &((*data)->values[0]); /* need to assign location */
(*data)->cnt = 1;
} else {
/* check to see if this value is from the same container
* as some prior one. if so, then we add those itagvals
* to the existing value structure. if not, then we realloc to
* establish a new value structure and store the data there
*/
for (k=0; k < (*data)->cnt; k++) {
matches = 0;
num_tokens = (*data)->values[k]->num_tokens;
if (num_tokens == values[i]->num_tokens) { /* must have same number or can't match */
for (j=0; j < num_tokens; j++) {
for (m=0; m < num_tokens; m++) {
if (0 == strcmp(((*data)->values[k])->tokens[j], values[i]->tokens[m])) {
matches++;
}
}
if (num_tokens == matches) { /* from same container - just add keyvals to it */
data_values = &((*data)->values[k]);
goto MOVEON;
}
}
}
}
/* no prior matching data found, so add another value location to the object */
(*data)->values = (orte_gpr_value_t**)realloc((*data)->values, ((*data)->cnt + 1)*sizeof(orte_gpr_value_t*));
if (NULL == (*data)->values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data_values = &((*data)->values[(*data)->cnt]);
((*data)->cnt)++;
}
*data_values = OBJ_NEW(orte_gpr_value_t);
if (NULL == *data_values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* record the addressing mode */
(*data_values)->addr_mode = sptr->addr_mode;
/* record the segment these values came from */
(*data_values)->segment = strdup((sptr->seg)->name);
if (NULL == ((*data_values)->segment)) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* record the tokens describing the container */
(*data_values)->num_tokens = values[i]->num_tokens;
(*data_values)->tokens = (char **)malloc(values[i]->num_tokens * sizeof(char*));
if (NULL == (*data_values)->tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (n=0; n < values[i]->num_tokens; n++) {
(*data_values)->tokens[n] = strdup(values[i]->tokens[n]);
if (NULL == (*data_values)->tokens[n]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
}
MOVEON:
/* record the values to be returned */
if (0 < (*data_values)->cnt) { /* already have some data here, so add to the space */
n = (*data_values)->cnt + values[i]->cnt;
(*data_values)->keyvals = (orte_gpr_keyval_t**)
realloc((*data_values)->keyvals, n * sizeof(orte_gpr_keyval_t*));
if (NULL == (*data_values)->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr = &((*data_values)->keyvals[(*data_values)->cnt]);
(*data_values)->cnt = n;
} else {
(*data_values)->keyvals = (orte_gpr_keyval_t**)malloc(values[i]->cnt * sizeof(orte_gpr_keyval_t*));
if (NULL == (*data_values)->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(*data_values)->cnt = values[i]->cnt;
kptr = (*data_values)->keyvals;
}
for (n=0; n < values[i]->cnt; n++) {
kptr[n] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == kptr[n]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr[n]->key = strdup((values[i]->keyvals[n])->key);
if (NULL == kptr[n]->key) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr[n]->type = (values[i]->keyvals[n])->type;
if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload(
&(kptr[n]->value), &((values[i]->keyvals[n])->value),
(values[i]->keyvals[n])->type))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_msg(sptr->index,
msg, values[i]))) {
ORTE_ERROR_LOG(rc);
for (j=i; j < cnt; j++) OBJ_RELEASE(values[j]);
free(values);
return rc;
}
OBJ_RELEASE(values[i]);
} /* for i */
@ -474,3 +342,177 @@ MOVEON:
return ORTE_SUCCESS;
}
int orte_gpr_replica_store_value_in_msg(orte_gpr_notify_id_t cb_num,
orte_gpr_notify_message_t *msg,
orte_gpr_value_t *value)
{
size_t j, k, n, m, matches, num_tokens;
int rc;
orte_gpr_value_t **data_values;
orte_gpr_keyval_t **kptr;
orte_gpr_notify_data_t **data;
if (NULL == msg->data) { /* first data item on message */
msg->data = (orte_gpr_notify_data_t**)malloc(sizeof(orte_gpr_notify_data_t*));
if (NULL == msg->data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data = &(msg->data[0]); /* need to assign location */
msg->cnt = 1;
} else {
/* check to see if this data is going to the same callback as
* any prior data on the message. if so, then we add those keyvals
* to the existing data structure. if not, then we realloc to
* establish a new data structure and store the data there
*/
for (k=0; k < msg->cnt; k++) {
if (msg->data[k]->cb_num == cb_num) { /* going to the same place */
data = &(msg->data[k]);
goto MOVEON;
}
}
/* no prior matching data found, so add another data location to the message */
msg->data = (orte_gpr_notify_data_t **) realloc(msg->data, (msg->cnt + 1)*sizeof(orte_gpr_notify_data_t*));
if (NULL == msg->data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data = &(msg->data[msg->cnt]);
(msg->cnt)++;
}
*data = OBJ_NEW(orte_gpr_notify_data_t);
if (NULL == *data) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* store the callback_number, addressing mode, and name
* of the segment this data came from
*/
(*data)->cb_num = cb_num;
(*data)->addr_mode = value->addr_mode;
(*data)->segment = strdup(value->segment);
if (NULL == (*data)->segment) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
MOVEON:
/* add the values to the data object */
if (NULL == (*data)->values) { /* first value on the structure */
(*data)->values = (orte_gpr_value_t**)malloc(sizeof(orte_gpr_value_t*));
if (NULL == (*data)->values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data_values = &((*data)->values[0]); /* need to assign location */
(*data)->cnt = 1;
} else {
/* check to see if this value is from the same container
* as some prior one. if so, then we add those itagvals
* to the existing value structure. if not, then we realloc to
* establish a new value structure and store the data there
*/
for (k=0; k < (*data)->cnt; k++) {
matches = 0;
num_tokens = (*data)->values[k]->num_tokens;
if (num_tokens == value->num_tokens) { /* must have same number or can't match */
for (j=0; j < num_tokens; j++) {
for (m=0; m < num_tokens; m++) {
if (0 == strcmp(((*data)->values[k])->tokens[j], value->tokens[m])) {
matches++;
}
}
if (num_tokens == matches) { /* from same container - just add keyvals to it */
data_values = &((*data)->values[k]);
goto MOVEON2;
}
}
}
}
/* no prior matching data found, so add another value location to the object */
(*data)->values = (orte_gpr_value_t**)realloc((*data)->values, ((*data)->cnt + 1)*sizeof(orte_gpr_value_t*));
if (NULL == (*data)->values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
data_values = &((*data)->values[(*data)->cnt]);
((*data)->cnt)++;
}
*data_values = OBJ_NEW(orte_gpr_value_t);
if (NULL == *data_values) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* record the addressing mode */
(*data_values)->addr_mode = value->addr_mode;
/* record the segment these values came from */
(*data_values)->segment = strdup(value->segment);
if (NULL == ((*data_values)->segment)) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* record the tokens describing the container */
(*data_values)->num_tokens = value->num_tokens;
if (0 < value->num_tokens) { /* could be a wildcard case */
(*data_values)->tokens = (char **)malloc(value->num_tokens * sizeof(char*));
if (NULL == (*data_values)->tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (n=0; n < value->num_tokens; n++) {
(*data_values)->tokens[n] = strdup(value->tokens[n]);
if (NULL == (*data_values)->tokens[n]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
}
}
MOVEON2:
/* record the value to be returned */
if (0 < (*data_values)->cnt) { /* already have some data here, so add to the space */
n = (*data_values)->cnt + value->cnt;
(*data_values)->keyvals = (orte_gpr_keyval_t**)
realloc((*data_values)->keyvals, n * sizeof(orte_gpr_keyval_t*));
if (NULL == (*data_values)->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr = &((*data_values)->keyvals[(*data_values)->cnt]);
(*data_values)->cnt = n;
} else {
(*data_values)->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*));
if (NULL == (*data_values)->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(*data_values)->cnt = value->cnt;
kptr = (*data_values)->keyvals;
}
for (n=0; n < value->cnt; n++) {
kptr[n] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == kptr[n]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr[n]->key = strdup((value->keyvals[n])->key);
if (NULL == kptr[n]->key) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
kptr[n]->type = (value->keyvals[n])->type;
if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload(
&(kptr[n]->value), &((value->keyvals[n])->value),
(value->keyvals[n])->type))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -151,6 +151,7 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode,
/* initialize storage for actions taken */
orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon);
orte_gpr_replica_globals.num_acted_upon = 0;
/* extract the token address mode and overwrite permissions */
overwrite = false;
@ -389,6 +390,7 @@ int orte_gpr_replica_get_fn(orte_gpr_addr_mode_t addr_mode,
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
(*values)[i]->addr_mode = addr_mode;
(*values)[i]->segment = strdup(seg->name);
(*values)[i]->cnt = ompi_list_get_size(gptr->ival_list);
cptr2 = gptr->cptr;

Просмотреть файл

@ -86,6 +86,7 @@ orte_gpr_replica_enter_notify_request(orte_gpr_notify_id_t *local_idtag,
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(orte_gpr_replica.num_trigs)++;
*local_idtag = (orte_gpr_notify_id_t)trig->index;
@ -146,6 +147,9 @@ int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg,
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* increment the number acted upon */
(orte_gpr_replica_globals.num_acted_upon)++;
return ORTE_SUCCESS;
}
@ -189,12 +193,15 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg)
{
orte_gpr_replica_triggers_t **trig;
orte_gpr_replica_subscribed_data_t **sub;
size_t i, j;
size_t i, j, cntri, cntrj;
int rc;
trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) {
cntri = 0;
for (i=0; cntri < orte_gpr_replica.num_trigs &&
i < (orte_gpr_replica.triggers)->size; i++) {
if (NULL != trig[i]) {
cntri++;
/* check if trigger is on this subscription - if so, check it */
if (ORTE_GPR_TRIG_ANY & trig[i]->action) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_trig(trig[i]))) {
@ -205,8 +212,8 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg)
/* check if notifier is on this subscription - if so, check to see
* if it has fired, but ONLY if NOTIFY_START is NOT set
*/
if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) &
(ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) {
if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) &&
!(ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) {
/* for notifier subscriptions, the data structures
* in the trigger define the data being monitored. First,
* check to see if the segment that was modified matches
@ -215,11 +222,16 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg)
*/
sub = (orte_gpr_replica_subscribed_data_t**)
(trig[i]->subscribed_data)->addr;
for (j=0; j < (trig[i]->subscribed_data)->size; j++) {
if ((NULL != sub[j]) && (seg == sub[j]->seg)) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) {
ORTE_ERROR_LOG(rc);
return rc;
cntrj = 0;
for (j=0; cntrj < trig[i]->num_subscribed_data &&
j < (trig[i]->subscribed_data)->size; j++) {
if (NULL != sub[j]) {
cntrj++;
if (seg == sub[j]->seg) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
}
@ -235,7 +247,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig)
orte_gpr_replica_counter_t **cntr;
orte_gpr_replica_itagval_t *base_value=NULL;
bool first, fire;
size_t i;
size_t i, cntri;
int cmp;
int rc;
@ -243,8 +255,11 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig)
cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr);
first = true;
fire = true;
for (i=0; i < (trig->counters)->size && fire; i++) {
cntri = 0;
for (i=0; cntri < trig->num_counters &&
i < (trig->counters)->size && fire; i++) {
if (NULL != cntr[i]) {
cntri++;
if (first) {
base_value = cntr[i]->iptr;
first = false;
@ -265,7 +280,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig)
if (orte_gpr_replica_globals.debug) {
ompi_output(0, "REGISTERING CALLBACK FOR TRIG %d", trig->index);
}
if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig, NULL, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -276,8 +291,11 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig)
} else if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { /* see if counters are at a level */
cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr);
fire = true;
for (i=0; i < (trig->counters)->size && fire; i++) {
cntri = 0;
for (i=0; cntri < trig->num_counters &&
i < (trig->counters)->size && fire; i++) {
if (NULL != cntr[i]) {
cntri++;
if (ORTE_SUCCESS != (rc =
orte_gpr_replica_compare_values(&cmp, cntr[i]->iptr,
&(cntr[i]->trigger_level)))) {
@ -290,7 +308,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig)
}
}
if (fire) { /* all counters at specified trigger level */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig, NULL, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -321,44 +339,142 @@ FIRED:
* When entering this function, we know two things: (a) something was modified
* on the segment specified in the subscription, and (b) notifications are
* active. What we now need to determine is whether or not any of the data
* objects pointed to by the subscription were involved in the change. These
* objects could just be a container - e.g., the subscriber might want to know
* objects pointed to by the subscription were involved in the change. The
* subscription could describe a container - e.g., the subscriber might want to know
* if anything gets added to a container - or could be a container plus one or
* more keys when the subscriber wants to know if a specific value gets changed.
* more keys when the subscriber wants to know when a specific value gets changed.
*/
int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig,
orte_gpr_replica_subscribed_data_t *sub)
{
orte_gpr_replica_action_taken_t **ptr;
size_t i;
size_t i, cntr;
orte_gpr_value_t value;
orte_gpr_replica_itag_t *itaglist;
int rc=ORTE_SUCCESS;
OBJ_CONSTRUCT(&value, orte_gpr_value_t);
value.segment = strdup(sub->seg->name);
value.addr_mode = sub->addr_mode;
value.num_tokens = orte_value_array_get_size(&(sub->tokentags));
value.tokens = (char **)malloc(value.num_tokens * sizeof(char*));
if (NULL == value.tokens) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
itaglist = ORTE_VALUE_ARRAY_GET_BASE(&(sub->tokentags), orte_gpr_replica_itag_t);
for (i=0; i < value.num_tokens; i++) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(
&(value.tokens[i]), sub->seg, itaglist[i]))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
value.cnt = 1;
value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
if (NULL == value.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value.keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr);
for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
cntr = 0;
for (i=0; cntr < orte_gpr_replica_globals.num_acted_upon &&
i < (orte_gpr_replica_globals.acted_upon)->size; i++) {
if (NULL != ptr[i]) {
cntr++;
if ((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) {
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED) &&
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
/* send back the added entry */
if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup(
&((value.keyvals[0])->key), sub->seg,
ptr[i]->iptr->itag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
(value.keyvals[0])->type = ptr[i]->iptr->type;
if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload(
&((value.keyvals[0])->value), &(ptr[i]->iptr->value),
ptr[i]->iptr->type))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc =
orte_gpr_replica_register_callback(trig, sub, &value))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
} else if ((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) &
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)){
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED) &&
orte_gpr_replica_check_notify_matches(sub, ptr[i])){
/* send back the deleted entry */
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED)) {
/* see if the acted_upon data was the target of the subscription */
/* send back the new data */
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) {
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO) &&
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
/* ptr contains the "new" data - check to see if it matches
* the subscription. if so, send back the new data
*/
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) {
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM) &&
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
/* ptr contains the "old" data - check to see if it matches
* the subscription. if so, send back the new data
*/
} else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) &&
(ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED) &&
orte_gpr_replica_check_notify_matches(sub, ptr[i])) {
/* see if the acted_upon data was the target of the subscription */
/* send back the new data */
}
}
}
return ORTE_SUCCESS;
CLEANUP:
OBJ_DESTRUCT(&value);
return rc;
}
bool orte_gpr_replica_check_notify_matches(orte_gpr_replica_subscribed_data_t *sub,
orte_gpr_replica_action_taken_t *ptr)
{
orte_gpr_replica_addr_mode_t tokmod;
/* when we enter this function, we already know that the segments match.
* Thus, we need to check that we are looking at a matching container
* and matching keyval pattern
*/
/* first, check to see if the containers match */
tokmod = 0x004f & sub->addr_mode;
if (!orte_gpr_replica_check_itag_list(tokmod,
orte_value_array_get_size(&(sub->tokentags)),
ORTE_VALUE_ARRAY_GET_BASE(&(sub->tokentags), orte_gpr_replica_itag_t),
(ptr->cptr)->num_itags,
(ptr->cptr)->itags)) {
/* not this container - return false */
return false;
}
/* next, check to see if this keyval was on the list */
if (orte_gpr_replica_check_itag_list(ORTE_GPR_REPLICA_OR,
orte_value_array_get_size(&(sub->keytags)),
ORTE_VALUE_ARRAY_GET_BASE(&(sub->keytags), orte_gpr_replica_itag_t),
1,
&(ptr->iptr->itag))) {
/* keyval is on list - return true */
return true;
}
/* if we get here, then the keyval was NOT on the list */
return false;
}

Просмотреть файл

@ -92,6 +92,7 @@ typedef struct {
orte_pointer_array_t *srch_cptr;
orte_pointer_array_t *srch_ival;
orte_pointer_array_t *acted_upon;
size_t num_acted_upon;
orte_bitmap_t srch_itag;
} orte_gpr_replica_globals_t;
@ -121,7 +122,9 @@ typedef struct orte_gpr_replica_dict_t orte_gpr_replica_dict_t;
*/
struct orte_gpr_replica_t {
orte_pointer_array_t *segments; /**< Managed array of pointers to segment objects */
size_t num_segs;
orte_pointer_array_t *triggers; /**< Managed array of pointers to triggers */
size_t num_trigs;
ompi_list_t callbacks; /**< List of callbacks to be processed */
};
typedef struct orte_gpr_replica_t orte_gpr_replica_t;

Просмотреть файл

@ -32,7 +32,7 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg,
{
size_t len;
int rc=ORTE_SUCCESS;
size_t i;
size_t i, cntri;
orte_gpr_replica_segment_t **ptr;
/* initialize to nothing */
@ -42,8 +42,11 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg,
/* search the registry segments to find which one is being referenced */
ptr = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments->addr);
for (i=0; i < (orte_gpr_replica.segments)->size; i++) {
cntri = 0;
for (i=0; cntri < orte_gpr_replica.num_segs &&
i < (orte_gpr_replica.segments)->size; i++) {
if (NULL != ptr[i]) {
cntri++;
if (0 == strncmp(segment, ptr[i]->name, len)) {
*seg = ptr[i];
return ORTE_SUCCESS;
@ -64,5 +67,7 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg,
return rc;
}
(*seg)->itag = i;
(orte_gpr_replica.num_segs)++;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -75,6 +75,7 @@ static void test_cbfunc6(orte_gpr_notify_data_t *data, void *user_tag);
static int test1(void);
static int test2(void);
static int test3(void);
int main(int argc, char **argv)
@ -203,19 +204,27 @@ int main(int argc, char **argv)
exit (1);
}
/* test triggers that compare two counters to each other */
if (ORTE_SUCCESS == test1()) {
fprintf(test_out, "triggers: compare two counters successful\n");
// /* test triggers that compare two counters to each other */
// if (ORTE_SUCCESS == test1()) {
// fprintf(test_out, "triggers: compare two counters successful\n");
// } else {
// fprintf(test_out, "triggers: compare two counters failed\n");
// rc = 1;
// }
//
// /* test triggers that fire at a level */
// if (ORTE_SUCCESS == test2()) {
// fprintf(test_out, "triggers: trigger at level successful\n");
// } else {
// fprintf(test_out, "triggers: trigger at level failed\n");
// rc = 1;
// }
//
/* test notification on value added */
if (ORTE_SUCCESS == test3()) {
fprintf(test_out, "triggers: notify upon value added successful\n");
} else {
fprintf(test_out, "triggers: compare two counters failed\n");
rc = 1;
}
/* test triggers that fire at a level */
if (ORTE_SUCCESS == test2()) {
fprintf(test_out, "triggers: trigger at level successful\n");
} else {
fprintf(test_out, "triggers: trigger at level failed\n");
fprintf(test_out, "triggers: notify upon value added failed\n");
rc = 1;
}
@ -692,6 +701,118 @@ int test2(void)
}
int test3(void)
{
int rc;
size_t i;
orte_gpr_value_t value, *val;
orte_gpr_subscription_t *subscription;
orte_gpr_notify_id_t sub;
/* put something on the registry to start */
val = OBJ_NEW(orte_gpr_value_t);
val->addr_mode = ORTE_GPR_NO_OVERWRITE | ORTE_GPR_TOKENS_XAND;
val->segment = strdup("test-segment");
val->num_tokens = 10;
val->tokens = (char**)malloc(val->num_tokens * sizeof(char*));
for (i=0; i < val->num_tokens; i++) {
asprintf(&(val->tokens[i]), "dummy-sub-%lu", (unsigned long) i);
}
val->cnt = 20;
val->keyvals = (orte_gpr_keyval_t**)malloc(val->cnt * sizeof(orte_gpr_keyval_t*));
for (i=0; i<val->cnt; i++) {
val->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
asprintf(&((val->keyvals[i])->key), "stupid-test-%lu",
(unsigned long) i);
(val->keyvals[i])->type = ORTE_UINT32;
(val->keyvals[i])->value.ui32 = (uint32_t)i;
}
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
fprintf(test_out, "put failed with error code %s\n",
ORTE_ERROR_NAME(rc));
test_failure("put failed");
test_finalize();
return rc;
}
OBJ_RELEASE(val);
/* setup a subscription on one of the containers
* that notifies callback 1 if something is added
*/
subscription = OBJ_NEW(orte_gpr_subscription_t);
subscription->addr_mode = ORTE_GPR_TOKENS_OR;
subscription->segment = strdup("test-segment");
subscription->user_tag = NULL;
/* monitor the dummy-sub-xx container only
*/
subscription->num_tokens = 2;
subscription->tokens = (char**)malloc(2*sizeof(char*));
for (i=0; i < 2; i++) {
asprintf(&(subscription->tokens[i]), "dummy-sub-%lu", (unsigned long) i);
}
/* get notified when anything is added */
subscription->num_keys = 0;
subscription->keys = NULL;
/* send notification to callback 1 */
subscription->cbfunc = test_cbfunc1;
/* enter subscription */
rc = gpr_module->subscribe(
ORTE_GPR_NOTIFY_ADD_ENTRY,
1, &subscription,
0, NULL,
&sub);
gpr_module->dump_triggers(0);
/* cleanup */
OBJ_RELEASE(subscription);
/* add something to the container */
fprintf(test_out, "adding something - should trigger\n");
val = OBJ_NEW(orte_gpr_value_t);
val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
val->segment = strdup("test-segment");
val->tokens = (char**)malloc(sizeof(char*));
if (NULL == val->tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
val->tokens[0] = strdup("dummy-sub-0");
val->num_tokens = 1;
val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*));
if (NULL == val->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
val->cnt = 1;
val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == val->keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_DESTRUCT(&value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
val->keyvals[0]->key = strdup("test-notify-add");
val->keyvals[0]->type = ORTE_NULL;
if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(val);
return rc;
}
gpr_module->dump_all(0);
OBJ_RELEASE(val);
return ORTE_SUCCESS;
}
void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag)
{
fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n");