From 1e9f8e4b1d8bdf57556cd0a74914fc834aca2b36 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Thu, 31 Oct 2024 01:03:43 -0400 Subject: [PATCH] Leave failed dbs alone for a bit --- pgmon.py | 89 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 72 insertions(+), 17 deletions(-) diff --git a/pgmon.py b/pgmon.py index d703b52..74bc103 100755 --- a/pgmon.py +++ b/pgmon.py @@ -7,6 +7,8 @@ import time import argparse import logging +from datetime import datetime, timedelta + import psycopg from psycopg_pool import ConnectionPool @@ -26,6 +28,11 @@ config = {} connections_lock = Lock() connections = {} +# Dictionary of unhappy databases. Keys are database names, value is the time +# the database was determined to be unhappy plus the cooldown setting. So, +# basically it's the time when we should try to connect to the database again. +unhappy = {} + # Running state (used to gracefully shut down) running = True @@ -47,6 +54,8 @@ class ConfigError(Exception): pass class DisconnectedError(Exception): pass +class UnhappyDBError(Exception): + pass # Default config settings default_config = { @@ -65,6 +74,12 @@ default_config = { # Default database to connect to when none is specified for a metric 'dbname': 'postgres', + # PostgreSQL connection timeout (seconds) + 'connect_timeout': 5, + + # Time to wait before trying to reconnect again after a reconnect failure + 'reconnect_cooldown': 30, + # Metrics 'metrics': {} } @@ -76,7 +91,7 @@ def update_deep(d1, d2): Params: d1: the dictionary to update - d2: the dictionary to get new values from + d2: the dictionary to get new values from Returns: The new d1 @@ -102,7 +117,7 @@ def read_config(path, included = False): log.info(f"Reading log file: {path}") with open(path, 'r') as f: cfg = yaml.safe_load(f) - + # Read any included config files for inc in cfg.get('include', []): update_deep(cfg, read_config(inc, included=True)) @@ -162,6 +177,11 @@ def get_pool(dbname): """ Get a database connection pool. """ + # Check if the db is unhappy and wants to be left alone + if dbname in unhappy: + if unhappy[dbname] > datetime.now(): + raise UnhappyDBError() + if dbname not in connections: with connections_lock: # Make sure nobody created the pool while we were waiting on the @@ -169,10 +189,22 @@ def get_pool(dbname): if dbname not in connections: log.info(f"Creating connection pool for: {dbname}") connections[dbname] = ConnectionPool( - min_size=0, max_size=config['pool_size'], - conninfo=f"dbname={dbname} {config['connstr']}") + name=dbname, + min_size=0, max_size=int(config['pool_size']), + conninfo=f"dbname={dbname} application_name=pgmon {config['connstr']}", + reconnect_timeout=float(config['connect_timeout']), + reconnect_failed=handle_connect_failure) + # Clear the unhappy indicator if present + unhappy.pop(dbname, None) return connections[dbname] +def handle_connect_failure(pool): + """ + Mark the database as being unhappy so we can leave it alone for a while + """ + dbname = pool.name + unhappy[dbname] = datetime.now() + timedelta(seconds=int(config['reconnect_cooldown'])) + def get_query(metric, version): """ @@ -190,8 +222,8 @@ def get_query(metric, version): raise Exception('Missing metric query') -def run_query_no_reconnect(pool, return_type, query, args): - with pool.connection() as conn: +def run_query_no_retry(pool, return_type, query, args): + with pool.connection(timeout=float(config['connect_timeout'])) as conn: try: with conn.cursor(row_factory=psycopg.rows.dict_row) as curs: curs.execute(query, args) @@ -206,7 +238,10 @@ def run_query_no_reconnect(pool, return_type, query, args): elif return_type == 'set': return json.dumps(res) except: - if conn.broken: + dbname = pool.name + if dbname in unhappy: + raise UnhappyDBError() + elif conn.broken: raise DisconnectedError() else: raise @@ -215,11 +250,14 @@ def run_query(pool, return_type, query, args): # If we get disconnected, I think the putconn command will close the dead # connection. So we can just give it another shot. try: - return run_query_no_reconnect(pool, return_type, query, args) + return run_query_no_retry(pool, return_type, query, args) except DisconnectedError: log.warning("Stale PostgreSQL connection found ... trying again") + # This sleep is an annoyinh hack to give the pool workers time to + # actually mark the connection, otherwise it can be given back in the + # next connection() call time.sleep(1) - return run_query_no_reconnect(pool, return_type, query, args) + return run_query_no_retry(pool, return_type, query, args) class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): def log_request(self, code='-', size='-'): @@ -238,7 +276,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): # Note: Parse_qs returns the values as a list. Since we always expect # single values, just grab the first from each. - args = {key: values[0] for key, values in parsed_query.items()} + args = {key: values[0] for key, values in parsed_query.items()} # Get the metric definition try: @@ -252,7 +290,12 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): dbname = args.get('dbname', config['dbname']) # Get the connection pool for the database - pool = get_pool(dbname) + try: + pool = get_pool(dbname) + except UnhappyDBError: + log.info(f"Database {dbname} is unhappy, please be patient") + self._reply(503, 'Database unavailable') + return # Identify the PostgreSQL version try: @@ -260,15 +303,21 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): except KeyError: try: version = int(run_query(pool, 'value', 'SHOW server_version_num', None)) + except UnhappyDBError: + return except Exception as e: - log.error(f"Failed to get PostgreSQL version: {e}") - self._reply(500, 'Error getting DB version') + if dbname in unhappy: + log.info(f"Database {dbname} is unhappy, please be patient") + self._reply(503, 'Database unavailable') + else: + log.error(f"Failed to get PostgreSQL version: {e}") + self._reply(500, 'Error getting DB version') return # Get the query version try: query = get_query(metric, version) - except: + except KeyError: log.error(f"Failed to find a version of {name} for {version}") self._reply(404, 'Unsupported version') return @@ -276,9 +325,15 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): # Execute the quert try: self._reply(200, run_query(pool, metric['type'], query, args)) + return except Exception as e: - log.error(f"Error running query: {e}") - self._reply(500, "Error running query") + if dbname in unhappy: + log.info(f"Database {dbname} is unhappy, please be patient") + self._reply(503, 'Database unavailable') + else: + log.error(f"Error running query: {e}") + self._reply(500, "Error running query") + return def _reply(self, code, content): self.send_response(code) @@ -297,7 +352,7 @@ if __name__ == '__main__': help='The config file to read (default: %(default)s)') args = parser.parse_args() - + # Set the config file path config_file = args.config_file