Compare commits
No commits in common. "c0e153108310070dce4127a2887e88295f1c5d6d" and "8928bba337143911139d963cbca2b2aff3da50ce" have entirely different histories.
c0e1531083
...
8928bba337
@ -19,6 +19,6 @@ start_pre() {
|
||||
}
|
||||
|
||||
command="/usr/bin/pgmon"
|
||||
command_args="-c '$CONFIG_FILE'"
|
||||
command_args="'$CONFIG_FILE'"
|
||||
command_background="true"
|
||||
command_user="${PGMON_USER}:${PGMON_GROUP}"
|
||||
|
||||
@ -11,8 +11,7 @@ metrics:
|
||||
discover_slots:
|
||||
type: set
|
||||
query:
|
||||
0: SELECT slot_name, plugin, slot_type, database, false as temporary, active FROM pg_replication_slots
|
||||
100000: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots
|
||||
0: SELECT slot_name, plugin, slot_type, database, temporary, active FROM pg_replication_slots
|
||||
|
||||
# cluster-wide metrics
|
||||
version:
|
||||
@ -30,8 +29,6 @@ metrics:
|
||||
query:
|
||||
0: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, blk_read_time, blk_write_time, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
|
||||
140000: SELECT numbackends, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted, conflicts, temp_files, temp_bytes, deadlocks, COALESCE(checksum_failures, 0) AS checksum_failures, blk_read_time, blk_write_time, session_time, active_time, idle_in_transaction_time, sessions, sessions_abandoned, sessions_fatal, sessions_killed, extract('epoch' from stats_reset)::float FROM pg_stat_database WHERE datname = %(dbname)s
|
||||
test_args:
|
||||
dbname: postgres
|
||||
|
||||
# Debugging
|
||||
ntables:
|
||||
@ -43,9 +40,7 @@ metrics:
|
||||
rep_stats:
|
||||
type: row
|
||||
query:
|
||||
0: SELECT * FROM pg_stat_replication WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}'
|
||||
test_args:
|
||||
repid: 127.0.0.1_test_rep
|
||||
0: SELECT * FROM pg_stat_database WHERE client_addr || '_' || regexp_replace(application_name, '[ ,]', '_', 'g') = '{repid}'
|
||||
|
||||
# Debugging
|
||||
sleep:
|
||||
@ -57,7 +52,4 @@ metrics:
|
||||
slot_stats:
|
||||
type: row
|
||||
query:
|
||||
0: SELECT active_pid, xmin, pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS restart_bytes, pg_xlog_location_diff(pg_current_xlog_location(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
|
||||
100000: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
|
||||
test_args:
|
||||
slot: test_slot
|
||||
0: SELECT active_pid, xmin, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS restart_bytes, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS confirmed_flush_bytes FROM pg_replication_slots WHERE slot_name = '{slot}'
|
||||
|
||||
@ -1,6 +1,3 @@
|
||||
# The address the agent binds to
|
||||
#address: 127.0.0.1
|
||||
|
||||
# The port the agent listens on for requests
|
||||
#port: 5400
|
||||
|
||||
@ -29,9 +26,6 @@
|
||||
# Default database to connect to when none is specified for a metric
|
||||
#dbname: 'postgres'
|
||||
|
||||
# SSL connection mode
|
||||
#ssl_mode: require
|
||||
|
||||
# Timeout for getting a connection slot from a pool
|
||||
#pool_slot_timeout: 5
|
||||
|
||||
|
||||
147
src/pgmon.py
147
src/pgmon.py
@ -4,7 +4,6 @@ import yaml
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
@ -75,18 +74,12 @@ class UnhappyDBError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class UnknownMetricError(Exception):
    """Raised when a requested metric name has no definition in the config."""
|
||||
|
||||
|
||||
class MetricVersionError(Exception):
    """Signals that a metric has no query usable for the PostgreSQL version in use."""
|
||||
|
||||
|
||||
# Default config settings
|
||||
default_config = {
|
||||
# The address the agent binds to
|
||||
"address": "127.0.0.1",
|
||||
# The port the agent listens on for requests
|
||||
"port": 5400,
|
||||
# Min PostgreSQL connection pool size (per database)
|
||||
@ -105,8 +98,6 @@ default_config = {
|
||||
"dbport": 5432,
|
||||
# Default database to connect to when none is specified for a metric
|
||||
"dbname": "postgres",
|
||||
# SSL connection mode
|
||||
"ssl_mode": "require",
|
||||
# Timeout for getting a connection slot from a pool
|
||||
"pool_slot_timeout": 5,
|
||||
# PostgreSQL connection timeout (seconds)
|
||||
@ -329,7 +320,6 @@ def get_pool(dbname):
|
||||
# lock
|
||||
if dbname not in connections:
|
||||
log.info("Creating connection pool for: {}".format(dbname))
|
||||
# Actually create the connection pool
|
||||
connections[dbname] = ConnectionPool(
|
||||
dbname,
|
||||
int(config["min_pool_size"]),
|
||||
@ -339,7 +329,7 @@ def get_pool(dbname):
|
||||
port=config["dbport"],
|
||||
user=config["dbuser"],
|
||||
connect_timeout=int(config["connect_timeout"]),
|
||||
sslmode=config["ssl_mode"],
|
||||
sslmode="require",
|
||||
)
|
||||
# Clear the unhappy indicator if present
|
||||
unhappy_cooldown.pop(dbname, None)
|
||||
@ -387,16 +377,10 @@ def run_query_no_retry(pool, return_type, query, args):
|
||||
res = curs.fetchall()
|
||||
|
||||
if return_type == "value":
|
||||
if len(res) == 0:
|
||||
return ""
|
||||
return str(list(res[0].values())[0])
|
||||
elif return_type == "row":
|
||||
if len(res) == 0:
|
||||
return "[]"
|
||||
return json.dumps(res[0])
|
||||
elif return_type == "column":
|
||||
if len(res) == 0:
|
||||
return "[]"
|
||||
return json.dumps([list(r.values())[0] for r in res])
|
||||
elif return_type == "set":
|
||||
return json.dumps(res)
|
||||
@ -404,7 +388,7 @@ def run_query_no_retry(pool, return_type, query, args):
|
||||
dbname = pool.name
|
||||
if dbname in unhappy_cooldown:
|
||||
raise UnhappyDBError()
|
||||
elif conn.closed != 0:
|
||||
elif conn.broken:
|
||||
raise DisconnectedError()
|
||||
else:
|
||||
raise
|
||||
@ -482,53 +466,6 @@ def get_cluster_version():
|
||||
return cluster_version
|
||||
|
||||
|
||||
def sample_metric(dbname, metric_name, args, retry=True):
    """
    Look up a metric by name and run its query against the given database.

    params:
        dbname: Name of the database whose connection pool is used
        metric_name: Key of the metric in config["metrics"]
        args: Query arguments handed through to the query runner
        retry: When true the query goes through run_query, otherwise
               through run_query_no_retry

    Returns whatever the selected query runner returns.

    Raises UnknownMetricError when the metric is not defined in the config.
    """
    # Resolve the metric definition first so unknown names fail before any
    # database work is attempted
    try:
        definition = config["metrics"][metric_name]
    except KeyError:
        raise UnknownMetricError("Unknown metric: {}".format(metric_name))

    # Fetch (or lazily create) the pool for this database
    db_pool = get_pool(dbname)

    # Pick the query matching the cluster's PostgreSQL version
    pg_version = get_cluster_version()
    sql = get_query(definition, pg_version)

    # Execute the query with the requested retry behaviour
    runner = run_query if retry else run_query_no_retry
    return runner(db_pool, definition["type"], sql, args)
|
||||
|
||||
|
||||
def test_queries():
    """
    Run every configured metric query once and print each result.

    Queries run against the default database (config["dbname"]) without
    retries, using each metric's optional "test_args" as query arguments.

    Returns the number of errors, which is currently always 0 because result
    validation is not implemented yet.
    """
    # Tests always target the default database
    default_db = config["dbname"]

    for metric_name, definition in config["metrics"].items():
        # Metrics may supply arguments to use while testing
        test_args = definition.get("test_args", {})

        # No retries: a failing query should surface immediately
        output = sample_metric(default_db, metric_name, test_args, retry=False)

        # TODO: compare the result to the provided sample results
        print("{} -> {}".format(metric_name, output))

    # TODO: return the real number of errors
    return 0
|
||||
|
||||
|
||||
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
"""
|
||||
This is our request handling server. It is responsible for listening for
|
||||
@ -557,10 +494,10 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
"""
|
||||
# Parse the URL
|
||||
parsed_path = urlparse(self.path)
|
||||
metric_name = parsed_path.path.strip("/")
|
||||
name = parsed_path.path.strip("/")
|
||||
parsed_query = parse_qs(parsed_path.query)
|
||||
|
||||
if metric_name == "agent_version":
|
||||
if name == "agent_version":
|
||||
self._reply(200, VERSION)
|
||||
return
|
||||
|
||||
@ -568,31 +505,60 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
# single values, just grab the first from each.
|
||||
args = {key: values[0] for key, values in parsed_query.items()}
|
||||
|
||||
# Get the metric definition
|
||||
try:
|
||||
metric = config["metrics"][name]
|
||||
except KeyError:
|
||||
log.error("Unknown metric: {}".format(name))
|
||||
self._reply(404, "Unknown metric")
|
||||
return
|
||||
|
||||
# Get the dbname. If none was provided, use the default from the
|
||||
# config.
|
||||
dbname = args.get("dbname", config["dbname"])
|
||||
|
||||
# Sample the metric
|
||||
# Get the connection pool for the database, or create one if it doesn't
|
||||
# already exist.
|
||||
try:
|
||||
self._reply(200, sample_metric(dbname, metric_name, args))
|
||||
return
|
||||
except UnknownMetricError as e:
|
||||
log.error("Unknown metric: {}".format(metric_name))
|
||||
self._reply(404, "Unknown metric")
|
||||
return
|
||||
except MetricVersionError as e:
|
||||
log.error(
|
||||
"Failed to find a version of {} for {}".format(metric_name, version)
|
||||
)
|
||||
self._reply(404, "Unsupported version")
|
||||
return
|
||||
except UnhappyDBError as e:
|
||||
pool = get_pool(dbname)
|
||||
except UnhappyDBError:
|
||||
log.info("Database {} is unhappy, please be patient".format(dbname))
|
||||
self._reply(503, "Database unavailable")
|
||||
return
|
||||
|
||||
# Identify the PostgreSQL version
|
||||
try:
|
||||
version = get_cluster_version()
|
||||
except UnhappyDBError:
|
||||
return
|
||||
except Exception as e:
|
||||
if dbname in unhappy_cooldown:
|
||||
log.info("Database {} is unhappy, please be patient".format(dbname))
|
||||
self._reply(503, "Database unavailable")
|
||||
else:
|
||||
log.error("Failed to get PostgreSQL version: {}".format(e))
|
||||
self._reply(500, "Error getting DB version")
|
||||
return
|
||||
|
||||
# Get the query version
|
||||
try:
|
||||
query = get_query(metric, version)
|
||||
except KeyError:
|
||||
log.error("Failed to find a version of {} for {}".format(name, version))
|
||||
self._reply(404, "Unsupported version")
|
||||
return
|
||||
|
||||
# Execute the query
|
||||
try:
|
||||
self._reply(200, run_query(pool, metric["type"], query, args))
|
||||
return
|
||||
except Exception as e:
|
||||
if dbname in unhappy_cooldown:
|
||||
log.info("Database {} is unhappy, please be patient".format(dbname))
|
||||
self._reply(503, "Database unavailable")
|
||||
else:
|
||||
log.error("Error running query: {}".format(e))
|
||||
self._reply(500, "Unexpected error: {}".format(e))
|
||||
self._reply(500, "Error running query")
|
||||
return
|
||||
|
||||
def _reply(self, code, content):
|
||||
@ -613,17 +579,12 @@ if __name__ == "__main__":
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--config_file",
|
||||
"config_file",
|
||||
default="pgmon.yml",
|
||||
nargs="?",
|
||||
help="The config file to read (default: %(default)s)",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-t", "--test", action="store_true", help="Run query tests and exit"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set the config file path
|
||||
@ -632,16 +593,8 @@ if __name__ == "__main__":
|
||||
# Read the config file
|
||||
read_config(config_file)
|
||||
|
||||
# Run query tests and exit if test mode is enabled
|
||||
if args.test:
|
||||
errors = test_queries()
|
||||
if errors > 0:
|
||||
sys.exit(1)
|
||||
else:
|
||||
sys.exit(0)
|
||||
|
||||
# Set up the http server to receive requests
|
||||
server_address = (config["address"], config["port"])
|
||||
server_address = ("127.0.0.1", config["port"])
|
||||
httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
|
||||
|
||||
# Set up the signal handler
|
||||
|
||||
@ -7,7 +7,7 @@ After=network.target
|
||||
[Service]
|
||||
EnvironmentFile=/etc/pgmon/%i-service.conf
|
||||
User=${SERVICE_USER:-postgres}
|
||||
ExecStart=/usr/local/bin/pgmon -c /etc/pgmon/%i.yml
|
||||
ExecStart=/usr/local/bin/pgmon /etc/pgmon/%i.yml
|
||||
ExecReload=kill -HUP $MAINPID
|
||||
Restart=on-failure
|
||||
Type=exec
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
FROM alpine:3.21
|
||||
|
||||
RUN apk update && \
|
||||
apk add py3-psycopg2 \
|
||||
py3-yaml \
|
||||
tini
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY src/pgmon.py /app/
|
||||
|
||||
COPY sample-config/pgmon-metrics.yml /app/
|
||||
|
||||
COPY tests/test-config.yml /app/
|
||||
|
||||
COPY --chmod=0600 --chown=postgres:postgres tests/pgpass /root/.pgpass
|
||||
|
||||
ENTRYPOINT ["tini", "--"]
|
||||
|
||||
EXPOSE 5400
|
||||
|
||||
CMD ["/app/pgmon.py", "-c", "/app/test-config.yml", "--test"]
|
||||
@ -1,32 +0,0 @@
|
||||
---
|
||||
|
||||
services:
|
||||
agent:
|
||||
image: pgmon
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: tests/Dockerfile
|
||||
ports:
|
||||
- :5400
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
|
||||
db:
|
||||
image: "postgres:${PGTAG:-17-bookworm}"
|
||||
ports:
|
||||
- :5432
|
||||
environment:
|
||||
POSTGRES_PASSWORD: secret
|
||||
healthcheck:
|
||||
#test: [ "CMD", "pg_isready", "-U", "postgres" ]
|
||||
test: [ "CMD-SHELL", "pg_controldata /var/lib/postgresql/data/ | grep -q 'in production'" ]
|
||||
interval: 5s
|
||||
timeout: 2s
|
||||
retries: 10
|
||||
command: >
|
||||
postgres -c ssl=on
|
||||
-c ssl_cert_file='/etc/ssl/certs/ssl-cert-snakeoil.pem'
|
||||
-c ssl_key_file='/etc/ssl/private/ssl-cert-snakeoil.key'
|
||||
-c listen_addresses='*'
|
||||
|
||||
@ -1 +0,0 @@
|
||||
db:5432:*:postgres:secret
|
||||
@ -1,62 +0,0 @@
|
||||
#!/bin/bash

# Run the agent's query tests against one or more PostgreSQL versions using
# docker compose. Versions are given as arguments (e.g. 9.6 14 17); with no
# arguments every known version is tested. A summary line is printed per
# version, and the exit status is non-zero if any version did not pass.

# Versions to test (quoted so arguments are never re-split or globbed)
versions=( "$@" )

# If we weren't given any versions, test them all
if [ ${#versions[@]} -eq 0 ]
then
    versions=( 9.2 9.6 10 11 12 13 14 15 16 17 )
fi

# Image tags to use
declare -A images=()
images["9.2"]='9.2'
images["9.6"]='9.6-bullseye'
images["10"]='10-bullseye'
images["11"]='11-bookworm'
images["12"]='12-bookworm'
images["13"]='13-bookworm'
images["14"]='14-bookworm'
images["15"]='15-bookworm'
images["16"]='16-bookworm'
images["17"]='17-bookworm'

declare -A results=()

# Make sure everything's down to start with
docker compose down

# Make sure our agent container is up to date
docker compose build agent

for version in "${versions[@]}"
do
    echo
    echo "Testing: PostgreSQL ${version}"

    # Skip versions we have no image tag for. Exporting an empty PGTAG would
    # let the compose file fall back to its default image and report results
    # for the wrong PostgreSQL version.
    if [ -z "${images["$version"]:-}" ]
    then
        echo "No image tag known for version: ${version}" >&2
        results["$version"]="unknown"
        continue
    fi

    # Specify the version we're testing against
    export PGTAG="${images["$version"]}"

    # Start the containers
    docker compose up --exit-code-from=agent agent
    rc=$?

    results["$version"]=$rc

    # Destroy the containers
    docker compose down
done

echo
echo

# Overall status: 0 only if every requested version passed
failed=0

for v in "${versions[@]}"
do
    case "${results["$v"]}" in
        0) msg="OK" ;;
        1) msg="Query failure detected" ;;
        18) msg="Docker image error: 18" ;;
        unknown) msg="Unsupported version (no image tag)" ;;
        *) msg="Unexpected error: ${results["$v"]}" ;;
    esac

    if [ "${results["$v"]}" != "0" ]
    then
        failed=1
    fi

    echo "$v -> $msg"
done

exit $failed
|
||||
94
tests/sql-tests.py
Normal file
94
tests/sql-tests.py
Normal file
@ -0,0 +1,94 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from testcontainers import PostgresContainer
|
||||
import requests
|
||||
import yaml
|
||||
import sys
|
||||
|
||||
# PostgreSQL versions the test suite is meant to cover
pg_versions = [9.2, 9.6, 10, 11, 12, 13, 14, 15, 16, 17]

# Port used to reach the pgmon agent under test. TCP ports are limited to
# 1-65535, so the previous value (93849) could never be connected to.
# NOTE(review): 5400 is the agent's default port; confirm it matches the
# port the agent is started on for these tests.
pgmon_port = 5400

# Test definitions; replaced by the contents of the test YAML file at load time
tests = {}

# Placeholder for the PostgreSQL test container handle
container = None
|
||||
|
||||
|
||||
def std_version(version):
    """
    Convert a human-readable PostgreSQL version to server_version_num format

    Before version 10 the first two components form the major version
    (9.6 -> 90600, 9.6.24 -> 90624). From 10 onward there is a single major
    component and an optional minor one (12 -> 120000, 10.4 -> 100004).

    params:
        version: The version as a str, int or float (e.g. "9.6", 9.2, 12, "17")

    returns:
        The version as an integer in server_version_num format
    """
    # Normalize through str so ints and floats are accepted as well as strings
    parts = str(version).strip().split(".")
    numbers = [int(p) for p in parts if p != ""]

    major = numbers[0]
    second = numbers[1] if len(numbers) > 1 else 0

    if major < 10:
        third = numbers[2] if len(numbers) > 2 else 0
        return major * 10000 + second * 100 + third

    return major * 10000 + second
|
||||
|
||||
|
||||
def run_test(metric, params, status, check):
    """
    Validate the return code and results of a query

    params:
        metric: The name of the metric to test
        params: A dictionary of query parameters to use when testing
        status: The expected status code
        check: A regular expression to validate the results (or None to
               only validate the status code)

    returns:
        True if the status code matched and the response text passed the
        check, False otherwise
    """
    # Imported locally because the module's import block does not include re
    import re

    result = requests.get(f"http://localhost:{pgmon_port}/{metric}", params=params)

    if result.status_code != status:
        print(
            f"FAIL: {metric}[{params}] returned wrong status code: {result.status_code}"
        )
        return False

    # re.match only anchors at the start of the text; a check of None means
    # there is nothing to validate beyond the status code
    if check is None or re.match(check, result.text):
        print(f"SUCCESS: {metric}[{params}]")
        return True

    print(f"FAIL: {metric}[{params}] result is invalid, got:\n {result.text}")
    return False
|
||||
|
||||
|
||||
def run_all_tests(version):
    """
    Run all defined tests against the current running instance

    Each entry in the module-level tests dict is presumed to map a metric
    name to a definition with optional "params" (query parameters) and
    "expect" (minimum version -> {"status": ..., "check": ...}) keys.
    TODO confirm against the test YAML schema.

    params:
        version: The PostgreSQL version currently being tested (e.g. 12 or "9.6")

    returns:
        The number of tests that failed
    """
    errors = 0

    # Convert versions like 12 to 120000
    version_num = std_version(version)

    # Loop through all of the metrics to test.
    for metric, definition in tests.items():
        # A metric listed with no body loads from YAML as None
        definition = definition or {}

        params = definition.get("params", {})
        status = 200
        check = ""

        # Find the correct version of the status and check parameters
        # (assuming there are any): the highest version that does not exceed
        # the one we're testing against. Keys are compared numerically so
        # string keys such as "90600" and "100000" order correctly.
        expect = definition.get("expect", {})
        for v in sorted(expect, key=int, reverse=True):
            if int(v) <= version_num:
                status = expect[v]["status"]
                check = expect[v]["check"]
                break

        if not run_test(metric, params, status, check):
            errors += 1

    return errors
|
||||
|
||||
|
||||
def start_test_db(version):
    """
    Placeholder for starting a PostgreSQL test database (not implemented yet)

    params:
        version: The PostgreSQL version to start (currently unused)
    """
    # TODO: container = PostgresContainer()
    return None
|
||||
|
||||
|
||||
# Read the test script
try:
    with open("metric_tests.yml", "r") as f:
        # An empty file loads as None; fall back to no tests so later code
        # can still treat this as a dict
        tests = yaml.safe_load(f) or {}
except yaml.YAMLError as e:
    # YAMLError is the base class of PyYAML's parser and scanner errors
    sys.exit(f"Failed to parse metric_tests.yml: {e}")
|
||||
@ -1,16 +0,0 @@
|
||||
---
|
||||
|
||||
# Bind to all interfaces so we can submit requests from outside the test container
|
||||
address: 0.0.0.0
|
||||
|
||||
# We always just connect to the db container
|
||||
dbhost: db
|
||||
dbport: 5432
|
||||
dbuser: postgres
|
||||
|
||||
# Allow some insecure SSL parameters for the 9.2 test
|
||||
ssl_ciphers: DEFAULT@SECLEVEL=1
|
||||
|
||||
# Pull in the standard metrics
|
||||
include:
|
||||
- pgmon-metrics.yml
|
||||
Loading…
Reference in New Issue
Block a user