1
1

Complete registry functionality - added proxy/replica connections for index and delete object.

Non-blocking receive remains commented out - thus, registry is non-operational pending release of the TCP-based oob. We will notify everyone when it becomes fully operational.

This commit was SVN r2247.
Этот коммит содержится в:
Ralph Castain 2004-08-20 04:13:10 +00:00
родитель 0e6e74115e
Коммит f2b232c16b
3 изменённых файлов: 440 добавлений и 14 удалений

Просмотреть файл

@ -23,7 +23,50 @@
int gpr_proxy_delete_segment(char *segment)
{
return OMPI_SUCCESS;
ompi_buffer_t cmd;
ompi_buffer_t answer;
mca_gpr_cmd_flag_t command;
int recv_tag;
int32_t response;
command = MCA_GPR_DELETE_SEGMENT_CMD;
recv_tag = MCA_OOB_TAG_GPR;
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
return OMPI_ERROR;
}
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
return OMPI_ERROR;
}
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
return OMPI_ERROR;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_DELETE_SEGMENT_CMD != command)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
} else {
ompi_buffer_free(answer);
return (int)response;
}
return OMPI_ERROR;
}
@ -116,17 +159,159 @@ int gpr_proxy_put(ompi_registry_mode_t mode, char *segment,
int gpr_proxy_delete_object(ompi_registry_mode_t mode,
char *segment, char **tokens)
{
return OMPI_SUCCESS;
ompi_buffer_t cmd;
ompi_buffer_t answer;
mca_gpr_cmd_flag_t command;
char **tokptr;
int recv_tag, i;
int32_t num_tokens, response;
/* need to protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens) {
return OMPI_ERROR;
}
command = MCA_GPR_DELETE_OBJECT_CMD;
recv_tag = MCA_OOB_TAG_GPR;
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto CLEANUP;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
goto CLEANUP;
}
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
goto CLEANUP;
}
/* compute number of tokens */
tokptr = tokens;
while (NULL != *tokptr) {
num_tokens++;
tokptr++;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
goto CLEANUP;
}
tokptr = tokens;
for (i=0; i<num_tokens; i++) { /* pack the tokens */
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
goto CLEANUP;
}
tokptr++;
}
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
goto CLEANUP;
}
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_GET_CMD != command)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) {
ompi_buffer_free(answer);
return OMPI_ERROR;
} else {
ompi_buffer_free(answer);
return (int)response;
}
CLEANUP:
ompi_buffer_free(cmd);
return OMPI_ERROR;
}
ompi_list_t* gpr_proxy_index(char *segment)
{
ompi_list_t *answer;
ompi_list_t *return_list;
ompi_buffer_t cmd;
ompi_buffer_t answer;
mca_gpr_cmd_flag_t command;
char *string1;
int recv_tag, i;
int32_t num_responses;
ompi_registry_mode_t mode;
ompi_registry_index_value_t *newptr;
answer = OBJ_NEW(ompi_list_t);
return_list = OBJ_NEW(ompi_list_t);
return answer;
command = MCA_GPR_INDEX_CMD;
recv_tag = MCA_OOB_TAG_GPR;
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
return return_list;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto CLEANUP;
}
if (NULL == segment) { /* no segment specified - want universe dict */
mode = 0;
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
goto CLEANUP;
}
} else {
mode = 1;
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
goto CLEANUP;
}
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
goto CLEANUP;
}
}
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
goto CLEANUP;
}
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_INDEX_CMD != command)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &num_responses, 1, OMPI_INT32)) ||
(0 >= num_responses)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
for (i=0; i<num_responses; i++) {
if (0 > ompi_unpack_string(answer, &string1)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
newptr = OBJ_NEW(ompi_registry_index_value_t);
newptr->token = strdup(string1);
ompi_list_append(return_list, &newptr->item);
}
CLEANUP:
ompi_buffer_free(cmd);
return return_list;
}
@ -134,14 +319,14 @@ int gpr_proxy_subscribe(ompi_process_name_t *caller, ompi_registry_mode_t mode,
ompi_registry_notify_action_t action,
char *segment, char **tokens)
{
return OMPI_SUCCESS;
return OMPI_ERR_NOT_IMPLEMENTED;
}
int gpr_proxy_unsubscribe(ompi_process_name_t *caller, ompi_registry_mode_t mode,
char *segment, char **tokens)
{
return OMPI_SUCCESS;
return OMPI_ERR_NOT_IMPLEMENTED;
}
@ -249,8 +434,71 @@ ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **toke
ompi_list_t* gpr_proxy_test_internals(int level)
{
ompi_list_t *test_results;
ompi_buffer_t cmd, answer;
char **string1, **string2;
int i;
int32_t num_responses, test_level;
ompi_registry_internal_test_results_t *newptr;
mca_gpr_cmd_flag_t command;
int recv_tag;
test_results = OBJ_NEW(ompi_list_t);
test_level = (int32_t)level;
command = MCA_GPR_TEST_INTERNALS_CMD;
recv_tag = MCA_OOB_TAG_GPR;
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
return test_results;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto CLEANUP;
}
if (OMPI_SUCCESS != ompi_pack(cmd, &test_level, 1, OMPI_INT32)) {
goto CLEANUP;
}
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
goto CLEANUP;
}
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_TEST_INTERNALS_CMD != command)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
if ((OMPI_SUCCESS != ompi_unpack(answer, &num_responses, 1, OMPI_INT32)) ||
(0 >= num_responses)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
for (i=0; i<num_responses; i++) {
if (0 > ompi_unpack_string(answer, string1)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
if (0 > ompi_unpack_string(answer, string2)) {
ompi_buffer_free(answer);
goto CLEANUP;
}
newptr = OBJ_NEW(ompi_registry_internal_test_results_t);
newptr->test = strdup(*string1);
newptr->message = strdup(*string2);
ompi_list_append(test_results, &newptr->item);
}
ompi_buffer_free(answer);
CLEANUP:
ompi_buffer_free(cmd);
return test_results;
}

Просмотреть файл

@ -145,7 +145,51 @@ int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
int gpr_replica_delete_object(ompi_registry_mode_t mode,
char *segment, char **tokens)
{
return 0;
mca_gpr_registry_core_t *reg, *prev;
mca_gpr_keytable_t *keyptr;
ompi_list_t *keys;
mca_gpr_registry_segment_t *seg;
/* protect against errors */
if (NULL == segment || NULL == tokens || NULL == *tokens) {
return OMPI_ERROR;
}
/* find the specified segment */
seg = gpr_replica_find_seg(segment);
if (NULL == seg) { /* segment not found */
return OMPI_ERROR;
}
/* convert tokens to list of keys */
keys = gpr_replica_get_key_list(segment, tokens);
if (0 == ompi_list_get_size(keys)) {
return OMPI_ERROR;
}
/* traverse the list to find undefined tokens - error if found */
for (keyptr = (mca_gpr_keytable_t*)ompi_list_get_first(keys);
keyptr != (mca_gpr_keytable_t*)ompi_list_get_end(keys);
keyptr = (mca_gpr_keytable_t*)ompi_list_get_next(keyptr)) {
if (MCA_GPR_REPLICA_KEY_MAX == keyptr->key) { /* unknown token */
return OMPI_ERROR;
}
}
/* traverse the segment's registry, looking for matching tokens per the specified mode */
for (reg = (mca_gpr_registry_core_t*)ompi_list_get_first(&seg->registry_entries);
reg != (mca_gpr_registry_core_t*)ompi_list_get_end(&seg->registry_entries);
reg = (mca_gpr_registry_core_t*)ompi_list_get_next(reg)) {
/* for each registry entry, check the key list */
if (gpr_replica_check_key_list(mode, keys, reg)) { /* found the key(s) on the list */
prev = (mca_gpr_registry_core_t*)ompi_list_get_prev(reg);
ompi_list_remove_item(&seg->registry_entries, &reg->item);
reg = prev;
}
}
return OMPI_SUCCESS;
}
ompi_list_t* gpr_replica_index(char *segment)
@ -165,7 +209,21 @@ ompi_list_t* gpr_replica_index(char *segment)
ans->token = strdup(ptr->token);
ompi_list_append(answer, &ans->item);
}
} else {
} else { /* want index of specific segment */
/* find the specified segment */
seg = gpr_replica_find_seg(segment);
if (NULL == seg) { /* segment not found */
return answer;
}
/* got segment - now find specified token-key pair in that dictionary */
for (ptr = (mca_gpr_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr = (mca_gpr_keytable_t*)ompi_list_get_next(ptr)) {
ans = OBJ_NEW(ompi_registry_index_value_t);
ans->token = strdup(ptr->token);
ompi_list_append(answer, &ans->item);
}
}
return answer;

Просмотреть файл

@ -317,8 +317,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_registry_notify_action_t action;
ompi_registry_value_t *regval;
ompi_list_t *returned_list;
ompi_registry_internal_test_results_t *testval;
ompi_registry_index_value_t *indexval;
char **tokens, **tokptr;
int32_t num_tokens, level, i;
int32_t num_tokens, test_level, i;
mca_gpr_cmd_flag_t command;
char *segment;
int32_t response;
@ -336,7 +338,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
response = (int16_t)ompi_registry.delete_segment(segment);
response = (int32_t)ompi_registry.delete_segment(segment);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -370,7 +372,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
tokptr++;
}
*tokptr = NULL;
*tokptr = NULL;
if (OMPI_SUCCESS != ompi_unpack(buffer, &object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
goto RETURN_ERROR;
@ -407,7 +409,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
tokens = (char**)malloc(num_tokens*sizeof(char*));
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
@ -415,7 +417,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
tokptr++;
}
*tokptr = NULL;
returned_list = ompi_registry.get(mode, segment, tokens);
@ -443,6 +445,123 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
} else if (MCA_GPR_DELETE_OBJECT_CMD == command) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
goto RETURN_ERROR;
}
if (0 > ompi_unpack_string(buffer, &segment)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
tokptr = tokens;
for (i=0; i<num_tokens; i++) {
if (0 > ompi_unpack_string(buffer, tokptr)) {
goto RETURN_ERROR;
}
tokptr++;
}
*tokptr = NULL;
response = (int32_t)ompi_registry.delete_object(mode, segment, tokens);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
} else if (MCA_GPR_INDEX_CMD == command) {
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
goto RETURN_ERROR;
}
if (0 == mode) { /* only want dict of segments */
segment = NULL;
} else {
if (0 > ompi_unpack_string(buffer, &segment)) {
goto RETURN_ERROR;
}
}
returned_list = ompi_registry.index(segment);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
response = (int32_t)ompi_list_get_size(returned_list);
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 < response) { /* don't send anything else back if the list is empty */
for (indexval = (ompi_registry_index_value_t*)ompi_list_get_first(returned_list);
indexval != (ompi_registry_index_value_t*)ompi_list_get_end(returned_list);
indexval = (ompi_registry_index_value_t*)ompi_list_get_next(indexval)) { /* traverse the list */
if (OMPI_SUCCESS != ompi_pack_string(answer, indexval->token)) {
goto RETURN_ERROR;
}
}
}
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
} else if (MCA_GPR_SUBSCRIBE_CMD == command) {
action = OMPI_REGISTRY_NOTIFY_ALL;
goto RETURN_ERROR;
} else if (MCA_GPR_UNSUBSCRIBE_CMD == command) {
goto RETURN_ERROR;
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {
if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) ||
(0 > test_level)) {
goto RETURN_ERROR;
}
returned_list = gpr_replica_test_internals(test_level);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
response = (int32_t)ompi_list_get_size(returned_list);
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 < response) { /* don't send anything else back if the list is empty */
for (testval = (ompi_registry_internal_test_results_t*)ompi_list_get_first(returned_list);
testval != (ompi_registry_internal_test_results_t*)ompi_list_get_end(returned_list);
testval = (ompi_registry_internal_test_results_t*)ompi_list_get_next(testval)) { /* traverse the list */
if (OMPI_SUCCESS != ompi_pack_string(answer, testval->test)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_pack_string(answer, testval->message)) {
goto RETURN_ERROR;
}
}
}
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
} else { /* got an unrecognized command */
RETURN_ERROR:
ompi_buffer_init(&error_answer, 8);
@ -453,6 +572,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
ompi_buffer_free(answer);
ompi_buffer_free(buffer);
/* reissue the non-blocking receive */
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);