From f1ab634fabf5289ae95970eae94351d7756999d5 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 27 Aug 2004 05:23:04 +0000 Subject: [PATCH] Massive update of the registry system to incorporate publish/subscribe notifications, including new "synchro" function that allows a barrier-like operation to be performed on a registry segment. Corresponding update to unit test for replica - system passes, but additional new functionality needs to be added to test. This commit was SVN r2317. --- src/mca/gpr/base/gpr_base_open.c | 76 ++++ src/mca/gpr/gpr.h | 77 +++- src/mca/gpr/proxy/gpr_proxy.c | 94 ++-- src/mca/gpr/proxy/gpr_proxy.h | 40 +- src/mca/gpr/proxy/gpr_proxy_component.c | 124 +++++- src/mca/gpr/replica/gpr_replica.c | 331 +++++++------- src/mca/gpr/replica/gpr_replica.h | 125 +++--- src/mca/gpr/replica/gpr_replica_component.c | 323 +++++++++----- src/mca/gpr/replica/gpr_replica_internals.c | 456 +++++++++++++------- src/mca/gpr/replica/gpr_replica_internals.h | 30 +- test/mca/gpr/test_gpr_replica.c | 22 +- 11 files changed, 1169 insertions(+), 529 deletions(-) diff --git a/src/mca/gpr/base/gpr_base_open.c b/src/mca/gpr/base/gpr_base_open.c index 911708280c..ce87408906 100644 --- a/src/mca/gpr/base/gpr_base_open.c +++ b/src/mca/gpr/base/gpr_base_open.c @@ -93,6 +93,82 @@ OBJ_CLASS_INSTANCE( ompi_registry_internal_test_results_destructor); /* destructor */ +/* constructor - used to initialize notify message instance */ +static void mca_gpr_notify_request_tracker_construct(mca_gpr_notify_request_tracker_t* req) +{ + req->requestor = NULL; + req->req_tag = 0; + req->callback = NULL; + req->user_tag = NULL; + req->id_tag = 0; + req->synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; +} + +/* destructor - used to free any resources held by instance */ +static void mca_gpr_notify_request_tracker_destructor(mca_gpr_notify_request_tracker_t* req) +{ + if (NULL != req->requestor) { + free(req->requestor); + } +} + +/* define instance of ompi_class_t */ +OBJ_CLASS_INSTANCE( + mca_gpr_notify_request_tracker_t, /* type name */ + ompi_list_item_t, /* parent "class" name */ + mca_gpr_notify_request_tracker_construct, /* constructor */ + mca_gpr_notify_request_tracker_destructor); /* destructor */ + + +/* constructor - used to initialize notify idtag list instance */ +static void mca_gpr_idtag_list_construct(mca_gpr_idtag_list_t* req) +{ + req->id_tag = 0; +} + +/* destructor - used to free any resources held by instance */ +static void mca_gpr_idtag_list_destructor(mca_gpr_idtag_list_t* req) +{ +} + +/* define instance of ompi_class_t */ +OBJ_CLASS_INSTANCE( + mca_gpr_idtag_list_tracker_t, /* type name */ + ompi_list_item_t, /* parent "class" name */ + mca_gpr_idtag_list_construct, /* constructor */ + mca_gpr_idtag_list_destructor); /* destructor */ + + +/* constructor - used to initialize notify message instance */ +static void ompi_registry_notify_message_construct(ompi_registry_notify_message_t* msg) +{ + OBJ_CONSTRUCT(&msg->data, ompi_list_t); + msg->num_tokens = 0; + msg->tokens = NULL; +} + +/* destructor - used to free any resources held by instance */ +static void ompi_registry_notify_message_destructor(ompi_registry_notify_message_t* msg) +{ + uint32_t i; + char **tokptr; + + OBJ_DESTRUCT(&msg->data); + for (i=0, tokptr=msg->tokens; i < msg->num_tokens; i++, tokptr++) { + free(*tokptr); + } + if (NULL != msg->tokens) { + free(msg->tokens); + } +} + +/* define instance of ompi_class_t */ +OBJ_CLASS_INSTANCE( + ompi_registry_notify_message_t, /* type name */ + ompi_list_item_t, /* parent "class" name */ + ompi_registry_notify_message_construct, /* constructor */ + ompi_registry_notify_message_destructor); /* destructor */ + /* * Global variables diff --git a/src/mca/gpr/gpr.h b/src/mca/gpr/gpr.h index 1f5bd1113a..9bc4e15a00 100644 --- a/src/mca/gpr/gpr.h +++ b/src/mca/gpr/gpr.h @@ -39,11 +39,29 @@ #define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */ #define OMPI_REGISTRY_NOTIFY_DELETE 0x0004 /**< Notifies subscriber when object deleted */ #define OMPI_REGISTRY_NOTIFY_SYNCHRO 0x0008 /**< Indicate that synchro trigger occurred - not valid for subscribe command */ -#define OMPI_REGISTRY_NOTIFY_MESSAGE 0x0010 /**< Indicates a notify message */ #define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */ typedef uint16_t ompi_registry_notify_action_t; +typedef uint32_t mca_gpr_notify_id_t; +#define MCA_GPR_NOTIFY_ID_MAX UINT32_MAX + +/** Return value for notify requests + */ +struct ompi_registry_notify_message_t { + ompi_list_t data; /**< List of data objects */ + uint32_t num_tokens; + char **tokens; +}; +typedef struct ompi_registry_notify_message_t ompi_registry_notify_message_t; + +OBJ_CLASS_DECLARATION(ompi_registry_notify_message_t); + +/** Notify callback function + */ +typedef void (*ompi_registry_notify_cb_fn_t)(ompi_registry_notify_message_t *notify_msg, void *user_tag); + + /** Define the mode bit-masks for registry operations. */ @@ -57,6 +75,18 @@ typedef uint16_t ompi_registry_notify_action_t; typedef uint16_t ompi_registry_mode_t; +/* + * Define synchro mode flags + */ +#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */ +#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */ +#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */ +#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */ +#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */ +#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */ + +typedef uint16_t ompi_registry_synchro_mode_t; + /* * Define flag values for remote commands - only used internally */ @@ -69,6 +99,7 @@ typedef uint16_t ompi_registry_mode_t; #define MCA_GPR_SYNCHRO_CMD 0x0040 #define MCA_GPR_GET_CMD 0x0080 #define MCA_GPR_TEST_INTERNALS_CMD 0x0100 +#define MCA_GPR_NOTIFY_CMD 0x0200 /**< Indicates a notify message */ #define MCA_GPR_ERROR 0xffff typedef uint16_t mca_gpr_cmd_flag_t; @@ -83,6 +114,7 @@ typedef uint16_t mca_gpr_cmd_flag_t; #define MCA_GPR_OOB_PACK_ACTION OMPI_INT16 #define MCA_GPR_OOB_PACK_MODE OMPI_INT16 #define MCA_GPR_OOB_PACK_OBJECT_SIZE OMPI_INT32 +#define MCA_GPR_OOB_PACK_SYNCHRO_MODE OMPI_INT16 /* @@ -92,7 +124,6 @@ typedef uint16_t mca_gpr_cmd_flag_t; typedef void* ompi_registry_object_t; typedef uint32_t ompi_registry_object_size_t; - /* * structures */ @@ -136,28 +167,52 @@ typedef struct ompi_registry_internal_test_results_t ompi_registry_internal_test OBJ_CLASS_DECLARATION(ompi_registry_internal_test_results_t); +struct mca_gpr_notify_request_tracker_t { + ompi_list_item_t item; + ompi_process_name_t *requestor; + int req_tag; + ompi_registry_notify_cb_fn_t callback; + void *user_tag; + mca_gpr_notify_id_t id_tag; + ompi_registry_synchro_mode_t synchro; +}; +typedef struct mca_gpr_notify_request_tracker_t mca_gpr_notify_request_tracker_t; + +OBJ_CLASS_DECLARATION(mca_gpr_notify_request_tracker_t); + + +struct mca_gpr_idtag_list_t { + ompi_list_item_t item; + mca_gpr_notify_id_t id_tag; +}; +typedef struct mca_gpr_idtag_list_t mca_gpr_idtag_list_t; + +OBJ_CLASS_DECLARATION(mca_gpr_idtag_list_t); + /* * Component functions that MUST be provided */ typedef int (*mca_gpr_base_module_delete_segment_fn_t)(char *segment); typedef int (*mca_gpr_base_module_put_fn_t)(ompi_registry_mode_t mode, char *segment, - char **tokens, ompi_registry_object_t *object, + char **tokens, ompi_registry_object_t object, ompi_registry_object_size_t size); typedef ompi_list_t* (*mca_gpr_base_module_get_fn_t)(ompi_registry_mode_t mode, char *segment, char **tokens); typedef int (*mca_gpr_base_module_delete_fn_t)(ompi_registry_mode_t mode, char *segment, char **tokens); typedef ompi_list_t* (*mca_gpr_base_module_index_fn_t)(char *segment); -typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, +typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_registry_mode_t mode, ompi_registry_notify_action_t action, - char *segment, char **tokens); -typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_process_name_t *subscriber, - ompi_registry_mode_t mode, - char *segment, char **tokens); -typedef int (*mca_gpr_base_module_synchro_fn_t)(ompi_process_name_t *subscriber, int tag, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); +typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_registry_mode_t mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); +typedef int (*mca_gpr_base_module_synchro_fn_t)(ompi_registry_synchro_mode_t synchro_mode, ompi_registry_mode_t mode, - char *segment, char **tokens, int num); + char *segment, char **tokens, int trigger, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); /* * test interface for internal functions - optional to provide diff --git a/src/mca/gpr/proxy/gpr_proxy.c b/src/mca/gpr/proxy/gpr_proxy.c index 70e62573a9..b291f2223d 100644 --- a/src/mca/gpr/proxy/gpr_proxy.c +++ b/src/mca/gpr/proxy/gpr_proxy.c @@ -71,7 +71,7 @@ int gpr_proxy_delete_segment(char *segment) int gpr_proxy_put(ompi_registry_mode_t mode, char *segment, - char **tokens, ompi_registry_object_t *object, + char **tokens, ompi_registry_object_t object, ompi_registry_object_size_t size) { ompi_buffer_t cmd; @@ -192,6 +192,7 @@ int gpr_proxy_delete_object(ompi_registry_mode_t mode, /* compute number of tokens */ tokptr = tokens; + num_tokens = 0; while (NULL != *tokptr) { num_tokens++; tokptr++; @@ -315,24 +316,27 @@ ompi_list_t* gpr_proxy_index(char *segment) } -int gpr_proxy_subscribe(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, +int gpr_proxy_subscribe(ompi_registry_mode_t mode, ompi_registry_notify_action_t action, - char *segment, char **tokens) + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { return OMPI_ERR_NOT_IMPLEMENTED; } -int gpr_proxy_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode, - char *segment, char **tokens) +int gpr_proxy_unsubscribe(ompi_registry_mode_t mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { return OMPI_ERR_NOT_IMPLEMENTED; } -int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, +int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_registry_mode_t mode, - char *segment, char **tokens, int num) + char *segment, char **tokens, int trigger, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { ompi_buffer_t cmd; ompi_buffer_t answer; @@ -340,14 +344,19 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, char **tokptr; int recv_tag, i; int32_t num_tokens, response; + mca_gpr_notify_request_tracker_t *trackptr; + mca_gpr_idtag_list_t *ptr_free_id; /* need to protect against errors */ - if (NULL == segment || NULL == tokens || NULL == *tokens) { + if (NULL == segment) { return OMPI_ERROR; } command = MCA_GPR_SYNCHRO_CMD; - recv_tag = MCA_OOB_TAG_GPR; + + if (OMPI_REGISTRY_SYNCHRO_MODE_NONE == synchro_mode) { /* not allowed */ + return OMPI_ERROR; + } if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */ return OMPI_ERROR; @@ -357,7 +366,7 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, goto CLEANUP; } - response = (int32_t)tag; + response = (int32_t)synchro_mode; if (OMPI_SUCCESS != ompi_pack(cmd, &response, 1, OMPI_INT32)) { goto CLEANUP; } @@ -370,30 +379,57 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, goto CLEANUP; } - /* compute number of tokens */ - tokptr = tokens; - while (NULL != *tokptr) { - num_tokens++; - tokptr++; + num_tokens = 0; + if (NULL != tokens) { + /* compute number of tokens */ + tokptr = tokens; + while (NULL != *tokptr) { + num_tokens++; + tokptr++; + } } if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) { goto CLEANUP; } - tokptr = tokens; - for (i=0; irequestor = NULL; + trackptr->req_tag = 0; + trackptr->callback = cb_func; + trackptr->user_tag = user_tag; + if (ompi_list_is_empty(&mca_gpr_proxy_free_notify_id_tags)) { + trackptr->id_tag = mca_gpr_proxy_last_notify_id_tag; + mca_gpr_proxy_last_notify_id_tag++; + } else { + ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_proxy_free_notify_id_tags); + trackptr->id_tag = ptr_free_id->id_tag; + } + trackptr->synchro = synchro_mode; + + if (OMPI_SUCCESS != ompi_pack(cmd, &trackptr->id_tag, 1, OMPI_INT32)) { + /* remove entry from lookup list */ + ompi_list_remove_last(&mca_gpr_proxy_notify_request_tracker); + goto CLEANUP; + } + if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) { goto CLEANUP; } @@ -402,20 +438,24 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, goto CLEANUP; } + ompi_buffer_free(cmd); + if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) || (MCA_GPR_SYNCHRO_CMD != command)) { ompi_buffer_free(answer); - goto CLEANUP; + return OMPI_ERROR; } - if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) { + if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) || + (OMPI_SUCCESS != response)) { ompi_buffer_free(answer); return OMPI_ERROR; - } else { - ompi_buffer_free(answer); - return OMPI_SUCCESS; } + ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item); + ompi_buffer_free(answer); + return OMPI_SUCCESS; + CLEANUP: ompi_buffer_free(cmd); return OMPI_ERROR; diff --git a/src/mca/gpr/proxy/gpr_proxy.h b/src/mca/gpr/proxy/gpr_proxy.h index 8993f6c6c3..297725123f 100644 --- a/src/mca/gpr/proxy/gpr_proxy.h +++ b/src/mca/gpr/proxy/gpr_proxy.h @@ -26,12 +26,19 @@ int mca_gpr_proxy_close(void); mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority); int mca_gpr_proxy_finalize(void); +/* + * proxy-local types + */ + + /* * globals used within proxy component */ extern ompi_process_name_t *mca_gpr_my_replica; - +extern ompi_list_t mca_gpr_proxy_notify_request_tracker; +extern mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag; +extern ompi_list_t mca_gpr_proxy_free_notify_id_tags; /* * Implementation of delete_segment(). @@ -42,7 +49,7 @@ extern ompi_process_name_t *mca_gpr_my_replica; * Implementation of put() */ int gpr_proxy_put(ompi_registry_mode_t mode, char *segment, - char **tokens, ompi_registry_object_t *object, + char **tokens, ompi_registry_object_t object, ompi_registry_object_size_t size); /* @@ -60,29 +67,32 @@ ompi_list_t* gpr_proxy_index(char *segment); /* * Implementation of subscribe() */ -int gpr_proxy_subscribe(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, +int gpr_proxy_subscribe(ompi_registry_mode_t mode, ompi_registry_notify_action_t action, - char *segment, char **tokens); + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); -/* - * Implementation of unsubscribe() - */ -int gpr_proxy_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode, - char *segment, char **tokens); +int gpr_proxy_unsubscribe(ompi_registry_mode_t mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); -/* - * Implementation of synchro() - */ -int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag, +int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_registry_mode_t mode, - char *segment, char **tokens, int num); + char *segment, char **tokens, int trigger, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); + /* * Implementation of get() */ ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **tokens); + ompi_list_t* gpr_proxy_test_internals(int level); +void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, + ompi_buffer_t buffer, int tag, + void* cbdata); + #endif diff --git a/src/mca/gpr/proxy/gpr_proxy_component.c b/src/mca/gpr/proxy/gpr_proxy_component.c index af0bfa62bd..4a40b28210 100644 --- a/src/mca/gpr/proxy/gpr_proxy_component.c +++ b/src/mca/gpr/proxy/gpr_proxy_component.c @@ -57,6 +57,7 @@ static mca_gpr_base_module_t mca_gpr_proxy = { gpr_proxy_test_internals }; + /* * Whether or not we allowed this component to be selected */ @@ -66,6 +67,9 @@ static bool initialized = false; * globals needed within proxy component */ ompi_process_name_t *mca_gpr_my_replica; +ompi_list_t mca_gpr_proxy_notify_request_tracker; +mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag; +ompi_list_t mca_gpr_proxy_free_notify_id_tags; /* @@ -87,31 +91,44 @@ int mca_gpr_proxy_close(void) mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority) { + int rc; + /* If we're NOT the seed, then we want to be selected, so do all the setup and return the module */ if (!ompi_process_info.seed) { - /* Return a module (choose an arbitrary, positive priority -- - it's only relevant compared to other ns components). If - we're not the seed, then we don't want to be selected, so - return NULL. */ + /* Return a module (choose an arbitrary, positive priority -- + it's only relevant compared to other ns components). If + we're not the seed, then we don't want to be selected, so + return NULL. */ - *priority = 10; + *priority = 10; - /* We allow multi user threads but don't have any hidden threads */ + /* We allow multi user threads but don't have any hidden threads */ - *allow_multi_user_threads = true; - *have_hidden_threads = false; + *allow_multi_user_threads = true; + *have_hidden_threads = false; - /* define the replica for us to use - for now, use only the seed */ - mca_gpr_my_replica = ompi_name_server.create_process_name(0,0,0); + /* define the replica for us to use - for now, use only the seed */ + mca_gpr_my_replica = ompi_name_server.create_process_name(0,0,0); - /* Return the module */ + /* initialize the notify list */ + OBJ_CONSTRUCT(&mca_gpr_proxy_notify_request_tracker, ompi_list_t); + mca_gpr_proxy_last_notify_id_tag = 0; + OBJ_CONSTRUCT(&mca_gpr_proxy_free_notify_id_tags, ompi_list_t); - initialized = true; - return &mca_gpr_proxy; + /* issue the non-blocking receive */ + rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR_NOTIFY, 0, mca_gpr_proxy_notify_recv, NULL); + if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { + return NULL; + } + + /* Return the module */ + + initialized = true; + return &mca_gpr_proxy; } else { - return NULL; + return NULL; } } @@ -129,3 +146,82 @@ int mca_gpr_proxy_finalize(void) return OMPI_SUCCESS; } + +/* + * handle notify messages from replicas + */ + +void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, + ompi_buffer_t buffer, int tag, + void* cbdata) +{ + char **tokptr; + mca_gpr_cmd_flag_t command; + int32_t num_items, i, id_tag; + ompi_registry_value_t *regval; + ompi_registry_notify_message_t *message; + bool found; + mca_gpr_notify_request_tracker_t *trackptr; + + message = OBJ_NEW(ompi_registry_notify_message_t); + + if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD)) || + (MCA_GPR_NOTIFY_CMD != command)) { + goto RETURN_ERROR; + } + + if (OMPI_SUCCESS != ompi_unpack(buffer, &id_tag, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + if (OMPI_SUCCESS != ompi_unpack(buffer, &num_items, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + for (i=0; i < num_items; i++) { + regval = OBJ_NEW(ompi_registry_value_t); + if (OMPI_SUCCESS != ompi_unpack(buffer, ®val->object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) { + goto RETURN_ERROR; + } + if (OMPI_SUCCESS != ompi_unpack(buffer, regval->object, regval->object_size, OMPI_BYTE)) { + goto RETURN_ERROR; + } + ompi_list_append(&message->data, ®val->item); + } + + if (OMPI_SUCCESS != ompi_unpack(buffer, &message->num_tokens, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + message->tokens = (char**)malloc(message->num_tokens*sizeof(char*)); + for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) { + if (OMPI_SUCCESS != ompi_unpack_string(buffer, tokptr)) { + goto RETURN_ERROR; + } + } + + /* find the request corresponding to this notify */ + found = false; + for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker); + trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) && !found; + trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) { + if (trackptr->id_tag == id_tag) { + found = true; + } + } + + if (!found) { /* didn't find request */ + ompi_output(0, "Proxy notification error - received request not found"); + return; + } + + /* process request */ + trackptr->callback(message, trackptr->user_tag); + + /* dismantle message and free memory */ + + RETURN_ERROR: + OBJ_RELEASE(message); + ompi_buffer_free(buffer); + return; +} diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index f92f6aa926..1d6ea1a710 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -28,9 +28,9 @@ int gpr_replica_delete_segment(char *segment) { - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_segment_t *seg; - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(true, segment); if (NULL == seg) { /* couldn't locate segment */ return OMPI_ERROR; @@ -47,65 +47,64 @@ int gpr_replica_delete_segment(char *segment) return OMPI_SUCCESS; } -int gpr_replica_put(ompi_registry_mode_t mode, char *segment, - char **tokens, ompi_registry_object_t *object, +int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, + char **tokens, ompi_registry_object_t object, ompi_registry_object_size_t size) { - ompi_list_t *keys; - mca_gpr_keytable_t *keyptr; - mca_gpr_keylist_t *new_keyptr; - mca_gpr_registry_segment_t *seg; - mca_gpr_registry_core_t *entry_ptr; + ompi_list_t *keylist; + mca_gpr_replica_keytable_t *keyptr; + mca_gpr_replica_segment_t *seg; + mca_gpr_replica_core_t *entry_ptr; ompi_registry_mode_t put_mode; - mca_gpr_synchro_list_t *synchro; - int return_code; + mca_gpr_replica_synchro_list_t *synchro; + ompi_registry_notify_message_t *notify_msg; + int return_code, num_tokens; + mca_gpr_replica_key_t *keys, *key2; /* protect ourselves against errors */ - if (NULL == segment || NULL == object || 0 == size || NULL == tokens || NULL == *tokens) { + if (NULL == object || 0 == size || NULL == tokens || NULL == *tokens) { return OMPI_ERROR; } - put_mode = mode & OMPI_REGISTRY_OVERWRITE; /* only overwrite permission mode flag allowed */ + put_mode = addr_mode & OMPI_REGISTRY_OVERWRITE; /* only overwrite permission mode flag allowed */ + /* find the segment */ + seg = gpr_replica_find_seg(true, segment); + if (NULL == seg) { /* couldn't find segment or create it */ + return OMPI_ERROR; + } /* convert tokens to list of keys */ - keys = gpr_replica_get_key_list(segment, tokens); - if (0 >= ompi_list_get_size(keys)) { + keylist = gpr_replica_get_key_list(segment, tokens); + if (0 >= (num_tokens = ompi_list_get_size(keylist))) { return OMPI_ERROR; } + keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t)); + key2 = keys; + /* traverse the list to find undefined tokens - get new keys for them */ - for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys); - keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys); - keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) { + for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist); + keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist); + keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) { if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* need to get new key */ keyptr->key = gpr_replica_define_key(segment, keyptr->token); } - } - - /* find the segment */ - seg = gpr_replica_find_seg(segment); - if (NULL == seg) { /* couldn't find segment - try to create it */ - if (0 > gpr_replica_define_segment(segment)) { /* couldn't create it */ - return_code = OMPI_ERROR; - goto CLEANUP; - } - seg = gpr_replica_find_seg(segment); - if (NULL == seg) { /* ok, we tried - time to give up */ - return_code = OMPI_ERROR; - goto CLEANUP; - } + *key2 = keyptr->key; + key2++; } /* see if specified entry already exists */ - for (entry_ptr = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries); - entry_ptr != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries); - entry_ptr = (mca_gpr_registry_core_t*)ompi_list_get_next(entry_ptr)) { - if (gpr_replica_check_key_list(put_mode, keys, &entry_ptr->keys)) { /* found existing entry - overwrite if mode set, else error */ + for (entry_ptr = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries); + entry_ptr != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries); + entry_ptr = (mca_gpr_replica_core_t*)ompi_list_get_next(entry_ptr)) { + if (gpr_replica_check_key_list(put_mode, num_tokens, keys, + entry_ptr->num_keys, entry_ptr->keys)) { + /* found existing entry - overwrite if mode set, else error */ if (put_mode) { /* overwrite enabled */ free(entry_ptr->object); entry_ptr->object_size = size; - entry_ptr->object = (ompi_registry_object_t*)malloc(size); + entry_ptr->object = (ompi_registry_object_t)malloc(size); memcpy(entry_ptr->object, object, size); return_code = OMPI_SUCCESS; goto CLEANUP; @@ -117,14 +116,10 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment, } /* no existing entry - create new one */ - entry_ptr = OBJ_NEW(mca_gpr_registry_core_t); - for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys); - keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys); - keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) { - new_keyptr = OBJ_NEW(mca_gpr_keylist_t); - new_keyptr->key = keyptr->key; - ompi_list_append(&entry_ptr->keys, &new_keyptr->item); - } + entry_ptr = OBJ_NEW(mca_gpr_replica_core_t); + entry_ptr->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t)); + memcpy(entry_ptr->keys, keys, num_tokens*sizeof(mca_gpr_replica_key_t)); + entry_ptr->num_keys = num_tokens; entry_ptr->object_size = size; entry_ptr->object = (ompi_registry_object_t*)malloc(size); memcpy(entry_ptr->object, object, size); @@ -133,105 +128,126 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment, return_code = OMPI_SUCCESS; /* update synchro list and check for trigger conditions */ - for (synchro = (mca_gpr_synchro_list_t*)ompi_list_get_first(&seg->synchros); - synchro != (mca_gpr_synchro_list_t*)ompi_list_get_end(&seg->synchros); - synchro = (mca_gpr_synchro_list_t*)ompi_list_get_next(synchro)) { - if (gpr_replica_check_key_list(synchro->mode, keys, &synchro->keys)) { - synchro->present = synchro->present + 1; + for (synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_first(&seg->synchros); + synchro != (mca_gpr_replica_synchro_list_t*)ompi_list_get_end(&seg->synchros); + synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_next(synchro)) { + if (gpr_replica_check_key_list(synchro->addr_mode, synchro->num_keys, synchro->keys, + num_tokens, keys)) { + synchro->count++; } - if (synchro->present >= synchro->trigger) { - gpr_replica_notify(&synchro->subscribers, OMPI_REGISTRY_NOTIFY_SYNCHRO); + if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & synchro->synch_mode || + OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro->synch_mode) && + synchro->count == synchro->trigger) { + notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens); + gpr_replica_process_triggers(notify_msg, synchro->id_tag); } } CLEANUP: /* release list of keys */ - while (NULL != (keyptr = (mca_gpr_keytable_t*)ompi_list_remove_last(keys))) { - OBJ_DESTRUCT(keyptr); + while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_last(keylist))) { + OBJ_RELEASE(keyptr); } - OBJ_DESTRUCT(keys); + OBJ_RELEASE(keylist); return return_code; } -int gpr_replica_delete_object(ompi_registry_mode_t mode, +int gpr_replica_delete_object(ompi_registry_mode_t addr_mode, char *segment, char **tokens) { - mca_gpr_registry_core_t *reg, *prev; - mca_gpr_keytable_t *keyptr; - ompi_list_t *keys; - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_core_t *reg, *prev; + mca_gpr_replica_keytable_t *keyptr; + ompi_list_t *keylist; + mca_gpr_replica_key_t *keys, *key2; + mca_gpr_replica_segment_t *seg; + int num_tokens; + + keys = NULL; /* protect against errors */ - if (NULL == segment || NULL == tokens || NULL == *tokens) { + if (NULL == segment) { return OMPI_ERROR; } /* find the specified segment */ - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ return OMPI_ERROR; } /* convert tokens to list of keys */ - keys = gpr_replica_get_key_list(segment, tokens); - if (0 == ompi_list_get_size(keys)) { - return OMPI_ERROR; - } + keylist = gpr_replica_get_key_list(segment, tokens); + if (0 == (num_tokens = ompi_list_get_size(keylist))) { /* no tokens provided - wildcard case */ + keys = NULL; - /* traverse the list to find undefined tokens - error if found */ - for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys); - keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys); - keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) { - if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */ - return OMPI_ERROR; + } else { /* tokens provided */ + keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t)); + key2 = keys; + + /* traverse the list to find undefined tokens - error if found */ + for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist); + keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist); + keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) { + if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */ + goto CLEANUP; + } + *key2 = keyptr->key; + key2++; } } /* traverse the segment's registry, looking for matching tokens per the specified mode */ - for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries); - reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries); - reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) { + for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries); + reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries); + reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) { /* for each registry entry, check the key list */ - if (gpr_replica_check_key_list(mode, keys, ®->keys)) { /* found the key(s) on the list */ - prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg); + if (gpr_replica_check_key_list(addr_mode, num_tokens, keys, + reg->num_keys, reg->keys)) { /* found the key(s) on the list */ + prev = (mca_gpr_replica_core_t*)ompi_list_get_prev(reg); ompi_list_remove_item(&seg->registry_entries, ®->item); reg = prev; } } return OMPI_SUCCESS; + + CLEANUP: + if (NULL != keys) { + free(keys); + } + return OMPI_ERROR; } ompi_list_t* gpr_replica_index(char *segment) { ompi_list_t *answer; - mca_gpr_keytable_t *ptr; - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_keytable_t *ptr; + mca_gpr_replica_segment_t *seg; ompi_registry_index_value_t *ans; answer = OBJ_NEW(ompi_list_t); if (NULL == segment) { /* looking for index of global registry */ - for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); - ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); - ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) { + for (ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); + ptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); + ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr)) { ans = OBJ_NEW(ompi_registry_index_value_t); ans->token = strdup(ptr->token); ompi_list_append(answer, &ans->item); } } else { /* want index of specific segment */ /* find the specified segment */ - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ return answer; } /* got segment - now index that dictionary */ - for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable); - ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable); - ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) { + for (ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable); + ptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); + ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr)) { ans = OBJ_NEW(ompi_registry_index_value_t); ans->token = strdup(ptr->token); ompi_list_append(answer, &ans->item); @@ -242,112 +258,118 @@ ompi_list_t* gpr_replica_index(char *segment) return answer; } -int gpr_replica_subscribe(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, +int gpr_replica_subscribe(ompi_registry_mode_t mode, ompi_registry_notify_action_t action, - char *segment, char **tokens) + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { return OMPI_ERR_NOT_IMPLEMENTED; } -int gpr_replica_unsubscribe(ompi_process_name_t *subscriber, - ompi_registry_mode_t mode, - char *segment, char **tokens) +int gpr_replica_unsubscribe(ompi_registry_mode_t mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { return OMPI_ERR_NOT_IMPLEMENTED; } -int gpr_replica_synchro(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, - char *segment, char **tokens, int num) +int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { - mca_gpr_registry_segment_t *seg; - mca_gpr_synchro_list_t *synch; - mca_gpr_subscriber_list_t *subs; - char **tokptr; - mca_gpr_keylist_t *keyptr; - int i; + mca_gpr_notify_request_tracker_t *trackptr; + mca_gpr_idtag_list_t *ptr_free_id; /* protect against errors */ - if (NULL == segment || NULL == tokens || NULL == *tokens || 0 > num) { + if (NULL == segment || 0 > trigger) { return OMPI_ERROR; } - seg = gpr_replica_find_seg(segment); - if (NULL == seg) { /* couldn't find segment */ + /* enter request on notify tracking system */ + trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); + trackptr->requestor = NULL; + trackptr->req_tag = 0; + trackptr->callback = cb_func; + trackptr->user_tag = user_tag; + if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) { + trackptr->id_tag = mca_gpr_replica_last_notify_id_tag; + mca_gpr_replica_last_notify_id_tag++; + } else { + ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags); + trackptr->id_tag = ptr_free_id->id_tag; + } + trackptr->synchro = synchro_mode; + + /* construct the synchro - add to notify tracking system if success, otherwise dump */ + if (OMPI_SUCCESS == gpr_replica_construct_synchro(synchro_mode, addr_mode, segment, tokens, + trigger, trackptr->id_tag)) { + ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + return OMPI_SUCCESS; + } else { + OBJ_RELEASE(trackptr); return OMPI_ERROR; } - - synch = OBJ_NEW(mca_gpr_synchro_list_t); - - subs = OBJ_NEW(mca_gpr_subscriber_list_t); - subs->subscriber = subscriber; - subs->tag = tag; - ompi_list_append(&synch->subscribers, &subs->item); - - /* store key values of tokens, defining them if needed */ - for (i=i, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) { - keyptr = OBJ_NEW(mca_gpr_keylist_t); - keyptr->key = gpr_replica_get_key(segment, *tokptr); - if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { - keyptr->key = gpr_replica_define_key(segment, *tokptr); - } - ompi_list_append(&synch->keys, &keyptr->item); - } - - synch->mode = mode; - synch->trigger = num; - synch->present = 0; - ompi_list_append(&seg->synchros, &synch->item); - - return OMPI_SUCCESS; } -ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode, +ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode, char *segment, char **tokens) { - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_segment_t *seg; ompi_list_t *answer; ompi_registry_value_t *ans; - ompi_list_t *keys; - mca_gpr_keytable_t *keyptr; - mca_gpr_registry_core_t *reg; + mca_gpr_replica_key_t *keys, *key2; + ompi_list_t *keylist; + mca_gpr_replica_keytable_t *keyptr; + mca_gpr_replica_core_t *reg; + int num_tokens; answer = OBJ_NEW(ompi_list_t); /* protect against errors */ - if (NULL == segment || NULL == tokens || NULL == *tokens) { + if (NULL == segment) { return answer; } /* find the specified segment */ - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ return answer; } + if (NULL == tokens) { /* wildcard case - return everything */ + keys = NULL; + } else { - /* convert tokens to list of keys */ - keys = gpr_replica_get_key_list(segment, tokens); - if (0 == ompi_list_get_size(keys)) { - return answer; - } + /* convert tokens to list of keys */ + keylist = gpr_replica_get_key_list(segment, tokens); + if (0 == (num_tokens = ompi_list_get_size(keylist))) { + return answer; + } - /* traverse the list to find undefined tokens - error if found */ - for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys); - keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys); - keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) { - if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */ - goto CLEANUP; + keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t)); + key2 = keys; + + /* traverse the list to find undefined tokens - error if found */ + for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist); + keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist); + keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) { + if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */ + goto CLEANUP; + } + *key2 = keyptr->key; + key2++; } } /* traverse the segment's registry, looking for matching tokens per the specified mode */ - for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries); - reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries); - reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) { + for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries); + reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries); + reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) { /* for each registry entry, check the key list */ - if (gpr_replica_check_key_list(mode, keys, ®->keys)) { /* found the key(s) on the list */ + if (gpr_replica_check_key_list(addr_mode, num_tokens, keys, + reg->num_keys, reg->keys)) { /* found the key(s) on the list */ ans = OBJ_NEW(ompi_registry_value_t); ans->object_size = reg->object_size; ans->object = (ompi_registry_object_t*)malloc(ans->object_size); @@ -358,10 +380,13 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode, CLEANUP: /* release list of keys */ - while (NULL != (keyptr = (mca_gpr_keytable_t*)ompi_list_remove_last(keys))) { - OBJ_DESTRUCT(keyptr); + while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(keylist))) { + OBJ_RELEASE(keyptr); + } + OBJ_RELEASE(keylist); + if (NULL != keys) { + free(keys); } - OBJ_DESTRUCT(keys); return answer; } diff --git a/src/mca/gpr/replica/gpr_replica.h b/src/mca/gpr/replica/gpr_replica.h index 258bfbd4f1..1295d13abe 100644 --- a/src/mca/gpr/replica/gpr_replica.h +++ b/src/mca/gpr/replica/gpr_replica.h @@ -8,6 +8,9 @@ #include "ompi_config.h" + +#include + #include "include/types.h" #include "include/constants.h" #include "class/ompi_list.h" @@ -16,16 +19,23 @@ /* * typedefs needed in replica component */ + +/* + * Define the registry "key" + */ typedef uint32_t mca_gpr_replica_key_t; #define MCA_GPR_REPLICA_KEY_MAX UINT32_MAX -struct mca_gpr_registry_t { +/* + * Registry "head" + */ +struct mca_gpr_replica_t { ompi_list_t registry; ompi_list_t segment_dict; mca_gpr_replica_key_t lastkey; ompi_list_t freekeys; }; -typedef struct mca_gpr_registry_t mca_gpr_registry_t; +typedef struct mca_gpr_replica_t mca_gpr_replica_t; /** Dictionary of token-key pairs. * This structure is used to create a linked list of token-key pairs. All calls to @@ -34,59 +44,63 @@ typedef struct mca_gpr_registry_t mca_gpr_registry_t; * for faster searches of the registry. This structure is also used to return token-key pairs * from the dictionary in response to an ompi_registry_index() call. */ -struct mca_gpr_keytable_t { +struct mca_gpr_replica_keytable_t { ompi_list_item_t item; /**< Allows this item to be placed on a list */ char *token; /**< Char string that defines the key */ mca_gpr_replica_key_t key; /**< Numerical value assigned by registry to represent token string */ }; -typedef struct mca_gpr_keytable_t mca_gpr_keytable_t; +typedef struct mca_gpr_replica_keytable_t mca_gpr_replica_keytable_t; -OBJ_CLASS_DECLARATION(mca_gpr_keytable_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_keytable_t); /** List of keys that describe a stored object. * Each object stored in the registry may have as many keys describing it as the * creator desires. This structure is used to create a linked list of keys * associated with each object. */ -struct mca_gpr_keylist_t { +struct mca_gpr_replica_keylist_t { ompi_list_item_t item; /**< Allows this item to be placed on a list */ mca_gpr_replica_key_t key; /**< Numerical key that defines stored object */ }; -typedef struct mca_gpr_keylist_t mca_gpr_keylist_t; +typedef struct mca_gpr_replica_keylist_t mca_gpr_replica_keylist_t; -OBJ_CLASS_DECLARATION(mca_gpr_keylist_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_keylist_t); -/** List of subscribers to a stored object. - * Each object can have an arbitrary number of subscribers desiring notification - * upon specified actions being performed against the object. This structure is - * used to create a linked list of subscribers for objects. +/** List of subscribers to objects on a segment. + * Each segment can have an arbitrary number of subscribers desiring notification + * upon specified actions being performed against the objects on the segment. This structure is + * used to create a linked list of subscribers. */ -struct mca_gpr_subscriber_list_t { +struct mca_gpr_replica_subscriber_list_t { ompi_list_item_t item; /**< Allows this item to be placed on a list */ - ompi_process_name_t *subscriber; /**< Name of the subscribing process */ - int tag; /**< OOB tag of the subscriber */ + ompi_registry_mode_t addr_mode; /**< Addressing mode for subscription */ ompi_registry_notify_action_t action; /**< Bit-mask of actions that trigger notification */ + mca_gpr_replica_key_t num_keys; /**< Number of keys in array */ + mca_gpr_replica_key_t *keys; /**< Array of keys describing subscription */ + mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */ }; -typedef struct mca_gpr_subscriber_list_t mca_gpr_subscriber_list_t; +typedef struct mca_gpr_replica_subscriber_list_t mca_gpr_replica_subscriber_list_t; -OBJ_CLASS_DECLARATION(mca_gpr_subscriber_list_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_subscriber_list_t); /** List of synchro actions for a segment. * Each object can have an arbitrary number of subscribers desiring notification * upon specified actions being performed against the object. This structure is * used to create a linked list of subscribers for objects. */ -struct mca_gpr_synchro_list_t { - ompi_list_item_t item; /**< Allows this item to be placed on a list */ - ompi_list_t subscribers; /**< List of subscribers to be notified upon synchro */ - ompi_list_t keys; /**< List of keys describing the objects being counted*/ - ompi_registry_mode_t mode; /**< Mode for selecting objects using keys */ - int trigger; /**< Number of objects that trigger notification */ - int present; /**< Number of qualifying objects currently in segment */ +struct mca_gpr_replica_synchro_list_t { + ompi_list_item_t item; /**< Allows this item to be placed on a list */ + ompi_registry_synchro_mode_t synch_mode; /**< Synchro mode - ascending, descending, ... */ + ompi_registry_mode_t addr_mode; /**< Addressing mode */ + mca_gpr_replica_key_t num_keys; /**< Number of keys in array */ + mca_gpr_replica_key_t *keys; /**< Array of keys describing objects to be counted */ + uint32_t trigger; /**< Number of objects that trigger notification */ + uint32_t count; /**< Number of qualifying objects currently in segment */ + mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */ }; -typedef struct mca_gpr_synchro_list_t mca_gpr_synchro_list_t; +typedef struct mca_gpr_replica_synchro_list_t mca_gpr_replica_synchro_list_t; -OBJ_CLASS_DECLARATION(mca_gpr_synchro_list_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_synchro_list_t); /** List of replicas that hold a stored object. * Each object can have an arbitrary number of replicas that hold a copy @@ -109,12 +123,12 @@ OBJ_CLASS_DECLARATION(mca_gpr_replica_list_t); * of the object within the global registry, and the replica holding the last known * up-to-date version of the object. */ -struct mca_gpr_write_invalidate_t { +struct mca_gpr_replica_write_invalidate_t { bool invalidate; time_t last_mod; ompi_process_name_t *valid_replica; }; -typedef struct mca_gpr_write_invalidate_t mca_gpr_write_invalidate_t; +typedef struct mca_gpr_replica_write_invalidate_t mca_gpr_replica_write_invalidate_t; /** The core registry structure. @@ -129,18 +143,18 @@ typedef struct mca_gpr_write_invalidate_t mca_gpr_write_invalidate_t; * object are automatically granted. This may be changed at some future time by adding an * "authorization" linked list of ID's and their access rights to this structure. */ -struct mca_gpr_registry_core_t { +struct mca_gpr_replica_core_t { ompi_list_item_t item; /**< Allows this item to be placed on a list */ - ompi_list_t keys; /**< Linked list of keys that define stored object */ + mca_gpr_replica_key_t num_keys; /**< Number of keys in array */ + mca_gpr_replica_key_t *keys; /**< Array of keys that define stored object */ ompi_registry_object_size_t object_size; /**< Size of stored object, in bytes */ ompi_registry_object_t *object; /**< Pointer to stored object */ - ompi_list_t subscriber; /**< Linked list of subscribers to this object */ ompi_list_t replicas; /**< Linked list of replicas that also contain this object */ - mca_gpr_write_invalidate_t write_invalidate; /**< Structure containing write invalidate info */ + mca_gpr_replica_write_invalidate_t write_invalidate; /**< Structure containing write invalidate info */ }; -typedef struct mca_gpr_registry_core_t mca_gpr_registry_core_t; +typedef struct mca_gpr_replica_core_t mca_gpr_replica_core_t; -OBJ_CLASS_DECLARATION(mca_gpr_registry_core_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_core_t); /** Registry segment definition. * The registry is subdivided into segments, each defining a unique domain. The "universe" segment @@ -151,24 +165,28 @@ OBJ_CLASS_DECLARATION(mca_gpr_registry_core_t); * list of registry elements for that segment. Each segment also holds its own token-key dictionary * to avoid naming conflicts between tokens from CommWorlds sharing a given universe. */ -struct mca_gpr_registry_segment_t { +struct mca_gpr_replica_segment_t { ompi_list_item_t item; /**< Allows this item to be placed on a list */ mca_gpr_replica_key_t segment; /**< Key corresponding to name of registry segment */ mca_gpr_replica_key_t lastkey; /**< Highest key value used */ ompi_list_t registry_entries; /**< Linked list of stored objects within this segment */ + ompi_list_t subscriber; /**< List of subscriptions to objects on this segment */ ompi_list_t synchros; /**< List of synchro requests on this segment */ ompi_list_t keytable; /**< Token-key dictionary for this segment */ ompi_list_t freekeys; /**< List of keys that have been made available */ }; -typedef struct mca_gpr_registry_segment_t mca_gpr_registry_segment_t; +typedef struct mca_gpr_replica_segment_t mca_gpr_replica_segment_t; -OBJ_CLASS_DECLARATION(mca_gpr_registry_segment_t); +OBJ_CLASS_DECLARATION(mca_gpr_replica_segment_t); /* * globals needed within component */ -extern mca_gpr_registry_t mca_gpr_replica_head; +extern mca_gpr_replica_t mca_gpr_replica_head; +extern ompi_list_t mca_gpr_replica_notify_request_tracker; +extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag; +extern ompi_list_t mca_gpr_replica_free_notify_id_tags; /* * Module open / close @@ -187,31 +205,42 @@ int mca_gpr_replica_finalize(void); * Implemented registry functions */ -int gpr_replica_define_segment(char *segment); int gpr_replica_delete_segment(char *segment); + int gpr_replica_put(ompi_registry_mode_t mode, char *segment, - char **tokens, ompi_registry_object_t *object, + char **tokens, ompi_registry_object_t object, ompi_registry_object_size_t size); + int gpr_replica_delete_object(ompi_registry_mode_t mode, char *segment, char **tokens); + ompi_list_t* gpr_replica_index(char *segment); -int gpr_replica_subscribe(ompi_process_name_t *subscriber, int tag, - ompi_registry_mode_t mode, + +int gpr_replica_subscribe(ompi_registry_mode_t mode, ompi_registry_notify_action_t action, - char *segment, char **tokens); -int gpr_replica_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode, - char *segment, char **tokens); -int gpr_replica_synchro(ompi_process_name_t *subscriber, int tag, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); + +int gpr_replica_unsubscribe(ompi_registry_mode_t mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); + +int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_registry_mode_t mode, - char *segment, char **tokens, int num); + char *segment, char **tokens, int trigger, + ompi_registry_notify_cb_fn_t cb_func, void *user_tag); + ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode, char *segment, char **tokens); + ompi_list_t* gpr_replica_test_internals(int level); void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, ompi_buffer_t buffer, int tag, void* cbdata); -void gpr_replica_notify(ompi_list_t *subscribers, ompi_registry_notify_action_t flag); +void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag, + ompi_registry_notify_message_t *message); #endif diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index 1f466c3808..22e2994c34 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -13,6 +13,8 @@ */ #include "ompi_config.h" +#include + #include "include/constants.h" #include "util/proc_info.h" #include "util/output.h" @@ -70,18 +72,21 @@ static bool initialized = false; /* * globals needed within replica component */ -mca_gpr_registry_t mca_gpr_replica_head; +mca_gpr_replica_t mca_gpr_replica_head; +ompi_list_t mca_gpr_replica_notify_request_tracker; +mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag; +ompi_list_t mca_gpr_replica_free_notify_id_tags; /* constructor - used to initialize state of keytable instance */ -static void mca_gpr_keytable_construct(mca_gpr_keytable_t* keytable) +static void mca_gpr_replica_keytable_construct(mca_gpr_replica_keytable_t* keytable) { keytable->token = NULL; keytable->key = 0; } /* destructor - used to free any resources held by instance */ -static void mca_gpr_keytable_destructor(mca_gpr_keytable_t* keytable) +static void mca_gpr_replica_keytable_destructor(mca_gpr_replica_keytable_t* keytable) { if (NULL != keytable->token) { free(keytable->token); @@ -90,78 +95,81 @@ static void mca_gpr_keytable_destructor(mca_gpr_keytable_t* keytable) /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_keytable_t, /* type name */ + mca_gpr_replica_keytable_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_keytable_construct, /* constructor */ - mca_gpr_keytable_destructor); /* destructor */ + mca_gpr_replica_keytable_construct, /* constructor */ + mca_gpr_replica_keytable_destructor); /* destructor */ /* constructor - used to initialize state of keylist instance */ -static void mca_gpr_keylist_construct(mca_gpr_keylist_t* keylist) +static void mca_gpr_replica_keylist_construct(mca_gpr_replica_keylist_t* keylist) { keylist->key = 0; } /* destructor - used to free any resources held by instance */ -static void mca_gpr_keylist_destructor(mca_gpr_keylist_t* keylist) +static void mca_gpr_replica_keylist_destructor(mca_gpr_replica_keylist_t* keylist) { } /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_keylist_t, /* type name */ + mca_gpr_replica_keylist_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_keylist_construct, /* constructor */ - mca_gpr_keylist_destructor); /* destructor */ + mca_gpr_replica_keylist_construct, /* constructor */ + mca_gpr_replica_keylist_destructor); /* destructor */ /* constructor - used to initialize state of subscriber list instance */ -static void mca_gpr_subscriber_list_construct(mca_gpr_subscriber_list_t* subscriber) +static void mca_gpr_replica_subscriber_list_construct(mca_gpr_replica_subscriber_list_t* subscriber) { - subscriber->subscriber = NULL; - subscriber->tag = 0; - subscriber->action = 0x00; + subscriber->addr_mode = 0; + subscriber->action = 0; + subscriber->keys = NULL; + subscriber->id_tag = 0; } /* destructor - used to free any resources held by instance */ -static void mca_gpr_subscriber_list_destructor(mca_gpr_subscriber_list_t* subscriber) +static void mca_gpr_replica_subscriber_list_destructor(mca_gpr_replica_subscriber_list_t* subscriber) { - if (NULL != subscriber->subscriber) { - free(subscriber->subscriber); + if (NULL != subscriber->keys) { + free(subscriber->keys); } } /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_subscriber_list_t, /* type name */ + mca_gpr_replica_subscriber_list_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_subscriber_list_construct, /* constructor */ - mca_gpr_subscriber_list_destructor); /* destructor */ + mca_gpr_replica_subscriber_list_construct, /* constructor */ + mca_gpr_replica_subscriber_list_destructor); /* destructor */ /* constructor - used to initialize state of synchro list instance */ -static void mca_gpr_synchro_list_construct(mca_gpr_synchro_list_t* synchro) +static void mca_gpr_replica_synchro_list_construct(mca_gpr_replica_synchro_list_t* synchro) { - OBJ_CONSTRUCT(&synchro->subscribers, ompi_list_t); - OBJ_CONSTRUCT(&synchro->keys, ompi_list_t); - synchro->mode = 0x00; + synchro->synch_mode = OMPI_REGISTRY_SYNCHRO_MODE_NONE; + synchro->addr_mode = OMPI_REGISTRY_NONE; + synchro->keys = NULL; synchro->trigger = 0; - synchro->present = 0; + synchro->count = 0; + synchro->id_tag = 0; } /* destructor - used to free any resources held by instance */ -static void mca_gpr_synchro_list_destructor(mca_gpr_synchro_list_t* synchro) +static void mca_gpr_replica_synchro_list_destructor(mca_gpr_replica_synchro_list_t* synchro) { - OBJ_DESTRUCT(&synchro->subscribers); - OBJ_DESTRUCT(&synchro->keys); + if (NULL != synchro->keys) { + free(synchro->keys); + } } /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_synchro_list_t, /* type name */ + mca_gpr_replica_synchro_list_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_synchro_list_construct, /* constructor */ - mca_gpr_synchro_list_destructor); /* destructor */ + mca_gpr_replica_synchro_list_construct, /* constructor */ + mca_gpr_replica_synchro_list_destructor); /* destructor */ /* constructor - used to initialize state of replica list instance */ @@ -188,50 +196,58 @@ OBJ_CLASS_INSTANCE( /* constructor - used to initialize state of registry core instance */ -static void mca_gpr_registry_core_construct(mca_gpr_registry_core_t* reg) +static void mca_gpr_replica_core_construct(mca_gpr_replica_core_t* reg) { - OBJ_CONSTRUCT(®->keys, ompi_list_t); + reg->keys = NULL; reg->object_size = 0; reg->object = NULL; - OBJ_CONSTRUCT(®->subscriber, ompi_list_t); OBJ_CONSTRUCT(®->replicas, ompi_list_t); + reg->write_invalidate.invalidate = false; + reg->write_invalidate.last_mod = 0; + reg->write_invalidate.valid_replica = NULL; } /* destructor - used to free any resources held by instance */ -static void mca_gpr_registry_core_destructor(mca_gpr_registry_core_t* reg) +static void mca_gpr_replica_core_destructor(mca_gpr_replica_core_t* reg) { - OBJ_DESTRUCT(®->keys); + if (NULL != reg->keys) { + free(reg->keys); + } if (NULL != reg->object) { free(reg->object); } - OBJ_DESTRUCT(®->subscriber); + if (NULL != reg->write_invalidate.valid_replica) { + free(reg->write_invalidate.valid_replica); + } OBJ_DESTRUCT(®->replicas); } /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_registry_core_t, /* type name */ + mca_gpr_replica_core_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_registry_core_construct, /* constructor */ - mca_gpr_registry_core_destructor); /* destructor */ + mca_gpr_replica_core_construct, /* constructor */ + mca_gpr_replica_core_destructor); /* destructor */ /* constructor - used to initialize state of segment instance */ -static void mca_gpr_registry_segment_construct(mca_gpr_registry_segment_t* seg) +static void mca_gpr_replica_segment_construct(mca_gpr_replica_segment_t* seg) { seg->segment = 0; seg->lastkey = 0; OBJ_CONSTRUCT(&seg->registry_entries, ompi_list_t); + OBJ_CONSTRUCT(&seg->subscriber, ompi_list_t); OBJ_CONSTRUCT(&seg->synchros, ompi_list_t); OBJ_CONSTRUCT(&seg->keytable, ompi_list_t); OBJ_CONSTRUCT(&seg->freekeys, ompi_list_t); } /* destructor - used to free any resources held by instance */ -static void mca_gpr_registry_segment_destructor(mca_gpr_registry_segment_t* seg) +static void mca_gpr_replica_segment_destructor(mca_gpr_replica_segment_t* seg) { OBJ_DESTRUCT(&seg->registry_entries); + OBJ_DESTRUCT(&seg->subscriber); OBJ_DESTRUCT(&seg->synchros); OBJ_DESTRUCT(&seg->keytable); OBJ_DESTRUCT(&seg->freekeys); @@ -239,10 +255,10 @@ static void mca_gpr_registry_segment_destructor(mca_gpr_registry_segment_t* seg) /* define instance of ompi_class_t */ OBJ_CLASS_INSTANCE( - mca_gpr_registry_segment_t, /* type name */ + mca_gpr_replica_segment_t, /* type name */ ompi_list_item_t, /* parent "class" name */ - mca_gpr_registry_segment_construct, /* constructor */ - mca_gpr_registry_segment_destructor); /* destructor */ + mca_gpr_replica_segment_construct, /* constructor */ + mca_gpr_replica_segment_destructor); /* destructor */ /* @@ -265,6 +281,7 @@ int mca_gpr_replica_close(void) mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority) { + ompi_output(0, "entered replica init"); /* If we're the seed, then we want to be selected, so do all the setup and return the module */ @@ -286,16 +303,29 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool /* initialize the registry head */ OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t); + ompi_output(0, "registry head setup"); + /* initialize the global dictionary for segment id's */ OBJ_CONSTRUCT(&mca_gpr_replica_head.segment_dict, ompi_list_t); OBJ_CONSTRUCT(&mca_gpr_replica_head.freekeys, ompi_list_t); mca_gpr_replica_head.lastkey = 0; - /* issue the non-blocking receive */ - rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); - if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { - return NULL; - } + ompi_output(0, "global dict setup"); + + /* initialize the notify request tracker */ + OBJ_CONSTRUCT(&mca_gpr_replica_notify_request_tracker, ompi_list_t); + mca_gpr_replica_last_notify_id_tag = 0; + OBJ_CONSTRUCT(&mca_gpr_replica_free_notify_id_tags, ompi_list_t); + + ompi_output(0, "req tracker setup"); + +/* /\* issue the non-blocking receive *\/ */ +/* rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); */ +/* if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { */ +/* return NULL; */ +/* } */ + + ompi_output(0, "nb receive setup"); /* Return the module */ @@ -315,6 +345,8 @@ int mca_gpr_replica_finalize(void) if (initialized) { OBJ_DESTRUCT(&mca_gpr_replica_head); + OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker); + OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags); initialized = false; } @@ -341,10 +373,12 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, ompi_registry_internal_test_results_t *testval; ompi_registry_index_value_t *indexval; char **tokens, **tokptr; - int32_t num_tokens, test_level, i, trigger; + int32_t num_tokens, test_level, i, trigger, id_tag; mca_gpr_cmd_flag_t command; char *segment; - int32_t response, target_tag; + int32_t response, synchro_mode; + mca_gpr_notify_request_tracker_t *trackptr; + mca_gpr_idtag_list_t *ptr_free_id; if (OMPI_SUCCESS != ompi_buffer_init(&answer, 0)) { /* RHC -- not sure what to do if this fails */ @@ -354,6 +388,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + /****** DELETE SEGMENT *****/ if (MCA_GPR_DELETE_SEGMENT_CMD == command) { /* got command to delete a segment */ if (0 > ompi_unpack_string(buffer, &segment)) { goto RETURN_ERROR; @@ -370,6 +405,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { /* RHC -- not sure what to do if the return send fails */ } + + /***** PUT *****/ } else if (MCA_GPR_PUT_CMD == command) { /* got command to put object on registry */ if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { @@ -384,6 +421,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + if (0 >= num_tokens) { /** no tokens provided - error for PUT */ + goto RETURN_ERROR; + } + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); tokptr = tokens; @@ -399,6 +440,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + if (0 >= object_size) { /* error condition - nothing to store */ + goto RETURN_ERROR; + } + object = (ompi_registry_object_t)malloc(object_size); if (OMPI_SUCCESS != ompi_unpack(buffer, object, object_size, OMPI_BYTE)) { goto RETURN_ERROR; @@ -416,6 +461,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { /* RHC -- not sure what to do if the return send fails */ } + + /***** GET *****/ } else if (MCA_GPR_GET_CMD == command) { /* got command to put object on registry */ if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { @@ -430,15 +477,19 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } - tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); - tokptr = tokens; - for (i=0; i ompi_unpack_string(buffer, tokptr)) { - goto RETURN_ERROR; + if (0 >= num_tokens) { /* no tokens provided - wildcard case */ + tokens = NULL; + } else { /* tokens provided */ + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); + tokptr = tokens; + for (i=0; i ompi_unpack_string(buffer, tokptr)) { + goto RETURN_ERROR; + } + tokptr++; } - tokptr++; + *tokptr = NULL; } - *tokptr = NULL; returned_list = ompi_registry.get(mode, segment, tokens); @@ -466,6 +517,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { /* RHC -- not sure what to do if the return send fails */ } + + /***** DELETE OBJECT *****/ } else if (MCA_GPR_DELETE_OBJECT_CMD == command) { if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { @@ -480,15 +533,19 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } - tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); - tokptr = tokens; - for (i=0; i ompi_unpack_string(buffer, tokptr)) { - goto RETURN_ERROR; + if (0 >= num_tokens) { /* no tokens provided - wildcard case */ + tokens = NULL; + } else { /* tokens provided */ + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); + tokptr = tokens; + for (i=0; i ompi_unpack_string(buffer, tokptr)) { + goto RETURN_ERROR; + } + tokptr++; } - tokptr++; + *tokptr = NULL; } - *tokptr = NULL; response = (int32_t)ompi_registry.delete_object(mode, segment, tokens); @@ -504,6 +561,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, /* RHC -- not sure what to do if the return send fails */ } + /***** INDEX *****/ } else if (MCA_GPR_INDEX_CMD == command) { if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { @@ -543,15 +601,22 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, /* RHC -- not sure what to do if the return send fails */ } + /***** SUBSCRIBE *****/ } else if (MCA_GPR_SUBSCRIBE_CMD == command) { action = OMPI_REGISTRY_NOTIFY_ALL; goto RETURN_ERROR; + /***** UNSUBSCRIBE *****/ } else if (MCA_GPR_UNSUBSCRIBE_CMD == command) { goto RETURN_ERROR; + /***** SYNCHRO *****/ } else if (MCA_GPR_SYNCHRO_CMD == command) { - if (OMPI_SUCCESS != ompi_unpack(buffer, &target_tag, 1, OMPI_INT32)) { + if (OMPI_SUCCESS != ompi_unpack(buffer, &synchro_mode, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + if (OMPI_REGISTRY_SYNCHRO_MODE_NONE == synchro_mode) { goto RETURN_ERROR; } @@ -567,21 +632,46 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } - tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); - tokptr = tokens; - for (i=0; i ompi_unpack_string(buffer, tokptr)) { - goto RETURN_ERROR; + if (0 < num_tokens) { /* tokens provided */ + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); + tokptr = tokens; + for (i=0; i ompi_unpack_string(buffer, tokptr)) { + goto RETURN_ERROR; + } + tokptr++; } - tokptr++; + *tokptr = NULL; + } else { /* no tokens provided - wildcard case, just count entries on segment */ + tokens = NULL; } - *tokptr = NULL; if (OMPI_SUCCESS != ompi_unpack(buffer, &trigger, 1, OMPI_INT32)) { goto RETURN_ERROR; } - response = (int32_t)ompi_registry.synchro(sender, (int)target_tag, mode, segment, tokens, (int)trigger); + if (OMPI_SUCCESS != ompi_unpack(buffer, &id_tag, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + /* enter request on notify tracking system */ + trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); + trackptr->requestor = ompi_name_server.copy_process_name(sender); + trackptr->req_tag = id_tag; + trackptr->callback = NULL; + trackptr->user_tag = NULL; + if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) { + trackptr->id_tag = mca_gpr_replica_last_notify_id_tag; + mca_gpr_replica_last_notify_id_tag++; + } else { + ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags); + trackptr->id_tag = ptr_free_id->id_tag; + } + trackptr->synchro = synchro_mode; + ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + + response = (int32_t)gpr_replica_construct_synchro(synchro_mode, mode, segment, tokens, + trigger, trackptr->id_tag); if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { goto RETURN_ERROR; @@ -595,7 +685,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, /* RHC -- not sure what to do if the return send fails */ } - + /***** TEST INTERNALS *****/ } else if (MCA_GPR_TEST_INTERNALS_CMD == command) { if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) || @@ -630,7 +720,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, /* RHC -- not sure what to do if the return send fails */ } - + /**** UNRECOGNIZED ****/ } else { /* got an unrecognized command */ RETURN_ERROR: ompi_buffer_init(&error_answer, 8); @@ -648,34 +738,61 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } -void gpr_replica_notify(ompi_list_t *subscribers, ompi_registry_notify_action_t flag) +void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag, + ompi_registry_notify_message_t *message) { - mca_gpr_subscriber_list_t *subs; ompi_buffer_t msg; - ompi_registry_notify_action_t command; + mca_gpr_cmd_flag_t command; + int32_t num_items, i; + ompi_registry_value_t *regval; + char **tokptr; + int recv_tag; - command = OMPI_REGISTRY_NOTIFY_MESSAGE; + command = MCA_GPR_NOTIFY_CMD; + recv_tag = MCA_OOB_TAG_GPR_NOTIFY; - for (subs = (mca_gpr_subscriber_list_t*)ompi_list_get_first(subscribers); - subs != (mca_gpr_subscriber_list_t*)ompi_list_get_end(subscribers); - subs = (mca_gpr_subscriber_list_t*)ompi_list_get_next(subs)) { - - if (OMPI_SUCCESS != ompi_buffer_init(&msg, 0)) { - return; - } - - if (OMPI_SUCCESS != ompi_pack(msg, &command, 1, MCA_GPR_OOB_PACK_ACTION)) { - return; - } - - if (OMPI_SUCCESS != ompi_pack(msg, &flag, 1, MCA_GPR_OOB_PACK_ACTION)) { - return; - } - - if (0 > mca_oob_send_packed(subs->subscriber, msg, subs->tag, 0)) { - return; - } - - ompi_buffer_free(msg); + if (OMPI_SUCCESS != ompi_buffer_init(&msg, 0)) { + return; } + + if (OMPI_SUCCESS != ompi_pack(msg, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + return; + } + + i = (int32_t)recipient_tag; + if (OMPI_SUCCESS != ompi_pack(msg, &i, 1, OMPI_INT32)) { + return; + } + + num_items = (int32_t)ompi_list_get_size(&message->data); + if (OMPI_SUCCESS != ompi_pack(msg, &num_items, 1, OMPI_INT32)) { + return; + } + + if (0 < num_items) { /* don't send anything else back if the list is empty */ + while (NULL != (regval = (ompi_registry_value_t*)ompi_list_remove_first(&message->data))) { + if (OMPI_SUCCESS != ompi_pack(msg, ®val->object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) { + return; + } + if (OMPI_SUCCESS != ompi_pack(msg, regval->object, regval->object_size, OMPI_BYTE)) { + return; + } + } + } + if (OMPI_SUCCESS != ompi_pack(msg, &message->num_tokens, 1, OMPI_INT32)) { + return; + } + + for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) { + if (OMPI_SUCCESS != ompi_pack_string(msg, *tokptr)) { + return; + } + } + + if (0 > mca_oob_send_packed(recipient, msg, recv_tag, 0)) { + return; + } + + ompi_buffer_free(msg); + OBJ_RELEASE(message); } diff --git a/src/mca/gpr/replica/gpr_replica_internals.c b/src/mca/gpr/replica/gpr_replica_internals.c index e7dea01921..5efdd5ca75 100644 --- a/src/mca/gpr/replica/gpr_replica_internals.c +++ b/src/mca/gpr/replica/gpr_replica_internals.c @@ -32,77 +32,75 @@ * */ -int gpr_replica_define_segment(char *segment) +mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment) { - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_segment_t *seg; mca_gpr_replica_key_t key; - int response; - response = gpr_replica_define_key(segment, NULL); - if (0 > response && OMPI_EXISTS != response) { /* got some kind of error code */ - return response; + key = gpr_replica_define_key(segment, NULL); + if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */ + return NULL; } /* need to add the segment to the registry */ - key = gpr_replica_get_key(segment, NULL); - if (MCA_GPR_REPLICA_KEY_MAX == key) { /* couldn't retrieve it */ - return OMPI_ERROR; - } - seg = OBJ_NEW(mca_gpr_registry_segment_t); + seg = OBJ_NEW(mca_gpr_replica_segment_t); seg->segment = key; ompi_list_append(&mca_gpr_replica_head.registry, &seg->item); - return OMPI_SUCCESS; + return seg; } -mca_gpr_registry_segment_t *gpr_replica_find_seg(char *segment) +mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment) { - mca_gpr_keytable_t *ptr_seg; - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_keytable_t *ptr_seg; + mca_gpr_replica_segment_t *seg; /* search the registry segments to find which one is being referenced */ - for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); - ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); - ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) { + for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); + ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); + ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) { if (0 == strcmp(segment, ptr_seg->token)) { /* search mca_gpr_replica_head to find segment */ - for (seg=(mca_gpr_registry_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry); - seg != (mca_gpr_registry_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry); - seg = (mca_gpr_registry_segment_t*)ompi_list_get_next(seg)) { + for (seg=(mca_gpr_replica_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry); + seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry); + seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) { if(seg->segment == ptr_seg->key) { return(seg); } } } } - /* didn't find the dictionary entry */ - return NULL; + if (create) { + /* didn't find the dictionary entry - create it */ + return gpr_replica_define_segment(segment); + } + return NULL; /* don't create it - just return NULL */ } -mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token) +mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token) { - mca_gpr_keytable_t *ptr_seg; - mca_gpr_keytable_t *ptr_key; - mca_gpr_registry_segment_t *seg; + mca_gpr_replica_keytable_t *ptr_seg; + mca_gpr_replica_keytable_t *ptr_key; + mca_gpr_replica_segment_t *seg; /* search the registry segments to find which one is being referenced */ - for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); - ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); - ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) { + for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); + ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); + ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) { if (0 == strcmp(segment, ptr_seg->token)) { if (NULL == token) { /* just want segment token-key pair */ return(ptr_seg); } /* search registry to find segment */ - for (seg=(mca_gpr_registry_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry); - seg != (mca_gpr_registry_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry); - seg = (mca_gpr_registry_segment_t*)ompi_list_get_next(seg)) { + for (seg=(mca_gpr_replica_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry); + seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry); + seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) { if(seg->segment == ptr_seg->key) { /* got segment - now find specified token-key pair in that dictionary */ - for (ptr_key = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable); - ptr_key != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable); - ptr_key = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_key)) { + for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable); + ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); + ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) { if (0 == strcmp(token, ptr_key->token)) { return(ptr_key); } @@ -119,7 +117,7 @@ mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token) mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token) { - mca_gpr_keytable_t *ptr_key; + mca_gpr_replica_keytable_t *ptr_key; /* find registry segment */ ptr_key = gpr_replica_find_dict_entry(segment, NULL); @@ -142,18 +140,18 @@ ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens) { ompi_list_t *keys; char **token; - mca_gpr_keytable_t *keyptr; + mca_gpr_replica_keytable_t *keyptr; token = tokens; keys = OBJ_NEW(ompi_list_t); /* protect against errors */ - if (NULL == segment || NULL == tokens || NULL == *token) { + if (NULL == segment || NULL == tokens) { return keys; } while (NULL != *token) { /* traverse array of tokens until NULL */ - keyptr = OBJ_NEW(mca_gpr_keytable_t); + keyptr = OBJ_NEW(mca_gpr_replica_keytable_t); keyptr->token = strdup(*token); keyptr->key = gpr_replica_get_key(segment, *token); ompi_list_append(keys, &keyptr->item); @@ -162,10 +160,10 @@ ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens) return keys; } -int gpr_replica_define_key(char *segment, char *token) +mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) { - mca_gpr_registry_segment_t *seg; - mca_gpr_keytable_t *ptr_seg, *ptr_key, *new; + mca_gpr_replica_segment_t *seg; + mca_gpr_replica_keytable_t *ptr_seg, *ptr_key, *new; /* protect against errors */ if (NULL == segment) { @@ -174,66 +172,68 @@ int gpr_replica_define_key(char *segment, char *token) /* if token is NULL, then this is defining a segment name. Check dictionary to ensure uniqueness */ if (NULL == token) { - for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); - ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); - ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) { + for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); + ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); + ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) { if (0 == strcmp(segment, ptr_seg->token)) { - return OMPI_EXISTS; + return ptr_seg->key; } } /* okay, name is not previously taken. Define a key value for it and return */ - new = OBJ_NEW(mca_gpr_keytable_t); + new = OBJ_NEW(mca_gpr_replica_keytable_t); new->token = strdup(segment); if (0 == ompi_list_get_size(&mca_gpr_replica_head.freekeys)) { /* no keys waiting for reuse */ if (MCA_GPR_REPLICA_KEY_MAX-2 > mca_gpr_replica_head.lastkey) { /* have a key left */ mca_gpr_replica_head.lastkey++; new->key = mca_gpr_replica_head.lastkey; } else { /* out of keys */ - return OMPI_ERR_OUT_OF_RESOURCE; + return MCA_GPR_REPLICA_KEY_MAX; } } else { - ptr_key = (mca_gpr_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys); + ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys); new->key = ptr_key->key; } ompi_list_append(&mca_gpr_replica_head.segment_dict, &new->item); - return OMPI_SUCCESS; + return new->key; } /* okay, token is specified */ /* search the registry segments to find which one is being referenced */ - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(true, segment); if (NULL != seg) { /* using that segment, check dictionary to ensure uniqueness */ - for (ptr_key = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable); - ptr_key != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable); - ptr_key = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_key)) { + for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable); + ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); + ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) { if (0 == strcmp(token, ptr_key->token)) { - return OMPI_EXISTS; /* already taken, report error */ + return ptr_key->key; /* already taken, report value */ } } /* okay, token is unique - create dictionary entry */ - new = OBJ_NEW(mca_gpr_keytable_t); + new = OBJ_NEW(mca_gpr_replica_keytable_t); new->token = strdup(token); if (0 == ompi_list_get_size(&seg->freekeys)) { /* no keys waiting for reuse */ seg->lastkey++; new->key = seg->lastkey; } else { - ptr_key = (mca_gpr_keytable_t*)ompi_list_remove_first(&seg->freekeys); + ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&seg->freekeys); new->key = ptr_key->key; } ompi_list_append(&seg->keytable, &new->item); - return OMPI_SUCCESS; + return new->key; } /* couldn't find segment */ - return OMPI_ERROR; + return MCA_GPR_REPLICA_KEY_MAX; } int gpr_replica_delete_key(char *segment, char *token) { - mca_gpr_registry_segment_t *seg; - mca_gpr_registry_core_t *reg, *prev; - mca_gpr_keytable_t *ptr_seg, *ptr_key, *new, *regkey; + mca_gpr_replica_segment_t *seg; + mca_gpr_replica_core_t *reg; + mca_gpr_replica_keytable_t *ptr_seg, *ptr_key, *new; + mca_gpr_replica_key_t *key; + int i; /* protect ourselves against errors */ if (NULL == segment) { @@ -241,7 +241,7 @@ int gpr_replica_delete_key(char *segment, char *token) } /* find the segment */ - seg = gpr_replica_find_seg(segment); + seg = gpr_replica_find_seg(false, segment); if (NULL != seg) { /* if specified token is NULL, then this is deleting a segment name.*/ @@ -255,7 +255,7 @@ int gpr_replica_delete_key(char *segment, char *token) return OMPI_ERROR; } /* add key to global registry's freekey list */ - new = OBJ_NEW(mca_gpr_keytable_t); + new = OBJ_NEW(mca_gpr_replica_keytable_t); new->token = NULL; new->key = ptr_seg->key; ompi_list_append(&mca_gpr_replica_head.freekeys, &new->item); @@ -268,39 +268,28 @@ int gpr_replica_delete_key(char *segment, char *token) ptr_key = gpr_replica_find_dict_entry(segment, token); if (NULL != ptr_key) { /* found key in dictionary */ - /* need to search this segment's registry to find all instances of key - then delete them */ - for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries); - reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries); - reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) { + /* need to search this segment's registry to find all instances of key & "delete" them */ + for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries); + reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries); + reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) { /* check the key list */ - for (regkey = (mca_gpr_keytable_t*)ompi_list_get_first(®->keys); - (regkey != (mca_gpr_keytable_t*)ompi_list_get_end(®->keys)) - && (regkey->key != ptr_key->key); - regkey = (mca_gpr_keytable_t*)ompi_list_get_next(regkey)); - if (regkey != (mca_gpr_keytable_t*)ompi_list_get_end(®->keys)) { - ompi_list_remove_item(®->keys, ®key->item); - } - /* if this was the last key, then remove the registry entry itself */ - if (0 == ompi_list_get_size(®->keys)) { - while (0 < ompi_list_get_size(®->subscriber)) { - ompi_list_remove_last(®->subscriber); + for (i=0, key=reg->keys; i < reg->num_keys; i++, key++) { + if (ptr_key->key == *key) { /* found match */ + *key = MCA_GPR_REPLICA_KEY_MAX; } - prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg); - ompi_list_remove_item(&seg->registry_entries, ®->item); - reg = prev; } + + /* add key to this segment's freekey list */ + new = OBJ_NEW(mca_gpr_replica_keytable_t); + new->token = NULL; + new->key = ptr_key->key; + ompi_list_append(&seg->freekeys, &new->item); + + /* now remove the dictionary entry from the segment's dictionary */ + ompi_list_remove_item(&seg->keytable, &ptr_key->item); + return(OMPI_SUCCESS); } - - /* add key to this segment's freekey list */ - new = OBJ_NEW(mca_gpr_keytable_t); - new->token = NULL; - new->key = ptr_key->key; - ompi_list_append(&seg->freekeys, &new->item); - - /* now remove the dictionary entry from the segment's dictionary */ - ompi_list_remove_item(&seg->keytable, &ptr_key->item); - return(OMPI_SUCCESS); } return(OMPI_ERROR); /* if we get here, then we couldn't find token in dictionary */ } @@ -308,25 +297,34 @@ int gpr_replica_delete_key(char *segment, char *token) return(OMPI_ERROR); /* if we get here, then we couldn't find segment */ } -int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg) +int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg) { + mca_gpr_replica_core_t *ptr; + mca_gpr_replica_keytable_t *keytab; + mca_gpr_replica_keylist_t *keylst; + /* need to free memory from each entry - remove_last returns pointer to the entry */ /* empty the segment's registry */ - while (0 < ompi_list_get_size(&seg->registry_entries)) { - ompi_list_remove_last(&seg->registry_entries); + while (!ompi_list_is_empty(&seg->registry_entries)) { + ptr = (mca_gpr_replica_core_t*)ompi_list_remove_first(&seg->registry_entries); + OBJ_RELEASE(ptr); } /* empty the segment's dictionary */ - while (0 < ompi_list_get_size(&seg->keytable)) { - ompi_list_remove_last(&seg->keytable); + while (!ompi_list_is_empty(&seg->keytable)) { + keytab = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&seg->keytable); + OBJ_RELEASE(keytab); } + /* empty the list of free keys */ - while (0 < ompi_list_get_size(&seg->freekeys)) { - ompi_list_remove_last(&seg->freekeys); + while (!ompi_list_is_empty(&seg->freekeys)) { + keylst = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&seg->freekeys); + OBJ_RELEASE(keylst); } /* now remove segment from global registry */ ompi_list_remove_item(&mca_gpr_replica_head.registry, &seg->item); + OBJ_RELEASE(seg); return OMPI_SUCCESS; } @@ -334,28 +332,32 @@ int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg) /* * A mode of "NONE" or "OVERWRITE" defaults to "XAND" behavior */ -bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list, ompi_list_t *entry_keys) +bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode, + mca_gpr_replica_key_t num_keys_search, mca_gpr_replica_key_t *keys, + mca_gpr_replica_key_t num_keys_entry, mca_gpr_replica_key_t *entry_keys) { - mca_gpr_keylist_t *keyptr; - mca_gpr_keytable_t *key; - size_t num_keys_search, num_keys_entry, num_found; + mca_gpr_replica_key_t *key1, *key2; + int num_found; bool exclusive, no_match; + int i, j; - if (OMPI_REGISTRY_NONE == mode || - OMPI_REGISTRY_OVERWRITE == mode) { /* set default behavior for search */ - mode = OMPI_REGISTRY_XAND; + /* check for trivial case */ + if (NULL == keys) { /* wildcard case - automatically true */ + return true; } - num_keys_search = ompi_list_get_size(key_list); - num_keys_entry = ompi_list_get_size(entry_keys); + if (OMPI_REGISTRY_NONE == addr_mode || + OMPI_REGISTRY_OVERWRITE == addr_mode) { /* set default behavior for search */ + addr_mode = OMPI_REGISTRY_XAND; + } /* take care of trivial cases that don't require search */ - if ((OMPI_REGISTRY_XAND & mode) && + if ((OMPI_REGISTRY_XAND & addr_mode) && (num_keys_search != num_keys_entry)) { /* can't possibly turn out "true" */ return false; } - if ((OMPI_REGISTRY_AND & mode) && + if ((OMPI_REGISTRY_AND & addr_mode) && (num_keys_search > num_keys_entry)) { /* can't find enough matches */ return false; } @@ -363,17 +365,13 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list /* okay, have to search for remaining possibilities */ num_found = 0; exclusive = true; - for (keyptr = (mca_gpr_keylist_t*)ompi_list_get_first(entry_keys); - keyptr != (mca_gpr_keylist_t*)ompi_list_get_end(entry_keys); - keyptr = (mca_gpr_keylist_t*)ompi_list_get_next(keyptr)) { + for (i=0, key1=entry_keys; i < num_keys_entry; i++, key1++) { no_match = true; - for (key = (mca_gpr_keytable_t*)ompi_list_get_first(key_list); - (key != (mca_gpr_keytable_t*)ompi_list_get_end(key_list)) && no_match; - key = (mca_gpr_keytable_t*)ompi_list_get_next(key)) { - if (key->key == keyptr->key) { /* found a match */ + for (j=0, key2=keys; j < num_keys_search; j++, key2++) { + if (*key1 == *key2) { /* found a match */ num_found++; no_match = false; - if (OMPI_REGISTRY_OR & mode) { /* only need one match */ + if (OMPI_REGISTRY_OR & addr_mode) { /* only need one match */ return true; } } @@ -383,7 +381,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list } } - if (OMPI_REGISTRY_XAND & mode) { /* deal with XAND case */ + if (OMPI_REGISTRY_XAND & addr_mode) { /* deal with XAND case */ if (num_found == num_keys_entry) { /* found all, and nothing more */ return true; } else { /* found either too many or not enough */ @@ -391,7 +389,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list } } - if (OMPI_REGISTRY_XOR & mode) { /* deal with XOR case */ + if (OMPI_REGISTRY_XOR & addr_mode) { /* deal with XOR case */ if (num_found > 0 && exclusive) { /* found at least one and nothing not on list */ return true; } else { @@ -399,7 +397,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list } } - if (OMPI_REGISTRY_AND & mode) { /* deal with AND case */ + if (OMPI_REGISTRY_AND & addr_mode) { /* deal with AND case */ if (num_found == num_keys_search) { /* found all the required keys */ return true; } else { @@ -411,6 +409,166 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list return false; } +int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag) +{ + mca_gpr_replica_segment_t *seg; + mca_gpr_replica_synchro_list_t *synch; + char **tokptr; + mca_gpr_replica_key_t *keyptr; + int i, num_tokens; + + seg = gpr_replica_find_seg(true, segment); + if (NULL == seg) { /* couldn't find segment */ + return OMPI_ERROR; + } + + synch = OBJ_NEW(mca_gpr_replica_synchro_list_t); + + synch->addr_mode = addr_mode; + synch->trigger = trigger; + synch->count = 0; + synch->id_tag = id_tag; + + synch->num_keys = 0; + synch->keys = NULL; + + if (NULL != tokens) { /* tokens provided */ + + /* count number of tokens */ + tokptr = tokens; + num_tokens = 0; + while (NULL != tokptr && NULL != *tokptr) { + num_tokens++; + tokptr++; + } + synch->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t)); + keyptr = synch->keys; + /* store key values of tokens, defining them if needed */ + for (i=0, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) { + *keyptr = gpr_replica_get_key(segment, *tokptr); + if (MCA_GPR_REPLICA_KEY_MAX == *keyptr) { + *keyptr = gpr_replica_define_key(segment, *tokptr); + } + keyptr++; + } + synch->num_keys = num_tokens; + } + + ompi_list_append(&seg->synchros, &synch->item); + + return OMPI_SUCCESS; + +} + + +ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens) +{ + ompi_list_t *reg_entries; + ompi_registry_value_t *reg, *obj; + ompi_registry_notify_message_t *msg; + char **tokptr, **tokptr2; + int num_tokens, i; + + /* protect against errors */ + if (NULL == segment) { + return NULL; + } + + reg_entries = gpr_replica_get(addr_mode, segment, tokens); + + if (ompi_list_is_empty(reg_entries)) { + return NULL; + } + + /* compute number of tokens */ + tokptr = tokens; + num_tokens = 0; + while (NULL != *tokptr) { + num_tokens++; + tokptr++; + } + + msg = OBJ_NEW(ompi_registry_notify_message_t); + msg->num_tokens = num_tokens; + msg->tokens = (char**)malloc(num_tokens*(sizeof(char*))); + tokptr = tokens; + tokptr2 = msg->tokens; + for (i=0, tokptr=tokens, tokptr2=msg->tokens; + i < num_tokens; + i++, tokptr++, tokptr2++) { + *tokptr2 = strdup(*tokptr); + } + + while (NULL != (reg = (ompi_registry_value_t*)ompi_list_remove_first(reg_entries))) { + obj = OBJ_NEW(ompi_registry_value_t); + obj->object = (ompi_registry_object_t)malloc(reg->object_size); + memcpy(obj->object, reg->object, reg->object_size); + obj->object_size = reg->object_size; + ompi_list_append(&msg->data, &obj->item); + OBJ_RELEASE(reg); + } + + OBJ_RELEASE(reg_entries); + + return msg; +} + +void gpr_replica_process_triggers(ompi_registry_notify_message_t *message, + mca_gpr_notify_id_t id_tag) +{ + mca_gpr_notify_request_tracker_t *trackptr, *tmpptr; + ompi_registry_object_t *data; + char **tokptr; + int i; + bool found; + + /* protect against errors */ + if (NULL == message) { + return; + } + + /* find corresponding notify request */ + found = false; + for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); + trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && !found; + trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) { + if (trackptr->id_tag == id_tag) { + found = true; + } + } + + if (!found) { /* didn't find request */ + ompi_output(0, "Notification error - request not found"); + return; + } + + /* process request */ + if (NULL == trackptr->requestor) { /* local request - callback fn with their tag */ + trackptr->callback(message, trackptr->user_tag); + /* dismantle message and free memory */ + while (NULL != (data = (ompi_registry_object_t*)ompi_list_get_first(&message->data))) { + OBJ_RELEASE(data); + } + for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) { + free(*tokptr); + } + free(message->tokens); + free(message); + + } else { /* remote request - send message back */ + gpr_replica_remote_notify(trackptr->requestor, trackptr->req_tag, message); + } + + /* if one-shot, remove request from tracking system */ + if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trackptr->synchro) { + tmpptr = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_item(&message->data, &trackptr->item); + OBJ_RELEASE(tmpptr); + } +} + ompi_list_t *gpr_replica_test_internals(int level) { ompi_list_t *test_results, *keylist; @@ -419,37 +577,20 @@ ompi_list_t *gpr_replica_test_internals(int level) char *name3[30]; int i, j, k; mca_gpr_replica_key_t segkey, key; - mca_gpr_registry_segment_t *seg; - mca_gpr_keytable_t *dict_entry; + mca_gpr_replica_segment_t *seg; + mca_gpr_replica_keytable_t *dict_entry; bool success; test_results = OBJ_NEW(ompi_list_t); + ompi_output(0, "testing define segment"); /* create several test segments */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); result->test = strdup("test-create-segment"); for (i=0; i<5 && success; i++) { sprintf(name, "test-def-seg%d", i); - if (OMPI_SUCCESS != gpr_replica_define_segment(name)) { - success = false; - } - } - if (success) { - result->message = strdup("success"); - } else { - result->message = strdup("failed"); - } - ompi_list_append(test_results, &result->item); - - /* check that define key protects uniqueness */ - success = true; - result = OBJ_NEW(ompi_registry_internal_test_results_t); - result->test = strdup("test-define-key-uniqueness"); - for (i=0; i<5 && success; i++) { - sprintf(name, "test-def-seg%d", i); - key = gpr_replica_define_key(name, NULL); - if (OMPI_EXISTS != key) { /* got an error */ + if (NULL == gpr_replica_define_segment(name)) { success = false; } } @@ -460,6 +601,7 @@ ompi_list_t *gpr_replica_test_internals(int level) } ompi_list_append(test_results, &result->item); + ompi_output(0, "testing get key for segment "); /* check ability to get key for a segment */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); @@ -478,12 +620,33 @@ ompi_list_t *gpr_replica_test_internals(int level) } ompi_list_append(test_results, &result->item); + ompi_output(0, "testing define key"); + /* check that define key protects uniqueness */ + success = true; + result = OBJ_NEW(ompi_registry_internal_test_results_t); + result->test = strdup("test-define-key-uniqueness"); + for (i=0; i<5 && success; i++) { + sprintf(name, "test-def-seg%d", i); + segkey = gpr_replica_get_key(name, NULL); + key = gpr_replica_define_key(name, NULL); + if (segkey != key) { /* got an error */ + success = false; + } + } + if (success) { + result->message = strdup("success"); + } else { + result->message = strdup("failed"); + } + ompi_list_append(test_results, &result->item); + + ompi_output(0, "testing find segment"); /* check the ability to find a segment */ i = 2; sprintf(name, "test-def-seg%d", i); result = OBJ_NEW(ompi_registry_internal_test_results_t); result->test = strdup("test-find-seg"); - seg = gpr_replica_find_seg(name); + seg = gpr_replica_find_seg(false, name); if (NULL == seg) { asprintf(&result->message, "test failed with NULL returned: %s", name); } else { /* locate key and check it */ @@ -496,6 +659,7 @@ ompi_list_t *gpr_replica_test_internals(int level) } ompi_list_append(test_results, &result->item); + ompi_output(0, "testing define key within segment"); /* check ability to define key within a segment */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); @@ -518,6 +682,7 @@ ompi_list_t *gpr_replica_test_internals(int level) ompi_list_append(test_results, &result->item); + ompi_output(0, "testing get key within segment"); /* check ability to retrieve key within a segment */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); @@ -540,6 +705,7 @@ ompi_list_t *gpr_replica_test_internals(int level) ompi_list_append(test_results, &result->item); + ompi_output(0, "testing get dict entry - global"); /* check ability to get dictionary entries */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); @@ -559,6 +725,7 @@ ompi_list_t *gpr_replica_test_internals(int level) } ompi_list_append(test_results, &result->item); + ompi_output(0, "testing get dict entry - segment"); if (success) { /* segment values checked out - move on to within a segment */ result = OBJ_NEW(ompi_registry_internal_test_results_t); result->test = strdup("test-get-dict-entry-segment"); @@ -581,6 +748,7 @@ ompi_list_t *gpr_replica_test_internals(int level) } + ompi_output(0, "testing get key list"); /* check ability to get key list */ success = true; result = OBJ_NEW(ompi_registry_internal_test_results_t); diff --git a/src/mca/gpr/replica/gpr_replica_internals.h b/src/mca/gpr/replica/gpr_replica_internals.h index ad71a064a7..3be3a904cd 100644 --- a/src/mca/gpr/replica/gpr_replica_internals.h +++ b/src/mca/gpr/replica/gpr_replica_internals.h @@ -30,11 +30,10 @@ mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token); * @param token Pointer to a character string containing the token to be defined. If token=NULL, * the function adds the token to the segment dictionary, thus defining a new segment name. * - * @retval OMPI_EXISTS Indicates that the requested dictionary entry already exists. - * @retval OMPI_ERR_OUT_OF_RESOURCE Indicates that the dictionary is full. - * @retval OMPI_SUCCESS Indicates that the dictionary entry was created. + * @retval key New key value + * @retval MCA_GPR_REPLICA_KEY_MAX Indicates that the dictionary is full or some other error. */ -int gpr_replica_define_key(char *segment, char *token); +mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token); /** Delete a token from a segment's dictionary. * The gpr_replica_deletekey() function allows the removal of a definition from the @@ -63,14 +62,27 @@ int gpr_replica_delete_key(char *segment, char *token); * @retval *seg Pointer to the segment * @retval NULL Indicates that the specified segment could not be found */ -mca_gpr_registry_segment_t *gpr_replica_find_seg(char *segment); +mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment); -mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token); +mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token); -int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg); +int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg); ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens); -bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list, ompi_list_t *entry_keys); +bool gpr_replica_check_key_list(ompi_registry_mode_t mode, + mca_gpr_replica_key_t num_keys_search, mca_gpr_replica_key_t *keys, + mca_gpr_replica_key_t num_keys_entry, mca_gpr_replica_key_t *entry_keys); -int gpr_replica_define_segment(char *segment); +mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment); + +int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag); + +ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, + char *segment, char **tokens); + +void gpr_replica_process_triggers(ompi_registry_notify_message_t *message, + mca_gpr_notify_id_t id_tag); diff --git a/test/mca/gpr/test_gpr_replica.c b/test/mca/gpr/test_gpr_replica.c index 3005c2d8be..a1214e0f2a 100644 --- a/test/mca/gpr/test_gpr_replica.c +++ b/test/mca/gpr/test_gpr_replica.c @@ -41,7 +41,7 @@ int main(int argc, char **argv) ompi_list_t *answer; ompi_registry_value_t *ans; bool multi, hidden; - int i, j; + int i, j, result; bool success; char name[30], *name2[30], *name3[30]; int put_test; /* result from system call */ @@ -65,6 +65,7 @@ int main(int argc, char **argv) exit (1); } + fprintf(test_out, "\n\ngpr opening\n"); /* open the GPR */ if (OMPI_SUCCESS == mca_gpr_base_open()) { fprintf(test_out, "GPR opened\n"); @@ -76,6 +77,7 @@ int main(int argc, char **argv) exit(1); } + fprintf(test_out, "\n\ngpr select\n"); /* startup the GPR replica */ if (OMPI_SUCCESS != mca_gpr_base_select(&multi, &hidden)) { fprintf(test_out, "GPR replica could not start\n"); @@ -87,6 +89,7 @@ int main(int argc, char **argv) test_success(); } + fprintf(test_out, "\n\ntesting internals\n"); /* check internals */ internal_tests = ompi_registry.test_internals(1); if (0 == ompi_list_get_size(internal_tests)) { /* should have been something in list */ @@ -105,6 +108,7 @@ int main(int argc, char **argv) test_success(); } + fprintf(test_out, "\n\ntesting index\n"); /* check index */ test_list = ompi_registry.index(NULL); if (0 == ompi_list_get_size(test_list)) { /* should have been something in dictionary */ @@ -122,7 +126,7 @@ int main(int argc, char **argv) test_success(); } - + fprintf(test_out, "\n\ntesting put function\n"); /* test the put function */ success = true; input_size = 10000; @@ -144,6 +148,7 @@ int main(int argc, char **argv) for (i=0; i<5 && success; i++) { sprintf(name, "test-def-seg%d", i); + fprintf(test_out, "\ttesting seg %s\n", name); if (OMPI_SUCCESS != ompi_registry.put(OMPI_REGISTRY_NONE, name, name2, test_buffer, input_size)) { fprintf(test_out, "put test failed for segment %s\n", name); @@ -163,6 +168,7 @@ int main(int argc, char **argv) exit(1); } + fprintf(test_out, "\n\ntesting overwrite function\n"); /* test the put overwrite function */ success = true; for (i=0; i<5 && success; i++) { @@ -172,10 +178,16 @@ int main(int argc, char **argv) } else { mode = OMPI_REGISTRY_NONE; } + if (OMPI_REGISTRY_OVERWRITE == mode) { + fprintf(test_out, "\toverwrite on - testing segment %s\n", name); + } else if (OMPI_REGISTRY_NONE == mode) { + fprintf(test_out, "\toverwrite off - testing segment %s\n", name); + } put_test = ompi_registry.put(mode, name, name2, test_buffer, input_size); + fprintf(test_out, "\t result %d\n", put_test); if ((OMPI_REGISTRY_OVERWRITE == mode && OMPI_SUCCESS != put_test) || (OMPI_REGISTRY_NONE == mode && OMPI_SUCCESS == put_test)) { - fprintf(test_out, "put test failed for segment %s\n", name); + fprintf(test_out, "put overwrite test failed for segment %s\n", name); for (j=0; j<10; j++) { fprintf(test_out, "\t%s\n", name2[j]); } @@ -272,14 +284,14 @@ int main(int argc, char **argv) /* check the universe segment - should have a key value of "1" */ fclose( test_out ); - /* result = system( cmd_str ); + result = system( cmd_str ); if( result == 0 ) { test_success(); } else { test_failure( "test_gpr_replica ompi_registry init, etc failed"); } - */ + test_finalize(); return(0);