From c10079d2c5181316a4d3470f546a607726af2f31 Mon Sep 17 00:00:00 2001 From: James Campbell Date: Thu, 31 Oct 2024 02:18:45 -0400 Subject: [PATCH] Cache PostgreSQL version --- pgmon.py | 105 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 87 insertions(+), 18 deletions(-) diff --git a/pgmon.py b/pgmon.py index 74bc103..407c24a 100755 --- a/pgmon.py +++ b/pgmon.py @@ -33,6 +33,11 @@ connections = {} # basically it's the time when we should try to connect to the database again. unhappy = {} +# Version information +cluster_version = None +cluster_version_next_check = None +cluster_version_lock = Lock() + # Running state (used to gracefully shut down) running = True @@ -80,6 +85,9 @@ default_config = { # Time to wait before trying to reconnect again after a reconnect failure 'reconnect_cooldown': 30, + # How often to check the version of PostgreSQL + 'version_check_period': 300, + # Metrics 'metrics': {} } @@ -223,6 +231,9 @@ def get_query(metric, version): def run_query_no_retry(pool, return_type, query, args): + """ + Run the query with no explicit retry code + """ with pool.connection(timeout=float(config['connect_timeout'])) as conn: try: with conn.cursor(row_factory=psycopg.rows.dict_row) as curs: @@ -247,6 +258,18 @@ def run_query_no_retry(pool, return_type, query, args): raise def run_query(pool, return_type, query, args): + """ + Run the query, and if we find upon the first attempt that the connection + had been closed, wait a second and try again. This is because psycopg + doesn't know if a connection closed (ie: PostgreSQL was restarted or the + backend was terminated) until you try to execute a query. + + Note that the pool has its own retry mechanism as well, but it only applies + to new connections being made. + + Also, this will not retry a query if the query itself failed, or if the + database connection could not be established. + """ # If we get disconnected, I think the putconn command will close the dead # connection. 
So we can just give it another shot. try: @@ -259,12 +282,56 @@ def run_query(pool, return_type, query, args): time.sleep(1) return run_query_no_retry(pool, return_type, query, args) +def get_cluster_version(): + """ + Get the PostgreSQL version if we don't already know it, or if it's been + too long since the last time it was checked. + """ + global cluster_version + global cluster_version_next_check + + # If we don't know the version or it's past the recheck time, get the + # version from the database. Only one thread needs to do this, so they all + # try to grab the lock, and then make sure nobody else beat them to it. + if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now(): + with cluster_version_lock: + # Only check if nobody already got the version before us + if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now(): + log.info('Checking PostgreSQL cluster version') + pool = get_pool(config['dbname']) + cluster_version = int(run_query(pool, 'value', 'SHOW server_version_num', None)) + cluster_version_next_check = datetime.now() + timedelta(seconds=int(config['version_check_period'])) + log.info(f"Got PostgreSQL cluster version: {cluster_version}") + log.debug(f"Next PostgreSQL cluster version check will be after: {cluster_version_next_check}") + + return cluster_version + class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): + """ + This is our request handling server. It is responsible for listening for + requests, processing them, and responding. + """ + def log_request(self, code='-', size='-'): - # Override to suppress logging + """ + Override to suppress standard request logging + """ pass def do_GET(self): + """ + Handle a request. This is just a wrapper around the actual handler + code to keep things more readable. 
+ """ + try: + self._handle_request() + except BrokenPipeError: + log.error("Client disconnected, exiting handler") + + def _handle_request(self): + """ + Request handler + """ # Parse the URL parsed_path = urlparse(self.path) name = parsed_path.path.strip('/') @@ -274,7 +341,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): self._reply(200, f"{VERSION}") return - # Note: Parse_qs returns the values as a list. Since we always expect + # Note: parse_qs returns the values as a list. Since we always expect # single values, just grab the first from each. args = {key: values[0] for key, values in parsed_query.items()} @@ -286,10 +353,12 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): self._reply(404, 'Unknown metric') return - # Get the dbname + # Get the dbname. If none was provided, use the default from the + # config. dbname = args.get('dbname', config['dbname']) - # Get the connection pool for the database + # Get the connection pool for the database, or create one if it doesn't + # already exist. 
try: pool = get_pool(dbname) except UnhappyDBError: @@ -299,20 +368,17 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): # Identify the PostgreSQL version try: - version = int(args['vers']) - except KeyError: - try: - version = int(run_query(pool, 'value', 'SHOW server_version_num', None)) - except UnhappyDBError: - return - except Exception as e: - if dbname in unhappy: - log.info(f"Database {dbname} is unhappy, please be patient") - self._reply(503, 'Database unavailable') - else: - log.error(f"Failed to get PostgreSQL version: {e}") - self._reply(500, 'Error getting DB version') - return + version = get_cluster_version() + except UnhappyDBError: + return + except Exception as e: + if dbname in unhappy: + log.info(f"Database {dbname} is unhappy, please be patient") + self._reply(503, 'Database unavailable') + else: + log.error(f"Failed to get PostgreSQL version: {e}") + self._reply(500, 'Error getting DB version') + return # Get the query version try: @@ -336,6 +402,9 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): return def _reply(self, code, content): + """ + Send a reply to the client + """ self.send_response(code) self.send_header('Content-type', 'application/json') self.end_headers()