pgmon/pgmon.py

#!/usr/bin/env python3
import yaml
import json
import time
import argparse
import logging
from datetime import datetime, timedelta
import psycopg
from psycopg_pool import ConnectionPool
import signal
from threading import Thread, Lock, Semaphore
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
VERSION = '0.1.0'
# Configuration
config = {}
# Dictionary of current PostgreSQL connection pools
connections_lock = Lock()
connections = {}
# Dictionary of unhappy databases. Keys are database names, value is the time
# the database was determined to be unhappy plus the cooldown setting. So,
# basically it's the time when we should try to connect to the database again.
unhappy = {}
# Running state (used to gracefully shut down)
running = True
# The http server object
httpd = None
# Where the config file lives
config_file = None
# Configure logging
log = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s')
console_log_handler = logging.StreamHandler()
console_log_handler.setFormatter(formatter)
log.addHandler(console_log_handler)
# Error types
class ConfigError(Exception):
    pass

class DisconnectedError(Exception):
    pass

class UnhappyDBError(Exception):
    pass
# Default config settings
default_config = {
    # IPC port
    'port': 5400,

    # Max PostgreSQL connection pool size
    'pool_size': 4,

    # Log level for stderr logging (or 'off')
    'log_level': 'debug',

    # Connection details
    'connstr': '',

    # Default database to connect to when none is specified for a metric
    'dbname': 'postgres',

    # PostgreSQL connection timeout (seconds)
    'connect_timeout': 5,

    # Time to wait before trying to reconnect again after a reconnect failure
    'reconnect_cooldown': 30,

    # Metrics
    'metrics': {}
}
def update_deep(d1, d2):
    """
    Recursively update a dict, adding keys to dictionaries and appending to
    lists. Note that this both modifies and returns the first dict.

    Params:
        d1: the dictionary to update
        d2: the dictionary to get new values from

    Returns:
        The new d1
    """
    for k, v in d2.items():
        if isinstance(v, dict):
            d1[k] = update_deep(d1.get(k, {}), v)
        elif isinstance(v, list):
            d1[k] = d1.get(k, []) + v
        else:
            d1[k] = v
    return d1
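# A quick illustration of update_deep's merge semantics (hypothetical values):
#   update_deep({'a': {'x': 1}, 'l': [1]}, {'a': {'y': 2}, 'l': [2]})
#   => {'a': {'x': 1, 'y': 2}, 'l': [1, 2]}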
def read_config(path, included=False):
    """
    Read a config file.

    Params:
        path: path to the file to read
        included: is this file included by another file?
    """
    # Read the config file
    log.info(f"Reading config file: {path}")
    with open(path, 'r') as f:
        cfg = yaml.safe_load(f)

    # Read any included config files
    for inc in cfg.get('include', []):
        update_deep(cfg, read_config(inc, included=True))

    # Return the config we read if this is an include, otherwise set the final
    # config
    if included:
        return cfg
    else:
        new_config = {}
        new_config.update(default_config)
        update_deep(new_config, cfg)

        # Read any external queries
        for metric in new_config.get('metrics', {}).values():
            for vers, query in metric['query'].items():
                if query.startswith('file:'):
                    path = query[5:]
                    with open(path, 'r') as f:
                        metric['query'][vers] = f.read()

        # Minor sanity checks
        if len(new_config['metrics']) == 0:
            log.error("No metrics are defined")
            raise ConfigError()

        global config
        config = new_config

        # Apply changes to log level
        log.setLevel(logging.getLevelName(config['log_level'].upper()))
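# A minimal config sketch matching what read_config expects (the metric name
# and query below are illustrative, not shipped defaults):
#
#   port: 5400
#   dbname: postgres
#   metrics:
#     active_backends:
#       type: value
#       query:
#         0: SELECT count(*) FROM pg_stat_activity
#
# A query value of the form 'file:path/to/query.sql' is read from disk, and an
# 'include' list names additional config files to merge in.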
def signal_handler(sig, frame):
    """
    Function for handling signals

    INT, TERM, QUIT => shut down
    HUP => reload the config
    """
    # Restore the original handler
    signal.signal(signal.SIGINT, signal.default_int_handler)

    # Signal everything to shut down
    if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
        log.info("Shutting down ...")
        global running
        running = False
        if httpd is not None:
            httpd.socket.close()

    # Signal a reload
    if sig == signal.SIGHUP:
        log.warning("Received config reload signal")
        read_config(config_file)
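# Example: trigger a live config reload from a shell (illustrative; the PID
# lookup depends on how the agent was started):
#   kill -HUP "$(pgrep -f pgmon.py)"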
def get_pool(dbname):
    """
    Get a database connection pool.
    """
    # Check if the db is unhappy and wants to be left alone
    if dbname in unhappy:
        if unhappy[dbname] > datetime.now():
            raise UnhappyDBError()

    if dbname not in connections:
        with connections_lock:
            # Make sure nobody created the pool while we were waiting on the
            # lock
            if dbname not in connections:
                log.info(f"Creating connection pool for: {dbname}")
                connections[dbname] = ConnectionPool(
                    name=dbname,
                    min_size=0, max_size=int(config['pool_size']),
                    conninfo=f"dbname={dbname} application_name=pgmon {config['connstr']}",
                    reconnect_timeout=float(config['connect_timeout']),
                    reconnect_failed=handle_connect_failure)

    # Clear the unhappy indicator if present
    unhappy.pop(dbname, None)

    return connections[dbname]
def handle_connect_failure(pool):
    """
    Mark the database as being unhappy so we can leave it alone for a while
    """
    dbname = pool.name
    unhappy[dbname] = datetime.now() + timedelta(seconds=int(config['reconnect_cooldown']))
def get_query(metric, version):
    """
    Get the correct metric query for a given version of PostgreSQL.

    Params:
        metric: the metric definition
        version: the PostgreSQL version number, as given by server_version_num
    """
    # Select the correct query
    for v in reversed(sorted(metric['query'].keys())):
        if version >= v:
            return metric['query'][v]
    raise Exception('Missing metric query')
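# Example of the version selection (hypothetical query keys): with
# metric['query'] = {0: '...', 90600: '...'}, a server reporting
# server_version_num 150002 gets the 90600 variant, while 90500 falls back to
# the 0 variant.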
def run_query_no_retry(pool, return_type, query, args):
    """
    Run a query against a connection from the given pool and format the result
    according to return_type. Raises DisconnectedError if the connection died,
    and UnhappyDBError if the database has been marked unhappy.
    """
    with pool.connection(timeout=float(config['connect_timeout'])) as conn:
        try:
            with conn.cursor(row_factory=psycopg.rows.dict_row) as curs:
                curs.execute(query, args)
                res = curs.fetchall()

                if return_type == 'value':
                    return str(list(res[0].values())[0])
                elif return_type == 'row':
                    return json.dumps(res[0])
                elif return_type == 'column':
                    return json.dumps([list(r.values())[0] for r in res])
                elif return_type == 'set':
                    return json.dumps(res)
        except:
            dbname = pool.name
            if dbname in unhappy:
                raise UnhappyDBError()
            elif conn.broken:
                raise DisconnectedError()
            else:
                raise
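# How each return_type is serialized, for hypothetical result rows
# [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]:
#   value  -> '1'                  (first column of the first row, as a string)
#   row    -> '{"a": 1, "b": 2}'   (first row as a JSON object)
#   column -> '[1, 3]'             (first column of every row, as a JSON array)
#   set    -> '[{"a": 1, "b": 2}, {"a": 3, "b": 4}]'  (all rows)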
def run_query(pool, return_type, query, args):
    """
    Run a query, retrying once if the pooled connection turned out to be stale.
    """
    # If we get disconnected, I think the putconn command will close the dead
    # connection. So we can just give it another shot.
    try:
        return run_query_no_retry(pool, return_type, query, args)
    except DisconnectedError:
        log.warning("Stale PostgreSQL connection found ... trying again")
        # This sleep is an annoying hack to give the pool workers time to
        # actually mark the connection, otherwise it can be given back in the
        # next connection() call
        time.sleep(1)
        return run_query_no_retry(pool, return_type, query, args)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    def log_request(self, code='-', size='-'):
        # Override to suppress request logging
        pass

    def do_GET(self):
        # Parse the URL
        parsed_path = urlparse(self.path)
        name = parsed_path.path.strip('/')
        parsed_query = parse_qs(parsed_path.query)

        if name == 'agent_version':
            self._reply(200, f"{VERSION}")
            return

        # Note: parse_qs returns the values as a list. Since we always expect
        # single values, just grab the first from each.
        args = {key: values[0] for key, values in parsed_query.items()}

        # Get the metric definition
        try:
            metric = config['metrics'][name]
        except KeyError:
            log.error(f"Unknown metric: {name}")
            self._reply(404, 'Unknown metric')
            return

        # Get the dbname
        dbname = args.get('dbname', config['dbname'])

        # Get the connection pool for the database
        try:
            pool = get_pool(dbname)
        except UnhappyDBError:
            log.info(f"Database {dbname} is unhappy, please be patient")
            self._reply(503, 'Database unavailable')
            return

        # Identify the PostgreSQL version
        try:
            version = int(args['vers'])
        except KeyError:
            try:
                version = int(run_query(pool, 'value', 'SHOW server_version_num', None))
            except UnhappyDBError:
                # Send a response rather than leaving the request hanging
                self._reply(503, 'Database unavailable')
                return
            except Exception as e:
                if dbname in unhappy:
                    log.info(f"Database {dbname} is unhappy, please be patient")
                    self._reply(503, 'Database unavailable')
                else:
                    log.error(f"Failed to get PostgreSQL version: {e}")
                    self._reply(500, 'Error getting DB version')
                return

        # Get the query for this version
        try:
            query = get_query(metric, version)
        except KeyError:
            log.error(f"Failed to find a version of {name} for {version}")
            self._reply(404, 'Unsupported version')
            return

        # Execute the query
        try:
            self._reply(200, run_query(pool, metric['type'], query, args))
            return
        except Exception as e:
            if dbname in unhappy:
                log.info(f"Database {dbname} is unhappy, please be patient")
                self._reply(503, 'Database unavailable')
            else:
                log.error(f"Error running query: {e}")
                self._reply(500, "Error running query")
            return

    def _reply(self, code, content):
        self.send_response(code)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(bytes(content, 'utf-8'))
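# Example requests (the metric name is illustrative; it must match an entry in
# the config's 'metrics' section):
#   GET http://127.0.0.1:5400/agent_version
#   GET http://127.0.0.1:5400/active_backends?dbname=postgres
# 'dbname' selects the target database and 'vers' skips version detection; the
# full set of query parameters is also passed to the metric query as bind
# arguments.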
if __name__ == '__main__':
    # Handle cli args
    parser = argparse.ArgumentParser(
        prog='pgmon',
        description='A PostgreSQL monitoring agent')

    parser.add_argument('config_file', default='pgmon.yml', nargs='?',
                        help='The config file to read (default: %(default)s)')

    args = parser.parse_args()

    # Set the config file path
    config_file = args.config_file

    # Read the config file
    read_config(config_file)

    # Set up the http server to receive requests
    server_address = ('127.0.0.1', config['port'])
    httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)

    # Set up the signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    # Handle requests
    log.info(f"Listening on port {config['port']}...")
    while running:
        httpd.handle_request()

    # Clean up PostgreSQL connections
    # TODO: Improve this ... not sure it actually closes all the connections cleanly
    for pool in connections.values():
        pool.close()
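# Example invocation (the config path is optional and defaults to pgmon.yml;
# the path shown here is illustrative):
#   ./pgmon.py /etc/pgmon/pgmon.yml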