diff --git a/src/mca/gpr/proxy/gpr_proxy.c b/src/mca/gpr/proxy/gpr_proxy.c index ce960f7c84..68f88b012e 100644 --- a/src/mca/gpr/proxy/gpr_proxy.c +++ b/src/mca/gpr/proxy/gpr_proxy.c @@ -9,6 +9,8 @@ #include +#include "threads/mutex.h" + #include "util/output.h" #include "util/proc_info.h" @@ -408,6 +410,8 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode, } } + OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex); + /* store callback function and user_tag in local list for lookup */ /* generate id_tag to send to replica to identify lookup entry */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); @@ -450,6 +454,7 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode, ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item); ompi_buffer_free(answer); ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_SUCCESS; CLEANUP: @@ -458,6 +463,7 @@ int gpr_proxy_subscribe(ompi_registry_mode_t mode, OBJ_RELEASE(trackptr); } ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_ERROR; } @@ -552,6 +558,7 @@ int gpr_proxy_unsubscribe(ompi_registry_mode_t mode, goto CLEANUP; } + OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex); /* locate corresponding entry on proxy tracker list and remove it */ for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker); trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) && @@ -570,10 +577,12 @@ int gpr_proxy_unsubscribe(ompi_registry_mode_t mode, OBJ_RELEASE(trackptr); ompi_buffer_free(answer); ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_SUCCESS; CLEANUP: ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_ERROR; } @@ -655,6 +664,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode, goto CLEANUP; } + OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex); /* store callback function and user_tag in local list for lookup */ /* generate id_tag to send to replica to identify lookup entry */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); @@ -697,6 +707,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_list_append(&mca_gpr_proxy_notify_request_tracker, &trackptr->item); ompi_buffer_free(answer); ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_SUCCESS; CLEANUP: @@ -705,6 +716,7 @@ int gpr_proxy_synchro(ompi_registry_synchro_mode_t synchro_mode, OBJ_RELEASE(trackptr); } ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_ERROR; } @@ -805,6 +817,7 @@ int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode, goto CLEANUP; } + OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex); /* locate corresponding entry on proxy tracker list and remove it */ for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker); trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_proxy_notify_request_tracker) && @@ -823,10 +836,12 @@ int gpr_proxy_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode, OBJ_RELEASE(trackptr); ompi_buffer_free(answer); ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_SUCCESS; CLEANUP: ompi_buffer_free(cmd); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return OMPI_ERROR; } diff --git a/src/mca/gpr/proxy/gpr_proxy.h b/src/mca/gpr/proxy/gpr_proxy.h index 620dc48505..b7edff9580 100644 --- a/src/mca/gpr/proxy/gpr_proxy.h +++ b/src/mca/gpr/proxy/gpr_proxy.h @@ -10,6 +10,9 @@ #include "ompi_config.h" #include "include/types.h" #include "include/constants.h" + +#include "threads/mutex.h" + #include "class/ompi_list.h" #include "mca/gpr/base/base.h" @@ -40,6 +43,7 @@ extern ompi_list_t mca_gpr_proxy_notify_request_tracker; extern mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag; extern ompi_list_t mca_gpr_proxy_free_notify_id_tags; extern int mca_gpr_proxy_debug; +extern ompi_mutex_t mca_gpr_proxy_mutex; /* * Implementation of delete_segment(). diff --git a/src/mca/gpr/proxy/gpr_proxy_component.c b/src/mca/gpr/proxy/gpr_proxy_component.c index bf4b789f1c..8bb659e68d 100644 --- a/src/mca/gpr/proxy/gpr_proxy_component.c +++ b/src/mca/gpr/proxy/gpr_proxy_component.c @@ -14,6 +14,9 @@ #include "ompi_config.h" #include "include/constants.h" + +#include "threads/mutex.h" + #include "util/proc_info.h" #include "util/output.h" #include "mca/mca.h" @@ -74,6 +77,7 @@ ompi_list_t mca_gpr_proxy_notify_request_tracker; mca_gpr_notify_id_t mca_gpr_proxy_last_notify_id_tag; ompi_list_t mca_gpr_proxy_free_notify_id_tags; int mca_gpr_proxy_debug; +ompi_mutex_t mca_gpr_proxy_mutex; /* @@ -126,6 +130,9 @@ mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool * *allow_multi_user_threads = true; *have_hidden_threads = false; + /* setup thread lock */ + OBJ_CONSTRUCT(&mca_gpr_proxy_mutex, ompi_mutex_t); + /* define the replica for us to use - get it from process_info */ mca_gpr_my_replica = ompi_name_server.copy_process_name(ompi_process_info.gpr_replica); if (NULL == mca_gpr_my_replica) { /* can't function */ @@ -246,6 +253,8 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, message->tokens = NULL; } + OMPI_THREAD_LOCK(&mca_gpr_proxy_mutex); + /* find the request corresponding to this notify */ found = false; for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_proxy_notify_request_tracker); @@ -259,6 +268,7 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, if (!found) { /* didn't find request */ ompi_output(0, "Proxy notification error - received request not found"); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); return; } @@ -270,6 +280,8 @@ void mca_gpr_proxy_notify_recv(int status, ompi_process_name_t* sender, RETURN_ERROR: OBJ_RELEASE(message); + OMPI_THREAD_UNLOCK(&mca_gpr_proxy_mutex); + /* reissue non-blocking receive */ mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR_NOTIFY, 0, mca_gpr_proxy_notify_recv, NULL); diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index 3b102fc145..a1782f3da9 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -20,6 +20,9 @@ #include #include "include/constants.h" + +#include "threads/mutex.h" + #include "util/output.h" #include "util/proc_info.h" #include "mca/gpr/base/base.h" @@ -31,6 +34,8 @@ int gpr_replica_delete_segment(char *segment) { mca_gpr_replica_segment_t *seg; + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + if (mca_gpr_replica_debug) { ompi_output(0, "[%d,%d,%d] gpr replica: delete_segment entered", ompi_process_info.name->cellid, ompi_process_info.name->jobid, ompi_process_info.name->vpid); @@ -45,9 +50,11 @@ int gpr_replica_delete_segment(char *segment) OBJ_RELEASE(seg); if (OMPI_SUCCESS != gpr_replica_delete_key(segment, NULL)) { /* couldn't remove dictionary entry */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_SUCCESS; } @@ -87,6 +94,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, */ put_mode = addr_mode & OMPI_REGISTRY_OVERWRITE; + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + /* find the segment */ seg = gpr_replica_find_seg(true, segment); if (NULL == seg) { /* couldn't find segment or create it */ @@ -155,8 +164,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, trig->count++; } if (((OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING & trig->synch_mode) - && (trig->count >= trig->trigger) - && (MCA_GPR_REPLICA_TRIGGER_BELOW_LEVEL == trig->above_below)) || + && (trig->count >= trig->trigger) + && (MCA_GPR_REPLICA_TRIGGER_BELOW_LEVEL == trig->above_below)) || (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger) || (OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL & trig->synch_mode && trig->count >= trig->trigger)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens); @@ -164,8 +173,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, notify_msg->trig_synchro = trig->synch_mode; gpr_replica_process_triggers(segment, trig, notify_msg); } else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) || - (OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) || - (OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) { + (OMPI_REGISTRY_NOTIFY_ADD_ENTRY & trig->action) || + (OMPI_REGISTRY_NOTIFY_MODIFICATION & trig->action && OMPI_REGISTRY_OVERWRITE & put_mode)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens); notify_msg->trig_action = trig->action; notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; @@ -189,6 +198,8 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment, ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); + return return_code; } @@ -218,9 +229,12 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode, return OMPI_ERROR; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + /* find the specified segment */ seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } @@ -271,15 +285,15 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode, trig->count--; } if (((OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING & trig->synch_mode) - && (trig->count <= trig->trigger) - && (MCA_GPR_REPLICA_TRIGGER_ABOVE_LEVEL == trig->above_below)) || + && (trig->count <= trig->trigger) + && (MCA_GPR_REPLICA_TRIGGER_ABOVE_LEVEL == trig->above_below)) || (OMPI_REGISTRY_SYNCHRO_MODE_LEVEL & trig->synch_mode && trig->count == trig->trigger)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens); notify_msg->trig_action = OMPI_REGISTRY_NOTIFY_NONE; notify_msg->trig_synchro = trig->synch_mode; gpr_replica_process_triggers(segment, trig, notify_msg); } else if ((OMPI_REGISTRY_NOTIFY_ALL & trig->action) || - (OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) { + (OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & trig->action)) { notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens); notify_msg->trig_action = trig->action; notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; @@ -301,6 +315,9 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode, if (NULL != keys) { free(keys); } + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); + return return_code; } @@ -316,6 +333,8 @@ ompi_list_t* gpr_replica_index(char *segment) ompi_process_info.name->jobid, ompi_process_info.name->vpid, segment); } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + answer = OBJ_NEW(ompi_list_t); if (NULL == segment) { /* looking for index of global registry */ @@ -330,6 +349,7 @@ ompi_list_t* gpr_replica_index(char *segment) /* find the specified segment */ seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return answer; } /* got segment - now index that dictionary */ @@ -343,6 +363,7 @@ ompi_list_t* gpr_replica_index(char *segment) } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return answer; } @@ -367,6 +388,8 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode, return OMPI_ERROR; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + /* enter request on notify tracking system */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); trackptr->requestor = NULL; @@ -393,9 +416,11 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode, notify_msg->trig_synchro = OMPI_REGISTRY_SYNCHRO_MODE_NONE; gpr_replica_process_triggers(segment, trig, notify_msg); } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_SUCCESS; } else { OBJ_RELEASE(trackptr); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } } @@ -418,9 +443,12 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode, return OMPI_ERROR; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + /* find trigger on replica - return id_tag */ if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, - addr_mode, segment, tokens, 0))) { + addr_mode, segment, tokens, 0))) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } @@ -435,10 +463,12 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode, ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); OBJ_RELEASE(trackptr); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_SUCCESS; } /* if we get here, then couldn't find request */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } @@ -464,6 +494,7 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, return OMPI_ERROR; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); /* enter request on notify tracking system */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); trackptr->requestor = NULL; @@ -493,8 +524,10 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, notify_msg->trig_synchro = trig->synch_mode; gpr_replica_process_triggers(segment, trig, notify_msg); } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_SUCCESS; } else { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); OBJ_RELEASE(trackptr); return OMPI_ERROR; } @@ -518,9 +551,11 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode, return OMPI_ERROR; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); /* find trigger on replica - return id_tag */ if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, - addr_mode, segment, tokens, trigger))) { + addr_mode, segment, tokens, trigger))) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } @@ -535,10 +570,12 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); OBJ_RELEASE(trackptr); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_SUCCESS; } /* if we get here, then couldn't find request */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return OMPI_ERROR; } @@ -566,9 +603,11 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode, return answer; } + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); /* find the specified segment */ seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* segment not found */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return answer; } if (mca_gpr_replica_debug) { @@ -577,13 +616,14 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode, } if (NULL == tokens) { /* wildcard case - return everything */ - keylist = NULL; + keylist = NULL; keys = NULL; } else { /* convert tokens to list of keys */ keylist = gpr_replica_get_key_list(segment, tokens); if (0 == (num_tokens = ompi_list_get_size(keylist))) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return answer; } @@ -640,6 +680,7 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode, ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return answer; } diff --git a/src/mca/gpr/replica/gpr_replica.h b/src/mca/gpr/replica/gpr_replica.h index 72c8ddf0d5..4bed034150 100644 --- a/src/mca/gpr/replica/gpr_replica.h +++ b/src/mca/gpr/replica/gpr_replica.h @@ -180,6 +180,7 @@ extern ompi_list_t mca_gpr_replica_notify_request_tracker; extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag; extern ompi_list_t mca_gpr_replica_free_notify_id_tags; extern int mca_gpr_replica_debug; +extern ompi_mutex_t mca_gpr_replica_mutex, mca_gpr_replica_internals_mutex; /* * Module open / close diff --git a/src/mca/gpr/replica/gpr_replica_component.c b/src/mca/gpr/replica/gpr_replica_component.c index fe47c017e1..e1c9674473 100644 --- a/src/mca/gpr/replica/gpr_replica_component.c +++ b/src/mca/gpr/replica/gpr_replica_component.c @@ -16,6 +16,9 @@ #include #include "include/constants.h" + +#include "threads/mutex.h" + #include "util/proc_info.h" #include "util/output.h" #include "util/pack.h" @@ -79,6 +82,8 @@ ompi_list_t mca_gpr_replica_notify_request_tracker; mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag; ompi_list_t mca_gpr_replica_free_notify_id_tags; int mca_gpr_replica_debug; +ompi_mutex_t mca_gpr_replica_component_mutex; +ompi_mutex_t mca_gpr_replica_mutex, mca_gpr_replica_internals_mutex; /* constructor - used to initialize state of keytable instance */ @@ -324,6 +329,11 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool *allow_multi_user_threads = true; *have_hidden_threads = false; + /* setup the thread locks */ + OBJ_CONSTRUCT(&mca_gpr_replica_component_mutex, ompi_mutex_t); + OBJ_CONSTRUCT(&mca_gpr_replica_internals_mutex, ompi_mutex_t); + OBJ_CONSTRUCT(&mca_gpr_replica_mutex, ompi_mutex_t); + /* initialize the registry head */ OBJ_CONSTRUCT(&mca_gpr_replica_head.registry, ompi_list_t); @@ -350,10 +360,10 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool } /* issue the non-blocking receive */ - rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); - if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { - return NULL; - } + rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL); + if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) { + return NULL; + } if (mca_gpr_replica_debug) { ompi_output(0, "nb receive setup"); @@ -377,44 +387,44 @@ int mca_gpr_replica_finalize(void) ompi_output(0, "finalizing gpr replica"); } -/* mca_gpr_replica_segment_t *seg; */ -/* mca_gpr_replica_keytable_t *kt; */ -/* mca_gpr_replica_keylist_t *kl; */ -/* mca_gpr_notify_request_tracker_t *tk; */ -/* mca_gpr_idtag_list_t *id; */ + /* mca_gpr_replica_segment_t *seg; */ + /* mca_gpr_replica_keytable_t *kt; */ + /* mca_gpr_replica_keylist_t *kl; */ + /* mca_gpr_notify_request_tracker_t *tk; */ + /* mca_gpr_idtag_list_t *id; */ -/* /\* free all storage, but only if this component was initialized *\/ */ + /* /\* free all storage, but only if this component was initialized *\/ */ -/* if (initialized) { */ + /* if (initialized) { */ -/* while (NULL != (seg = (mca_gpr_replica_segment_t*)ompi_list_remove_first(&mca_gpr_replica_head.registry))) { */ -/* OBJ_RELEASE(seg); */ -/* } */ -/* OBJ_DESTRUCT(&mca_gpr_replica_head.registry); */ + /* while (NULL != (seg = (mca_gpr_replica_segment_t*)ompi_list_remove_first(&mca_gpr_replica_head.registry))) { */ + /* OBJ_RELEASE(seg); */ + /* } */ + /* OBJ_DESTRUCT(&mca_gpr_replica_head.registry); */ -/* while (NULL != (kt = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.segment_dict))) { */ -/* OBJ_RELEASE(kt); */ -/* } */ -/* OBJ_DESTRUCT(&mca_gpr_replica_head.segment_dict); */ + /* while (NULL != (kt = (mca_gpr_replica_keytable_t*)ompi_list_remove_first(&mca_gpr_replica_head.segment_dict))) { */ + /* OBJ_RELEASE(kt); */ + /* } */ + /* OBJ_DESTRUCT(&mca_gpr_replica_head.segment_dict); */ -/* while (NULL != (kl = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys))) { */ -/* OBJ_RELEASE(kl); */ -/* } */ -/* OBJ_DESTRUCT(&mca_gpr_replica_head.freekeys); */ + /* while (NULL != (kl = (mca_gpr_replica_keylist_t*)ompi_list_remove_first(&mca_gpr_replica_head.freekeys))) { */ + /* OBJ_RELEASE(kl); */ + /* } */ + /* OBJ_DESTRUCT(&mca_gpr_replica_head.freekeys); */ -/* while (NULL != (tk = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_first(&mca_gpr_replica_notify_request_tracker))) { */ -/* OBJ_RELEASE(tk); */ -/* } */ -/* OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker); */ + /* while (NULL != (tk = (mca_gpr_notify_request_tracker_t*)ompi_list_remove_first(&mca_gpr_replica_notify_request_tracker))) { */ + /* OBJ_RELEASE(tk); */ + /* } */ + /* OBJ_DESTRUCT(&mca_gpr_replica_notify_request_tracker); */ -/* while (NULL != (id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags))) { */ -/* OBJ_RELEASE(id); */ -/* } */ -/* OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags); */ -/* initialized = false; */ -/* } */ + /* while (NULL != (id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags))) { */ + /* OBJ_RELEASE(id); */ + /* } */ + /* OBJ_DESTRUCT(&mca_gpr_replica_free_notify_id_tags); */ + /* initialized = false; */ + /* } */ /* All done */ @@ -733,6 +743,9 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + /******* LOCK *****/ + OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex); + /* enter request on notify tracking system */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); trackptr->requestor = ompi_name_server.copy_process_name(sender); @@ -748,6 +761,9 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex); + /****** UNLOCK ******/ + response = (int32_t)gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, mode, segment, tokens, 0, trackptr->id_tag); @@ -806,6 +822,11 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, segment, tokens, 0); if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */ + + + /******* LOCK *****/ + OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex); + /* find request on replica notify tracking system */ for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && @@ -828,6 +849,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item); /* release tracker item */ OBJ_RELEASE(trackptr); + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex); + /****** UNLOCK ******/ + } } else { response = (int32_t)MCA_GPR_NOTIFY_ID_MAX; @@ -895,8 +920,20 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, goto RETURN_ERROR; } + + /******* LOCK *****/ + OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex); + /* enter request on notify tracking system */ trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); + if (mca_gpr_replica_debug) { + if (NULL != sender) { + ompi_output(0, "gpr_replica_recv: received synchro req from [%d,%d,%d]", sender->cellid, + sender->jobid, sender->vpid); + } else { + ompi_output(0, "gpr_replica_recv: received synchro req from NULL"); + } + } trackptr->requestor = ompi_name_server.copy_process_name(sender); trackptr->req_tag = id_tag; trackptr->callback = NULL; @@ -910,13 +947,16 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, } ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex); + /****** UNLOCK ******/ + if(NULL != gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, - mode, segment, tokens, - trigger, trackptr->id_tag)) { - response = OMPI_SUCCESS; - } else { - response = OMPI_ERROR; - } + mode, segment, tokens, + trigger, trackptr->id_tag)) { + response = OMPI_SUCCESS; + } else { + response = OMPI_ERROR; + } if (OMPI_SUCCESS != ompi_pack(answer, &command, 1, MCA_GPR_OOB_PACK_CMD)) { goto RETURN_ERROR; @@ -979,6 +1019,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, segment, tokens, trigger); if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */ + + /******* LOCK *****/ + OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex); + /* find request on replica notify tracking system */ for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && @@ -1001,6 +1045,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, ompi_list_append(&mca_gpr_replica_free_notify_id_tags, &ptr_free_id->item); /* release tracker item */ OBJ_RELEASE(trackptr); + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex); + /****** UNLOCK ******/ + } } else { response = (int32_t)MCA_GPR_NOTIFY_ID_MAX; @@ -1022,6 +1070,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, /***** TEST INTERNALS *****/ } else if (MCA_GPR_TEST_INTERNALS_CMD == command) { + + /******* LOCK *****/ + OMPI_THREAD_LOCK(&mca_gpr_replica_component_mutex); + if ((OMPI_SUCCESS != ompi_unpack(buffer, &test_level, 1, OMPI_INT32)) || (0 > test_level)) { goto RETURN_ERROR; @@ -1053,8 +1105,11 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender, if (0 > mca_oob_send_packed(sender, answer, tag, 0)) { /* RHC -- not sure what to do if the return send fails */ } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_component_mutex); + /****** UNLOCK ******/ + - /**** UNRECOGNIZED ****/ + /**** UNRECOGNIZED COMMAND ****/ } else { /* got an unrecognized command */ RETURN_ERROR: ompi_buffer_init(&error_answer, 8); @@ -1123,8 +1178,8 @@ void gpr_replica_remote_notify(ompi_process_name_t *recipient, int recipient_tag if (OMPI_SUCCESS != ompi_pack(msg, regval->object, regval->object_size, OMPI_BYTE)) { return; } - /* TSW - should we add */ - /* OBJ_RELEASE(regval); */ + /* TSW - should we add */ + /* OBJ_RELEASE(regval); */ } } if (OMPI_SUCCESS != ompi_pack(msg, &message->num_tokens, 1, OMPI_INT32)) { diff --git a/src/mca/gpr/replica/gpr_replica_internals.c b/src/mca/gpr/replica/gpr_replica_internals.c index 495a503e3d..b6ad3d20e6 100644 --- a/src/mca/gpr/replica/gpr_replica_internals.c +++ b/src/mca/gpr/replica/gpr_replica_internals.c @@ -20,6 +20,9 @@ #include #include "include/constants.h" + +#include "threads/mutex.h" + #include "util/output.h" #include "util/printf.h" #include "util/proc_info.h" @@ -29,6 +32,7 @@ #include "gpr_replica.h" #include "gpr_replica_internals.h" + /* * */ @@ -38,17 +42,22 @@ mca_gpr_replica_segment_t *gpr_replica_define_segment(char *segment) mca_gpr_replica_segment_t *seg; mca_gpr_replica_key_t key; - key = gpr_replica_define_key(segment, NULL); - if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */ + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + + key = gpr_replica_define_key(segment, NULL); + if (MCA_GPR_REPLICA_KEY_MAX == key) { /* got some kind of error code */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return NULL; - } + } - /* need to add the segment to the registry */ - seg = OBJ_NEW(mca_gpr_replica_segment_t); - seg->segment = key; - ompi_list_append(&mca_gpr_replica_head.registry, &seg->item); + /* need to add the segment to the registry */ + seg = OBJ_NEW(mca_gpr_replica_segment_t); + seg->segment = key; + ompi_list_append(&mca_gpr_replica_head.registry, &seg->item); - return seg; + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + + return seg; } @@ -57,6 +66,8 @@ mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment) mca_gpr_replica_keytable_t *ptr_seg; mca_gpr_replica_segment_t *seg; + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + /* search the registry segments to find which one is being referenced */ for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); @@ -67,14 +78,18 @@ mca_gpr_replica_segment_t *gpr_replica_find_seg(bool create, char *segment) seg != (mca_gpr_replica_segment_t*)ompi_list_get_end(&mca_gpr_replica_head.registry); seg = (mca_gpr_replica_segment_t*)ompi_list_get_next(seg)) { if(seg->segment == ptr_seg->key) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(seg); } } } } + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + if (create) { - /* didn't find the dictionary entry - create it */ - return gpr_replica_define_segment(segment); + /* didn't find the dictionary entry - create it */ + return gpr_replica_define_segment(segment); } return NULL; /* don't create it - just return NULL */ } @@ -85,12 +100,15 @@ mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *tok mca_gpr_replica_keytable_t *ptr_key; mca_gpr_replica_segment_t *seg; + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + /* search the registry segments to find which one is being referenced */ for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) { if (0 == strcmp(segment, ptr_seg->token)) { if (NULL == token) { /* just want segment token-key pair */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(ptr_seg); } /* search registry to find segment */ @@ -103,15 +121,19 @@ mca_gpr_replica_keytable_t *gpr_replica_find_dict_entry(char *segment, char *tok ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) { if (0 == strcmp(token, ptr_key->token)) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(ptr_key); } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(NULL); /* couldn't find the specified entry */ } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(NULL); /* couldn't find segment, even though we found entry in registry dict */ } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(NULL); /* couldn't find segment token-key pair */ } @@ -152,15 +174,19 @@ char *gpr_replica_get_token(char *segment, mca_gpr_replica_key_t key) return NULL; } + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + /* find the matching key */ for (ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&seg->keytable); ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) { if (key == ptr_key->key) { answer = strdup(ptr_key->token); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return answer; } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(NULL); /* couldn't find the specified entry */ } @@ -200,6 +226,9 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) /* if token is NULL, then this is defining a segment name. Check dictionary to ensure uniqueness */ if (NULL == token) { + + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + for (ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_first(&mca_gpr_replica_head.segment_dict); ptr_seg != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&mca_gpr_replica_head.segment_dict); ptr_seg = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_seg)) { @@ -213,9 +242,10 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) new->token = strdup(segment); if (0 == ompi_list_get_size(&mca_gpr_replica_head.freekeys)) { /* no keys waiting for reuse */ if (MCA_GPR_REPLICA_KEY_MAX-2 > mca_gpr_replica_head.lastkey) { /* have a key left */ - mca_gpr_replica_head.lastkey++; - new->key = mca_gpr_replica_head.lastkey; + mca_gpr_replica_head.lastkey++; + new->key = mca_gpr_replica_head.lastkey; } else { /* out of keys */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return MCA_GPR_REPLICA_KEY_MAX; } } else { @@ -223,8 +253,10 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) new->key = ptr_key->key; } ompi_list_append(&mca_gpr_replica_head.segment_dict, &new->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return new->key; } + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); /* okay, token is specified */ /* search the registry segments to find which one is being referenced */ @@ -235,6 +267,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) ptr_key != (mca_gpr_replica_keytable_t*)ompi_list_get_end(&seg->keytable); ptr_key = (mca_gpr_replica_keytable_t*)ompi_list_get_next(ptr_key)) { if (0 == strcmp(token, ptr_key->token)) { + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return ptr_key->key; /* already taken, report value */ } } @@ -249,6 +282,7 @@ mca_gpr_replica_key_t gpr_replica_define_key(char *segment, char *token) new->key = ptr_key->key; } ompi_list_append(&seg->keytable, &new->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return new->key; } /* couldn't find segment */ @@ -268,6 +302,8 @@ int gpr_replica_delete_key(char *segment, char *token) return(OMPI_ERROR); } + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + /* find the segment */ seg = gpr_replica_find_seg(false, segment); if (NULL != seg) { @@ -275,13 +311,16 @@ int gpr_replica_delete_key(char *segment, char *token) /* if specified token is NULL, then this is deleting a segment name.*/ if (NULL == token) { if (OMPI_SUCCESS != gpr_replica_empty_segment(seg)) { /* couldn't empty segment */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return OMPI_ERROR; } /* now remove the dictionary entry from the global registry dictionary*/ ptr_seg = gpr_replica_find_dict_entry(segment, NULL); if (NULL == ptr_seg) { /* failed to find dictionary entry */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return OMPI_ERROR; } + /* add key to global registry's freekey list */ new = OBJ_NEW(mca_gpr_replica_keytable_t); new->token = NULL; @@ -290,6 +329,9 @@ int gpr_replica_delete_key(char *segment, char *token) /* remove the dictionary entry */ ompi_list_remove_item(&mca_gpr_replica_head.segment_dict, &ptr_seg->item); + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + return(OMPI_SUCCESS); } else { /* token not null, so need to find dictionary element to delete */ @@ -316,12 +358,15 @@ int gpr_replica_delete_key(char *segment, char *token) /* now remove the dictionary entry from the segment's dictionary */ ompi_list_remove_item(&seg->keytable, &ptr_key->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(OMPI_SUCCESS); } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(OMPI_ERROR); /* if we get here, then we couldn't find token in dictionary */ } } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return(OMPI_ERROR); /* if we get here, then we couldn't find segment */ } @@ -333,6 +378,8 @@ int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg) /* need to free memory from each entry - remove_last returns pointer to the entry */ + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + /* empty the segment's registry */ while (!ompi_list_is_empty(&seg->registry_entries)) { ptr = (mca_gpr_replica_core_t*)ompi_list_remove_first(&seg->registry_entries); @@ -354,6 +401,8 @@ int gpr_replica_empty_segment(mca_gpr_replica_segment_t *seg) ompi_list_remove_item(&mca_gpr_replica_head.registry, &seg->item); OBJ_RELEASE(seg); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + return OMPI_SUCCESS; } @@ -437,11 +486,12 @@ bool gpr_replica_check_key_list(ompi_registry_mode_t addr_mode, return false; } -mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, - ompi_registry_notify_action_t action, - ompi_registry_mode_t addr_mode, - char *segment, char **tokens, int trigger, - mca_gpr_notify_id_t id_tag) +mca_gpr_replica_trigger_list_t* +gpr_replica_construct_trigger(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_notify_action_t action, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag) { mca_gpr_replica_segment_t *seg; mca_gpr_replica_core_t *reg; @@ -450,8 +500,11 @@ mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_sync mca_gpr_replica_key_t *keyptr; int i, num_tokens; + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + seg = gpr_replica_find_seg(true, segment); if (NULL == seg) { /* couldn't find or create segment */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return NULL; } @@ -519,6 +572,8 @@ mca_gpr_replica_trigger_list_t *gpr_replica_construct_trigger(ompi_registry_sync ompi_list_append(&seg->triggers, &trig->item); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + return trig; } @@ -536,8 +591,11 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync int i=0, num_tokens=0; bool found=false, mismatch=false; + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* couldn't find segment */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return MCA_GPR_NOTIFY_ID_MAX; } @@ -565,6 +623,7 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync } } + /* search segment's trigger list for specified trigger event */ for (trig = (mca_gpr_replica_trigger_list_t*)ompi_list_get_first(&seg->triggers); trig != (mca_gpr_replica_trigger_list_t*)ompi_list_get_end(&seg->triggers) && !found; @@ -595,9 +654,12 @@ mca_gpr_notify_id_t gpr_replica_remove_trigger(ompi_registry_synchro_mode_t sync id_tag = trig->id_tag; ompi_list_remove_item(&seg->triggers, &trig->item); OBJ_RELEASE(trig); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return id_tag; } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + return MCA_GPR_NOTIFY_ID_MAX; } @@ -684,6 +746,8 @@ void gpr_replica_process_triggers(char *segment, return; } + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + seg = gpr_replica_find_seg(false, segment); if (NULL == seg) { /* couldn't find segment */ return; @@ -694,28 +758,30 @@ void gpr_replica_process_triggers(char *segment, ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + /* find corresponding notify request */ found = false; for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); - trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker); - trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) { + trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker); + trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)) { if (trackptr->id_tag == trig->id_tag) { found = true; - break; + break; } } if (!found) { /* didn't find request */ ompi_output(0, "Notification error - request not found"); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); return; } /* process request */ if (NULL == trackptr->requestor) { /* local request - callback fn with their tag */ - if (mca_gpr_replica_debug) { - ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: local callback", ompi_process_info.name->cellid, - ompi_process_info.name->jobid, ompi_process_info.name->vpid); - } + if (mca_gpr_replica_debug) { + ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: local callback", ompi_process_info.name->cellid, + ompi_process_info.name->jobid, ompi_process_info.name->vpid); + } trackptr->callback(message, trackptr->user_tag); /* dismantle message and free memory */ while (NULL != (data = (ompi_registry_object_t*)ompi_list_remove_first(&message->data))) { @@ -725,38 +791,40 @@ void gpr_replica_process_triggers(char *segment, for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) { free(*tokptr); } - if(NULL != message->tokens) { + if(NULL != message->tokens) { free(message->tokens); - } + } free(message); - if (mca_gpr_replica_debug) { - ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: data released", ompi_process_info.name->cellid, - ompi_process_info.name->jobid, ompi_process_info.name->vpid); - } + if (mca_gpr_replica_debug) { + ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: data released", ompi_process_info.name->cellid, + ompi_process_info.name->jobid, ompi_process_info.name->vpid); + } } else { /* remote request - send message back */ gpr_replica_remote_notify(trackptr->requestor, trackptr->req_tag, message); - if (mca_gpr_replica_debug) { - ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: remote message sent", ompi_process_info.name->cellid, - ompi_process_info.name->jobid, ompi_process_info.name->vpid); - } + if (mca_gpr_replica_debug) { + ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: remote message sent", ompi_process_info.name->cellid, + ompi_process_info.name->jobid, ompi_process_info.name->vpid); + } } /* if one-shot, remove request from tracking system */ if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trig->synch_mode) { - ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); - OBJ_RELEASE(trackptr); + ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + OBJ_RELEASE(trackptr); - /* ....and from the corresponding registry segment */ - ompi_list_remove_item(&seg->triggers, &trig->item); - OBJ_RELEASE(trig); + /* ....and from the corresponding registry segment */ + ompi_list_remove_item(&seg->triggers, &trig->item); + OBJ_RELEASE(trig); } if (mca_gpr_replica_debug) { ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: complete", ompi_process_info.name->cellid, ompi_process_info.name->jobid, ompi_process_info.name->vpid); } + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + } ompi_list_t *gpr_replica_test_internals(int level) @@ -771,6 +839,8 @@ ompi_list_t *gpr_replica_test_internals(int level) mca_gpr_replica_keytable_t *dict_entry; bool success; + OMPI_THREAD_LOCK(&mca_gpr_replica_internals_mutex); + test_results = OBJ_NEW(ompi_list_t); ompi_output(0, "testing define segment"); @@ -963,5 +1033,7 @@ ompi_list_t *gpr_replica_test_internals(int level) /* check ability to empty segment */ + OMPI_THREAD_UNLOCK(&mca_gpr_replica_internals_mutex); + return test_results; } diff --git a/src/mca/ns/replica/src/ns_replica.c b/src/mca/ns/replica/src/ns_replica.c index cb84016bc2..2c428a3953 100644 --- a/src/mca/ns/replica/src/ns_replica.c +++ b/src/mca/ns/replica/src/ns_replica.c @@ -6,6 +6,8 @@ */ #include +#include "threads/mutex.h" + #include "ompi_config.h" #include "util/output.h" #include "mca/mca.h" @@ -22,10 +24,14 @@ mca_ns_base_cellid_t ns_replica_create_cellid(void) { + OMPI_THREAD_LOCK(&mca_ns_replica_mutex); + if ((MCA_NS_BASE_CELLID_MAX-2) >= mca_ns_replica_last_used_cellid) { mca_ns_replica_last_used_cellid = mca_ns_replica_last_used_cellid + 1; + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return(mca_ns_replica_last_used_cellid); } else { + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return MCA_NS_BASE_CELLID_MAX; } } @@ -34,14 +40,18 @@ mca_ns_base_jobid_t ns_replica_create_jobid(void) { mca_ns_replica_name_tracker_t *new; + OMPI_THREAD_LOCK(&mca_ns_replica_mutex); + if ((MCA_NS_BASE_JOBID_MAX-2) >= mca_ns_replica_last_used_jobid) { mca_ns_replica_last_used_jobid = mca_ns_replica_last_used_jobid + 1; new = OBJ_NEW(mca_ns_replica_name_tracker_t); new->job = mca_ns_replica_last_used_jobid; new->last_used_vpid = 0; ompi_list_append(&mca_ns_replica_name_tracker, &new->item); + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return(mca_ns_replica_last_used_jobid); } else { + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return MCA_NS_BASE_JOBID_MAX; } } @@ -52,17 +62,24 @@ mca_ns_base_vpid_t ns_replica_reserve_range(mca_ns_base_jobid_t job, mca_ns_base mca_ns_replica_name_tracker_t *ptr; mca_ns_base_vpid_t start; + OMPI_THREAD_LOCK(&mca_ns_replica_mutex); + for (ptr = (mca_ns_replica_name_tracker_t*)ompi_list_get_first(&mca_ns_replica_name_tracker); ptr != (mca_ns_replica_name_tracker_t*)ompi_list_get_end(&mca_ns_replica_name_tracker); ptr = (mca_ns_replica_name_tracker_t*)ompi_list_get_next(ptr)) { if (job == ptr->job) { /* found the specified job */ if ((MCA_NS_BASE_VPID_MAX-range-2) >= ptr->last_used_vpid) { /* requested range available */ - start = ptr->last_used_vpid + 1; - ptr->last_used_vpid = ptr->last_used_vpid + range; + start = ptr->last_used_vpid; + if (0 == job && start == 0) { /* vpid=0 reserved for job=0 */ + start = 1; + } + ptr->last_used_vpid = start + range; + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return(start); } } } + OMPI_THREAD_UNLOCK(&mca_ns_replica_mutex); return MCA_NS_BASE_VPID_MAX; } diff --git a/src/mca/ns/replica/src/ns_replica.h b/src/mca/ns/replica/src/ns_replica.h index 6b2a39662f..0d4da253e9 100644 --- a/src/mca/ns/replica/src/ns_replica.h +++ b/src/mca/ns/replica/src/ns_replica.h @@ -9,6 +9,7 @@ #include "ompi_config.h" #include "include/types.h" #include "include/constants.h" +#include "threads/mutex.h" #include "class/ompi_list.h" #include "mca/oob/oob.h" #include "mca/ns/ns.h" @@ -34,6 +35,7 @@ extern mca_ns_base_cellid_t mca_ns_replica_last_used_cellid; extern mca_ns_base_jobid_t mca_ns_replica_last_used_jobid; extern ompi_list_t mca_ns_replica_name_tracker; extern int mca_ns_replica_debug; +extern ompi_mutex_t mca_ns_replica_mutex; /* * Module open / close diff --git a/src/mca/ns/replica/src/ns_replica_component.c b/src/mca/ns/replica/src/ns_replica_component.c index 3927855781..7d24f03979 100644 --- a/src/mca/ns/replica/src/ns_replica_component.c +++ b/src/mca/ns/replica/src/ns_replica_component.c @@ -18,6 +18,7 @@ #include "ompi_config.h" #include "include/constants.h" +#include "threads/mutex.h" #include "util/proc_info.h" #include "util/output.h" #include "mca/mca.h" @@ -102,6 +103,7 @@ mca_ns_base_cellid_t mca_ns_replica_last_used_cellid; mca_ns_base_jobid_t mca_ns_replica_last_used_jobid; ompi_list_t mca_ns_replica_name_tracker; int mca_ns_replica_debug; +ompi_mutex_t mca_ns_replica_mutex; /* * don't really need this function - could just put NULL in the above structure diff --git a/src/mpi/runtime/ompi_mpi_init.c b/src/mpi/runtime/ompi_mpi_init.c index 016ba9eb6a..67dd03ccc5 100644 --- a/src/mpi/runtime/ompi_mpi_init.c +++ b/src/mpi/runtime/ompi_mpi_init.c @@ -156,10 +156,10 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) /* * Register my process info with my replica. */ - if (OMPI_SUCCESS != (ret = ompi_rte_register())) { - error = "ompi_rte_init: failed in ompi_rte_register()\n"; - goto error; - } +/* if (OMPI_SUCCESS != (ret = ompi_rte_register())) { */ +/* error = "ompi_rte_init: failed in ompi_rte_register()\n"; */ +/* goto error; */ +/* } */ /* finalize the rte startup */ diff --git a/src/tools/mpirun/mpirun2.c b/src/tools/mpirun/mpirun2.c index 7c60ad63c8..4c0427b9e1 100644 --- a/src/tools/mpirun/mpirun2.c +++ b/src/tools/mpirun/mpirun2.c @@ -222,10 +222,10 @@ main(int argc, char *argv[]) /* * Register my process info with my replica. */ - if (OMPI_SUCCESS != (ret = ompi_rte_register())) { - ompi_output(0, "ompi_rte_init: failed in ompi_rte_register()\n"); - return ret; - } +/* if (OMPI_SUCCESS != (ret = ompi_rte_register())) { */ +/* ompi_output(0, "ompi_rte_init: failed in ompi_rte_register()\n"); */ +/* return ret; */ +/* } */ /* finalize the rte startup */ if (OMPI_SUCCESS != (ret = ompi_rte_init_finalstage(&multi_thread,