1
1
This commit was SVN r2831.
Этот коммит содержится в:
Ralph Castain 2004-09-23 14:36:14 +00:00
родитель 92a5f2da1f
Коммит f7a4e86227

Просмотреть файл

@ -65,7 +65,9 @@ static mca_gpr_base_module_t mca_gpr_replica = {
gpr_replica_cancel_synchro,
gpr_replica_delete_object,
gpr_replica_index,
gpr_replica_test_internals
gpr_replica_test_internals,
gpr_replica_rte_register,
gpr_replica_rte_unregister
};
/*
@ -82,8 +84,7 @@ ompi_list_t mca_gpr_replica_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
ompi_list_t mca_gpr_replica_free_notify_id_tags;
int mca_gpr_replica_debug;
ompi_mutex_t mca_gpr_replica_component_mutex;
ompi_mutex_t mca_gpr_replica_mutex, mca_gpr_replica_internals_mutex;
ompi_mutex_t mca_gpr_replica_mutex;
/* constructor - used to initialize state of keytable instance */
@ -336,9 +337,7 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* setup the thread locks */
OBJ_CONSTRUCT(&mca_gpr_replica_component_mutex, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_gpr_replica_internals_mutex, ompi_mutex_t);
/* setup the thread lock */
OBJ_CONSTRUCT(&mca_gpr_replica_mutex, ompi_mutex_t);
/* initialize the registry head */
@ -446,7 +445,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_buffer_t buffer, int tag,
void* cbdata)
{
ompi_buffer_t answer, error_answer;
ompi_buffer_t answer, error_answer, reg_buffer;
ompi_registry_object_t *object;
ompi_registry_object_size_t object_size;
ompi_registry_mode_t mode;
@ -456,12 +455,12 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_registry_internal_test_results_t *testval;
ompi_registry_index_value_t *indexval;
char **tokens, **tokptr;
int32_t num_tokens, test_level, i, trigger, id_tag;
int32_t num_tokens, test_level, i, trigger, id_tag, num_procs;
mca_gpr_notify_id_t local_idtag1, local_idtag2, start_idtag, end_idtag;
pid_t pid;
mca_gpr_cmd_flag_t command;
char *segment;
char *segment, *contact_info, *nodename, *proc_name;
int32_t response, synchro_mode;
mca_gpr_notify_request_tracker_t *trackptr;
mca_gpr_idtag_list_t *ptr_free_id;
if (mca_gpr_replica_debug) {
ompi_output(0, "gpr replica: received message");
@ -486,7 +485,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
response = (int32_t)ompi_registry.delete_segment(segment);
response = (int32_t)gpr_replica_delete_segment(segment);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -545,7 +544,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
response = (int32_t)ompi_registry.put(mode, segment, tokens, object, object_size);
response = (int32_t)gpr_replica_put(mode, segment, tokens, object, object_size);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -591,7 +590,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
*tokptr = NULL;
}
returned_list = ompi_registry.get(mode, segment, tokens);
returned_list = gpr_replica_get(mode, segment, tokens);
if (OMPI_SUCCESS != ompi_pack(answer, (void*)&command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -651,7 +650,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
*tokptr = NULL;
}
response = (int32_t)ompi_registry.delete_object(mode, segment, tokens);
response = (int32_t)gpr_replica_delete_object(mode, segment, tokens);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -684,7 +683,7 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
}
returned_list = ompi_registry.index(segment);
returned_list = gpr_replica_index(segment);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -751,30 +750,17 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = ompi_name_server.copy_process_name(sender);
trackptr->req_tag = id_tag;
trackptr->callback = NULL;
trackptr->user_tag = NULL;
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
mca_gpr_replica_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
local_idtag1 = gpr_replica_enter_notify_request(sender, id_tag, NULL, NULL);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
response = (int32_t)gpr_replica_subscribe_nl(mode, action, segment, tokens,
local_idtag1);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
response = (int32_t)gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
mode, segment, tokens,
0, trackptr->id_tag);
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
@ -825,45 +811,13 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
tokens = NULL;
}
id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, mode,
segment, tokens, 0);
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
response = (int32_t)gpr_replica_unsubscribe_nl(mode, action, segment, tokens);
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
/* find request on replica notify tracking system */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
trackptr->id_tag != id_tag;
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
/* ...pack the remote id tag to send back to proxy */
response = (int32_t)trackptr->req_tag;
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
/* ...and remove it */
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
/* put id tag on free list */
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
ptr_free_id->id_tag = trackptr->id_tag;
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
/* release tracker item */
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
}
} else {
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -929,42 +883,18 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
if (mca_gpr_replica_debug) {
if (NULL != sender) {
ompi_output(0, "gpr_replica_recv: received synchro req from [%d,%d,%d]", sender->cellid,
sender->jobid, sender->vpid);
} else {
ompi_output(0, "gpr_replica_recv: received synchro req from NULL");
}
}
trackptr->requestor = ompi_name_server.copy_process_name(sender);
trackptr->req_tag = id_tag;
trackptr->callback = NULL;
trackptr->user_tag = NULL;
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
mca_gpr_replica_last_notify_id_tag++;
} else {
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
trackptr->id_tag = ptr_free_id->id_tag;
}
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
local_idtag1 = gpr_replica_enter_notify_request(sender, id_tag, NULL, NULL);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
response = (int32_t)gpr_replica_synchro_nl(synchro_mode,
mode, segment, tokens,
trigger, local_idtag1);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
if(NULL != gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
mode, segment, tokens,
trigger, trackptr->id_tag)) {
response = OMPI_SUCCESS;
} else {
response = OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
@ -1022,45 +952,114 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, mode,
segment, tokens, trigger);
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
response = (int32_t)gpr_replica_cancel_synchro_nl(synchro_mode, mode,
segment, tokens, trigger);
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
/* find request on replica notify tracking system */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
trackptr->id_tag != id_tag;
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
/* ...pack the remote id tag to send back to proxy */
response = (int32_t)trackptr->req_tag;
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
/* ...and remove it */
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
/* put id tag on free list */
ptr_free_id = OBJ_NEW(mca_gpr_idtag_list_t);
ptr_free_id->id_tag = trackptr->id_tag;
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
/* release tracker item */
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
}
} else {
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
/***** REGISTER *****/
} else if (MCA_GPR_RTE_REGISTER_CMD == command) {
if (0 > ompi_unpack_string(buffer, &contact_info)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_procs, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &pid, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 > ompi_unpack_string(buffer, &nodename)) {
goto RETURN_ERROR;
}
ompi_buffer_init(&reg_buffer, 0);
ompi_pack_string(reg_buffer, contact_info);
ompi_pack(reg_buffer, &pid, 1, OMPI_INT32);
ompi_pack_string(buffer, nodename);
if (OMPI_SUCCESS != ompi_unpack(buffer, &start_idtag, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_unpack(buffer, &end_idtag, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* enter start request on notify tracking system */
local_idtag1 = gpr_replica_enter_notify_request(sender, start_idtag, NULL, NULL);
/* enter end request on notify tracking system */
local_idtag2 = gpr_replica_enter_notify_request(sender, end_idtag, NULL, NULL);
/* do registration */
response = (int32_t)gpr_replica_rte_register_nl(contact_info, buffer,
num_procs, local_idtag1, local_idtag2);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(answer, &response, 1, OMPI_INT32)) {
goto RETURN_ERROR;
}
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
/***** UNREGISTER *****/
} else if (MCA_GPR_RTE_UNREGISTER_CMD == command) {
if (0 > ompi_unpack_string(buffer, &proc_name)) {
goto RETURN_ERROR;
}
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
response = (int32_t)gpr_replica_rte_unregister_nl(proc_name);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
/****** UNLOCK ******/
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
}
@ -1078,9 +1077,6 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) ||
(0 > test_level)) {
goto RETURN_ERROR;
@ -1112,9 +1108,6 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
/**** UNRECOGNIZED COMMAND ****/
} else { /* got an unrecognized command */