Insert a flag into the registry replica to indicate that we are already processing callbacks, so don't call process_callbacks again. Helps to limit the stack size.
This commit was SVN r6047.
Этот коммит содержится в:
родитель
42d6bf48b6
Коммит
445f8a802e
@ -86,7 +86,9 @@ int orte_gpr_replica_increment_value(orte_gpr_value_t *value)
|
||||
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
|
||||
return rc;
|
||||
}
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
if (!orte_gpr_replica.processing_callbacks) {
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
|
||||
@ -148,7 +150,9 @@ int orte_gpr_replica_decrement_value(orte_gpr_value_t *value)
|
||||
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
|
||||
return rc;
|
||||
}
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
if (!orte_gpr_replica.processing_callbacks) {
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
|
||||
|
@ -123,7 +123,7 @@ int orte_gpr_replica_exec_compound_cmd(void)
|
||||
ompi_condition_signal(&orte_gpr_replica_globals.compound_cmd_condition);
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS == rc) {
|
||||
if (ORTE_SUCCESS == rc && !orte_gpr_replica.processing_callbacks) {
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
}
|
||||
|
||||
|
@ -127,7 +127,7 @@ int orte_gpr_replica_delete_entries(orte_gpr_addr_mode_t addr_mode,
|
||||
free(key_itags);
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS == rc) {
|
||||
if (ORTE_SUCCESS == rc && !orte_gpr_replica.processing_callbacks) {
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_process_callbacks())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ CLEANUP:
|
||||
free(itags);
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS == rc) {
|
||||
if (ORTE_SUCCESS == rc && !orte_gpr_replica.processing_callbacks) {
|
||||
rc = orte_gpr_replica_process_callbacks();
|
||||
}
|
||||
|
||||
|
@ -66,8 +66,10 @@ void orte_gpr_replica_recv(int status, orte_process_name_t* sender,
|
||||
orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR, 0, orte_gpr_replica_recv, NULL);
|
||||
|
||||
/* be sure to process callbacks before returning */
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_process_callbacks())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
if (!orte_gpr_replica.processing_callbacks) {
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr_replica_process_callbacks())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
}
|
||||
|
||||
OMPI_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "util/output.h"
|
||||
#include "dps/dps.h"
|
||||
#include "mca/errmgr/errmgr.h"
|
||||
#include "mca/ns/ns_types.h"
|
||||
#include "mca/rml/rml.h"
|
||||
|
||||
#include "gpr_replica_comm.h"
|
||||
|
@ -49,6 +49,13 @@ int orte_gpr_replica_delete_entries_fn(orte_gpr_addr_mode_t addr_mode,
|
||||
ORTE_NAME_ARGS(*(orte_process_info.my_name)), seg->name);
|
||||
}
|
||||
|
||||
/* if num_tokens == 0 and num_keys == 0, remove segment. We don't record
|
||||
* any actions when doing this so that subscriptions don't fire like mad
|
||||
*/
|
||||
if (0 == num_tokens && 0 == num_keys) {
|
||||
return orte_gpr_replica_release_segment(seg);
|
||||
}
|
||||
|
||||
/* initialize storage for actions taken */
|
||||
orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon);
|
||||
orte_gpr_replica_globals.num_acted_upon = 0;
|
||||
@ -71,11 +78,6 @@ int orte_gpr_replica_delete_entries_fn(orte_gpr_addr_mode_t addr_mode,
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
/* if num_tokens == 0 and num_keys == 0, remove segment */
|
||||
if (0 == num_tokens && 0 == num_keys) {
|
||||
return orte_gpr_replica_release_segment(seg);
|
||||
}
|
||||
|
||||
/* if num_tokens == 0 and num_keys > 0, check every container */
|
||||
if (0 == num_tokens) {
|
||||
ptr = (orte_gpr_replica_container_t**)((seg->containers)->addr);
|
||||
|
@ -55,6 +55,9 @@ int orte_gpr_replica_process_callbacks(void)
|
||||
ompi_output(0, "gpr replica: process_callbacks entered");
|
||||
}
|
||||
|
||||
/* set flag indicating callbacks being processed */
|
||||
orte_gpr_replica.processing_callbacks = true;
|
||||
|
||||
while (NULL != (cb = (orte_gpr_replica_callbacks_t*)ompi_list_remove_first(&orte_gpr_replica.callbacks))) {
|
||||
|
||||
if (NULL == cb->requestor) { /* local callback */
|
||||
@ -103,6 +106,9 @@ CLEANUP:
|
||||
OBJ_RELEASE(cb);
|
||||
}
|
||||
|
||||
/* all callbacks processed - indicate list is open */
|
||||
orte_gpr_replica.processing_callbacks = false;
|
||||
|
||||
/* cleanup any one-shot triggers that fired */
|
||||
trigs = (orte_gpr_replica_triggers_t**)((orte_gpr_replica.triggers)->addr);
|
||||
for (i=0; i < (orte_gpr_replica.triggers)->size; i++) {
|
||||
@ -116,6 +122,7 @@ CLEANUP:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -183,7 +190,7 @@ int orte_gpr_replica_register_callback(orte_gpr_replica_triggers_t *trig,
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
|
||||
if (NULL == trig->requestor) { /* local request - queue local callback */
|
||||
cb->requestor = NULL;
|
||||
if (orte_gpr_replica_globals.debug) {
|
||||
|
@ -125,6 +125,7 @@ struct orte_gpr_replica_t {
|
||||
size_t num_segs;
|
||||
orte_pointer_array_t *triggers; /**< Managed array of pointers to triggers */
|
||||
size_t num_trigs;
|
||||
bool processing_callbacks;
|
||||
ompi_list_t callbacks; /**< List of callbacks to be processed */
|
||||
};
|
||||
typedef struct orte_gpr_replica_t orte_gpr_replica_t;
|
||||
|
@ -599,6 +599,7 @@ orte_gpr_base_module_t *orte_gpr_replica_init(bool *allow_multi_user_threads, bo
|
||||
|
||||
/* initialize the callback list head */
|
||||
OBJ_CONSTRUCT(&orte_gpr_replica.callbacks, ompi_list_t);
|
||||
orte_gpr_replica.processing_callbacks = false;
|
||||
|
||||
/* initialize the search arrays for temporarily storing search results */
|
||||
if (ORTE_SUCCESS != (rc = orte_pointer_array_init(&(orte_gpr_replica_globals.srch_cptr),
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user