diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h b/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h index cfafde95b9..9cca866e1c 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_fn.h @@ -266,12 +266,17 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg); int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig, orte_gpr_replica_subscribed_data_t *sub); +bool orte_gpr_replica_check_notify_matches(orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_replica_action_taken_t *ptr); + int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig); int orte_gpr_replica_update_storage_locations(orte_gpr_replica_itagval_t *new_iptr); -int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t **msg, - orte_gpr_replica_triggers_t *trig); +int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t *msg, + orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_value_t *value); int orte_gpr_replica_enter_notify_request(orte_gpr_notify_id_t *local_idtag, @@ -283,14 +288,20 @@ int orte_gpr_replica_remove_notify_request(orte_gpr_notify_id_t local_idtag, orte_gpr_notify_id_t *remote_idtag); -int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig); +int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_value_t *value); int orte_gpr_replica_process_callbacks(void); int orte_gpr_replica_purge_subscriptions(orte_process_name_t *proc); -int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data, - orte_gpr_replica_subscribed_data_t *sptr); +int orte_gpr_replica_add_values_from_registry(orte_gpr_notify_message_t *msg, + orte_gpr_replica_subscribed_data_t *sptr); + +int orte_gpr_replica_store_value_in_msg(orte_gpr_notify_id_t cb_num, + orte_gpr_notify_message_t *msg, + orte_gpr_value_t *value); #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c index bc1c4efef1..3f6800c948 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c @@ -121,7 +121,9 @@ CLEANUP: -int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) +int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_value_t *value) { orte_gpr_replica_callbacks_t *cb; orte_gpr_replica_notify_msg_list_t *msg; @@ -137,7 +139,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) msg != (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_end(&(cb->messages)); msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_next(msg)) { if ((msg->message)->idtag == trig->index) { /* same trigger - add to it */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) { + if (ORTE_SUCCESS != (rc = + orte_gpr_replica_construct_notify_message(msg->message, + trig, sub, value))) { ORTE_ERROR_LOG(rc); } return rc; @@ -149,7 +153,7 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } - ompi_list_append(&cb->messages, &msg->item); + ompi_list_append(&cb->messages, &msg->item); /* construct the message */ msg->message = OBJ_NEW(orte_gpr_notify_message_t); @@ -164,7 +168,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) (msg->message)->idtag = trig->remote_idtag; } - if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) { + if (ORTE_SUCCESS != (rc = + orte_gpr_replica_construct_notify_message(msg->message, + trig, sub, value))) { ORTE_ERROR_LOG(rc); } return rc; @@ -221,7 +227,9 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) ompi_list_append(&cb->messages, &msg->item); - if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(&(msg->message), trig))) { + if (ORTE_SUCCESS != (rc = + orte_gpr_replica_construct_notify_message(msg->message, + trig, sub, value))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(cb); return rc; @@ -236,70 +244,34 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig) } -int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t **msg, - orte_gpr_replica_triggers_t *trig) +int orte_gpr_replica_construct_notify_message(orte_gpr_notify_message_t *msg, + orte_gpr_replica_triggers_t *trig, + orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_value_t *value) { int rc=ORTE_SUCCESS; - orte_gpr_notify_data_t **data; orte_gpr_replica_subscribed_data_t **sptr; - size_t i, k; + size_t i; /* if we don't have data, just return */ - if (0 >= trig->num_subscribed_data) { + if (0 >= trig->num_subscribed_data && NULL == value) { return ORTE_SUCCESS; } + /* check to see if value provided - if so, use it */ + if (NULL != value) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_msg(sub->index, + msg, value))) { + ORTE_ERROR_LOG(rc); + } + return rc; + } + + /* otherwise, go get values off of registry and add them to the data object */ sptr = (orte_gpr_replica_subscribed_data_t**)((trig->subscribed_data)->addr); for (i=0; i < (trig->subscribed_data)->size; i++) { if (NULL != sptr[i]) { - if (NULL == (*msg)->data) { /* first data item on the message */ - (*msg)->data = (orte_gpr_notify_data_t**)malloc(sizeof(orte_gpr_notify_data_t*)); - if (NULL == (*msg)->data) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - data = &((*msg)->data[0]); /* need to assign location */ - (*msg)->cnt = 1; - } else { - /* check to see if this data is going to the same callback as - * any prior data on the message. if so, then we add those keyvals - * to the existing data structure. if not, then we realloc to - * establish a new data structure and store the data there - */ - for (k=0; k < (*msg)->cnt; k++) { - if ((*msg)->data[k]->cb_num == sptr[i]->index) { /* going to the same place */ - data = &((*msg)->data[k]); - goto MOVEON; - } - } - /* no prior matching data found, so add another data location to the message */ - (*msg)->data = (orte_gpr_notify_data_t **) realloc((*msg)->data, ((*msg)->cnt + 1)*sizeof(orte_gpr_notify_data_t*)); - if (NULL == (*msg)->data) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - data = &((*msg)->data[(*msg)->cnt]); - ((*msg)->cnt)++; - } - - *data = OBJ_NEW(orte_gpr_notify_data_t); - if (NULL == *data) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - /* for each data object, store the callback_number, addressing mode, and name - * of the segment this data came from - */ - (*data)->cb_num = sptr[i]->index; - (*data)->addr_mode = sptr[i]->addr_mode; - (*data)->segment = strdup((sptr[i]->seg)->name); - if (NULL == (*data)->segment) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } -MOVEON: - /* add the values to the data object */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_values(data, sptr[i]))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_values_from_registry(msg, sptr[i]))) { ORTE_ERROR_LOG(rc); return rc; } @@ -310,13 +282,12 @@ MOVEON: } -int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data, - orte_gpr_replica_subscribed_data_t *sptr) +int orte_gpr_replica_add_values_from_registry(orte_gpr_notify_message_t *msg, + orte_gpr_replica_subscribed_data_t *sptr) { - size_t i, j, k, n, m, matches, num_tokens, num_keys, cnt; + size_t i, j, num_tokens, num_keys, cnt; int rc; - orte_gpr_value_t **values = NULL, **data_values; - orte_gpr_keyval_t **kptr; + orte_gpr_value_t **values = NULL; /* get the data off the registry */ num_tokens = orte_value_array_get_size(&(sptr->tokentags)); @@ -356,115 +327,12 @@ int orte_gpr_replica_add_values(orte_gpr_notify_data_t **data, * where containers match */ for (i=0; i < cnt; i++) { - if (NULL == (*data)->values) { /* first value on the structure */ - (*data)->values = (orte_gpr_value_t**)malloc(sizeof(orte_gpr_value_t*)); - if (NULL == (*data)->values) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - data_values = &((*data)->values[0]); /* need to assign location */ - (*data)->cnt = 1; - } else { - /* check to see if this value is from the same container - * as some prior one. if so, then we add those itagvals - * to the existing value structure. if not, then we realloc to - * establish a new value structure and store the data there - */ - for (k=0; k < (*data)->cnt; k++) { - matches = 0; - num_tokens = (*data)->values[k]->num_tokens; - if (num_tokens == values[i]->num_tokens) { /* must have same number or can't match */ - for (j=0; j < num_tokens; j++) { - for (m=0; m < num_tokens; m++) { - if (0 == strcmp(((*data)->values[k])->tokens[j], values[i]->tokens[m])) { - matches++; - } - } - if (num_tokens == matches) { /* from same container - just add keyvals to it */ - data_values = &((*data)->values[k]); - goto MOVEON; - } - } - } - } - /* no prior matching data found, so add another value location to the object */ - (*data)->values = (orte_gpr_value_t**)realloc((*data)->values, ((*data)->cnt + 1)*sizeof(orte_gpr_value_t*)); - if (NULL == (*data)->values) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - data_values = &((*data)->values[(*data)->cnt]); - ((*data)->cnt)++; - } - - *data_values = OBJ_NEW(orte_gpr_value_t); - if (NULL == *data_values) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - - /* record the addressing mode */ - (*data_values)->addr_mode = sptr->addr_mode; - /* record the segment these values came from */ - (*data_values)->segment = strdup((sptr->seg)->name); - if (NULL == ((*data_values)->segment)) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - /* record the tokens describing the container */ - (*data_values)->num_tokens = values[i]->num_tokens; - (*data_values)->tokens = (char **)malloc(values[i]->num_tokens * sizeof(char*)); - if (NULL == (*data_values)->tokens) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - for (n=0; n < values[i]->num_tokens; n++) { - (*data_values)->tokens[n] = strdup(values[i]->tokens[n]); - if (NULL == (*data_values)->tokens[n]) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - } -MOVEON: - /* record the values to be returned */ - if (0 < (*data_values)->cnt) { /* already have some data here, so add to the space */ - n = (*data_values)->cnt + values[i]->cnt; - (*data_values)->keyvals = (orte_gpr_keyval_t**) - realloc((*data_values)->keyvals, n * sizeof(orte_gpr_keyval_t*)); - if (NULL == (*data_values)->keyvals) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - kptr = &((*data_values)->keyvals[(*data_values)->cnt]); - (*data_values)->cnt = n; - } else { - (*data_values)->keyvals = (orte_gpr_keyval_t**)malloc(values[i]->cnt * sizeof(orte_gpr_keyval_t*)); - if (NULL == (*data_values)->keyvals) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - (*data_values)->cnt = values[i]->cnt; - kptr = (*data_values)->keyvals; - } - - for (n=0; n < values[i]->cnt; n++) { - kptr[n] = OBJ_NEW(orte_gpr_keyval_t); - if (NULL == kptr[n]) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - kptr[n]->key = strdup((values[i]->keyvals[n])->key); - if (NULL == kptr[n]->key) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - kptr[n]->type = (values[i]->keyvals[n])->type; - if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload( - &(kptr[n]->value), &((values[i]->keyvals[n])->value), - (values[i]->keyvals[n])->type))) { - ORTE_ERROR_LOG(rc); - return rc; - } + if (ORTE_SUCCESS != (rc = orte_gpr_replica_store_value_in_msg(sptr->index, + msg, values[i]))) { + ORTE_ERROR_LOG(rc); + for (j=i; j < cnt; j++) OBJ_RELEASE(values[j]); + free(values); + return rc; } OBJ_RELEASE(values[i]); } /* for i */ @@ -474,3 +342,177 @@ MOVEON: return ORTE_SUCCESS; } + +int orte_gpr_replica_store_value_in_msg(orte_gpr_notify_id_t cb_num, + orte_gpr_notify_message_t *msg, + orte_gpr_value_t *value) +{ + size_t j, k, n, m, matches, num_tokens; + int rc; + orte_gpr_value_t **data_values; + orte_gpr_keyval_t **kptr; + orte_gpr_notify_data_t **data; + + if (NULL == msg->data) { /* first data item on message */ + msg->data = (orte_gpr_notify_data_t**)malloc(sizeof(orte_gpr_notify_data_t*)); + if (NULL == msg->data) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + data = &(msg->data[0]); /* need to assign location */ + msg->cnt = 1; + } else { + /* check to see if this data is going to the same callback as + * any prior data on the message. if so, then we add those keyvals + * to the existing data structure. if not, then we realloc to + * establish a new data structure and store the data there + */ + for (k=0; k < msg->cnt; k++) { + if (msg->data[k]->cb_num == cb_num) { /* going to the same place */ + data = &(msg->data[k]); + goto MOVEON; + } + } + /* no prior matching data found, so add another data location to the message */ + msg->data = (orte_gpr_notify_data_t **) realloc(msg->data, (msg->cnt + 1)*sizeof(orte_gpr_notify_data_t*)); + if (NULL == msg->data) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + data = &(msg->data[msg->cnt]); + (msg->cnt)++; + } + + *data = OBJ_NEW(orte_gpr_notify_data_t); + if (NULL == *data) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* store the callback_number, addressing mode, and name + * of the segment this data came from + */ + (*data)->cb_num = cb_num; + (*data)->addr_mode = value->addr_mode; + (*data)->segment = strdup(value->segment); + if (NULL == (*data)->segment) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + +MOVEON: + /* add the values to the data object */ + if (NULL == (*data)->values) { /* first value on the structure */ + (*data)->values = (orte_gpr_value_t**)malloc(sizeof(orte_gpr_value_t*)); + if (NULL == (*data)->values) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + data_values = &((*data)->values[0]); /* need to assign location */ + (*data)->cnt = 1; + } else { + /* check to see if this value is from the same container + * as some prior one. if so, then we add those itagvals + * to the existing value structure. if not, then we realloc to + * establish a new value structure and store the data there + */ + for (k=0; k < (*data)->cnt; k++) { + matches = 0; + num_tokens = (*data)->values[k]->num_tokens; + if (num_tokens == value->num_tokens) { /* must have same number or can't match */ + for (j=0; j < num_tokens; j++) { + for (m=0; m < num_tokens; m++) { + if (0 == strcmp(((*data)->values[k])->tokens[j], value->tokens[m])) { + matches++; + } + } + if (num_tokens == matches) { /* from same container - just add keyvals to it */ + data_values = &((*data)->values[k]); + goto MOVEON2; + } + } + } + } + /* no prior matching data found, so add another value location to the object */ + (*data)->values = (orte_gpr_value_t**)realloc((*data)->values, ((*data)->cnt + 1)*sizeof(orte_gpr_value_t*)); + if (NULL == (*data)->values) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + data_values = &((*data)->values[(*data)->cnt]); + ((*data)->cnt)++; + } + + *data_values = OBJ_NEW(orte_gpr_value_t); + if (NULL == *data_values) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* record the addressing mode */ + (*data_values)->addr_mode = value->addr_mode; + /* record the segment these values came from */ + (*data_values)->segment = strdup(value->segment); + if (NULL == ((*data_values)->segment)) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* record the tokens describing the container */ + (*data_values)->num_tokens = value->num_tokens; + if (0 < value->num_tokens) { /* could be a wildcard case */ + (*data_values)->tokens = (char **)malloc(value->num_tokens * sizeof(char*)); + if (NULL == (*data_values)->tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + for (n=0; n < value->num_tokens; n++) { + (*data_values)->tokens[n] = strdup(value->tokens[n]); + if (NULL == (*data_values)->tokens[n]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + } + } + +MOVEON2: + /* record the value to be returned */ + if (0 < (*data_values)->cnt) { /* already have some data here, so add to the space */ + n = (*data_values)->cnt + value->cnt; + (*data_values)->keyvals = (orte_gpr_keyval_t**) + realloc((*data_values)->keyvals, n * sizeof(orte_gpr_keyval_t*)); + if (NULL == (*data_values)->keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + kptr = &((*data_values)->keyvals[(*data_values)->cnt]); + (*data_values)->cnt = n; + } else { + (*data_values)->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*)); + if (NULL == (*data_values)->keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + (*data_values)->cnt = value->cnt; + kptr = (*data_values)->keyvals; + } + + for (n=0; n < value->cnt; n++) { + kptr[n] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == kptr[n]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + kptr[n]->key = strdup((value->keyvals[n])->key); + if (NULL == kptr[n]->key) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + kptr[n]->type = (value->keyvals[n])->type; + if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload( + &(kptr[n]->value), &((value->keyvals[n])->value), + (value->keyvals[n])->type))) { + ORTE_ERROR_LOG(rc); + return rc; + } + } + return ORTE_SUCCESS; +} diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c index 6e7b88473f..7fbf12f4fb 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c @@ -151,6 +151,7 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, /* initialize storage for actions taken */ orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon); + orte_gpr_replica_globals.num_acted_upon = 0; /* extract the token address mode and overwrite permissions */ overwrite = false; @@ -389,6 +390,7 @@ int orte_gpr_replica_get_fn(orte_gpr_addr_mode_t addr_mode, rc = ORTE_ERR_OUT_OF_RESOURCE; goto CLEANUP; } + (*values)[i]->addr_mode = addr_mode; (*values)[i]->segment = strdup(seg->name); (*values)[i]->cnt = ompi_list_get_size(gptr->ival_list); cptr2 = gptr->cptr; diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c index e632d16de4..154ea94228 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c @@ -86,6 +86,7 @@ orte_gpr_replica_enter_notify_request(orte_gpr_notify_id_t *local_idtag, ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } + (orte_gpr_replica.num_trigs)++; *local_idtag = (orte_gpr_notify_id_t)trig->index; @@ -146,6 +147,9 @@ int orte_gpr_replica_record_action(orte_gpr_replica_segment_t *seg, return ORTE_ERR_OUT_OF_RESOURCE; } + /* increment the number acted upon */ + (orte_gpr_replica_globals.num_acted_upon)++; + return ORTE_SUCCESS; } @@ -189,12 +193,15 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg) { orte_gpr_replica_triggers_t **trig; orte_gpr_replica_subscribed_data_t **sub; - size_t i, j; + size_t i, j, cntri, cntrj; int rc; trig = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); - for (i=0; i < (orte_gpr_replica.triggers)->size; i++) { + cntri = 0; + for (i=0; cntri < orte_gpr_replica.num_trigs && + i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trig[i]) { + cntri++; /* check if trigger is on this subscription - if so, check it */ if (ORTE_GPR_TRIG_ANY & trig[i]->action) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_trig(trig[i]))) { @@ -205,8 +212,8 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg) /* check if notifier is on this subscription - if so, check to see * if it has fired, but ONLY if NOTIFY_START is NOT set */ - if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) & - (ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) { + if ((ORTE_GPR_NOTIFY_ANY & trig[i]->action) && + !(ORTE_GPR_TRIG_NOTIFY_START & trig[i]->action)) { /* for notifier subscriptions, the data structures * in the trigger define the data being monitored. First, * check to see if the segment that was modified matches @@ -215,11 +222,16 @@ int orte_gpr_replica_check_subscriptions(orte_gpr_replica_segment_t *seg) */ sub = (orte_gpr_replica_subscribed_data_t**) (trig[i]->subscribed_data)->addr; - for (j=0; j < (trig[i]->subscribed_data)->size; j++) { - if ((NULL != sub[j]) && (seg == sub[j]->seg)) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) { - ORTE_ERROR_LOG(rc); - return rc; + cntrj = 0; + for (j=0; cntrj < trig[i]->num_subscribed_data && + j < (trig[i]->subscribed_data)->size; j++) { + if (NULL != sub[j]) { + cntrj++; + if (seg == sub[j]->seg) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_check_notify(trig[i], sub[j]))) { + ORTE_ERROR_LOG(rc); + return rc; + } } } } @@ -235,7 +247,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) orte_gpr_replica_counter_t **cntr; orte_gpr_replica_itagval_t *base_value=NULL; bool first, fire; - size_t i; + size_t i, cntri; int cmp; int rc; @@ -243,8 +255,11 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); first = true; fire = true; - for (i=0; i < (trig->counters)->size && fire; i++) { + cntri = 0; + for (i=0; cntri < trig->num_counters && + i < (trig->counters)->size && fire; i++) { if (NULL != cntr[i]) { + cntri++; if (first) { base_value = cntr[i]->iptr; first = false; @@ -265,7 +280,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) if (orte_gpr_replica_globals.debug) { ompi_output(0, "REGISTERING CALLBACK FOR TRIG %d", trig->index); } - if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig, NULL, NULL))) { ORTE_ERROR_LOG(rc); return rc; } @@ -276,8 +291,11 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) } else if (ORTE_GPR_TRIG_AT_LEVEL & trig->action) { /* see if counters are at a level */ cntr = (orte_gpr_replica_counter_t**)((trig->counters)->addr); fire = true; - for (i=0; i < (trig->counters)->size && fire; i++) { + cntri = 0; + for (i=0; cntri < trig->num_counters && + i < (trig->counters)->size && fire; i++) { if (NULL != cntr[i]) { + cntri++; if (ORTE_SUCCESS != (rc = orte_gpr_replica_compare_values(&cmp, cntr[i]->iptr, &(cntr[i]->trigger_level)))) { @@ -290,7 +308,7 @@ int orte_gpr_replica_check_trig(orte_gpr_replica_triggers_t *trig) } } if (fire) { /* all counters at specified trigger level */ - if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig))) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_register_callback(trig, NULL, NULL))) { ORTE_ERROR_LOG(rc); return rc; } @@ -321,44 +339,142 @@ FIRED: * When entering this function, we know two things: (a) something was modified * on the segment specified in the subscription, and (b) notifications are * active. What we now need to determine is whether or not any of the data - * objects pointed to by the subscription were involved in the change. These - * objects could just be a container - e.g., the subscriber might want to know + * objects pointed to by the subscription were involved in the change. The + * subscription could describe a container - e.g., the subscriber might want to know * if anything gets added to a container - or could be a container plus one or - * more keys when the subscriber wants to know if a specific value gets changed. + * more keys when the subscriber wants to know when a specific value gets changed. */ int orte_gpr_replica_check_notify(orte_gpr_replica_triggers_t *trig, orte_gpr_replica_subscribed_data_t *sub) { orte_gpr_replica_action_taken_t **ptr; - size_t i; + size_t i, cntr; + orte_gpr_value_t value; + orte_gpr_replica_itag_t *itaglist; + int rc=ORTE_SUCCESS; + + OBJ_CONSTRUCT(&value, orte_gpr_value_t); + value.segment = strdup(sub->seg->name); + value.addr_mode = sub->addr_mode; + value.num_tokens = orte_value_array_get_size(&(sub->tokentags)); + value.tokens = (char **)malloc(value.num_tokens * sizeof(char*)); + if (NULL == value.tokens) { + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto CLEANUP; + } + itaglist = ORTE_VALUE_ARRAY_GET_BASE(&(sub->tokentags), orte_gpr_replica_itag_t); + for (i=0; i < value.num_tokens; i++) { + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( + &(value.tokens[i]), sub->seg, itaglist[i]))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + } + value.cnt = 1; + value.keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); + if (NULL == value.keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value.keyvals[0]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } ptr = (orte_gpr_replica_action_taken_t**)((orte_gpr_replica_globals.acted_upon)->addr); - for (i=0; i < (orte_gpr_replica_globals.acted_upon)->size; i++) { + cntr = 0; + for (i=0; cntr < orte_gpr_replica_globals.num_acted_upon && + i < (orte_gpr_replica_globals.acted_upon)->size; i++) { if (NULL != ptr[i]) { + cntr++; if ((trig->action & ORTE_GPR_NOTIFY_ADD_ENTRY) && - (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED)) { + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_ADDED) && + orte_gpr_replica_check_notify_matches(sub, ptr[i])) { /* send back the added entry */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_dict_reverse_lookup( + &((value.keyvals[0])->key), sub->seg, + ptr[i]->iptr->itag))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + (value.keyvals[0])->type = ptr[i]->iptr->type; + if (ORTE_SUCCESS != (rc = orte_gpr_base_xfer_payload( + &((value.keyvals[0])->value), &(ptr[i]->iptr->value), + ptr[i]->iptr->type))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (ORTE_SUCCESS != (rc = + orte_gpr_replica_register_callback(trig, sub, &value))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } } else if ((trig->action & ORTE_GPR_NOTIFY_DEL_ENTRY) & - (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED)){ + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_DELETED) && + orte_gpr_replica_check_notify_matches(sub, ptr[i])){ /* send back the deleted entry */ } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && - (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED)) { - /* see if the acted_upon data was the target of the subscription */ - /* send back the new data */ - } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && - (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO)) { + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_TO) && + orte_gpr_replica_check_notify_matches(sub, ptr[i])) { /* ptr contains the "new" data - check to see if it matches * the subscription. if so, send back the new data */ } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && - (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM)) { + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHG_FRM) && + orte_gpr_replica_check_notify_matches(sub, ptr[i])) { /* ptr contains the "old" data - check to see if it matches * the subscription. if so, send back the new data */ + } else if ((trig->action & ORTE_GPR_NOTIFY_VALUE_CHG) && + (ptr[i]->action & ORTE_GPR_REPLICA_ENTRY_CHANGED) && + orte_gpr_replica_check_notify_matches(sub, ptr[i])) { + /* see if the acted_upon data was the target of the subscription */ + /* send back the new data */ } } } - return ORTE_SUCCESS; + +CLEANUP: + OBJ_DESTRUCT(&value); + return rc; +} + + +bool orte_gpr_replica_check_notify_matches(orte_gpr_replica_subscribed_data_t *sub, + orte_gpr_replica_action_taken_t *ptr) +{ + orte_gpr_replica_addr_mode_t tokmod; + + /* when we enter this function, we already know that the segments match. + * Thus, we need to check that we are looking at a matching container + * and matching keyval pattern + */ + + /* first, check to see if the containers match */ + tokmod = 0x004f & sub->addr_mode; + if (!orte_gpr_replica_check_itag_list(tokmod, + orte_value_array_get_size(&(sub->tokentags)), + ORTE_VALUE_ARRAY_GET_BASE(&(sub->tokentags), orte_gpr_replica_itag_t), + (ptr->cptr)->num_itags, + (ptr->cptr)->itags)) { + /* not this container - return false */ + return false; + } + /* next, check to see if this keyval was on the list */ + if (orte_gpr_replica_check_itag_list(ORTE_GPR_REPLICA_OR, + orte_value_array_get_size(&(sub->keytags)), + ORTE_VALUE_ARRAY_GET_BASE(&(sub->keytags), orte_gpr_replica_itag_t), + 1, + &(ptr->iptr->itag))) { + /* keyval is on list - return true */ + return true; + } + + /* if we get here, then the keyval was NOT on the list */ + return false; } diff --git a/src/mca/gpr/replica/gpr_replica.h b/src/mca/gpr/replica/gpr_replica.h index 9f5f867268..1bb25c5af7 100644 --- a/src/mca/gpr/replica/gpr_replica.h +++ b/src/mca/gpr/replica/gpr_replica.h @@ -92,6 +92,7 @@ typedef struct { orte_pointer_array_t *srch_cptr; orte_pointer_array_t *srch_ival; orte_pointer_array_t *acted_upon; + size_t num_acted_upon; orte_bitmap_t srch_itag; } orte_gpr_replica_globals_t; @@ -121,7 +122,9 @@ typedef struct orte_gpr_replica_dict_t orte_gpr_replica_dict_t; */ struct orte_gpr_replica_t { orte_pointer_array_t *segments; /**< Managed array of pointers to segment objects */ + size_t num_segs; orte_pointer_array_t *triggers; /**< Managed array of pointers to triggers */ + size_t num_trigs; ompi_list_t callbacks; /**< List of callbacks to be processed */ }; typedef struct orte_gpr_replica_t orte_gpr_replica_t; diff --git a/src/mca/gpr/replica/transition_layer/gpr_replica_segment_tl.c b/src/mca/gpr/replica/transition_layer/gpr_replica_segment_tl.c index 4cc55a3cf8..0bdf01e035 100644 --- a/src/mca/gpr/replica/transition_layer/gpr_replica_segment_tl.c +++ b/src/mca/gpr/replica/transition_layer/gpr_replica_segment_tl.c @@ -32,7 +32,7 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg, { size_t len; int rc=ORTE_SUCCESS; - size_t i; + size_t i, cntri; orte_gpr_replica_segment_t **ptr; /* initialize to nothing */ @@ -42,8 +42,11 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg, /* search the registry segments to find which one is being referenced */ ptr = (orte_gpr_replica_segment_t**)(orte_gpr_replica.segments->addr); - for (i=0; i < (orte_gpr_replica.segments)->size; i++) { + cntri = 0; + for (i=0; cntri < orte_gpr_replica.num_segs && + i < (orte_gpr_replica.segments)->size; i++) { if (NULL != ptr[i]) { + cntri++; if (0 == strncmp(segment, ptr[i]->name, len)) { *seg = ptr[i]; return ORTE_SUCCESS; @@ -64,5 +67,7 @@ int orte_gpr_replica_find_seg(orte_gpr_replica_segment_t **seg, return rc; } (*seg)->itag = i; + (orte_gpr_replica.num_segs)++; + return ORTE_SUCCESS; } diff --git a/test/mca/gpr/gpr_triggers.c b/test/mca/gpr/gpr_triggers.c index cd19055f86..f1947d68c0 100644 --- a/test/mca/gpr/gpr_triggers.c +++ b/test/mca/gpr/gpr_triggers.c @@ -75,6 +75,7 @@ static void test_cbfunc6(orte_gpr_notify_data_t *data, void *user_tag); static int test1(void); static int test2(void); +static int test3(void); int main(int argc, char **argv) @@ -203,19 +204,27 @@ int main(int argc, char **argv) exit (1); } - /* test triggers that compare two counters to each other */ - if (ORTE_SUCCESS == test1()) { - fprintf(test_out, "triggers: compare two counters successful\n"); +// /* test triggers that compare two counters to each other */ +// if (ORTE_SUCCESS == test1()) { +// fprintf(test_out, "triggers: compare two counters successful\n"); +// } else { +// fprintf(test_out, "triggers: compare two counters failed\n"); +// rc = 1; +// } +// +// /* test triggers that fire at a level */ +// if (ORTE_SUCCESS == test2()) { +// fprintf(test_out, "triggers: trigger at level successful\n"); +// } else { +// fprintf(test_out, "triggers: trigger at level failed\n"); +// rc = 1; +// } +// + /* test notification on value added */ + if (ORTE_SUCCESS == test3()) { + fprintf(test_out, "triggers: notify upon value added successful\n"); } else { - fprintf(test_out, "triggers: compare two counters failed\n"); - rc = 1; - } - - /* test triggers that fire at a level */ - if (ORTE_SUCCESS == test2()) { - fprintf(test_out, "triggers: trigger at level successful\n"); - } else { - fprintf(test_out, "triggers: trigger at level failed\n"); + fprintf(test_out, "triggers: notify upon value added failed\n"); rc = 1; } @@ -692,6 +701,118 @@ int test2(void) } +int test3(void) +{ + int rc; + size_t i; + orte_gpr_value_t value, *val; + orte_gpr_subscription_t *subscription; + orte_gpr_notify_id_t sub; + + /* put something on the registry to start */ + val = OBJ_NEW(orte_gpr_value_t); + val->addr_mode = ORTE_GPR_NO_OVERWRITE | ORTE_GPR_TOKENS_XAND; + val->segment = strdup("test-segment"); + val->num_tokens = 10; + val->tokens = (char**)malloc(val->num_tokens * sizeof(char*)); + for (i=0; i < val->num_tokens; i++) { + asprintf(&(val->tokens[i]), "dummy-sub-%lu", (unsigned long) i); + } + val->cnt = 20; + val->keyvals = (orte_gpr_keyval_t**)malloc(val->cnt * sizeof(orte_gpr_keyval_t*)); + for (i=0; icnt; i++) { + val->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t); + asprintf(&((val->keyvals[i])->key), "stupid-test-%lu", + (unsigned long) i); + (val->keyvals[i])->type = ORTE_UINT32; + (val->keyvals[i])->value.ui32 = (uint32_t)i; + } + if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) { + fprintf(test_out, "put failed with error code %s\n", + ORTE_ERROR_NAME(rc)); + test_failure("put failed"); + test_finalize(); + return rc; + } + OBJ_RELEASE(val); + + /* setup a subscription on one of the containers + * that notifies callback 1 if something is added + */ + subscription = OBJ_NEW(orte_gpr_subscription_t); + subscription->addr_mode = ORTE_GPR_TOKENS_OR; + subscription->segment = strdup("test-segment"); + subscription->user_tag = NULL; + /* monitor the dummy-sub-xx container only + */ + subscription->num_tokens = 2; + subscription->tokens = (char**)malloc(2*sizeof(char*)); + for (i=0; i < 2; i++) { + asprintf(&(subscription->tokens[i]), "dummy-sub-%lu", (unsigned long) i); + } + /* get notified when anything is added */ + subscription->num_keys = 0; + subscription->keys = NULL; + + /* send notification to callback 1 */ + subscription->cbfunc = test_cbfunc1; + + /* enter subscription */ + rc = gpr_module->subscribe( + ORTE_GPR_NOTIFY_ADD_ENTRY, + 1, &subscription, + 0, NULL, + &sub); + + gpr_module->dump_triggers(0); + + /* cleanup */ + OBJ_RELEASE(subscription); + + + /* add something to the container */ + + fprintf(test_out, "adding something - should trigger\n"); + val = OBJ_NEW(orte_gpr_value_t); + val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR; + val->segment = strdup("test-segment"); + val->tokens = (char**)malloc(sizeof(char*)); + if (NULL == val->tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + val->tokens[0] = strdup("dummy-sub-0"); + val->num_tokens = 1; + val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); + if (NULL == val->keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + val->cnt = 1; + val->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == val->keyvals[0]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + OBJ_DESTRUCT(&value); + return ORTE_ERR_OUT_OF_RESOURCE; + } + val->keyvals[0]->key = strdup("test-notify-add"); + val->keyvals[0]->type = ORTE_NULL; + + if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(val); + return rc; + } + + gpr_module->dump_all(0); + OBJ_RELEASE(val); + + return ORTE_SUCCESS; +} + + void test_cbfunc1(orte_gpr_notify_data_t *data, void *tag) { fprintf(test_out, "\n\n\nTRIGGER FIRED AND RECEIVED AT CALLBACK 1\n");