1
1

Update to the registry, bringing publish/subscribe system online. Most functions now available - exceptions are a couple of more esoteric notify modes, and the "unsubscribe" function. Will bring those online over the weekend.

This commit was SVN r2346.
Этот коммит содержится в:
Ralph Castain 2004-08-28 01:59:56 +00:00
родитель 09192b32b4
Коммит 37f331dcff
10 изменённых файлов: 180 добавлений и 150 удалений

Просмотреть файл

@ -101,7 +101,6 @@ static void mca_gpr_notify_request_tracker_construct(mca_gpr_notify_request_trac
req->callback = NULL;
req->user_tag = NULL;
req->id_tag = MCA_GPR_NOTIFY_ID_MAX;
req->synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
}
/* destructor - used to free any resources held by instance */

Просмотреть файл

@ -38,8 +38,8 @@
#define OMPI_REGISTRY_NOTIFY_NONE 0x0000 /**< Null case */
#define OMPI_REGISTRY_NOTIFY_MODIFICATION 0x0001 /**< Notifies subscriber when object modified */
#define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */
#define OMPI_REGISTRY_NOTIFY_DELETE 0x0004 /**< Notifies subscriber when object deleted */
#define OMPI_REGISTRY_NOTIFY_SYNCHRO 0x0008 /**< Indicate that synchro trigger occurred - not valid for subscribe command */
#define OMPI_REGISTRY_NOTIFY_DELETE_ENTRY 0x0004 /**< Notifies subscriber when object deleted */
#define OMPI_REGISTRY_NOTIFY_ADD_ENTRY 0x0008 /**< Notifies subscriber when object added */
#define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */
typedef uint16_t ompi_registry_notify_action_t;
@ -175,7 +175,6 @@ struct mca_gpr_notify_request_tracker_t {
ompi_registry_notify_cb_fn_t callback;
void *user_tag;
mca_gpr_notify_id_t id_tag;
ompi_registry_synchro_mode_t synchro;
};
typedef struct mca_gpr_notify_request_tracker_t mca_gpr_notify_request_tracker_t;

Просмотреть файл

@ -347,6 +347,8 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
trackptr = NULL;
/* need to protect against errors */
if (NULL == segment) {
return OMPI_ERROR;
@ -422,11 +424,8 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_proxy_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
if (OMPI_SUCCESS != ompi_pack(cmd, &trackptr->id_tag, 1, OMPI_INT32)) {
/* remove entry from lookup list */
ompi_list_remove_last(&mca_gpr_proxy_notify_request_tracker);
goto CLEANUP;
}
@ -438,25 +437,29 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
goto CLEANUP;
}
ompi_buffer_free(cmd);
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_SYNCHRO_CMD != command)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) ||
(OMPI_SUCCESS != response)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
goto CLEANUP;
}
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
ompi_buffer_free(answer);
ompi_buffer_free(cmd);
return OMPI_SUCCESS;
CLEANUP:
if (NULL != trackptr) {
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
ptr_free_id->id_tag = trackptr->id_tag;
ompi_list_append(&mca_gpr_proxy_free_notify_id_tags, &ptr_free_id->item);
}
ompi_buffer_free(cmd);
return OMPI_ERROR;

Просмотреть файл

@ -39,6 +39,7 @@ extern ompi_process_name_t *mca_gpr_my_replica;
extern ompi_list_t mca_gpr_proxy_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
extern ompi_list_t mca_gpr_proxy_free_notify_id_tags;
extern int mca_gpr_proxy_debug;
/*
* Implementation of delete_segment().

Просмотреть файл

@ -17,6 +17,7 @@
#include "util/proc_info.h"
#include "util/output.h"
#include "mca/mca.h"
#include "mca/base/mca_base_param.h"
#include "mca/gpr/base/base.h"
#include "gpr_proxy.h"
@ -70,6 +71,7 @@ ompi_process_name_t *mca_gpr_my_replica;
ompi_list_t mca_gpr_proxy_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
ompi_list_t mca_gpr_proxy_free_notify_id_tags;
int mca_gpr_proxy_debug;
/*
@ -78,6 +80,11 @@ ompi_list_t mca_gpr_proxy_free_notify_id_tags;
*/
int mca_gpr_proxy_open(void)
{
int id;
id = mca_base_param_register_int("gpr", "proxy", "debug", NULL, 0);
mca_base_param_lookup_int(id, &mca_gpr_proxy_debug);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -36,9 +36,7 @@ int gpr_replica_delete_segment(char *segment)
return OMPI_ERROR;
}
if (OMPI_SUCCESS != gpr_replica_empty_segment(seg)) { /* couldn't empty the segment */
return OMPI_ERROR;
}
OBJ_RELEASE(seg);
if (OMPI_SUCCESS != gpr_replica_delete_key(segment, NULL)) { /* couldn't remove dictionary entry */
return OMPI_ERROR;
@ -56,14 +54,14 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_core_t *entry_ptr;
ompi_registry_mode_t put_mode;
mca_gpr_replica_synchro_list_t *synchro;
mca_gpr_replica_trigger_list_t *trig;
ompi_registry_notify_message_t *notify_msg;
int return_code, num_tokens;
mca_gpr_replica_key_t *keys, *key2;
/* protect ourselves against errors */
if (NULL == object || 0 == size || NULL == tokens || NULL == *tokens) {
if (NULL == segment || NULL == object || 0 == size || NULL == tokens || NULL == *tokens) {
return OMPI_ERROR;
}
put_mode = addr_mode & OMPI_REGISTRY_OVERWRITE; /* only overwrite permission mode flag allowed */
@ -127,27 +125,31 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
return_code = OMPI_SUCCESS;
/* update synchro list and check for trigger conditions */
for (synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_first(&seg->synchros);
synchro != (mca_gpr_replica_synchro_list_t*)ompi_list_get_end(&seg->synchros);
synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_next(synchro)) {
if (gpr_replica_check_key_list(synchro->addr_mode, synchro->num_keys, synchro->keys,
/* update trigger list and check for trigger conditions */
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers);
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
if (gpr_replica_check_key_list(trig->addr_mode, trig->num_keys, trig->keys,
num_tokens, keys)) {
synchro->count++;
trig->count++;
}
if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & synchro->synch_mode && synchro->count >= synchro->trigger) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro->synch_mode && synchro->count == synchro->trigger)) {
if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & trig->synch_mode && trig->count >= trig->trigger) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) ||
(OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
(OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) ||
(OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
gpr_replica_process_triggers(notify_msg, synchro->id_tag);
gpr_replica_process_triggers(seg, trig, notify_msg);
}
}
CLEANUP:
/* release list of keys */
while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_last(keylist))) {
OBJ_RELEASE(keyptr);
if (mca_gpr_replica_debug) {
ompi_output(0, "put: releasing list of keys");
}
/* release list of keys */
OBJ_RELEASE(keylist);
return return_code;
@ -161,11 +163,12 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
ompi_list_t *keylist;
mca_gpr_replica_key_t *keys, *key2;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_synchro_list_t *synchro;
int num_tokens, return_code;
mca_gpr_replica_trigger_list_t *trig;
ompi_registry_notify_message_t *notify_msg;
int num_tokens;
keys = NULL;
return_code = OMPI_ERROR;
/* protect against errors */
if (NULL == segment) {
@ -192,6 +195,7 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist);
keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
return_code = OMPI_ERROR;
goto CLEANUP;
}
*key2 = keyptr->key;
@ -213,28 +217,33 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
}
}
return_code = OMPI_SUCCESS;
/* update synchro list and check for trigger conditions */
for (synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_first(&seg->synchros);
synchro != (mca_gpr_replica_synchro_list_t*)ompi_list_get_end(&seg->synchros);
synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_next(synchro)) {
if (gpr_replica_check_key_list(synchro->addr_mode, synchro->num_keys, synchro->keys,
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers);
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
if (gpr_replica_check_key_list(trig->addr_mode, trig->num_keys, trig->keys,
num_tokens, keys)) {
synchro->count--;
trig->count--;
}
if ((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & synchro->synch_mode && synchro->count <= synchro->trigger) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro->synch_mode && synchro->count == synchro->trigger)) {
if ((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & trig->synch_mode && trig->count <= trig->trigger) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) ||
(OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
(OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
gpr_replica_process_triggers(notify_msg, synchro->id_tag);
gpr_replica_process_triggers(seg, trig, notify_msg);
}
}
return OMPI_SUCCESS;
CLEANUP:
OBJ_RELEASE(keylist);
if (NULL != keys) {
free(keys);
}
return OMPI_ERROR;
return return_code;
}
ompi_list_t* gpr_replica_index(char *segment)
@ -274,12 +283,43 @@ ompi_list_t* gpr_replica_index(char *segment)
return answer;
}
int gpr_replica_subscribe(ompi_registry_mode_t mode,
int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
return OMPI_ERR_NOT_IMPLEMENTED;
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
/* protect against errors */
if (NULL == segment) {
return OMPI_ERROR;
}
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = NULL;
trackptr->req_tag = 0;
trackptr->callback = cb_func;
trackptr->user_tag = user_tag;
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
mca_gpr_replica_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
/* construct the trigger - add to notify tracking system if success, otherwise dump */
if (OMPI_SUCCESS == gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
addr_mode, segment, tokens,
0, trackptr->id_tag)) {
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
return OMPI_SUCCESS;
} else {
OBJ_RELEASE(trackptr);
return OMPI_ERROR;
}
}
int gpr_replica_unsubscribe(ompi_registry_mode_t mode,
@ -316,10 +356,10 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
/* construct the synchro - add to notify tracking system if success, otherwise dump */
if (OMPI_SUCCESS == gpr_replica_construct_synchro(synchro_mode, addr_mode, segment, tokens,
/* construct the trigger - add to notify tracking system if success, otherwise dump */
if (OMPI_SUCCESS == gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
addr_mode, segment, tokens,
trigger, trackptr->id_tag)) {
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
return OMPI_SUCCESS;
@ -396,10 +436,8 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
CLEANUP:
/* release list of keys */
while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(keylist))) {
OBJ_RELEASE(keyptr);
}
OBJ_RELEASE(keylist);
if (NULL != keys) {
free(keys);
}

Просмотреть файл

@ -68,41 +68,26 @@ typedef struct mca_gpr_replica_keylist_t mca_gpr_replica_keylist_t;
OBJ_CLASS_DECLARATION(mca_gpr_replica_keylist_t);
/** List of subscribers to objects on a segment.
* Each segment can have an arbitrary number of subscribers desiring notification
* upon specified actions being performed against the objects on the segment. This structure is
* used to create a linked list of subscribers.
*/
struct mca_gpr_replica_subscriber_list_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_registry_mode_t addr_mode; /**< Addressing mode for subscription */
ompi_registry_notify_action_t action; /**< Bit-mask of actions that trigger notification */
mca_gpr_replica_key_t num_keys; /**< Number of keys in array */
mca_gpr_replica_key_t *keys; /**< Array of keys describing subscription */
mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */
};
typedef struct mca_gpr_replica_subscriber_list_t mca_gpr_replica_subscriber_list_t;
OBJ_CLASS_DECLARATION(mca_gpr_replica_subscriber_list_t);
/** List of synchro actions for a segment.
/** List of trigger actions for a segment.
* Each object can have an arbitrary number of subscribers desiring notification
* upon specified actions being performed against the object. This structure is
* used to create a linked list of subscribers for objects.
* used to create a linked list of subscribers for objects. Both synchro and
* non-synchro triggers are supported.
*/
struct mca_gpr_replica_synchro_list_t {
struct mca_gpr_replica_trigger_list_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_registry_synchro_mode_t synch_mode; /**< Synchro mode - ascending, descending, ... */
ompi_registry_mode_t addr_mode; /**< Addressing mode */
ompi_registry_notify_action_t action; /**< Bit-mask of actions that trigger non-synchro notification */
ompi_registry_mode_t addr_mode; /**< Addressing mode */
mca_gpr_replica_key_t num_keys; /**< Number of keys in array */
mca_gpr_replica_key_t *keys; /**< Array of keys describing objects to be counted */
uint32_t trigger; /**< Number of objects that trigger notification */
uint32_t count; /**< Number of qualifying objects currently in segment */
mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */
};
typedef struct mca_gpr_replica_synchro_list_t mca_gpr_replica_synchro_list_t;
typedef struct mca_gpr_replica_trigger_list_t mca_gpr_replica_trigger_list_t;
OBJ_CLASS_DECLARATION(mca_gpr_replica_synchro_list_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_trigger_list_t);
/** List of replicas that hold a stored object.
* Each object can have an arbitrary number of replicas that hold a copy
@ -172,8 +157,7 @@ struct mca_gpr_replica_segment_t {
mca_gpr_replica_key_t segment; /**< Key corresponding to name of registry segment */
mca_gpr_replica_key_t lastkey; /**< Highest key value used */
ompi_list_t registry_entries; /**< Linked list of stored objects within this segment */
ompi_list_t subscriber; /**< List of subscriptions to objects on this segment */
ompi_list_t synchros; /**< List of synchro requests on this segment */
ompi_list_t triggers; /**< List of triggers on this segment */
ompi_list_t keytable; /**< Token-key dictionary for this segment */
ompi_list_t freekeys; /**< List of keys that have been made available */
};
@ -189,6 +173,7 @@ extern mca_gpr_replica_t mca_gpr_replica_head;
extern ompi_list_t mca_gpr_replica_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
extern ompi_list_t mca_gpr_replica_free_notify_id_tags;
extern int mca_gpr_replica_debug;
/*
* Module open / close

Просмотреть файл

@ -21,6 +21,7 @@
#include "util/pack.h"
#include "mca/mca.h"
#include "mca/base/mca_base_param.h"
#include "mca/oob/base/base.h"
#include "mca/gpr/base/base.h"
#include "gpr_replica.h"
@ -76,6 +77,7 @@ mca_gpr_replica_t mca_gpr_replica_head;
ompi_list_t mca_gpr_replica_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
ompi_list_t mca_gpr_replica_free_notify_id_tags;
int mca_gpr_replica_debug;
/* constructor - used to initialize state of keytable instance */
@ -88,8 +90,17 @@ static void mca_gpr_replica_keytable_construct(mca_gpr_replica_keytable_t* keyta
/* destructor - used to free any resources held by instance */
static void mca_gpr_replica_keytable_destructor(mca_gpr_replica_keytable_t* keytable)
{
if (NULL != keytable->token) {
free(keytable->token);
mca_gpr_replica_keytable_t *keyptr;
if (mca_gpr_replica_debug) {
ompi_output(0, "entered keytable destructor");
}
while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_first((ompi_list_t*)keytable))) {
if (NULL != keyptr->token) {
free(keyptr->token);
}
OBJ_RELEASE(keyptr);
}
}
@ -120,56 +131,32 @@ OBJ_CLASS_INSTANCE(
mca_gpr_replica_keylist_destructor); /* destructor */
/* constructor - used to initialize state of subscriber list instance */
static void mca_gpr_replica_subscriber_list_construct(mca_gpr_replica_subscriber_list_t* subscriber)
/* constructor - used to initialize state of trigger list instance */
static void mca_gpr_replica_trigger_list_construct(mca_gpr_replica_trigger_list_t* trig)
{
subscriber->addr_mode = OMPI_REGISTRY_NONE;
subscriber->action = OMPI_REGISTRY_NOTIFY_NONE;
subscriber->keys = NULL;
subscriber->id_tag = MCA_GPR_NOTIFY_ID_MAX;
trig->synch_mode = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
trig->action = OMPI_REGISTRY_NOTIFY_NONE;
trig->addr_mode = OMPI_REGISTRY_NONE;
trig->keys = NULL;
trig->trigger = 0;
trig->count = 0;
trig->id_tag = MCA_GPR_NOTIFY_ID_MAX;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_replica_subscriber_list_destructor(mca_gpr_replica_subscriber_list_t* subscriber)
static void mca_gpr_replica_trigger_list_destructor(mca_gpr_replica_trigger_list_t* trig)
{
if (NULL != subscriber->keys) {
free(subscriber->keys);
if (NULL != trig->keys) {
free(trig->keys);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_replica_subscriber_list_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_replica_subscriber_list_construct, /* constructor */
mca_gpr_replica_subscriber_list_destructor); /* destructor */
/* constructor - used to initialize state of synchro list instance */
static void mca_gpr_replica_synchro_list_construct(mca_gpr_replica_synchro_list_t* synchro)
{
synchro->synch_mode = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
synchro->addr_mode = OMPI_REGISTRY_NONE;
synchro->keys = NULL;
synchro->trigger = 0;
synchro->count = 0;
synchro->id_tag = MCA_GPR_NOTIFY_ID_MAX;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_replica_synchro_list_destructor(mca_gpr_replica_synchro_list_t* synchro)
{
if (NULL != synchro->keys) {
free(synchro->keys);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_replica_synchro_list_t, /* type name */
mca_gpr_replica_trigger_list_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_replica_synchro_list_construct, /* constructor */
mca_gpr_replica_synchro_list_destructor); /* destructor */
mca_gpr_replica_trigger_list_construct, /* constructor */
mca_gpr_replica_trigger_list_destructor); /* destructor */
/* constructor - used to initialize state of replica list instance */
@ -246,8 +233,7 @@ static void mca_gpr_replica_segment_construct(mca_gpr_replica_segment_t* seg)
seg->segment = 0;
seg->lastkey = 0;
OBJ_CONSTRUCT(&seg->registry_entries, ompi_list_t);
OBJ_CONSTRUCT(&seg->subscriber, ompi_list_t);
OBJ_CONSTRUCT(&seg->synchros, ompi_list_t);
OBJ_CONSTRUCT(&seg->triggers, ompi_list_t);
OBJ_CONSTRUCT(&seg->keytable, ompi_list_t);
OBJ_CONSTRUCT(&seg->freekeys, ompi_list_t);
}
@ -256,25 +242,23 @@ static void mca_gpr_replica_segment_construct(mca_gpr_replica_segment_t* seg)
static void mca_gpr_replica_segment_destructor(mca_gpr_replica_segment_t* seg)
{
mca_gpr_replica_core_t *reg;
mca_gpr_replica_subscriber_list_t *sub;
mca_gpr_replica_synchro_list_t *syn;
mca_gpr_replica_trigger_list_t *tr;
mca_gpr_replica_keytable_t *kt;
mca_gpr_replica_keylist_t *kl;
if (mca_gpr_replica_debug) {
ompi_output(0, "entered segment destructor");
}
while (NULL != (reg = (mca_gpr_replica_core_t*)ompi_list_remove_first(&seg->registry_entries))) {
OBJ_RELEASE(reg);
}
OBJ_DESTRUCT(&seg->registry_entries);
while (NULL != (sub = (mca_gpr_replica_subscriber_list_t*)ompi_list_remove_first(&seg->subscriber))) {
OBJ_RELEASE(sub);
while (NULL != (tr = (mca_gpr_replica_trigger_list_t*)ompi_list_remove_first(&seg->triggers))) {
OBJ_RELEASE(tr);
}
OBJ_DESTRUCT(&seg->subscriber);
while (NULL != (syn = (mca_gpr_replica_synchro_list_t*)ompi_list_remove_first(&seg->synchros))) {
OBJ_RELEASE(syn);
}
OBJ_DESTRUCT(&seg->synchros);
OBJ_DESTRUCT(&seg->triggers);
while (NULL != (kt = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&seg->keytable))) {
OBJ_RELEASE(kt);
@ -301,6 +285,11 @@ OBJ_CLASS_INSTANCE(
*/
int mca_gpr_replica_open(void)
{
int id;
id = mca_base_param_register_int("gpr", "replica", "debug", NULL, 0);
mca_base_param_lookup_int(id, &mca_gpr_replica_debug);
return OMPI_SUCCESS;
}
@ -731,10 +720,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
response = (int32_t)gpr_replica_construct_synchro(synchro_mode, mode, segment, tokens,
response = (int32_t)gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
mode, segment, tokens,
trigger, trackptr->id_tag);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {

Просмотреть файл

@ -409,32 +409,34 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode,
return false;
}
int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_notify_action_t action,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag)
{
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_synchro_list_t *synch;
mca_gpr_replica_trigger_list_t *trig;
char **tokptr;
mca_gpr_replica_key_t *keyptr;
int i, num_tokens;
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't find segment */
if (NULL == seg) { /* couldn't find or create segment */
return OMPI_ERROR;
}
synch = OBJ_NEW(mca_gpr_replica_synchro_list_t);
trig = OBJ_NEW(mca_gpr_replica_trigger_list_t);
synch->synch_mode = synchro_mode;
synch->addr_mode = addr_mode;
synch->trigger = trigger;
synch->count = 0;
synch->id_tag = id_tag;
trig->synch_mode = synchro_mode;
trig->action = action;
trig->addr_mode = addr_mode;
trig->trigger = trigger;
trig->count = 0;
trig->id_tag = id_tag;
synch->num_keys = 0;
synch->keys = NULL;
trig->num_keys = 0;
trig->keys = NULL;
if (NULL != tokens) { /* tokens provided */
@ -445,8 +447,8 @@ int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
num_tokens++;
tokptr++;
}
synch->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
keyptr = synch->keys;
trig->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
keyptr = trig->keys;
/* store key values of tokens, defining them if needed */
for (i=0, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) {
*keyptr = gpr_replica_get_key(segment, *tokptr);
@ -455,10 +457,10 @@ int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
}
keyptr++;
}
synch->num_keys = num_tokens;
trig->num_keys = num_tokens;
}
ompi_list_append(&seg->synchros, &synch->item);
ompi_list_append(&seg->triggers, &trig->item);
return OMPI_SUCCESS;
@ -513,8 +515,9 @@ ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_regist
return msg;
}
void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
mca_gpr_notify_id_t id_tag)
void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
mca_gpr_replica_trigger_list_t *trig,
ompi_registry_notify_message_t *message)
{
mca_gpr_notify_request_tracker_t *trackptr, *tmpptr;
ompi_registry_object_t *data;
@ -532,7 +535,7 @@ void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
if (trackptr->id_tag == id_tag) {
if (trackptr->id_tag == trig->id_tag) {
found = true;
break;
}
@ -561,9 +564,13 @@ void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
}
/* if one-shot, remove request from tracking system */
if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trackptr->synchro) {
if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trig->synch_mode) {
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OBJ_RELEASE(trackptr);
/* ....and from the corresponding registry segment */
ompi_list_remove_item(&seg->triggers, &trig->item);
OBJ_RELEASE(trig);
}
}

Просмотреть файл

@ -76,7 +76,8 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode,
mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment);
int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_notify_action_t action,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag);
@ -84,5 +85,6 @@ int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode,
char *segment, char **tokens);
void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
mca_gpr_notify_id_t id_tag);
void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
mca_gpr_replica_trigger_list_t *trig,
ompi_registry_notify_message_t *message);