1
1

First attempt to thread safe the registry and name server subsystems. Comment out the duplicate calls to register processes in mpi_init and mpirun2.

This commit was SVN r2697.
Этот коммит содержится в:
Ralph Castain 2004-09-16 04:14:35 +00:00
родитель 3aa0b648e2
Коммит d0e308fbc4
12 изменённых файлов: 322 добавлений и 101 удалений

Просмотреть файл

@ -9,6 +9,8 @@
#include <string.h>
#include "threads/mutex.h"
#include "util/output.h"
#include "util/proc_info.h"
@ -408,6 +410,8 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode,
}
}
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
/* store callback function and user_tag in local list for lookup */
/* generate id_tag to send to replica to identify lookup entry */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
@ -450,6 +454,7 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode,
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
ompi_buffer_free(answer);
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_SUCCESS;
CLEANUP:
@ -458,6 +463,7 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode,
OBJ_RELEASE(trackptr);
}
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_ERROR;
}
@ -552,6 +558,7 @@ int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
goto CLEANUP;
}
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
/* locate corresponding entry on proxy tracker list and remove it */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) &&
@ -570,10 +577,12 @@ int gpr_proxy_unsubscribe(ompi_registry_mode_t mode,
OBJ_RELEASE(trackptr);
ompi_buffer_free(answer);
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_SUCCESS;
CLEANUP:
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_ERROR;
}
@ -655,6 +664,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
goto CLEANUP;
}
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
/* store callback function and user_tag in local list for lookup */
/* generate id_tag to send to replica to identify lookup entry */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
@ -697,6 +707,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item);
ompi_buffer_free(answer);
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_SUCCESS;
CLEANUP:
@ -705,6 +716,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode,
OBJ_RELEASE(trackptr);
}
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_ERROR;
}
@ -805,6 +817,7 @@ int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
goto CLEANUP;
}
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
/* locate corresponding entry on proxy tracker list and remove it */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) &&
@ -823,10 +836,12 @@ int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
OBJ_RELEASE(trackptr);
ompi_buffer_free(answer);
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_SUCCESS;
CLEANUP:
ompi_buffer_free(cmd);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return OMPI_ERROR;
}

Просмотреть файл

@ -10,6 +10,9 @@
#include "ompi_config.h"
#include "include/types.h"
#include "include/constants.h"
#include "threads/mutex.h"
#include "class/ompi_list.h"
#include "mca/gpr/base/base.h"
@ -40,6 +43,7 @@ extern ompi_list_t mca_gpr_proxy_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
extern ompi_list_t mca_gpr_proxy_free_notify_id_tags;
extern int mca_gpr_proxy_debug;
extern ompi_mutex_t mca_gpr_proxy_mutex;
/*
* Implementation of delete_segment().

Просмотреть файл

@ -14,6 +14,9 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "threads/mutex.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "mca/mca.h"
@ -74,6 +77,7 @@ ompi_list_t mca_gpr_proxy_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag;
ompi_list_t mca_gpr_proxy_free_notify_id_tags;
int mca_gpr_proxy_debug;
ompi_mutex_t mca_gpr_proxy_mutex;
/*
@ -126,6 +130,9 @@ mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* setup thread lock */
OBJ_CONSTRUCT(&mca_gpr_proxy_mutex, ompi_mutex_t);
/* define the replica for us to use - get it from process_info */
mca_gpr_my_replica = ompi_name_server.copy_process_name(ompi_process_info.gpr_replica);
if (NULL == mca_gpr_my_replica) { /* can't function */
@ -246,6 +253,8 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
message->tokens = NULL;
}
OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex);
/* find the request corresponding to this notify */
found = false;
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker);
@ -259,6 +268,7 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
if (!found) { /* didn't find request */
ompi_output(0, "Proxy notification error - received request not found");
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
return;
}
@ -270,6 +280,8 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender,
RETURN_ERROR:
OBJ_RELEASE(message);
OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex);
/* reissue non-blocking receive */
mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR_NOTIFY, 0, mca_gpr_proxy_notify_recv, NULL);

Просмотреть файл

@ -20,6 +20,9 @@
#include <libgen.h>
#include "include/constants.h"
#include "threads/mutex.h"
#include "util/output.h"
#include "util/proc_info.h"
#include "mca/gpr/base/base.h"
@ -31,6 +34,8 @@ int gpr_replica_delete_segment(char *segment)
{
mca_gpr_replica_segment_t *seg;
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica: delete_segment entered", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
@ -45,9 +50,11 @@ int gpr_replica_delete_segment(char *segment)
OBJ_RELEASE(seg);
if (OMPI_SUCCESS != gpr_replica_delete_key(segment, NULL)) { /* couldn't remove dictionary entry */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_SUCCESS;
}
@ -87,6 +94,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
*/
put_mode = addr_mode & OMPI_REGISTRY_OVERWRITE;
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* find the segment */
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't find segment or create it */
@ -155,8 +164,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
trig->count++;
}
if (((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & trig->synch_mode)
&& (trig->count >= trig->trigger)
&& (MCA_GPR_REPLICA_TRIGGER_BELOW_LEVEL == trig->above_below)) ||
&& (trig->count >= trig->trigger)
&& (MCA_GPR_REPLICA_TRIGGER_BELOW_LEVEL == trig->above_below)) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) ||
(OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL & trig->synch_mode && trig->count >= trig->trigger)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens);
@ -164,8 +173,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
notify_msg->trig_synchro = trig->synch_mode;
gpr_replica_process_triggers(segment, trig, notify_msg);
} else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
(OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) ||
(OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) {
(OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) ||
(OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens);
notify_msg->trig_action = trig->action;
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
@ -189,6 +198,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return return_code;
}
@ -218,9 +229,12 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
return OMPI_ERROR;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* find the specified segment */
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
@ -271,15 +285,15 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
trig->count--;
}
if (((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & trig->synch_mode)
&& (trig->count <= trig->trigger)
&& (MCA_GPR_REPLICA_TRIGGER_ABOVE_LEVEL == trig->above_below)) ||
&& (trig->count <= trig->trigger)
&& (MCA_GPR_REPLICA_TRIGGER_ABOVE_LEVEL == trig->above_below)) ||
(OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens);
notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE;
notify_msg->trig_synchro = trig->synch_mode;
gpr_replica_process_triggers(segment, trig, notify_msg);
} else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) ||
(OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) {
(OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) {
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens);
notify_msg->trig_action = trig->action;
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
@ -301,6 +315,9 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
if (NULL != keys) {
free(keys);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return return_code;
}
@ -316,6 +333,8 @@ ompi_list_t* gpr_replica_index(char *segment)
ompi_process_info.name->jobid, ompi_process_info.name->vpid, segment);
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
answer = OBJ_NEW(ompi_list_t);
if (NULL == segment) { /* looking for index of global registry */
@ -330,6 +349,7 @@ ompi_list_t* gpr_replica_index(char *segment)
/* find the specified segment */
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return answer;
}
/* got segment - now index that dictionary */
@ -343,6 +363,7 @@ ompi_list_t* gpr_replica_index(char *segment)
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return answer;
}
@ -367,6 +388,8 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
return OMPI_ERROR;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = NULL;
@ -393,9 +416,11 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE;
gpr_replica_process_triggers(segment, trig, notify_msg);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_SUCCESS;
} else {
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
}
@ -418,9 +443,12 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode,
return OMPI_ERROR;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* find trigger on replica - return id_tag */
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
addr_mode, segment, tokens, 0))) {
addr_mode, segment, tokens, 0))) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
@ -435,10 +463,12 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode,
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_SUCCESS;
}
/* if we get here, then couldn't find request */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
@ -464,6 +494,7 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
return OMPI_ERROR;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = NULL;
@ -493,8 +524,10 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
notify_msg->trig_synchro = trig->synch_mode;
gpr_replica_process_triggers(segment, trig, notify_msg);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_SUCCESS;
} else {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
OBJ_RELEASE(trackptr);
return OMPI_ERROR;
}
@ -518,9 +551,11 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
return OMPI_ERROR;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* find trigger on replica - return id_tag */
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
addr_mode, segment, tokens, trigger))) {
addr_mode, segment, tokens, trigger))) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
@ -535,10 +570,12 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_SUCCESS;
}
/* if we get here, then couldn't find request */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return OMPI_ERROR;
}
@ -566,9 +603,11 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
return answer;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
/* find the specified segment */
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* segment not found */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return answer;
}
if (mca_gpr_replica_debug) {
@ -577,13 +616,14 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
}
if (NULL == tokens) { /* wildcard case - return everything */
keylist = NULL;
keylist = NULL;
keys = NULL;
} else {
/* convert tokens to list of keys */
keylist = gpr_replica_get_key_list(segment, tokens);
if (0 == (num_tokens = ompi_list_get_size(keylist))) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return answer;
}
@ -640,6 +680,7 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return answer;
}

Просмотреть файл

@ -180,6 +180,7 @@ extern ompi_list_t mca_gpr_replica_notify_request_tracker;
extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
extern ompi_list_t mca_gpr_replica_free_notify_id_tags;
extern int mca_gpr_replica_debug;
extern ompi_mutex_t mca_gpr_replica_mutex, mca_gpr_replica_internals_mutex;
/*
* Module open / close

Просмотреть файл

@ -16,6 +16,9 @@
#include <time.h>
#include "include/constants.h"
#include "threads/mutex.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "util/pack.h"
@ -79,6 +82,8 @@ ompi_list_t mca_gpr_replica_notify_request_tracker;
mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
ompi_list_t mca_gpr_replica_free_notify_id_tags;
int mca_gpr_replica_debug;
ompi_mutex_t mca_gpr_replica_component_mutex;
ompi_mutex_t mca_gpr_replica_mutex, mca_gpr_replica_internals_mutex;
/* constructor - used to initialize state of keytable instance */
@ -324,6 +329,11 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* setup the thread locks */
OBJ_CONSTRUCT(&mca_gpr_replica_component_mutex, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_gpr_replica_internals_mutex, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_gpr_replica_mutex, ompi_mutex_t);
/* initialize the registry head */
OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t);
@ -350,10 +360,10 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
}
/* issue the non-blocking receive */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
return NULL;
}
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
return NULL;
}
if (mca_gpr_replica_debug) {
ompi_output(0, "nb receive setup");
@ -377,44 +387,44 @@ int mca_gpr_replica_finalize(void)
ompi_output(0, "finalizing gpr replica");
}
/* mca_gpr_replica_segment_t *seg; */
/* mca_gpr_replica_keytable_t *kt; */
/* mca_gpr_replica_keylist_t *kl; */
/* mca_gpr_notify_request_tracker_t *tk; */
/* mca_gpr_idtag_list_t *id; */
/* mca_gpr_replica_segment_t *seg; */
/* mca_gpr_replica_keytable_t *kt; */
/* mca_gpr_replica_keylist_t *kl; */
/* mca_gpr_notify_request_tracker_t *tk; */
/* mca_gpr_idtag_list_t *id; */
/* /\* free all storage, but only if this component was initialized *\/ */
/* /\* free all storage, but only if this component was initialized *\/ */
/* if (initialized) { */
/* if (initialized) { */
/* while (NULL != (seg = (mca_gpr_replica_segment_t*)ompi_list_remove_first(&mca_gpr_replica_head.registry))) { */
/* OBJ_RELEASE(seg); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.registry); */
/* while (NULL != (seg = (mca_gpr_replica_segment_t*)ompi_list_remove_first(&mca_gpr_replica_head.registry))) { */
/* OBJ_RELEASE(seg); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.registry); */
/* while (NULL != (kt = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.segment_dict))) { */
/* OBJ_RELEASE(kt); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.segment_dict); */
/* while (NULL != (kt = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.segment_dict))) { */
/* OBJ_RELEASE(kt); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.segment_dict); */
/* while (NULL != (kl = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys))) { */
/* OBJ_RELEASE(kl); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.freekeys); */
/* while (NULL != (kl = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys))) { */
/* OBJ_RELEASE(kl); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_head.freekeys); */
/* while (NULL != (tk = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_first(&mca_gpr_replica_notify_request_tracker))) { */
/* OBJ_RELEASE(tk); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker); */
/* while (NULL != (tk = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_first(&mca_gpr_replica_notify_request_tracker))) { */
/* OBJ_RELEASE(tk); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker); */
/* while (NULL != (id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags))) { */
/* OBJ_RELEASE(id); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags); */
/* initialized = false; */
/* } */
/* while (NULL != (id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags))) { */
/* OBJ_RELEASE(id); */
/* } */
/* OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags); */
/* initialized = false; */
/* } */
/* All done */
@ -733,6 +743,9 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
trackptr->requestor = ompi_name_server.copy_process_name(sender);
@ -748,6 +761,9 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
response = (int32_t)gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
mode, segment, tokens,
0, trackptr->id_tag);
@ -806,6 +822,11 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
segment, tokens, 0);
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
/* find request on replica notify tracking system */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
@ -828,6 +849,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
/* release tracker item */
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
}
} else {
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
@ -895,8 +920,20 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
goto RETURN_ERROR;
}
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
/* enter request on notify tracking system */
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
if (mca_gpr_replica_debug) {
if (NULL != sender) {
ompi_output(0, "gpr_replica_recv: received synchro req from [%d,%d,%d]", sender->cellid,
sender->jobid, sender->vpid);
} else {
ompi_output(0, "gpr_replica_recv: received synchro req from NULL");
}
}
trackptr->requestor = ompi_name_server.copy_process_name(sender);
trackptr->req_tag = id_tag;
trackptr->callback = NULL;
@ -910,13 +947,16 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
}
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
if(NULL != gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
mode, segment, tokens,
trigger, trackptr->id_tag)) {
response = OMPI_SUCCESS;
} else {
response = OMPI_ERROR;
}
mode, segment, tokens,
trigger, trackptr->id_tag)) {
response = OMPI_SUCCESS;
} else {
response = OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) {
goto RETURN_ERROR;
@ -979,6 +1019,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
segment, tokens, trigger);
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
/* find request on replica notify tracking system */
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
@ -1001,6 +1045,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item);
/* release tracker item */
OBJ_RELEASE(trackptr);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
}
} else {
response = (int32_t)MCA_GPR_NOTIFY_ID_MAX;
@ -1022,6 +1070,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/***** TEST INTERNALS *****/
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {
/******* LOCK *****/
OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex);
if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) ||
(0 > test_level)) {
goto RETURN_ERROR;
@ -1053,8 +1105,11 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
if (0 > mca_oob_send_packed(sender, answer, tag, 0)) {
/* RHC -- not sure what to do if the return send fails */
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex);
/****** UNLOCK ******/
/**** UNRECOGNIZED ****/
/**** UNRECOGNIZED COMMAND ****/
} else { /* got an unrecognized command */
RETURN_ERROR:
ompi_buffer_init(&error_answer, 8);
@ -1123,8 +1178,8 @@ void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag
if (OMPI_SUCCESS != ompi_pack(msg, regval->object, regval->object_size, OMPI_BYTE)) {
return;
}
/* TSW - should we add */
/* OBJ_RELEASE(regval); */
/* TSW - should we add */
/* OBJ_RELEASE(regval); */
}
}
if (OMPI_SUCCESS != ompi_pack(msg, &message->num_tokens, 1, OMPI_INT32)) {

Просмотреть файл

@ -20,6 +20,9 @@
#include <libgen.h>
#include "include/constants.h"
#include "threads/mutex.h"
#include "util/output.h"
#include "util/printf.h"
#include "util/proc_info.h"
@ -29,6 +32,7 @@
#include "gpr_replica.h"
#include "gpr_replica_internals.h"
/*
*
*/
@ -38,17 +42,22 @@ mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment)
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_key_t key;
key = gpr_replica_define_key(segment, NULL);
if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
key = gpr_replica_define_key(segment, NULL);
if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return NULL;
}
}
/* need to add the segment to the registry */
seg = OBJ_NEW(mca_gpr_replica_segment_t);
seg->segment = key;
ompi_list_append(&mca_gpr_replica_head.registry, &seg->item);
/* need to add the segment to the registry */
seg = OBJ_NEW(mca_gpr_replica_segment_t);
seg->segment = key;
ompi_list_append(&mca_gpr_replica_head.registry, &seg->item);
return seg;
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return seg;
}
@ -57,6 +66,8 @@ mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment)
mca_gpr_replica_keytable_t *ptr_seg;
mca_gpr_replica_segment_t *seg;
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* search the registry segments to find which one is being referenced */
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
@ -67,14 +78,18 @@ mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment)
seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry);
seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) {
if(seg->segment == ptr_seg->key) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(seg);
}
}
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
if (create) {
/* didn't find the dictionary entry - create it */
return gpr_replica_define_segment(segment);
/* didn't find the dictionary entry - create it */
return gpr_replica_define_segment(segment);
}
return NULL; /* don't create it - just return NULL */
}
@ -85,12 +100,15 @@ mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *tok
mca_gpr_replica_keytable_t *ptr_key;
mca_gpr_replica_segment_t *seg;
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* search the registry segments to find which one is being referenced */
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) {
if (0 == strcmp(segment, ptr_seg->token)) {
if (NULL == token) { /* just want segment token-key pair */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(ptr_seg);
}
/* search registry to find segment */
@ -103,15 +121,19 @@ mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *tok
ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) {
if (0 == strcmp(token, ptr_key->token)) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(ptr_key);
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(NULL); /* couldn't find the specified entry */
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(NULL); /* couldn't find segment, even though we found entry in registry dict */
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(NULL); /* couldn't find segment token-key pair */
}
@ -152,15 +174,19 @@ char *gpr_replica_get_token(char *segment, mca_gpr_replica_key_t key)
return NULL;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* find the matching key */
for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable);
ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) {
if (key == ptr_key->key) {
answer = strdup(ptr_key->token);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return answer;
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(NULL); /* couldn't find the specified entry */
}
@ -200,6 +226,9 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
/* if token is NULL, then this is defining a segment name. Check dictionary to ensure uniqueness */
if (NULL == token) {
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict);
ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict);
ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) {
@ -213,9 +242,10 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
new->token = strdup(segment);
if (0 == ompi_list_get_size(&mca_gpr_replica_head.freekeys)) { /* no keys waiting for reuse */
if (MCA_GPR_REPLICA_KEY_MAX-2 > mca_gpr_replica_head.lastkey) { /* have a key left */
mca_gpr_replica_head.lastkey++;
new->key = mca_gpr_replica_head.lastkey;
mca_gpr_replica_head.lastkey++;
new->key = mca_gpr_replica_head.lastkey;
} else { /* out of keys */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return MCA_GPR_REPLICA_KEY_MAX;
}
} else {
@ -223,8 +253,10 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
new->key = ptr_key->key;
}
ompi_list_append(&mca_gpr_replica_head.segment_dict, &new->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return new->key;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* okay, token is specified */
/* search the registry segments to find which one is being referenced */
@ -235,6 +267,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable);
ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) {
if (0 == strcmp(token, ptr_key->token)) {
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return ptr_key->key; /* already taken, report value */
}
}
@ -249,6 +282,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token)
new->key = ptr_key->key;
}
ompi_list_append(&seg->keytable, &new->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return new->key;
}
/* couldn't find segment */
@ -268,6 +302,8 @@ int gpr_replica_delete_key(char *segment, char *token)
return(OMPI_ERROR);
}
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* find the segment */
seg = gpr_replica_find_seg(false, segment);
if (NULL != seg) {
@ -275,13 +311,16 @@ int gpr_replica_delete_key(char *segment, char *token)
/* if specified token is NULL, then this is deleting a segment name.*/
if (NULL == token) {
if (OMPI_SUCCESS != gpr_replica_empty_segment(seg)) { /* couldn't empty segment */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return OMPI_ERROR;
}
/* now remove the dictionary entry from the global registry dictionary*/
ptr_seg = gpr_replica_find_dict_entry(segment, NULL);
if (NULL == ptr_seg) { /* failed to find dictionary entry */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return OMPI_ERROR;
}
/* add key to global registry's freekey list */
new = OBJ_NEW(mca_gpr_replica_keytable_t);
new->token = NULL;
@ -290,6 +329,9 @@ int gpr_replica_delete_key(char *segment, char *token)
/* remove the dictionary entry */
ompi_list_remove_item(&mca_gpr_replica_head.segment_dict, &ptr_seg->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(OMPI_SUCCESS);
} else { /* token not null, so need to find dictionary element to delete */
@ -316,12 +358,15 @@ int gpr_replica_delete_key(char *segment, char *token)
/* now remove the dictionary entry from the segment's dictionary */
ompi_list_remove_item(&seg->keytable, &ptr_key->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(OMPI_SUCCESS);
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(OMPI_ERROR); /* if we get here, then we couldn't find token in dictionary */
}
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return(OMPI_ERROR); /* if we get here, then we couldn't find segment */
}
@ -333,6 +378,8 @@ int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg)
/* need to free memory from each entry - remove_last returns pointer to the entry */
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
/* empty the segment's registry */
while (!ompi_list_is_empty(&seg->registry_entries)) {
ptr = (mca_gpr_replica_core_t*)ompi_list_remove_first(&seg->registry_entries);
@ -354,6 +401,8 @@ int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg)
ompi_list_remove_item(&mca_gpr_replica_head.registry, &seg->item);
OBJ_RELEASE(seg);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return OMPI_SUCCESS;
}
@ -437,11 +486,12 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode,
return false;
}
mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_notify_action_t action,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag)
mca_gpr_replica_trigger_list_t*
gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode,
ompi_registry_notify_action_t action,
ompi_registry_mode_t addr_mode,
char *segment, char **tokens, int trigger,
mca_gpr_notify_id_t id_tag)
{
mca_gpr_replica_segment_t *seg;
mca_gpr_replica_core_t *reg;
@ -450,8 +500,11 @@ mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_sync
mca_gpr_replica_key_t *keyptr;
int i, num_tokens;
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
seg = gpr_replica_find_seg(true, segment);
if (NULL == seg) { /* couldn't find or create segment */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return NULL;
}
@ -519,6 +572,8 @@ mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_sync
ompi_list_append(&seg->triggers, &trig->item);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return trig;
}
@ -536,8 +591,11 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync
int i=0, num_tokens=0;
bool found=false, mismatch=false;
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* couldn't find segment */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return MCA_GPR_NOTIFY_ID_MAX;
}
@ -565,6 +623,7 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync
}
}
/* search segment's trigger list for specified trigger event */
for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers);
trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers) && !found;
@ -595,9 +654,12 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync
id_tag = trig->id_tag;
ompi_list_remove_item(&seg->triggers, &trig->item);
OBJ_RELEASE(trig);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return id_tag;
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return MCA_GPR_NOTIFY_ID_MAX;
}
@ -684,6 +746,8 @@ void gpr_replica_process_triggers(char *segment,
return;
}
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
seg = gpr_replica_find_seg(false, segment);
if (NULL == seg) { /* couldn't find segment */
return;
@ -694,28 +758,30 @@ void gpr_replica_process_triggers(char *segment,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
/* find corresponding notify request */
found = false;
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker);
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) {
if (trackptr->id_tag == trig->id_tag) {
found = true;
break;
break;
}
}
if (!found) { /* didn't find request */
ompi_output(0, "Notification error - request not found");
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return;
}
/* process request */
if (NULL == trackptr->requestor) { /* local request - callback fn with their tag */
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: local callback", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: local callback", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
trackptr->callback(message, trackptr->user_tag);
/* dismantle message and free memory */
while (NULL != (data = (ompi_registry_object_t*)ompi_list_remove_first(&message->data))) {
@ -725,38 +791,40 @@ void gpr_replica_process_triggers(char *segment,
for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) {
free(*tokptr);
}
if(NULL != message->tokens) {
if(NULL != message->tokens) {
free(message->tokens);
}
}
free(message);
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: data released", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: data released", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
} else { /* remote request - send message back */
gpr_replica_remote_notify(trackptr->requestor, trackptr->req_tag, message);
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: remote message sent", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: remote message sent", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
}
/* if one-shot, remove request from tracking system */
if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trig->synch_mode) {
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OBJ_RELEASE(trackptr);
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
OBJ_RELEASE(trackptr);
/* ....and from the corresponding registry segment */
ompi_list_remove_item(&seg->triggers, &trig->item);
OBJ_RELEASE(trig);
/* ....and from the corresponding registry segment */
ompi_list_remove_item(&seg->triggers, &trig->item);
OBJ_RELEASE(trig);
}
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: complete", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
}
ompi_list_t *gpr_replica_test_internals(int level)
@ -771,6 +839,8 @@ ompi_list_t *gpr_replica_test_internals(int level)
mca_gpr_replica_keytable_t *dict_entry;
bool success;
OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex);
test_results = OBJ_NEW(ompi_list_t);
ompi_output(0, "testing define segment");
@ -963,5 +1033,7 @@ ompi_list_t *gpr_replica_test_internals(int level)
/* check ability to empty segment */
OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex);
return test_results;
}

Просмотреть файл

@ -6,6 +6,8 @@
*/
#include <stdio.h>
#include "threads/mutex.h"
#include "ompi_config.h"
#include "util/output.h"
#include "mca/mca.h"
@ -22,10 +24,14 @@
mca_ns_base_cellid_t ns_replica_create_cellid(void)
{
OMPI_THREAD_LOCK(&mca_ns_replica_mutex);
if ((MCA_NS_BASE_CELLID_MAX-2) >= mca_ns_replica_last_used_cellid) {
mca_ns_replica_last_used_cellid = mca_ns_replica_last_used_cellid + 1;
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return(mca_ns_replica_last_used_cellid);
} else {
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return MCA_NS_BASE_CELLID_MAX;
}
}
@ -34,14 +40,18 @@ mca_ns_base_jobid_t ns_replica_create_jobid(void)
{
mca_ns_replica_name_tracker_t *new;
OMPI_THREAD_LOCK(&mca_ns_replica_mutex);
if ((MCA_NS_BASE_JOBID_MAX-2) >= mca_ns_replica_last_used_jobid) {
mca_ns_replica_last_used_jobid = mca_ns_replica_last_used_jobid + 1;
new = OBJ_NEW(mca_ns_replica_name_tracker_t);
new->job = mca_ns_replica_last_used_jobid;
new->last_used_vpid = 0;
ompi_list_append(&mca_ns_replica_name_tracker, &new->item);
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return(mca_ns_replica_last_used_jobid);
} else {
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return MCA_NS_BASE_JOBID_MAX;
}
}
@ -52,17 +62,24 @@ mca_ns_base_vpid_t ns_replica_reserve_range(mca_ns_base_jobid_t job, mca_ns_base
mca_ns_replica_name_tracker_t *ptr;
mca_ns_base_vpid_t start;
OMPI_THREAD_LOCK(&mca_ns_replica_mutex);
for (ptr = (mca_ns_replica_name_tracker_t*)ompi_list_get_first(&mca_ns_replica_name_tracker);
ptr != (mca_ns_replica_name_tracker_t*)ompi_list_get_end(&mca_ns_replica_name_tracker);
ptr = (mca_ns_replica_name_tracker_t*)ompi_list_get_next(ptr)) {
if (job == ptr->job) { /* found the specified job */
if ((MCA_NS_BASE_VPID_MAX-range-2) >= ptr->last_used_vpid) { /* requested range available */
start = ptr->last_used_vpid + 1;
ptr->last_used_vpid = ptr->last_used_vpid + range;
start = ptr->last_used_vpid;
if (0 == job && start == 0) { /* vpid=0 reserved for job=0 */
start = 1;
}
ptr->last_used_vpid = start + range;
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return(start);
}
}
}
OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex);
return MCA_NS_BASE_VPID_MAX;
}

Просмотреть файл

@ -9,6 +9,7 @@
#include "ompi_config.h"
#include "include/types.h"
#include "include/constants.h"
#include "threads/mutex.h"
#include "class/ompi_list.h"
#include "mca/oob/oob.h"
#include "mca/ns/ns.h"
@ -34,6 +35,7 @@ extern mca_ns_base_cellid_t mca_ns_replica_last_used_cellid;
extern mca_ns_base_jobid_t mca_ns_replica_last_used_jobid;
extern ompi_list_t mca_ns_replica_name_tracker;
extern int mca_ns_replica_debug;
extern ompi_mutex_t mca_ns_replica_mutex;
/*
* Module open / close

Просмотреть файл

@ -18,6 +18,7 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "threads/mutex.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "mca/mca.h"
@ -102,6 +103,7 @@ mca_ns_base_cellid_t mca_ns_replica_last_used_cellid;
mca_ns_base_jobid_t mca_ns_replica_last_used_jobid;
ompi_list_t mca_ns_replica_name_tracker;
int mca_ns_replica_debug;
ompi_mutex_t mca_ns_replica_mutex;
/*
* don't really need this function - could just put NULL in the above structure

Просмотреть файл

@ -156,10 +156,10 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/*
* Register my process info with my replica.
*/
if (OMPI_SUCCESS != (ret = ompi_rte_register())) {
error = "ompi_rte_init: failed in ompi_rte_register()\n";
goto error;
}
/* if (OMPI_SUCCESS != (ret = ompi_rte_register())) { */
/* error = "ompi_rte_init: failed in ompi_rte_register()\n"; */
/* goto error; */
/* } */
/* finalize the rte startup */

Просмотреть файл

@ -222,10 +222,10 @@ main(int argc, char *argv[])
/*
* Register my process info with my replica.
*/
if (OMPI_SUCCESS != (ret = ompi_rte_register())) {
ompi_output(0, "ompi_rte_init: failed in ompi_rte_register()\n");
return ret;
}
/* if (OMPI_SUCCESS != (ret = ompi_rte_register())) { */
/* ompi_output(0, "ompi_rte_init: failed in ompi_rte_register()\n"); */
/* return ret; */
/* } */
/* finalize the rte startup */
if (OMPI_SUCCESS != (ret = ompi_rte_init_finalstage(&multi_thread,