#!/usr/bin/env python3 import yaml import json import time import argparse import logging from datetime import datetime, timedelta import psycopg from psycopg_pool import ConnectionPool import signal from threading import Thread, Lock, Semaphore from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import ThreadingHTTPServer from urllib.parse import urlparse, parse_qs VERSION = '0.1.0' # Configuration config = {} # Dictionary of current PostgreSQL connection pools connections_lock = Lock() connections = {} # Dictionary of unhappy databases. Keys are database names, value is the time # the database was determined to be unhappy plus the cooldown setting. So, # basically it's the time when we should try to connect to the database again. unhappy = {} # Version information cluster_version = None cluster_version_next_check = None cluster_version_lock = Lock() # Running state (used to gracefully shut down) running = True # The http server object httpd = None # Where the config file lives config_file = None # Configure logging log = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s') console_log_handler = logging.StreamHandler() console_log_handler.setFormatter(formatter) log.addHandler(console_log_handler) # Error types class ConfigError(Exception): pass class DisconnectedError(Exception): pass class UnhappyDBError(Exception): pass # Default config settings default_config = { # IPC port 'port': 5400, # Max PostgreSQL connection pool size 'pool_size': 4, # Log level for stderr logging (or 'off') 'log_level': 'debug', # Connection details 'connstr': '', # Default database to connect to when none is specified for a metric 'dbname': 'postgres', # PostgreSQL connection timeout (seconds) 'connect_timeout': 5, # Time to wait before trying to reconnect again after a reconnect failure 'reconnect_cooldown': 30, # How often to check the version of PostgreSQL 'version_check_period': 300, # Metrics 'metrics': {} } def update_deep(d1, d2): """ Recursively update a dict, adding keys to dictionaries and appending to lists. Note that this both modifies and returns the first dict. Params: d1: the dictionary to update d2: the dictionary to get new values from Returns: The new d1 """ if not isinstance(d1, dict) or not isinstance(d2, dict): raise TypeError('Both arguments to update_deep need to be dictionaries') for k, v2 in d2.items(): if isinstance(v2, dict): v1 = d1.get(k, {}) if not isinstance(v1, dict): raise TypeError('Type mismatch between dictionaries: {} is not a dict'.format(type(v1).__name__)) d1[k] = update_deep(v1, v2) elif isinstance(v2, list): v1 = d1.get(k, []) if not isinstance(v1, list): raise TypeError('Type mismatch between dictionaries: {} is not a list'.format(type(v1).__name__)) d1[k] = v1 + v2 else: d1[k] = v2 return d1 def read_config(path, included = False): """ Read a config file. params: path: path to the file to read included: is this file included by another file? """ # Read config file log.info(f"Reading log file: {path}") with open(path, 'r') as f: cfg = yaml.safe_load(f) # Read any included config files for inc in cfg.get('include', []): update_deep(cfg, read_config(inc, included=True)) # Return the config we read if this is an include, otherwise set the final # config if included: return cfg else: new_config = {} new_config.update(default_config) update_deep(new_config, cfg) # Read any external queries for metric in new_config.get('metrics', {}).values(): for vers, query in metric['query'].items(): if query.startswith('file:'): path = query[5:] with open(path, 'r') as f: metric['query'][vers] = f.read() # Minor sanity checks if len(new_config['metrics']) == 0: log.error("No metrics are defined") raise ConfigError() global config config = new_config # Apply changes to log level log.setLevel(logging.getLevelName(config['log_level'].upper())) def signal_handler(sig, frame): """ Function for handling signals HUP => Reload """ # Restore the original handler signal.signal(signal.SIGINT, signal.default_int_handler) # Signal everything to shut down if sig in [ signal.SIGINT, signal.SIGTERM, signal.SIGQUIT ]: log.info("Shutting down ...") global running running = False if httpd is not None: httpd.socket.close() # Signal a reload if sig == signal.SIGHUP: log.warning("Received config reload signal") read_config(config_file) def get_pool(dbname): """ Get a database connection pool. """ # Check if the db is unhappy and wants to be left alone if dbname in unhappy: if unhappy[dbname] > datetime.now(): raise UnhappyDBError() if dbname not in connections: with connections_lock: # Make sure nobody created the pool while we were waiting on the # lock if dbname not in connections: log.info(f"Creating connection pool for: {dbname}") connections[dbname] = ConnectionPool( name=dbname, min_size=0, max_size=int(config['pool_size']), conninfo=f"dbname={dbname} application_name=pgmon {config['connstr']}", reconnect_timeout=float(config['connect_timeout']), reconnect_failed=handle_connect_failure) # Clear the unhappy indicator if present unhappy.pop(dbname, None) return connections[dbname] def handle_connect_failure(pool): """ Mark the database as being unhappy so we can leave it alone for a while """ dbname = pool.name unhappy[dbname] = datetime.now() + timedelta(seconds=int(config['reconnect_cooldown'])) def get_query(metric, version): """ Get the correct metric query for a given version of PostgreSQL. params: metric: The metric definition version: The PostgreSQL version number, as given by server_version_num """ # Select the correct query for v in reversed(sorted(metric['query'].keys())): if version >= v: return metric['query'][v] raise Exception('Missing metric query') def run_query_no_retry(pool, return_type, query, args): """ Run the query with no explicit retry code """ with pool.connection(timeout=float(config['connect_timeout'])) as conn: try: with conn.cursor(row_factory=psycopg.rows.dict_row) as curs: curs.execute(query, args) res = curs.fetchall() if return_type == 'value': return str(list(res[0].values())[0]) elif return_type == 'row': return json.dumps(res[0]) elif return_type == 'column': return json.dumps([list(r.values())[0] for r in res]) elif return_type == 'set': return json.dumps(res) except: dbname = pool.name if dbname in unhappy: raise UnhappyDBError() elif conn.broken: raise DisconnectedError() else: raise def run_query(pool, return_type, query, args): """ Run the query, and if we find upon the first attempt that the connection had been closed, wait a second and try again. This is because psycopg doesn't know if a connection closed (ie: PostgreSQL was restarted or the backend was terminated) until you try to execute a query. Note that the pool has its own retry mechanism as well, but it only applies to new connections being made. Also, this will not retry a query if the query itself failed, or if the database connection could not be established. """ # If we get disconnected, I think the putconn command will close the dead # connection. So we can just give it another shot. try: return run_query_no_retry(pool, return_type, query, args) except DisconnectedError: log.warning("Stale PostgreSQL connection found ... trying again") # This sleep is an annoyinh hack to give the pool workers time to # actually mark the connection, otherwise it can be given back in the # next connection() call time.sleep(1) return run_query_no_retry(pool, return_type, query, args) def get_cluster_version(): """ Get the PostgreSQL version if we don't already know it, or if it's been too long sice the last time it was checked. """ global cluster_version global cluster_version_next_check # If we don't know the version or it's past the recheck time, get the # version from the database. Only one thread needs to do this, so they all # try to grab the lock, and then make sure nobody else beat them to it. if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now(): with cluster_version_lock: # Only check if nobody already got the version before us if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now(): log.info('Checking PostgreSQL cluster version') pool = get_pool(config['dbname']) cluster_version = int(run_query(pool, 'value', 'SHOW server_version_num', None)) cluster_version_next_check = datetime.now() + timedelta(seconds=int(config['version_check_period'])) log.info(f"Got PostgreSQL cluster version: {cluster_version}") log.debug(f"Next PostgreSQL cluster version check will be after: {cluster_version_next_check}") return cluster_version class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): """ This is our request handling server. It is responsible for listening for requests, processing them, and responding. """ def log_request(self, code='-', size='-'): """ Override to suppress standard request logging """ pass def do_GET(self): """ Handle a request. This is just a wrapper around the actual handler code to keep things more readable. """ try: self._handle_request() except BrokenPipeError: log.error("Client disconnected, exiting handler") def _handle_request(self): """ Request handler """ # Parse the URL parsed_path = urlparse(self.path) name = parsed_path.path.strip('/') parsed_query = parse_qs(parsed_path.query) if name == 'agent_version': self._reply(200, f"{VERSION}") return # Note: parse_qs returns the values as a list. Since we always expect # single values, just grab the first from each. args = {key: values[0] for key, values in parsed_query.items()} # Get the metric definition try: metric = config['metrics'][name] except KeyError: log.error(f"Unknown metric: {name}") self._reply(404, 'Unknown metric') return # Get the dbname. If none was provided, use the default from the # config. dbname = args.get('dbname', config['dbname']) # Get the connection pool for the database, or create one if it doesn't # already exist. try: pool = get_pool(dbname) except UnhappyDBError: log.info(f"Database {dbname} is unhappy, please be patient") self._reply(503, 'Database unavailable') return # Identify the PostgreSQL version try: version = get_cluster_version() except UnhappyDBError: return except Exception as e: if dbname in unhappy: log.info(f"Database {dbname} is unhappy, please be patient") self._reply(503, 'Database unavailable') else: log.error(f"Failed to get PostgreSQL version: {e}") self._reply(500, 'Error getting DB version') return # Get the query version try: query = get_query(metric, version) except KeyError: log.error(f"Failed to find a version of {name} for {version}") self._reply(404, 'Unsupported version') return # Execute the quert try: self._reply(200, run_query(pool, metric['type'], query, args)) return except Exception as e: if dbname in unhappy: log.info(f"Database {dbname} is unhappy, please be patient") self._reply(503, 'Database unavailable') else: log.error(f"Error running query: {e}") self._reply(500, "Error running query") return def _reply(self, code, content): """ Send a reply to the client """ self.send_response(code) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(bytes(content, 'utf-8')) if __name__ == '__main__': # Handle cli args parser = argparse.ArgumentParser( prog = 'pgmon', description='A PostgreSQL monitoring agent') parser.add_argument('config_file', default='pgmon.yml', nargs='?', help='The config file to read (default: %(default)s)') args = parser.parse_args() # Set the config file path config_file = args.config_file # Read the config file read_config(config_file) # Set up the http server to receive requests server_address = ('127.0.0.1', config['port']) httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) # Set up the signal handler signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGHUP, signal_handler) # Handle requests. log.info(f"Listening on port {config['port']}...") while running: httpd.handle_request() # Clean up PostgreSQL connections # TODO: Improve this ... not sure it actually closes all the connections cleanly for pool in connections.values(): pool.close()