Add query test script and test mode

* Add a mode to test all metric queries

* Add a script to run query tests against different versions of
  PostgreSQL

* Add Docker elements for query testing

* Switch to using a --config flag when specifying the config file

* Fix some metric queries

* Allow the agent address to be configured

* Allow the sslmode connection parameter to be configured
This commit is contained in:
James Campbell 2025-05-22 14:53:25 -04:00
parent 529bef9679
commit c0e1531083
Signed by: james
GPG Key ID: 2287C33A40DC906A
11 changed files with 174 additions and 108 deletions

View File

@ -19,6 +19,6 @@ start_pre() {
} }
command="/usr/bin/pgmon" command="/usr/bin/pgmon"
command_args="'$CONFIG_FILE'" command_args="-c '$CONFIG_FILE'"
command_background="true" command_background="true"
command_user="${PGMON_USER}:${PGMON_GROUP}" command_user="${PGMON_USER}:${PGMON_GROUP}"

View File

@ -11,7 +11,8 @@ metrics:
discover_slots: discover_slots:
type: set type: set
query: query:
0: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots 0: SELECT slot_name, plugin, slot_type, database, false as temporary, active FROM pg_replication_slots
100000: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots
# cluster-wide metrics # cluster-wide metrics
version: version:
@ -29,6 +30,8 @@ metrics:
query: query:
0: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s 0: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
140000: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s 140000: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
test_args:
dbname: postgres
# Debugging # Debugging
ntables: ntables:
@ -40,7 +43,9 @@ metrics:
rep_stats: rep_stats:
type: row type: row
query: query:
0: SELECT * FROM pg_stat_database WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}' 0: SELECT * FROM pg_stat_replication WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}'
test_args:
repid: 127.0.0.1_test_rep
# Debugging # Debugging
sleep: sleep:
@ -52,4 +57,7 @@ metrics:
slot_stats: slot_stats:
type: row type: row
query: query:
0: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}' 0: SELECT active_pid, xmin, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS restart_bytes, pg_xlog_location_diff(pg_current_xlog_location(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
100000: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
test_args:
slot: test_slot

View File

@ -1,3 +1,6 @@
# The address the agent binds to
#address: 127.0.0.1
# The port the agent listens on for requests # The port the agent listens on for requests
#port: 5400 #port: 5400
@ -26,6 +29,9 @@
# Default database to connect to when none is specified for a metric # Default database to connect to when none is specified for a metric
#dbname: 'postgres' #dbname: 'postgres'
# SSL connection mode
#ssl_mode: require
# Timeout for getting a connection slot from a pool # Timeout for getting a connection slot from a pool
#pool_slot_timeout: 5 #pool_slot_timeout: 5

View File

@ -85,6 +85,8 @@ class MetricVersionError(Exception):
# Default config settings # Default config settings
default_config = { default_config = {
# The address the agent binds to
"address": "127.0.0.1",
# The port the agent listens on for requests # The port the agent listens on for requests
"port": 5400, "port": 5400,
# Min PostgreSQL connection pool size (per database) # Min PostgreSQL connection pool size (per database)
@ -103,6 +105,8 @@ default_config = {
"dbport": 5432, "dbport": 5432,
# Default database to connect to when none is specified for a metric # Default database to connect to when none is specified for a metric
"dbname": "postgres", "dbname": "postgres",
# SSL connection mode
"ssl_mode": "require",
# Timeout for getting a connection slot from a pool # Timeout for getting a connection slot from a pool
"pool_slot_timeout": 5, "pool_slot_timeout": 5,
# PostgreSQL connection timeout (seconds) # PostgreSQL connection timeout (seconds)
@ -325,6 +329,7 @@ def get_pool(dbname):
# lock # lock
if dbname not in connections: if dbname not in connections:
log.info("Creating connection pool for: {}".format(dbname)) log.info("Creating connection pool for: {}".format(dbname))
# Actually create the connection pool
connections[dbname] = ConnectionPool( connections[dbname] = ConnectionPool(
dbname, dbname,
int(config["min_pool_size"]), int(config["min_pool_size"]),
@ -334,7 +339,7 @@ def get_pool(dbname):
port=config["dbport"], port=config["dbport"],
user=config["dbuser"], user=config["dbuser"],
connect_timeout=int(config["connect_timeout"]), connect_timeout=int(config["connect_timeout"]),
sslmode="require", sslmode=config["ssl_mode"],
) )
# Clear the unhappy indicator if present # Clear the unhappy indicator if present
unhappy_cooldown.pop(dbname, None) unhappy_cooldown.pop(dbname, None)
@ -382,10 +387,16 @@ def run_query_no_retry(pool, return_type, query, args):
res = curs.fetchall() res = curs.fetchall()
if return_type == "value": if return_type == "value":
if len(res) == 0:
return ""
return str(list(res[0].values())[0]) return str(list(res[0].values())[0])
elif return_type == "row": elif return_type == "row":
if len(res) == 0:
return "[]"
return json.dumps(res[0]) return json.dumps(res[0])
elif return_type == "column": elif return_type == "column":
if len(res) == 0:
return "[]"
return json.dumps([list(r.values())[0] for r in res]) return json.dumps([list(r.values())[0] for r in res])
elif return_type == "set": elif return_type == "set":
return json.dumps(res) return json.dumps(res)
@ -393,7 +404,7 @@ def run_query_no_retry(pool, return_type, query, args):
dbname = pool.name dbname = pool.name
if dbname in unhappy_cooldown: if dbname in unhappy_cooldown:
raise UnhappyDBError() raise UnhappyDBError()
elif conn.broken: elif conn.closed != 0:
raise DisconnectedError() raise DisconnectedError()
else: else:
raise raise
@ -505,15 +516,14 @@ def test_queries():
# We just use the default db for tests # We just use the default db for tests
dbname = config["dbname"] dbname = config["dbname"]
# Loop through all defined metrics. # Loop through all defined metrics.
for metric_name in config["metrics"].keys(): for name, metric in config["metrics"].items():
# Get the actual metric definition
metric = metrics[metric_name]
# If the metric has arguments to use while testing, grab those # If the metric has arguments to use while testing, grab those
args = metric.get("test_args", {}) args = metric.get("test_args", {})
# Run the query without the ability to retry. # Run the query without the ability to retry.
res = sample_metric(dbname, metric_name, args, retry=False) res = sample_metric(dbname, name, args, retry=False)
# Compare the result to the provided sample results # Compare the result to the provided sample results
# TODO # TODO
print("{} -> {}".format(name, res))
# Return the number of errors # Return the number of errors
# TODO # TODO
return 0 return 0
@ -603,13 +613,16 @@ if __name__ == "__main__":
) )
parser.add_argument( parser.add_argument(
"config_file", "-c",
"--config_file",
default="pgmon.yml", default="pgmon.yml",
nargs="?", nargs="?",
help="The config file to read (default: %(default)s)", help="The config file to read (default: %(default)s)",
) )
parser.add_argument("test", action="store_true", help="Run query tests and exit") parser.add_argument(
"-t", "--test", action="store_true", help="Run query tests and exit"
)
args = parser.parse_args() args = parser.parse_args()
@ -628,7 +641,7 @@ if __name__ == "__main__":
sys.exit(0) sys.exit(0)
# Set up the http server to receive requests # Set up the http server to receive requests
server_address = ("127.0.0.1", config["port"]) server_address = (config["address"], config["port"])
httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler # Set up the signal handler

View File

@ -7,7 +7,7 @@ After=network.target
[Service] [Service]
EnvironmentFile=/etc/pgmon/%i-service.conf EnvironmentFile=/etc/pgmon/%i-service.conf
User=${SERVICE_USER:-postgres} User=${SERVICE_USER:-postgres}
ExecStart=/usr/local/bin/pgmon /etc/pgmon/%i.yml ExecStart=/usr/local/bin/pgmon -c /etc/pgmon/%i.yml
ExecReload=kill -HUP $MAINPID ExecReload=kill -HUP $MAINPID
Restart=on-failure Restart=on-failure
Type=exec Type=exec

22
tests/Dockerfile Normal file
View File

@ -0,0 +1,22 @@
FROM alpine:3.21
RUN apk update && \
apk add py3-psycopg2 \
py3-yaml \
tini
WORKDIR /app
COPY src/pgmon.py /app/
COPY sample-config/pgmon-metrics.yml /app/
COPY tests/test-config.yml /app/
COPY --chmod=0600 --chown=postgres:postgres tests/pgpass /root/.pgpass
ENTRYPOINT ["tini", "--"]
EXPOSE 5400
CMD ["/app/pgmon.py", "-c", "/app/test-config.yml", "--test"]

32
tests/docker-compose.yml Normal file
View File

@ -0,0 +1,32 @@
---
services:
agent:
image: pgmon
build:
context: ..
dockerfile: tests/Dockerfile
ports:
- :5400
depends_on:
db:
condition: service_healthy
db:
image: "postgres:${PGTAG:-17-bookworm}"
ports:
- :5432
environment:
POSTGRES_PASSWORD: secret
healthcheck:
#test: [ "CMD", "pg_isready", "-U", "postgres" ]
test: [ "CMD-SHELL", "pg_controldata /var/lib/postgresql/data/ | grep -q 'in production'" ]
interval: 5s
timeout: 2s
retries: 10
command: >
postgres -c ssl=on
-c ssl_cert_file='/etc/ssl/certs/ssl-cert-snakeoil.pem'
-c ssl_key_file='/etc/ssl/private/ssl-cert-snakeoil.key'
-c listen_addresses='*'

1
tests/pgpass Normal file
View File

@ -0,0 +1 @@
db:5432:*:postgres:secret

62
tests/run-tests.sh Executable file
View File

@ -0,0 +1,62 @@
#!/bin/bash
# Versions to test
versions=( $@ )
# If we weren't given any versions, test them all
if [ ${#versions[@]} -eq 0 ]
then
versions=( 9.2 9.6 10 11 12 13 14 15 16 17 )
fi
# Image tags to use
declare -A images=()
images["9.2"]='9.2'
images["9.6"]='9.6-bullseye'
images["10"]='10-bullseye'
images["11"]='11-bookworm'
images["12"]='12-bookworm'
images["13"]='13-bookworm'
images["14"]='14-bookworm'
images["15"]='15-bookworm'
images["16"]='16-bookworm'
images["17"]='17-bookworm'
declare -A results=()
# Make sure everything's down to start with
docker compose down
# Make sure our agent container is up to date
docker compose build agent
for version in "${versions[@]}"
do
echo
echo "Testing: PostgreSQL ${version}"
# Specify the version we're testing against
export PGTAG="${images["$version"]}"
# Start the containers
docker compose up --exit-code-from=agent agent
rc=$?
results["$version"]=$rc
# Destroy the containers
docker compose down
done
echo
echo
for v in "${versions[@]}"
do
case "${results["$v"]}" in
0) msg="OK" ;;
1) msg="Query failure detected" ;;
18) msg="Docker image error: 18" ;;
*) msg="Unexpected error: ${results["$v"]}" ;;
esac
echo "$v -> $msg"
done

View File

@ -1,94 +0,0 @@
#!/usr/bin/env python3
from testcontainers import PostgresContainer
import requests
import yaml
import sys
pg_versions = [9.2, 9.6, 10, 11, 12, 13, 14, 15, 16, 17]
pgmon_port = 93849
tests = {}
container = None
def std_version(version):
if version[0] == "9":
return int(f"{version[0]}0{version[1]}00")
else:
return int(f"{version}0000")
def run_test(metric, params, status, check):
"""
Validate the return code and restults of a query
params:
metric: The name of the metric to test
params: A dictionary of query parameters to use when testing
status: The expected status code
check: A regular expression to validate the results (or None)
"""
result = requests.get(f"http://localhost:{pgmon_port}/{metric}", params=params)
if result.status_code != status:
print(
f"FAIL: {metric}[{params}] returned wrong status code: {result.status_code}"
)
return False
if re.match(check, result.text):
print(f"SUCCESS: {metric}[{params}]")
return True
else:
print(f"FAIL: {metric}[{params}] result is invalid, got:\n {result.text}")
return False
def run_all_tests(version):
"""
Run all defined tests against the current running instance
params:
version: The PostgreSQL version currently being tested (server_version_num format)
"""
errors = 0
# Convert versions like 12 to 120000
version_num = std_version(version)
# Loop through all of the metrics to test.
for metric in tests.keys():
params = metric.get("params", {})
status = 200
check = ""
# Find the correct version of the status and check parameters (assuming there are any).
# If there are any check conditions, find the highest version that does not exceed the version we're currently testing against.
# To do this, we order the keys (versions) in reverse, so we start with the highest.
for v in reversed(sorted(metric.get("expect", {}).keys())):
# If we've reached a version <= the one we're testing use it.
if int(v) <= version_num:
status = metric["expect"][v]["status"]
check = metric["expect"][v]["check"]
break
if not run_test(metric, metrics[metric].get(params, {}), status, check):
errors += 1
return errors
def start_test_db(version):
# container = PostgresContainer()
pass
# Read the test script
try:
with open("metric_tests.yml", "r") as f:
tests = yaml.safe_load(f)
except yaml.parser.ParserError as e:
sys.exit("Failed to parse metrics_test.yml: {e}")

16
tests/test-config.yml Normal file
View File

@ -0,0 +1,16 @@
---
# Bind to all interfaces so we can submit requests from outside the test container
address: 0.0.0.0
# We always just connect to the db container
dbhost: db
dbport: 5432
dbuser: postgres
# Allow some insecure SSL parameters for the 9.2 test
ssl_ciphers: DEFAULT@SECLEVEL=1
# Pull in the standard metrics
include:
- pgmon-metrics.yml