Compare commits


2 Commits

c0e1531083  2025-05-22 14:53:25 -04:00
Add query test script and test mode

* Add a mode to test all metric queries
* Add a script to run query tests against different versions of PostgreSQL
* Add Docker elements for query testing
* Switch to using a --config flag when specifying the config file
* Fix some metric queries
* Allow the agent address to be configured
* Allow the sslmode connection parameter to be configured

529bef9679  2025-05-18 12:52:32 -04:00
Add ability to run query tests
11 changed files with 250 additions and 150 deletions
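
Taken together, the changes let the agent bind to a configurable address, take its config file via a -c/--config_file flag, use a configurable sslmode, and run all metric queries in a self-test mode. For orientation, here is a minimal client-side sketch of the agent's HTTP interface; the host and port are simply the defaults from the sample config, and the metric names mirror the definitions shown further down, so treat this as an illustration rather than part of the change itself.

    #!/usr/bin/env python3
    # Minimal client sketch for a running pgmon agent.
    # Assumes the agent listens on the default 127.0.0.1:5400; metric names and
    # arguments come from the metric definitions in the YAML config.
    from urllib.parse import urlencode
    from urllib.request import urlopen

    AGENT = "http://127.0.0.1:5400"

    def fetch(metric, **args):
        # The metric name is the URL path; query-string parameters become the
        # metric's query arguments (e.g. dbname, repid, slot).
        url = "{}/{}".format(AGENT, metric)
        if args:
            url += "?" + urlencode(args)
        with urlopen(url) as resp:
            return resp.read().decode()

    print(fetch("agent_version"))                       # plain string
    print(fetch("discover_slots", dbname="postgres"))   # JSON ("set" return type)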

View File

@ -19,6 +19,6 @@ start_pre() {
 }

 command="/usr/bin/pgmon"
-command_args="'$CONFIG_FILE'"
+command_args="-c '$CONFIG_FILE'"
 command_background="true"
 command_user="${PGMON_USER}:${PGMON_GROUP}"

View File

@ -11,7 +11,8 @@ metrics:
   discover_slots:
     type: set
     query:
-      0: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots
+      0: SELECT slot_name, plugin, slot_type, database, false as temporary, active FROM pg_replication_slots
+      100000: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots

   # cluster-wide metrics
   version:
@ -29,6 +30,8 @@ metrics:
     query:
       0: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
       140000: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
+    test_args:
+      dbname: postgres

   # Debugging
   ntables:
@ -40,7 +43,9 @@ metrics:
   rep_stats:
     type: row
     query:
-      0: SELECT * FROM pg_stat_database WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}'
+      0: SELECT * FROM pg_stat_replication WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}'
+    test_args:
+      repid: 127.0.0.1_test_rep

   # Debugging
   sleep:
@ -52,4 +57,7 @@ metrics:
   slot_stats:
     type: row
     query:
-      0: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
+      0: SELECT active_pid, xmin, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS restart_bytes, pg_xlog_location_diff(pg_current_xlog_location(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
+      100000: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
+    test_args:
+      slot: test_slot
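
For context, the integer keys in each query map are PostgreSQL server_version_num thresholds (0 matches any version, 100000 matches 10 and later, 140000 matches 14 and later), and the new test_args blocks supply the arguments that the test mode passes to each query. The selection helper (get_query) is not part of this diff, so the following is only an illustrative sketch of the "highest key not exceeding the server version" rule, using the slot_stats entry above:

    # Illustrative only: pick a query variant for a given server version.
    def pick_query(metric, server_version_num):
        versions = [int(v) for v in metric["query"] if int(v) <= server_version_num]
        if not versions:
            raise KeyError("no query variant for version {}".format(server_version_num))
        return metric["query"][max(versions)]

    slot_stats = {
        "type": "row",
        "query": {
            0: "SELECT ... pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) ...",
            100000: "SELECT ... pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) ...",
        },
        "test_args": {"slot": "test_slot"},
    }

    print(pick_query(slot_stats, 90600))   # 9.6 gets the pg_xlog_* variant
    print(pick_query(slot_stats, 170000))  # 17 gets the pg_wal_* variant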

View File

@ -1,3 +1,6 @@
+# The address the agent binds to
+#address: 127.0.0.1
+
 # The port the agent listens on for requests
 #port: 5400
@ -26,6 +29,9 @@
 # Default database to connect to when none is specified for a metric
 #dbname: 'postgres'

+# SSL connection mode
+#ssl_mode: require
+
 # Timeout for getting a connection slot from a pool
 #pool_slot_timeout: 5

View File

@ -4,6 +4,7 @@ import yaml
 import json
 import time
 import os
+import sys
 import argparse
 import logging
@ -74,12 +75,18 @@ class UnhappyDBError(Exception):
     pass


+class UnknownMetricError(Exception):
+    pass
+
+
 class MetricVersionError(Exception):
     pass


 # Default config settings
 default_config = {
+    # The address the agent binds to
+    "address": "127.0.0.1",
     # The port the agent listens on for requests
     "port": 5400,
     # Min PostgreSQL connection pool size (per database)
@ -98,6 +105,8 @@ default_config = {
     "dbport": 5432,
     # Default database to connect to when none is specified for a metric
     "dbname": "postgres",
+    # SSL connection mode
+    "ssl_mode": "require",
     # Timeout for getting a connection slot from a pool
     "pool_slot_timeout": 5,
     # PostgreSQL connection timeout (seconds)
@ -320,6 +329,7 @@ def get_pool(dbname):
     # lock
     if dbname not in connections:
         log.info("Creating connection pool for: {}".format(dbname))
+        # Actually create the connection pool
         connections[dbname] = ConnectionPool(
             dbname,
             int(config["min_pool_size"]),
@ -329,7 +339,7 @@ def get_pool(dbname):
             port=config["dbport"],
             user=config["dbuser"],
             connect_timeout=int(config["connect_timeout"]),
-            sslmode="require",
+            sslmode=config["ssl_mode"],
         )
     # Clear the unhappy indicator if present
     unhappy_cooldown.pop(dbname, None)
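
With the hard-coded sslmode replaced by config["ssl_mode"] (the default is still require), the agent can also talk to servers without TLS. A standalone sketch of the equivalent psycopg2 call follows; the host, user, and database names are placeholders rather than values from this repository, and any libpq sslmode value (disable, allow, prefer, require, verify-ca, verify-full) could be passed.

    import psycopg2

    # Hypothetical direct connection mirroring the pool's parameters; "db" and
    # "postgres" are placeholders, not values taken from this change.
    conn = psycopg2.connect(
        dbname="postgres",
        host="db",
        port=5432,
        user="postgres",
        connect_timeout=5,
        sslmode="prefer",  # was hard-coded to "require" before this change
    )
    print(conn.closed)  # 0 while the connection is open
    conn.close()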
@ -377,10 +387,16 @@ def run_query_no_retry(pool, return_type, query, args):
             res = curs.fetchall()

             if return_type == "value":
+                if len(res) == 0:
+                    return ""
                 return str(list(res[0].values())[0])
             elif return_type == "row":
+                if len(res) == 0:
+                    return "[]"
                 return json.dumps(res[0])
             elif return_type == "column":
+                if len(res) == 0:
+                    return "[]"
                 return json.dumps([list(r.values())[0] for r in res])
             elif return_type == "set":
                 return json.dumps(res)
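
The added length checks make an empty result set well-defined: value and row no longer raise IndexError on res[0], and the caller consistently gets an empty string or an empty JSON array. A standalone mock of that shaping step, for reference only (res stands in for whatever curs.fetchall() returned as dict-like rows):

    import json

    # Mock of the result-shaping logic above, not the agent's own code path.
    def shape(return_type, res):
        if return_type == "value":
            return "" if not res else str(list(res[0].values())[0])
        elif return_type == "row":
            return "[]" if not res else json.dumps(res[0])
        elif return_type == "column":
            return "[]" if not res else json.dumps([list(r.values())[0] for r in res])
        elif return_type == "set":
            return json.dumps(res)

    rows = [{"slot_name": "test_slot", "active": True}]
    print(shape("value", rows))   # test_slot
    print(shape("row", []))       # []
    print(shape("column", rows))  # ["test_slot"]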
@ -388,7 +404,7 @@ def run_query_no_retry(pool, return_type, query, args):
         dbname = pool.name
         if dbname in unhappy_cooldown:
             raise UnhappyDBError()
-        elif conn.broken:
+        elif conn.closed != 0:
             raise DisconnectedError()
         else:
             raise
@ -466,6 +482,53 @@ def get_cluster_version():
     return cluster_version


+def sample_metric(dbname, metric_name, args, retry=True):
+    """
+    Run the appropriate query for the named metric against the specified database
+    """
+    # Get the metric definition
+    try:
+        metric = config["metrics"][metric_name]
+    except KeyError:
+        raise UnknownMetricError("Unknown metric: {}".format(metric_name))
+
+    # Get the connection pool for the database, or create one if it doesn't
+    # already exist.
+    pool = get_pool(dbname)
+
+    # Identify the PostgreSQL version
+    version = get_cluster_version()
+
+    # Get the query version
+    query = get_query(metric, version)
+
+    # Execute the query
+    if retry:
+        return run_query(pool, metric["type"], query, args)
+    else:
+        return run_query_no_retry(pool, metric["type"], query, args)
+
+
+def test_queries():
+    """
+    Run all of the metric queries against a database and check the results
+    """
+    # We just use the default db for tests
+    dbname = config["dbname"]
+    # Loop through all defined metrics.
+    for name, metric in config["metrics"].items():
+        # If the metric has arguments to use while testing, grab those
+        args = metric.get("test_args", {})
+        # Run the query without the ability to retry.
+        res = sample_metric(dbname, name, args, retry=False)
+        # Compare the result to the provided sample results
+        # TODO
+        print("{} -> {}".format(name, res))
+
+    # Return the number of errors
+    # TODO
+    return 0
+
+
 class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
     """
     This is our request handling server. It is responsible for listening for
@ -494,10 +557,10 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
         """
         # Parse the URL
         parsed_path = urlparse(self.path)
-        name = parsed_path.path.strip("/")
+        metric_name = parsed_path.path.strip("/")
         parsed_query = parse_qs(parsed_path.query)

-        if name == "agent_version":
+        if metric_name == "agent_version":
             self._reply(200, VERSION)
             return
@ -505,60 +568,31 @@
         # single values, just grab the first from each.
         args = {key: values[0] for key, values in parsed_query.items()}

-        # Get the metric definition
-        try:
-            metric = config["metrics"][name]
-        except KeyError:
-            log.error("Unknown metric: {}".format(name))
-            self._reply(404, "Unknown metric")
-            return
-
         # Get the dbname. If none was provided, use the default from the
         # config.
         dbname = args.get("dbname", config["dbname"])

-        # Get the connection pool for the database, or create one if it doesn't
-        # already exist.
+        # Sample the metric
         try:
-            pool = get_pool(dbname)
-        except UnhappyDBError:
-            log.info("Database {} is unhappy, please be patient".format(dbname))
-            self._reply(503, "Database unavailable")
+            self._reply(200, sample_metric(dbname, metric_name, args))
             return
-
-        # Identify the PostgreSQL version
-        try:
-            version = get_cluster_version()
-        except UnhappyDBError:
+        except UnknownMetricError as e:
+            log.error("Unknown metric: {}".format(metric_name))
+            self._reply(404, "Unknown metric")
             return
-        except Exception as e:
-            if dbname in unhappy_cooldown:
-                log.info("Database {} is unhappy, please be patient".format(dbname))
-                self._reply(503, "Database unavailable")
-            else:
-                log.error("Failed to get PostgreSQL version: {}".format(e))
-                self._reply(500, "Error getting DB version")
-            return
-
-        # Get the query version
-        try:
-            query = get_query(metric, version)
-        except KeyError:
-            log.error("Failed to find a version of {} for {}".format(name, version))
+        except MetricVersionError as e:
+            log.error(
+                "Failed to find a version of {} for {}".format(metric_name, version)
+            )
             self._reply(404, "Unsupported version")
             return
-
-        # Execute the quert
-        try:
-            self._reply(200, run_query(pool, metric["type"], query, args))
-            return
-        except Exception as e:
-            if dbname in unhappy_cooldown:
-                log.info("Database {} is unhappy, please be patient".format(dbname))
-                self._reply(503, "Database unavailable")
-            else:
-                log.error("Error running query: {}".format(e))
-                self._reply(500, "Error running query")
-            return
+        except UnhappyDBError as e:
+            log.info("Database {} is unhappy, please be patient".format(dbname))
+            self._reply(503, "Database unavailable")
+            return
+        except Exception as e:
+            log.error("Error running query: {}".format(e))
+            self._reply(500, "Unexpected error: {}".format(e))
+            return

     def _reply(self, code, content):
@ -579,12 +613,17 @@ if __name__ == "__main__":
) )
parser.add_argument( parser.add_argument(
"config_file", "-c",
"--config_file",
default="pgmon.yml", default="pgmon.yml",
nargs="?", nargs="?",
help="The config file to read (default: %(default)s)", help="The config file to read (default: %(default)s)",
) )
parser.add_argument(
"-t", "--test", action="store_true", help="Run query tests and exit"
)
args = parser.parse_args() args = parser.parse_args()
# Set the config file path # Set the config file path
@ -593,8 +632,16 @@ if __name__ == "__main__":
# Read the config file # Read the config file
read_config(config_file) read_config(config_file)
# Run query tests and exit if test mode is enabled
if args.test:
errors = test_queries()
if errors > 0:
sys.exit(1)
else:
sys.exit(0)
# Set up the http server to receive requests # Set up the http server to receive requests
server_address = ("127.0.0.1", config["port"]) server_address = (config["address"], config["port"])
httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler # Set up the signal handler
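
Note that test_queries() currently prints each result and always reports zero errors (the comparison itself is still a TODO), so --test mainly verifies that every metric query parses and executes on the target server; run-tests.sh below turns the process exit code into a per-version pass/fail. Purely as a hypothetical sketch, the missing comparison step could follow the regex-check idea of the script this change removes:

    import re

    # Hypothetical per-metric expectations; none of these patterns exist in the
    # repository, they only illustrate one possible shape for the TODO.
    EXPECTED = {
        "version": r"^\d+$",
        "discover_slots": r"^\[.*\]$",
    }

    def check_result(name, result):
        pattern = EXPECTED.get(name)
        if pattern is None:
            return True  # no expectation recorded for this metric
        return re.match(pattern, result) is not None

    print(check_result("version", "170000"))       # True
    print(check_result("discover_slots", "oops"))  # False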

View File

@ -7,7 +7,7 @@ After=network.target
[Service] [Service]
EnvironmentFile=/etc/pgmon/%i-service.conf EnvironmentFile=/etc/pgmon/%i-service.conf
User=${SERVICE_USER:-postgres} User=${SERVICE_USER:-postgres}
ExecStart=/usr/local/bin/pgmon /etc/pgmon/%i.yml ExecStart=/usr/local/bin/pgmon -c /etc/pgmon/%i.yml
ExecReload=kill -HUP $MAINPID ExecReload=kill -HUP $MAINPID
Restart=on-failure Restart=on-failure
Type=exec Type=exec

tests/Dockerfile (new file, 22 lines)
View File

@ -0,0 +1,22 @@
FROM alpine:3.21
RUN apk update && \
apk add py3-psycopg2 \
py3-yaml \
tini
WORKDIR /app
COPY src/pgmon.py /app/
COPY sample-config/pgmon-metrics.yml /app/
COPY tests/test-config.yml /app/
COPY --chmod=0600 --chown=postgres:postgres tests/pgpass /root/.pgpass
ENTRYPOINT ["tini", "--"]
EXPOSE 5400
CMD ["/app/pgmon.py", "-c", "/app/test-config.yml", "--test"]

tests/docker-compose.yml (new file, 32 lines)
View File

@ -0,0 +1,32 @@
---
services:
agent:
image: pgmon
build:
context: ..
dockerfile: tests/Dockerfile
ports:
- :5400
depends_on:
db:
condition: service_healthy
db:
image: "postgres:${PGTAG:-17-bookworm}"
ports:
- :5432
environment:
POSTGRES_PASSWORD: secret
healthcheck:
#test: [ "CMD", "pg_isready", "-U", "postgres" ]
test: [ "CMD-SHELL", "pg_controldata /var/lib/postgresql/data/ | grep -q 'in production'" ]
interval: 5s
timeout: 2s
retries: 10
command: >
postgres -c ssl=on
-c ssl_cert_file='/etc/ssl/certs/ssl-cert-snakeoil.pem'
-c ssl_key_file='/etc/ssl/private/ssl-cert-snakeoil.key'
-c listen_addresses='*'

tests/pgpass (new file, 1 line)
View File

@ -0,0 +1 @@
db:5432:*:postgres:secret

tests/run-tests.sh (new executable file, 62 lines)
View File

@ -0,0 +1,62 @@
#!/bin/bash
# Versions to test
versions=( $@ )
# If we weren't given any versions, test them all
if [ ${#versions[@]} -eq 0 ]
then
versions=( 9.2 9.6 10 11 12 13 14 15 16 17 )
fi
# Image tags to use
declare -A images=()
images["9.2"]='9.2'
images["9.6"]='9.6-bullseye'
images["10"]='10-bullseye'
images["11"]='11-bookworm'
images["12"]='12-bookworm'
images["13"]='13-bookworm'
images["14"]='14-bookworm'
images["15"]='15-bookworm'
images["16"]='16-bookworm'
images["17"]='17-bookworm'
declare -A results=()
# Make sure everything's down to start with
docker compose down
# Make sure our agent container is up to date
docker compose build agent
for version in "${versions[@]}"
do
echo
echo "Testing: PostgreSQL ${version}"
# Specify the version we're testing against
export PGTAG="${images["$version"]}"
# Start the containers
docker compose up --exit-code-from=agent agent
rc=$?
results["$version"]=$rc
# Destroy the containers
docker compose down
done
echo
echo
for v in "${versions[@]}"
do
case "${results["$v"]}" in
0) msg="OK" ;;
1) msg="Query failure detected" ;;
18) msg="Docker image error: 18" ;;
*) msg="Unexpected error: ${results["$v"]}" ;;
esac
echo "$v -> $msg"
done

View File

@ -1,94 +0,0 @@
#!/usr/bin/env python3
from testcontainers import PostgresContainer
import requests
import yaml
import sys
pg_versions = [9.2, 9.6, 10, 11, 12, 13, 14, 15, 16, 17]
pgmon_port = 93849
tests = {}
container = None
def std_version(version):
if version[0] == "9":
return int(f"{version[0]}0{version[1]}00")
else:
return int(f"{version}0000")
def run_test(metric, params, status, check):
"""
Validate the return code and restults of a query
params:
metric: The name of the metric to test
params: A dictionary of query parameters to use when testing
status: The expected status code
check: A regular expression to validate the results (or None)
"""
result = requests.get(f"http://localhost:{pgmon_port}/{metric}", params=params)
if result.status_code != status:
print(
f"FAIL: {metric}[{params}] returned wrong status code: {result.status_code}"
)
return False
if re.match(check, result.text):
print(f"SUCCESS: {metric}[{params}]")
return True
else:
print(f"FAIL: {metric}[{params}] result is invalid, got:\n {result.text}")
return False
def run_all_tests(version):
"""
Run all defined tests against the current running instance
params:
version: The PostgreSQL version currently being tested (server_version_num format)
"""
errors = 0
# Convert versions like 12 to 120000
version_num = std_version(version)
# Loop through all of the metrics to test.
for metric in tests.keys():
params = metric.get("params", {})
status = 200
check = ""
# Find the correct version of the status and check parameters (assuming there are any).
# If there are any check conditions, find the highest version that does not exceed the version we're currently testing against.
# To do this, we order the keys (versions) in reverse, so we start with the highest.
for v in reversed(sorted(metric.get("expect", {}).keys())):
# If we've reached a version <= the one we're testing use it.
if int(v) <= version_num:
status = metric["expect"][v]["status"]
check = metric["expect"][v]["check"]
break
if not run_test(metric, metrics[metric].get(params, {}), status, check):
errors += 1
return errors
def start_test_db(version):
# container = PostgresContainer()
pass
# Read the test script
try:
with open("metric_tests.yml", "r") as f:
tests = yaml.safe_load(f)
except yaml.parser.ParserError as e:
sys.exit("Failed to parse metrics_test.yml: {e}")

tests/test-config.yml (new file, 16 lines)
View File

@ -0,0 +1,16 @@
---
# Bind to all interfaces so we can submit requests from outside the test container
address: 0.0.0.0
# We always just connect to the db container
dbhost: db
dbport: 5432
dbuser: postgres
# Allow some insecure SSL parameters for the 9.2 test
ssl_ciphers: DEFAULT@SECLEVEL=1
# Pull in the standard metrics
include:
- pgmon-metrics.yml
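
The test config relies on an include directive to pull in the stock metric definitions. The merge logic lives in read_config(), which is outside this diff, so the sketch below only illustrates the general idea of an include-style load; the merge order and path handling are assumptions, not the agent's actual behaviour.

    import yaml

    # Assumed behaviour only: load a config file and fold in any included files,
    # letting keys in the including file win. The real read_config() may differ.
    def load_config(path):
        with open(path) as f:
            cfg = yaml.safe_load(f) or {}
        for included in cfg.pop("include", []):
            merged = load_config(included)
            merged.update(cfg)  # outer file's top-level keys take precedence
            cfg = merged
        return cfg

    config = load_config("test-config.yml")
    print(sorted(config.get("metrics", {})))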