diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c index 482727ea83..fef9fe5e19 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_messaging_fn.c @@ -113,12 +113,20 @@ CLEANUP: trigs = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr); for (i=0; i < (orte_gpr_replica.triggers)->size; i++) { if (NULL != trigs[i] && trigs[i]->one_shot_fired) { - k = trigs[i]->index; - OBJ_RELEASE(trigs[i]); - if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers, - k, NULL))) { - ORTE_ERROR_LOG(rc); - return rc; + /* if no notify actions were specified to stick around after + * the trigger, then simply remove the trigger. Otherwise, + * clear the trigger flags and leave the notify flags alone + */ + if (ORTE_GPR_NOTIFY_ANY & trigs[i]->action) { + trigs[i]->action = trigs[i]->action & ORTE_GPR_NOTIFY_ANY; + } else { + k = trigs[i]->index; + OBJ_RELEASE(trigs[i]); + if (ORTE_SUCCESS != (rc = orte_pointer_array_set_item(orte_gpr_replica.triggers, + k, NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } } } } @@ -140,12 +148,18 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig, for (cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_first(&(orte_gpr_replica.callbacks)); cb != (orte_gpr_replica_callbacks_t*)ompi_list_get_end(&(orte_gpr_replica.callbacks)); cb = (orte_gpr_replica_callbacks_t*)ompi_list_get_next(cb)) { - if (0 == orte_ns.compare(ORTE_NS_CMP_ALL, trig->requestor, cb->requestor)) { /* same requestor - add to existing callback */ + if ((NULL == trig->requestor && NULL == cb->requestor) /* both local */ + || ((NULL != trig->requestor && NULL != cb->requestor) && + 0 == orte_ns.compare(ORTE_NS_CMP_ALL, trig->requestor, cb->requestor))) { /* same remote requestor - add to existing callback */ /* check to see if we already have something for this trigger - if so, add to it */ for (msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_first(&(cb->messages)); msg != (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_end(&(cb->messages)); msg = (orte_gpr_replica_notify_msg_list_t*)ompi_list_get_next(msg)) { if ((msg->message)->idtag == trig->index) { /* same trigger - add to it */ + if (orte_gpr_replica_globals.debug) { + ompi_output(0, "[%lu,%lu,%lu] process_trig: adding message for requestor [%lu,%lu,%lu] for idtag %lu\n", + ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(cb->requestor), (unsigned long)trig->index); + } if (ORTE_SUCCESS != (rc = orte_gpr_replica_construct_notify_message(msg->message, trig, sub, value))) { @@ -194,8 +208,8 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig, if (NULL == trig->requestor) { /* local request - queue local callback */ cb->requestor = NULL; if (orte_gpr_replica_globals.debug) { - ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing local message\n", - ORTE_NAME_ARGS(orte_process_info.my_name)); + ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing local callback for idtag %lu\n", + ORTE_NAME_ARGS(orte_process_info.my_name), (unsigned long)trig->index); } } else { /* remote request - queue remote callback */ @@ -205,14 +219,14 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig, return rc; } if (orte_gpr_replica_globals.debug) { - ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing message for [%lu,%lu,%lu] using remoteid %d\n", + ompi_output(0, "[%lu,%lu,%lu] process_trig: queueing callback for [%lu,%lu,%lu] using remoteid %lu\n", ORTE_NAME_ARGS(orte_process_info.my_name), ORTE_NAME_ARGS(cb->requestor), - (int)trig->remote_idtag); + (unsigned long)trig->remote_idtag); } } ompi_list_append(&orte_gpr_replica.callbacks, &cb->item); - /* construct the message */ + /* construct the message list item */ msg = OBJ_NEW(orte_gpr_replica_notify_msg_list_t); if (NULL == msg) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); diff --git a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c index cafa83e69e..dc205b2254 100644 --- a/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c +++ b/src/mca/gpr/replica/functional_layer/gpr_replica_trig_ops_fn.c @@ -445,7 +445,7 @@ bool orte_gpr_replica_check_notify_matches(orte_gpr_replica_subscribed_data_t *s * Thus, we need to check that we are looking at a matching container * and matching keyval pattern */ - + /* first, check to see if the containers match */ tokmod = 0x004f & sub->addr_mode; if (!orte_gpr_replica_check_itag_list(tokmod, diff --git a/test/mca/gpr/gpr_triggers.c b/test/mca/gpr/gpr_triggers.c index a432ad0e05..0f3cf0d835 100644 --- a/test/mca/gpr/gpr_triggers.c +++ b/test/mca/gpr/gpr_triggers.c @@ -909,8 +909,9 @@ int test4(void) asprintf(&(subscription->tokens[i]), "dummy-sub-%lu", (unsigned long) i); } /* get notified when anything is added */ - subscription->num_keys = 0; - subscription->keys = NULL; + subscription->num_keys = 1; + subscription->keys = (char**)malloc(sizeof(char*)); + subscription->keys[0] = strdup("stupid-test-0"); /* send notification to callback 1 */ subscription->cbfunc = test_cbfunc1; @@ -968,9 +969,9 @@ int test4(void) gpr_module->dump_all(0); OBJ_RELEASE(val); - fprintf(test_out, "adding something - should NOT trigger\n"); + fprintf(test_out, "changing something else - should NOT trigger\n"); val = OBJ_NEW(orte_gpr_value_t); - val->addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR; + val->addr_mode = ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR; val->segment = strdup("test-segment-2"); val->tokens = (char**)malloc(sizeof(char*)); if (NULL == val->tokens) { @@ -978,7 +979,7 @@ int test4(void) OBJ_DESTRUCT(&value); return ORTE_ERR_OUT_OF_RESOURCE; } - val->tokens[0] = strdup("dummy-sub-10"); + val->tokens[0] = strdup("dummy-sub-0"); val->num_tokens = 1; val->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*)); if (NULL == val->keyvals) { @@ -993,8 +994,9 @@ int test4(void) OBJ_DESTRUCT(&value); return ORTE_ERR_OUT_OF_RESOURCE; } - val->keyvals[0]->key = strdup("test-notify-add-no-fire"); - val->keyvals[0]->type = ORTE_NULL; + val->keyvals[0]->key = strdup("stupid-test-1"); + val->keyvals[0]->type = ORTE_UINT32; + val->keyvals[0]->value.ui32 = 6789; if (ORTE_SUCCESS != (rc = gpr_module->put(1, &val))) { ORTE_ERROR_LOG(rc);