Fix two things:
1. make it impossible to leave the thread locks set when errors are detected (couple of places had a path out of the function without clearing the lock) 2. redefine notify all so it doesn't trigger every time This commit was SVN r3700.
Этот коммит содержится в:
родитель
920fc1c494
Коммит
7287fe400c
@ -61,7 +61,7 @@ extern "C" {
|
|||||||
#define OMPI_REGISTRY_NOTIFY_INCLUDE_STARTUP_DATA (uint16_t)0x0080 /**< Provide data with startup message */
|
#define OMPI_REGISTRY_NOTIFY_INCLUDE_STARTUP_DATA (uint16_t)0x0080 /**< Provide data with startup message */
|
||||||
#define OMPI_REGISTRY_NOTIFY_INCLUDE_SHUTDOWN_DATA (uint16_t)0x0100 /**< Provide data with shutdown message */
|
#define OMPI_REGISTRY_NOTIFY_INCLUDE_SHUTDOWN_DATA (uint16_t)0x0100 /**< Provide data with shutdown message */
|
||||||
#define OMPI_REGISTRY_NOTIFY_ONE_SHOT (uint16_t)0x0200 /**< Only trigger once - then delete subscription */
|
#define OMPI_REGISTRY_NOTIFY_ONE_SHOT (uint16_t)0x0200 /**< Only trigger once - then delete subscription */
|
||||||
#define OMPI_REGISTRY_NOTIFY_ALL (uint16_t)0xffff /**< Notifies subscriber upon any action */
|
#define OMPI_REGISTRY_NOTIFY_ALL (uint16_t)0x8000 /**< Notifies subscriber upon any action */
|
||||||
|
|
||||||
typedef uint16_t ompi_registry_notify_action_t;
|
typedef uint16_t ompi_registry_notify_action_t;
|
||||||
|
|
||||||
|
@ -338,7 +338,8 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
|
|||||||
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
|
||||||
|
|
||||||
if (!found) { /* didn't find request */
|
if (!found) { /* didn't find request */
|
||||||
ompi_output(0, "Proxy notification error - received request not found");
|
ompi_output(0, "[%d,%d,%d] Proxy notification error - received request not found",
|
||||||
|
OMPI_NAME_ARGS(*ompi_rte_get_self()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +46,8 @@ int mca_gpr_replica_delete_segment(char *segment)
|
|||||||
/* locate the segment */
|
/* locate the segment */
|
||||||
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
||||||
if (NULL == seg) {
|
if (NULL == seg) {
|
||||||
return OMPI_ERROR;
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
mca_gpr_replica_delete_segment_nl(seg);
|
mca_gpr_replica_delete_segment_nl(seg);
|
||||||
@ -86,7 +87,7 @@ int mca_gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
|
|||||||
|
|
||||||
/* protect against errors */
|
/* protect against errors */
|
||||||
if (NULL == segment) {
|
if (NULL == segment) {
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mca_gpr_replica_compound_cmd_mode) {
|
if (mca_gpr_replica_compound_cmd_mode) {
|
||||||
@ -100,7 +101,8 @@ int mca_gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
|
|||||||
/* locate the segment */
|
/* locate the segment */
|
||||||
seg = mca_gpr_replica_find_seg(false, segment, ompi_name_server.get_jobid(ompi_rte_get_self()));
|
seg = mca_gpr_replica_find_seg(false, segment, ompi_name_server.get_jobid(ompi_rte_get_self()));
|
||||||
if (NULL == seg) {
|
if (NULL == seg) {
|
||||||
return OMPI_ERROR;
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
keys = mca_gpr_replica_get_key_list(seg, tokens, &num_keys);
|
keys = mca_gpr_replica_get_key_list(seg, tokens, &num_keys);
|
||||||
@ -112,7 +114,7 @@ int mca_gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
|
|||||||
mca_gpr_replica_check_synchros(seg);
|
mca_gpr_replica_check_synchros(seg);
|
||||||
|
|
||||||
if (NULL != keys) {
|
if (NULL != keys) {
|
||||||
free(keys);
|
free(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
@ -174,20 +176,21 @@ ompi_list_t* mca_gpr_replica_index(char *segment)
|
|||||||
mca_gpr_replica_segment_t *seg;
|
mca_gpr_replica_segment_t *seg;
|
||||||
|
|
||||||
if (mca_gpr_replica_compound_cmd_mode) {
|
if (mca_gpr_replica_compound_cmd_mode) {
|
||||||
mca_gpr_base_pack_index(mca_gpr_replica_compound_cmd, segment);
|
mca_gpr_base_pack_index(mca_gpr_replica_compound_cmd, segment);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||||
|
|
||||||
if (NULL == segment) { /* want global level index */
|
if (NULL == segment) { /* want global level index */
|
||||||
seg = NULL;
|
seg = NULL;
|
||||||
} else {
|
} else {
|
||||||
/* locate the segment */
|
/* locate the segment */
|
||||||
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
||||||
if (NULL == seg) {
|
if (NULL == seg) {
|
||||||
return NULL;
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
}
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
list = mca_gpr_replica_index_nl(seg);
|
list = mca_gpr_replica_index_nl(seg);
|
||||||
|
@ -58,8 +58,10 @@ int mca_gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
|
|||||||
/* find the segment */
|
/* find the segment */
|
||||||
seg = mca_gpr_replica_find_seg(true, segment,
|
seg = mca_gpr_replica_find_seg(true, segment,
|
||||||
ompi_name_server.get_jobid(ompi_rte_get_self()));
|
ompi_name_server.get_jobid(ompi_rte_get_self()));
|
||||||
|
|
||||||
if (NULL == seg) { /* couldn't find segment or create it */
|
if (NULL == seg) { /* couldn't find segment or create it */
|
||||||
return OMPI_ERROR;
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* convert tokens to array of keys */
|
/* convert tokens to array of keys */
|
||||||
@ -74,7 +76,7 @@ int mca_gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
|
|||||||
|
|
||||||
/* release list of keys */
|
/* release list of keys */
|
||||||
if (NULL != keys) {
|
if (NULL != keys) {
|
||||||
free(keys);
|
free(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
@ -174,11 +176,11 @@ ompi_list_t* mca_gpr_replica_get(ompi_registry_mode_t addr_mode,
|
|||||||
|
|
||||||
/* protect against errors */
|
/* protect against errors */
|
||||||
if (NULL == segment) {
|
if (NULL == segment) {
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mca_gpr_replica_compound_cmd_mode) {
|
if (mca_gpr_replica_compound_cmd_mode) {
|
||||||
mca_gpr_base_pack_get(mca_gpr_replica_compound_cmd, addr_mode, segment, tokens);
|
mca_gpr_base_pack_get(mca_gpr_replica_compound_cmd, addr_mode, segment, tokens);
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,7 +189,8 @@ ompi_list_t* mca_gpr_replica_get(ompi_registry_mode_t addr_mode,
|
|||||||
/* find the specified segment */
|
/* find the specified segment */
|
||||||
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
seg = mca_gpr_replica_find_seg(false, segment, MCA_NS_BASE_JOBID_MAX);
|
||||||
if (NULL == seg) { /* segment not found */
|
if (NULL == seg) { /* segment not found */
|
||||||
return list;
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* convert tokens to array of keys */
|
/* convert tokens to array of keys */
|
||||||
@ -196,7 +199,7 @@ ompi_list_t* mca_gpr_replica_get(ompi_registry_mode_t addr_mode,
|
|||||||
mca_gpr_replica_get_nl(list, addr_mode, seg, keys, num_keys);
|
mca_gpr_replica_get_nl(list, addr_mode, seg, keys, num_keys);
|
||||||
|
|
||||||
if (NULL != keys) {
|
if (NULL != keys) {
|
||||||
free(keys);
|
free(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
|
@ -40,12 +40,12 @@ mca_gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
|||||||
|
|
||||||
/* protect against errors */
|
/* protect against errors */
|
||||||
if (NULL == segment) {
|
if (NULL == segment) {
|
||||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
seg = mca_gpr_replica_find_seg(true, segment, ompi_name_server.get_jobid(ompi_rte_get_self()));
|
seg = mca_gpr_replica_find_seg(true, segment, ompi_name_server.get_jobid(ompi_rte_get_self()));
|
||||||
if (NULL == seg) { /* segment couldn't be found or created */
|
if (NULL == seg) { /* segment couldn't be found or created */
|
||||||
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
return OMPI_REGISTRY_NOTIFY_ID_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mca_gpr_replica_compound_cmd_mode) {
|
if (mca_gpr_replica_compound_cmd_mode) {
|
||||||
@ -82,7 +82,7 @@ mca_gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
|||||||
mca_gpr_replica_check_subscriptions(seg, MCA_GPR_REPLICA_SUBSCRIBER_ADDED);
|
mca_gpr_replica_check_subscriptions(seg, MCA_GPR_REPLICA_SUBSCRIBER_ADDED);
|
||||||
|
|
||||||
if (NULL != keys) {
|
if (NULL != keys) {
|
||||||
free(keys);
|
free(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||||
@ -115,6 +115,7 @@ int mca_gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode,
|
|||||||
0, id_tag))) {
|
0, id_tag))) {
|
||||||
|
|
||||||
if ((OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) && seg->triggers_active) { /* want list of everything there */
|
if ((OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) && seg->triggers_active) { /* want list of everything there */
|
||||||
|
ompi_output(0, "Notify pre-existing fired\n");
|
||||||
notify_msg = mca_gpr_replica_construct_notify_message(seg, trig);
|
notify_msg = mca_gpr_replica_construct_notify_message(seg, trig);
|
||||||
notify_msg->trig_action = action;
|
notify_msg->trig_action = action;
|
||||||
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user