1
1

Fix a few thread-lock things discovered by Josh. The thread locks in the registry's local notify delivery system had not been updated to reflect the design change whereby the xcast uses the notify delivery system. This has now been fixed.

Also revised the callbacks to store and utilize local variables to avoid problems where threads modify the global structures. Not sure this totally fixes the problem, but it's a shot - suggested by Josh (and Jeff, I believe).

This commit was SVN r7694.
Этот коммит содержится в:
Ralph Castain 2005-10-11 19:35:04 +00:00
родитель 607bdf51b6
Коммит e1244fc160

Просмотреть файл

@ -37,12 +37,17 @@ int orte_gpr_replica_deliver_notify_msg(orte_gpr_notify_message_t *msg)
orte_gpr_notify_data_t **data;
orte_gpr_replica_local_trigger_t **local_trigs;
orte_gpr_replica_local_subscriber_t **local_subs, *sub;
orte_gpr_trigger_cb_fn_t trig_cb;
orte_gpr_notify_cb_fn_t sub_cb;
void *sub_usertag;
size_t i, j, k, n;
int rc;
bool processed;
OPAL_TRACE(1);
OPAL_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
/* we first have to check if the message is a trigger message - if so,
* then the message is intended to be
* sent as a single block to that trigger's callback function.
@ -58,8 +63,9 @@ int orte_gpr_replica_deliver_notify_msg(orte_gpr_notify_message_t *msg)
if (NULL != local_trigs[i]) {
j++;
if (msg->id == local_trigs[i]->id) {
trig_cb = local_trigs[i]->callback;
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
local_trigs[i]->callback(msg);
trig_cb(msg);
OPAL_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
processed = true;
}
@ -67,8 +73,10 @@ int orte_gpr_replica_deliver_notify_msg(orte_gpr_notify_message_t *msg)
}
if (!processed) { /* trigger could not be found */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return ORTE_ERR_NOT_FOUND;
}
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return ORTE_SUCCESS;
}
@ -78,6 +86,7 @@ int orte_gpr_replica_deliver_notify_msg(orte_gpr_notify_message_t *msg)
*/
if (ORTE_GPR_SUBSCRIPTION_MSG != msg->msg_type) {
ORTE_ERROR_LOG(ORTE_ERR_GPR_DATA_CORRUPT);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return ORTE_ERR_GPR_DATA_CORRUPT;
}
@ -120,22 +129,27 @@ int orte_gpr_replica_deliver_notify_msg(orte_gpr_notify_message_t *msg)
/* get here and not found => abort */
if (NULL == sub ) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return ORTE_ERR_NOT_FOUND;
}
sub_cb = sub->callback;
sub_usertag = sub->user_tag;
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
sub->callback(data[i], sub->user_tag);
sub_cb(data[i], sub->user_tag);
OPAL_THREAD_LOCK(&orte_gpr_replica_globals.mutex);
if (data[i]->remove) {
if (ORTE_SUCCESS != (rc = orte_gpr_replica_remove_local_subscription(sub))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
return rc;
}
}
}
}
}
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
/* the calling program will release the message object */
return ORTE_SUCCESS;