From cc71547f5fb11ff7d65b538103230eed055654f5 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Mon, 7 Jul 2025 13:07:11 -0400 Subject: [PATCH 1/9] Include application_name in replication discovery data --- sample-config/pgmon-metrics.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/sample-config/pgmon-metrics.yml b/sample-config/pgmon-metrics.yml index 0b07909..38d3b5f 100644 --- a/sample-config/pgmon-metrics.yml +++ b/sample-config/pgmon-metrics.yml @@ -18,6 +18,7 @@ metrics: query: 0: > SELECT host(client_addr) || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') AS repid, + application_name, client_addr, state FROM pg_stat_replication From ea3aca34553b21852e72cb4961c76eca9e530d1f Mon Sep 17 00:00:00 2001 From: James Campbell Date: Mon, 7 Jul 2025 13:15:03 -0400 Subject: [PATCH 2/9] Filter out initial logical replication sync workers * Slots are crated for each table during the initial sync, which only live for the duration of the copy for that table. * The initial sync backend workers' names are also based on the table being copied. * Filter out the above in Zabbix discovery based on application_name and slot_name. --- zabbix_templates/pgmon_templates.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/zabbix_templates/pgmon_templates.yaml b/zabbix_templates/pgmon_templates.yaml index 64a13fe..d18c450 100644 --- a/zabbix_templates/pgmon_templates.yaml +++ b/zabbix_templates/pgmon_templates.yaml @@ -1572,6 +1572,12 @@ zabbix_export: type: HTTP_AGENT key: pgmon_discover_rep delay: 10m + filter: + conditions: + - macro: '{#APPLICATION_NAME}' + value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$' + operator: NOT_MATCHES_REGEX + formulaid: A lifetime: 30d enabled_lifetime_type: DISABLE_NEVER item_prototypes: @@ -1775,6 +1781,8 @@ zabbix_export: value: Raw url: 'http://localhost:{$AGENT_PORT}/discover_rep' lld_macro_paths: + - lld_macro: '{#APPLICATION_NAME}' + path: $.application_name - lld_macro: '{#CLIENT_ADDR}' path: $.client_addr - lld_macro: '{#REPID}' @@ -1786,6 +1794,12 @@ zabbix_export: type: HTTP_AGENT key: pgmon_discover_slots delay: 10m + filter: + conditions: + - macro: '{#SLOT_NAME}' + value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$' + operator: NOT_MATCHES_REGEX + formulaid: A item_prototypes: - uuid: 536c5f82e3074ddfbfd842b3a2e8d46c name: 'Slot {#SLOT_NAME} - Confirmed Flushed Bytes Lag' From 60589c2058daef3bd1cbbcd7b573dd3dd9e39190 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Mon, 14 Jul 2025 01:39:57 -0400 Subject: [PATCH 3/9] Update undiscovered item behavior --- zabbix_templates/pgmon_templates.yaml | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/zabbix_templates/pgmon_templates.yaml b/zabbix_templates/pgmon_templates.yaml index d18c450..1f552d1 100644 --- a/zabbix_templates/pgmon_templates.yaml +++ b/zabbix_templates/pgmon_templates.yaml @@ -167,7 +167,8 @@ zabbix_export: operator: NOT_MATCHES_REGEX formulaid: A lifetime: 30d - enabled_lifetime_type: DISABLE_NEVER + enabled_lifetime_type: DISABLE_AFTER + enabled_lifetime: 1d item_prototypes: - uuid: a30babe4a6f4440bba2a3ee46eff7ce2 name: 'Time spent executing statements on {#DBNAME}' @@ -982,6 +983,9 @@ zabbix_export: type: DEPENDENT key: pgmon_discover_io_backend_types delay: '0' + lifetime: 30d + enabled_lifetime_type: DISABLE_AFTER + enabled_lifetime: 1h item_prototypes: - uuid: b1ac2e56b30f4812bf33ce973ef16b10 name: 'I/O Evictions by {#BACKEND_TYPE}' @@ -1579,7 +1583,8 @@ zabbix_export: operator: NOT_MATCHES_REGEX formulaid: A lifetime: 30d - enabled_lifetime_type: 
DISABLE_NEVER + enabled_lifetime_type: DISABLE_AFTER + enabled_lifetime: 7d item_prototypes: - uuid: 3a5a60620e6a4db694e47251148d82f5 name: 'Flush lag for {#REPID}' @@ -1800,6 +1805,9 @@ zabbix_export: value: '^pg_[0-9]+_sync_[0-9]+_[0-9]+$' operator: NOT_MATCHES_REGEX formulaid: A + lifetime: 30d + enabled_lifetime_type: DISABLE_AFTER + enabled_lifetime: 7d item_prototypes: - uuid: 536c5f82e3074ddfbfd842b3a2e8d46c name: 'Slot {#SLOT_NAME} - Confirmed Flushed Bytes Lag' From 29bfd07dad9a4d4b90bd0838a3146dd054b119a3 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Mon, 14 Jul 2025 01:58:08 -0400 Subject: [PATCH 4/9] Run code through black --- src/pgmon.py | 13 ++++++++++--- src/test_pgmon.py | 16 ++++++++-------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/pgmon.py b/src/pgmon.py index 2b72169..e5bd3fb 100755 --- a/src/pgmon.py +++ b/src/pgmon.py @@ -400,7 +400,7 @@ def json_encode_special(obj): """ if isinstance(obj, Decimal): return float(obj) - raise TypeError(f'Cannot serialize object of {type(obj)}') + raise TypeError(f"Cannot serialize object of {type(obj)}") def run_query_no_retry(pool, return_type, query, args): @@ -424,7 +424,9 @@ def run_query_no_retry(pool, return_type, query, args): elif return_type == "column": if len(res) == 0: return "[]" - return json.dumps([list(r.values())[0] for r in res], default=json_encode_special) + return json.dumps( + [list(r.values())[0] for r in res], default=json_encode_special + ) elif return_type == "set": return json.dumps(res, default=json_encode_special) except: @@ -667,7 +669,12 @@ def test_queries(): for name, metric in config["metrics"].items(): # If the metric has arguments to use while testing, grab those args = metric.get("test_args", {}) - print("Testing {} [{}]".format(name, ", ".join(["{}={}".format(key, value) for key, value in args.items()]))) + print( + "Testing {} [{}]".format( + name, + ", ".join(["{}={}".format(key, value) for key, value in args.items()]), + ) + ) # When testing against a docker container, we may end up connecting # before the service is truly up (it restarts during the initialization # phase). To cope with this, we'll allow a few connection failures. 
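Editorial note, not part of the patch: the reformatted calls above depend on json.dumps handing any value it cannot serialize natively to the function passed as default=. A minimal standalone sketch of that behavior, reusing the json_encode_special helper exactly as the module defines it (the sample dict keys are made up for illustration):

import json
from decimal import Decimal

def json_encode_special(obj):
    # Convert Decimal values (e.g. NUMERIC columns returned by psycopg2) to float
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError("Cannot serialize object of {}".format(type(obj)))

# json.dumps only calls the hook for objects it cannot serialize itself
print(json.dumps({"lag": Decimal("1.5"), "slots": 3}, default=json_encode_special))
# prints: {"lag": 1.5, "slots": 3}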
diff --git a/src/test_pgmon.py b/src/test_pgmon.py index 1a86492..534f8ba 100644 --- a/src/test_pgmon.py +++ b/src/test_pgmon.py @@ -795,17 +795,17 @@ metrics: def test_json_encode_special(self): # Confirm that we're getting the right type - self.assertFalse(isinstance(Decimal('0.5'), float)) - self.assertTrue(isinstance(pgmon.json_encode_special(Decimal('0.5')), float)) + self.assertFalse(isinstance(Decimal("0.5"), float)) + self.assertTrue(isinstance(pgmon.json_encode_special(Decimal("0.5")), float)) # Make sure we get sane values - self.assertEqual(pgmon.json_encode_special(Decimal('0.5')), 0.5) - self.assertEqual(pgmon.json_encode_special(Decimal('12')), 12.0) + self.assertEqual(pgmon.json_encode_special(Decimal("0.5")), 0.5) + self.assertEqual(pgmon.json_encode_special(Decimal("12")), 12.0) # Make sure we can still fail for other types - self.assertRaises( - TypeError, pgmon.json_encode_special, object - ) + self.assertRaises(TypeError, pgmon.json_encode_special, object) # Make sure we can actually serialize a Decimal - self.assertEqual(json.dumps(Decimal('2.5'), default=pgmon.json_encode_special), '2.5') + self.assertEqual( + json.dumps(Decimal("2.5"), default=pgmon.json_encode_special), "2.5" + ) From 43cd162313789681951106d3c78a9a1cac122378 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Tue, 23 Sep 2025 01:12:49 -0400 Subject: [PATCH 5/9] Refactor to make pylint happy --- Makefile | 14 +- pylintrc | 4 + src/pgmon.py | 623 +++++++++++++++++++++++++--------------------- src/test_pgmon.py | 534 +++++++++++++++++++++++++++++---------- 4 files changed, 767 insertions(+), 408 deletions(-) create mode 100644 pylintrc diff --git a/Makefile b/Makefile index 95ae465..fa046d0 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ SUPPORTED := ubuntu-20.04 \ # These targets are the main ones to use for most things. ## -.PHONY: all clean tgz test query-tests install-common install-openrc install-systemd +.PHONY: all clean tgz lint format test query-tests install-common install-openrc install-systemd all: package-all @@ -80,6 +80,18 @@ tgz: clean: rm -rf $(BUILD_DIR) +# Check for lint +lint: + pylint src/pgmon.py + pylint src/test_pgmon.py + black --check --diff src/pgmon.py + black --check --diff src/test_pgmon.py + +# Format the code using black +format: + black src/pgmon.py + black src/test_pylint.py + # Run unit tests for the script test: cd src ; python3 -m unittest diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..d5f47b8 --- /dev/null +++ b/pylintrc @@ -0,0 +1,4 @@ +[MASTER] +py-version=3.5 + +disable=fixme diff --git a/src/pgmon.py b/src/pgmon.py index e5bd3fb..738ae64 100755 --- a/src/pgmon.py +++ b/src/pgmon.py @@ -1,105 +1,141 @@ #!/usr/bin/env python3 +""" +pgmon is a monitoring intermediary that sits between a PostgreSQL cluster and a monitoring systen +that is capable of parsing JSON responses over an HTTP connection. 
+""" + +# pylint: disable=too-few-public-methods -import yaml import json import time import os import sys - +import signal import argparse import logging +import re + +from decimal import Decimal + +from urllib.parse import urlparse, parse_qs + +from contextlib import contextmanager from datetime import datetime, timedelta +from http.server import BaseHTTPRequestHandler +from http.server import ThreadingHTTPServer + +from threading import Lock + +import yaml + import psycopg2 from psycopg2.extras import RealDictCursor from psycopg2.pool import ThreadedConnectionPool -from contextlib import contextmanager - -import signal -from threading import Thread, Lock, Semaphore - -from http.server import BaseHTTPRequestHandler, HTTPServer -from http.server import ThreadingHTTPServer -from urllib.parse import urlparse, parse_qs - import requests -import re -from decimal import Decimal VERSION = "1.0.4" -# Configuration -config = {} -# Dictionary of current PostgreSQL connection pools -connections_lock = Lock() -connections = {} +class Context: + """ + The global context for connections, config, version, nad IPC + """ -# Dictionary of unhappy databases. Keys are database names, value is the time -# the database was determined to be unhappy plus the cooldown setting. So, -# basically it's the time when we should try to connect to the database again. -unhappy_cooldown = {} + # Configuration + config = {} -# Version information -cluster_version = None -cluster_version_next_check = None -cluster_version_lock = Lock() + # Dictionary of current PostgreSQL connection pools + connections_lock = Lock() + connections = {} -# PostgreSQL latest version information -latest_version = None -latest_version_next_check = None -latest_version_lock = Lock() -release_supported = None + # Dictionary of unhappy databases. Keys are database names, value is the time + # the database was determined to be unhappy plus the cooldown setting. So, + # basically it's the time when we should try to connect to the database again. + unhappy_cooldown = {} -# Running state (used to gracefully shut down) -running = True + # Version information + cluster_version = None + cluster_version_next_check = None + cluster_version_lock = Lock() -# The http server object -httpd = None + # PostgreSQL latest version information + latest_version = None + latest_version_next_check = None + latest_version_lock = Lock() + release_supported = None -# Where the config file lives -config_file = None + # Running state (used to gracefully shut down) + running = True -# Configure logging -log = logging.getLogger(__name__) -formatter = logging.Formatter( - "%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s" -) -console_log_handler = logging.StreamHandler() -console_log_handler.setFormatter(formatter) -log.addHandler(console_log_handler) + # The http server object + httpd = None + + # Where the config file lives + config_file = None + + # Configure logging + log = logging.getLogger(__name__) + + @classmethod + def init_logging(cls): + """ + Actually initialize the logging framework. Since we don't ever instantiate the Context + class, this provides a way to make a few modifications to the log handler. 
+ """ + + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(filename)s: " + "%(funcName)s() line %(lineno)d: %(message)s" + ) + console_log_handler = logging.StreamHandler() + console_log_handler.setFormatter(formatter) + cls.log.addHandler(console_log_handler) # Error types class ConfigError(Exception): - pass + """ + Error type for all config related errors. + """ class DisconnectedError(Exception): - pass + """ + Error indicating a previously active connection to the database has been disconnected. + """ class UnhappyDBError(Exception): - pass + """ + Error indicating that a database the code has been asked to connect to is on the unhappy list. + """ class UnknownMetricError(Exception): - pass + """ + Error indicating that an undefined metric was requested. + """ class MetricVersionError(Exception): - pass + """ + Error indicating that there is no suitable query for a metric that was requested for the + version of PostgreSQL being monitored. + """ class LatestVersionCheckError(Exception): - pass + """ + Error indicating that there was a problem retrieving or parsing the latest version information. + """ # Default config settings -default_config = { +DEFAULT_CONFIG = { # The address the agent binds to "address": "127.0.0.1", # The port the agent listens on for requests @@ -177,6 +213,60 @@ def update_deep(d1, d2): return d1 +def validate_metric(path, name, metric): + """ + Validate a metric definition from a given file. If any query definitions come from external + files, the metric dict will be updated with the actual query. + + Params: + path: path to the file which contains this definition + name: name of the metric + metric: the dictionary containing the metric definition + """ + # Validate return types + try: + if metric["type"] not in ["value", "row", "column", "set"]: + raise ConfigError( + "Invalid return type: {} for metric {} in {}".format( + metric["type"], name, path + ) + ) + except KeyError as e: + raise ConfigError( + "No type specified for metric {} in {}".format(name, path) + ) from e + + # Ensure queries exist + query_dict = metric.get("query", {}) + if not isinstance(query_dict, dict): + raise ConfigError( + "Query definition should be a dictionary, got: {} for metric {} in {}".format( + query_dict, name, path + ) + ) + + if len(query_dict) == 0: + raise ConfigError("Missing queries for metric {} in {}".format(name, path)) + + # Read external sql files and validate version keys + config_base = os.path.dirname(path) + for vers, query in metric["query"].items(): + try: + int(vers) + except Exception as e: + raise ConfigError( + "Invalid version: {} for metric {} in {}".format(vers, name, path) + ) from e + + # Read in the external query and update the definition in the metricdictionary + if query.startswith("file:"): + query_path = query[5:] + if not query_path.startswith("/"): + query_path = os.path.join(config_base, query_path) + with open(query_path, "r", encoding="utf-8") as f: + metric["query"][vers] = f.read() + + def read_config(path, included=False): """ Read a config file. @@ -185,61 +275,21 @@ def read_config(path, included=False): path: path to the file to read included: is this file included by another file? 
""" + # Read config file - log.info("Reading log file: {}".format(path)) - with open(path, "r") as f: + Context.log.info("Reading log file: %s", path) + with open(path, "r", encoding="utf-8") as f: try: cfg = yaml.safe_load(f) except yaml.parser.ParserError as e: - raise ConfigError("Inavlid config file: {}: {}".format(path, e)) - - # Since we use it a few places, get the base directory from the config - config_base = os.path.dirname(path) + raise ConfigError("Inavlid config file: {}: {}".format(path, e)) from e # Read any external queries and validate metric definitions for name, metric in cfg.get("metrics", {}).items(): - # Validate return types - try: - if metric["type"] not in ["value", "row", "column", "set"]: - raise ConfigError( - "Invalid return type: {} for metric {} in {}".format( - metric["type"], name, path - ) - ) - except KeyError: - raise ConfigError( - "No type specified for metric {} in {}".format(name, path) - ) - - # Ensure queries exist - query_dict = metric.get("query", {}) - if type(query_dict) is not dict: - raise ConfigError( - "Query definition should be a dictionary, got: {} for metric {} in {}".format( - query_dict, name, path - ) - ) - - if len(query_dict) == 0: - raise ConfigError("Missing queries for metric {} in {}".format(name, path)) - - # Read external sql files and validate version keys - for vers, query in metric["query"].items(): - try: - int(vers) - except: - raise ConfigError( - "Invalid version: {} for metric {} in {}".format(vers, name, path) - ) - - if query.startswith("file:"): - query_path = query[5:] - if not query_path.startswith("/"): - query_path = os.path.join(config_base, query_path) - with open(query_path, "r") as f: - metric["query"][vers] = f.read() + validate_metric(path, name, metric) # Read any included config files + config_base = os.path.dirname(path) for inc in cfg.get("include", []): # Prefix relative paths with the directory from the current config if not inc.startswith("/"): @@ -250,34 +300,37 @@ def read_config(path, included=False): # config if included: return cfg - else: - new_config = {} - update_deep(new_config, default_config) - update_deep(new_config, cfg) - # Minor sanity checks - if len(new_config["metrics"]) == 0: - log.error("No metrics are defined") - raise ConfigError("No metrics defined") + new_config = {} + update_deep(new_config, DEFAULT_CONFIG) + update_deep(new_config, cfg) - # Validate the new log level before changing the config - if new_config["log_level"].upper() not in [ - "DEBUG", - "INFO", - "WARNING", - "ERROR", - "CRITICAL", - ]: - raise ConfigError("Invalid log level: {}".format(new_config["log_level"])) + # Minor sanity checks + if len(new_config["metrics"]) == 0: + Context.log.error("No metrics are defined") + raise ConfigError("No metrics defined") - global config - config = new_config + # Validate the new log level before changing the config + if new_config["log_level"].upper() not in [ + "DEBUG", + "INFO", + "WARNING", + "ERROR", + "CRITICAL", + ]: + raise ConfigError("Invalid log level: {}".format(new_config["log_level"])) - # Apply changes to log level - log.setLevel(logging.getLevelName(config["log_level"].upper())) + Context.config = new_config + + # Apply changes to log level + Context.log.setLevel(logging.getLevelName(Context.config["log_level"].upper())) + + # Return the config (mostly to make pylint happy, but also in case I opt to remove the side + # effect and make this more functional. 
+ return Context.config -def signal_handler(sig, frame): +def signal_handler(sig, frame): # pylint: disable=unused-argument """ Function for handling signals @@ -288,19 +341,22 @@ def signal_handler(sig, frame): # Signal everything to shut down if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]: - log.info("Shutting down ...") - global running - running = False - if httpd is not None: - httpd.socket.close() + Context.log.info("Shutting down ...") + Context.running = False + if Context.httpd is not None: + Context.httpd.socket.close() # Signal a reload if sig == signal.SIGHUP: - log.warning("Received config reload signal") - read_config(config_file) + Context.log.warning("Received config reload signal") + read_config(Context.config_file) class ConnectionPool(ThreadedConnectionPool): + """ + Threaded connection pool that has a context manager. + """ + def __init__(self, dbname, minconn, maxconn, *args, **kwargs): # Make sure dbname isn't different in the kwargs kwargs["dbname"] = dbname @@ -309,7 +365,14 @@ class ConnectionPool(ThreadedConnectionPool): self.name = dbname @contextmanager - def connection(self, timeout=None): + def connection(self, timeout): + """ + Connection context manager for our connection pool. This will attempt to retrieve a + connection until the timeout is reached. + + Params: + timeout: how long to keep trying to get a connection bedore giving up + """ conn = None timeout_time = datetime.now() + timedelta(timeout) # We will continue to try to get a connection slot until we time out @@ -333,34 +396,37 @@ class ConnectionPool(ThreadedConnectionPool): def get_pool(dbname): """ Get a database connection pool. + + Params: + dbname: the name of the database for which a connection pool should be returned. """ # Check if the db is unhappy and wants to be left alone - if dbname in unhappy_cooldown: - if unhappy_cooldown[dbname] > datetime.now(): + if dbname in Context.unhappy_cooldown: + if Context.unhappy_cooldown[dbname] > datetime.now(): raise UnhappyDBError() # Create a connection pool if it doesn't already exist - if dbname not in connections: - with connections_lock: + if dbname not in Context.connections: + with Context.connections_lock: # Make sure nobody created the pool while we were waiting on the # lock - if dbname not in connections: - log.info("Creating connection pool for: {}".format(dbname)) + if dbname not in Context.connections: + Context.log.info("Creating connection pool for: %s", dbname) # Actually create the connection pool - connections[dbname] = ConnectionPool( + Context.connections[dbname] = ConnectionPool( dbname, - int(config["min_pool_size"]), - int(config["max_pool_size"]), + int(Context.config["min_pool_size"]), + int(Context.config["max_pool_size"]), application_name="pgmon", - host=config["dbhost"], - port=config["dbport"], - user=config["dbuser"], - connect_timeout=int(config["connect_timeout"]), - sslmode=config["ssl_mode"], + host=Context.config["dbhost"], + port=Context.config["dbport"], + user=Context.config["dbuser"], + connect_timeout=int(Context.config["connect_timeout"]), + sslmode=Context.config["ssl_mode"], ) # Clear the unhappy indicator if present - unhappy_cooldown.pop(dbname, None) - return connections[dbname] + Context.unhappy_cooldown.pop(dbname, None) + return Context.connections[dbname] def handle_connect_failure(pool): @@ -368,8 +434,8 @@ def handle_connect_failure(pool): Mark the database as being unhappy so we can leave it alone for a while """ dbname = pool.name - unhappy_cooldown[dbname] = datetime.now() + 
timedelta( - seconds=int(config["reconnect_cooldown"]) + Context.unhappy_cooldown[dbname] = datetime.now() + timedelta( + seconds=int(Context.config["reconnect_cooldown"]) ) @@ -400,43 +466,48 @@ def json_encode_special(obj): """ if isinstance(obj, Decimal): return float(obj) - raise TypeError(f"Cannot serialize object of {type(obj)}") + raise TypeError("Cannot serialize object of {}".format(type(obj))) def run_query_no_retry(pool, return_type, query, args): """ Run the query with no explicit retry code """ - with pool.connection(float(config["connect_timeout"])) as conn: + with pool.connection(float(Context.config["connect_timeout"])) as conn: try: with conn.cursor(cursor_factory=RealDictCursor) as curs: + output = None curs.execute(query, args) res = curs.fetchall() if return_type == "value": if len(res) == 0: - return "" - return str(list(res[0].values())[0]) + output = "" + output = str(list(res[0].values())[0]) elif return_type == "row": - if len(res) == 0: - return "[]" - return json.dumps(res[0], default=json_encode_special) + # if len(res) == 0: + # return "[]" + output = json.dumps(res[0], default=json_encode_special) elif return_type == "column": - if len(res) == 0: - return "[]" - return json.dumps( + # if len(res) == 0: + # return "[]" + output = json.dumps( [list(r.values())[0] for r in res], default=json_encode_special ) elif return_type == "set": - return json.dumps(res, default=json_encode_special) - except: + output = json.dumps(res, default=json_encode_special) + else: + raise ConfigError( + "Invalid query return type: {}".format(return_type) + ) + return output + except Exception as e: dbname = pool.name - if dbname in unhappy_cooldown: - raise UnhappyDBError() - elif conn.closed != 0: - raise DisconnectedError() - else: - raise + if dbname in Context.unhappy_cooldown: + raise UnhappyDBError() from e + if conn.closed != 0: + raise DisconnectedError() from e + raise def run_query(pool, return_type, query, args): @@ -457,7 +528,7 @@ def run_query(pool, return_type, query, args): try: return run_query_no_retry(pool, return_type, query, args) except DisconnectedError: - log.warning("Stale PostgreSQL connection found ... trying again") + Context.log.warning("Stale PostgreSQL connection found ... trying again") # This sleep is an annoying hack to give the pool workers time to # actually mark the connection, otherwise it can be given back in the # next connection() call @@ -465,9 +536,9 @@ def run_query(pool, return_type, query, args): time.sleep(1) try: return run_query_no_retry(pool, return_type, query, args) - except: + except Exception as e: handle_connect_failure(pool) - raise UnhappyDBError() + raise UnhappyDBError() from e def get_cluster_version(): @@ -475,40 +546,39 @@ def get_cluster_version(): Get the PostgreSQL version if we don't already know it, or if it's been too long sice the last time it was checked. """ - global cluster_version - global cluster_version_next_check # If we don't know the version or it's past the recheck time, get the # version from the database. Only one thread needs to do this, so they all # try to grab the lock, and then make sure nobody else beat them to it. 
if ( - cluster_version is None - or cluster_version_next_check is None - or cluster_version_next_check < datetime.now() + Context.cluster_version is None + or Context.cluster_version_next_check is None + or Context.cluster_version_next_check < datetime.now() ): - with cluster_version_lock: + with Context.cluster_version_lock: # Only check if nobody already got the version before us if ( - cluster_version is None - or cluster_version_next_check is None - or cluster_version_next_check < datetime.now() + Context.cluster_version is None + or Context.cluster_version_next_check is None + or Context.cluster_version_next_check < datetime.now() ): - log.info("Checking PostgreSQL cluster version") - pool = get_pool(config["dbname"]) - cluster_version = int( + Context.log.info("Checking PostgreSQL cluster version") + pool = get_pool(Context.config["dbname"]) + Context.cluster_version = int( run_query(pool, "value", "SHOW server_version_num", None) ) - cluster_version_next_check = datetime.now() + timedelta( - seconds=int(config["version_check_period"]) + Context.cluster_version_next_check = datetime.now() + timedelta( + seconds=int(Context.config["version_check_period"]) ) - log.info("Got PostgreSQL cluster version: {}".format(cluster_version)) - log.debug( - "Next PostgreSQL cluster version check will be after: {}".format( - cluster_version_next_check - ) + Context.log.info( + "Got PostgreSQL cluster version: %s", Context.cluster_version + ) + Context.log.debug( + "Next PostgreSQL cluster version check will be after: %s", + Context.cluster_version_next_check, ) - return cluster_version + return Context.cluster_version def version_num_to_release(version_num): @@ -521,8 +591,7 @@ def version_num_to_release(version_num): """ if version_num // 10000 < 10: return version_num // 10000 + (version_num % 10000 // 100 / 10) - else: - return version_num // 10000 + return version_num // 10000 def parse_version_rss(raw_rss, release): @@ -530,7 +599,7 @@ def parse_version_rss(raw_rss, release): Parse the raw RSS from the versions.rss feed to extract the latest version of PostgreSQL that's availabe for the cluster being monitored. 
- This sets these global variables: + This sets these Context variables: latest_version release_supported @@ -540,8 +609,6 @@ def parse_version_rss(raw_rss, release): raw_rss: The raw rss text from versions.rss release: The PostgreSQL release we care about (ex: 9.2, 14) """ - global latest_version - global release_supported # Regular expressions for parsing the RSS document version_line = re.compile( @@ -561,75 +628,75 @@ def parse_version_rss(raw_rss, release): version = m.group(1) parts = list(map(int, version.split("."))) if parts[0] < 10: - latest_version = int( + Context.latest_version = int( "{}{:02}{:02}".format(parts[0], parts[1], parts[2]) ) else: - latest_version = int("{}00{:02}".format(parts[0], parts[1])) + Context.latest_version = int("{}00{:02}".format(parts[0], parts[1])) elif release_found: # The next line after the version tells if the version is supported if unsupported_line.match(line): - release_supported = False + Context.release_supported = False else: - release_supported = True + Context.release_supported = True break # Make sure we actually found it if not release_found: raise LatestVersionCheckError("Current release ({}) not found".format(release)) - log.info( - "Got latest PostgreSQL version: {} supported={}".format( - latest_version, release_supported - ) + Context.log.info( + "Got latest PostgreSQL version: %s supported=%s", + Context.latest_version, + Context.release_supported, ) - log.debug( - "Next latest PostgreSQL version check will be after: {}".format( - latest_version_next_check - ) + Context.log.debug( + "Next latest PostgreSQL version check will be after: %s", + Context.latest_version_next_check, ) def get_latest_version(): """ - Get the latest supported version of the major PostgreSQL release running on the server being monitored. + Get the latest supported version of the major PostgreSQL release running on the server being + monitored. """ - global latest_version_next_check - # If we don't know the latest version or it's past the recheck time, get the # version from the PostgreSQL RSS feed. Only one thread needs to do this, so # they all try to grab the lock, and then make sure nobody else beat them to it. if ( - latest_version is None - or latest_version_next_check is None - or latest_version_next_check < datetime.now() + Context.latest_version is None + or Context.latest_version_next_check is None + or Context.latest_version_next_check < datetime.now() ): # Note: we get the cluster version here before grabbing the latest_version_lock # lock so it's not held while trying to talk with the DB. 
release = version_num_to_release(get_cluster_version()) - with latest_version_lock: + with Context.latest_version_lock: # Only check if nobody already got the version before us if ( - latest_version is None - or latest_version_next_check is None - or latest_version_next_check < datetime.now() + Context.latest_version is None + or Context.latest_version_next_check is None + or Context.latest_version_next_check < datetime.now() ): - log.info("Checking latest PostgreSQL version") - latest_version_next_check = datetime.now() + timedelta( - seconds=int(config["latest_version_check_period"]) + Context.log.info("Checking latest PostgreSQL version") + Context.latest_version_next_check = datetime.now() + timedelta( + seconds=int(Context.config["latest_version_check_period"]) ) # Grab the RSS feed - raw_rss = requests.get("https://www.postgresql.org/versions.rss") + raw_rss = requests.get( + "https://www.postgresql.org/versions.rss", timeout=30 + ) if raw_rss.status_code != 200: - raise LatestVersionCheckError("code={}".format(r.status_code)) + raise LatestVersionCheckError("code={}".format(raw_rss.status_code)) - # Parse the RSS body and set global variables + # Parse the RSS body and set Context variables parse_version_rss(raw_rss.text, release) - return latest_version + return Context.latest_version def sample_metric(dbname, metric_name, args, retry=True): @@ -638,9 +705,9 @@ def sample_metric(dbname, metric_name, args, retry=True): """ # Get the metric definition try: - metric = config["metrics"][metric_name] - except KeyError: - raise UnknownMetricError("Unknown metric: {}".format(metric_name)) + metric = Context.config["metrics"][metric_name] + except KeyError as e: + raise UnknownMetricError("Unknown metric: {}".format(metric_name)) from e # Get the connection pool for the database, or create one if it doesn't # already exist. @@ -655,8 +722,7 @@ def sample_metric(dbname, metric_name, args, retry=True): # Execute the quert if retry: return run_query(pool, metric["type"], query, args) - else: - return run_query_no_retry(pool, metric["type"], query, args) + return run_query_no_retry(pool, metric["type"], query, args) def test_queries(): @@ -664,9 +730,9 @@ def test_queries(): Run all of the metric queries against a database and check the results """ # We just use the default db for tests - dbname = config["dbname"] + dbname = Context.config["dbname"] # Loop through all defined metrics. - for name, metric in config["metrics"].items(): + for name, metric in Context.config["metrics"].items(): # If the metric has arguments to use while testing, grab those args = metric.get("test_args", {}) print( @@ -711,9 +777,8 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): """ Override to suppress standard request logging """ - pass - def do_GET(self): + def do_GET(self): # pylint: disable=invalid-name """ Handle a request. This is just a wrapper around the actual handler code to keep things more readable. 
@@ -721,7 +786,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): try: self._handle_request() except BrokenPipeError: - log.error("Client disconnected, exiting handler") + Context.log.error("Client disconnected, exiting handler") def _handle_request(self): """ @@ -734,7 +799,6 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): if metric_name == "agent_version": self._reply(200, VERSION) - return elif metric_name == "latest_version_info": try: get_latest_version() @@ -742,46 +806,40 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): 200, json.dumps( { - "latest": latest_version, - "supported": 1 if release_supported else 0, + "latest": Context.latest_version, + "supported": 1 if Context.release_supported else 0, } ), ) except LatestVersionCheckError as e: - log.error("Failed to retrieve latest version information: {}".format(e)) + Context.log.error( + "Failed to retrieve latest version information: %s", e + ) self._reply(503, "Failed to retrieve latest version info") - return + else: + # Note: parse_qs returns the values as a list. Since we always expect + # single values, just grab the first from each. + args = {key: values[0] for key, values in parsed_query.items()} - # Note: parse_qs returns the values as a list. Since we always expect - # single values, just grab the first from each. - args = {key: values[0] for key, values in parsed_query.items()} + # Get the dbname. If none was provided, use the default from the + # config. + dbname = args.get("dbname", Context.config["dbname"]) - # Get the dbname. If none was provided, use the default from the - # config. - dbname = args.get("dbname", config["dbname"]) - - # Sample the metric - try: - self._reply(200, sample_metric(dbname, metric_name, args)) - return - except UnknownMetricError as e: - log.error("Unknown metric: {}".format(metric_name)) - self._reply(404, "Unknown metric") - return - except MetricVersionError as e: - log.error( - "Failed to find a version of {} for {}".format(metric_name, version) - ) - self._reply(404, "Unsupported version") - return - except UnhappyDBError as e: - log.info("Database {} is unhappy, please be patient".format(dbname)) - self._reply(503, "Database unavailable") - return - except Exception as e: - log.error("Error running query: {}".format(e)) - self._reply(500, "Unexpected error: {}".format(e)) - return + # Sample the metric + try: + self._reply(200, sample_metric(dbname, metric_name, args)) + except UnknownMetricError: + Context.log.error("Unknown metric: %s", metric_name) + self._reply(404, "Unknown metric") + except MetricVersionError: + Context.log.error("Failed to find an query version for %s", metric_name) + self._reply(404, "Unsupported version") + except UnhappyDBError: + Context.log.info("Database %s is unhappy, please be patient", dbname) + self._reply(503, "Database unavailable") + except Exception as e: # pylint: disable=broad-exception-caught + Context.log.error("Error running query: %s", e) + self._reply(500, "Unexpected error: {}".format(e)) def _reply(self, code, content): """ @@ -794,7 +852,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): self.wfile.write(bytes(content, "utf-8")) -if __name__ == "__main__": +def main(): + """ + Main application routine + """ + + # Initialize the logging framework + Context.init_logging() + # Handle cli args parser = argparse.ArgumentParser( prog="pgmon", description="A PostgreSQL monitoring agent" @@ -815,33 +880,35 @@ if __name__ == "__main__": args = parser.parse_args() # Set the config file path - config_file 
= args.config_file + Context.config_file = args.config_file # Read the config file - read_config(config_file) + read_config(Context.config_file) # Run query tests and exit if test mode is enabled if args.test: - errors = test_queries() - if errors > 0: + if test_queries() > 0: sys.exit(1) - else: - sys.exit(0) + sys.exit(0) # Set up the http server to receive requests - server_address = (config["address"], config["port"]) - httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) + server_address = (Context.config["address"], Context.config["port"]) + Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) # Set up the signal handler signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGHUP, signal_handler) # Handle requests. - log.info("Listening on port {}...".format(config["port"])) - while running: - httpd.handle_request() + Context.log.info("Listening on port %s...", Context.config["port"]) + while Context.running: + Context.httpd.handle_request() # Clean up PostgreSQL connections # TODO: Improve this ... not sure it actually closes all the connections cleanly - for pool in connections.values(): + for pool in Context.connections.values(): pool.close() + + +if __name__ == "__main__": + main() diff --git a/src/test_pgmon.py b/src/test_pgmon.py index 534f8ba..e75b0aa 100644 --- a/src/test_pgmon.py +++ b/src/test_pgmon.py @@ -1,5 +1,12 @@ +""" +Unit tests for pgmon +""" + +# pylint: disable=too-many-lines + import unittest +import os from datetime import datetime, timedelta import tempfile @@ -13,7 +20,7 @@ import pgmon # Silence most logging output logging.disable(logging.CRITICAL) -versions_rss = """ +VERSIONS_RSS = """ PostgreSQL latest versionshttps://www.postgresql.org/PostgreSQL latest versionsen-usThu, 08 May 2025 00:00:00 +000017.5 https://www.postgresql.org/docs/17/release-17-5.html17.5 is the latest release in the 17 series. @@ -103,12 +110,18 @@ This version is unsupported! 
""" -class TestPgmonMethods(unittest.TestCase): +class TestPgmonMethods(unittest.TestCase): # pylint: disable=too-many-public-methods + """ + Unit test class for pgmon + """ + ## # update_deep ## def test_update_deep__empty_cases(self): - # Test empty dict cases + """ + Test various empty dictionary permutations + """ d1 = {} d2 = {} pgmon.update_deep(d1, d2) @@ -128,7 +141,9 @@ class TestPgmonMethods(unittest.TestCase): self.assertEqual(d2, d1) def test_update_deep__scalars(self): - # Test adding/updating scalar values + """ + Test adding/updating scalar values + """ d1 = {"foo": 1, "bar": "text", "hello": "world"} d2 = {"foo": 2, "baz": "blah"} pgmon.update_deep(d1, d2) @@ -136,7 +151,9 @@ class TestPgmonMethods(unittest.TestCase): self.assertEqual(d2, {"foo": 2, "baz": "blah"}) def test_update_deep__lists(self): - # Test adding to lists + """ + Test adding to lists + """ d1 = {"lst1": []} d2 = {"lst1": [1, 2]} pgmon.update_deep(d1, d2) @@ -172,7 +189,9 @@ class TestPgmonMethods(unittest.TestCase): self.assertEqual(d2, {"obj1": {"l1": [3, 4]}}) def test_update_deep__dicts(self): - # Test adding to lists + """ + Test adding to dictionaries + """ d1 = {"obj1": {}} d2 = {"obj1": {"a": 1, "b": 2}} pgmon.update_deep(d1, d2) @@ -199,7 +218,9 @@ class TestPgmonMethods(unittest.TestCase): self.assertEqual(d2, {"obj1": {"d1": {"a": 5, "c": 12}}}) def test_update_deep__types(self): - # Test mismatched types + """ + Test mismatched types + """ d1 = {"foo": 5} d2 = None self.assertRaises(TypeError, pgmon.update_deep, d1, d2) @@ -218,15 +239,19 @@ class TestPgmonMethods(unittest.TestCase): ## def test_get_pool__simple(self): - # Just get a pool in a normal case - pgmon.config.update(pgmon.default_config) + """ + Test getting a pool in a normal case + """ + pgmon.Context.config.update(pgmon.DEFAULT_CONFIG) pool = pgmon.get_pool("postgres") self.assertIsNotNone(pool) def test_get_pool__unhappy(self): - # Test getting an unhappy database pool - pgmon.config.update(pgmon.default_config) - pgmon.unhappy_cooldown["postgres"] = datetime.now() + timedelta(60) + """ + Test getting an unhappy database pool + """ + pgmon.Context.config.update(pgmon.DEFAULT_CONFIG) + pgmon.Context.unhappy_cooldown["postgres"] = datetime.now() + timedelta(60) self.assertRaises(pgmon.UnhappyDBError, pgmon.get_pool, "postgres") # Test getting a different database when there's an unhappy one @@ -238,30 +263,37 @@ class TestPgmonMethods(unittest.TestCase): ## def test_handle_connect_failure__simple(self): - # Test adding to an empty unhappy list - pgmon.config.update(pgmon.default_config) - pgmon.unhappy_cooldown = {} + """ + Test adding to an empty unhappy list + """ + pgmon.Context.config.update(pgmon.DEFAULT_CONFIG) + pgmon.Context.unhappy_cooldown = {} pool = pgmon.get_pool("postgres") pgmon.handle_connect_failure(pool) - self.assertGreater(pgmon.unhappy_cooldown["postgres"], datetime.now()) + self.assertGreater(pgmon.Context.unhappy_cooldown["postgres"], datetime.now()) # Test adding another database pool = pgmon.get_pool("template0") pgmon.handle_connect_failure(pool) - self.assertGreater(pgmon.unhappy_cooldown["postgres"], datetime.now()) - self.assertGreater(pgmon.unhappy_cooldown["template0"], datetime.now()) - self.assertEqual(len(pgmon.unhappy_cooldown), 2) + self.assertGreater(pgmon.Context.unhappy_cooldown["postgres"], datetime.now()) + self.assertGreater(pgmon.Context.unhappy_cooldown["template0"], datetime.now()) + self.assertEqual(len(pgmon.Context.unhappy_cooldown), 2) ## # get_query ## def 
test_get_query__basic(self): - # Test getting a query with one version + """ + Test getting a query with just a default version. + """ metric = {"type": "value", "query": {0: "DEFAULT"}} self.assertEqual(pgmon.get_query(metric, 100000), "DEFAULT") def test_get_query__versions(self): + """ + Test getting queries when multiple versions are present. + """ metric = {"type": "value", "query": {0: "DEFAULT", 110000: "NEW"}} # Test getting the default version of a query with no lower bound and a newer @@ -281,6 +313,9 @@ class TestPgmonMethods(unittest.TestCase): self.assertEqual(pgmon.get_query(metric, 100000), "OLD") def test_get_query__missing_version(self): + """ + Test trying to get a query that is not defined for the requested version. + """ metric = {"type": "value", "query": {96000: "OLD", 110000: "NEW", 150000: ""}} # Test getting a metric that only exists for newer versions @@ -294,11 +329,16 @@ class TestPgmonMethods(unittest.TestCase): ## def test_read_config__simple(self): - pgmon.config = {} + """ + Test reading a simple config. + """ + pgmon.Context.config = {} # Test reading just a metric and using the defaults for everything else with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- # This is a comment! @@ -310,18 +350,20 @@ metrics: """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) self.assertEqual( - pgmon.config["max_pool_size"], pgmon.default_config["max_pool_size"] + pgmon.Context.config["max_pool_size"], pgmon.DEFAULT_CONFIG["max_pool_size"] ) - self.assertEqual(pgmon.config["dbuser"], pgmon.default_config["dbuser"]) + self.assertEqual(pgmon.Context.config["dbuser"], pgmon.DEFAULT_CONFIG["dbuser"]) - pgmon.config = {} + pgmon.Context.config = {} # Test reading a basic config with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- # This is a comment! @@ -357,22 +399,27 @@ metrics: """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) - self.assertEqual(pgmon.config["dbuser"], "someone") - self.assertEqual(pgmon.config["metrics"]["test1"]["type"], "value") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") - self.assertEqual(pgmon.config["metrics"]["test2"]["query"][0], "TEST2") + self.assertEqual(pgmon.Context.config["dbuser"], "someone") + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["type"], "value") + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["query"][0], "TEST1") + self.assertEqual(pgmon.Context.config["metrics"]["test2"]["query"][0], "TEST2") def test_read_config__include(self): - pgmon.config = {} + """ + Test including one config from another. + """ + pgmon.Context.config = {} # Test reading a config that includes other files (absolute and relative paths, # multiple levels) with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( - f"""--- + """--- # This is a comment! 
min_pool_size: 1 max_pool_size: 2 @@ -384,13 +431,17 @@ reconnect_cooldown: 15 version_check_period: 3600 include: - dbsettings.yml - - {tmpdirname}/metrics.yml -""" + - {}/metrics.yml +""".format( + tmpdirname + ) ) - with open(f"{tmpdirname}/dbsettings.yml", "w") as f: + with open( + os.path.join(tmpdirname, "dbsettings.yml"), "w", encoding="utf-8" + ) as f: f.write( - f"""--- + """--- dbuser: someone dbhost: localhost dbport: 5555 @@ -398,9 +449,11 @@ dbname: template0 """ ) - with open(f"{tmpdirname}/metrics.yml", "w") as f: + with open( + os.path.join(tmpdirname, "metrics.yml"), "w", encoding="utf-8" + ) as f: f.write( - f"""--- + """--- metrics: test1: type: value @@ -415,9 +468,11 @@ include: """ ) - with open(f"{tmpdirname}/more_metrics.yml", "w") as f: + with open( + os.path.join(tmpdirname, "more_metrics.yml"), "w", encoding="utf-8" + ) as f: f.write( - f"""--- + """--- metrics: test3: type: value @@ -425,20 +480,25 @@ metrics: 0: TEST3 """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) - self.assertEqual(pgmon.config["max_idle_time"], 10) - self.assertEqual(pgmon.config["dbuser"], "someone") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") - self.assertEqual(pgmon.config["metrics"]["test2"]["query"][0], "TEST2") - self.assertEqual(pgmon.config["metrics"]["test3"]["query"][0], "TEST3") + self.assertEqual(pgmon.Context.config["max_idle_time"], 10) + self.assertEqual(pgmon.Context.config["dbuser"], "someone") + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["query"][0], "TEST1") + self.assertEqual(pgmon.Context.config["metrics"]["test2"]["query"][0], "TEST2") + self.assertEqual(pgmon.Context.config["metrics"]["test3"]["query"][0], "TEST3") def test_read_config__reload(self): - pgmon.config = {} + """ + Test reloading a config. + """ + pgmon.Context.config = {} # Test rereading a config to update an existing config with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- # This is a comment! @@ -466,12 +526,14 @@ metrics: """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) # Just make sure the first config was read - self.assertEqual(len(pgmon.config["metrics"]), 2) + self.assertEqual(len(pgmon.Context.config["metrics"]), 2) - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- # This is a comment! 
@@ -484,18 +546,23 @@ metrics: """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) - self.assertEqual(pgmon.config["min_pool_size"], 7) - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "NEW1") - self.assertEqual(len(pgmon.config["metrics"]), 1) + self.assertEqual(pgmon.Context.config["min_pool_size"], 7) + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["query"][0], "NEW1") + self.assertEqual(len(pgmon.Context.config["metrics"]), 1) def test_read_config__query_file(self): - pgmon.config = {} + """ + Test reading a query definition from a separate file + """ + pgmon.Context.config = {} # Read a config file that reads a query from a file with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- metrics: @@ -506,22 +573,30 @@ metrics: """ ) - with open(f"{tmpdirname}/some_query.sql", "w") as f: + with open( + os.path.join(tmpdirname, "some_query.sql"), "w", encoding="utf-8" + ) as f: f.write("This is a query") - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) self.assertEqual( - pgmon.config["metrics"]["test1"]["query"][0], "This is a query" + pgmon.Context.config["metrics"]["test1"]["query"][0], "This is a query" ) - def test_read_config__invalid(self): - pgmon.config = {} + def init_invalid_config_test(self): + """ + Initialize an invalid config read test. Basically just set up a simple valid config in + order to confirm that an invalid read does not modify the live config. + """ + pgmon.Context.config = {} # For all of these tests, we start with a valid config and also ensure that # it is not modified when a new config read fails with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- metrics: @@ -532,20 +607,48 @@ metrics: """ ) - pgmon.read_config(f"{tmpdirname}/config.yml") + pgmon.read_config(os.path.join(tmpdirname, "config.yml")) # Just make sure the config was read - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["query"][0], "TEST1") + + def verify_invalid_config_test(self): + """ + Verify that an invalid read did not modify the live config. + """ + self.assertEqual(pgmon.Context.config["dbuser"], "postgres") + self.assertEqual(pgmon.Context.config["metrics"]["test1"]["query"][0], "TEST1") + + def test_read_config__missing(self): + """ + Test reading a nonexistant config file. + """ + # Set up the test + self.init_invalid_config_test() # Test reading a nonexistant config file with tempfile.TemporaryDirectory() as tmpdirname: self.assertRaises( - FileNotFoundError, pgmon.read_config, f"{tmpdirname}/missing.yml" + FileNotFoundError, + pgmon.read_config, + os.path.join(tmpdirname, "missing.yml"), ) + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid(self): + """ + Test reading an invalid config file. 
+ """ + # Set up the test + self.init_invalid_config_test() + # Test reading an invalid config file with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """[default] This looks a lot like an ini file to me @@ -554,12 +657,26 @@ Or maybe a TOML? """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid_include(self): + """ + Test reading an invalid config file. + """ + # Set up the test + self.init_invalid_config_test() + # Test reading a config that includes an invalid file with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -573,14 +690,26 @@ include: """ ) self.assertRaises( - FileNotFoundError, pgmon.read_config, f"{tmpdirname}/config.yml" + FileNotFoundError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid_log_level(self): + """ + Test reading an invalid log level from a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test invalid log level with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- log_level: noisy @@ -593,14 +722,26 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid_type(self): + """ + Test reading an invalid query result type form a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test invalid query return type with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -612,32 +753,57 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid_query_dict(self): + """ + Test reading an invalid query definition structure type form a config file. In other words + what's supposed to be a dictionary of the form version => query, we give it something else. 
+ """ + # Set up the test + self.init_invalid_config_test() # Test invalid query dict type with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil metrics: test1: - type: lots_of_data + type: row query: EVIL1 """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__missing_type(self): + """ + Test reading a metric with a missing result type from a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test incomplete metric: missing type with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -648,14 +814,26 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__missing_queries(self): + """ + Test reading a metric with no queries from a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test incomplete metric: missing queries with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -665,14 +843,26 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__empty_query_dict(self): + """ + Test reading a fetric with an empty query dict from a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test incomplete metric: empty queries with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -683,14 +873,26 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__none_query_dict(self): + """ + Test reading a metric where the query dict is None from a config file. 
+ """ + # Set up the test + self.init_invalid_config_test() # Test incomplete metric: query dict is None with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -701,28 +903,53 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__missing_metrics(self): + """ + Test reading a config file with no metrics. + """ + # Set up the test + self.init_invalid_config_test() # Test reading a config with no metrics with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__missing_query_file(self): + """ + Test reading a metric from a config file where the query definition cones from a missing + file. + """ + # Set up the test + self.init_invalid_config_test() # Test reading a query defined in a file but the file is missing with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -734,14 +961,26 @@ metrics: """ ) self.assertRaises( - FileNotFoundError, pgmon.read_config, f"{tmpdirname}/config.yml" + FileNotFoundError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + def test_read_config__invalid_version(self): + """ + Test reading a metric with an invalid PostgreSQL version from a config file. + """ + # Set up the test + self.init_invalid_config_test() # Test invalid query versions with tempfile.TemporaryDirectory() as tmpdirname: - with open(f"{tmpdirname}/config.yml", "w") as f: + with open( + os.path.join(tmpdirname, "config.yml"), "w", encoding="utf-8" + ) as f: f.write( """--- dbuser: evil @@ -753,47 +992,84 @@ metrics: """ ) self.assertRaises( - pgmon.ConfigError, pgmon.read_config, f"{tmpdirname}/config.yml" + pgmon.ConfigError, + pgmon.read_config, + os.path.join(tmpdirname, "config.yml"), ) - self.assertEqual(pgmon.config["dbuser"], "postgres") - self.assertEqual(pgmon.config["metrics"]["test1"]["query"][0], "TEST1") + + # Confirm nothing changed + self.verify_invalid_config_test() + + ## + # version_num + ## def test_version_num_to_release__valid(self): + """ + Test converting PostgreSQL versions before and after 10 when the numbering scheme changed. 
+ """ self.assertEqual(pgmon.version_num_to_release(90602), 9.6) self.assertEqual(pgmon.version_num_to_release(130002), 13) - def test_parse_version_rss__simple(self): - pgmon.parse_version_rss(versions_rss, 13) - self.assertEqual(pgmon.latest_version, 130021) - self.assertTrue(pgmon.release_supported) + ## + # parse_version_rss + ## - pgmon.parse_version_rss(versions_rss, 9.6) - self.assertEqual(pgmon.latest_version, 90624) - self.assertFalse(pgmon.release_supported) + def test_parse_version_rss__supported(self): + """ + Test parsing a supported version from the RSS feed + """ + pgmon.parse_version_rss(VERSIONS_RSS, 13) + self.assertEqual(pgmon.Context.latest_version, 130021) + self.assertTrue(pgmon.Context.release_supported) + + def test_parse_version_rss__unsupported(self): + """ + Test parsing an unsupported version from the RSS feed + """ + pgmon.parse_version_rss(VERSIONS_RSS, 9.6) + self.assertEqual(pgmon.Context.latest_version, 90624) + self.assertFalse(pgmon.Context.release_supported) def test_parse_version_rss__missing(self): - # Test asking about versions that don't exist + """ + Test asking about versions that don't exist in the RSS feed + """ self.assertRaises( - pgmon.LatestVersionCheckError, pgmon.parse_version_rss, versions_rss, 9.7 + pgmon.LatestVersionCheckError, pgmon.parse_version_rss, VERSIONS_RSS, 9.7 ) self.assertRaises( - pgmon.LatestVersionCheckError, pgmon.parse_version_rss, versions_rss, 99 + pgmon.LatestVersionCheckError, pgmon.parse_version_rss, VERSIONS_RSS, 99 ) + ## + # get_latest_version + ## + def test_get_latest_version(self): + """ + Test getting the latest version from the actual RSS feed + """ # Define a cluster version here so the test doesn't need a database - pgmon.cluster_version_next_check = datetime.now() + timedelta(hours=1) - pgmon.cluster_version = 90623 + pgmon.Context.cluster_version_next_check = datetime.now() + timedelta(hours=1) + pgmon.Context.cluster_version = 90623 # Set up a default config - pgmon.update_deep(pgmon.config, pgmon.default_config) + pgmon.update_deep(pgmon.Context.config, pgmon.DEFAULT_CONFIG) # Make sure we can pull the RSS file (we assume the 9.6 series won't be getting # any more updates) self.assertEqual(pgmon.get_latest_version(), 90624) + ## + # json_encode_special + ## + def test_json_encode_special(self): + """ + Test encoding Decimal types as JSON + """ # Confirm that we're getting the right type self.assertFalse(isinstance(Decimal("0.5"), float)) self.assertTrue(isinstance(pgmon.json_encode_special(Decimal("0.5")), float)) From 75c5d76047170f1e2df8558fec38f2a289864a66 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Tue, 23 Sep 2025 01:14:50 -0400 Subject: [PATCH 6/9] Add a metric for tracking the number of granted locks --- sample-config/pgmon-metrics.yml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sample-config/pgmon-metrics.yml b/sample-config/pgmon-metrics.yml index 38d3b5f..b20662c 100644 --- a/sample-config/pgmon-metrics.yml +++ b/sample-config/pgmon-metrics.yml @@ -230,7 +230,19 @@ metrics: SELECT COUNT(*) FILTER (WHERE has_sequence_privilege(c.oid, 'SELECT,USAGE')) AS visible_sequences, COUNT(*) AS total_sequences FROM pg_class AS c - WHERE relkind = 'S'; + WHERE relkind = 'S' + + locks: + type: row + query: + 0: + SELECT COUNT(*) AS total, + SUM(CASE WHEN granted THEN 1 ELSE 0 END) AS granted + FROM pg_locks + 90400: > + SELECT COUNT(*) AS total, + COUNT(*) FILTER (WHERE granted) AS granted + FROM pg_locks ## From ab039dc412ce9a47b4129cfb3590e8edc47fb47d Mon Sep 
17 00:00:00 2001 From: James Campbell Date: Wed, 8 Oct 2025 23:53:45 -0400 Subject: [PATCH 7/9] Improve test coverage * Refactor some functions to improve unit test coverage. * Use the "coverage" module if available along with unit tests. --- .gitignore | 1 + Makefile | 24 +++++++--- src/pgmon.py | 69 ++++++++++++++++++--------- src/test_pgmon.py | 117 +++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 180 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index fc7836e..ee7104e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ missing __pycache__ venv build +.coverage diff --git a/Makefile b/Makefile index fa046d0..3f12140 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,12 @@ RPM_VERSION := $(VERSION)-$(RPM_RELEASE) DEB_VERSION := $(VERSION)~rc$(RELEASE) endif +# Python stuff +PYTHON ?= python3 +PYLINT ?= pylint +BLACK ?= black + +HAVE_COVERAGE := $(shell $(PYTHON) -c 'import coverage' 2>/dev/null && echo yes) # Where packages are built BUILD_DIR := build @@ -82,19 +88,23 @@ clean: # Check for lint lint: - pylint src/pgmon.py - pylint src/test_pgmon.py - black --check --diff src/pgmon.py - black --check --diff src/test_pgmon.py + $(PYLINT) src/pgmon.py + $(PYLINT) src/test_pgmon.py + $(BLACK) --check --diff src/pgmon.py + $(BLACK) --check --diff src/test_pgmon.py # Format the code using black format: - black src/pgmon.py - black src/test_pylint.py + $(BLACK) src/pgmon.py + $(BLACK) src/test_pgmon.py # Run unit tests for the script test: - cd src ; python3 -m unittest +ifeq ($(HAVE_COVERAGE),yes) + cd src ; $(PYTHON) -m coverage run -m unittest && python3 -m coverage report -m +else + cd src ; $(PYTHON) -m unittest +endif # Run query tests query-tests: diff --git a/src/pgmon.py b/src/pgmon.py index 738ae64..a506e9f 100755 --- a/src/pgmon.py +++ b/src/pgmon.py @@ -134,6 +134,12 @@ class LatestVersionCheckError(Exception): """ +class InvalidDataError(Exception): + """ + Error indicating query results were somehow invalid + """ + + # Default config settings DEFAULT_CONFIG = { # The address the agent binds to @@ -469,6 +475,46 @@ def json_encode_special(obj): raise TypeError("Cannot serialize object of {}".format(type(obj))) +def json_encode_result(return_type, res): + """ + Return a json string encoding of the results of a query. + + params: + return_type: the expected structure to return. 
One of: + value, row, column, set + res: the query results + + returns: a json string form of the results + + raises: + ConfigError: when an invalid return_type is given + InvalidDataError: when the query results don't match the return type + """ + try: + if return_type == "value": + if len(res) == 0: + return "" + return str(list(res[0].values())[0]) + + if return_type == "row": + return json.dumps( + res[0] if len(res) > 0 else {}, default=json_encode_special + ) + + if return_type == "column": + return json.dumps( + [list(r.values())[0] for r in res], default=json_encode_special + ) + + if return_type == "set": + return json.dumps(res, default=json_encode_special) + except IndexError as e: + raise InvalidDataError(e) from e + + # If we got to this point, the return type is invalid + raise ConfigError("Invalid query return type: {}".format(return_type)) + + def run_query_no_retry(pool, return_type, query, args): """ Run the query with no explicit retry code @@ -476,31 +522,10 @@ def run_query_no_retry(pool, return_type, query, args): with pool.connection(float(Context.config["connect_timeout"])) as conn: try: with conn.cursor(cursor_factory=RealDictCursor) as curs: - output = None curs.execute(query, args) res = curs.fetchall() - if return_type == "value": - if len(res) == 0: - output = "" - output = str(list(res[0].values())[0]) - elif return_type == "row": - # if len(res) == 0: - # return "[]" - output = json.dumps(res[0], default=json_encode_special) - elif return_type == "column": - # if len(res) == 0: - # return "[]" - output = json.dumps( - [list(r.values())[0] for r in res], default=json_encode_special - ) - elif return_type == "set": - output = json.dumps(res, default=json_encode_special) - else: - raise ConfigError( - "Invalid query return type: {}".format(return_type) - ) - return output + return json_encode_result(return_type, res) except Exception as e: dbname = pool.name if dbname in Context.unhappy_cooldown: diff --git a/src/test_pgmon.py b/src/test_pgmon.py index e75b0aa..3f70d2d 100644 --- a/src/test_pgmon.py +++ b/src/test_pgmon.py @@ -234,6 +234,11 @@ class TestPgmonMethods(unittest.TestCase): # pylint: disable=too-many-public-me d2 = {"foo": {"a": 7}} self.assertRaises(TypeError, pgmon.update_deep, d1, d2) + # Nested mismatched types + d1 = {"foo": {"a": 7}} + d2 = {"foo": [1, 2]} + self.assertRaises(TypeError, pgmon.update_deep, d1, d2) + ## # get_pool ## @@ -1047,7 +1052,7 @@ metrics: # get_latest_version ## - def test_get_latest_version(self): + def test_get_latest_version__basic(self): """ Test getting the latest version from the actual RSS feed """ @@ -1066,7 +1071,7 @@ metrics: # json_encode_special ## - def test_json_encode_special(self): + def test_json_encode_special__basic(self): """ Test encoding Decimal types as JSON """ @@ -1085,3 +1090,111 @@ metrics: self.assertEqual( json.dumps(Decimal("2.5"), default=pgmon.json_encode_special), "2.5" ) + + ## + # json_encode_result + ## + + def test_json_encode_result__value(self): + """ + Test encoding a single value return type, valid inputs + """ + # Empty result + self.assertEqual(pgmon.json_encode_result("value", []), "") + + # Single value + self.assertEqual(pgmon.json_encode_result("value", [{"id": 5}]), "5") + self.assertEqual(pgmon.json_encode_result("value", [{"id": "5"}]), "5") + self.assertEqual(pgmon.json_encode_result("value", [{"key": "word"}]), "word") + self.assertEqual(pgmon.json_encode_result("value", [{"id": Decimal(5)}]), "5") + + def test_json_encode_result__row(self): + """ + Test encoding a row, 
valid inputs
+ """
+ # Empty result
+ self.assertEqual(pgmon.json_encode_result("row", {}), "{}")
+
+ # Simple row
+ self.assertEqual(
+ pgmon.json_encode_result("row", [{"id": 5, "foo": "bar"}]),
+ '{"id": 5, "foo": "bar"}',
+ )
+
+ # Empty row (not ever likely to be what you want ... but technically not invalid)
+ self.assertEqual(pgmon.json_encode_result("row", [{}]), "{}")
+
+ def test_json_encode_result__column(self):
+ """
+ Test encoding a column, valid inputs
+ """
+ # Empty result
+ self.assertEqual(pgmon.json_encode_result("column", []), "[]")
+
+ # Simple column
+ self.assertEqual(pgmon.json_encode_result("column", [{"id": 5}]), "[5]")
+ self.assertEqual(
+ pgmon.json_encode_result("column", [{"id": 5}, {"id": 7}, {"id": 2}]),
+ "[5, 7, 2]",
+ )
+
+ def test_json_encode_result__set(self):
+ """
+ Test encoding a set, valid inputs
+ """
+ # Empty result
+ self.assertEqual(pgmon.json_encode_result("set", []), "[]")
+
+ # Simple set
+ self.assertEqual(pgmon.json_encode_result("set", [{"id": 5}]), '[{"id": 5}]')
+ self.assertEqual(
+ pgmon.json_encode_result(
+ "set", [{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]
+ ),
+ '[{"id": 5, "foo": "bar"}, {"id": 7, "foo": "baz"}]',
+ )
+
+ def test_json_encode_result__invalid_type(self):
+ """
+ Test requesting an invalid return type
+ """
+ # Make sure an empty list raises the error
+ self.assertRaises(pgmon.ConfigError, pgmon.json_encode_result, "foo", [])
+
+ # Make sure including data still raises the error
+ self.assertRaises(
+ pgmon.ConfigError,
+ pgmon.json_encode_result,
+ "foo",
+ [{"id": 5, "foo": "bar"}],
+ )
+
+ def test_json_encode_result__invalid_value(self):
+ """
+ Test invalid data/type combinations for a value
+ """
+ # Note: We should always get a list of dicts from psycopg using the RealDictCursor
+ # Empty row returned
+ self.assertRaises(
+ pgmon.InvalidDataError, pgmon.json_encode_result, "value", [{}]
+ )
+
+ def test_json_encode_result__invalid_row(self):
+ """
+ Test invalid data/type combinations for a row
+ """
+ # Note: We should always get a list of dicts from psycopg using the RealDictCursor
+ # Note: I can't at this point think of any sort of invalid result psycopg2 should
+ # produce for this return type.
+ # This is basically a placeholder for now.
+ True # pylint: disable=pointless-statement
+
+ def test_json_encode_result__invalid_set(self):
+ """
+ Test invalid data/type combinations for a set
+ """
+ # Note: We should always get a list of dicts from psycopg using the RealDictCursor
+ # Note: I can't at this point think of any sort of invalid result psycopg2 should
+ # produce for this return type.
+ # This is basically a placeholder for now.
+ True # pylint: disable=pointless-statement

From 3ade30c04a6b4f62b1d65fa5b14a3425bbabd453 Mon Sep 17 00:00:00 2001
From: James Campbell
Date: Thu, 9 Oct 2025 02:21:17 -0400
Subject: [PATCH 8/9] Add script to show template coverage

* Add a script to compare the defined metrics with those used in the
  Zabbix template.
* Add a Makefile target to run the above script.
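* For example, the new target can be run as shown below (the output is
  illustrative only; the metric names here are made up, and the real lists
  depend on whatever config and template are being compared):

      $ make template-coverage
      python3 zabbix_templates/coverage.py sample-config/pgmon-metrics.yml zabbix_templates/pgmon_templates.yaml
      Config only: ['some_unused_metric']
      Template only: ['some_missing_metric']

  Anything under "Config only" is a metric defined in the config but never
  referenced by a template item; "Template only" entries are items whose
  metric is not defined in the config.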
--- Makefile | 5 ++ zabbix_templates/coverage.py | 129 +++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100755 zabbix_templates/coverage.py diff --git a/Makefile b/Makefile index 3f12140..14e8ad9 100644 --- a/Makefile +++ b/Makefile @@ -110,6 +110,11 @@ endif query-tests: cd tests ; ./run-tests.sh +# Compare the sample metrics with the Zabbix template +template-coverage: + $(PYTHON) zabbix_templates/coverage.py sample-config/pgmon-metrics.yml zabbix_templates/pgmon_templates.yaml + + # Install the script at the specified base directory (common components) install-common: # Set up directories diff --git a/zabbix_templates/coverage.py b/zabbix_templates/coverage.py new file mode 100755 index 0000000..5a9d9f8 --- /dev/null +++ b/zabbix_templates/coverage.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 + +# Compare the items defined in a Zabbix template with the metrics defined in a config file. + +import sys + +import yaml + +# Special built in metrics +SPECIAL = { + "agent_version", + "latest_version_info" +} + + +class NonMetricItemError(Exception): + """ + A given item does not directly use a metric + """ + + +def read_metrics(file): + """ + Read the metrics from a config file and return the list of names + + params: + file: the name of the file to read + + returns: + list of metric named defined in the file + + raises: + yaml.parser.ParserError: invalid yaml file + """ + names = set() + config = None + + with open(file, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + try: + for m in config["metrics"].keys(): + names.add(m) + except KeyError: + pass + + return names + + +def extract_metric(item): + """ + Extract the metric from an item definition + + params: + item: the item/discovery/prototype definition dict + + returns: + the name of the metric used in the item + + raises: + NonMetricItemError: the item does not directly use a metric + """ + try: + if item["type"] == "HTTP_AGENT": + url = item["url"] + if url.startswith("http://localhost:{$AGENT_PORT}"): + return url.split("/")[-1] + except KeyError: + raise NonMetricItemError() + + raise NonMetricItemError() + + +def read_template(file): + """ + Read the items from a Zabbix template and return the list of metric names + + params: + file: the name of the file to read + + returns: + list of metric named used in the file + + raises: + yaml.parser.ParserError: invalid yaml file + """ + names = set() + config = None + + with open(file, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + try: + for template in config["zabbix_export"]["templates"]: + for item in template["items"]: + try: + names.add(extract_metric(item)) + except NonMetricItemError: + pass + + for rule in template["discovery_rules"]: + try: + names.add(extract_metric(rule)) + except NonMetricItemError: + pass + + for proto in rule["item_prototypes"]: + try: + names.add(extract_metric(proto)) + except NonMetricItemError: + pass + except KeyError: + pass + + return names + + +if __name__ == '__main__': + config_file = sys.argv[1] + config_metrics = read_metrics(config_file) + + template_file = sys.argv[2] + template_metrics = read_template(template_file) - SPECIAL + + config_only = config_metrics - template_metrics + template_only = template_metrics - config_metrics + + print("Config only: {}".format(sorted(list(config_only)))) + print("Template only: {}".format(sorted(list(template_only)))) From e7b97a9e88c6ef47dd99312e7f26f57a75d4cf9d Mon Sep 17 00:00:00 2001 From: James Campbell Date: Fri, 17 Oct 2025 01:24:33 -0400 
Subject: [PATCH 9/9] Add items to template

* Add items to the Zabbix template for:
  - Background writer / checkpoint stats
  - Lock stats
  - Connection states
---
 sample-config/pgmon-metrics.yml | 2 +
 zabbix_templates/pgmon_templates.yaml | 507 ++++++++++++++++++++++++++
 2 files changed, 509 insertions(+)

diff --git a/sample-config/pgmon-metrics.yml b/sample-config/pgmon-metrics.yml
index b20662c..92b83a2 100644
--- a/sample-config/pgmon-metrics.yml
+++ b/sample-config/pgmon-metrics.yml
@@ -318,3 +318,5 @@ metrics:
 type: value
 query:
 0: SELECT count(*) AS ntables FROM pg_stat_user_tables
+
+ #sequence_usage:
diff --git a/zabbix_templates/pgmon_templates.yaml b/zabbix_templates/pgmon_templates.yaml
index 1f552d1..260dec3 100644
--- a/zabbix_templates/pgmon_templates.yaml
+++ b/zabbix_templates/pgmon_templates.yaml
@@ -49,6 +49,206 @@ zabbix_export:
 tags:
 - tag: Application
 value: PostgreSQL
+ - uuid: 1dd74025ca0e463bb9eee5cb473f30db
+ name: 'Total buffers allocated'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[buffers_alloc,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of shared buffers that have been allocated'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.buffers_alloc
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: 4ab405996e71444a8113b95c65d73be2
+ name: 'Total buffers written by backends'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[buffers_backend,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of shared buffers written by backends'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.buffers_backend
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: e8b440a1d6ff4ca6b3cb27e2e76f9d06
+ name: 'Total number of fsyncs from backends'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[buffers_backend_fsync,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of times backends have issued their own fsync calls'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.buffers_backend_fsync
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: f8a14885edf34b5cbf2e92432e8fa4c2
+ name: 'Total buffers written during checkpoints'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[buffers_checkpoint,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of shared buffers written during checkpoints'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.buffers_checkpoint
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: f09956f7aaad4b99b0d67339edf26dcf
+ name: 'Total buffers written by the background writer'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[buffers_clean,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of shared buffers written by the background writer'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.buffers_clean
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: f1c6bd9346b14964bd492d7ccf8baf50
+ name: 'Total checkpoints due to changes'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[checkpoints_changes,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of requested checkpoints, typically triggered by the volume of WAL written since the last checkpoint'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.checkpoints_req
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: 04c4587d2f2f4f5fbad866d70938cbb3
+ name: 'Total checkpoints due to time limit'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[checkpoints_timed,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of checkpoints that have occurred due to the configured time limit'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.checkpoints_timed
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: 580a7e632b644aafa1166c534185ba92
+ name: 'Total time spent syncing files for checkpoints'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[checkpoint_sync_time,total]'
+ delay: '0'
+ history: 90d
+ units: s
+ description: 'Total time spent syncing files for checkpoints'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.checkpoint_sync_time
+ - type: MULTIPLIER
+ parameters:
+ - '0.001'
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: 4faa33b55fae4fc68561020636573a46
+ name: 'Total time spent writing checkpoints'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[checkpoint_write_time,total]'
+ delay: '0'
+ history: 90d
+ units: s
+ description: 'Total time spent writing checkpoints'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.checkpoint_write_time
+ - type: MULTIPLIER
+ parameters:
+ - '0.001'
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: c0c829c6cdc046a4b5d216e10362e4bb
+ name: 'Total number of times the background writer stopped'
+ type: DEPENDENT
+ key: 'pgmon.bgwriter[maxwritten_clean,total]'
+ delay: '0'
+ history: 90d
+ description: 'Total number of times the background writer stopped due to reaching the maximum number of buffers it''s allowed to write in a single scan'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.maxwritten_clean
+ master_item:
+ key: 'pgmon[bgwriter]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: 645fef7b55cd48c69b11de3201e88d78
+ name: 'Number of locks'
+ type: DEPENDENT
+ key: pgmon.locks.count
+ delay: '0'
+ history: 90d
+ description: 'Total number of locks in any database'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.total
+ master_item:
+ key: 'pgmon[locks]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - uuid: a56ad753b1f341928867a47b14ed8b77
+ name: 'Number of granted locks'
+ type: DEPENDENT
+ key: pgmon.locks.granted
+ delay: '0'
+ history: 90d
+ description: 'Total number of granted locks in any database'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - $.granted
+ master_item:
+ key: 'pgmon[locks]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
- uuid: de1fa757395440118026f4c7a7c4ebbe
 name: 'PostgreSQL latest supported version'
 type: DEPENDENT
@@ -95,6 +295,21 @@ zabbix_export:
 tags:
 - tag: Application
 value: PostgreSQL
+ - uuid: 91baea76ebb240b19c5a5d3913d0b989
+ name: 'PostgreSQL BGWriter Info'
+ type: HTTP_AGENT
+ key: 'pgmon[bgwriter]'
+ delay: 5m
+ history: '0'
+ value_type: TEXT
+ trends: '0'
+ description: 'Background writer and checkpoint statistics for the cluster'
+ url: 'http://localhost:{$AGENT_PORT}/bgwriter'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Type
+ value: Raw
- uuid: 06b1d082ed1e4796bc31cc25f7db6326
 name: 'PostgreSQL Backend IO Info'
 type: HTTP_AGENT
@@ -124,6 +339,21 @@
 value: PostgreSQL
 - tag: Type
 value: Raw
+ - uuid: 4627c156923f4d53bc04789b9b88c133
+ name: 'PostgreSQL Lock Info'
+ type: HTTP_AGENT
+ key: 'pgmon[locks]'
+ delay: 5m
+ history: '0'
+ value_type: TEXT
+ trends: '0'
+ description: 'Current lock counts across all databases'
+ url: 'http://localhost:{$AGENT_PORT}/locks'
+ tags:
+ - tag:
Application
+ value: PostgreSQL
+ - tag: Type
+ value: Raw
- uuid: 8706eccb7edc4fa394f552fc31f401a9
 name: 'PostgreSQL ID Age Info'
 type: HTTP_AGENT
@@ -170,6 +400,283 @@ zabbix_export:
 enabled_lifetime_type: DISABLE_AFTER
 enabled_lifetime: 1d
 item_prototypes:
+ - uuid: 28d5fe3a4f6848149afed33aa645f677
+ name: 'Max connection age: active on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[active,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the active state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "active")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: ff1b817c3d1f43dc8b49bfd0dcb0d10a
+ name: 'Connection count: active on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[active,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the active state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "active")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 4c0b9ca43adb45b895e3ba2e200e501e
+ name: 'Max connection age: disabled on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[disabled,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the disabled state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "disabled")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 280dfc9a84e8425399164e6f3e91cf92
+ name: 'Connection count: disabled on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[disabled,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the disabled state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "disabled")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 63e62108657f47aaa29f7ec6499e45fc
+ name: 'Max connection age: fastpath function call on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[fastpath_function,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the fastpath function call state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "fastpath function call")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 33193e62f1ad4da5b2e30677581e5305
+ name: 'Connection count: fastpath function call on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[fastpath_function,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the fastpath function call state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "fastpath function call")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 92e2366a56424bc18a88606417eae6e4
+ name: 'Max connection age: idle on {#DBNAME}'
+ type: DEPENDENT
+ key:
'pgmon_connection_states[idle,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the idle state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 505bbbb4c7b745d8bab8f3b33705b76b
+ name: 'Connection count: idle on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[idle,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the idle state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 282494d5900c4c2e8abd298160c7cbb6
+ name: 'Max connection age: idle in transaction (aborted) on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[idle_aborted,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the idle in transaction (aborted) state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle in transaction (aborted)")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 0da5075d79234602836fc7967a31f1cc
+ name: 'Connection count: idle in transaction (aborted) on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[idle_aborted,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the idle in transaction (aborted) state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle in transaction (aborted)")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 47aa3f7f4ff1473aae425b4c89472ab4
+ name: 'Max connection age: idle in transaction on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[idle_transaction,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the idle in transaction state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle in transaction")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 95519bf0aefd49799601e1bbb488ec90
+ name: 'Connection count: idle in transaction on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[idle_transaction,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the idle in transaction state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "idle in transaction")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: b2ae38a5733d49ceb1a31e774c11785b
+ name: 'Max connection age: starting on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[starting,age,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ value_type: FLOAT
+ description: 'Longest time a backend in this database has been in the starting state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "starting")].max_state_time'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag:
Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: c1890f2d7ce84a7ebaaadf266d3ffc51
+ name: 'Connection count: starting on {#DBNAME}'
+ type: DEPENDENT
+ key: 'pgmon_connection_states[starting,count,{#DBNAME}]'
+ delay: '0'
+ history: 90d
+ description: 'Number of backends in this database in the starting state'
+ preprocessing:
+ - type: JSONPATH
+ parameters:
+ - '$[?(@.state == "starting")].backend_count'
+ master_item:
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - uuid: 131653f883f448a7b861b16bc4366dfd
+ name: 'Database Connection States for {#DBNAME}'
+ type: HTTP_AGENT
+ key: 'pgmon_connection_states[{#DBNAME}]'
+ history: '0'
+ value_type: TEXT
+ trends: '0'
+ url: 'http://localhost:{$AGENT_PORT}/activity'
+ query_fields:
+ - name: dbname
+ value: '{#DBNAME}'
+ tags:
+ - tag: Application
+ value: PostgreSQL
+ - tag: Database
+ value: '{#DBNAME}'
+ - tag: Type
+ value: Raw
- uuid: a30babe4a6f4440bba2a3ee46eff7ce2
 name: 'Time spent executing statements on {#DBNAME}'
 type: DEPENDENT