1
1
openmpi/src/mca/gpr/proxy/gpr_proxy.h
Ralph Castain 8271d3f30e Okay, here is the massive checkin that restructures the registry trigger system for scalability. Actually, it isn't "quite" as large as it looks - it just touches a bunch of files.
Also included is a fix to the attribute problem for singletons.

Short explanation:
The prior system placed triggers and subscriptions on the registry for each process - approximately eight/process. Each of these had to be checked every time there was a registry operation such as a "put" or "increment-value". For large numbers of processes, this repetitive checking consumed some significant time.

The new system allows processes to "attach" to existing triggers and subscriptions, without creating a new one. Thus, there are now only eight triggers and five subscriptions on a job - *regardless of how many processes are being run*. This means that the registry now takes the same amount of time (which is pretty darn short) to process an operation regardless of how many processes are in a job.

I'll provide some startup times from scalability tests shortly - need to complete the commit so I can move the system to an appropriate cluster.

This commit was SVN r6164.
2005-06-24 16:59:37 +00:00

217 lines
5.7 KiB
C

/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#ifndef ORTE_GPR_PROXY_H
#define ORTE_GPR_PROXY_H
#include "orte_config.h"
#include "include/orte_types.h"
#include "class/ompi_object.h"
#include "class/orte_pointer_array.h"
#include "dps/dps_types.h"
#include "util/proc_info.h"
#include "mca/gpr/base/base.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/*
 * Module open / close
 *
 * Entry points for opening and closing the gpr proxy. Both take no
 * arguments and return an int.
 * NOTE(review): the return values are presumably ORTE status codes --
 * confirm against the implementation, which is not visible in this header.
 */
int orte_gpr_proxy_open(void);
int orte_gpr_proxy_close(void);
/*
 * Startup / Shutdown
 *
 * orte_gpr_proxy_component_init returns a pointer to an
 * orte_gpr_base_module_t and receives three pointers
 * (allow_multi_user_threads, have_hidden_threads, priority).
 * NOTE(review): these look like output parameters filled in by the
 * component, and the returned pointer is presumably the module to use --
 * confirm against the implementation.
 *
 * orte_gpr_proxy_module_init and orte_gpr_proxy_finalize take no arguments
 * and return an int (presumably a status code -- TODO confirm).
 */
orte_gpr_base_module_t*
orte_gpr_proxy_component_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority);
int orte_gpr_proxy_module_init(void);
int orte_gpr_proxy_finalize(void);
/*
 * proxy-local types
 */
/**
 * Record for a single subscription: it pairs a subscription id with the
 * notification callback and the user-provided tag for that callback.
 * The first member is an ompi_object_t so the record can be handled as an
 * object.
 */
typedef struct {
ompi_object_t super; /**< Allows this to be an object */
orte_gpr_subscription_id_t id; /**< id of this subscription */
orte_gpr_notify_cb_fn_t callback; /**< Function to be called for notification */
void *user_tag; /**< User-provided tag for callback function */
} orte_gpr_proxy_subscriber_t;
/* Class declaration for the type above; the OBJ_CLASS_DECLARATION macro is
 * defined outside this header (presumably in class/ompi_object.h -- confirm). */
OBJ_CLASS_DECLARATION(orte_gpr_proxy_subscriber_t);
/*
 * Size constants for the proxy.
 * NOTE(review): presumably the upper bound and the growth increment used
 * when sizing the proxy's pointer array (compare the max_size and
 * block_size fields of orte_gpr_proxy_globals_t) -- confirm against the
 * component's initialization code.
 * NOTE(review): INT32_MAX is not supplied by any standard header included
 * directly in this file; it has to come in through one of the project
 * headers included at the top -- verify that this header is
 * self-contained.
 */
#define ORTE_GPR_PROXY_MAX_SIZE INT32_MAX
#define ORTE_GPR_PROXY_BLOCK_SIZE 100
/*
 * globals used within proxy component
 *
 * NOTE(review): the per-field notes below are inferred from field names and
 * types only; this header does not show how the fields are used. Confirm
 * against the component sources before relying on them.
 */
typedef struct {
int debug; /**< presumably a debug/verbosity setting -- confirm */
size_t block_size; /**< presumably an allocation increment (cf. ORTE_GPR_PROXY_BLOCK_SIZE) -- confirm */
size_t max_size; /**< presumably a size limit (cf. ORTE_GPR_PROXY_MAX_SIZE) -- confirm */
orte_gpr_subscription_id_t num_subs; /**< presumably a count of subscriptions -- confirm */
orte_pointer_array_t *subscriptions; /**< pointer array; presumably holds orte_gpr_proxy_subscriber_t entries -- confirm */
orte_gpr_trigger_id_t trig_cntr; /**< presumably a counter used for trigger ids -- confirm */
ompi_mutex_t mutex; /**< mutex; which state it guards is not visible here */
bool compound_cmd_mode; /**< presumably true while a compound command is being assembled -- confirm */
orte_buffer_t *compound_cmd; /**< presumably the buffer holding the compound command -- confirm */
ompi_mutex_t wait_for_compound_mutex; /**< mutex; presumably paired with compound_cmd_condition -- confirm */
ompi_condition_t compound_cmd_condition; /**< condition variable related to compound commands -- usage not visible here */
int compound_cmd_waiting; /**< presumably a count of waiters on the compound command -- confirm */
} orte_gpr_proxy_globals_t;
/* Declaration only; the object itself is defined outside this header. */
extern orte_gpr_proxy_globals_t orte_gpr_proxy_globals;
/*
 * Compound cmd functions
 *
 * Begin, stop and execute entry points for compound commands. All three
 * take no arguments and return an int.
 * NOTE(review): presumably these operate on the compound_cmd* fields of
 * orte_gpr_proxy_globals, and the int is a status code -- confirm against
 * the implementation.
 */
int orte_gpr_proxy_begin_compound_cmd(void);
int orte_gpr_proxy_stop_compound_cmd(void);
int orte_gpr_proxy_exec_compound_cmd(void);
/*
 * Arithmetic operations
 *
 * Increment and decrement entry points. Each takes a pointer to an
 * orte_gpr_value_t and returns an int.
 * NOTE(review): how the value selects what is incremented or decremented
 * is not visible in this header; the int is presumably a status code --
 * confirm.
 */
int orte_gpr_proxy_increment_value(orte_gpr_value_t *value);
int orte_gpr_proxy_decrement_value(orte_gpr_value_t *value);
/*
 * Delete-index functions
 *
 * Each operation is declared in two forms: a plain form, and an "_nb" form
 * that additionally takes a notify callback (cbfunc) and a user_tag.
 * NOTE(review): "_nb" presumably means non-blocking, with the result
 * delivered through cbfunc -- confirm against the implementation.
 *
 * - delete_segment / delete_segment_nb: take a segment name.
 * - delete_entries / delete_entries_nb: take an addressing mode, a segment
 *   name, and arrays of tokens and keys (presumably NULL-terminated string
 *   arrays -- TODO confirm).
 * - index: takes a segment name plus cnt and index pointers (presumably
 *   output parameters receiving the result; ownership of any returned
 *   memory is not visible here -- confirm).
 * - index_nb: takes a segment name plus cbfunc and user_tag.
 *
 * All return an int (presumably a status code -- confirm).
 */
int orte_gpr_proxy_delete_segment(char *segment);
int orte_gpr_proxy_delete_segment_nb(char *segment,
orte_gpr_notify_cb_fn_t cbfunc, void *user_tag);
int orte_gpr_proxy_delete_entries(orte_gpr_addr_mode_t mode,
char *segment, char **tokens, char **keys);
int orte_gpr_proxy_delete_entries_nb(
orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens, char **keys,
orte_gpr_notify_cb_fn_t cbfunc, void *user_tag);
int orte_gpr_proxy_index(char *segment, size_t *cnt, char **index);
int orte_gpr_proxy_index_nb(char *segment,
orte_gpr_notify_cb_fn_t cbfunc, void *user_tag);
/*
 * Cleanup functions
 *
 * cleanup_job takes a job id by value; cleanup_proc takes a pointer to a
 * process name. Both return an int.
 * NOTE(review): presumably these remove registry data associated with the
 * given job or process and return a status code -- confirm against the
 * implementation.
 */
int orte_gpr_proxy_cleanup_job(orte_jobid_t jobid);
int orte_gpr_proxy_cleanup_proc(orte_process_name_t *proc);
/*
 * Put-get functions
 *
 * - put / put_nb: take a count and an array of orte_gpr_value_t pointers
 *   (cnt is presumably the number of entries in values -- confirm).
 * - get: takes an addressing mode, a segment name, token and key arrays,
 *   and cnt / values pointers (presumably output parameters; who frees the
 *   returned values is not visible here -- TODO confirm).
 * - get_nb: same selection arguments as get, but takes cbfunc and user_tag
 *   instead of the cnt / values pointers.
 *
 * NOTE(review): the "_nb" forms are presumably non-blocking variants that
 * report through cbfunc -- confirm. All return an int (presumably a status
 * code).
 */
int orte_gpr_proxy_put(size_t cnt, orte_gpr_value_t **values);
int orte_gpr_proxy_put_nb(size_t cnt, orte_gpr_value_t **values,
orte_gpr_notify_cb_fn_t cbfunc, void *user_tag);
int orte_gpr_proxy_get(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens, char **keys,
size_t *cnt, orte_gpr_value_t ***values);
int orte_gpr_proxy_get_nb(orte_gpr_addr_mode_t addr_mode,
char *segment, char **tokens, char **keys,
orte_gpr_notify_cb_fn_t cbfunc, void *user_tag);
/*
 * Subscribe functions
 *
 * - subscribe: takes an array of subscription pointers with its count
 *   (num_subs) and an array of trigger pointers with its count (num_trigs).
 * - unsubscribe: takes a subscription id.
 * - cancel_trigger: takes a trigger id.
 *
 * All return an int.
 * NOTE(review): presumably a status code; whether subscribe writes ids
 * back into the passed structures is not visible here -- confirm against
 * the implementation.
 */
int orte_gpr_proxy_subscribe(size_t num_subs,
orte_gpr_subscription_t **subscriptions,
size_t num_trigs,
orte_gpr_trigger_t **trigs);
int orte_gpr_proxy_unsubscribe(orte_gpr_subscription_id_t sub_number);
int orte_gpr_proxy_cancel_trigger(orte_gpr_trigger_id_t trig);
/*
 * Diagnostic functions
 *
 * Dump entry points. Every function takes an int output_id; the
 * dump_notify_msg, dump_notify_data and dump_value forms additionally take
 * a pointer to the object to dump.
 * NOTE(review): output_id presumably identifies an output stream handle,
 * and the int return is presumably a status code -- confirm against the
 * implementation.
 */
int orte_gpr_proxy_dump_all(int output_id);
int orte_gpr_proxy_dump_segments(int output_id);
int orte_gpr_proxy_dump_triggers(int output_id);
int orte_gpr_proxy_dump_subscriptions(int output_id);
int orte_gpr_proxy_dump_callbacks(int output_id);
int orte_gpr_proxy_dump_notify_msg(orte_gpr_notify_message_t *msg, int output_id);
int orte_gpr_proxy_dump_notify_data(orte_gpr_notify_data_t *data, int output_id);
int orte_gpr_proxy_dump_value(orte_gpr_value_t *value, int output_id);
/*
 * General operations
 *
 * preallocate_segment takes a segment name and a slot count and returns an
 * int.
 * NOTE(review): presumably reserves num_slots entries in the named segment
 * and returns a status code -- confirm against the implementation.
 */
int orte_gpr_proxy_preallocate_segment(char *name, size_t num_slots);
/*
 * Functions that interface to the replica
 *
 * notify_recv receives a status, the sender's process name, a buffer, an
 * RML tag and an opaque cbdata pointer, and returns nothing.
 * NOTE(review): the signature looks like a message-receive callback
 * (presumably registered with the RML to handle notifications coming from
 * the replica) -- confirm against the code that registers it.
 */
void orte_gpr_proxy_notify_recv(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata);
/*
 * Internal functions
 *
 * - enter_subscription: takes a count and an array of subscription
 *   pointers.
 * - remove_subscription: takes a subscription id.
 * - enter_trigger: takes a count and an array of trigger pointers.
 *
 * All return an int.
 * NOTE(review): presumably these maintain the proxy's local bookkeeping
 * (e.g. the subscriptions array in orte_gpr_proxy_globals) and return a
 * status code -- confirm against the implementation.
 */
int
orte_gpr_proxy_enter_subscription(size_t cnt, orte_gpr_subscription_t **subscriptions);
int
orte_gpr_proxy_remove_subscription(orte_gpr_subscription_id_t id);
int
orte_gpr_proxy_enter_trigger(size_t cnt, orte_gpr_trigger_t **triggers);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif