Some upgrades to the registry system to help poor ol' Tim:
1. added a NOTIFY_PRE_EXIST flag that tells subscribe to check the registry for pre-existing entries that meet the specified conditions and return them 2. synchro now initializes its trigger count to the number of entries that meet conditions at the time synchro is called, instead of zero 3. notify messages include info on what caused the trigger event. This commit was SVN r2418.
Этот коммит содержится в:
родитель
7b55e46f83
Коммит
0241d07866
@ -142,6 +142,8 @@ OBJ_CLASS_INSTANCE(
|
||||
static void ompi_registry_notify_message_construct(ompi_registry_notify_message_t* msg)
|
||||
{
|
||||
OBJ_CONSTRUCT(&msg->data, ompi_list_t);
|
||||
msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE;
|
||||
msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
||||
msg->num_tokens = 0;
|
||||
msg->tokens = NULL;
|
||||
}
|
||||
|
@ -40,6 +40,7 @@
|
||||
#define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */
|
||||
#define OMPI_REGISTRY_NOTIFY_DELETE_ENTRY 0x0004 /**< Notifies subscriber when object deleted */
|
||||
#define OMPI_REGISTRY_NOTIFY_ADD_ENTRY 0x0008 /**< Notifies subscriber when object added */
|
||||
#define OMPI_REGISTRY_NOTIFY_PRE_EXISTING 0x0010 /**< Send all pre-existing entries that meet conditions */
|
||||
#define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */
|
||||
|
||||
typedef uint16_t ompi_registry_notify_action_t;
|
||||
@ -47,10 +48,25 @@ typedef uint16_t ompi_registry_notify_action_t;
|
||||
typedef uint32_t mca_gpr_notify_id_t;
|
||||
#define MCA_GPR_NOTIFY_ID_MAX UINT32_MAX
|
||||
|
||||
/*
|
||||
* Define synchro mode flags
|
||||
*/
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */
|
||||
|
||||
typedef uint16_t ompi_registry_synchro_mode_t;
|
||||
|
||||
|
||||
/** Return value for notify requests
|
||||
*/
|
||||
struct ompi_registry_notify_message_t {
|
||||
ompi_list_t data; /**< List of data objects */
|
||||
ompi_registry_notify_action_t trig_action;
|
||||
ompi_registry_synchro_mode_t trig_synchro;
|
||||
uint32_t num_tokens;
|
||||
char **tokens;
|
||||
};
|
||||
@ -76,18 +92,6 @@ typedef void (*ompi_registry_notify_cb_fn_t)(ompi_registry_notify_message_t *not
|
||||
typedef uint16_t ompi_registry_mode_t;
|
||||
|
||||
|
||||
/*
|
||||
* Define synchro mode flags
|
||||
*/
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */
|
||||
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */
|
||||
|
||||
typedef uint16_t ompi_registry_synchro_mode_t;
|
||||
|
||||
/*
|
||||
* Define flag values for remote commands - only used internally
|
||||
*/
|
||||
|
@ -182,6 +182,14 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &message->trig_action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &message->trig_synchro, 1, MCA_GPR_OOB_PACK_SYNCHRO_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_items, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
@ -134,12 +134,18 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
|
||||
trig->count++;
|
||||
}
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & trig->synch_mode && trig->count >= trig->trigger) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) {
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE;
|
||||
notify_msg->trig_synchro = trig->synch_mode;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
} else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) ||
|
||||
(OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) {
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
gpr_replica_process_triggers(seg, trig, notify_msg);
|
||||
notify_msg->trig_action = trig->action;
|
||||
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -228,11 +234,17 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
|
||||
trig->count--;
|
||||
}
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & trig->synch_mode && trig->count <= trig->trigger) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) ||
|
||||
(OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) {
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE;
|
||||
notify_msg->trig_synchro = trig->synch_mode;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
} else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
|
||||
(OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) {
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
gpr_replica_process_triggers(seg, trig, notify_msg);
|
||||
notify_msg->trig_action = trig->action;
|
||||
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -290,6 +302,8 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
ompi_registry_notify_message_t *notify_msg;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment) {
|
||||
@ -311,10 +325,17 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
||||
}
|
||||
|
||||
/* construct the trigger - add to notify tracking system if success, otherwise dump */
|
||||
if (OMPI_SUCCESS == gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
if (NULL != (trig = gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
addr_mode, segment, tokens,
|
||||
0, trackptr->id_tag)) {
|
||||
0, trackptr->id_tag))) {
|
||||
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
|
||||
if (OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) { /* want list of everything there */
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
notify_msg->trig_action = action;
|
||||
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
} else {
|
||||
OBJ_RELEASE(trackptr);
|
||||
@ -366,6 +387,8 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
ompi_registry_notify_message_t *notify_msg;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment || 0 > trigger) {
|
||||
@ -387,10 +410,20 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
}
|
||||
|
||||
/* construct the trigger - add to notify tracking system if success, otherwise dump */
|
||||
if (OMPI_SUCCESS == gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
if (NULL != (trig = gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
addr_mode, segment, tokens,
|
||||
trigger, trackptr->id_tag)) {
|
||||
trigger, trackptr->id_tag))) {
|
||||
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
|
||||
/* if synchro condition already met, construct and send message */
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & synchro_mode && trig->count >= trigger) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro_mode && trig->count == trigger) ||
|
||||
(OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & synchro_mode && trig->count <= trigger)) {
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
|
||||
notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE;
|
||||
notify_msg->trig_synchro = trig->synch_mode;
|
||||
gpr_replica_process_triggers(segment, trig, notify_msg);
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
} else {
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
@ -327,29 +327,39 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
|
||||
/* initialize the registry head */
|
||||
OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t);
|
||||
|
||||
/* ompi_output(0, "registry head setup"); */
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "registry head setup");
|
||||
}
|
||||
|
||||
/* initialize the global dictionary for segment id's */
|
||||
OBJ_CONSTRUCT(&mca_gpr_replica_head.segment_dict, ompi_list_t);
|
||||
OBJ_CONSTRUCT(&mca_gpr_replica_head.freekeys, ompi_list_t);
|
||||
mca_gpr_replica_head.lastkey = 0;
|
||||
|
||||
/* ompi_output(0, "global dict setup"); */
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "global dict setup");
|
||||
}
|
||||
|
||||
/* initialize the notify request tracker */
|
||||
OBJ_CONSTRUCT(&mca_gpr_replica_notify_request_tracker, ompi_list_t);
|
||||
mca_gpr_replica_last_notify_id_tag = 0;
|
||||
OBJ_CONSTRUCT(&mca_gpr_replica_free_notify_id_tags, ompi_list_t);
|
||||
|
||||
/* ompi_output(0, "req tracker setup"); */
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "req tracker setup");
|
||||
}
|
||||
|
||||
/* issue the non-blocking receive */
|
||||
if (!mca_gpr_replica_debug) {
|
||||
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
|
||||
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* ompi_output(0, "nb receive setup"); */
|
||||
if (!mca_gpr_replica_debug) {
|
||||
ompi_output(0, "nb receive setup");
|
||||
}
|
||||
|
||||
/* Return the module */
|
||||
|
||||
@ -1040,6 +1050,15 @@ void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag
|
||||
return;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(msg, &message->trig_action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(msg, &message->trig_synchro, 1, MCA_GPR_OOB_PACK_SYNCHRO_MODE)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
num_items = (int32_t)ompi_list_get_size(&message->data);
|
||||
if (OMPI_SUCCESS != ompi_pack(msg, &num_items, 1, OMPI_INT32)) {
|
||||
return;
|
||||
|
@ -409,13 +409,14 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode,
|
||||
return false;
|
||||
}
|
||||
|
||||
int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
mca_gpr_notify_id_t id_tag)
|
||||
{
|
||||
mca_gpr_replica_segment_t *seg;
|
||||
mca_gpr_replica_core_t *reg;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
char **tokptr;
|
||||
mca_gpr_replica_key_t *keyptr;
|
||||
@ -423,7 +424,7 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
|
||||
seg = gpr_replica_find_seg(true, segment);
|
||||
if (NULL == seg) { /* couldn't find or create segment */
|
||||
return OMPI_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
trig = OBJ_NEW(mca_gpr_replica_trigger_list_t);
|
||||
@ -460,9 +461,20 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
trig->num_keys = num_tokens;
|
||||
}
|
||||
|
||||
/* traverse segment entries and initialize trigger count */
|
||||
for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries);
|
||||
reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries);
|
||||
reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) {
|
||||
if (gpr_replica_check_key_list(addr_mode, trig->num_keys, trig->keys,
|
||||
reg->num_keys, reg->keys)) {
|
||||
trig->count++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ompi_list_append(&seg->triggers, &trig->item);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return trig;
|
||||
|
||||
}
|
||||
|
||||
@ -544,6 +556,7 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
|
||||
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens)
|
||||
{
|
||||
ompi_list_t *reg_entries;
|
||||
@ -592,10 +605,11 @@ ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_regist
|
||||
return msg;
|
||||
}
|
||||
|
||||
void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
|
||||
void gpr_replica_process_triggers(char *segment,
|
||||
mca_gpr_replica_trigger_list_t *trig,
|
||||
ompi_registry_notify_message_t *message)
|
||||
{
|
||||
mca_gpr_replica_segment_t *seg;
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
ompi_registry_object_t *data;
|
||||
char **tokptr;
|
||||
@ -603,7 +617,12 @@ void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
|
||||
bool found;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == message) {
|
||||
if (NULL == message || NULL == segment) {
|
||||
return;
|
||||
}
|
||||
|
||||
seg = gpr_replica_find_seg(false, segment);
|
||||
if (NULL == seg) { /* couldn't find segment */
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -76,7 +76,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode,
|
||||
|
||||
mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment);
|
||||
|
||||
int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
@ -85,7 +85,7 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens);
|
||||
|
||||
void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
|
||||
void gpr_replica_process_triggers(char *segment,
|
||||
mca_gpr_replica_trigger_list_t *trig,
|
||||
ompi_registry_notify_message_t *message);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user