From 92a5f2da1f29203b125c4c36d0e507b11739fcd8 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 23 Sep 2004 14:35:51 +0000 Subject: [PATCH] ka-ching This commit was SVN r2830. --- src/mca/gpr/replica/gpr_replica.c | 292 +++++++++++++++++++----------- 1 file changed, 191 insertions(+), 101 deletions(-) diff --git a/src/mca/gpr/replica/gpr_replica.c b/src/mca/gpr/replica/gpr_replica.c index ddfcef5f40..4e997bb1d3 100644 --- a/src/mca/gpr/replica/gpr_replica.c +++ b/src/mca/gpr/replica/gpr_replica.c @@ -25,6 +25,8 @@ #include "util/output.h" #include "util/proc_info.h" +#include "util/sys_info.h" + #include "mca/gpr/base/base.h" #include "gpr_replica.h" #include "gpr_replica_internals.h" @@ -409,20 +411,25 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode, ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { int rc; + mca_gpr_notify_id_t local_idtag; + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); - rc = gpr_replica_subscribe_nl(addr_mode,action,segment,tokens,cb_func,user_tag); + + /* enter request on notify tracking system */ + local_idtag = gpr_replica_enter_notify_request(NULL, 0, cb_func, user_tag); + + /* process subscription */ + rc = gpr_replica_subscribe_nl(addr_mode,action,segment,tokens,local_idtag); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return rc; } int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode, - ompi_registry_notify_action_t action, - char *segment, char **tokens, - ompi_registry_notify_cb_fn_t cb_func, void *user_tag) + ompi_registry_notify_action_t action, + char *segment, char **tokens, mca_gpr_notify_id_t id_tag) { - mca_gpr_notify_request_tracker_t *trackptr; - mca_gpr_idtag_list_t *ptr_free_id; mca_gpr_replica_trigger_list_t *trig; ompi_registry_notify_message_t *notify_msg; @@ -438,25 +445,10 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode, } - /* enter request on notify tracking system */ - trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); - trackptr->requestor = NULL; - trackptr->req_tag = 0; - trackptr->callback = cb_func; - trackptr->user_tag = user_tag; - if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) { - trackptr->id_tag = mca_gpr_replica_last_notify_id_tag; - mca_gpr_replica_last_notify_id_tag++; - } else { - ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags); - trackptr->id_tag = ptr_free_id->id_tag; - } - - /* construct the trigger - add to notify tracking system if success, otherwise dump */ + /* construct the trigger - add to notify tracking system if success, otherwise dump */ if (NULL != (trig = gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, addr_mode, segment, tokens, - 0, trackptr->id_tag))) { - ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + 0, id_tag))) { if (OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) { /* want list of everything there */ notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens); @@ -466,7 +458,6 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode, } return OMPI_SUCCESS; } else { - OBJ_RELEASE(trackptr); return OMPI_ERROR; } } @@ -480,16 +471,20 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode, OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); rc = gpr_replica_unsubscribe_nl(addr_mode,action,segment,tokens); OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); - return rc; + + if (MCA_GPR_NOTIFY_ID_MAX == rc) { + return OMPI_ERROR; + } + + return OMPI_SUCCESS; } -int gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode, - ompi_registry_notify_action_t action, - char *segment, char **tokens) +mca_gpr_notify_id_t gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode, + ompi_registry_notify_action_t action, + char *segment, char **tokens) { - mca_gpr_notify_request_tracker_t *trackptr; - mca_gpr_notify_id_t id_tag; + mca_gpr_notify_id_t id_tag, req_idtag; if (mca_gpr_replica_debug) { ompi_output(0, "[%d,%d,%d] gpr replica: unsubscribe entered: segment %s 1st token %s", @@ -499,32 +494,22 @@ int gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode, /* protect against errors */ if (NULL == segment) { - return OMPI_ERROR; + return MCA_GPR_NOTIFY_ID_MAX; } /* find trigger on replica - return id_tag */ - if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, - addr_mode, segment, tokens, 0))) { - return OMPI_ERROR; + id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action, + addr_mode, segment, tokens, 0); + + if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */ + + req_idtag = gpr_replica_remove_notify_request(id_tag); + return req_idtag; + } else { + return MCA_GPR_NOTIFY_ID_MAX; } - /* find request on notify tracking system */ - for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); - trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && - trackptr->id_tag != id_tag; - trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)); - - /* ...and remove it */ - if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) { - ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); - OBJ_RELEASE(trackptr); - - return OMPI_SUCCESS; - } - - /* if we get here, then couldn't find request */ - return OMPI_ERROR; } int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, @@ -533,19 +518,26 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode, ompi_registry_notify_cb_fn_t cb_func, void *user_tag) { int rc; + mca_gpr_notify_id_t local_idtag; + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); - rc = gpr_replica_synchro_nl(synchro_mode,addr_mode,segment,tokens,trigger,cb_func,user_tag); + + /* enter request on notify tracking system */ + local_idtag = gpr_replica_enter_notify_request(NULL, 0, cb_func, user_tag); + + /* process synchro request */ + rc = gpr_replica_synchro_nl(synchro_mode, addr_mode, + segment, tokens, trigger, local_idtag); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); return rc; } int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, - ompi_registry_mode_t addr_mode, - char *segment, char **tokens, int trigger, - ompi_registry_notify_cb_fn_t cb_func, void *user_tag) + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger, + mca_gpr_notify_id_t id_tag) { - mca_gpr_notify_request_tracker_t *trackptr; - mca_gpr_idtag_list_t *ptr_free_id; mca_gpr_replica_trigger_list_t *trig; ompi_registry_notify_message_t *notify_msg; @@ -560,25 +552,10 @@ int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, return OMPI_ERROR; } - /* enter request on notify tracking system */ - trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t); - trackptr->requestor = NULL; - trackptr->req_tag = 0; - trackptr->callback = cb_func; - trackptr->user_tag = user_tag; - if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) { - trackptr->id_tag = mca_gpr_replica_last_notify_id_tag; - mca_gpr_replica_last_notify_id_tag++; - } else { - ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags); - trackptr->id_tag = ptr_free_id->id_tag; - } - - /* construct the trigger - add to notify tracking system if success, otherwise dump */ + /* construct the trigger */ if (NULL != (trig = gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, addr_mode, segment, tokens, - trigger, trackptr->id_tag))) { - ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item); + trigger, id_tag))) { /* if synchro condition already met, construct and send message */ if ((OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL & synchro_mode && trig->count >= trigger) || @@ -591,7 +568,6 @@ int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, } return OMPI_SUCCESS; } else { - OBJ_RELEASE(trackptr); return OMPI_ERROR; } } @@ -604,15 +580,19 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode, OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); rc = gpr_replica_cancel_synchro_nl(synchro_mode,addr_mode,segment,tokens,trigger); OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); - return rc; + + if (MCA_GPR_NOTIFY_ID_MAX == rc) { + return OMPI_ERROR; + } + + return OMPI_SUCCESS; } -int gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, - ompi_registry_mode_t addr_mode, - char *segment, char **tokens, int trigger) +mca_gpr_notify_id_t gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, + ompi_registry_mode_t addr_mode, + char *segment, char **tokens, int trigger) { - mca_gpr_notify_request_tracker_t *trackptr; - mca_gpr_notify_id_t id_tag; + mca_gpr_notify_id_t id_tag, req_idtag; if (mca_gpr_replica_debug) { ompi_output(0, "[%d,%d,%d] gpr replica: cancel_synchro entered: segment %s 1st token %s", @@ -625,30 +605,140 @@ int gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode, return OMPI_ERROR; } - /* find trigger on replica - return id_tag */ - if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, - addr_mode, segment, tokens, trigger))) { - return OMPI_ERROR; + /* find trigger on replica - return local id_tag */ + id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE, + addr_mode, segment, tokens, trigger); + + if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */ + /* remove notify request - return requestor id_tag */ + req_idtag = gpr_replica_remove_notify_request(id_tag); + return req_idtag; + } else { + return MCA_GPR_NOTIFY_ID_MAX; } - - /* find request on notify tracking system */ - for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker); - trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) && - trackptr->id_tag != id_tag; - trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr)); - - /* ...and remove it */ - if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) { - ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item); - OBJ_RELEASE(trackptr); - - return OMPI_SUCCESS; - } - - /* if we get here, then couldn't find request */ - return OMPI_ERROR; } + +int gpr_replica_rte_register(char *contact_info, size_t num_procs, + ompi_registry_notify_cb_fn_t start_cb_func, void *start_user_tag, + ompi_registry_notify_cb_fn_t end_cb_func, void *end_user_tag) +{ + int ret; + mca_gpr_notify_id_t local_idtag1, local_idtag2; + ompi_buffer_t buffer; + + /* create the buffer to store the local information */ + ompi_buffer_init(&buffer, 0); + ompi_pack_string(buffer, contact_info); + ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32); + ompi_pack_string(buffer, ompi_system_info.nodename); + + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + + local_idtag1 = gpr_replica_enter_notify_request(NULL, 0, start_cb_func, start_user_tag); + + local_idtag2 = gpr_replica_enter_notify_request(NULL, 0, end_cb_func, end_user_tag); + + ret = gpr_replica_rte_register_nl(contact_info, buffer, num_procs, + local_idtag1, local_idtag2); + + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); + return ret; +} + +int gpr_replica_rte_register_nl(char *contact_info, ompi_buffer_t buffer, size_t num_procs, + mca_gpr_notify_id_t start_idtag, + mca_gpr_notify_id_t end_idtag) +{ + char *segment; + char *keys[2]; + void *addr; + int rc, size; + ompi_process_name_t proc={0,0,0}; + + + /* extract process name from contact info */ + mca_oob_parse_contact_info(contact_info, &proc, NULL); + + /* setup keys and segment for this job */ + asprintf(&segment, "ompi-job-%s", ns_base_get_jobid_string(&proc)); + keys[0] = ns_base_get_proc_name_string(&proc); + keys[1] = NULL; + + if (mca_gpr_replica_debug) { + ompi_output(0, "gpr_replica_register: entered for proc %s", keys[0]); + } + + /* peek the buffer and resulting size */ + ompi_buffer_get(buffer, &addr, &size); + + /* place on registry, no overwrite - error if already there */ + rc = gpr_replica_put_nl(OMPI_REGISTRY_XAND, + segment, keys, addr, size); + + if (OMPI_SUCCESS != rc) { + if (mca_gpr_replica_debug) { + ompi_output(0, "gpr_replica_register: duplicate registration attempt from %s", keys[0]); + } + return rc; + } + + /* register a synchro on the segment so we get notified when everyone registers */ + rc = gpr_replica_synchro_nl( + OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT, + OMPI_REGISTRY_OR, + segment, + NULL, + num_procs, + start_idtag); + + /* register a synchro on the segment so we get notified when everyone is gone */ + rc = gpr_replica_synchro_nl( + OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT, + OMPI_REGISTRY_OR, + segment, + NULL, + 0, + end_idtag); + + return rc; +} + +int gpr_replica_rte_unregister(char *proc_name_string) +{ + int ret; + + OMPI_THREAD_LOCK(&mca_gpr_replica_mutex); + ret = gpr_replica_rte_unregister_nl(proc_name_string); + OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex); + + return ret; +} + + +int gpr_replica_rte_unregister_nl(char *proc_name_string) +{ + char *segment; + char *keys[2]; + int rc; + ompi_process_name_t *proc; + + /* convert string to process name */ + proc = ns_base_convert_string_to_process_name(proc_name_string); + + /* setup keys and segment for this job */ + asprintf(&segment, "ompi-job-%s", ns_base_get_jobid_string(proc)); + keys[0] = strdup(proc_name_string); + keys[1] = NULL; + + rc = gpr_replica_delete_object_nl(OMPI_REGISTRY_XAND, segment, keys); + free(keys[0]); + free(proc); + return rc; + +} + + ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode, char *segment, char **tokens) {