Cache PostgreSQL version
This commit is contained in:
parent
1e9f8e4b1d
commit
c10079d2c5
105
pgmon.py
105
pgmon.py
@ -33,6 +33,11 @@ connections = {}
|
|||||||
# basically it's the time when we should try to connect to the database again.
|
# basically it's the time when we should try to connect to the database again.
|
||||||
unhappy = {}
|
unhappy = {}
|
||||||
|
|
||||||
|
# Version information
|
||||||
|
cluster_version = None
|
||||||
|
cluster_version_next_check = None
|
||||||
|
cluster_version_lock = Lock()
|
||||||
|
|
||||||
# Running state (used to gracefully shut down)
|
# Running state (used to gracefully shut down)
|
||||||
running = True
|
running = True
|
||||||
|
|
||||||
@ -80,6 +85,9 @@ default_config = {
|
|||||||
# Time to wait before trying to reconnect again after a reconnect failure
|
# Time to wait before trying to reconnect again after a reconnect failure
|
||||||
'reconnect_cooldown': 30,
|
'reconnect_cooldown': 30,
|
||||||
|
|
||||||
|
# How often to check the version of PostgreSQL
|
||||||
|
'version_check_period': 300,
|
||||||
|
|
||||||
# Metrics
|
# Metrics
|
||||||
'metrics': {}
|
'metrics': {}
|
||||||
}
|
}
|
||||||
@ -223,6 +231,9 @@ def get_query(metric, version):
|
|||||||
|
|
||||||
|
|
||||||
def run_query_no_retry(pool, return_type, query, args):
|
def run_query_no_retry(pool, return_type, query, args):
|
||||||
|
"""
|
||||||
|
Run the query with no explicit retry code
|
||||||
|
"""
|
||||||
with pool.connection(timeout=float(config['connect_timeout'])) as conn:
|
with pool.connection(timeout=float(config['connect_timeout'])) as conn:
|
||||||
try:
|
try:
|
||||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as curs:
|
with conn.cursor(row_factory=psycopg.rows.dict_row) as curs:
|
||||||
@ -247,6 +258,18 @@ def run_query_no_retry(pool, return_type, query, args):
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
def run_query(pool, return_type, query, args):
|
def run_query(pool, return_type, query, args):
|
||||||
|
"""
|
||||||
|
Run the query, and if we find upon the first attempt that the connection
|
||||||
|
had been closed, wait a second and try again. This is because psycopg
|
||||||
|
doesn't know if a connection closed (i.e., PostgreSQL was restarted or the
|
||||||
|
backend was terminated) until you try to execute a query.
|
||||||
|
|
||||||
|
Note that the pool has its own retry mechanism as well, but it only applies
|
||||||
|
to new connections being made.
|
||||||
|
|
||||||
|
Also, this will not retry a query if the query itself failed, or if the
|
||||||
|
database connection could not be established.
|
||||||
|
"""
|
||||||
# If we get disconnected, I think the putconn command will close the dead
|
# If we get disconnected, I think the putconn command will close the dead
|
||||||
# connection. So we can just give it another shot.
|
# connection. So we can just give it another shot.
|
||||||
try:
|
try:
|
||||||
@ -259,12 +282,56 @@ def run_query(pool, return_type, query, args):
|
|||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
return run_query_no_retry(pool, return_type, query, args)
|
return run_query_no_retry(pool, return_type, query, args)
|
||||||
|
|
||||||
|
def get_cluster_version():
|
||||||
|
"""
|
||||||
|
Get the PostgreSQL version if we don't already know it, or if it's been
|
||||||
|
too long since the last time it was checked.
|
||||||
|
"""
|
||||||
|
global cluster_version
|
||||||
|
global cluster_version_next_check
|
||||||
|
|
||||||
|
# If we don't know the version or it's past the recheck time, get the
|
||||||
|
# version from the database. Only one thread needs to do this, so they all
|
||||||
|
# try to grab the lock, and then make sure nobody else beat them to it.
|
||||||
|
if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now():
|
||||||
|
with cluster_version_lock:
|
||||||
|
# Only check if nobody already got the version before us
|
||||||
|
if cluster_version is None or cluster_version_next_check is None or cluster_version_next_check < datetime.now():
|
||||||
|
log.info('Checking PostgreSQL cluster version')
|
||||||
|
pool = get_pool(config['dbname'])
|
||||||
|
cluster_version = int(run_query(pool, 'value', 'SHOW server_version_num', None))
|
||||||
|
cluster_version_next_check = datetime.now() + timedelta(seconds=int(config['version_check_period']))
|
||||||
|
log.info(f"Got PostgreSQL cluster version: {cluster_version}")
|
||||||
|
log.debug(f"Next PostgreSQL cluster version check will be after: {cluster_version_next_check}")
|
||||||
|
|
||||||
|
return cluster_version
|
||||||
|
|
||||||
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||||
|
"""
|
||||||
|
This is our request handling server. It is responsible for listening for
|
||||||
|
requests, processing them, and responding.
|
||||||
|
"""
|
||||||
|
|
||||||
def log_request(self, code='-', size='-'):
|
def log_request(self, code='-', size='-'):
|
||||||
# Override to suppress logging
|
"""
|
||||||
|
Override to suppress standard request logging
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def do_GET(self):
|
def do_GET(self):
|
||||||
|
"""
|
||||||
|
Handle a request. This is just a wrapper around the actual handler
|
||||||
|
code to keep things more readable.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self._handle_request()
|
||||||
|
except BrokenPipeError:
|
||||||
|
log.error("Client disconnected, exiting handler")
|
||||||
|
|
||||||
|
def _handle_request(self):
|
||||||
|
"""
|
||||||
|
Request handler
|
||||||
|
"""
|
||||||
# Parse the URL
|
# Parse the URL
|
||||||
parsed_path = urlparse(self.path)
|
parsed_path = urlparse(self.path)
|
||||||
name = parsed_path.path.strip('/')
|
name = parsed_path.path.strip('/')
|
||||||
@ -274,7 +341,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
|||||||
self._reply(200, f"{VERSION}")
|
self._reply(200, f"{VERSION}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Note: Parse_qs returns the values as a list. Since we always expect
|
# Note: parse_qs returns the values as a list. Since we always expect
|
||||||
# single values, just grab the first from each.
|
# single values, just grab the first from each.
|
||||||
args = {key: values[0] for key, values in parsed_query.items()}
|
args = {key: values[0] for key, values in parsed_query.items()}
|
||||||
|
|
||||||
@ -286,10 +353,12 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
|||||||
self._reply(404, 'Unknown metric')
|
self._reply(404, 'Unknown metric')
|
||||||
return
|
return
|
||||||
|
|
||||||
# Get the dbname
|
# Get the dbname. If none was provided, use the default from the
|
||||||
|
# config.
|
||||||
dbname = args.get('dbname', config['dbname'])
|
dbname = args.get('dbname', config['dbname'])
|
||||||
|
|
||||||
# Get the connection pool for the database
|
# Get the connection pool for the database, or create one if it doesn't
|
||||||
|
# already exist.
|
||||||
try:
|
try:
|
||||||
pool = get_pool(dbname)
|
pool = get_pool(dbname)
|
||||||
except UnhappyDBError:
|
except UnhappyDBError:
|
||||||
@ -299,20 +368,17 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
# Identify the PostgreSQL version
|
# Identify the PostgreSQL version
|
||||||
try:
|
try:
|
||||||
version = int(args['vers'])
|
version = get_cluster_version()
|
||||||
except KeyError:
|
except UnhappyDBError:
|
||||||
try:
|
return
|
||||||
version = int(run_query(pool, 'value', 'SHOW server_version_num', None))
|
except Exception as e:
|
||||||
except UnhappyDBError:
|
if dbname in unhappy:
|
||||||
return
|
log.info(f"Database {dbname} is unhappy, please be patient")
|
||||||
except Exception as e:
|
self._reply(503, 'Database unavailable')
|
||||||
if dbname in unhappy:
|
else:
|
||||||
log.info(f"Database {dbname} is unhappy, please be patient")
|
log.error(f"Failed to get PostgreSQL version: {e}")
|
||||||
self._reply(503, 'Database unavailable')
|
self._reply(500, 'Error getting DB version')
|
||||||
else:
|
return
|
||||||
log.error(f"Failed to get PostgreSQL version: {e}")
|
|
||||||
self._reply(500, 'Error getting DB version')
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get the query version
|
# Get the query version
|
||||||
try:
|
try:
|
||||||
@ -336,6 +402,9 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def _reply(self, code, content):
|
def _reply(self, code, content):
|
||||||
|
"""
|
||||||
|
Send a reply to the client
|
||||||
|
"""
|
||||||
self.send_response(code)
|
self.send_response(code)
|
||||||
self.send_header('Content-type', 'application/json')
|
self.send_header('Content-type', 'application/json')
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user