From c13951e16e4caec14d387449dd08a87c8e2d837f Mon Sep 17 00:00:00 2001 From: James Campbell Date: Thu, 24 Oct 2024 12:23:19 -0400 Subject: [PATCH] Simplify the code by switching to http for IPC --- Makefile.am | 2 +- pgmon-metrics.cfg | 14 - pgmon-metrics.yml | 39 + pgmon.cfg | 72 -- pgmon.py | 1677 ++++++-------------------------------- pgmon.yml | 21 + pgmon@.service | 8 +- pgmon_templates.yaml | 596 ++++++-------- pgmon_userparameter.conf | 1 - requirements.yml | 3 +- 10 files changed, 559 insertions(+), 1874 deletions(-) delete mode 100644 pgmon-metrics.cfg create mode 100644 pgmon-metrics.yml delete mode 100644 pgmon.cfg create mode 100644 pgmon.yml delete mode 100644 pgmon_userparameter.conf diff --git a/Makefile.am b/Makefile.am index f8c4b33..5e2605c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,2 +1,2 @@ bin_SCRIPTS = pgmon.py -dist_doc_DATA = README ChangeLog pgmon_userparameter.conf pgmon_templates.yaml +dist_doc_DATA = README ChangeLog pgmon_templates.yaml diff --git a/pgmon-metrics.cfg b/pgmon-metrics.cfg deleted file mode 100644 index 5869110..0000000 --- a/pgmon-metrics.cfg +++ /dev/null @@ -1,14 +0,0 @@ -# Discovery metrics -metric=discover_dbs:set::SELECT datname, %s AS agent, %s AS cluster FROM pg_database -metric=discover_rep:set::SELECT client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid, client_addr, state, %s AS agent, %s AS cluster FROM pg_stat_replication - -# Cluster-wide metrics -metric=version:value::SHOW server_version_num -metric=max_frozen_age:value::SELECT max(age(datfrozenxid)) FROM pg_database - -# Per-database metrics -metric=db_stats:row::SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = '{datname}' -metric=db_stats:row:140000:SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = '{datname}' - -# Per-replication metrics -metric=rep_stats:row::SELECT * FROM pg_stat_database WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}' diff --git a/pgmon-metrics.yml b/pgmon-metrics.yml new file mode 100644 index 0000000..7beff14 --- /dev/null +++ b/pgmon-metrics.yml @@ -0,0 +1,39 @@ +metrics: + # Discovery metrics + discover_dbs: + type: set + query: + 0: SELECT datname FROM pg_database + discover_rep: + type: set + query: + 0: SELECT client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid, client_addr, state FROM pg_stat_replication + + # cluster-wide metrics + version: + type: value + query: + 0: SHOW server_version_num + max_frozen_age: + type: value + query: + 0: SELECT max(age(datfrozenxid)) FROM pg_database + + # Per-database metrics + db_stats: + type: row + query: + 0: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM 
pg_stat_database WHERE datname = %(datname)s + 140000: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(datname)s + + # Per-replication metrics + rep_stats: + type: row + query: + 0: SELECT * FROM pg_stat_database WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}' + + # Debugging + sleep: + type: value + query: + 0: SELECT now(), pg_sleep(5); diff --git a/pgmon.cfg b/pgmon.cfg deleted file mode 100644 index 33e49d7..0000000 --- a/pgmon.cfg +++ /dev/null @@ -1,72 +0,0 @@ -## -# Misc agent settings -## - -# Where to write/find the agent PID -#pid_file=/tmt/pgmon.pid -pid_file=/run/pgmon/pgmon.pid - -## -# Agent communication settings -## - -# IPC socket -#ipc_socket=/tmp/pgmon.sock -ipc_socket=/run/pgmon/pgmon.sock - -# IPC communication timeout (s) -#ipc_timeout=10 - -# Request processing timeout (s) -#request_timeout=10 - -# Max size of the request queue before it blocks -#request_queue_size=100 - -# Max time to wait when queueing a request (s) -#request_queue_timeout=2 - -## -# Agent resource settings -## - -# Number of worker threads -#worker_count=4 - -## -# Logging settings -## - -# Log level for stderr logging (or 'off') -stderr_log_level=debug - -# Log level for file logging (od 'off') -file_log_level=off - -# Log file -#log_file=pgmon.log - -## -# DB connection settings -# -# Each cluster entry is of the form: -# name:address:port:dbname:user:password -# -# Any element other than the name can be left empty to use the defaults -## - -#cluster=local:/var/run/postgresql:5432:postgres:zbx_monitor: -cluster=pg15:localhost:54315:postgres:postgres: -cluster=pg96:localhost:54396:postgres:postgres: - -# Default database to connect to when none is specified for a metric -#dbname=postgres - -## -# Monitoring configuration -## - -# Metrics -#metrics={} - -include=pgmon-metrics.cfg diff --git a/pgmon.py b/pgmon.py index f86c258..4a64a2c 100755 --- a/pgmon.py +++ b/pgmon.py @@ -1,171 +1,113 @@ #!/usr/bin/env python3 -import argparse -import socket -import sys -import threading -import psycopg2 -import psycopg2.extras -import queue -import os -import signal +import yaml import json +import time + +import argparse import logging +import psycopg +from psycopg_pool import ConnectionPool -# -# Errors -# +import signal +from threading import Thread, Lock, Semaphore -class InvalidConfigError(Exception): - pass -class UnknownClusterError(Exception): - pass -class DuplicateMetricVersionError(Exception): - pass -class UnsupportedMetricVersionError(Exception): - pass -class RequestTimeoutError(Exception): - pass -class DBError(Exception): - pass +from http.server import BaseHTTPRequestHandler, HTTPServer +from http.server import ThreadingHTTPServer +from urllib.parse import urlparse, parse_qs -# -# Logging -# +# Configuration +config = {} -logger = None +# Dictionary of current PostgreSQL connection pools +connections_lock = Lock() +connections = {} -# Store the current log file since there is no public method to get the filename -# from a FileHandler object -current_log_file = None - -# Handler objects for easy adding/removing/modifying -file_log_handler = None 
-stderr_log_handler = None - -def init_logging(config): - """ - Initialize (or re-initialize/modify) logging - """ - global logger - global current_log_file - global file_log_handler - global stderr_log_handler - - # Get the logger object - logger = logging.getLogger(__name__) - - # Create a common formatter - formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s') - - # Set up or modify stderr logging - if config.stderr_log_level != 'OFF': - # Create and add the handler if it doesn't exist - if stderr_log_handler is None: - stderr_log_handler = logging.StreamHandler() - logger.addHandler(stderr_log_handler) - - # Set the formatter - stderr_log_handler.setFormatter(formatter) - - # Set the log level - level = logging.getLevelName(config.stderr_log_level) - stderr_log_handler.setLevel(level) - else: - if stderr_log_handler is not None: - logger.removeHandler(stderr_log_handler) - stderr_log_handler = None - - # Set up or modify file logging - if config.file_log_level != 'OFF': - # Checck if we're switching files - if file_log_handler is not None and config.log_file != current_log_file: - old_file_logger = file_log_handler - file_log_handler = None - else: - old_file_logger = None - - if config.log_file is not None: - # Create and add the handler if it doesn't exist - if file_log_handler is None: - file_log_handler = logging.FileHandler(config.log_file, encoding='utf-8') - logger.addHandler(file_log_handler) - current_log_file = config.log_file - - # Set the formatter - file_log_handler.setFormatter(formatter) - - # Set the log level - level = logging.getLevelName(config.file_log_level) - file_log_handler.setLevel(level) - - # Remove the old handler if there was one - if old_file_logger is not None: - logger.removeHandler(old_file_logger) - - # Note where logs are being written - #print("Logging to {} ({})".format(config.log_file, config.file_log_level)) - else: - if file_log_handler is not None: - logger.removeHandler(file_log_handler) - file_log_handler = None - - # Set the log level for the logger itself - levels = [] - if stderr_log_handler is not None and stderr_log_handler.level != logging.NOTSET: - levels.append(stderr_log_handler.level) - - if file_log_handler is not None and file_log_handler.level != logging.NOTSET: - levels.append(file_log_handler.level) - - if len(levels) > 0: - logger.setLevel(min(levels)) - else: - # If we have no handlers, just bump the level to the max - logger.setLevel(logging.CRITICAL) - - logger.debug("Logging initialized") - -# -# PID file handling -# -def write_pid_file(pid_file): - if pid_file is not None: - logger.debug("Writing PID file: {}".format(pid_file)) - with open(pid_file, 'w') as f: - f.write("{}".format(os.getpid())) - -def read_pid_file(pid_file): - if pid_file is None: - raise RuntimeError("No PID file specified") - - with open(pid_file, 'r') as f: - return int(f.read().strip()) - -def remove_pid_file(pid_file): - if pid_file is not None: - logger.debug("Removing PID file: {}".format(pid_file)) - os.unlink(pid_file) - - -# -# Global flags for signal handler -# +# Running state (used to gracefully shut down) running = True -reload = False +# The http server object +httpd = None + +# Where the config file lives +config_file = None + +# Configure logging +log = logging.getLogger(__name__) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s') +console_log_handler = logging.StreamHandler() 
+console_log_handler.setFormatter(formatter) +log.addHandler(console_log_handler) + +# Error types +class ConfigError(Exception): + pass +class DisconnectedError(Exception): + pass + +# Default config settings +default_config = { + # IPC port + 'port': 5400, + + # Max PostgreSQL connection pool size + 'pool_size': 4, + + # Log level for stderr logging (or 'off') + 'log_level': 'debug', + + # Connection details + 'connstr': '', + + # Default database to connect to when none is specified for a metric + 'dbname': 'postgres', + + # Metrics + 'metrics': {} +} + +def read_config(path, included = False): + """ + Read a config file. + + params: + path: path to the file to read + included: is this file included by another file? + """ + # Read config file + log.info(f"Reading log file: {path}") + with open(path, 'r') as f: + cfg = yaml.safe_load(f) + + # Read any included config files + for inc in cfg.get('include', []): + cfg.update(read_config(inc, included=True)) + + # Return the config we read if this is an include, otherwise set the final + # config + if included: + return cfg + else: + new_config = {} + new_config.update(default_config) + new_config.update(cfg) + + # Minor sanity checks + if len(new_config['metrics']) == 0: + log.error("No metrics are defined") + raise ConfigError() + + global config + config = new_config + + # Apply changes to log level + log.setLevel(logging.getLevelName(config['log_level'].upper())) -# -# Signal handler -# def signal_handler(sig, frame): """ Function for handling signals - INT => Shot down - TERM => Shut down - QUIT => Shut down HUP => Reload """ # Restore the original handler @@ -173,1313 +115,174 @@ def signal_handler(sig, frame): # Signal everything to shut down if sig in [ signal.SIGINT, signal.SIGTERM, signal.SIGQUIT ]: - logger.info("Shutting down ...") + log.info("Shutting down ...") global running running = False + if httpd is not None: + httpd.socket.close() # Signal a reload - elif sig == signal.SIGHUP: - logger.info("Reloading config ...") - global reload - reload = True + if sig == signal.SIGHUP: + log.warning("Received config reload signal") + read_config(config_file) -# -# Classes -# - -class Config: +def get_pool(dbname): """ - Agent configuration - - Note: The config is initially loaded before logging is configured, so be - mindful about logging anything in this class. - - Params: - args: (argparse.Namespace) Command line arguments - read_metrics: (bool) Indicate if metrics should be parsed - read_clusters: (bool) Indicate if cluster information should be parsed - - Exceptions: - InvalidConfigError: Indicates an issue with the config file - OSError: Thrown if there's an issue opening a config file - ValueError: Thrown if there is an encoding error + Get a database connection pool. 
""" - def __init__(self, args, read_metrics = True, read_clusters = True): - # Set defaults - self.pid_file = None # PID file - - self.ipc_socket = 'pgmon.sock' # IPC socket - self.ipc_timeout = 10 # IPC communication timeout (s) - self.request_timeout = 10 # Request processing timeout (s) - self.request_queue_size = 100 # Max size of the request queue before it blocks - self.request_queue_timeout = 2 # Max time to wait when queueing a request (s) - self.worker_count = 4 # Number of worker threads - - self.stderr_log_level = 'INFO' # Log level for stderr logging (or 'off') - self.file_log_level = 'INFO' # Log level for file logging (od 'off') - self.log_file = None # Log file - - self.metrics = {} # Metrics - self.clusters = {} # Known clusters - - # Store the commandline args to be used when reloading the config - self.args = args - - # Check if we have a config file - self.config_file = args.config - - # Read config - if self.config_file is not None: - self.read_file(self.config_file, read_metrics, read_clusters) - - # Override anything that was specified on the command line - if args.pidfile is not None: - self.pid_file = args.pidfile - if args.logfile is not None: - self.log_file = args.logfile - if args.socket is not None: - self.ipc_socket = args.socket - if args.verbose: - self.stderr_log_level = 'DEBUG' + if dbname not in connections: + with connections_lock: + # Make sure nobody created the pool while we were waiting on the + # lock + if dbname not in connections: + log.info(f"Creating connection pool for: {dbname}") + connections[dbname] = ConnectionPool( + min_size=0, max_size=config['pool_size'], + conninfo=f"dbname={dbname} {config['connstr']}") + return connections[dbname] - def read_file(self, config_file, read_metrics, read_clusters): - """ - Read a config file, possibly skipping metrics and clusters to lighten - the load on the agent. +def get_query(metric, version): + """ + Get the correct metric query for a given version of PostgreSQL. 
- Params: - config_file: (str) Path to the config file - read_metrics: (bool) Indicate if metrics should be parsed - read_clusters: (bool) Indicate if cluster information should be parsed + params: + metric: The metric definition + version: The PostgreSQL version number, as given by server_version_num + """ + # Select the correct query + for v in reversed(sorted(metric['query'].keys())): + if version >= v: + return metric['query'][v] - Exceptions: - InvalidConfigError: Indicates an issue with the config file - """ + raise Exception('Missing metric query') + + +def run_query_no_reconnect(pool, return_type, query, args): + with pool.connection() as conn: try: - with open(config_file, 'r') as f: - for line in f: - # Clean up any whitespace at either end - line = line.strip() - - # Skip empty lines and comments - if line.startswith('#'): - continue - elif line == '': - continue - - # Separate the line into a key-value pair and clean up extra - # white space that may have been around the '=' - (key, value) = [x.strip() for x in line.split('=', 1)] - if value is None: - raise InvalidConfigError("{}: {}", config_file, line) - - # Handle each key appropriately - if key == 'include': - print("Including file: {}".format(value)) - self.read_file(value, read_metrics, read_clusters) - elif key == 'pid_file': - self.pid_file = value - elif key == 'ipc_socket': - self.ipc_socket = value - elif key == 'ipc_timeout': - self.ipc_timeout = float(value) - elif key == 'request_timeout': - self.request_timeout = float(value) - elif key == 'request_queue_size': - self.request_queue_size = int(value) - elif key == 'request_queue_timeout': - self.request_queue_timeout = float(value) - elif key == 'worker_count': - self.worker_count = int(value) - elif key == 'stderr_log_level': - self.stderr_log_level = value.upper() - elif key == 'file_log_level': - self.file_log_level = value.upper() - elif key == 'log_file': - self.log_file = value - elif key == 'cluster': - if read_clusters: - self.add_cluster(value) - elif key == 'metric': - if read_metrics: - print("Adding metric: {}".format(value)) - self.add_metric(value) - else: - raise InvalidConfigError("WARNING: Unknown config option: {}".format(key)) - except OSError as e: - raise InvalidConfigError("Failed to open/read config file: {}".format(e)) - except ValueError as e: - raise InvalidConfigError("Encoding error in config file: {}".format(e)) - - def add_cluster(self, cluster_def): - """ - Parse and add connection information about a cluster to the config. - - Each cluster line is of the format: - :[address]:[port]:[dbname]:[user]:password - - The name is a unique, arbitrary identifier to associate this cluster - with a request from the monitoring agent. - - The address can be an IP address, host name, or path the the directory - where a PostgreSQL socket exists. - - The dbname field is the default database to connect to for metrics - which don't specify a database. This is also used to identify the - PostgreSQL version. 
- - Default values replace empty conponents, except for the name field: - address: /var/run/postgresql - port: 5432 - dbname: postgres - user: postgres - password: None - - Params: - cluster_def: (str) Cluster definition string - - Exceptions: - InvalidConfigError: Thrown if the cluster entry is missing any fields - or contains invalid content - """ - # Split up the fields - try: - (name, address, port, dbname, user, password) = cluster_def.split(':') - except ValueError: - raise InvalidConfigError("Missing fields in cluster definition: {}".format(cluster_def)) - - # Make sure we have a name - if name == '': - raise InvalidConfigError("Cluster must have a name: {}".format(cluster_def)) - - # Set defaults for anything that's blank - if address == '': - address = '/var/run/postgresql' - - if port == '': - port = 5432 - else: - # Convert the port to a number here - try: - port = int(port) - except ValueError: - raise InvalidConfigError("Invalid port number: {}".format(port)) - - if dbname == '': - dbname = 'postgres' - - if user == '': - user = 'postgres' - - if password == '': - password = None - - # Create and add the cluster object - self.clusters[name] = Cluster(name, address, port, dbname, user, password) - - def add_metric(self, metric_def): - """ - Parse and add a metric or metric version to the config. - - Each metric definition is of the form: - :[return type]:[PostgreSQL version]: - - The name is an identifier used to reference this metric from the - monitoring server. - - The return type indicates how to format the results. Possible values - are: - value: A single value is returned (first column of first record) - column: A list is returned containing the first column of all records - set: All records are returned as a list of dicts - - The version field is the first version of PostgreSQL for which this - query is valid. - - The sql field if either the SQL to execute, or a string of the form: - file: - where is the path to a file containing the SQL. In either case, - the SQL can contain references to parameters that are to be passed to - PostgreSQL. 
These are substituted using python's format command, so - variable names should be enclosed in curly brackets like {foo} - - Params: - metric_def: (str) Metric definition string - - Exceptions: - InvalidConfigError: Thrown if the metric entry is missing any fields - or contains invalid content - """ - # Split up the fields - try: - (name, ret_type, version, sql) = metric_def.split(':', 3) - except ValueError: - raise InvalidConfigError("Missing fields in metric definition: {}".format(metric_def)) - - # Make sure we have a name and some SQL - if name == '': - raise InvalidConfigError("Missing name for metric: {}".format(metric_def)) - - # An empty SQL query indicates a metric is not suported after a particular version - if sql == '': - sql = None - - # If the sql is in a separate file, read it in - try: - if sql.startswith('file:'): - (_,path) = sql.split(':', 1) - with open(path, 'r') as f: - sql = f.read() - except OSError as e: - raise InvalidConfigError("Failed to open/read SQL file: {}".format(e)) - except ValueError as e: - raise InvalidConfigError("Encoding error in SQL file: {}".format(e)) - - # If no starting version is given, set it to 0 - if version == '': - version = 0 - - # Find or create the metric - try: - metric = self.metrics[name] - except KeyError: - metric = Metric(name, ret_type) - self.metrics[name] = metric - - # Add what was given as a version of the metric - metric.add_version(int(version), sql) - - def get_pg_version(self, cluster_name): - """ - Return the version of PostgreSQL running on the specified cluster. The - version is cached after the first successful retrieval. - - Params: - cluster_name: (str) The identifier for the cluster, as defined in the - config file - - Returns: - The version using PostgreSQL's integer format (Mmmpp or MM00mm) - - Exceptions: - UnknownClusterError: Thrown if the named cluster is not defined - DBError: A database error occurred - """ - # TODO: Move this out of the Config class. Possibly move the code - # to the DB class, while storing the value in the Cluster object. - - # Find the cluster - try: - cluster = self.clusters[cluster_name] - except KeyError: - raise UnknownClusterError(cluster_name) - - # Query the cluster if we don't already know the version - # TODO: Expire this value at some point to pick up upgrades. - if cluster.pg_version is None: - db = DB(self, cluster_name) - cluster.pg_version = int(db.query('SHOW server_version_num')[0]['server_version_num']) - db.close() - logger.debug("Got cluster version: {}".format(cluster.pg_version)) - - # Return the version number - return cluster.pg_version - - -class Cluster: - """ - Connection information for a PostgreSQL cluster - - Params: - name: A unique, arbitrary identifier to associate this cluster with a - request from the monitoring agent. - address: An IP address, host name, or path the the directory where a - PostgreSQL socket exists. - port: The database port number. - dbname: Default database to connect to for metrics which don't specify a - database. This is also used to identify the PostgreSQL version. - user: The database user to connect as. - password: The password to use when connecting to the database. Leave - this blank to use either no password or a password stored in - the executing user's ~/.pgpass file. 
- """ - def __init__(self, name, address, port, dbname, user, password): - self.name = name - self.address = address - self.dbname = dbname - self.port = port - self.user = user - self.password = password - - # Dynamically acquired PostgreSQL version - self.pg_version = None - - -class DB: - """ - Database access - - Params: - config: The agent config - cluster_name: The name of the cluster to connect to - dbname: A database to connect to, or None to use the default - - Exceptions: - UnknownClusterError: Thrown if the named cluster is not defined - DBError: Thrown for any database related error - """ - def __init__(self, config, cluster_name, dbname=None): - logger.debug("Creating connection to cluster: {}".format(cluster_name)) - - # Find the named cluster - try: - cluster = config.clusters[cluster_name] - except KeyError: - raise UnknownClusterError(cluster_name) - - # Use the default database if not given one - if dbname is None: - logger.debug("Using default database: {}".format(cluster.dbname)) - dbname = cluster.dbname - - # Connect to the database - try: - self.conn = psycopg2.connect( - host = cluster.address, - port = cluster.port, - user = cluster.user, - password = cluster.password, - dbname = dbname); - except Exception as e: - raise DBError("Failed to connect to the database: {}".format(e)) - - # Make the connection readonly and enable autocommit - try: - self.conn.set_session(readonly=True, autocommit=True) - except Exception as e: - raise DBError("Failed to set session parameters: {}".format(e)) - - def query(self, sql, args=[]): - """ - Execute a query in the database. - - Params: - sql: (str) The SQL statement to execute - args: (list) List of positional arguments for the query - - Exceptions: - DBError: Thrown for any database related error - """ - logger.debug("Executuing query: {}".format(sql)) - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: - cur.execute(sql, args) - return(cur.fetchall()) - except Exception as e: - raise DBError("Failed to execute query: ".format(e)) - - def close(self): - logger.debug("Closing db connection") - try: - if self.conn is not None: - self.conn.close() - except psycopg2.Error as e: - logger.warning("Caught an error when closing a connection: {}".format(e)) - self.conn = None - - -class Request: - """ - A metric request - - Params: - cluster_name: (str) The name of the cluster the request is for - metric_name: (str) The name of the metric to obtain - args: (dict) Dictionary of arguments for the metric - """ - def __init__(self, cluster_name, metric_name, args = {}): - self.cluster_name = cluster_name - self.metric_name = metric_name - self.args = args - self.result = None - - # Add a lock to indicate when the request is complete - self.complete = threading.Lock() - self.complete.acquire() - - def set_result(self, result): - """ - Set the result for the metric, and release the lock that allows the - result to be returned to the client. - """ - # Set the result value - self.result = result - - # Release the lock - self.complete.release() - - def get_result(self, timeout = -1): - """ - Retrieve the result for the metric. This will wait for a result to be - available. If timeout is >0, a RequestTimeoutError exception will be - thrown if the specified number of seconds elapses. 
- - Params: - timeout: (float) Number of seconds to wait before timing out - """ - # Wait until the request has been completed - if self.complete.acquire(timeout = timeout): - return self.result - else: - raise RequestTimeoutError() - - -class Metric: - """ - A metric - - The return_type parameter controls how the results will be formatted when - returned to the client. Possible values are: - value: Return a single value - column: Resturn a list conprised of the first column of the query results - set: Return a list of dictionaries representing the queried records - - Params: - name: (str) The name to associate with the metric - ret_type: (str) The return type of the metric (value, column, or set) - """ - def __init__(self, name, ret_type): - self.name = name - self.versions = {} - self.type = ret_type - - # Place holders for the query cache - self.cached = {} - - def add_version(self, pg_version, sql): - """ - Add a versioned query for the metric - - Params: - pg_version: (int) The first version number for which this query - applies, or 0 for any version - sql: (str) The SQL to execute for the specified version - """ - - # Check if we already have SQL for this version - if pg_version in self.versions: - raise DuplicateMetricVersionError - - # Add the versioned SQL - self.versions[pg_version] = MetricVersion(sql) - - def get_version(self, pg_version): - """ - Get an apropriate SQL query for the viven PostgreSQL version - - Params: - pg_version: (int) The version of PostgreSQL for which to find a query - """ - # Since we're usually going to keep asking for the same version(s), - # we cache the metric version the first time we need it. - if pg_version not in self.cached: - self.cached[pg_version] = None - # Search through the cache starting from the lowest version until - # we find a supporting version - for v in reversed(sorted(self.versions.keys())): - if pg_version >= v: - self.cached[pg_version] = self.versions[v] - break - # If we didn't find a query, or the query is None (ie: the metric is - # no longer supported for this version), throw an exception - if pg_version not in self.cached or self.cached[pg_version] is None: - raise UnsupportedMetricVersionError - - # Return the cached version - return self.cached[pg_version] - - -class MetricVersion: - """ - A version of a metric - - The query is formatted using str.format with a dictionary of variables, - so you can use tings like {table} in the template, then pass table=foo when - retrieving the SQL and '{table}' will be replaced with 'foo'. - - Note: Only minimal SQL injection checking is done on this. Basically we - just throw an error if any substitution value contains an apostrophe - or a semicolon - - Params: - sql: (str) The SQL query template - """ - def __init__(self, sql): - self.sql = sql - - def get_sql(self, args): - """ - Return the SQL for this version after substituting the provided - variables. 
- - Params: - args: (dict) Dictionary of formatting substitutions to apply to the - query template - - Exceptions: - SQLInjectionError: if any of the values to be substituted contain an - apostrophe (') or semicolon (;) - """ - # Check for possible SQL injection attacks - for v in args.values(): - if "'" in v or ';' in v: - raise SQLInjectionError() - - # Format and return the SQL - return self.sql.format(**args) - - -class IPC: - """ - IPC handler for communication between the agent and server components - - Params: - config: (Config) The agent configuration object - mode: (str) Which side of the communication setup this is (agent|server) - - Exceptions: - RuneimeError: if the IPC object's mode is invalid - """ - def __init__(self, config, mode): - # Validate the mode - if mode not in [ 'agent', 'server' ]: - raise RuntimeError("Invalid IPC mode: {}".format(self.mode)) - - self.config = config - self.mode = mode - - # Set up the connection - try: - self.initialize() - except Exception as e: - logger.debug("IPC Initialization error: {}".format(e)) - raise - - def initialize(self): - """ - Initialize the socket - - Exceptions: - OSError: if the socket file already exists and could not be removed - RuntimeError: if the IPC object's mode is invalid - """ - logger.debug("Initializing IPC") - if self.mode == 'server': - # Try to clean up the socket if it exists - try: - logger.debug("Unlinking any former socket") - os.unlink(self.config.ipc_socket) - except OSError: - logger.debug("Caught an exception unlinking socket") - if os.path.exists(self.config.ipc_socket): - logger.debug("Socket stilll exists") - raise - logger.debug("No socket to unlink") - - # Create the socket - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - # Set a timeout for accepting connections to be able to catch signals - self.socket.settimeout(1) - - # Bind the socket and start listening - logger.debug("Binding socket: {}".format(self.config.ipc_socket)) - self.socket.bind(self.config.ipc_socket) - logger.debug("Listening on socket") - self.socket.listen(1) - - elif self.mode == 'agent': - # Connect to the socket - self.conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - # Set the timeout for the agent side of the connection - self.conn.settimeout(self.config.ipc_timeout) - - else: - raise RuntimeError("Invalid IPC mode: {}".format(self.mode)) - - logger.debug("IPC initialization complete") - - def connect(self): - """ - Establish a connection to a socket (agent mode) - - Establish and return a connection to the socket - - Returns: - The connected socket object - - Exceptions: - TimeoutError: if establishing the connection times out - """ - # Connect to the socket - self.conn.connect(self.config.ipc_socket) - - # Return the socket from this IPC object - return self.conn - - def accept(self): - """ - Accept a connection (server) - - Wait for, accept, and return a connection to the socket - - Returns: - The accepted socket object - - Exceptions: - TimeoutError: if no connection is accepted before the timeout - """ - # Accept the connection - (conn, _) = self.socket.accept() - - # Set the timeout on the new socket object - conn.settimeout(self.config.ipc_timeout) - - # Return the new socket object - return conn - - def send(self, conn, msg): - """ - Send a message across the IPC channel - - The message is encoded in UTF-8, and prefixed with a 4 byte, big endian - unsigned integer - - Params: - conn: (Socket) the connection to use - msg: (str) The message to send - - Exceptions: - TimeoutError: if no 
connection times out before finishing sending the - message - OverflowError: if the size of the message exceeds what fits in a four - byte, unsigned integer - UnicodeError: if the message is not UTF-8 encodable - """ - # Encode the message as bytes - msg_bytes = msg.encode("utf-8") - - # Get the byte length of the message - msg_len = len(msg_bytes) - - # Encode the message length - msg_len_bytes = msg_len.to_bytes(4, byteorder='big') - - # Send the size and message bytes - conn.sendall(msg_len_bytes + msg_bytes) - - def recv(self, conn): - """ - Receive a message across the IPC channel - - The message is encoded in UTF-8, and prefixed with a 4 byte, big endian - unsigned integer - - Params: - conn: (Socket) the connection to use - - Returns: - The received message - - Exceptions: - TimeoutError: if no connection times out before finishing receiving - the whole message - UnicodeError: if the message is not UTF-8 decodable - """ - # Read at least the length - buffer = [] - msg_len = -4 - while msg_len < 0: - buffer += conn.recv(1024) - msg_len = len(buffer) - 4 - - # Pull out the bytes for the length - msg_len_bytes = buffer[:4] - - # Convert the size bytes to an unsigned integer - msg_len = int.from_bytes(msg_len_bytes, byteorder='big') - - # Finish reading the message if we don't have all of it - while len(buffer) < msg_len + 4: - buffer += conn.recv(1024) - - # Decode and return the message - return bytes(buffer[4:]).decode("utf-8") - - -class Agent: - """ - The agent side of the connector - - This mode is entended to be called byt the monitoring agent and - communicates with the server side of the connector. - - The key is a comma separated string formatted as: TODO - - Results are printed to stdout. - - Params: - args: (argparse.Namespace) Command line arguments - key: (str) The key indicating the requested metric - """ - @staticmethod - def run(args, key): - # Read the agent config - config = Config(args, read_metrics = False, read_clusters = False) - - init_logging(config) - - # Connect to the IPC socket - ipc = IPC(config, 'agent') - try: - conn = ipc.connect() - - # Send a request - ipc.send(conn, key) - - # Wait for a response - res = ipc.recv(conn) - - except Exception as e: - print("IPC error: {}".format(e)) - sys.exit(1) - - # Output the response - print(res) - - -class Server: - """ - The server side of the connector - - Params: - args: (argparse.Namespace) Command line arguments - - Exceptions: - - """ - - def __init__(self, args): - # Note the path to the config file so it can be reloaded - self.config_file = args.config - - # Load config - self.config = Config(args) - - # Write pid file - # Note: we record the PID file here so it can't be changed with reload - self.pid_file = self.config.pid_file - write_pid_file(self.pid_file) - - # Initialize logging - init_logging(self.config) - - # Set up the signal handler - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGHUP, signal_handler) - - # Create reqest queue - logger.debug("Creating request queue") - self.req_queue = queue.Queue(self.config.request_queue_size) - - # Spawn worker threads - logger.debug("Spawning worker threads") - self.workers = self.spawn_workers(self.config) - - def spawn_workers(self, config): - """ - Spawn all worker threads to process requests - - Params: - config: (Config) The agent config object - """ - logger.info("Spawning {} workers".format(config.worker_count)) - - # Spawn worker threads - workers = [None] * config.worker_count - for i in range(config.worker_count): - 
workers[i] = Worker(config, self.req_queue) - workers[i].start() - logger.debug("Started thread #{} (tid={})".format(i, workers[i].native_id)) - - # Return the list of worker threads - return workers - - def retire_workers(self, workers): - """ - Retire (terminate) all worker threads in the given list of threads - - Params: - workers: (list) List of worker threads to part ways with - """ - logger.info("Retiring {} workers".format(len(workers))) - - # Inform the workers that their services are no longer required - for worker in workers: - worker.active = False - - # Wait for the workers to turn in their badges - for worker in workers: - worker.join() - - def reload_config(self): - """ - Reload the config and return a new config object, and spawn new worker - threads - - Exceptions: - InvalidConfigError: Indicates an issue with the config file - OSError: Thrown if there's an issue opening a config file - ValueError: Thrown if there is an encoding error when reading the - config - """ - # Clear the reload flag - global reload - reload = False - - # Read new config - new_config = Config(self.config.args) - - # Re-init logging in case settings changed - init_logging(new_config) - - # Spawn new workers - new_workers = self.spawn_workers(new_config) - - # Replace workers - old_workers = self.workers - self.workers = new_workers - - # Retire old workers - self.retire_workers(old_workers) - - # Adjust other settings - # TODO - - # Set the new config as the one the server will use - self.config = new_config - - def run(self): - """ - Run the server's main loop - """ - logger.info("Server starting") - - # Listen on ipc socket - ipc = IPC(self.config, 'server') - - # Enter the main loop - while True: - # Wait for a request connection - try: - logger.debug("Waiting for a connection") - conn = ipc.accept() - except socket.timeout: - conn = None - - # See if we should exit - if not running: - break - - # Check if we're supposed to reload the config - if reload: - try: - self.reload_config() - except Exception as e: - logger.error("Reload failed: {}".format(e)) - - # If we just timed out waiting for a request, go back to waiting - if conn is None: - continue - - # Get the request string (csv) - try: - key = ipc.recv(conn) - except socket.timeout: - # Handle timeouts when receiving the request - logger.warning("IPC communication timeout receiving request") - conn.close() - continue - - # Parse ipc request (csv) - logger.debug("Parsing request key: {}".format(key)) - try: - # Split the key into a cluster name, metric name, and list of - # metric arguments - parts = key.split(',', 3) - agent_name = parts[0] - cluster_name = parts[1] - metric_name = parts[2] - - # Parse any metric arguments into a dictionary - args_dict = {'agent': agent_name, 'cluster': cluster_name} - if len(parts) > 3: - for arg in parts[3].split(','): - if arg != '': - (k, v) = arg.split('=', 1) - args_dict[k] = v - except Exception as e: - # Handle problems parsing the request into its elements - logger.warning("Received invalid request '{}': {}".format(key, e)) - ipc.send(conn, "ERROR: Invalid key") - conn.close() - continue - - # Create request object - req = Request(cluster_name, metric_name, args_dict) - - # Queue the request - try: - self.req_queue.put(req, timeout=self.config.request_queue_timeout) - except queue.Full: - # Handle situations where the queue is full and we didn't get - # a free slot before the configured timeout - logger.warning("Failed to queue request, queue is full") - req.set_result("ERROR: Enqueue timeout") - 
continue - - # Spawn a thread to wait for the result - r = Responder(self.config, ipc, conn, req) - r.start() - - # Join worker threads - self.retire_workers(self.workers) - - # Clean up the PID file - remove_pid_file(self.pid_file) - - # Be polite - logger.info("Good bye") - - # Gracefully shut down logging - logging.shutdown() - - -class Worker(threading.Thread): - """ - Worker thread that processes requests (ie: queries the database) - - Params: - config: (Config) The agent config object - queue: (Queue) The request queue the worker should pull requests from - """ - def __init__(self, config, queue): - super(Worker, self).__init__() - - self.config = config - self.queue = queue - - self.active = True - - def run(self): - """ - Main processing loop for a worker thread - """ - while True: - # Wait for a request - try: - req = self.queue.get(timeout=1) - except queue.Empty: - req = None - - # Check if we're supposed to exit - if not self.active: - # If we got a request, try to put it back on the queue - if req is not None: - try: - queue.put(req, timeout=1) - logger.info("Requeued request at worker exit") - except: - logger.warning("Failed to requeue request at worker exit") - req.set_result("ERROR: Failed to requeue at Worker exit") - - logger.info("Worker exiting: tid={}".format(self.native_id)) - break - - # If we got here because we waited too long for a request, go back to waiting - if req is None: - continue - - # Find the requested metrtic - try: - metric = self.config.metrics[req.metric_name] - except KeyError: - req.set_result("ERROR: Unknown metric: {}".format(req.metric_name)) - continue - - # Get the DB version - try: - pg_version = self.config.get_pg_version(req.cluster_name) - except Exception as e: - req.set_result("Failed to retrieve database version for cluster: {}".format(req.cluster_name)) - logger.error("Failed to get Postgresql version for cluster {}: {}".format(req.cluster_name, e)) - continue - - # Get the query to use - try: - mv = metric.get_version(pg_version) - except UnsupportedMetricVersionError: - # Handle unsuported metric versions - req.set_result("Unsupported PosgreSQL version for metric") - continue - - # Identify the database to query - try: - dbname = req.args['db'] - except KeyError: - dbname = None - - # Get any positional query args - try: - pos_args = req.args['pos'].split(':') - logger.debug("Found positional args for {}: {}".format(req.metric_name, ','.join(pos_args))) - except KeyError: - pos_args = [] - logger.debug("No positional args found for {}".format(req.metric_name)) - - # Query the database - try: - db = DB(self.config, req.cluster_name, dbname) - res = db.query(mv.get_sql(req.args), pos_args) - db.close() - except Exception as e: - # Handle database errors - logger.error("Database error: {}".format(e)) - - # Make sure the database connectin is closed (ie: if the query timed out) - try: - db.close() - except: - pass - - req.set_result("Failed to query database") - continue - - # Set the result on the request - if metric.type == 'value': - if len(res) == 0: - req.set_result("Empty result set") - else: - req.set_result("{}".format(list(res[0].values())[0])) - elif metric.type == 'row': - logger.debug("Serializing row: {}".format(res[0])) - req.set_result(json.dumps(res[0])) - elif metric.type == 'column': - req.set_result(json.dumps([list(r.values())[0] for r in res])) - elif metric.type == 'set': - req.set_result(json.dumps(res)) - - -class Responder(threading.Thread): - """ - Thread responsible for replying to requests - - Params: - 
config: (Config) The agent config object - ipc: (IPC) The IPC object used for communication - conn: (Socket) The connected socket to communicate with - req: (Request) The request object to handle - """ - def __init__(self, config, ipc, conn, req): - super(Responder, self).__init__() - - # Wait for a result - result = req.get_result() - - # Send the result back to the client - try: - ipc.send(conn, result) - except Exception as e: - logger.warning("Failed to reply to agent: {}".format(e)) - - -def main(): - # Set up command line argument parser - parser = argparse.ArgumentParser( - prog='pgmon', - description='A briidge between monitoring tools and PostgreSQL') - - # General options - parser.add_argument('-c', '--config', default=None) - parser.add_argument('-v', '--verbose', action='store_true') - - parser.add_argument('-s', '--socket', default=None) - parser.add_argument('-l', '--logfile', default=None) - parser.add_argument('-p', '--pidfile', default=None) - - # Operational mode - parser.add_argument('-S', '--server', action='store_true') - parser.add_argument('-r', '--reload', action='store_true') - - # Agent options - parser.add_argument('key', nargs='?') - - # Parse command line arguments - args = parser.parse_args() - - if args.server: - # Try to start running in server mode - try: - server = Server(args) - except Exception as e: - sys.exit("Failed to start server: {}".format(e)) - - try: - server.run() - except Exception as e: - sys.exit("Caught an unexpected runtime error: {}".format(e)) - - elif args.reload: - # Try to signal a running server to reload its config - try: - config = Config(args, read_metrics = False) - except Exception as e: - sys.exit("Failed to read config file: {}".format(e)) - - # Read the PID file - try: - pid = read_pid_file(config.pid_file) - except Exception as e: - sys.exit("Failed to read PID file: {}".format(e)) - - # Signal the server to reload - try: - os.kill(pid, signal.SIGHUP) + with conn.cursor(row_factory=psycopg.rows.dict_row) as curs: + curs.execute(query, args) + res = curs.fetchall() + + if return_type == 'value': + return str(list(res[0].values())[0]) + elif return_type == 'row': + return json.dumps(res[0]) + elif return_type == 'column': + return json.dumps([list(r.values())[0] for r in res]) + elif return_type == 'set': + return json.dumps(res) except: - sys.exit("Failed to signal server: {}".format(e)) - else: - # Start running in agent mode - Agent.run(args, args.key) + if conn.broken: + raise DisconnectedError() + else: + raise +def run_query(pool, return_type, query, args): + # If we get disconnected, I think the putconn command will close the dead + # connection. So we can just give it another shot. + try: + return run_query_no_reconnect(pool, return_type, query, args) + except DisconnectedError: + log.warning("Stale PostgreSQL connection found ... trying again") + time.sleep(1) + return run_query_no_reconnect(pool, return_type, query, args) + +class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): + def log_request(self, code='-', size='-'): + # Override to suppress logging + pass + + def do_GET(self): + # Parse the URL + parsed_path = urlparse(self.path) + name = parsed_path.path.strip('/') + parsed_query = parse_qs(parsed_path.query) + + # Note: Parse_qs returns the values as a list. Since we always expect + # single values, just grab the first from each. 
+        args = {key: values[0] for key, values in parsed_query.items()}
+
+        # Get the metric definition
+        try:
+            metric = config['metrics'][name]
+        except KeyError:
+            log.error(f"Unknown metric: {name}")
+            self._reply(404, 'Unknown metric')
+            return
+
+        # Get the dbname
+        dbname = args.get('dbname', config['dbname'])
+
+        # Get the connection pool for the database
+        pool = get_pool(dbname)
+
+        # Identify the PostgreSQL version
+        try:
+            version = int(args['vers'])
+        except KeyError:
+            try:
+                version = int(run_query(pool, 'value', 'SHOW server_version_num', None))
+            except Exception as e:
+                log.error(f"Failed to get PostgreSQL version: {e}")
+                self._reply(500, 'Error getting DB version')
+                return
+
+        # Get the query version
+        try:
+            query = get_query(metric, version)
+        except:
+            log.error(f"Failed to find a version of {name} for {version}")
+            self._reply(404, 'Unsupported version')
+            return
+
+        # Execute the query
+        try:
+            self._reply(200, run_query(pool, metric['type'], query, args))
+        except Exception as e:
+            log.error(f"Error running query: {e}")
+            self._reply(500, "Error running query")
+
+    def _reply(self, code, content):
+        self.send_response(code)
+        self.send_header('Content-type', 'application/json')
+        self.end_headers()
+
+        self.wfile.write(bytes(content, 'utf-8'))
 
 if __name__ == '__main__':
-    main()
+    # Handle cli args
+    parser = argparse.ArgumentParser(
+        prog = 'pgmon',
+        description='A PostgreSQL monitoring agent')
 
+    parser.add_argument('config_file', default='pgmon.yml', nargs='?',
+        help='The config file to read (default: %(default)s)')
 
-##
-# Tests
-##
-class TestConfig:
-    pass
+
+    args = parser.parse_args()
+
+    # Set the config file path
+    config_file = args.config_file
 
-class TestDB:
-    pass
+
+    # Read the config file
+    read_config(config_file)
 
-class TestRequest:
-    def test_request_creation(self):
-        # Create result with no args
-        req1 = Request('c1', 'foo', {})
-        assert req1.cluster_name == 'c1'
-        assert req1.metric_name == 'foo'
-        assert len(req1.args) == 0
-        assert req1.complete.locked()
+
+    # Set up the http server to receive requests
+    server_address = ('127.0.0.1', config['port'])
+    httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
 
-        # Create result with args
-        req2 = Request('c1', 'foo', {'arg1': 'value1', 'arg2': 'value2'})
-        assert req2.metric_name == 'foo'
-        assert len(req2.args) == 2
-        assert req2.complete.locked()
+
+    # Set up the signal handler
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGHUP, signal_handler)
 
-    def test_request_lock(self):
-        req1 = Request('c1', 'foo', {})
-        assert req1.complete.locked()
-        req1.set_result('blah')
-        assert not req1.complete.locked()
-        assert 'blah' == req1.get_result()
+
+    # Handle requests.
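+    # Each metric is exposed as a plain HTTP GET: the URL path is the metric
+    # name and the query string supplies its arguments. Illustrative example
+    # only (assuming the default port of 5400):
+    #   curl 'http://127.0.0.1:5400/version'
+    #   curl 'http://127.0.0.1:5400/db_stats?datname=postgres'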
+ log.info(f"Listening on port {config['port']}...") + while running: + httpd.handle_request() - - -class TestMetric: - def test_metric_creation(self): - # Test basic creation - m1 = Metric('foo', 'value') - assert m1.name == 'foo' - assert len(m1.versions) == 0 - assert m1.type == 'value' - assert m1.cached is None - assert m1.cached_version is None - - def test_metric_add_version(self): - # Test adding versions - m1 = Metric('foo', 'value') - assert len(m1.versions) == 0 - m1.add_version(0, 'default') - assert len(m1.versions) == 1 - m1.add_version(120003, 'v12.3') - assert len(m1.versions) == 2 - - # Make sure added versions are correct - assert m1.versions[0].sql == 'default' - assert m1.versions[120003].sql == 'v12.3' - - def test_metric_get_version(self): - # Test retrieving metric versions - m1 = Metric('foo', 'value') - m1.add_version(100000, 'v10.0') - m1.add_version(120000, 'v12.0') - - # Make sure cache is initially empty - assert m1.cached is None - assert m1.cached_version is None - - assert m1.get_version(110003).sql == 'v10.0' - - # Make sure cache is set - assert m1.cached is not None - assert m1.cached_version == 110003 - - # Make sure returned value changes with version - assert m1.get_version(120000).sql == 'v12.0' - assert m1.get_version(150005).sql == 'v12.0' - - # Make sure an error is thrown when no version matches - with pytest.raises(UnsupportedMetricVersionError): - m1.get_version(90603) - - # Add a default version - m1.add_version(0, 'default') - assert m1.get_version(90603).sql == 'default' - assert m1.get_version(110003).sql == 'v10.0' - assert m1.get_version(120000).sql == 'v12.0' - assert m1.get_version(150005).sql == 'v12.0' - -class TestMetricVersion: - def test_metric_version_creation(self): - mv1 = MetricVersion('test') - assert mv1.sql == 'test' - - def test_metric_version_templating(self): - mv1 = MetricVersion('foo') - assert mv1.get_sql({}) == 'foo' - - mv2 = MetricVersion('foo {a1} {a3} {a2}') - assert mv2.get_sql({'a1': 'bar', 'a2': 'blah blah blah', 'a3': 'baz'}) == 'foo bar baz blah blah blah' - - -class TestIPC: - pass - -class TestAgent: - pass - -class TestServer: - pass - -class TestWorker: - pass - -class TestResponder: - pass + # Clean up PostgreSQL connections + # TODO: Improve this ... 
not sure it actually closes all the connections cleanly + for pool in connections.values(): + pool.close() diff --git a/pgmon.yml b/pgmon.yml new file mode 100644 index 0000000..0ae5c4d --- /dev/null +++ b/pgmon.yml @@ -0,0 +1,21 @@ +# IPC port +port: 5400 + +# Max PostgreSQL connection pool size +#pool_size: 4 + +# Log level for stderr logging (or 'off') +log_level: debug + +# Connection string (excluding dbname) +# This can be left empty to use the libpq defaults +connstr: "user=postgres" + +# Default database to connect to when none is specified for a metric +#dbname: postgres + +# Metrics +#metrics: {} + +include: + - pgmon-metrics.yml diff --git a/pgmon@.service b/pgmon@.service index 3dc04c9..420b124 100644 --- a/pgmon@.service +++ b/pgmon@.service @@ -5,12 +5,10 @@ Description=PostgreSQL Monitoring Bridge After=network.target [Service] -ExecStart=/usr/local/bin/pgmon.py --server --config /etc/pgmon/%i.cfg -ExecReload=/user/local/bin/pgmon.py --reload --config /etc/pgmon/%i.cfg -RuntimeDirectory=pgmon -RuntimeDirectoryMode=0755 -PIDFile=/run/pgmon/%i.pid +ExecStart=/usr/local/bin/pgmon.py /etc/pgmon/%i.cfg +ExecReload=kill -HUP $MAINPID Restart=on-failure +Type=exec [Install] WantedBy=multi-user.target diff --git a/pgmon_templates.yaml b/pgmon_templates.yaml index c56d9a0..9f35470 100644 --- a/pgmon_templates.yaml +++ b/pgmon_templates.yaml @@ -1,57 +1,63 @@ zabbix_export: - version: '6.4' + version: '7.0' template_groups: - uuid: b9390195ecad4986968746a2a9b56354 name: 'My Templates' templates: - - uuid: f93f2a3535a74718b28ee81cf88cdab9 + - uuid: 5e4ccbbfed424b968a4b7fc22bd4ca1e template: 'PostgreSQL by pgmon' name: 'PostgreSQL by pgmon' description: 'Monitor PostgreSQL using pgmon' groups: - name: 'My Templates' items: - - uuid: 818721b38324495a949cfe8c10a00eec + - uuid: 8706eccb7edc4fa394f552fc31f401a9 name: 'Max Frozen XID Age' - key: 'pgmon[{$AGENT_NAME},{$CLUSTER},max_frozen_age]' + key: 'web.page.get[localhost,/max_frozen_age,{$AGENT_PORT}]' + history: 90d + trends: '0' description: 'Maximum age of any frozen XID in any database' + preprocessing: + - type: REGEX + parameters: + - '\n\s?\n([\s\S]*)' + - \1 + - type: MATCHES_REGEX + parameters: + - '^[0-9]+$' tags: - tag: Application value: PostgreSQL - - uuid: e73d02840f4043288f5fe5cc8b7c997b + - uuid: ee88f5f4d2384f97946d049af5af4502 name: 'PostgreSQL version' - key: 'pgmon[{$AGENT_NAME},{$CLUSTER},version]' + key: 'web.page.get[localhost,/version,{$AGENT_PORT}]' delay: 1h + history: 90d description: 'PostgreSQL Server version number' + preprocessing: + - type: REGEX + parameters: + - '\n\s?\n([\s\S]*)' + - \1 + - type: MATCHES_REGEX + parameters: + - '^[0-9]+$' tags: - tag: Application value: PostgreSQL discovery_rules: - - uuid: 35222acc25d34e008edd88f6b6ff7ad7 + - uuid: 085de335305e435dbb4439bd52e0d35d name: 'Discover Databases' - key: 'pgmon[{$AGENT_NAME},{$CLUSTER},discover_dbs,pos={$AGENT_NAME}:{$CLUSTER}]' + key: 'web.page.get[localhost,/discover_dbs,{$AGENT_PORT}]' delay: 10m + lifetime: 30d + enabled_lifetime_type: DISABLE_NEVER item_prototypes: - - uuid: 4bd8ecf7677a4eed97409ed86d8d0e64 - name: 'Database Stats for {#DBNAME}' - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' - history: '0' - trends: '0' - value_type: TEXT - tags: - - tag: Agent - value: '{#AGENT}' - - tag: Application - value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - - tag: Database - value: '{#DBNAME}' - - uuid: 98efbb23589b4de6ae95ab61d08c78b0 + - uuid: a30babe4a6f4440bba2a3ee46eff7ce2 name: 'Time spent executing statements on 
{#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},active_time,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[active_time,{#DBNAME}]' + history: 90d value_type: FLOAT units: s description: 'Time spent executing SQL statements in this database, in milliseconds (this corresponds to the states active and fastpath function call in pg_stat_activity)' @@ -63,84 +69,68 @@ zabbix_export: parameters: - '0.001' master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: c8e998f898b64af3b6452313d4bb8573 + - uuid: ea2a9c0db25a478d819cb290e4c734d2 name: 'Number of backends on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},backends,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[backends,{#DBNAME}]' + history: 90d description: 'Number of backends currently connected to this database, or NULL for shared objects. This is the only column in this view that returns a value reflecting current state; all other columns return the accumulated values since the last reset.' preprocessing: - type: JSONPATH parameters: - $.numbackends master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 569ed1c8849747a9b5b90ff2d859ea89 + - uuid: f115a12170744a449d11c24badfc61db name: 'Blocks hit on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},blks_hit,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[blks_hit,{#DBNAME}]' + history: 90d description: 'Number of times disk blocks were found already in the buffer cache, so that a read was not necessary (this only includes hits in the PostgreSQL buffer cache, not the operating system''s file system cache)' preprocessing: - type: JSONPATH parameters: - $.blks_hit master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 88a3eebdf53b43c0b8b8ef60aba56116 + - uuid: 361e6ae0b27344aeb5f7faea46eaab3f name: 'Blocks read on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},blks_read,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[blks_read,{#DBNAME}]' + history: 90d description: 'Number of disk blocks read in this database' preprocessing: - type: JSONPATH parameters: - $.blks_read master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 05a62cf75aa34435bb8275d4b84875bb + - uuid: b8ed3d0016c04c4d828f34fba2e54456 name: 'Time spent reading blocks on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},blk_read_time,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[blk_read_time,{#DBNAME}]' + history: 90d value_type: FLOAT units: s description: 'Time spent reading data file blocks by backends in this database, in milliseconds (if track_io_timing is 
enabled, otherwise zero)' @@ -152,21 +142,17 @@ zabbix_export: parameters: - '0.001' master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 51ae24232a3d484a9fe6bdc0edc5ec96 + - uuid: 8c29b1920ebe4bbdaaaeeb23074f0587 name: 'Time spent writing blocks on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},blk_write_time,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[blk_write_time,{#DBNAME}]' + history: 90d value_type: FLOAT units: s description: 'Time spent writing data file blocks by backends in this database, in milliseconds (if track_io_timing is enabled, otherwise zero)' @@ -178,21 +164,17 @@ zabbix_export: parameters: - '0.001' master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: bfae7ca4d10f497694b799d5abf15de8 + - uuid: 33b27a2a9ee4446994d03760a8f8f353 name: 'Total number of checksum failures on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},checksum_failures,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[checksum_failures,{#DBNAME}]' + history: 90d description: 'Number of data page checksum failures detected in this database (or on a shared object), or NULL if data checksums are not enabled.' preprocessing: - type: JSONPATH @@ -204,63 +186,51 @@ zabbix_export: error_handler: CUSTOM_VALUE error_handler_params: '0' master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: eb72f62ae8b24c2688a9832ed5f51792 + - uuid: 45b7fe9cb8514f4ca60a0fef5a651903 name: 'Total number of conflicts on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},conflicts,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[conflicts,{#DBNAME}]' + history: 90d description: 'Number of queries canceled due to conflicts with recovery in this database. 
(Conflicts occur only on standby servers; see pg_stat_database_conflicts for details.)' preprocessing: - type: JSONPATH parameters: - $.conflicts master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 5a039ddd9f27497a818e145bb912fe25 + - uuid: 402067f4ba8f486ca5881f59f67cbee1 name: 'Total number of deadlocks on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},deadlocks,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[deadlocks,{#DBNAME}]' + history: 90d description: 'Number of deadlocks detected in this database' preprocessing: - type: JSONPATH parameters: - $.deadlocks master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: f2d1c3c0540a4f2da0e48260b1ce556b + - uuid: ef4cc7644dc14969b703cc43db06727e name: 'Time spent in idle transactions on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},idle_in_transaction_time,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[idle_in_transaction_time,{#DBNAME}]' + history: 90d value_type: FLOAT units: s description: 'Time spent idling while in a transaction in this database, in milliseconds (this corresponds to the states idle in transaction and idle in transaction (aborted) in pg_stat_activity)' @@ -272,105 +242,85 @@ zabbix_export: parameters: - '0.001' master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: f52b9b40acac43b8aa130c7c7ba7c55e + - uuid: a07d6f6ec90846619296f1637fffd677 name: 'Total number of sessions on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},sessions,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[sessions,{#DBNAME}]' + history: 90d description: 'Total number of sessions established to this database' preprocessing: - type: JSONPATH parameters: - $.sessions master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 7dfe2f7842e14967bf48fb73f90f5568 + - uuid: 23efc8770d2f4c93a96b9d4de6780c7a name: 'Total number of abandoned sessions on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},sessions_abandoned,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[sessions_abandoned,{#DBNAME}]' + history: 90d description: 'Number of database sessions to this database that were terminated because connection to the client was lost' preprocessing: - type: JSONPATH parameters: - $.sessions_abandoned master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 
e8bb24eba9c04d7b9bff05798d751f6f + - uuid: e95b7ade648743dba6427304feca1a8c name: 'Total number of fatal sessions on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},sessions_fatal,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[sessions_fatal,{#DBNAME}]' + history: 90d description: 'Number of database sessions to this database that were terminated by fatal errors' preprocessing: - type: JSONPATH parameters: - $.sessions_fatal master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: aaf735d7125c494e9dfc504f3ebbc7e4 + - uuid: 87f773582b5147aeadd14f38da2dbc29 name: 'Total number of terminated sessions on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},sessions_killed,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[sessions_killed,{#DBNAME}]' + history: 90d description: 'Number of database sessions to this database that were terminated by operator intervention' preprocessing: - type: JSONPATH parameters: - $.sessions_killed master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 58ede113f9ab46188239b5fa094107a2 + - uuid: 83bb18e4bcd54c1f8ef10c56a5cd9a47 name: 'Total temp file size on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},temp_bytes,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[temp_bytes,{#DBNAME}]' + history: 90d units: b description: 'Total amount of data written to temporary files by queries in this database. All temporary files are counted, regardless of why the temporary file was created, and regardless of the log_temp_files setting.' preprocessing: @@ -378,302 +328,280 @@ zabbix_export: parameters: - $.temp_bytes master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 3e837ab01d904d1fb10c50b843ffb25e + - uuid: 936c1178c6c04a8caf882308b2cbbd10 name: 'Total number of temp files on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},temp_files,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[temp_files,{#DBNAME}]' + history: 90d description: 'Number of temporary files created by queries in this database. All temporary files are counted, regardless of why the temporary file was created (e.g., sorting or hashing), and regardless of the log_temp_files setting.' 
preprocessing: - type: JSONPATH parameters: - $.temp_files master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 5eade95d8a374122b2e5ab861c179918 + - uuid: 2fcec7a5c0ee4fd6a7356a58fa9a44f8 name: 'Tuples deleted on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},tup_deleted,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[tup_deleted,{#DBNAME}]' + history: 90d description: 'Number of rows deleted by queries in this database' preprocessing: - type: JSONPATH parameters: - $.tup_deleted master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 40992ae903c64104ad719365575f7ac9 + - uuid: bb453280f88e4442ba0f733a50b40dd0 name: 'Tuples fetched by index scans on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[tup_fetched,{#DBNAME}]' + history: 90d description: 'Number of live rows fetched by index scans in this database' preprocessing: - type: JSONPATH parameters: - $.tup_fetched master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 8f317cab436b429b8898643c9b36ab71 + - uuid: a1d1fa5a51fa4c228aa934e19c9c9f1d name: 'Tuples inserted on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},tup_inserted,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[tup_inserted,{#DBNAME}]' + history: 90d description: 'Number of rows inserted by queries in this database' preprocessing: - type: JSONPATH parameters: - $.tup_inserted master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: cb791a01a44840e8ad355411e3ce41be + - uuid: 009f2d0268f24b1996611837956f20e5 name: 'Tuples returned by sequential scans on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[tup_returned,{#DBNAME}]' + history: 90d description: 'Number of live rows fetched by sequential scans and index entries returned by index scans in this database' preprocessing: - type: JSONPATH parameters: - $.tup_returned master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: cc8da1b703c9424d8b08e55e61c3dd23 + - uuid: 46344f38be2f40faa45b9e9a2fce5273 name: 'Tuples updated on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},tup_updated,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[tup_updated,{#DBNAME}]' + history: 90d description: 'Number of 
rows updated by queries in this database' preprocessing: - type: JSONPATH parameters: - $.tup_updated master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: e0b4c77f621b49c590f1b73d3f6bbd8a + - uuid: 3886a2b0df3b481f98a15b38502522ab name: 'Total number of commits on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},xact_commit,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[xact_commit,{#DBNAME}]' + history: 90d description: 'Number of transactions in this database that have been committed' preprocessing: - type: JSONPATH parameters: - $.xact_commit master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 016eb3789cc84340bfeee252073317ac + - uuid: 5716033b216445fbb895dc5b546c003d name: 'Total number of rollbacks on {#DBNAME}' type: DEPENDENT - key: 'pgmon_db[{#AGENT},{#CLUSTER},xact_rollback,{#DBNAME}]' - delay: '0' + key: 'pgmon_db[xact_rollback,{#DBNAME}]' + history: 90d description: 'Number of transactions in this database that have been rolled back' preprocessing: - type: JSONPATH parameters: - $.xact_rollback master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},db_stats,datname={#DBNAME}]' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 3f2b24137fab474fbe3e7c66b82e68f0 + - uuid: a67f50d2a3fd44f98e79bb98158bfe10 name: 'Tuples fetched by index scans on {#DBNAME} - 1h delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_fetched,1h,{#DBNAME}]' + key: 'pgmon_db_delta[tup_fetched,1h,{#DBNAME}]' delay: 10m - params: 'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}], #1:now-1h)' + history: 90d + params: 'last(//pgmon_db[tup_fetched,{#DBNAME}]) - last(//pgmon_db[tup_fetched,{#DBNAME}], #1:now-1h)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 73bcb7f2272f4277ae64624f88a42d16 + - uuid: a81ba5a7c96d40bca9cc0861da574b49 name: 'Tuples fetched by index scans on {#DBNAME} - 1m delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_fetched,1m,{#DBNAME}]' - params: 'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}], #1:now-1m)' + key: 'pgmon_db_delta[tup_fetched,1m,{#DBNAME}]' + history: 90d + params: 'last(//pgmon_db[tup_fetched,{#DBNAME}]) - last(//pgmon_db[tup_fetched,{#DBNAME}], #1:now-1m)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: be104f19788a40b98ddffa390448fb96 + - uuid: 3bfa9b7db9394b6d9e0cac6255524f50 name: 'Tuples fetched by index scans on {#DBNAME} - 5m delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_fetched,5m,{#DBNAME}]' - params: 
'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_fetched,{#DBNAME}], #1:now-5m)' + key: 'pgmon_db_delta[tup_fetched,5m,{#DBNAME}]' + history: 90d + params: 'last(//pgmon_db[tup_fetched,{#DBNAME}]) - last(//pgmon_db[tup_fetched,{#DBNAME}], #1:now-5m)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 9516e42e590e430e96d8df9d71e55fad + - uuid: 261e83e1e87c42d587a9409a8a26f971 name: 'Tuples returned by sequential scans on {#DBNAME} - 1h delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_returned,1h,{#DBNAME}]' + key: 'pgmon_db_delta[tup_returned,1h,{#DBNAME}]' delay: 10m - params: 'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}], #1:now-1h)' + history: 90d + params: 'last(//pgmon_db[tup_returned,{#DBNAME}]) - last(//pgmon_db[tup_returned,{#DBNAME}], #1:now-1h)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 4b361814e2e2414d8ee589f00330cc79 + - uuid: e46c4ec2a21d48288337b90549fbf757 name: 'Tuples returned by sequential scans on {#DBNAME} - 1m delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_returned,1m,{#DBNAME}]' + key: 'pgmon_db_delta[tup_returned,1m,{#DBNAME}]' delay: 10m - params: 'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}], #1:now-1m)' + history: 90d + params: 'last(//pgmon_db[tup_returned,{#DBNAME}]) - last(//pgmon_db[tup_returned,{#DBNAME}], #1:now-1m)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' - - uuid: 77204dc6d31e465eab99e780ea647a78 + - uuid: 26a42b55d7f949588f0739388ff52831 name: 'Tuples returned by sequential scans on {#DBNAME} - 5m delta' type: CALCULATED - key: 'pgmon_db_delta[{#AGENT},{#CLUSTER},tup_returned,5m,{#DBNAME}]' + key: 'pgmon_db_delta[tup_returned,5m,{#DBNAME}]' delay: 10m - params: 'last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}]) - last(//pgmon_db[{#AGENT},{#CLUSTER},tup_returned,{#DBNAME}], #1:now-5m)' + history: 90d + params: 'last(//pgmon_db[tup_returned,{#DBNAME}]) - last(//pgmon_db[tup_returned,{#DBNAME}], #1:now-5m)' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Database value: '{#DBNAME}' + - uuid: 492b3cac15f348c2b85f97b69c114d1b + name: 'Database Stats for {#DBNAME}' + key: 'web.page.get[localhost,/db_stats?datname={#DBNAME},{$AGENT_PORT}]' + history: '0' + value_type: TEXT + preprocessing: + - type: REGEX + parameters: + - '\n\s?\n([\s\S]*)' + - \1 + tags: + - tag: Application + value: PostgreSQL + - tag: Database + value: '{#DBNAME}' + graph_prototypes: + - uuid: 4949cdfbda614af796a2856fdfa9ac3f + name: 'Time breakdown for {#DBNAME} on {#CLUSTER}' + graph_items: + - color: 199C0D + calc_fnc: ALL + item: + host: 'PostgreSQL by pgmon' + key: 'pgmon_db[active_time,{#DBNAME}]' + - sortorder: '1' + color: F63100 + calc_fnc: ALL + item: + host: 'PostgreSQL by pgmon' + key: 'pgmon_db[idle_in_transaction_time,{#DBNAME}]' + - sortorder: '2' + color: 2774A4 + calc_fnc: ALL + item: + host: 'PostgreSQL by pgmon' + key: 'pgmon_db[blk_read_time,{#DBNAME}]' + - sortorder: '3' + color: F7941D + calc_fnc: 
ALL + item: + host: 'PostgreSQL by pgmon' + key: 'pgmon_db[blk_write_time,{#DBNAME}]' lld_macro_paths: - - lld_macro: '{#AGENT}' - path: $.agent - - lld_macro: '{#CLUSTER}' - path: $.cluster - lld_macro: '{#DBNAME}' path: $.datname - - uuid: fd2db56cbc4d4ad38595d51a8f5c1d1b + preprocessing: + - type: REGEX + parameters: + - '\n\s?\n([\s\S]*)' + - \1 + - uuid: 8ec029d577ae4872858e2e5cfd1cc40e name: 'Discover Replication' - key: 'pgmon[{$AGENT_NAME},{$CLUSTER},discover_rep,pos={$AGENT_NAME}:{$CLUSTER}]' + key: 'web.page.get[localhost,/discover_rep,{$AGENT_PORT}]' delay: 10m + lifetime: 30d + enabled_lifetime_type: DISABLE_NEVER item_prototypes: - - uuid: 127c3e28248e4b2bb0937e5ea34cc847 - name: 'Replication Stats for {#REPID}' - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' - history: '0' - trends: '0' - value_type: TEXT - tags: - - tag: Agent - value: '{#AGENT}' - - tag: Cluster - value: '{#CLUSTER}' - - tag: Database - value: '{#DBNAME}' - - uuid: 333f6f3c7ae749d998ea3e679f52fee1 + - uuid: 3a5a60620e6a4db694e47251148d82f5 name: 'Flush lag for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},flush_lag,repid={#REPID}]' - delay: '0' + key: 'pgmon_rep[flush_lag,repid={#REPID}]' + history: 90d value_type: FLOAT description: 'Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level on incurred while committing if this server was configured as a synchronous standby.' preprocessing: @@ -681,24 +609,19 @@ zabbix_export: parameters: - $.flush_lag master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 42303bb6d62c4288a19049352a0752d7 + - uuid: 624f8f085a3642c9a10a03361c17763d name: 'Last flush LSN for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},flush_lsn,repid={#REPID}]' - delay: '0' - trends: '0' + key: 'pgmon_rep[flush_lsn,repid={#REPID}]' + history: 90d value_type: TEXT description: 'Last write-ahead log location flushed to disk by this standby server' preprocessing: @@ -706,23 +629,19 @@ zabbix_export: parameters: - $.flush_lsn master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 5838b1afd66e4063b793ee4b597e89fa + - uuid: 442a0f4baa224ad69fd883879e5c768b name: 'Replay lag for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},replay_lag,repid={#REPID}]' - delay: '0' + key: 'pgmon_rep[replay_lag,repid={#REPID}]' + history: 90d value_type: FLOAT description: 'Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby.' 
preprocessing: @@ -730,24 +649,19 @@ zabbix_export: parameters: - $.replay_lag master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 9fdd3add95b04e65ba4e14f1fa2af864 + - uuid: fe1bed51845d4694bae8f53deed4846d name: 'Last replay LSN for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},replay_lsn,repid={#REPID}]' - delay: '0' - trends: '0' + key: 'pgmon_rep[replay_lsn,repid={#REPID}]' + history: 90d value_type: TEXT description: 'Last write-ahead log location replayed into the database on this standby server' preprocessing: @@ -755,24 +669,19 @@ zabbix_export: parameters: - $.replay_lsn master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 2c8088ff43c548e2b30511468a0446aa + - uuid: 68c179d0e33f45f9bf82d2d4125763f0 name: 'Last sent LSN for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},sent_lsn,repid={#REPID}]' - delay: '0' - trends: '0' + key: 'pgmon_rep[sent_lsn,repid={#REPID}]' + history: 90d value_type: TEXT description: 'Last write-ahead log location sent on this connection' preprocessing: @@ -780,24 +689,19 @@ zabbix_export: parameters: - $.sent_lsn master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 1f3371bfd3d84af8aca6080024dc14aa + - uuid: be57b23dc0ce48a7a000a207c9868855 name: 'Replication state for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},state,repid={#REPID}]' - delay: '0' - trends: '0' + key: 'pgmon_rep[state,repid={#REPID}]' + history: 90d value_type: TEXT description: | Current WAL sender state. Possible values are: @@ -811,23 +715,19 @@ zabbix_export: parameters: - $.state master_item: - key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]' + key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]' tags: - - tag: Agent - value: '{#AGENT}' - tag: Application value: PostgreSQL - - tag: Cluster - value: '{#CLUSTER}' - tag: Component value: Replication - tag: Database value: '{#DBNAME}' - - uuid: 69bfddf63bba4740a3ddc1d729eac0f4 + - uuid: c056cc1202b7412d89bf3927c2858248 name: 'Write lag for {#REPID}' type: DEPENDENT - key: 'pgmon_rep[{#AGENT},{#CLUSTER},write_lag,repid={#REPID}]' - delay: '0' + key: 'pgmon_rep[write_lag,repid={#REPID}]' + history: 90d value_type: FLOAT description: 'Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby.' 
               preprocessing:
@@ -835,24 +735,19 @@
                   parameters:
                     - $.write_lag
               master_item:
-                key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]'
+                key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]'
               tags:
-                - tag: Agent
-                  value: '{#AGENT}'
                 - tag: Application
                   value: PostgreSQL
-                - tag: Cluster
-                  value: '{#CLUSTER}'
                 - tag: Component
                   value: Replication
                 - tag: Database
                   value: '{#DBNAME}'
-            - uuid: 53185c929b8c430e854d7ffb28a9ce8f
+            - uuid: 57fb03cf63af4b0a91d8e36d6ff64d30
               name: 'Last write LSN for {#REPID}'
               type: DEPENDENT
-              key: 'pgmon_rep[{#AGENT},{#CLUSTER},write_lsn,repid={#REPID}]'
-              delay: '0'
-              trends: '0'
+              key: 'pgmon_rep[write_lsn,repid={#REPID}]'
+              history: 90d
               value_type: TEXT
               description: 'Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby.'
               preprocessing:
@@ -860,14 +755,27 @@
                   parameters:
                     - $.write_lsn
               master_item:
-                key: 'pgmon[{#AGENT},{#CLUSTER},rep_stats,repid={#REPID}]'
+                key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]'
+              tags:
+                - tag: Application
+                  value: PostgreSQL
+                - tag: Component
+                  value: Replication
+                - tag: Database
+                  value: '{#DBNAME}'
+            - uuid: efbe11f37c2f499488bdc5853c3d89e6
+              name: 'Replication Stats for {#REPID}'
+              key: 'web.page.get[localhost,/rep_stats?repid={#REPID},{$AGENT_PORT}]'
+              history: '0'
+              value_type: TEXT
+              preprocessing:
+                - type: REGEX
+                  parameters:
+                    - '\n\s?\n([\s\S]*)'
+                    - \1
               tags:
-                - tag: Agent
-                  value: '{#AGENT}'
                 - tag: Application
                   value: PostgreSQL
-                - tag: Cluster
-                  value: '{#CLUSTER}'
                 - tag: Component
                   value: Replication
                 - tag: Database
@@ -883,10 +791,12 @@
               path: $.repid
             - lld_macro: '{#STATE}'
               path: $.state
+          preprocessing:
+            - type: REGEX
+              parameters:
+                - '\n\s?\n([\s\S]*)'
+                - \1
       macros:
-        - macro: '{$AGENT_NAME}'
-          value: pgmon
-          description: 'The name of the pgmon instance to use'
-        - macro: '{$CLUSTER}'
-          value: local
-          description: 'The name of the cluster to monitor'
+        - macro: '{$AGENT_PORT}'
+          value: '5400'
+          description: 'The port the agent listens on'
diff --git a/pgmon_userparameter.conf b/pgmon_userparameter.conf
deleted file mode 100644
index c8f6a61..0000000
--- a/pgmon_userparameter.conf
+++ /dev/null
@@ -1 +0,0 @@
-UserParameter=pgmon[*],/usr/local/bin/pgmon.py -c /etc/pgmon/$1.cfg "$1,$2,$3,$4,$5,$6,$7,$8,$9"
diff --git a/requirements.yml b/requirements.yml
index 658130b..6612b4c 100644
--- a/requirements.yml
+++ b/requirements.yml
@@ -1 +1,2 @@
-psycopg2
+psycopg[binary,pool]
+pyyaml
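
Note on the new HTTP interface: with pgmon_userparameter.conf removed, the template items above poll the agent over HTTP using web.page.get, which returns the status line and response headers followed by the body; that is why every master item carries the REGEX preprocessing step '\n\s?\n([\s\S]*)' -> \1 to keep only the part after the first blank line. The sketch below is a minimal manual check of the agent, not part of the patch: it assumes the agent is running locally on the default port 5400 from pgmon.yml, that the endpoint paths mirror the item keys (/version, /db_stats?datname=..., /rep_stats?repid=...), and that row metrics come back as a single JSON object, as implied by the template's JSONPATH steps ($.numbackends, $.blks_hit, ...).

    # Hypothetical smoke test for the HTTP endpoints used by the template;
    # adjust the port and database name to match the local pgmon.yml.
    import json
    from urllib.request import urlopen

    BASE = "http://localhost:5400"

    # 'value' metric: the body is the bare value, e.g. '170004' for /version
    with urlopen(BASE + "/version", timeout=10) as resp:
        print(resp.read().decode().strip())

    # 'row' metric: assumed to be one JSON object keyed by column name,
    # which is what the dependent items' JSONPATH preprocessing expects
    with urlopen(BASE + "/db_stats?datname=postgres", timeout=10) as resp:
        stats = json.loads(resp.read())
        print(stats.get("numbackends"), stats.get("xact_commit"))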