1
1

Massive update of the registry system to incorporate publish/subscribe notifications, including new "synchro" function that allows a barrier-like operation to be performed on a registry segment. Corresponding update to unit test for replica - system passes, but additional new functionality needs to be added to test.

This commit was SVN r2317.
Этот коммит содержится в:
Ralph Castain 2004-08-27 05:23:04 +00:00
родитель 7ccf210a28
Коммит f1ab634fab
11 изменённых файлов: 1169 добавлений и 529 удалений

Просмотреть файл

@ -93,6 +93,82 @@ OBJ_CLASS_INSTANCE(
ompi_registry_internal_test_results_destructor); /* destructor */
/* constructor - used to initialize notify message instance */
static void mca_gpr_notify_request_tracker_construct(mca_gpr_notify_request_tracker_t* req)
{
req->requestor = NULL;
req->req_tag = 0;
req->callback = NULL;
req->user_tag = NULL;
req->id_tag = 0;
req->synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_notify_request_tracker_destructor(mca_gpr_notify_request_tracker_t* req)
{
if (NULL != req->requestor) {
free(req->requestor);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_notify_request_tracker_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_notify_request_tracker_construct, /* constructor */
mca_gpr_notify_request_tracker_destructor); /* destructor */
/* constructor - used to initialize notify idtag list instance */
static void mca_gpr_idtag_list_construct(mca_gpr_idtag_list_t* req)
{
req->id_tag = 0;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_idtag_list_destructor(mca_gpr_idtag_list_t* req)
{
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_idtag_list_tracker_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_idtag_list_construct, /* constructor */
mca_gpr_idtag_list_destructor); /* destructor */
/* constructor - used to initialize notify message instance */
static void ompi_registry_notify_message_construct(ompi_registry_notify_message_t* msg)
{
OBJ_CONSTRUCT(&msg->data, ompi_list_t);
msg->num_tokens = 0;
msg->tokens = NULL;
}
/* destructor - used to free any resources held by instance */
static void ompi_registry_notify_message_destructor(ompi_registry_notify_message_t* msg)
{
uint32_t i;
char **tokptr;
OBJ_DESTRUCT(&msg->data);
for (i=0, tokptr=msg->tokens; i < msg->num_tokens; i++, tokptr++) {
free(*tokptr);
}
if (NULL != msg->tokens) {
free(msg->tokens);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
ompi_registry_notify_message_t, /* type name */
ompi_list_item_t, /* parent "class" name */
ompi_registry_notify_message_construct, /* constructor */
ompi_registry_notify_message_destructor); /* destructor */
/*
* Global variables

Просмотреть файл

@ -39,11 +39,29 @@
#define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */
#define OMPI_REGISTRY_NOTIFY_DELETE 0x0004 /**< Notifies subscriber when object deleted */
#define OMPI_REGISTRY_NOTIFY_SYNCHRO 0x0008 /**< Indicate that synchro trigger occurred - not valid for subscribe command */
#define OMPI_REGISTRY_NOTIFY_MESSAGE 0x0010 /**< Indicates a notify message */
#define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */
typedef uint16_t ompi_registry_notify_action_t;
typedef uint32_t mca_gpr_notify_id_t;
#define MCA_GPR_NOTIFY_ID_MAX UINT32_MAX
/** Return value for notify requests
*/
struct ompi_registry_notify_message_t {
ompi_list_t data; /**< List of data objects */
uint32_t num_tokens;
char **tokens;
};
typedef struct ompi_registry_notify_message_t ompi_registry_notify_message_t;
OBJ_CLASS_DECLARATION(ompi_registry_notify_message_t);
/** Notify callback function
*/
typedef void (*ompi_registry_notify_cb_fn_t)(ompi_registry_notify_message_t *notify_msg, void *user_tag);
/** Define the mode bit-masks for registry operations.
*/
@ -57,6 +75,18 @@ typedef uint16_t ompi_registry_notify_action_t;
typedef uint16_t ompi_registry_mode_t;
/*
* Define synchro mode flags
*/
#define OMPI_REGISTRY_SYNCHRO_MODE_NONE 0x00 /**< No synchronization */
#define OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING 0x01 /**< Notify when trigger is reached, ascending mode */
#define OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING 0x02 /**< Notify when trigger is reached, descending mode */
#define OMPI_REGISTRY_SYNCHRO_MODE_LEVEL 0x04 /**< Notify when trigger is reached, regardless of direction */
#define OMPI_REGISTRY_SYNCHRO_MODE_CONTINUOUS 0x08 /**< Notify whenever conditions are met */
#define OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT 0x10 /**< Fire once, then terminate synchro command */
typedef uint16_t ompi_registry_synchro_mode_t;
/*
* Define flag values for remote commands - only used internally
*/
@ -69,6 +99,7 @@ typedef uint16_t ompi_registry_mode_t;
#define MCA_GPR_SYNCHRO_CMD 0x0040
#define MCA_GPR_GET_CMD 0x0080
#define MCA_GPR_TEST_INTERNALS_CMD 0x0100
#define MCA_GPR_NOTIFY_CMD 0x0200 /**< Indicates a notify message */
#define MCA_GPR_ERROR 0xffff
typedef uint16_t mca_gpr_cmd_flag_t;
@ -83,6 +114,7 @@ typedef uint16_t mca_gpr_cmd_flag_t;
#define MCA_GPR_OOB_PACK_ACTION OMPI_INT16
#define MCA_GPR_OOB_PACK_MODE OMPI_INT16
#define MCA_GPR_OOB_PACK_OBJECT_SIZE OMPI_INT32
#define MCA_GPR_OOB_PACK_SYNCHRO_MODE OMPI_INT16
/*
@ -92,7 +124,6 @@ typedef uint16_t mca_gpr_cmd_flag_t;
typedef void* ompi_registry_object_t;
typedef uint32_t ompi_registry_object_size_t;
/*
* structures
*/
@ -136,28 +167,52 @@ typedef struct ompi_registry_internal_test_results_t ompi_registry_internal_test
OBJ_CLASS_DECLARATION(ompi_registry_internal_test_results_t);
struct mca_gpr_notify_request_tracker_t {
ompi_list_item_t item;
ompi_process_name_t *requestor;
int req_tag;
ompi_registry_notify_cb_fn_t callback;
void *user_tag;
mca_gpr_notify_id_t id_tag;
ompi_registry_synchro_mode_t synchro;
};
typedef struct mca_gpr_notify_request_tracker_t mca_gpr_notify_request_tracker_t;
OBJ_CLASS_DECLARATION(mca_gpr_notify_request_tracker_t);
struct mca_gpr_idtag_list_t {
ompi_list_item_t item;
mca_gpr_notify_id_t id_tag;
};
typedef struct mca_gpr_idtag_list_t mca_gpr_idtag_list_t;
OBJ_CLASS_DECLARATION(mca_gpr_idtag_list_t);
/*
* Component functions that MUST be provided
*/
typedef int (*mca_gpr_base_module_delete_segment_fn_t)(char *segment);
typedef int (*mca_gpr_base_module_put_fn_t)(ompi_registry_mode_t mode, char *segment,
char **tokens, ompi_registry_object_t *object,
char **tokens, ompi_registry_object_t object,
ompi_registry_object_size_t size);
typedef ompi_list_t* (*mca_gpr_base_module_get_fn_t)(ompi_registry_mode_t mode,
char *segment, char **tokens);
typedef int (*mca_gpr_base_module_delete_fn_t)(ompi_registry_mode_t mode,
char *segment, char **tokens);
typedef ompi_list_t* (*mca_gpr_base_module_index_fn_t)(char *segment);
typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens);
typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_process_name_t *subscriber,
ompi_registry_mode_t mode,
char *segment, char **tokens);
typedef int (*mca_gpr_base_module_synchro_fn_t)(ompi_process_name_t *subscriber, int tag,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
typedef int (*mca_gpr_base_module_synchro_fn_t)(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t mode,
char *segment, char **tokens, int num);
char *segment, char **tokens, int trigger,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
/*
* test interface for internal functions - optional to provide

Просмотреть файл

@ -71,7 +71,7 @@ int gpr_proxy_delete_segment(char *segment)
int gpr_proxy_put(ompi_registry_mode_t mode, char *segment,
char **tokens, ompi_registry_object_t *object,
char **tokens, ompi_registry_object_t object,
ompi_registry_object_size_t size)
{
ompi_buffer_t cmd;
@ -192,6 +192,7 @@ int gpr_proxy_delete_object(ompi_registry_mode_t mode,
/* compute number of tokens */
tokptr = tokens;
num_tokens = 0;
while (NULL != *tokptr) {
num_tokens++;
tokptr++;
@ -315,24 +316,27 @@ ompi_list_t* gpr_proxy_index(char *segment)
}
int gpr_proxy_subscribe(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
int gpr_proxy_subscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens)
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int gpr_proxy_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode,
char *segment, char **tokens)
int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t mode,
char *segment, char **tokens, int num)
char *segment, char **tokens, int trigger,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
ompi_buffer_t cmd;
ompi_buffer_t answer;
@ -340,14 +344,19 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
char **tokptr;
int recv_tag, i;
int32_t num_tokens, response;
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
/* need to protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens) {
if (NULL == segment) {
return OMPI_ERROR;
}
command = MCA_GPR_SYNCHRO_CMD;
recv_tag = MCA_OOB_TAG_GPR;
if (OMPI_REGISTRY_SYNCHRO_MODE_NONE == synchro_mode) { /* not allowed */
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
return OMPI_ERROR;
@ -357,7 +366,7 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
goto CLEANUP;
}
response = (int32_t)tag;
response = (int32_t)synchro_mode;
if (OMPI_SUCCESS != ompi_pack(cmd, &response, 1, OMPI_INT32)) {
goto CLEANUP;
}
@ -370,30 +379,57 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
goto CLEANUP;
}
/* compute number of tokens */
tokptr = tokens;
while (NULL != *tokptr) {
num_tokens++;
tokptr++;
num_tokens = 0;
if (NULL != tokens) {
/* compute number of tokens */
tokptr = tokens;
while (NULL != *tokptr) {
num_tokens++;
tokptr++;
}
}
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
goto CLEANUP;
}
tokptr = tokens;
for (i=0; i<num_tokens; i++) { /* pack the tokens */
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
goto CLEANUP;
if (0 < num_tokens) {
tokptr = tokens;
for (i=0; i<num_tokens; i++) { /* pack the tokens */
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
goto CLEANUP;
}
tokptr++;
}
tokptr++;
}
response = (int32_t)num;
response = (int32_t)trigger;
if (OMPI_SUCCESS != ompi_pack(cmd, &response, 1, OMPI_INT32)) {
goto CLEANUP;
}
/* store callback function and user_tag in local list for lookup */
/* generate id_tag to send to replica to identify lookup entry */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = NULL;
trackptr->req_tag = 0;
trackptr->callback = cb_func;
trackptr->user_tag = user_tag;
if (ompi_list_is_empty(&mca_gpr_proxy_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_proxy_last_notify_id_tag;
mca_gpr_proxy_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_proxy_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
if (OMPI_SUCCESS != ompi_pack(cmd, &trackptr->id_tag, 1, OMPI_INT32)) {
/* remove entry from lookup list */
ompi_list_remove_last(&mca_gpr_proxy_notify_request_tracker);
goto CLEANUP;
}
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
goto CLEANUP;
}
@ -402,20 +438,24 @@ int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
goto CLEANUP;
}
ompi_buffer_free(cmd);
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_SYNCHRO_CMD != command)) {
ompi_buffer_free(answer);
goto CLEANUP;
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) {
if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) ||
(OMPI_SUCCESS != response)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
} else {
ompi_buffer_free(answer);
return OMPI_SUCCESS;
}
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
ompi_buffer_free(answer);
return OMPI_SUCCESS;
CLEANUP:
ompi_buffer_free(cmd);
return OMPI_ERROR;

Просмотреть файл

@ -26,12 +26,19 @@ int mca_gpr_proxy_close(void);
mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority);
int mca_gpr_proxy_finalize(void);
/*
* proxy-local types
*/
/*
* globals used within proxy component
*/
extern ompi_process_name_t *mca_gpr_my_replica;
extern ompi_list_t mca_gpr_proxy_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
extern ompi_list_t mca_gpr_proxy_free_notify_id_tags;
/*
* Implementation of delete_segment().
@ -42,7 +49,7 @@ extern ompi_process_name_t *mca_gpr_my_replica;
* Implementation of put()
*/
int gpr_proxy_put(ompi_registry_mode_t mode, char *segment,
char **tokens, ompi_registry_object_t *object,
char **tokens, ompi_registry_object_t object,
ompi_registry_object_size_t size);
/*
@ -60,29 +67,32 @@ ompi_list_t* gpr_proxy_index(char *segment);
/*
* Implementation of subscribe()
*/
int gpr_proxy_subscribe(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
int gpr_proxy_subscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens);
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
/*
* Implementation of unsubscribe()
*/
int gpr_proxy_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode,
char *segment, char **tokens);
int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
/*
* Implementation of synchro()
*/
int gpr_proxy_synchro(ompi_process_name_t *subscriber, int tag,
int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t mode,
char *segment, char **tokens, int num);
char *segment, char **tokens, int trigger,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
/*
* Implementation of get()
*/
ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode,
char *segment, char **tokens);
ompi_list_t* gpr_proxy_test_internals(int level);
void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
ompi_buffer_t buffer, int tag,
void* cbdata);
#endif

Просмотреть файл

@ -57,6 +57,7 @@ static mca_gpr_base_module_t mca_gpr_proxy = {
gpr_proxy_test_internals
};
/*
* Whether or not we allowed this component to be selected
*/
@ -66,6 +67,9 @@ static bool initialized = false;
* globals needed within proxy component
*/
ompi_process_name_t *mca_gpr_my_replica;
ompi_list_t mca_gpr_proxy_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
ompi_list_t mca_gpr_proxy_free_notify_id_tags;
/*
@ -87,31 +91,44 @@ int mca_gpr_proxy_close(void)
mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority)
{
int rc;
/* If we're NOT the seed, then we want to be selected, so do all
the setup and return the module */
if (!ompi_process_info.seed) {
/* Return a module (choose an arbitrary, positive priority --
it's only relevant compared to other ns components). If
we're not the seed, then we don't want to be selected, so
return NULL. */
/* Return a module (choose an arbitrary, positive priority --
it's only relevant compared to other ns components). If
we're not the seed, then we don't want to be selected, so
return NULL. */
*priority = 10;
*priority = 10;
/* We allow multi user threads but don't have any hidden threads */
/* We allow multi user threads but don't have any hidden threads */
*allow_multi_user_threads = true;
*have_hidden_threads = false;
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* define the replica for us to use - for now, use only the seed */
mca_gpr_my_replica = ompi_name_server.create_process_name(0,0,0);
/* define the replica for us to use - for now, use only the seed */
mca_gpr_my_replica = ompi_name_server.create_process_name(0,0,0);
/* Return the module */
/* initialize the notify list */
OBJ_CONSTRUCT(&mca_gpr_proxy_notify_request_tracker, ompi_list_t);
mca_gpr_proxy_last_notify_id_tag = 0;
OBJ_CONSTRUCT(&mca_gpr_proxy_free_notify_id_tags, ompi_list_t);
initialized = true;
return &mca_gpr_proxy;
/* issue the non-blocking receive */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR_NOTIFY, 0, mca_gpr_proxy_notify_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
return NULL;
}
/* Return the module */
initialized = true;
return &mca_gpr_proxy;
} else {
return NULL;
return NULL;
}
}
@ -129,3 +146,82 @@ int mca_gpr_proxy_finalize(void)
return OMPI_SUCCESS;
}
/*
* handle notify messages from replicas
*/
void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
ompi_buffer_t buffer, int tag,
void* cbdata)
{
char **tokptr;
mca_gpr_cmd_flag_t command;
int32_t num_items, i, id_tag;
ompi_registry_value_t *regval;
ompi_registry_notify_message_t *message;
bool found;
mca_gpr_notify_request_tracker_t *trackptr;
message = OBJ_NEW(ompi_registry_notify_message_t);
if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD)) ||
(MCA_GPR_NOTIFY_CMD != command)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &id_tag, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_items, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
for (i=0; i < num_items; i++) {
regval = OBJ_NEW(ompi_registry_value_t);
if (OMPI_SUCCESS != ompi_unpack(buffer, &regval->object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, regval->object, regval->object_size, OMPI_BYTE)) {
goto RETURN_ERROR;
}
ompi_list_append(&message->data, &regval->item);
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &message->num_tokens, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
message->tokens = (char**)malloc(message->num_tokens*sizeof(char*));
for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) {
if (OMPI_SUCCESS != ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
}
}
/* find the request corresponding to this notify */
found = false;
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) && !found;
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
if (trackptr->id_tag == id_tag) {
found = true;
}
}
if (!found) { /* didn't find request */
ompi_output(0, "Proxy notification error - received request not found");
return;
}
/* process request */
trackptr->callback(message, trackptr->user_tag);
/* dismantle message and free memory */
RETURN_ERROR:
OBJ_RELEASE(message);
ompi_buffer_free(buffer);
return;
}

Просмотреть файл

@ -28,9 +28,9 @@
int gpr_replica_delete_segment(char *segment)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_segment_t *seg;
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't locate segment */
return OMPI_ERROR;
@ -47,65 +47,64 @@ int gpr_replica_delete_segment(char *segment)
return OMPI_SUCCESS;
}
int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
char **tokens, ompi_registry_object_t *object,
int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
char **tokens, ompi_registry_object_t object,
ompi_registry_object_size_t size)
{
ompi_list_t *keys;
mca_gpr_keytable_t *keyptr;
mca_gpr_keylist_t *new_keyptr;
mca_gpr_registry_segment_t *seg;
mca_gpr_registry_core_t *entry_ptr;
ompi_list_t *keylist;
mca_gpr_replica_keytable_t *keyptr;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_core_t *entry_ptr;
ompi_registry_mode_t put_mode;
mca_gpr_synchro_list_t *synchro;
int return_code;
mca_gpr_replica_synchro_list_t *synchro;
ompi_registry_notify_message_t *notify_msg;
int return_code, num_tokens;
mca_gpr_replica_key_t *keys, *key2;
/* protect ourselves against errors */
if (NULL == segment || NULL == object || 0 == size || NULL == tokens || NULL == *tokens) {
if (NULL == object || 0 == size || NULL == tokens || NULL == *tokens) {
return OMPI_ERROR;
}
put_mode = mode & OMPI_REGISTRY_OVERWRITE; /* only overwrite permission mode flag allowed */
put_mode = addr_mode & OMPI_REGISTRY_OVERWRITE; /* only overwrite permission mode flag allowed */
/* find the segment */
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't find segment or create it */
return OMPI_ERROR;
}
/* convert tokens to list of keys */
keys = gpr_replica_get_key_list(segment, tokens);
if (0 >= ompi_list_get_size(keys)) {
keylist = gpr_replica_get_key_list(segment, tokens);
if (0 >= (num_tokens = ompi_list_get_size(keylist))) {
return OMPI_ERROR;
}
keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
key2 = keys;
/* traverse the list to find undefined tokens - get new keys for them */
for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys);
keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys);
keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) {
for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist);
keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist);
keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* need to get new key */
keyptr->key = gpr_replica_define_key(segment, keyptr->token);
}
}
/* find the segment */
seg = gpr_replica_find_seg(segment);
if (NULL == seg) { /* couldn't find segment - try to create it */
if (0 > gpr_replica_define_segment(segment)) { /* couldn't create it */
return_code = OMPI_ERROR;
goto CLEANUP;
}
seg = gpr_replica_find_seg(segment);
if (NULL == seg) { /* ok, we tried - time to give up */
return_code = OMPI_ERROR;
goto CLEANUP;
}
*key2 = keyptr->key;
key2++;
}
/* see if specified entry already exists */
for (entry_ptr = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries);
entry_ptr != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries);
entry_ptr = (mca_gpr_registry_core_t*)ompi_list_get_next(entry_ptr)) {
if (gpr_replica_check_key_list(put_mode, keys, &entry_ptr->keys)) { /* found existing entry - overwrite if mode set, else error */
for (entry_ptr = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries);
entry_ptr != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries);
entry_ptr = (mca_gpr_replica_core_t*)ompi_list_get_next(entry_ptr)) {
if (gpr_replica_check_key_list(put_mode, num_tokens, keys,
entry_ptr->num_keys, entry_ptr->keys)) {
/* found existing entry - overwrite if mode set, else error */
if (put_mode) { /* overwrite enabled */
free(entry_ptr->object);
entry_ptr->object_size = size;
entry_ptr->object = (ompi_registry_object_t*)malloc(size);
entry_ptr->object = (ompi_registry_object_t)malloc(size);
memcpy(entry_ptr->object, object, size);
return_code = OMPI_SUCCESS;
goto CLEANUP;
@ -117,14 +116,10 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
}
/* no existing entry - create new one */
entry_ptr = OBJ_NEW(mca_gpr_registry_core_t);
for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys);
keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys);
keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) {
new_keyptr = OBJ_NEW(mca_gpr_keylist_t);
new_keyptr->key = keyptr->key;
ompi_list_append(&entry_ptr->keys, &new_keyptr->item);
}
entry_ptr = OBJ_NEW(mca_gpr_replica_core_t);
entry_ptr->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
memcpy(entry_ptr->keys, keys, num_tokens*sizeof(mca_gpr_replica_key_t));
entry_ptr->num_keys = num_tokens;
entry_ptr->object_size = size;
entry_ptr->object = (ompi_registry_object_t*)malloc(size);
memcpy(entry_ptr->object, object, size);
@ -133,105 +128,126 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
return_code = OMPI_SUCCESS;
/* update synchro list and check for trigger conditions */
for (synchro = (mca_gpr_synchro_list_t*)ompi_list_get_first(&seg->synchros);
synchro != (mca_gpr_synchro_list_t*)ompi_list_get_end(&seg->synchros);
synchro = (mca_gpr_synchro_list_t*)ompi_list_get_next(synchro)) {
if (gpr_replica_check_key_list(synchro->mode, keys, &synchro->keys)) {
synchro->present = synchro->present + 1;
for (synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_first(&seg->synchros);
synchro != (mca_gpr_replica_synchro_list_t*)ompi_list_get_end(&seg->synchros);
synchro = (mca_gpr_replica_synchro_list_t*)ompi_list_get_next(synchro)) {
if (gpr_replica_check_key_list(synchro->addr_mode, synchro->num_keys, synchro->keys,
num_tokens, keys)) {
synchro->count++;
}
if (synchro->present >= synchro->trigger) {
gpr_replica_notify(&synchro->subscribers, OMPI_REGISTRY_NOTIFY_SYNCHRO);
if ((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & synchro->synch_mode ||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & synchro->synch_mode) &&
synchro->count == synchro->trigger) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, tokens);
gpr_replica_process_triggers(notify_msg, synchro->id_tag);
}
}
CLEANUP:
/* release list of keys */
while (NULL != (keyptr = (mca_gpr_keytable_t*)ompi_list_remove_last(keys))) {
OBJ_DESTRUCT(keyptr);
while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_last(keylist))) {
OBJ_RELEASE(keyptr);
}
OBJ_DESTRUCT(keys);
OBJ_RELEASE(keylist);
return return_code;
}
int gpr_replica_delete_object(ompi_registry_mode_t mode,
int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
char *segment, char **tokens)
{
mca_gpr_registry_core_t *reg, *prev;
mca_gpr_keytable_t *keyptr;
ompi_list_t *keys;
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_core_t *reg, *prev;
mca_gpr_replica_keytable_t *keyptr;
ompi_list_t *keylist;
mca_gpr_replica_key_t *keys, *key2;
mca_gpr_replica_segment_t *seg;
int num_tokens;
keys = NULL;
/* protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens) {
if (NULL == segment) {
return OMPI_ERROR;
}
/* find the specified segment */
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
return OMPI_ERROR;
}
/* convert tokens to list of keys */
keys = gpr_replica_get_key_list(segment, tokens);
if (0 == ompi_list_get_size(keys)) {
return OMPI_ERROR;
}
keylist = gpr_replica_get_key_list(segment, tokens);
if (0 == (num_tokens = ompi_list_get_size(keylist))) { /* no tokens provided - wildcard case */
keys = NULL;
/* traverse the list to find undefined tokens - error if found */
for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys);
keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys);
keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
return OMPI_ERROR;
} else { /* tokens provided */
keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
key2 = keys;
/* traverse the list to find undefined tokens - error if found */
for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist);
keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist);
keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
goto CLEANUP;
}
*key2 = keyptr->key;
key2++;
}
}
/* traverse the segment's registry, looking for matching tokens per the specified mode */
for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) {
for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) {
/* for each registry entry, check the key list */
if (gpr_replica_check_key_list(mode, keys, &reg->keys)) { /* found the key(s) on the list */
prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg);
if (gpr_replica_check_key_list(addr_mode, num_tokens, keys,
reg->num_keys, reg->keys)) { /* found the key(s) on the list */
prev = (mca_gpr_replica_core_t*)ompi_list_get_prev(reg);
ompi_list_remove_item(&seg->registry_entries, &reg->item);
reg = prev;
}
}
return OMPI_SUCCESS;
CLEANUP:
if (NULL != keys) {
free(keys);
}
return OMPI_ERROR;
}
ompi_list_t* gpr_replica_index(char *segment)
{
ompi_list_t *answer;
mca_gpr_keytable_t *ptr;
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_keytable_t *ptr;
mca_gpr_replica_segment_t *seg;
ompi_registry_index_value_t *ans;
answer = OBJ_NEW(ompi_list_t);
if (NULL == segment) { /* looking for index of global registry */
for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) {
for (ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr)) {
ans = OBJ_NEW(ompi_registry_index_value_t);
ans->token = strdup(ptr->token);
ompi_list_append(answer, &ans->item);
}
} else { /* want index of specific segment */
/* find the specified segment */
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
return answer;
}
/* got segment - now index that dictionary */
for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) {
for (ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr)) {
ans = OBJ_NEW(ompi_registry_index_value_t);
ans->token = strdup(ptr->token);
ompi_list_append(answer, &ans->item);
@ -242,112 +258,118 @@ ompi_list_t* gpr_replica_index(char *segment)
return answer;
}
int gpr_replica_subscribe(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
int gpr_replica_subscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens)
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int gpr_replica_unsubscribe(ompi_process_name_t *subscriber,
ompi_registry_mode_t mode,
char *segment, char **tokens)
int gpr_replica_unsubscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int gpr_replica_synchro(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
char *segment, char **tokens, int num)
int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_synchro_list_t *synch;
mca_gpr_subscriber_list_t *subs;
char **tokptr;
mca_gpr_keylist_t *keyptr;
int i;
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
/* protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens || 0 > num) {
if (NULL == segment || 0 > trigger) {
return OMPI_ERROR;
}
seg = gpr_replica_find_seg(segment);
if (NULL == seg) { /* couldn't find segment */
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = NULL;
trackptr->req_tag = 0;
trackptr->callback = cb_func;
trackptr->user_tag = user_tag;
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
mca_gpr_replica_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
/* construct the synchro - add to notify tracking system if success, otherwise dump */
if (OMPI_SUCCESS == gpr_replica_construct_synchro(synchro_mode, addr_mode, segment, tokens,
trigger, trackptr->id_tag)) {
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
return OMPI_SUCCESS;
} else {
OBJ_RELEASE(trackptr);
return OMPI_ERROR;
}
synch = OBJ_NEW(mca_gpr_synchro_list_t);
subs = OBJ_NEW(mca_gpr_subscriber_list_t);
subs->subscriber = subscriber;
subs->tag = tag;
ompi_list_append(&synch->subscribers, &subs->item);
/* store key values of tokens, defining them if needed */
for (i=i, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) {
keyptr = OBJ_NEW(mca_gpr_keylist_t);
keyptr->key = gpr_replica_get_key(segment, *tokptr);
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) {
keyptr->key = gpr_replica_define_key(segment, *tokptr);
}
ompi_list_append(&synch->keys, &keyptr->item);
}
synch->mode = mode;
synch->trigger = num;
synch->present = 0;
ompi_list_append(&seg->synchros, &synch->item);
return OMPI_SUCCESS;
}
ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
char *segment, char **tokens)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_segment_t *seg;
ompi_list_t *answer;
ompi_registry_value_t *ans;
ompi_list_t *keys;
mca_gpr_keytable_t *keyptr;
mca_gpr_registry_core_t *reg;
mca_gpr_replica_key_t *keys, *key2;
ompi_list_t *keylist;
mca_gpr_replica_keytable_t *keyptr;
mca_gpr_replica_core_t *reg;
int num_tokens;
answer = OBJ_NEW(ompi_list_t);
/* protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens) {
if (NULL == segment) {
return answer;
}
/* find the specified segment */
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
return answer;
}
if (NULL == tokens) { /* wildcard case - return everything */
keys = NULL;
} else {
/* convert tokens to list of keys */
keys = gpr_replica_get_key_list(segment, tokens);
if (0 == ompi_list_get_size(keys)) {
return answer;
}
/* convert tokens to list of keys */
keylist = gpr_replica_get_key_list(segment, tokens);
if (0 == (num_tokens = ompi_list_get_size(keylist))) {
return answer;
}
/* traverse the list to find undefined tokens - error if found */
for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys);
keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys);
keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
goto CLEANUP;
keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
key2 = keys;
/* traverse the list to find undefined tokens - error if found */
for (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_first(keylist);
keyptr != (mca_gpr_replica_keytable_t*)ompi_list_get_end(keylist);
keyptr = (mca_gpr_replica_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
goto CLEANUP;
}
*key2 = keyptr->key;
key2++;
}
}
/* traverse the segment's registry, looking for matching tokens per the specified mode */
for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) {
for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) {
/* for each registry entry, check the key list */
if (gpr_replica_check_key_list(mode, keys, &reg->keys)) { /* found the key(s) on the list */
if (gpr_replica_check_key_list(addr_mode, num_tokens, keys,
reg->num_keys, reg->keys)) { /* found the key(s) on the list */
ans = OBJ_NEW(ompi_registry_value_t);
ans->object_size = reg->object_size;
ans->object = (ompi_registry_object_t*)malloc(ans->object_size);
@ -358,10 +380,13 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
CLEANUP:
/* release list of keys */
while (NULL != (keyptr = (mca_gpr_keytable_t*)ompi_list_remove_last(keys))) {
OBJ_DESTRUCT(keyptr);
while (NULL != (keyptr = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(keylist))) {
OBJ_RELEASE(keyptr);
}
OBJ_RELEASE(keylist);
if (NULL != keys) {
free(keys);
}
OBJ_DESTRUCT(keys);
return answer;
}

Просмотреть файл

@ -8,6 +8,9 @@
#include "ompi_config.h"
#include <time.h>
#include "include/types.h"
#include "include/constants.h"
#include "class/ompi_list.h"
@ -16,16 +19,23 @@
/*
* typedefs needed in replica component
*/
/*
* Define the registry "key"
*/
typedef uint32_t mca_gpr_replica_key_t;
#define MCA_GPR_REPLICA_KEY_MAX UINT32_MAX
struct mca_gpr_registry_t {
/*
* Registry "head"
*/
struct mca_gpr_replica_t {
ompi_list_t registry;
ompi_list_t segment_dict;
mca_gpr_replica_key_t lastkey;
ompi_list_t freekeys;
};
typedef struct mca_gpr_registry_t mca_gpr_registry_t;
typedef struct mca_gpr_replica_t mca_gpr_replica_t;
/** Dictionary of token-key pairs.
* This structure is used to create a linked list of token-key pairs. All calls to
@ -34,59 +44,63 @@ typedef struct mca_gpr_registry_t mca_gpr_registry_t;
* for faster searches of the registry. This structure is also used to return token-key pairs
* from the dictionary in response to an ompi_registry_index() call.
*/
struct mca_gpr_keytable_t {
struct mca_gpr_replica_keytable_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
char *token; /**< Char string that defines the key */
mca_gpr_replica_key_t key; /**< Numerical value assigned by registry to represent token string */
};
typedef struct mca_gpr_keytable_t mca_gpr_keytable_t;
typedef struct mca_gpr_replica_keytable_t mca_gpr_replica_keytable_t;
OBJ_CLASS_DECLARATION(mca_gpr_keytable_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_keytable_t);
/** List of keys that describe a stored object.
* Each object stored in the registry may have as many keys describing it as the
* creator desires. This structure is used to create a linked list of keys
* associated with each object.
*/
struct mca_gpr_keylist_t {
struct mca_gpr_replica_keylist_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
mca_gpr_replica_key_t key; /**< Numerical key that defines stored object */
};
typedef struct mca_gpr_keylist_t mca_gpr_keylist_t;
typedef struct mca_gpr_replica_keylist_t mca_gpr_replica_keylist_t;
OBJ_CLASS_DECLARATION(mca_gpr_keylist_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_keylist_t);
/** List of subscribers to a stored object.
* Each object can have an arbitrary number of subscribers desiring notification
* upon specified actions being performed against the object. This structure is
* used to create a linked list of subscribers for objects.
/** List of subscribers to objects on a segment.
* Each segment can have an arbitrary number of subscribers desiring notification
* upon specified actions being performed against the objects on the segment. This structure is
* used to create a linked list of subscribers.
*/
struct mca_gpr_subscriber_list_t {
struct mca_gpr_replica_subscriber_list_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_process_name_t *subscriber; /**< Name of the subscribing process */
int tag; /**< OOB tag of the subscriber */
ompi_registry_mode_t addr_mode; /**< Addressing mode for subscription */
ompi_registry_notify_action_t action; /**< Bit-mask of actions that trigger notification */
mca_gpr_replica_key_t num_keys; /**< Number of keys in array */
mca_gpr_replica_key_t *keys; /**< Array of keys describing subscription */
mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */
};
typedef struct mca_gpr_subscriber_list_t mca_gpr_subscriber_list_t;
typedef struct mca_gpr_replica_subscriber_list_t mca_gpr_replica_subscriber_list_t;
OBJ_CLASS_DECLARATION(mca_gpr_subscriber_list_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_subscriber_list_t);
/** List of synchro actions for a segment.
* Each object can have an arbitrary number of subscribers desiring notification
* upon specified actions being performed against the object. This structure is
* used to create a linked list of subscribers for objects.
*/
struct mca_gpr_synchro_list_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_list_t subscribers; /**< List of subscribers to be notified upon synchro */
ompi_list_t keys; /**< List of keys describing the objects being counted*/
ompi_registry_mode_t mode; /**< Mode for selecting objects using keys */
int trigger; /**< Number of objects that trigger notification */
int present; /**< Number of qualifying objects currently in segment */
struct mca_gpr_replica_synchro_list_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_registry_synchro_mode_t synch_mode; /**< Synchro mode - ascending, descending, ... */
ompi_registry_mode_t addr_mode; /**< Addressing mode */
mca_gpr_replica_key_t num_keys; /**< Number of keys in array */
mca_gpr_replica_key_t *keys; /**< Array of keys describing objects to be counted */
uint32_t trigger; /**< Number of objects that trigger notification */
uint32_t count; /**< Number of qualifying objects currently in segment */
mca_gpr_notify_id_t id_tag; /**< Tag into the list of notify structures */
};
typedef struct mca_gpr_synchro_list_t mca_gpr_synchro_list_t;
typedef struct mca_gpr_replica_synchro_list_t mca_gpr_replica_synchro_list_t;
OBJ_CLASS_DECLARATION(mca_gpr_synchro_list_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_synchro_list_t);
/** List of replicas that hold a stored object.
* Each object can have an arbitrary number of replicas that hold a copy
@ -109,12 +123,12 @@ OBJ_CLASS_DECLARATION(mca_gpr_replica_list_t);
* of the object within the global registry, and the replica holding the last known
* up-to-date version of the object.
*/
struct mca_gpr_write_invalidate_t {
struct mca_gpr_replica_write_invalidate_t {
bool invalidate;
time_t last_mod;
ompi_process_name_t *valid_replica;
};
typedef struct mca_gpr_write_invalidate_t mca_gpr_write_invalidate_t;
typedef struct mca_gpr_replica_write_invalidate_t mca_gpr_replica_write_invalidate_t;
/** The core registry structure.
@ -129,18 +143,18 @@ typedef struct mca_gpr_write_invalidate_t mca_gpr_write_invalidate_t;
* object are automatically granted. This may be changed at some future time by adding an
* "authorization" linked list of ID's and their access rights to this structure.
*/
struct mca_gpr_registry_core_t {
struct mca_gpr_replica_core_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
ompi_list_t keys; /**< Linked list of keys that define stored object */
mca_gpr_replica_key_t num_keys; /**< Number of keys in array */
mca_gpr_replica_key_t *keys; /**< Array of keys that define stored object */
ompi_registry_object_size_t object_size; /**< Size of stored object, in bytes */
ompi_registry_object_t *object; /**< Pointer to stored object */
ompi_list_t subscriber; /**< Linked list of subscribers to this object */
ompi_list_t replicas; /**< Linked list of replicas that also contain this object */
mca_gpr_write_invalidate_t write_invalidate; /**< Structure containing write invalidate info */
mca_gpr_replica_write_invalidate_t write_invalidate; /**< Structure containing write invalidate info */
};
typedef struct mca_gpr_registry_core_t mca_gpr_registry_core_t;
typedef struct mca_gpr_replica_core_t mca_gpr_replica_core_t;
OBJ_CLASS_DECLARATION(mca_gpr_registry_core_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_core_t);
/** Registry segment definition.
* The registry is subdivided into segments, each defining a unique domain. The "universe" segment
@ -151,24 +165,28 @@ OBJ_CLASS_DECLARATION(mca_gpr_registry_core_t);
* list of registry elements for that segment. Each segment also holds its own token-key dictionary
* to avoid naming conflicts between tokens from CommWorlds sharing a given universe.
*/
struct mca_gpr_registry_segment_t {
struct mca_gpr_replica_segment_t {
ompi_list_item_t item; /**< Allows this item to be placed on a list */
mca_gpr_replica_key_t segment; /**< Key corresponding to name of registry segment */
mca_gpr_replica_key_t lastkey; /**< Highest key value used */
ompi_list_t registry_entries; /**< Linked list of stored objects within this segment */
ompi_list_t subscriber; /**< List of subscriptions to objects on this segment */
ompi_list_t synchros; /**< List of synchro requests on this segment */
ompi_list_t keytable; /**< Token-key dictionary for this segment */
ompi_list_t freekeys; /**< List of keys that have been made available */
};
typedef struct mca_gpr_registry_segment_t mca_gpr_registry_segment_t;
typedef struct mca_gpr_replica_segment_t mca_gpr_replica_segment_t;
OBJ_CLASS_DECLARATION(mca_gpr_registry_segment_t);
OBJ_CLASS_DECLARATION(mca_gpr_replica_segment_t);
/*
* globals needed within component
*/
extern mca_gpr_registry_t mca_gpr_replica_head;
extern mca_gpr_replica_t mca_gpr_replica_head;
extern ompi_list_t mca_gpr_replica_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
extern ompi_list_t mca_gpr_replica_free_notify_id_tags;
/*
* Module open / close
@ -187,31 +205,42 @@ int mca_gpr_replica_finalize(void);
* Implemented registry functions
*/
int gpr_replica_define_segment(char *segment);
int gpr_replica_delete_segment(char *segment);
int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
char **tokens, ompi_registry_object_t *object,
char **tokens, ompi_registry_object_t object,
ompi_registry_object_size_t size);
int gpr_replica_delete_object(ompi_registry_mode_t mode,
char *segment, char **tokens);
ompi_list_t* gpr_replica_index(char *segment);
int gpr_replica_subscribe(ompi_process_name_t *subscriber, int tag,
ompi_registry_mode_t mode,
int gpr_replica_subscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens);
int gpr_replica_unsubscribe(ompi_process_name_t *subscriber, ompi_registry_mode_t mode,
char *segment, char **tokens);
int gpr_replica_synchro(ompi_process_name_t *subscriber, int tag,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
int gpr_replica_unsubscribe(ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t mode,
char *segment, char **tokens, int num);
char *segment, char **tokens, int trigger,
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
char *segment, char **tokens);
ompi_list_t* gpr_replica_test_internals(int level);
void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_buffer_t buffer, int tag,
void* cbdata);
void gpr_replica_notify(ompi_list_t *subscribers, ompi_registry_notify_action_t flag);
void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag,
ompi_registry_notify_message_t *message);
#endif

Просмотреть файл

@ -13,6 +13,8 @@
*/
#include "ompi_config.h"
#include <time.h>
#include "include/constants.h"
#include "util/proc_info.h"
#include "util/output.h"
@ -70,18 +72,21 @@ static bool initialized = false;
/*
* globals needed within replica component
*/
mca_gpr_registry_t mca_gpr_replica_head;
mca_gpr_replica_t mca_gpr_replica_head;
ompi_list_t mca_gpr_replica_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
ompi_list_t mca_gpr_replica_free_notify_id_tags;
/* constructor - used to initialize state of keytable instance */
static void mca_gpr_keytable_construct(mca_gpr_keytable_t* keytable)
static void mca_gpr_replica_keytable_construct(mca_gpr_replica_keytable_t* keytable)
{
keytable->token = NULL;
keytable->key = 0;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_keytable_destructor(mca_gpr_keytable_t* keytable)
static void mca_gpr_replica_keytable_destructor(mca_gpr_replica_keytable_t* keytable)
{
if (NULL != keytable->token) {
free(keytable->token);
@ -90,78 +95,81 @@ static void mca_gpr_keytable_destructor(mca_gpr_keytable_t* keytable)
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_keytable_t, /* type name */
mca_gpr_replica_keytable_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_keytable_construct, /* constructor */
mca_gpr_keytable_destructor); /* destructor */
mca_gpr_replica_keytable_construct, /* constructor */
mca_gpr_replica_keytable_destructor); /* destructor */
/* constructor - used to initialize state of keylist instance */
static void mca_gpr_keylist_construct(mca_gpr_keylist_t* keylist)
static void mca_gpr_replica_keylist_construct(mca_gpr_replica_keylist_t* keylist)
{
keylist->key = 0;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_keylist_destructor(mca_gpr_keylist_t* keylist)
static void mca_gpr_replica_keylist_destructor(mca_gpr_replica_keylist_t* keylist)
{
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_keylist_t, /* type name */
mca_gpr_replica_keylist_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_keylist_construct, /* constructor */
mca_gpr_keylist_destructor); /* destructor */
mca_gpr_replica_keylist_construct, /* constructor */
mca_gpr_replica_keylist_destructor); /* destructor */
/* constructor - used to initialize state of subscriber list instance */
static void mca_gpr_subscriber_list_construct(mca_gpr_subscriber_list_t* subscriber)
static void mca_gpr_replica_subscriber_list_construct(mca_gpr_replica_subscriber_list_t* subscriber)
{
subscriber->subscriber = NULL;
subscriber->tag = 0;
subscriber->action = 0x00;
subscriber->addr_mode = 0;
subscriber->action = 0;
subscriber->keys = NULL;
subscriber->id_tag = 0;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_subscriber_list_destructor(mca_gpr_subscriber_list_t* subscriber)
static void mca_gpr_replica_subscriber_list_destructor(mca_gpr_replica_subscriber_list_t* subscriber)
{
if (NULL != subscriber->subscriber) {
free(subscriber->subscriber);
if (NULL != subscriber->keys) {
free(subscriber->keys);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_subscriber_list_t, /* type name */
mca_gpr_replica_subscriber_list_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_subscriber_list_construct, /* constructor */
mca_gpr_subscriber_list_destructor); /* destructor */
mca_gpr_replica_subscriber_list_construct, /* constructor */
mca_gpr_replica_subscriber_list_destructor); /* destructor */
/* constructor - used to initialize state of synchro list instance */
static void mca_gpr_synchro_list_construct(mca_gpr_synchro_list_t* synchro)
static void mca_gpr_replica_synchro_list_construct(mca_gpr_replica_synchro_list_t* synchro)
{
OBJ_CONSTRUCT(&synchro->subscribers, ompi_list_t);
OBJ_CONSTRUCT(&synchro->keys, ompi_list_t);
synchro->mode = 0x00;
synchro->synch_mode = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
synchro->addr_mode = OMPI_REGISTRY_NONE;
synchro->keys = NULL;
synchro->trigger = 0;
synchro->present = 0;
synchro->count = 0;
synchro->id_tag = 0;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_synchro_list_destructor(mca_gpr_synchro_list_t* synchro)
static void mca_gpr_replica_synchro_list_destructor(mca_gpr_replica_synchro_list_t* synchro)
{
OBJ_DESTRUCT(&synchro->subscribers);
OBJ_DESTRUCT(&synchro->keys);
if (NULL != synchro->keys) {
free(synchro->keys);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_synchro_list_t, /* type name */
mca_gpr_replica_synchro_list_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_synchro_list_construct, /* constructor */
mca_gpr_synchro_list_destructor); /* destructor */
mca_gpr_replica_synchro_list_construct, /* constructor */
mca_gpr_replica_synchro_list_destructor); /* destructor */
/* constructor - used to initialize state of replica list instance */
@ -188,50 +196,58 @@ OBJ_CLASS_INSTANCE(
/* constructor - used to initialize state of registry core instance */
static void mca_gpr_registry_core_construct(mca_gpr_registry_core_t* reg)
static void mca_gpr_replica_core_construct(mca_gpr_replica_core_t* reg)
{
OBJ_CONSTRUCT(&reg->keys, ompi_list_t);
reg->keys = NULL;
reg->object_size = 0;
reg->object = NULL;
OBJ_CONSTRUCT(&reg->subscriber, ompi_list_t);
OBJ_CONSTRUCT(&reg->replicas, ompi_list_t);
reg->write_invalidate.invalidate = false;
reg->write_invalidate.last_mod = 0;
reg->write_invalidate.valid_replica = NULL;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_registry_core_destructor(mca_gpr_registry_core_t* reg)
static void mca_gpr_replica_core_destructor(mca_gpr_replica_core_t* reg)
{
OBJ_DESTRUCT(&reg->keys);
if (NULL != reg->keys) {
free(reg->keys);
}
if (NULL != reg->object) {
free(reg->object);
}
OBJ_DESTRUCT(&reg->subscriber);
if (NULL != reg->write_invalidate.valid_replica) {
free(reg->write_invalidate.valid_replica);
}
OBJ_DESTRUCT(&reg->replicas);
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_registry_core_t, /* type name */
mca_gpr_replica_core_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_registry_core_construct, /* constructor */
mca_gpr_registry_core_destructor); /* destructor */
mca_gpr_replica_core_construct, /* constructor */
mca_gpr_replica_core_destructor); /* destructor */
/* constructor - used to initialize state of segment instance */
static void mca_gpr_registry_segment_construct(mca_gpr_registry_segment_t* seg)
static void mca_gpr_replica_segment_construct(mca_gpr_replica_segment_t* seg)
{
seg->segment = 0;
seg->lastkey = 0;
OBJ_CONSTRUCT(&seg->registry_entries, ompi_list_t);
OBJ_CONSTRUCT(&seg->subscriber, ompi_list_t);
OBJ_CONSTRUCT(&seg->synchros, ompi_list_t);
OBJ_CONSTRUCT(&seg->keytable, ompi_list_t);
OBJ_CONSTRUCT(&seg->freekeys, ompi_list_t);
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_registry_segment_destructor(mca_gpr_registry_segment_t* seg)
static void mca_gpr_replica_segment_destructor(mca_gpr_replica_segment_t* seg)
{
OBJ_DESTRUCT(&seg->registry_entries);
OBJ_DESTRUCT(&seg->subscriber);
OBJ_DESTRUCT(&seg->synchros);
OBJ_DESTRUCT(&seg->keytable);
OBJ_DESTRUCT(&seg->freekeys);
@ -239,10 +255,10 @@ static void mca_gpr_registry_segment_destructor(mca_gpr_registry_segment_t* seg)
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_registry_segment_t, /* type name */
mca_gpr_replica_segment_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_registry_segment_construct, /* constructor */
mca_gpr_registry_segment_destructor); /* destructor */
mca_gpr_replica_segment_construct, /* constructor */
mca_gpr_replica_segment_destructor); /* destructor */
/*
@ -265,6 +281,7 @@ int mca_gpr_replica_close(void)
mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority)
{
ompi_output(0, "entered replica init");
/* If we're the seed, then we want to be selected, so do all the
setup and return the module */
@ -286,16 +303,29 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
/* initialize the registry head */
OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t);
ompi_output(0, "registry head setup");
/* initialize the global dictionary for segment id's */
OBJ_CONSTRUCT(&mca_gpr_replica_head.segment_dict, ompi_list_t);
OBJ_CONSTRUCT(&mca_gpr_replica_head.freekeys, ompi_list_t);
mca_gpr_replica_head.lastkey = 0;
/* issue the non-blocking receive */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
return NULL;
}
ompi_output(0, "global dict setup");
/* initialize the notify request tracker */
OBJ_CONSTRUCT(&mca_gpr_replica_notify_request_tracker, ompi_list_t);
mca_gpr_replica_last_notify_id_tag = 0;
OBJ_CONSTRUCT(&mca_gpr_replica_free_notify_id_tags, ompi_list_t);
ompi_output(0, "req tracker setup");
/* /\* issue the non-blocking receive *\/ */
/* rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); */
/* if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { */
/* return NULL; */
/* } */
ompi_output(0, "nb receive setup");
/* Return the module */
@ -315,6 +345,8 @@ int mca_gpr_replica_finalize(void)
if (initialized) {
OBJ_DESTRUCT(&mca_gpr_replica_head);
OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker);
OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags);
initialized = false;
}
@ -341,10 +373,12 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_registry_internal_test_results_t *testval;
ompi_registry_index_value_t *indexval;
char **tokens, **tokptr;
int32_t num_tokens, test_level, i, trigger;
int32_t num_tokens, test_level, i, trigger, id_tag;
mca_gpr_cmd_flag_t command;
char *segment;
int32_t response, target_tag;
int32_t response, synchro_mode;
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
if (OMPI_SUCCESS != ompi_buffer_init(&answer, 0)) {
/* RHC -- not sure what to do if this fails */
@ -354,6 +388,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
/****** DELETE SEGMENT *****/
if (MCA_GPR_DELETE_SEGMENT_CMD == command) { /* got command to delete a segment */
if (0 > ompi_unpack_string(buffer, &segment)) {
goto RETURN_ERROR;
@ -370,6 +405,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
/***** PUT *****/
} else if (MCA_GPR_PUT_CMD == command) { /* got command to put object on registry */
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
@ -384,6 +421,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
if (0 >= num_tokens) { /** no tokens provided - error for PUT */
goto RETURN_ERROR;
}
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
@ -399,6 +440,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
if (0 >= object_size) { /* error condition - nothing to store */
goto RETURN_ERROR;
}
object = (ompi_registry_object_t)malloc(object_size);
if (OMPI_SUCCESS != ompi_unpack(buffer, object, object_size, OMPI_BYTE)) {
goto RETURN_ERROR;
@ -416,6 +461,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
/***** GET *****/
} else if (MCA_GPR_GET_CMD == command) { /* got command to put object on registry */
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
@ -430,15 +477,19 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
if (0 >= num_tokens) { /* no tokens provided - wildcard case */
tokens = NULL;
} else { /* tokens provided */
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
}
tokptr++;
}
tokptr++;
*tokptr = NULL;
}
*tokptr = NULL;
returned_list = ompi_registry.get(mode, segment, tokens);
@ -466,6 +517,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
/***** DELETE OBJECT *****/
} else if (MCA_GPR_DELETE_OBJECT_CMD == command) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
@ -480,15 +533,19 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
if (0 >= num_tokens) { /* no tokens provided - wildcard case */
tokens = NULL;
} else { /* tokens provided */
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
}
tokptr++;
}
tokptr++;
*tokptr = NULL;
}
*tokptr = NULL;
response = (int32_t)ompi_registry.delete_object(mode, segment, tokens);
@ -504,6 +561,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/***** INDEX *****/
} else if (MCA_GPR_INDEX_CMD == command) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
@ -543,15 +601,22 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/***** SUBSCRIBE *****/
} else if (MCA_GPR_SUBSCRIBE_CMD == command) {
action = OMPI_REGISTRY_NOTIFY_ALL;
goto RETURN_ERROR;
/***** UNSUBSCRIBE *****/
} else if (MCA_GPR_UNSUBSCRIBE_CMD == command) {
goto RETURN_ERROR;
/***** SYNCHRO *****/
} else if (MCA_GPR_SYNCHRO_CMD == command) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &target_tag, 1, OMPI_INT32)) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &synchro_mode, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (OMPI_REGISTRY_SYNCHRO_MODE_NONE == synchro_mode) {
goto RETURN_ERROR;
}
@ -567,21 +632,46 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
if (0 < num_tokens) { /* tokens provided */
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
}
tokptr++;
}
tokptr++;
*tokptr = NULL;
} else { /* no tokens provided - wildcard case, just count entries on segment */
tokens = NULL;
}
*tokptr = NULL;
if (OMPI_SUCCESS != ompi_unpack(buffer, &trigger, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
response = (int32_t)ompi_registry.synchro(sender, (int)target_tag, mode, segment, tokens, (int)trigger);
if (OMPI_SUCCESS != ompi_unpack(buffer, &id_tag, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = ompi_name_server.copy_process_name(sender);
trackptr->req_tag = id_tag;
trackptr->callback = NULL;
trackptr->user_tag = NULL;
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
mca_gpr_replica_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
trackptr->synchro = synchro_mode;
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
response = (int32_t)gpr_replica_construct_synchro(synchro_mode, mode, segment, tokens,
trigger, trackptr->id_tag);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -595,7 +685,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/***** TEST INTERNALS *****/
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {
if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) ||
@ -630,7 +720,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/**** UNRECOGNIZED ****/
} else { /* got an unrecognized command */
RETURN_ERROR:
ompi_buffer_init(&error_answer, 8);
@ -648,34 +738,61 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
void gpr_replica_notify(ompi_list_t *subscribers, ompi_registry_notify_action_t flag)
void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag,
ompi_registry_notify_message_t *message)
{
mca_gpr_subscriber_list_t *subs;
ompi_buffer_t msg;
ompi_registry_notify_action_t command;
mca_gpr_cmd_flag_t command;
int32_t num_items, i;
ompi_registry_value_t *regval;
char **tokptr;
int recv_tag;
command = OMPI_REGISTRY_NOTIFY_MESSAGE;
command = MCA_GPR_NOTIFY_CMD;
recv_tag = MCA_OOB_TAG_GPR_NOTIFY;
for (subs = (mca_gpr_subscriber_list_t*)ompi_list_get_first(subscribers);
subs != (mca_gpr_subscriber_list_t*)ompi_list_get_end(subscribers);
subs = (mca_gpr_subscriber_list_t*)ompi_list_get_next(subs)) {
if (OMPI_SUCCESS != ompi_buffer_init(&msg, 0)) {
return;
}
if (OMPI_SUCCESS != ompi_pack(msg, &command, 1, MCA_GPR_OOB_PACK_ACTION)) {
return;
}
if (OMPI_SUCCESS != ompi_pack(msg, &flag, 1, MCA_GPR_OOB_PACK_ACTION)) {
return;
}
if (0 > mca_oob_send_packed(subs->subscriber, msg, subs->tag, 0)) {
return;
}
ompi_buffer_free(msg);
if (OMPI_SUCCESS != ompi_buffer_init(&msg, 0)) {
return;
}
if (OMPI_SUCCESS != ompi_pack(msg, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
return;
}
i = (int32_t)recipient_tag;
if (OMPI_SUCCESS != ompi_pack(msg, &i, 1, OMPI_INT32)) {
return;
}
num_items = (int32_t)ompi_list_get_size(&message->data);
if (OMPI_SUCCESS != ompi_pack(msg, &num_items, 1, OMPI_INT32)) {
return;
}
if (0 < num_items) { /* don't send anything else back if the list is empty */
while (NULL != (regval = (ompi_registry_value_t*)ompi_list_remove_first(&message->data))) {
if (OMPI_SUCCESS != ompi_pack(msg, &regval->object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
return;
}
if (OMPI_SUCCESS != ompi_pack(msg, regval->object, regval->object_size, OMPI_BYTE)) {
return;
}
}
}
if (OMPI_SUCCESS != ompi_pack(msg, &message->num_tokens, 1, OMPI_INT32)) {
return;
}
for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) {
if (OMPI_SUCCESS != ompi_pack_string(msg, *tokptr)) {
return;
}
}
if (0 > mca_oob_send_packed(recipient, msg, recv_tag, 0)) {
return;
}
ompi_buffer_free(msg);
OBJ_RELEASE(message);
}

Просмотреть файл

@ -32,77 +32,75 @@
*
*/
int gpr_replica_define_segment(char *segment)
mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_key_t key;
int response;
response = gpr_replica_define_key(segment, NULL);
if (0 > response && OMPI_EXISTS != response) { /* got some kind of error code */
return response;
key = gpr_replica_define_key(segment, NULL);
if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */
return NULL;
}
/* need to add the segment to the registry */
key = gpr_replica_get_key(segment, NULL);
if (MCA_GPR_REPLICA_KEY_MAX == key) { /* couldn't retrieve it */
return OMPI_ERROR;
}
seg = OBJ_NEW(mca_gpr_registry_segment_t);
seg = OBJ_NEW(mca_gpr_replica_segment_t);
seg->segment = key;
ompi_list_append(&mca_gpr_replica_head.registry, &seg->item);
return OMPI_SUCCESS;
return seg;
}
mca_gpr_registry_segment_t *gpr_replica_find_seg(char *segment)
mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment)
{
mca_gpr_keytable_t *ptr_seg;
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_keytable_t *ptr_seg;
mca_gpr_replica_segment_t *seg;
/* search the registry segments to find which one is being referenced */
for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) {
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) {
if (0 == strcmp(segment, ptr_seg->token)) {
/* search mca_gpr_replica_head to find segment */
for (seg=(mca_gpr_registry_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry);
seg != (mca_gpr_registry_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry);
seg = (mca_gpr_registry_segment_t*)ompi_list_get_next(seg)) {
for (seg=(mca_gpr_replica_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry);
seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry);
seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) {
if(seg->segment == ptr_seg->key) {
return(seg);
}
}
}
}
/* didn't find the dictionary entry */
return NULL;
if (create) {
/* didn't find the dictionary entry - create it */
return gpr_replica_define_segment(segment);
}
return NULL; /* don't create it - just return NULL */
}
mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token)
mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token)
{
mca_gpr_keytable_t *ptr_seg;
mca_gpr_keytable_t *ptr_key;
mca_gpr_registry_segment_t *seg;
mca_gpr_replica_keytable_t *ptr_seg;
mca_gpr_replica_keytable_t *ptr_key;
mca_gpr_replica_segment_t *seg;
/* search the registry segments to find which one is being referenced */
for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) {
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) {
if (0 == strcmp(segment, ptr_seg->token)) {
if (NULL == token) { /* just want segment token-key pair */
return(ptr_seg);
}
/* search registry to find segment */
for (seg=(mca_gpr_registry_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry);
seg != (mca_gpr_registry_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry);
seg = (mca_gpr_registry_segment_t*)ompi_list_get_next(seg)) {
for (seg=(mca_gpr_replica_segment_t*)ompi_list_get_first(&mca_gpr_replica_head.registry);
seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry);
seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) {
if(seg->segment == ptr_seg->key) {
/* got segment - now find specified token-key pair in that dictionary */
for (ptr_key = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr_key != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_key)) {
for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) {
if (0 == strcmp(token, ptr_key->token)) {
return(ptr_key);
}
@ -119,7 +117,7 @@ mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token)
mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token)
{
mca_gpr_keytable_t *ptr_key;
mca_gpr_replica_keytable_t *ptr_key;
/* find registry segment */
ptr_key = gpr_replica_find_dict_entry(segment, NULL);
@ -142,18 +140,18 @@ ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens)
{
ompi_list_t *keys;
char **token;
mca_gpr_keytable_t *keyptr;
mca_gpr_replica_keytable_t *keyptr;
token = tokens;
keys = OBJ_NEW(ompi_list_t);
/* protect against errors */
if (NULL == segment || NULL == tokens || NULL == *token) {
if (NULL == segment || NULL == tokens) {
return keys;
}
while (NULL != *token) { /* traverse array of tokens until NULL */
keyptr = OBJ_NEW(mca_gpr_keytable_t);
keyptr = OBJ_NEW(mca_gpr_replica_keytable_t);
keyptr->token = strdup(*token);
keyptr->key = gpr_replica_get_key(segment, *token);
ompi_list_append(keys, &keyptr->item);
@ -162,10 +160,10 @@ ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens)
return keys;
}
int gpr_replica_define_key(char *segment, char *token)
mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_keytable_t *ptr_seg, *ptr_key, *new;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_keytable_t *ptr_seg, *ptr_key, *new;
/* protect against errors */
if (NULL == segment) {
@ -174,66 +172,68 @@ int gpr_replica_define_key(char *segment, char *token)
/* if token is NULL, then this is defining a segment name. Check dictionary to ensure uniqueness */
if (NULL == token) {
for (ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) {
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) {
if (0 == strcmp(segment, ptr_seg->token)) {
return OMPI_EXISTS;
return ptr_seg->key;
}
}
/* okay, name is not previously taken. Define a key value for it and return */
new = OBJ_NEW(mca_gpr_keytable_t);
new = OBJ_NEW(mca_gpr_replica_keytable_t);
new->token = strdup(segment);
if (0 == ompi_list_get_size(&mca_gpr_replica_head.freekeys)) { /* no keys waiting for reuse */
if (MCA_GPR_REPLICA_KEY_MAX-2 > mca_gpr_replica_head.lastkey) { /* have a key left */
mca_gpr_replica_head.lastkey++;
new->key = mca_gpr_replica_head.lastkey;
} else { /* out of keys */
return OMPI_ERR_OUT_OF_RESOURCE;
return MCA_GPR_REPLICA_KEY_MAX;
}
} else {
ptr_key = (mca_gpr_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys);
new->key = ptr_key->key;
}
ompi_list_append(&mca_gpr_replica_head.segment_dict, &new->item);
return OMPI_SUCCESS;
return new->key;
}
/* okay, token is specified */
/* search the registry segments to find which one is being referenced */
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(true, segment);
if (NULL != seg) {
/* using that segment, check dictionary to ensure uniqueness */
for (ptr_key = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr_key != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_key)) {
for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) {
if (0 == strcmp(token, ptr_key->token)) {
return OMPI_EXISTS; /* already taken, report error */
return ptr_key->key; /* already taken, report value */
}
}
/* okay, token is unique - create dictionary entry */
new = OBJ_NEW(mca_gpr_keytable_t);
new = OBJ_NEW(mca_gpr_replica_keytable_t);
new->token = strdup(token);
if (0 == ompi_list_get_size(&seg->freekeys)) { /* no keys waiting for reuse */
seg->lastkey++;
new->key = seg->lastkey;
} else {
ptr_key = (mca_gpr_keytable_t*)ompi_list_remove_first(&seg->freekeys);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&seg->freekeys);
new->key = ptr_key->key;
}
ompi_list_append(&seg->keytable, &new->item);
return OMPI_SUCCESS;
return new->key;
}
/* couldn't find segment */
return OMPI_ERROR;
return MCA_GPR_REPLICA_KEY_MAX;
}
int gpr_replica_delete_key(char *segment, char *token)
{
mca_gpr_registry_segment_t *seg;
mca_gpr_registry_core_t *reg, *prev;
mca_gpr_keytable_t *ptr_seg, *ptr_key, *new, *regkey;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_core_t *reg;
mca_gpr_replica_keytable_t *ptr_seg, *ptr_key, *new;
mca_gpr_replica_key_t *key;
int i;
/* protect ourselves against errors */
if (NULL == segment) {
@ -241,7 +241,7 @@ int gpr_replica_delete_key(char *segment, char *token)
}
/* find the segment */
seg = gpr_replica_find_seg(segment);
seg = gpr_replica_find_seg(false, segment);
if (NULL != seg) {
/* if specified token is NULL, then this is deleting a segment name.*/
@ -255,7 +255,7 @@ int gpr_replica_delete_key(char *segment, char *token)
return OMPI_ERROR;
}
/* add key to global registry's freekey list */
new = OBJ_NEW(mca_gpr_keytable_t);
new = OBJ_NEW(mca_gpr_replica_keytable_t);
new->token = NULL;
new->key = ptr_seg->key;
ompi_list_append(&mca_gpr_replica_head.freekeys, &new->item);
@ -268,39 +268,28 @@ int gpr_replica_delete_key(char *segment, char *token)
ptr_key = gpr_replica_find_dict_entry(segment, token);
if (NULL != ptr_key) {
/* found key in dictionary */
/* need to search this segment's registry to find all instances of key - then delete them */
for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) {
/* need to search this segment's registry to find all instances of key & "delete" them */
for (reg = (mca_gpr_replica_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_replica_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_replica_core_t*)ompi_list_get_next(reg)) {
/* check the key list */
for (regkey = (mca_gpr_keytable_t*)ompi_list_get_first(&reg->keys);
(regkey != (mca_gpr_keytable_t*)ompi_list_get_end(&reg->keys))
&& (regkey->key != ptr_key->key);
regkey = (mca_gpr_keytable_t*)ompi_list_get_next(regkey));
if (regkey != (mca_gpr_keytable_t*)ompi_list_get_end(&reg->keys)) {
ompi_list_remove_item(&reg->keys, &regkey->item);
}
/* if this was the last key, then remove the registry entry itself */
if (0 == ompi_list_get_size(&reg->keys)) {
while (0 < ompi_list_get_size(&reg->subscriber)) {
ompi_list_remove_last(&reg->subscriber);
for (i=0, key=reg->keys; i < reg->num_keys; i++, key++) {
if (ptr_key->key == *key) { /* found match */
*key = MCA_GPR_REPLICA_KEY_MAX;
}
prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg);
ompi_list_remove_item(&seg->registry_entries, &reg->item);
reg = prev;
}
/* add key to this segment's freekey list */
new = OBJ_NEW(mca_gpr_replica_keytable_t);
new->token = NULL;
new->key = ptr_key->key;
ompi_list_append(&seg->freekeys, &new->item);
/* now remove the dictionary entry from the segment's dictionary */
ompi_list_remove_item(&seg->keytable, &ptr_key->item);
return(OMPI_SUCCESS);
}
/* add key to this segment's freekey list */
new = OBJ_NEW(mca_gpr_keytable_t);
new->token = NULL;
new->key = ptr_key->key;
ompi_list_append(&seg->freekeys, &new->item);
/* now remove the dictionary entry from the segment's dictionary */
ompi_list_remove_item(&seg->keytable, &ptr_key->item);
return(OMPI_SUCCESS);
}
return(OMPI_ERROR); /* if we get here, then we couldn't find token in dictionary */
}
@ -308,25 +297,34 @@ int gpr_replica_delete_key(char *segment, char *token)
return(OMPI_ERROR); /* if we get here, then we couldn't find segment */
}
int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg)
int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg)
{
mca_gpr_replica_core_t *ptr;
mca_gpr_replica_keytable_t *keytab;
mca_gpr_replica_keylist_t *keylst;
/* need to free memory from each entry - remove_last returns pointer to the entry */
/* empty the segment's registry */
while (0 < ompi_list_get_size(&seg->registry_entries)) {
ompi_list_remove_last(&seg->registry_entries);
while (!ompi_list_is_empty(&seg->registry_entries)) {
ptr = (mca_gpr_replica_core_t*)ompi_list_remove_first(&seg->registry_entries);
OBJ_RELEASE(ptr);
}
/* empty the segment's dictionary */
while (0 < ompi_list_get_size(&seg->keytable)) {
ompi_list_remove_last(&seg->keytable);
while (!ompi_list_is_empty(&seg->keytable)) {
keytab = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&seg->keytable);
OBJ_RELEASE(keytab);
}
/* empty the list of free keys */
while (0 < ompi_list_get_size(&seg->freekeys)) {
ompi_list_remove_last(&seg->freekeys);
while (!ompi_list_is_empty(&seg->freekeys)) {
keylst = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&seg->freekeys);
OBJ_RELEASE(keylst);
}
/* now remove segment from global registry */
ompi_list_remove_item(&mca_gpr_replica_head.registry, &seg->item);
OBJ_RELEASE(seg);
return OMPI_SUCCESS;
}
@ -334,28 +332,32 @@ int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg)
/*
* A mode of "NONE" or "OVERWRITE" defaults to "XAND" behavior
*/
bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list, ompi_list_t *entry_keys)
bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode,
mca_gpr_replica_key_t num_keys_search, mca_gpr_replica_key_t *keys,
mca_gpr_replica_key_t num_keys_entry, mca_gpr_replica_key_t *entry_keys)
{
mca_gpr_keylist_t *keyptr;
mca_gpr_keytable_t *key;
size_t num_keys_search, num_keys_entry, num_found;
mca_gpr_replica_key_t *key1, *key2;
int num_found;
bool exclusive, no_match;
int i, j;
if (OMPI_REGISTRY_NONE == mode ||
OMPI_REGISTRY_OVERWRITE == mode) { /* set default behavior for search */
mode = OMPI_REGISTRY_XAND;
/* check for trivial case */
if (NULL == keys) { /* wildcard case - automatically true */
return true;
}
num_keys_search = ompi_list_get_size(key_list);
num_keys_entry = ompi_list_get_size(entry_keys);
if (OMPI_REGISTRY_NONE == addr_mode ||
OMPI_REGISTRY_OVERWRITE == addr_mode) { /* set default behavior for search */
addr_mode = OMPI_REGISTRY_XAND;
}
/* take care of trivial cases that don't require search */
if ((OMPI_REGISTRY_XAND & mode) &&
if ((OMPI_REGISTRY_XAND & addr_mode) &&
(num_keys_search != num_keys_entry)) { /* can't possibly turn out "true" */
return false;
}
if ((OMPI_REGISTRY_AND & mode) &&
if ((OMPI_REGISTRY_AND & addr_mode) &&
(num_keys_search > num_keys_entry)) { /* can't find enough matches */
return false;
}
@ -363,17 +365,13 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list
/* okay, have to search for remaining possibilities */
num_found = 0;
exclusive = true;
for (keyptr = (mca_gpr_keylist_t*)ompi_list_get_first(entry_keys);
keyptr != (mca_gpr_keylist_t*)ompi_list_get_end(entry_keys);
keyptr = (mca_gpr_keylist_t*)ompi_list_get_next(keyptr)) {
for (i=0, key1=entry_keys; i < num_keys_entry; i++, key1++) {
no_match = true;
for (key = (mca_gpr_keytable_t*)ompi_list_get_first(key_list);
(key != (mca_gpr_keytable_t*)ompi_list_get_end(key_list)) && no_match;
key = (mca_gpr_keytable_t*)ompi_list_get_next(key)) {
if (key->key == keyptr->key) { /* found a match */
for (j=0, key2=keys; j < num_keys_search; j++, key2++) {
if (*key1 == *key2) { /* found a match */
num_found++;
no_match = false;
if (OMPI_REGISTRY_OR & mode) { /* only need one match */
if (OMPI_REGISTRY_OR & addr_mode) { /* only need one match */
return true;
}
}
@ -383,7 +381,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list
}
}
if (OMPI_REGISTRY_XAND & mode) { /* deal with XAND case */
if (OMPI_REGISTRY_XAND & addr_mode) { /* deal with XAND case */
if (num_found == num_keys_entry) { /* found all, and nothing more */
return true;
} else { /* found either too many or not enough */
@ -391,7 +389,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list
}
}
if (OMPI_REGISTRY_XOR & mode) { /* deal with XOR case */
if (OMPI_REGISTRY_XOR & addr_mode) { /* deal with XOR case */
if (num_found > 0 && exclusive) { /* found at least one and nothing not on list */
return true;
} else {
@ -399,7 +397,7 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list
}
}
if (OMPI_REGISTRY_AND & mode) { /* deal with AND case */
if (OMPI_REGISTRY_AND & addr_mode) { /* deal with AND case */
if (num_found == num_keys_search) { /* found all the required keys */
return true;
} else {
@ -411,6 +409,166 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list
return false;
}
int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag)
{
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_synchro_list_t *synch;
char **tokptr;
mca_gpr_replica_key_t *keyptr;
int i, num_tokens;
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't find segment */
return OMPI_ERROR;
}
synch = OBJ_NEW(mca_gpr_replica_synchro_list_t);
synch->addr_mode = addr_mode;
synch->trigger = trigger;
synch->count = 0;
synch->id_tag = id_tag;
synch->num_keys = 0;
synch->keys = NULL;
if (NULL != tokens) { /* tokens provided */
/* count number of tokens */
tokptr = tokens;
num_tokens = 0;
while (NULL != tokptr && NULL != *tokptr) {
num_tokens++;
tokptr++;
}
synch->keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
keyptr = synch->keys;
/* store key values of tokens, defining them if needed */
for (i=0, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) {
*keyptr = gpr_replica_get_key(segment, *tokptr);
if (MCA_GPR_REPLICA_KEY_MAX == *keyptr) {
*keyptr = gpr_replica_define_key(segment, *tokptr);
}
keyptr++;
}
synch->num_keys = num_tokens;
}
ompi_list_append(&seg->synchros, &synch->item);
return OMPI_SUCCESS;
}
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens)
{
ompi_list_t *reg_entries;
ompi_registry_value_t *reg, *obj;
ompi_registry_notify_message_t *msg;
char **tokptr, **tokptr2;
int num_tokens, i;
/* protect against errors */
if (NULL == segment) {
return NULL;
}
reg_entries = gpr_replica_get(addr_mode, segment, tokens);
if (ompi_list_is_empty(reg_entries)) {
return NULL;
}
/* compute number of tokens */
tokptr = tokens;
num_tokens = 0;
while (NULL != *tokptr) {
num_tokens++;
tokptr++;
}
msg = OBJ_NEW(ompi_registry_notify_message_t);
msg->num_tokens = num_tokens;
msg->tokens = (char**)malloc(num_tokens*(sizeof(char*)));
tokptr = tokens;
tokptr2 = msg->tokens;
for (i=0, tokptr=tokens, tokptr2=msg->tokens;
i < num_tokens;
i++, tokptr++, tokptr2++) {
*tokptr2 = strdup(*tokptr);
}
while (NULL != (reg = (ompi_registry_value_t*)ompi_list_remove_first(reg_entries))) {
obj = OBJ_NEW(ompi_registry_value_t);
obj->object = (ompi_registry_object_t)malloc(reg->object_size);
memcpy(obj->object, reg->object, reg->object_size);
obj->object_size = reg->object_size;
ompi_list_append(&msg->data, &obj->item);
OBJ_RELEASE(reg);
}
OBJ_RELEASE(reg_entries);
return msg;
}
void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
mca_gpr_notify_id_t id_tag)
{
mca_gpr_notify_request_tracker_t *trackptr, *tmpptr;
ompi_registry_object_t *data;
char **tokptr;
int i;
bool found;
/* protect against errors */
if (NULL == message) {
return;
}
/* find corresponding notify request */
found = false;
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && !found;
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
if (trackptr->id_tag == id_tag) {
found = true;
}
}
if (!found) { /* didn't find request */
ompi_output(0, "Notification error - request not found");
return;
}
/* process request */
if (NULL == trackptr->requestor) { /* local request - callback fn with their tag */
trackptr->callback(message, trackptr->user_tag);
/* dismantle message and free memory */
while (NULL != (data = (ompi_registry_object_t*)ompi_list_get_first(&message->data))) {
OBJ_RELEASE(data);
}
for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) {
free(*tokptr);
}
free(message->tokens);
free(message);
} else { /* remote request - send message back */
gpr_replica_remote_notify(trackptr->requestor, trackptr->req_tag, message);
}
/* if one-shot, remove request from tracking system */
if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trackptr->synchro) {
tmpptr = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_item(&message->data, &trackptr->item);
OBJ_RELEASE(tmpptr);
}
}
ompi_list_t *gpr_replica_test_internals(int level)
{
ompi_list_t *test_results, *keylist;
@ -419,37 +577,20 @@ ompi_list_t *gpr_replica_test_internals(int level)
char *name3[30];
int i, j, k;
mca_gpr_replica_key_t segkey, key;
mca_gpr_registry_segment_t *seg;
mca_gpr_keytable_t *dict_entry;
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_keytable_t *dict_entry;
bool success;
test_results = OBJ_NEW(ompi_list_t);
ompi_output(0, "testing define segment");
/* create several test segments */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
result->test = strdup("test-create-segment");
for (i=0; i<5 && success; i++) {
sprintf(name, "test-def-seg%d", i);
if (OMPI_SUCCESS != gpr_replica_define_segment(name)) {
success = false;
}
}
if (success) {
result->message = strdup("success");
} else {
result->message = strdup("failed");
}
ompi_list_append(test_results, &result->item);
/* check that define key protects uniqueness */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
result->test = strdup("test-define-key-uniqueness");
for (i=0; i<5 && success; i++) {
sprintf(name, "test-def-seg%d", i);
key = gpr_replica_define_key(name, NULL);
if (OMPI_EXISTS != key) { /* got an error */
if (NULL == gpr_replica_define_segment(name)) {
success = false;
}
}
@ -460,6 +601,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
}
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing get key for segment ");
/* check ability to get key for a segment */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
@ -478,12 +620,33 @@ ompi_list_t *gpr_replica_test_internals(int level)
}
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing define key");
/* check that define key protects uniqueness */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
result->test = strdup("test-define-key-uniqueness");
for (i=0; i<5 && success; i++) {
sprintf(name, "test-def-seg%d", i);
segkey = gpr_replica_get_key(name, NULL);
key = gpr_replica_define_key(name, NULL);
if (segkey != key) { /* got an error */
success = false;
}
}
if (success) {
result->message = strdup("success");
} else {
result->message = strdup("failed");
}
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing find segment");
/* check the ability to find a segment */
i = 2;
sprintf(name, "test-def-seg%d", i);
result = OBJ_NEW(ompi_registry_internal_test_results_t);
result->test = strdup("test-find-seg");
seg = gpr_replica_find_seg(name);
seg = gpr_replica_find_seg(false, name);
if (NULL == seg) {
asprintf(&result->message, "test failed with NULL returned: %s", name);
} else { /* locate key and check it */
@ -496,6 +659,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
}
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing define key within segment");
/* check ability to define key within a segment */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
@ -518,6 +682,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing get key within segment");
/* check ability to retrieve key within a segment */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
@ -540,6 +705,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing get dict entry - global");
/* check ability to get dictionary entries */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);
@ -559,6 +725,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
}
ompi_list_append(test_results, &result->item);
ompi_output(0, "testing get dict entry - segment");
if (success) { /* segment values checked out - move on to within a segment */
result = OBJ_NEW(ompi_registry_internal_test_results_t);
result->test = strdup("test-get-dict-entry-segment");
@ -581,6 +748,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
}
ompi_output(0, "testing get key list");
/* check ability to get key list */
success = true;
result = OBJ_NEW(ompi_registry_internal_test_results_t);

Просмотреть файл

@ -30,11 +30,10 @@ mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token);
* @param token Pointer to a character string containing the token to be defined. If token=NULL,
* the function adds the token to the segment dictionary, thus defining a new segment name.
*
* @retval OMPI_EXISTS Indicates that the requested dictionary entry already exists.
* @retval OMPI_ERR_OUT_OF_RESOURCE Indicates that the dictionary is full.
* @retval OMPI_SUCCESS Indicates that the dictionary entry was created.
* @retval key New key value
* @retval MCA_GPR_REPLICA_KEY_MAX Indicates that the dictionary is full or some other error.
*/
int gpr_replica_define_key(char *segment, char *token);
mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token);
/** Delete a token from a segment's dictionary.
* The gpr_replica_deletekey() function allows the removal of a definition from the
@ -63,14 +62,27 @@ int gpr_replica_delete_key(char *segment, char *token);
* @retval *seg Pointer to the segment
* @retval NULL Indicates that the specified segment could not be found
*/
mca_gpr_registry_segment_t *gpr_replica_find_seg(char *segment);
mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment);
mca_gpr_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token);
mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *token);
int gpr_replica_empty_segment(mca_gpr_registry_segment_t *seg);
int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg);
ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens);
bool gpr_replica_check_key_list(ompi_registry_mode_t mode, ompi_list_t *key_list, ompi_list_t *entry_keys);
bool gpr_replica_check_key_list(ompi_registry_mode_t mode,
mca_gpr_replica_key_t num_keys_search, mca_gpr_replica_key_t *keys,
mca_gpr_replica_key_t num_keys_entry, mca_gpr_replica_key_t *entry_keys);
int gpr_replica_define_segment(char *segment);
mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment);
int gpr_replica_construct_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag);
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode,
char *segment, char **tokens);
void gpr_replica_process_triggers(ompi_registry_notify_message_t *message,
mca_gpr_notify_id_t id_tag);

Просмотреть файл

@ -41,7 +41,7 @@ int main(int argc, char **argv)
ompi_list_t *answer;
ompi_registry_value_t *ans;
bool multi, hidden;
int i, j;
int i, j, result;
bool success;
char name[30], *name2[30], *name3[30];
int put_test; /* result from system call */
@ -65,6 +65,7 @@ int main(int argc, char **argv)
exit (1);
}
fprintf(test_out, "\n\ngpr opening\n");
/* open the GPR */
if (OMPI_SUCCESS == mca_gpr_base_open()) {
fprintf(test_out, "GPR opened\n");
@ -76,6 +77,7 @@ int main(int argc, char **argv)
exit(1);
}
fprintf(test_out, "\n\ngpr select\n");
/* startup the GPR replica */
if (OMPI_SUCCESS != mca_gpr_base_select(&multi, &hidden)) {
fprintf(test_out, "GPR replica could not start\n");
@ -87,6 +89,7 @@ int main(int argc, char **argv)
test_success();
}
fprintf(test_out, "\n\ntesting internals\n");
/* check internals */
internal_tests = ompi_registry.test_internals(1);
if (0 == ompi_list_get_size(internal_tests)) { /* should have been something in list */
@ -105,6 +108,7 @@ int main(int argc, char **argv)
test_success();
}
fprintf(test_out, "\n\ntesting index\n");
/* check index */
test_list = ompi_registry.index(NULL);
if (0 == ompi_list_get_size(test_list)) { /* should have been something in dictionary */
@ -122,7 +126,7 @@ int main(int argc, char **argv)
test_success();
}
fprintf(test_out, "\n\ntesting put function\n");
/* test the put function */
success = true;
input_size = 10000;
@ -144,6 +148,7 @@ int main(int argc, char **argv)
for (i=0; i<5 && success; i++) {
sprintf(name, "test-def-seg%d", i);
fprintf(test_out, "\ttesting seg %s\n", name);
if (OMPI_SUCCESS != ompi_registry.put(OMPI_REGISTRY_NONE, name,
name2, test_buffer, input_size)) {
fprintf(test_out, "put test failed for segment %s\n", name);
@ -163,6 +168,7 @@ int main(int argc, char **argv)
exit(1);
}
fprintf(test_out, "\n\ntesting overwrite function\n");
/* test the put overwrite function */
success = true;
for (i=0; i<5 && success; i++) {
@ -172,10 +178,16 @@ int main(int argc, char **argv)
} else {
mode = OMPI_REGISTRY_NONE;
}
if (OMPI_REGISTRY_OVERWRITE == mode) {
fprintf(test_out, "\toverwrite on - testing segment %s\n", name);
} else if (OMPI_REGISTRY_NONE == mode) {
fprintf(test_out, "\toverwrite off - testing segment %s\n", name);
}
put_test = ompi_registry.put(mode, name, name2, test_buffer, input_size);
fprintf(test_out, "\t result %d\n", put_test);
if ((OMPI_REGISTRY_OVERWRITE == mode && OMPI_SUCCESS != put_test) ||
(OMPI_REGISTRY_NONE == mode && OMPI_SUCCESS == put_test)) {
fprintf(test_out, "put test failed for segment %s\n", name);
fprintf(test_out, "put overwrite test failed for segment %s\n", name);
for (j=0; j<10; j++) {
fprintf(test_out, "\t%s\n", name2[j]);
}
@ -272,14 +284,14 @@ int main(int argc, char **argv)
/* check the universe segment - should have a key value of "1" */
fclose( test_out );
/* result = system( cmd_str );
result = system( cmd_str );
if( result == 0 ) {
test_success();
}
else {
test_failure( "test_gpr_replica ompi_registry init, etc failed");
}
*/
test_finalize();
return(0);