diff --git a/src/mca/gpr/base/gpr_base_open.c b/src/mca/gpr/base/gpr_base_open.c index dcf0d8585d..9992f947c4 100644 --- a/src/mca/gpr/base/gpr_base_open.c +++ b/src/mca/gpr/base/gpr_base_open.c @@ -142,6 +142,8 @@ OBJ_CLASS_INSTANCE( static void ompi_registry_notify_message_construct(ompi_registry_notify_message_t* msg) { OBJ_CONSTRUCT(&msg->data, ompi_list_t); + msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE; + msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; msg->num_tokens = 0; msg->tokens = NULL; } diff --git a/src/mca/gpr/gpr.h b/src/mca/gpr/gpr.h index 5bd65824f9..b792c2ab8c 100644 --- a/src/mca/gpr/gpr.h +++ b/src/mca/gpr/gpr.h @@ -40,6 +40,7 @@ #define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */ #define OMPI_REGISTRY_NOTIFY_DELETE_ENTRY 0x0004 /**< Notifies subscriber when object deleted */ #define OMPI_REGISTRY_NOTIFY_ADD_ENTRY 0x0008 /**< Notifies subscriber when object added */ +#define OMPI_REGISTRY_NOTIFY_PRE_EXISTING 0x0010 /**< Send all pre-existing entries that meet conditions */ #define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */ typedef uint16_t ompi_registry_notify_action_t; @@ -47,10 +48,25 @@ typedef uint16_t ompi_registry_notify_action_t; typedef uint32_t mca_gpr_notify_id_t; #define MCA_GPR_NOTIFY_ID_MAX UINT32_MAX +/* + * Define synchro mode flags + */ +#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */ +#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */ +#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */ +#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */ +#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */ +#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */ + +typedef uint16_t ompi_registry_synchro_mode_t; + + /** Return value for notify requests */ struct ompi_registry_notify_message_t { ompi_list_t data; /**< List of data objects */ + ompi_registry_notify_action_t trig_action; + ompi_registry_synchro_mode_t trig_synchro; uint32_t num_tokens; char **tokens; }; @@ -76,18 +92,6 @@ typedef void (*ompi_registry_notify_cb_fn_t)(ompi_registry_notify_message_t *not typedef uint16_t ompi_registry_mode_t; -/* - * Define synchro mode flags - */ -#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */ -#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */ -#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */ -#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */ -#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */ -#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */ - -typedef uint16_t ompi_registry_synchro_mode_t; - /* * Define flag values for remote commands - only used internally */ diff --git a/src/mca/gpr/proxy/gpr_proxy_component.c b/src/mca/gpr/proxy/gpr_proxy_component.c index d5734ab4c8..cfd91d8348 100644 --- a/src/mca/gpr/proxy/gpr_proxy_component.c +++ b/src/mca/gpr/proxy/gpr_proxy_component.c @@ -182,6 +182,14 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + if (OMPI_SUCCESS != ompi_unpack(buffer, &message->trig_action, 1, MCA_GPR_OOB_PACK_ACTION)) { + goto RETURN_ERROR; + } + + if (OMPI_SUCCESS != ompi_unpack(buffer, &message->trig_synchro, 1, MCA_GPR_OOB_PACK_SYNCHRO_MODE)) { + goto RETURN_ERROR; + } + if (OMPI_SUCCESS != ompi_unpack(buffer, &num_items, 1, OMPI_INT32)) { goto RETURN_ERROR; } diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index f232c87cd4..b491025ff6 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -134,12 +134,18 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, trig->count++; } if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & trig->synch_mode && trig->count >= trig->trigger) || - (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) || - (OMPI_REGISTRY_NOTIFY_ALL & trig->action) || + (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) { + notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); + notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE; + notify_msg->trig_synchro = trig->synch_mode; + gpr_replica_process_triggers(segment, trig, notify_msg); + } else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) || (OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) || (OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); - gpr_replica_process_triggers(seg, trig, notify_msg); + notify_msg->trig_action = trig->action; + notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; + gpr_replica_process_triggers(segment, trig, notify_msg); } } @@ -228,11 +234,17 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode, trig->count--; } if ((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & trig->synch_mode && trig->count <= trig->trigger) || - (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) || - (OMPI_REGISTRY_NOTIFY_ALL & trig->action) || + (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) { + notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); + notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE; + notify_msg->trig_synchro = trig->synch_mode; + gpr_replica_process_triggers(segment, trig, notify_msg); + } else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) || (OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); - gpr_replica_process_triggers(seg, trig, notify_msg); + notify_msg->trig_action = trig->action; + notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; + gpr_replica_process_triggers(segment, trig, notify_msg); } } @@ -290,6 +302,8 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode, { mca_gpr_notify_request_tracker_t *trackptr; mca_gpr_idtag_list_t *ptr_free_id; + mca_gpr_replica_trigger_list_t *trig; + ompi_registry_notify_message_t *notify_msg; /* protect against errors */ if (NULL == segment) { @@ -311,10 +325,17 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode, } /* construct the trigger - add to notify tracking system if success, otherwise dump */ - if (OMPI_SUCCESS == gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, + if (NULL != (trig = gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, addr_mode, segment, tokens, - 0, trackptr->id_tag)) { + 0, trackptr->id_tag))) { ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + + if (OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) { /* want list of everything there */ + notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); + notify_msg->trig_action = action; + notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; + gpr_replica_process_triggers(segment, trig, notify_msg); + } return OMPI_SUCCESS; } else { OBJ_RELEASE(trackptr); @@ -366,6 +387,8 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, { mca_gpr_notify_request_tracker_t *trackptr; mca_gpr_idtag_list_t *ptr_free_id; + mca_gpr_replica_trigger_list_t *trig; + ompi_registry_notify_message_t *notify_msg; /* protect against errors */ if (NULL == segment || 0 > trigger) { @@ -387,10 +410,20 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, } /* construct the trigger - add to notify tracking system if success, otherwise dump */ - if (OMPI_SUCCESS == gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, + if (NULL != (trig = gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, addr_mode, segment, tokens, - trigger, trackptr->id_tag)) { + trigger, trackptr->id_tag))) { ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + + /* if synchro condition already met, construct and send message */ + if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & synchro_mode && trig->count >= trigger) || + (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro_mode && trig->count == trigger) || + (OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & synchro_mode && trig->count <= trigger)) { + notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); + notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE; + notify_msg->trig_synchro = trig->synch_mode; + gpr_replica_process_triggers(segment, trig, notify_msg); + } return OMPI_SUCCESS; } else { OBJ_RELEASE(trackptr); diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index 1b6eb60696..5db075e80a 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -327,29 +327,39 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool /* initialize the registry head */ OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t); - /* ompi_output(0, "registry head setup"); */ + if (mca_gpr_replica_debug) { + ompi_output(0, "registry head setup"); + } /* initialize the global dictionary for segment id's */ OBJ_CONSTRUCT(&mca_gpr_replica_head.segment_dict, ompi_list_t); OBJ_CONSTRUCT(&mca_gpr_replica_head.freekeys, ompi_list_t); mca_gpr_replica_head.lastkey = 0; - /* ompi_output(0, "global dict setup"); */ + if (mca_gpr_replica_debug) { + ompi_output(0, "global dict setup"); + } /* initialize the notify request tracker */ OBJ_CONSTRUCT(&mca_gpr_replica_notify_request_tracker, ompi_list_t); mca_gpr_replica_last_notify_id_tag = 0; OBJ_CONSTRUCT(&mca_gpr_replica_free_notify_id_tags, ompi_list_t); - /* ompi_output(0, "req tracker setup"); */ + if (mca_gpr_replica_debug) { + ompi_output(0, "req tracker setup"); + } /* issue the non-blocking receive */ - rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); - if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { - return NULL; - } + if (!mca_gpr_replica_debug) { + rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); + if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { + return NULL; + } + } - /* ompi_output(0, "nb receive setup"); */ + if (!mca_gpr_replica_debug) { + ompi_output(0, "nb receive setup"); + } /* Return the module */ @@ -1040,6 +1050,15 @@ void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag return; } + if (OMPI_SUCCESS != ompi_pack(msg, &message->trig_action, 1, MCA_GPR_OOB_PACK_ACTION)) { + return; + } + + if (OMPI_SUCCESS != ompi_pack(msg, &message->trig_synchro, 1, MCA_GPR_OOB_PACK_SYNCHRO_MODE)) { + return; + } + + num_items = (int32_t)ompi_list_get_size(&message->data); if (OMPI_SUCCESS != ompi_pack(msg, &num_items, 1, OMPI_INT32)) { return; diff --git a/src/mca/gpr/replica/gpr_replica_internals.c b/src/mca/gpr/replica/gpr_replica_internals.c index 7fef4b48d2..e24a706bd9 100644 --- a/src/mca/gpr/replica/gpr_replica_internals.c +++ b/src/mca/gpr/replica/gpr_replica_internals.c @@ -409,13 +409,14 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode, return false; } -int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, - ompi_registry_notify_action_t action, - ompi_registry_mode_t addr_mode, - char *segment, char **tokens, int trigger, - mca_gpr_notify_id_t id_tag) +mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_notify_action_t action, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag) { mca_gpr_replica_segment_t *seg; + mca_gpr_replica_core_t *reg; mca_gpr_replica_trigger_list_t *trig; char **tokptr; mca_gpr_replica_key_t *keyptr; @@ -423,7 +424,7 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, seg = gpr_replica_find_seg(true, segment); if (NULL == seg) { /* couldn't find or create segment */ - return OMPI_ERROR; + return NULL; } trig = OBJ_NEW(mca_gpr_replica_trigger_list_t); @@ -460,9 +461,20 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, trig->num_keys = num_tokens; } + /* traverse segment entries and initialize trigger count */ + for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries); + reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries); + reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) { + if (gpr_replica_check_key_list(addr_mode, trig->num_keys, trig->keys, + reg->num_keys, reg->keys)) { + trig->count++; + } + } + + ompi_list_append(&seg->triggers, &trig->item); - return OMPI_SUCCESS; + return trig; } @@ -544,6 +556,7 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync return MCA_GPR_NOTIFY_ID_MAX; } + ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens) { ompi_list_t *reg_entries; @@ -592,10 +605,11 @@ ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_regist return msg; } -void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg, +void gpr_replica_process_triggers(char *segment, mca_gpr_replica_trigger_list_t *trig, ompi_registry_notify_message_t *message) { + mca_gpr_replica_segment_t *seg; mca_gpr_notify_request_tracker_t *trackptr; ompi_registry_object_t *data; char **tokptr; @@ -603,7 +617,12 @@ void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg, bool found; /* protect against errors */ - if (NULL == message) { + if (NULL == message || NULL == segment) { + return; + } + + seg = gpr_replica_find_seg(false, segment); + if (NULL == seg) { /* couldn't find segment */ return; } diff --git a/src/mca/gpr/replica/gpr_replica_internals.h b/src/mca/gpr/replica/gpr_replica_internals.h index 4250a95176..f8a8d3bbc1 100644 --- a/src/mca/gpr/replica/gpr_replica_internals.h +++ b/src/mca/gpr/replica/gpr_replica_internals.h @@ -76,16 +76,16 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment); -int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, - ompi_registry_notify_action_t action, - ompi_registry_mode_t addr_mode, - char *segment, char **tokens, int trigger, - mca_gpr_notify_id_t id_tag); +mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_notify_action_t action, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag); ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens); -void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg, +void gpr_replica_process_triggers(char *segment, mca_gpr_replica_trigger_list_t *trig, ompi_registry_notify_message_t *message);