Update header files to reflect change in API - registry objects will now be void*. Users can call the packing routines to generate packed info, then extract and pass the resulting object to the GPR for storage. Users can also simply send a string for storage without packing as characters carry across platforms just fine.
Added OOB connection - still needs testing, but test tree is currently kaput. This commit was SVN r2226.
Этот коммит содержится в:
родитель
61291736c6
Коммит
8edd599cf4
@ -35,29 +35,60 @@
|
||||
|
||||
/** Define the notification actions for the subscription system
|
||||
*/
|
||||
typedef enum {OMPI_REGISTRY_NOTIFY_MODIFICATION=0x0001, /**< Notifies subscriber when object modified */
|
||||
OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER=0x0002, /**< Notifies subscriber when another subscriber added */
|
||||
OMPI_REGISTRY_NOTIFY_DELETE=0x0004, /**< Notifies subscriber when object deleted */
|
||||
OMPI_REGISTRY_NOTIFY_ALL=0xffff /**< Notifies subscriber upon any action */
|
||||
} ompi_registry_notify_action_t;
|
||||
#define OMPI_REGISTRY_NOTIFY_MODIFICATION 0x0001 /**< Notifies subscriber when object modified */
|
||||
#define OMPI_REGISTRY_NOTIFY_ADD_SUBSCRIBER 0x0002 /**< Notifies subscriber when another subscriber added */
|
||||
#define OMPI_REGISTRY_NOTIFY_DELETE 0x0004 /**< Notifies subscriber when object deleted */
|
||||
#define OMPI_REGISTRY_NOTIFY_ALL 0xffff /**< Notifies subscriber upon any action */
|
||||
|
||||
typedef uint16_t ompi_registry_notify_action_t;
|
||||
|
||||
|
||||
/** Define the mode bit-masks for registry operations.
|
||||
*/
|
||||
typedef enum {OMPI_REGISTRY_NONE=0x0000, /**< None */
|
||||
OMPI_REGISTRY_OVERWRITE=0x0001, /**< Overwrite Permission */
|
||||
OMPI_REGISTRY_AND=0x0002, /**< AND tokens together for search results */
|
||||
OMPI_REGISTRY_OR=0x0004, /**< OR tokens for search results */
|
||||
OMPI_REGISTRY_XAND=0x0008, /**< All tokens required, nothing else allowed */
|
||||
OMPI_REGISTRY_XOR=0x0010 /**< Any one of the tokens required, nothing else allowed */
|
||||
} ompi_registry_mode_t;
|
||||
#define OMPI_REGISTRY_NONE 0x0000 /**< None */
|
||||
#define OMPI_REGISTRY_OVERWRITE 0x0001 /**< Overwrite Permission */
|
||||
#define OMPI_REGISTRY_AND 0x0002 /**< AND tokens together for search results */
|
||||
#define OMPI_REGISTRY_OR 0x0004 /**< OR tokens for search results */
|
||||
#define OMPI_REGISTRY_XAND 0x0008 /**< All tokens required, nothing else allowed */
|
||||
#define OMPI_REGISTRY_XOR 0x0010 /**< Any one of the tokens required, nothing else allowed */
|
||||
|
||||
typedef uint16_t ompi_registry_mode_t;
|
||||
|
||||
|
||||
/*
|
||||
* Define flag values for remote commands - only used internally
|
||||
*/
|
||||
#define MCA_GPR_DEFINE_SEGMENT_CMD 0x0001
|
||||
#define MCA_GPR_DELETE_SEGMENT_CMD 0x0002
|
||||
#define MCA_GPR_PUT_CMD 0x0004
|
||||
#define MCA_GPR_DELETE_OBJECT_CMD 0x0008
|
||||
#define MCA_GPR_INDEX_CMD 0x0010
|
||||
#define MCA_GPR_SUBSCRIBE_CMD 0x0020
|
||||
#define MCA_GPR_UNSUBSCRIBE_CMD 0x0040
|
||||
#define MCA_GPR_GET_CMD 0x0080
|
||||
#define MCA_GPR_TEST_INTERNALS_CMD 0x0100
|
||||
#define MCA_GPR_ERROR 0xffff
|
||||
|
||||
typedef uint16_t mca_gpr_cmd_flag_t;
|
||||
|
||||
/*
|
||||
* packing type definitions
|
||||
*/
|
||||
/* CAUTION - any changes here must also change corresponding
|
||||
* typedefs above
|
||||
*/
|
||||
#define MCA_GPR_OOB_PACK_CMD OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_ACTION OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_MODE OMPI_INT16
|
||||
#define MCA_GPR_OOB_PACK_OBJECT_SIZE OMPI_INT32
|
||||
|
||||
|
||||
/*
|
||||
* typedefs
|
||||
*/
|
||||
|
||||
typedef ompi_buffer_t ompi_registry_object_t;
|
||||
typedef size_t ompi_registry_object_size_t;
|
||||
typedef void* ompi_registry_object_t;
|
||||
typedef uint32_t ompi_registry_object_size_t;
|
||||
|
||||
|
||||
/*
|
||||
@ -74,7 +105,7 @@ typedef size_t ompi_registry_object_size_t;
|
||||
*/
|
||||
struct ompi_registry_value_t {
|
||||
ompi_list_item_t item; /**< Allows this item to be placed on a list */
|
||||
ompi_registry_object_t *object; /**< Pointer to object being returned */
|
||||
ompi_registry_object_t object; /**< Pointer to object being returned */
|
||||
ompi_registry_object_size_t object_size; /**< Size of returned object, in bytes */
|
||||
};
|
||||
typedef struct ompi_registry_value_t ompi_registry_value_t;
|
||||
|
@ -20,7 +20,48 @@
|
||||
|
||||
int gpr_proxy_define_segment(char *segment)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
int recv_tag;
|
||||
int32_t response;
|
||||
|
||||
command = MCA_GPR_DEFINE_SEGMENT_CMD;
|
||||
recv_tag = MCA_OOB_TAG_GPR;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, (void*)&command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, (void*)segment, 1, OMPI_STRING)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_DEFINE_SEGMENT_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) {
|
||||
ompi_buffer_free(answer);
|
||||
return OMPI_ERROR;
|
||||
} else {
|
||||
ompi_buffer_free(answer);
|
||||
return (int)response;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -31,15 +72,92 @@ int gpr_proxy_delete_segment(char *segment)
|
||||
|
||||
|
||||
int gpr_proxy_put(ompi_registry_mode_t mode, char *segment,
|
||||
char **tokens, ompi_registry_object_t *object,
|
||||
int size)
|
||||
char **tokens, ompi_registry_object_t *object,
|
||||
int size)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char **tokptr;
|
||||
int recv_tag, i;
|
||||
int32_t num_tokens, object_size;
|
||||
int16_t response;
|
||||
|
||||
command = MCA_GPR_PUT_CMD;
|
||||
recv_tag = MCA_OOB_TAG_GPR;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, segment, 1, OMPI_STRING)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* compute number of tokens */
|
||||
tokptr = tokens;
|
||||
while (NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) { /* pack the tokens */
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
object_size = (int32_t)size;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, object, object_size, OMPI_BYTE)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_PUT_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) {
|
||||
ompi_buffer_free(answer);
|
||||
return OMPI_ERROR;
|
||||
} else {
|
||||
ompi_buffer_free(answer);
|
||||
return (int)response;
|
||||
}
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int gpr_proxy_delete(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens)
|
||||
int gpr_proxy_delete_object(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -72,11 +190,103 @@ int gpr_proxy_unsubscribe(ompi_process_name_t *caller, ompi_registry_mode_t mode
|
||||
|
||||
ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **tokens)
|
||||
{
|
||||
ompi_list_t *answer;
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char **tokptr;
|
||||
int recv_tag, i;
|
||||
int32_t num_tokens, object_size, num_responses;
|
||||
ompi_registry_value_t *newptr;
|
||||
ompi_registry_object_t *object;
|
||||
ompi_list_t *returned_list;
|
||||
|
||||
answer = OBJ_NEW(ompi_list_t);
|
||||
returned_list = OBJ_NEW(ompi_list_t);
|
||||
|
||||
return answer;
|
||||
/* need to protect against errors */
|
||||
if (NULL == segment || NULL == tokens || NULL == *tokens) {
|
||||
return returned_list;
|
||||
}
|
||||
|
||||
command = MCA_GPR_GET_CMD;
|
||||
recv_tag = MCA_OOB_TAG_GPR;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return returned_list;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* compute number of tokens */
|
||||
tokptr = tokens;
|
||||
while (NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) { /* pack the tokens */
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_GET_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &num_responses, 1, OMPI_INT32)) ||
|
||||
(0 >= num_responses)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
for (i=0; i<num_responses; i++) {
|
||||
if (OMPI_SUCCESS != ompi_unpack(answer, &object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
object = (ompi_registry_object_t)malloc(object_size);
|
||||
if (OMPI_SUCCESS != ompi_unpack(answer, object, object_size, OMPI_BYTE)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
newptr = OBJ_NEW(ompi_registry_value_t);
|
||||
newptr->object_size = object_size;
|
||||
newptr->object = (ompi_registry_object_t)malloc(object_size);
|
||||
memcpy(newptr->object, object, object_size);
|
||||
free(object);
|
||||
ompi_list_append(returned_list, &newptr->item);
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
ompi_buffer_free(cmd);
|
||||
return returned_list;
|
||||
}
|
||||
|
||||
ompi_list_t* gpr_proxy_test_internals(int level)
|
||||
|
@ -53,8 +53,8 @@ int gpr_proxy_put(ompi_registry_mode_t mode, char *segment,
|
||||
/*
|
||||
* Implementation of delete()
|
||||
*/
|
||||
int gpr_proxy_delete(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens);
|
||||
int gpr_proxy_delete_object(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens);
|
||||
|
||||
/*
|
||||
* Implementation of index()
|
||||
|
@ -52,7 +52,7 @@ static mca_gpr_base_module_t mca_gpr_proxy = {
|
||||
gpr_proxy_delete_segment,
|
||||
gpr_proxy_subscribe,
|
||||
gpr_proxy_unsubscribe,
|
||||
gpr_proxy_delete,
|
||||
gpr_proxy_delete_object,
|
||||
gpr_proxy_index,
|
||||
gpr_proxy_test_internals
|
||||
};
|
||||
|
@ -29,13 +29,18 @@ int gpr_replica_define_segment(char *segment)
|
||||
{
|
||||
mca_gpr_registry_segment_t *seg;
|
||||
mca_gpr_replica_key_t key;
|
||||
int response;
|
||||
|
||||
key = gpr_replica_define_key(segment, NULL);
|
||||
if (MCA_GPR_REPLICA_KEY_MAX == key) {
|
||||
return(OMPI_ERROR);
|
||||
response = gpr_replica_define_key(segment, NULL);
|
||||
if (0 > response) { /* got some kind of error code */
|
||||
return response;
|
||||
}
|
||||
|
||||
/* need to add the segment to the registry */
|
||||
key = gpr_replica_get_key(segment, NULL);
|
||||
if (MCA_GPR_REPLICA_KEY_MAX == key) { /* couldn't retrieve it */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
seg = OBJ_NEW(mca_gpr_registry_segment_t);
|
||||
seg->segment = key;
|
||||
ompi_list_append(&mca_gpr_replica_head.registry, &seg->item);
|
||||
@ -241,7 +246,7 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
|
||||
if (gpr_replica_check_key_list(mode, keys, reg)) { /* found the key(s) on the list */
|
||||
ans = OBJ_NEW(ompi_registry_value_t);
|
||||
ans->object_size = reg->object_size;
|
||||
ans->object = (ompi_buffer_t*)malloc(ans->object_size);
|
||||
ans->object = (ompi_registry_object_t*)malloc(ans->object_size);
|
||||
memcpy(ans->object, reg->object, ans->object_size);
|
||||
ompi_list_append(answer, &ans->item);
|
||||
}
|
||||
|
@ -185,4 +185,8 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens);
|
||||
ompi_list_t* gpr_replica_test_internals(int level);
|
||||
|
||||
void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
|
||||
ompi_buffer_t buffer, int tag,
|
||||
void* cbdata);
|
||||
|
||||
#endif
|
||||
|
@ -16,6 +16,8 @@
|
||||
#include "include/constants.h"
|
||||
#include "util/proc_info.h"
|
||||
#include "util/output.h"
|
||||
#include "util/pack.h"
|
||||
|
||||
#include "mca/mca.h"
|
||||
#include "mca/oob/base/base.h"
|
||||
#include "mca/gpr/base/base.h"
|
||||
@ -274,6 +276,9 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
|
||||
seg->segment = gpr_replica_get_key("universe", NULL);
|
||||
ompi_list_append(&mca_gpr_replica_head.registry, &seg->item);
|
||||
|
||||
/* issue the non-blocking receive */
|
||||
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
|
||||
|
||||
/* Return the module */
|
||||
|
||||
initialized = true;
|
||||
@ -299,3 +304,174 @@ int mca_gpr_replica_finalize(void)
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* handle message from proxies
|
||||
*/
|
||||
|
||||
void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
|
||||
ompi_buffer_t buffer, int tag,
|
||||
void* cbdata)
|
||||
{
|
||||
ompi_buffer_t answer, error_answer;
|
||||
ompi_registry_object_t *object;
|
||||
ompi_registry_object_size_t object_size;
|
||||
ompi_registry_mode_t mode;
|
||||
ompi_registry_notify_action_t action;
|
||||
ompi_registry_value_t *regval;
|
||||
ompi_list_t *returned_list;
|
||||
char **tokens, **tokptr;
|
||||
int32_t num_tokens, level, i;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char *segment;
|
||||
int32_t response;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&answer, 0)) {
|
||||
/* RHC -- not sure what to do if this fails */
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (MCA_GPR_DEFINE_SEGMENT_CMD == command) { /* got a command to create a new segment */
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
response = (int32_t)ompi_registry.define_segment(segment);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT16)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
} else if (MCA_GPR_DELETE_SEGMENT_CMD == command) { /* got command to delete a segment */
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
response = (int16_t)ompi_registry.delete_segment(segment);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
} else if (MCA_GPR_PUT_CMD == command) { /* got command to put object on registry */
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
tokens = (char**)malloc(num_tokens*sizeof(char*));
|
||||
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) {
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, tokptr)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
object = (ompi_registry_object_t)malloc(object_size);
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, object, object_size, OMPI_BYTE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
response = (int32_t)ompi_registry.put(mode, segment, tokens, object, object_size);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
} else if (MCA_GPR_GET_CMD == command) { /* got command to put object on registry */
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
tokens = (char**)malloc(num_tokens*sizeof(char*));
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) {
|
||||
if (OMPI_SUCCESS != ompi_unpack_string(buffer, tokptr)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
|
||||
|
||||
returned_list = ompi_registry.get(mode, segment, tokens);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, (void*)&command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
response = (int32_t)ompi_list_get_size(returned_list);
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, (void*)&response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 < response) { /* don't send anything else back if the list is empty */
|
||||
for (regval = (ompi_registry_value_t*)ompi_list_get_first(returned_list);
|
||||
regval != (ompi_registry_value_t*)ompi_list_get_end(returned_list);
|
||||
regval = (ompi_registry_value_t*)ompi_list_get_next(regval)) { /* traverse the list */
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, ®val->object_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, regval->object, regval->object_size, OMPI_BYTE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
} else { /* got an unrecognized command */
|
||||
RETURN_ERROR:
|
||||
ompi_buffer_init(&error_answer, 8);
|
||||
command = MCA_GPR_ERROR;
|
||||
ompi_pack(error_answer, (void*)&command, 1, MCA_GPR_OOB_PACK_CMD);
|
||||
mca_oob_send_packed(sender, error_answer, tag, 0);
|
||||
ompi_buffer_free(error_answer);
|
||||
}
|
||||
|
||||
ompi_buffer_free(answer);
|
||||
|
||||
/* reissue the non-blocking receive */
|
||||
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
|
||||
}
|
||||
|
@ -135,14 +135,14 @@ ompi_list_t *gpr_replica_get_key_list(char *segment, char **tokens)
|
||||
return keys;
|
||||
}
|
||||
|
||||
mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
|
||||
int gpr_replica_define_key(char *segment, char *token)
|
||||
{
|
||||
mca_gpr_registry_segment_t *seg;
|
||||
mca_gpr_keytable_t *ptr_seg, *ptr_key, *new;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment) {
|
||||
return MCA_GPR_REPLICA_KEY_MAX;
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* if token is NULL, then this is defining a segment name. Check dictionary to ensure uniqueness */
|
||||
@ -151,7 +151,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
|
||||
ptr_seg != (mca_gpr_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
|
||||
ptr_seg = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_seg)) {
|
||||
if (0 == strcmp(segment, ptr_seg->token)) {
|
||||
return MCA_GPR_REPLICA_KEY_MAX;
|
||||
return OMPI_EXISTS;
|
||||
}
|
||||
}
|
||||
|
||||
@ -163,14 +163,14 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
|
||||
mca_gpr_replica_head.lastkey++;
|
||||
new->key = mca_gpr_replica_head.lastkey;
|
||||
} else { /* out of keys */
|
||||
return MCA_GPR_REPLICA_KEY_MAX;
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
} else {
|
||||
ptr_key = (mca_gpr_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys);
|
||||
new->key = ptr_key->key;
|
||||
}
|
||||
ompi_list_append(&mca_gpr_replica_head.segment_dict, &new->item);
|
||||
return(new->key);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* okay, token is specified */
|
||||
@ -182,7 +182,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
|
||||
ptr_key != (mca_gpr_keytable_t*)ompi_list_get_end(&seg->keytable);
|
||||
ptr_key = (mca_gpr_keytable_t*)ompi_list_get_next(ptr_key)) {
|
||||
if (0 == strcmp(token, ptr_key->token)) {
|
||||
return MCA_GPR_REPLICA_KEY_MAX; /* already taken, report error */
|
||||
return OMPI_EXISTS; /* already taken, report error */
|
||||
}
|
||||
}
|
||||
/* okay, token is unique - create dictionary entry */
|
||||
@ -196,10 +196,10 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
|
||||
new->key = ptr_key->key;
|
||||
}
|
||||
ompi_list_append(&seg->keytable, &new->item);
|
||||
return(new->key);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
/* couldn't find segment */
|
||||
return MCA_GPR_REPLICA_KEY_MAX;
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
int gpr_replica_delete_key(char *segment, char *token)
|
||||
|
@ -21,7 +21,7 @@
|
||||
mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token);
|
||||
|
||||
/** Add a token to a segment's dictionary.
|
||||
* The gpr_replica_definekey() function allows the addition of a new definition to
|
||||
* The gpr_replica_define_key() function allows the addition of a new definition to
|
||||
* the registry's token-key dictionaries. The specified token is assigned an integer
|
||||
* value within the specified segment, and the entry is added to the segment's token-key
|
||||
* dictionary.
|
||||
@ -30,10 +30,11 @@ mca_gpr_replica_key_t gpr_replica_get_key(char *segment, char *token);
|
||||
* @param token Pointer to a character string containing the token to be defined. If token=NULL,
|
||||
* the function adds the token to the segment dictionary, thus defining a new segment name.
|
||||
*
|
||||
* @retval key Unsigned long integer value corresponding to the specified token within the specified segment.
|
||||
* @retval 0 Indicates that the entry could not be created.
|
||||
* @retval OMPI_EXISTS Indicates that the requested dictionary entry already exists.
|
||||
* @retval OMPI_ERR_OUT_OF_RESOURCE Indicates that the dictionary is full.
|
||||
* @retval OMPI_SUCCESS Indicates that the dictionary entry was created.
|
||||
*/
|
||||
mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token);
|
||||
int gpr_replica_define_key(char *segment, char *token);
|
||||
|
||||
/** Delete a token from a segment's dictionary.
|
||||
* The gpr_replica_deletekey() function allows the removal of a definition from the
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user