From f2b232c16b92163a0656dd413f6bb51d93b0c20e Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 20 Aug 2004 04:13:10 +0000 Subject: [PATCH] Complete registry functionality - added proxy/replica connections for index and delete object. Non-blocking receive remains commented out - thus, registry is non-operational pending release of the TCP-based oob. We will notify everyone when it becomes fully operational. This commit was SVN r2247. --- src/mca/gpr/proxy/gpr_proxy.c | 262 +++++++++++++++++++- src/mca/gpr/replica/gpr_replica.c | 62 ++++- src/mca/gpr/replica/gpr_replica_component.c | 130 +++++++++- 3 files changed, 440 insertions(+), 14 deletions(-) diff --git a/src/mca/gpr/proxy/gpr_proxy.c b/src/mca/gpr/proxy/gpr_proxy.c index 1470a44003..8b608ec00d 100644 --- a/src/mca/gpr/proxy/gpr_proxy.c +++ b/src/mca/gpr/proxy/gpr_proxy.c @@ -23,7 +23,50 @@ int gpr_proxy_delete_segment(char *segment) { - return OMPI_SUCCESS; + ompi_buffer_t cmd; + ompi_buffer_t answer; + mca_gpr_cmd_flag_t command; + int recv_tag; + int32_t response; + + command = MCA_GPR_DELETE_SEGMENT_CMD; + recv_tag = MCA_OOB_TAG_GPR; + + if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */ + return OMPI_ERROR; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + return OMPI_ERROR; + } + + if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) { + return OMPI_ERROR; + } + + if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) { + return OMPI_ERROR; + } + + + if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) { + return OMPI_ERROR; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) + || (MCA_GPR_DELETE_SEGMENT_CMD != command)) { + ompi_buffer_free(answer); + return OMPI_ERROR; + } + + if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) { + ompi_buffer_free(answer); + return OMPI_ERROR; + } else { + ompi_buffer_free(answer); + return (int)response; + } + return OMPI_ERROR; } @@ -116,17 +159,159 @@ int gpr_proxy_put(ompi_registry_mode_t mode, char *segment, int gpr_proxy_delete_object(ompi_registry_mode_t mode, char *segment, char **tokens) { - return OMPI_SUCCESS; + ompi_buffer_t cmd; + ompi_buffer_t answer; + mca_gpr_cmd_flag_t command; + char **tokptr; + int recv_tag, i; + int32_t num_tokens, response; + + /* need to protect against errors */ + if (NULL == segment || NULL == tokens || NULL == *tokens) { + return OMPI_ERROR; + } + + command = MCA_GPR_DELETE_OBJECT_CMD; + recv_tag = MCA_OOB_TAG_GPR; + + if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */ + return OMPI_ERROR; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto CLEANUP; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { + goto CLEANUP; + } + + if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) { + goto CLEANUP; + } + + /* compute number of tokens */ + tokptr = tokens; + while (NULL != *tokptr) { + num_tokens++; + tokptr++; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) { + goto CLEANUP; + } + + tokptr = tokens; + for (i=0; i mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) { + goto CLEANUP; + } + + if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) { + goto CLEANUP; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) + || (MCA_GPR_GET_CMD != command)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + + if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) { + ompi_buffer_free(answer); + return OMPI_ERROR; + } else { + ompi_buffer_free(answer); + return (int)response; + } + + CLEANUP: + ompi_buffer_free(cmd); + return OMPI_ERROR; + } ompi_list_t* gpr_proxy_index(char *segment) { - ompi_list_t *answer; + ompi_list_t *return_list; + ompi_buffer_t cmd; + ompi_buffer_t answer; + mca_gpr_cmd_flag_t command; + char *string1; + int recv_tag, i; + int32_t num_responses; + ompi_registry_mode_t mode; + ompi_registry_index_value_t *newptr; - answer = OBJ_NEW(ompi_list_t); + return_list = OBJ_NEW(ompi_list_t); - return answer; + command = MCA_GPR_INDEX_CMD; + recv_tag = MCA_OOB_TAG_GPR; + + if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */ + return return_list; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto CLEANUP; + } + + if (NULL == segment) { /* no segment specified - want universe dict */ + mode = 0; + if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { + goto CLEANUP; + } + } else { + mode = 1; + if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { + goto CLEANUP; + } + if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) { + goto CLEANUP; + } + } + + if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) { + goto CLEANUP; + } + + if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) { + goto CLEANUP; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) + || (MCA_GPR_INDEX_CMD != command)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &num_responses, 1, OMPI_INT32)) || + (0 >= num_responses)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + + for (i=0; i ompi_unpack_string(answer, &string1)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + newptr = OBJ_NEW(ompi_registry_index_value_t); + newptr->token = strdup(string1); + ompi_list_append(return_list, &newptr->item); + } + + + CLEANUP: + ompi_buffer_free(cmd); + return return_list; } @@ -134,14 +319,14 @@ int gpr_proxy_subscribe(ompi_process_name_t *caller, ompi_registry_mode_t mode, ompi_registry_notify_action_t action, char *segment, char **tokens) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } int gpr_proxy_unsubscribe(ompi_process_name_t *caller, ompi_registry_mode_t mode, char *segment, char **tokens) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } @@ -249,8 +434,71 @@ ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **toke ompi_list_t* gpr_proxy_test_internals(int level) { ompi_list_t *test_results; + ompi_buffer_t cmd, answer; + char **string1, **string2; + int i; + int32_t num_responses, test_level; + ompi_registry_internal_test_results_t *newptr; + mca_gpr_cmd_flag_t command; + int recv_tag; + test_results = OBJ_NEW(ompi_list_t); + test_level = (int32_t)level; + command = MCA_GPR_TEST_INTERNALS_CMD; + recv_tag = MCA_OOB_TAG_GPR; + + if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */ + return test_results; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto CLEANUP; + } + + if (OMPI_SUCCESS != ompi_pack(cmd, &test_level, 1, OMPI_INT32)) { + goto CLEANUP; + } + + if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) { + goto CLEANUP; + } + + + if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) { + goto CLEANUP; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) + || (MCA_GPR_TEST_INTERNALS_CMD != command)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + + if ((OMPI_SUCCESS != ompi_unpack(answer, &num_responses, 1, OMPI_INT32)) || + (0 >= num_responses)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + + for (i=0; i ompi_unpack_string(answer, string1)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + if (0 > ompi_unpack_string(answer, string2)) { + ompi_buffer_free(answer); + goto CLEANUP; + } + newptr = OBJ_NEW(ompi_registry_internal_test_results_t); + newptr->test = strdup(*string1); + newptr->message = strdup(*string2); + ompi_list_append(test_results, &newptr->item); + } + ompi_buffer_free(answer); + + CLEANUP: + ompi_buffer_free(cmd); return test_results; } diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index 2a592806b2..d8a77507ec 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -145,7 +145,51 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment, int gpr_replica_delete_object(ompi_registry_mode_t mode, char *segment, char **tokens) { - return 0; + mca_gpr_registry_core_t *reg, *prev; + mca_gpr_keytable_t *keyptr; + ompi_list_t *keys; + mca_gpr_registry_segment_t *seg; + + /* protect against errors */ + if (NULL == segment || NULL == tokens || NULL == *tokens) { + return OMPI_ERROR; + } + + /* find the specified segment */ + seg = gpr_replica_find_seg(segment); + if (NULL == seg) { /* segment not found */ + return OMPI_ERROR; + } + + /* convert tokens to list of keys */ + keys = gpr_replica_get_key_list(segment, tokens); + if (0 == ompi_list_get_size(keys)) { + return OMPI_ERROR; + } + + /* traverse the list to find undefined tokens - error if found */ + for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys); + keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys); + keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) { + if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */ + return OMPI_ERROR; + } + } + + /* traverse the segment's registry, looking for matching tokens per the specified mode */ + for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries); + reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries); + reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) { + + /* for each registry entry, check the key list */ + if (gpr_replica_check_key_list(mode, keys, reg)) { /* found the key(s) on the list */ + prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg); + ompi_list_remove_item(&seg->registry_entries, ®->item); + reg = prev; + } + } + + return OMPI_SUCCESS; } ompi_list_t* gpr_replica_index(char *segment) @@ -165,7 +209,21 @@ ompi_list_t* gpr_replica_index(char *segment) ans->token = strdup(ptr->token); ompi_list_append(answer, &ans->item); } - } else { + } else { /* want index of specific segment */ + /* find the specified segment */ + seg = gpr_replica_find_seg(segment); + if (NULL == seg) { /* segment not found */ + return answer; + } + /* got segment - now find specified token-key pair in that dictionary */ + for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable); + ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable); + ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) { + ans = OBJ_NEW(ompi_registry_index_value_t); + ans->token = strdup(ptr->token); + ompi_list_append(answer, &ans->item); + } + } return answer; diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index 2f73a6eafe..191b381c54 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -317,8 +317,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, ompi_registry_notify_action_t action; ompi_registry_value_t *regval; ompi_list_t *returned_list; + ompi_registry_internal_test_results_t *testval; + ompi_registry_index_value_t *indexval; char **tokens, **tokptr; - int32_t num_tokens, level, i; + int32_t num_tokens, test_level, i; mca_gpr_cmd_flag_t command; char *segment; int32_t response; @@ -336,7 +338,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } - response = (int16_t)ompi_registry.delete_segment(segment); + response = (int32_t)ompi_registry.delete_segment(segment); if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { goto RETURN_ERROR; @@ -370,7 +372,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } tokptr++; } - *tokptr = NULL; + *tokptr = NULL; if (OMPI_SUCCESS != ompi_unpack(buffer, &object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) { goto RETURN_ERROR; @@ -407,7 +409,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } - tokens = (char**)malloc(num_tokens*sizeof(char*)); + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); tokptr = tokens; for (i=0; i ompi_unpack_string(buffer, tokptr)) { @@ -415,7 +417,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } tokptr++; } - + *tokptr = NULL; returned_list = ompi_registry.get(mode, segment, tokens); @@ -443,6 +445,123 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { /* RHC -- not sure what to do if the return send fails */ } + } else if (MCA_GPR_DELETE_OBJECT_CMD == command) { + + if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { + goto RETURN_ERROR; + } + + if (0 > ompi_unpack_string(buffer, &segment)) { + goto RETURN_ERROR; + } + + if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + tokens = (char**)malloc((num_tokens+1)*sizeof(char*)); + tokptr = tokens; + for (i=0; i ompi_unpack_string(buffer, tokptr)) { + goto RETURN_ERROR; + } + tokptr++; + } + *tokptr = NULL; + + response = (int32_t)ompi_registry.delete_object(mode, segment, tokens); + + if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto RETURN_ERROR; + } + + if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { + /* RHC -- not sure what to do if the return send fails */ + } + + } else if (MCA_GPR_INDEX_CMD == command) { + + if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) { + goto RETURN_ERROR; + } + + if (0 == mode) { /* only want dict of segments */ + segment = NULL; + } else { + if (0 > ompi_unpack_string(buffer, &segment)) { + goto RETURN_ERROR; + } + } + + returned_list = ompi_registry.index(segment); + + if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto RETURN_ERROR; + } + + response = (int32_t)ompi_list_get_size(returned_list); + if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + if (0 < response) { /* don't send anything else back if the list is empty */ + for (indexval = (ompi_registry_index_value_t*)ompi_list_get_first(returned_list); + indexval != (ompi_registry_index_value_t*)ompi_list_get_end(returned_list); + indexval = (ompi_registry_index_value_t*)ompi_list_get_next(indexval)) { /* traverse the list */ + if (OMPI_SUCCESS != ompi_pack_string(answer, indexval->token)) { + goto RETURN_ERROR; + } + } + } + + if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { + /* RHC -- not sure what to do if the return send fails */ + } + + } else if (MCA_GPR_SUBSCRIBE_CMD == command) { + action = OMPI_REGISTRY_NOTIFY_ALL; + goto RETURN_ERROR; + } else if (MCA_GPR_UNSUBSCRIBE_CMD == command) { + goto RETURN_ERROR; + } else if (MCA_GPR_TEST_INTERNALS_CMD == command) { + + if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) || + (0 > test_level)) { + goto RETURN_ERROR; + } + + returned_list = gpr_replica_test_internals(test_level); + + if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { + goto RETURN_ERROR; + } + + response = (int32_t)ompi_list_get_size(returned_list); + if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) { + goto RETURN_ERROR; + } + + if (0 < response) { /* don't send anything else back if the list is empty */ + for (testval = (ompi_registry_internal_test_results_t*)ompi_list_get_first(returned_list); + testval != (ompi_registry_internal_test_results_t*)ompi_list_get_end(returned_list); + testval = (ompi_registry_internal_test_results_t*)ompi_list_get_next(testval)) { /* traverse the list */ + if (OMPI_SUCCESS != ompi_pack_string(answer, testval->test)) { + goto RETURN_ERROR; + } + if (OMPI_SUCCESS != ompi_pack_string(answer, testval->message)) { + goto RETURN_ERROR; + } + } + } + if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { + /* RHC -- not sure what to do if the return send fails */ + } + + } else { /* got an unrecognized command */ RETURN_ERROR: ompi_buffer_init(&error_answer, 8); @@ -453,6 +572,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } ompi_buffer_free(answer); + ompi_buffer_free(buffer); /* reissue the non-blocking receive */ mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);