Update the registry to complete the publish/subscribe system. Add new function "cancel_synchro" to remove a synchro trigger, enable subscribe/unsubscribe on proxies, minor cleanups. Registry should now be fully functional. Note that I am currently unable to test this in a multi-process environment - can only guarantee it compiles and passes replica-only tests.
This commit was SVN r2393.
Этот коммит содержится в:
родитель
094650dafb
Коммит
71ad56d894
@ -98,9 +98,10 @@ typedef uint16_t ompi_registry_synchro_mode_t;
|
||||
#define MCA_GPR_SUBSCRIBE_CMD 0x0010
|
||||
#define MCA_GPR_UNSUBSCRIBE_CMD 0x0020
|
||||
#define MCA_GPR_SYNCHRO_CMD 0x0040
|
||||
#define MCA_GPR_GET_CMD 0x0080
|
||||
#define MCA_GPR_TEST_INTERNALS_CMD 0x0100
|
||||
#define MCA_GPR_NOTIFY_CMD 0x0200 /**< Indicates a notify message */
|
||||
#define MCA_GPR_CANCEL_SYNCHRO_CMD 0x0080
|
||||
#define MCA_GPR_GET_CMD 0x0100
|
||||
#define MCA_GPR_TEST_INTERNALS_CMD 0x0200
|
||||
#define MCA_GPR_NOTIFY_CMD 0x0400 /**< Indicates a notify message */
|
||||
#define MCA_GPR_ERROR 0xffff
|
||||
|
||||
typedef uint16_t mca_gpr_cmd_flag_t;
|
||||
@ -196,23 +197,25 @@ typedef int (*mca_gpr_base_module_delete_segment_fn_t)(char *segment);
|
||||
typedef int (*mca_gpr_base_module_put_fn_t)(ompi_registry_mode_t mode, char *segment,
|
||||
char **tokens, ompi_registry_object_t object,
|
||||
ompi_registry_object_size_t size);
|
||||
typedef ompi_list_t* (*mca_gpr_base_module_get_fn_t)(ompi_registry_mode_t mode,
|
||||
typedef ompi_list_t* (*mca_gpr_base_module_get_fn_t)(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens);
|
||||
typedef int (*mca_gpr_base_module_delete_fn_t)(ompi_registry_mode_t mode,
|
||||
typedef int (*mca_gpr_base_module_delete_fn_t)(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens);
|
||||
typedef ompi_list_t* (*mca_gpr_base_module_index_fn_t)(char *segment);
|
||||
typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_registry_mode_t mode,
|
||||
typedef int (*mca_gpr_base_module_subscribe_fn_t)(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_registry_mode_t mode,
|
||||
typedef int (*mca_gpr_base_module_unsubscribe_fn_t)(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
char *segment, char **tokens);
|
||||
typedef int (*mca_gpr_base_module_synchro_fn_t)(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
typedef int (*mca_gpr_base_module_cancel_synchro_fn_t)(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger);
|
||||
|
||||
/*
|
||||
* test interface for internal functions - optional to provide
|
||||
@ -230,6 +233,7 @@ struct mca_gpr_base_module_1_0_0_t {
|
||||
mca_gpr_base_module_subscribe_fn_t subscribe;
|
||||
mca_gpr_base_module_unsubscribe_fn_t unsubscribe;
|
||||
mca_gpr_base_module_synchro_fn_t synchro;
|
||||
mca_gpr_base_module_cancel_synchro_fn_t cancel_synchro;
|
||||
mca_gpr_base_module_delete_fn_t delete_object;
|
||||
mca_gpr_base_module_index_fn_t index;
|
||||
mca_gpr_base_module_test_internals_fn_t test_internals;
|
||||
|
@ -321,16 +321,235 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
{
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char **tokptr;
|
||||
int recv_tag, i;
|
||||
int32_t num_tokens, response;
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
|
||||
trackptr = NULL;
|
||||
|
||||
/* need to protect against errors */
|
||||
if (NULL == segment) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
command = MCA_GPR_SUBSCRIBE_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
num_tokens = 0;
|
||||
if (NULL != tokens) {
|
||||
/* compute number of tokens */
|
||||
tokptr = tokens;
|
||||
while (NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) {
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) { /* pack the tokens */
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
/* store callback function and user_tag in local list for lookup */
|
||||
/* generate id_tag to send to replica to identify lookup entry */
|
||||
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
|
||||
trackptr->requestor = NULL;
|
||||
trackptr->req_tag = 0;
|
||||
trackptr->callback = cb_func;
|
||||
trackptr->user_tag = user_tag;
|
||||
if (ompi_list_is_empty(&mca_gpr_proxy_free_notify_id_tags)) {
|
||||
trackptr->id_tag = mca_gpr_proxy_last_notify_id_tag;
|
||||
mca_gpr_proxy_last_notify_id_tag++;
|
||||
} else {
|
||||
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_proxy_free_notify_id_tags);
|
||||
trackptr->id_tag = ptr_free_id->id_tag;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &trackptr->id_tag, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_SUBSCRIBE_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) ||
|
||||
(OMPI_SUCCESS != response)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
|
||||
ompi_buffer_free(answer);
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
if (NULL != trackptr) {
|
||||
mca_gpr_proxy_last_notify_id_tag--;
|
||||
OBJ_RELEASE(trackptr);
|
||||
}
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char **tokptr;
|
||||
int recv_tag, i;
|
||||
int32_t num_tokens, response;
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
|
||||
trackptr = NULL;
|
||||
|
||||
/* need to protect against errors */
|
||||
if (NULL == segment) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
command = MCA_GPR_UNSUBSCRIBE_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
num_tokens = 0;
|
||||
if (NULL != tokens) {
|
||||
/* compute number of tokens */
|
||||
tokptr = tokens;
|
||||
while (NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) {
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) { /* pack the tokens */
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_UNSUBSCRIBE_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) ||
|
||||
(OMPI_SUCCESS != response)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == response) { /* got an error on replica */
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* locate corresponding entry on proxy tracker list and remove it */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) &&
|
||||
trackptr->id_tag != response;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
if (trackptr == (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
ompi_list_remove_item(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
|
||||
/* put id tag on free list */
|
||||
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
|
||||
ptr_free_id->id_tag = trackptr->id_tag;
|
||||
ompi_list_append(&mca_gpr_proxy_free_notify_id_tags, &ptr_free_id->item);
|
||||
/* release tracker item */
|
||||
OBJ_RELEASE(trackptr);
|
||||
ompi_buffer_free(answer);
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
@ -455,17 +674,137 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
/* if (NULL != trackptr) {
|
||||
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
|
||||
ptr_free_id->id_tag = trackptr->id_tag;
|
||||
ompi_list_append(&mca_gpr_proxy_free_notify_id_tags, &ptr_free_id->item);
|
||||
if (NULL != trackptr) {
|
||||
mca_gpr_proxy_last_notify_id_tag--;
|
||||
OBJ_RELEASE(trackptr);
|
||||
}
|
||||
*/
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_ERROR;
|
||||
|
||||
}
|
||||
|
||||
int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger)
|
||||
{
|
||||
ompi_buffer_t cmd;
|
||||
ompi_buffer_t answer;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
char **tokptr;
|
||||
int recv_tag, i;
|
||||
int32_t num_tokens, response;
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
|
||||
trackptr = NULL;
|
||||
|
||||
/* need to protect against errors */
|
||||
if (NULL == segment) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
command = MCA_GPR_CANCEL_SYNCHRO_CMD;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_buffer_init(&cmd, 0)) { /* got a problem */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
response = (int32_t)synchro_mode;
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &response, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &addr_mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, segment)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
num_tokens = 0;
|
||||
if (NULL != tokens) {
|
||||
/* compute number of tokens */
|
||||
tokptr = tokens;
|
||||
while (NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) {
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) { /* pack the tokens */
|
||||
if (OMPI_SUCCESS != ompi_pack_string(cmd, *tokptr)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
}
|
||||
|
||||
response = (int32_t)trigger;
|
||||
if (OMPI_SUCCESS != ompi_pack(cmd, &response, 1, OMPI_INT32)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(mca_gpr_my_replica, cmd, MCA_OOB_TAG_GPR, 0)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_recv_packed(mca_gpr_my_replica, &answer, &recv_tag)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|
||||
|| (MCA_GPR_CANCEL_SYNCHRO_CMD != command)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if ((OMPI_SUCCESS != ompi_unpack(answer, &response, 1, OMPI_INT32)) ||
|
||||
(OMPI_SUCCESS != response)) {
|
||||
ompi_buffer_free(answer);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == response) { /* got an error on replica */
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* locate corresponding entry on proxy tracker list and remove it */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) &&
|
||||
trackptr->id_tag != response;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
if (trackptr == (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker)) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
ompi_list_remove_item(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
|
||||
/* put id tag on free list */
|
||||
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
|
||||
ptr_free_id->id_tag = trackptr->id_tag;
|
||||
ompi_list_append(&mca_gpr_proxy_free_notify_id_tags, &ptr_free_id->item);
|
||||
/* release tracker item */
|
||||
OBJ_RELEASE(trackptr);
|
||||
ompi_buffer_free(answer);
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
CLEANUP:
|
||||
ompi_buffer_free(cmd);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **tokens)
|
||||
{
|
||||
ompi_buffer_t cmd;
|
||||
@ -570,12 +909,12 @@ ompi_list_t* gpr_proxy_get(ompi_registry_mode_t mode, char *segment, char **toke
|
||||
|
||||
ompi_list_t* gpr_proxy_test_internals(int level)
|
||||
{
|
||||
ompi_list_t *test_results;
|
||||
ompi_list_t *test_results=NULL;
|
||||
ompi_buffer_t cmd, answer;
|
||||
char **string1, **string2;
|
||||
char **string1=NULL, **string2=NULL;
|
||||
int i;
|
||||
int32_t num_responses, test_level;
|
||||
ompi_registry_internal_test_results_t *newptr;
|
||||
ompi_registry_internal_test_results_t *newptr=NULL;
|
||||
mca_gpr_cmd_flag_t command;
|
||||
int recv_tag;
|
||||
|
||||
|
@ -75,14 +75,16 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode,
|
||||
|
||||
int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
char *segment, char **tokens);
|
||||
|
||||
int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
|
||||
int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger);
|
||||
/*
|
||||
* Implementation of get()
|
||||
*/
|
||||
|
@ -53,6 +53,7 @@ static mca_gpr_base_module_t mca_gpr_proxy = {
|
||||
gpr_proxy_subscribe,
|
||||
gpr_proxy_unsubscribe,
|
||||
gpr_proxy_synchro,
|
||||
gpr_proxy_cancel_synchro,
|
||||
gpr_proxy_delete_object,
|
||||
gpr_proxy_index,
|
||||
gpr_proxy_test_internals
|
||||
|
@ -322,14 +322,43 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
||||
}
|
||||
}
|
||||
|
||||
int gpr_replica_unsubscribe(ompi_registry_mode_t mode,
|
||||
int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_notify_id_t id_tag;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* find trigger on replica - return id_tag */
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
addr_mode, segment, tokens, 0))) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* find request on notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
/* ...and remove it */
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we get here, then couldn't find request */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
@ -369,17 +398,53 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
}
|
||||
}
|
||||
|
||||
int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger)
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_notify_id_t id_tag;
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment || 0 > trigger) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* find trigger on replica - return id_tag */
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
addr_mode, segment, tokens, trigger))) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* find request on notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
/* ...and remove it */
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we get here, then couldn't find request */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
mca_gpr_replica_segment_t *seg;
|
||||
ompi_list_t *answer;
|
||||
ompi_registry_value_t *ans;
|
||||
mca_gpr_replica_key_t *keys, *key2;
|
||||
ompi_list_t *keylist;
|
||||
mca_gpr_replica_keytable_t *keyptr;
|
||||
mca_gpr_replica_core_t *reg;
|
||||
int num_tokens;
|
||||
mca_gpr_replica_segment_t *seg=NULL;
|
||||
ompi_list_t *answer=NULL;
|
||||
ompi_registry_value_t *ans=NULL;
|
||||
mca_gpr_replica_key_t *keys=NULL, *key2=NULL;
|
||||
ompi_list_t *keylist=NULL;
|
||||
mca_gpr_replica_keytable_t *keyptr=NULL;
|
||||
mca_gpr_replica_core_t *reg=NULL;
|
||||
int num_tokens=0;
|
||||
|
||||
answer = OBJ_NEW(ompi_list_t);
|
||||
|
||||
|
@ -194,31 +194,34 @@ int mca_gpr_replica_finalize(void);
|
||||
|
||||
int gpr_replica_delete_segment(char *segment);
|
||||
|
||||
int gpr_replica_put(ompi_registry_mode_t mode, char *segment,
|
||||
int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
|
||||
char **tokens, ompi_registry_object_t object,
|
||||
ompi_registry_object_size_t size);
|
||||
|
||||
int gpr_replica_delete_object(ompi_registry_mode_t mode,
|
||||
int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens);
|
||||
|
||||
ompi_list_t* gpr_replica_index(char *segment);
|
||||
|
||||
int gpr_replica_subscribe(ompi_registry_mode_t mode,
|
||||
int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
|
||||
int gpr_replica_unsubscribe(ompi_registry_mode_t mode,
|
||||
int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
char *segment, char **tokens);
|
||||
|
||||
int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag);
|
||||
|
||||
ompi_list_t* gpr_replica_get(ompi_registry_mode_t mode,
|
||||
int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger);
|
||||
|
||||
ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens);
|
||||
|
||||
ompi_list_t* gpr_replica_test_internals(int level);
|
||||
|
@ -59,6 +59,7 @@ static mca_gpr_base_module_t mca_gpr_replica = {
|
||||
gpr_replica_subscribe,
|
||||
gpr_replica_unsubscribe,
|
||||
gpr_replica_synchro,
|
||||
gpr_replica_cancel_synchro,
|
||||
gpr_replica_delete_object,
|
||||
gpr_replica_index,
|
||||
gpr_replica_test_internals
|
||||
@ -656,12 +657,149 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
|
||||
|
||||
/***** SUBSCRIBE *****/
|
||||
} else if (MCA_GPR_SUBSCRIBE_CMD == command) {
|
||||
action = OMPI_REGISTRY_NOTIFY_ALL;
|
||||
goto RETURN_ERROR;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) { /* tokens provided */
|
||||
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) {
|
||||
if (0 > ompi_unpack_string(buffer, tokptr)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
*tokptr = NULL;
|
||||
} else { /* no tokens provided - wildcard case */
|
||||
tokens = NULL;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &id_tag, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
/* enter request on notify tracking system */
|
||||
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
|
||||
trackptr->requestor = ompi_name_server.copy_process_name(sender);
|
||||
trackptr->req_tag = id_tag;
|
||||
trackptr->callback = NULL;
|
||||
trackptr->user_tag = NULL;
|
||||
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
|
||||
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
|
||||
mca_gpr_replica_last_notify_id_tag++;
|
||||
} else {
|
||||
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
|
||||
trackptr->id_tag = ptr_free_id->id_tag;
|
||||
}
|
||||
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
|
||||
response = (int32_t)gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
mode, segment, tokens,
|
||||
0, trackptr->id_tag);
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
|
||||
|
||||
/***** UNSUBSCRIBE *****/
|
||||
} else if (MCA_GPR_UNSUBSCRIBE_CMD == command) {
|
||||
goto RETURN_ERROR;
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &action, 1, MCA_GPR_OOB_PACK_ACTION)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) { /* tokens provided */
|
||||
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) {
|
||||
if (0 > ompi_unpack_string(buffer, tokptr)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
*tokptr = NULL;
|
||||
} else { /* no tokens provided - wildcard case */
|
||||
tokens = NULL;
|
||||
}
|
||||
|
||||
id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, mode,
|
||||
segment, tokens, 0);
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
|
||||
/* find request on replica notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
|
||||
/* ...pack the remote id tag to send back to proxy */
|
||||
response = (int32_t)trackptr->req_tag;
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
/* ...and remove it */
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
/* put id tag on free list */
|
||||
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
|
||||
ptr_free_id->id_tag = trackptr->id_tag;
|
||||
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
|
||||
/* release tracker item */
|
||||
OBJ_RELEASE(trackptr);
|
||||
}
|
||||
} else {
|
||||
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
|
||||
|
||||
/***** SYNCHRO *****/
|
||||
} else if (MCA_GPR_SYNCHRO_CMD == command) {
|
||||
@ -738,6 +876,91 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
|
||||
/***** CANCEL SYNCHRO *****/
|
||||
} else if (MCA_GPR_CANCEL_SYNCHRO_CMD == command) {
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &synchro_mode, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_REGISTRY_SYNCHRO_MODE_NONE == synchro_mode) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &mode, 1, MCA_GPR_OOB_PACK_MODE)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > ompi_unpack_string(buffer, &segment)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_tokens, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 < num_tokens) { /* tokens provided */
|
||||
tokens = (char**)malloc((num_tokens+1)*sizeof(char*));
|
||||
tokptr = tokens;
|
||||
for (i=0; i<num_tokens; i++) {
|
||||
if (0 > ompi_unpack_string(buffer, tokptr)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
tokptr++;
|
||||
}
|
||||
*tokptr = NULL;
|
||||
} else { /* no tokens provided - wildcard case, just count entries on segment */
|
||||
tokens = NULL;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_unpack(buffer, &trigger, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, mode,
|
||||
segment, tokens, trigger);
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
|
||||
/* find request on replica notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
|
||||
/* ...pack the remote id tag to send back to proxy */
|
||||
response = (int32_t)trackptr->req_tag;
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
/* ...and remove it */
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
/* put id tag on free list */
|
||||
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
|
||||
ptr_free_id->id_tag = trackptr->id_tag;
|
||||
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
|
||||
/* release tracker item */
|
||||
OBJ_RELEASE(trackptr);
|
||||
}
|
||||
} else {
|
||||
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
|
||||
goto RETURN_ERROR;
|
||||
}
|
||||
|
||||
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
|
||||
/* RHC -- not sure what to do if the return send fails */
|
||||
}
|
||||
|
||||
|
||||
/***** TEST INTERNALS *****/
|
||||
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {
|
||||
|
||||
|
@ -466,6 +466,83 @@ int gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
|
||||
}
|
||||
|
||||
mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger)
|
||||
{
|
||||
mca_gpr_replica_segment_t *seg;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
mca_gpr_notify_id_t id_tag;
|
||||
char **tokptr;
|
||||
mca_gpr_replica_key_t *keys, *keyptr, *kptr;
|
||||
int i, num_tokens;
|
||||
bool found, mismatch;
|
||||
|
||||
seg = gpr_replica_find_seg(false, segment);
|
||||
if (NULL == seg) { /* couldn't find segment */
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
found = false;
|
||||
num_tokens = 0;
|
||||
|
||||
if (NULL != tokens) { /* tokens provided */
|
||||
|
||||
/* count number of tokens */
|
||||
tokptr = tokens;
|
||||
num_tokens = 0;
|
||||
while (NULL != tokptr && NULL != *tokptr) {
|
||||
num_tokens++;
|
||||
tokptr++;
|
||||
}
|
||||
keys = (mca_gpr_replica_key_t*)malloc(num_tokens*sizeof(mca_gpr_replica_key_t));
|
||||
keyptr = keys;
|
||||
/* store key values of tokens - any undefined means error */
|
||||
for (i=0, tokptr=tokens; NULL != tokptr && NULL != *tokptr; i++, tokptr++) {
|
||||
*keyptr = gpr_replica_get_key(segment, *tokptr);
|
||||
if (MCA_GPR_REPLICA_KEY_MAX == *keyptr) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
keyptr++;
|
||||
}
|
||||
}
|
||||
|
||||
/* search segment's trigger list for specified trigger event */
|
||||
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
|
||||
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers) && !found;
|
||||
trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_next(trig)) {
|
||||
if (trig->synch_mode == synchro_mode &&
|
||||
trig->action == action &&
|
||||
trig->addr_mode == addr_mode &&
|
||||
trig->trigger == trigger &&
|
||||
trig->num_keys == num_tokens) { /* all else matches - check keys */
|
||||
mismatch = false;
|
||||
for (i=0, keyptr=keys, kptr=trig->keys; i < num_tokens && !mismatch; i++, keyptr++, kptr++) {
|
||||
if (*keyptr != *kptr) {
|
||||
mismatch = true;
|
||||
}
|
||||
}
|
||||
if (!mismatch) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
if (NULL != keys) {
|
||||
free(keys);
|
||||
}
|
||||
|
||||
if (found) {
|
||||
id_tag = trig->id_tag;
|
||||
ompi_list_remove_item(&seg->triggers, &trig->item);
|
||||
OBJ_RELEASE(trig);
|
||||
return id_tag;
|
||||
}
|
||||
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_registry_mode_t addr_mode, char *segment, char **tokens)
|
||||
{
|
||||
|
@ -88,3 +88,8 @@ ompi_registry_notify_message_t *gpr_replica_construct_notify_message(ompi_regist
|
||||
void gpr_replica_process_triggers(mca_gpr_replica_segment_t *seg,
|
||||
mca_gpr_replica_trigger_list_t *trig,
|
||||
ompi_registry_notify_message_t *message);
|
||||
|
||||
mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger);
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user