#!/usr/bin/env python3 import yaml import json import time import argparse import logging import psycopg from psycopg_pool import ConnectionPool import signal from threading import Thread, Lock, Semaphore from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import ThreadingHTTPServer from urllib.parse import urlparse, parse_qs VERSION = '0.1.0' # Configuration config = {} # Dictionary of current PostgreSQL connection pools connections_lock = Lock() connections = {} # Running state (used to gracefully shut down) running = True # The http server object httpd = None # Where the config file lives config_file = None # Configure logging log = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s: %(funcName)s() line %(lineno)d: %(message)s') console_log_handler = logging.StreamHandler() console_log_handler.setFormatter(formatter) log.addHandler(console_log_handler) # Error types class ConfigError(Exception): pass class DisconnectedError(Exception): pass # Default config settings default_config = { # IPC port 'port': 5400, # Max PostgreSQL connection pool size 'pool_size': 4, # Log level for stderr logging (or 'off') 'log_level': 'debug', # Connection details 'connstr': '', # Default database to connect to when none is specified for a metric 'dbname': 'postgres', # Metrics 'metrics': {} } def read_config(path, included = False): """ Read a config file. params: path: path to the file to read included: is this file included by another file? """ # Read config file log.info(f"Reading log file: {path}") with open(path, 'r') as f: cfg = yaml.safe_load(f) # Read any included config files for inc in cfg.get('include', []): cfg.update(read_config(inc, included=True)) # Return the config we read if this is an include, otherwise set the final # config if included: return cfg else: new_config = {} new_config.update(default_config) new_config.update(cfg) # Minor sanity checks if len(new_config['metrics']) == 0: log.error("No metrics are defined") raise ConfigError() global config config = new_config # Apply changes to log level log.setLevel(logging.getLevelName(config['log_level'].upper())) def signal_handler(sig, frame): """ Function for handling signals HUP => Reload """ # Restore the original handler signal.signal(signal.SIGINT, signal.default_int_handler) # Signal everything to shut down if sig in [ signal.SIGINT, signal.SIGTERM, signal.SIGQUIT ]: log.info("Shutting down ...") global running running = False if httpd is not None: httpd.socket.close() # Signal a reload if sig == signal.SIGHUP: log.warning("Received config reload signal") read_config(config_file) def get_pool(dbname): """ Get a database connection pool. """ if dbname not in connections: with connections_lock: # Make sure nobody created the pool while we were waiting on the # lock if dbname not in connections: log.info(f"Creating connection pool for: {dbname}") connections[dbname] = ConnectionPool( min_size=0, max_size=config['pool_size'], conninfo=f"dbname={dbname} {config['connstr']}") return connections[dbname] def get_query(metric, version): """ Get the correct metric query for a given version of PostgreSQL. params: metric: The metric definition version: The PostgreSQL version number, as given by server_version_num """ # Select the correct query for v in reversed(sorted(metric['query'].keys())): if version >= v: return metric['query'][v] raise Exception('Missing metric query') def run_query_no_reconnect(pool, return_type, query, args): with pool.connection() as conn: try: with conn.cursor(row_factory=psycopg.rows.dict_row) as curs: curs.execute(query, args) res = curs.fetchall() if return_type == 'value': return str(list(res[0].values())[0]) elif return_type == 'row': return json.dumps(res[0]) elif return_type == 'column': return json.dumps([list(r.values())[0] for r in res]) elif return_type == 'set': return json.dumps(res) except: if conn.broken: raise DisconnectedError() else: raise def run_query(pool, return_type, query, args): # If we get disconnected, I think the putconn command will close the dead # connection. So we can just give it another shot. try: return run_query_no_reconnect(pool, return_type, query, args) except DisconnectedError: log.warning("Stale PostgreSQL connection found ... trying again") time.sleep(1) return run_query_no_reconnect(pool, return_type, query, args) class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): def log_request(self, code='-', size='-'): # Override to suppress logging pass def do_GET(self): # Parse the URL parsed_path = urlparse(self.path) name = parsed_path.path.strip('/') parsed_query = parse_qs(parsed_path.query) if name == 'agent_version': self._reply(200, f"{VERSION}") return # Note: Parse_qs returns the values as a list. Since we always expect # single values, just grab the first from each. args = {key: values[0] for key, values in parsed_query.items()} # Get the metric definition try: metric = config['metrics'][name] except KeyError: log.error(f"Unknown metric: {name}") self._reply(404, 'Unknown metric') return # Get the dbname dbname = args.get('dbname', config['dbname']) # Get the connection pool for the database pool = get_pool(dbname) # Identify the PostgreSQL version try: version = int(args['vers']) except KeyError: try: version = int(run_query(pool, 'value', 'SHOW server_version_num', None)) except Exception as e: log.error(f"Failed to get PostgreSQL version: {e}") self._reply(500, 'Error getting DB version') return # Get the query version try: query = get_query(metric, version) except: log.error(f"Failed to find a version of {name} for {version}") self._reply(404, 'Unsupported version') return # Execute the quert try: self._reply(200, run_query(pool, metric['type'], query, args)) except Exception as e: log.error(f"Error running query: {e}") self._reply(500, "Error running query") def _reply(self, code, content): self.send_response(code) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(bytes(content, 'utf-8')) if __name__ == '__main__': # Handle cli args parser = argparse.ArgumentParser( prog = 'pgmon', description='A PostgreSQL monitoring agent') parser.add_argument('config_file', default='pgmon.yml', nargs='?', help='The config file to read (default: %(default)s)') args = parser.parse_args() # Set the config file path config_file = args.config_file # Read the config file read_config(config_file) # Set up the http server to receive requests server_address = ('127.0.0.1', config['port']) httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) # Set up the signal handler signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGHUP, signal_handler) # Handle requests. log.info(f"Listening on port {config['port']}...") while running: httpd.handle_request() # Clean up PostgreSQL connections # TODO: Improve this ... not sure it actually closes all the connections cleanly for pool in connections.values(): pool.close()