Add reload capability

This commit is contained in:
James Campbell 2024-05-18 14:09:43 -04:00
parent f4033a9639
commit 1e72d34f5f
Signed by: james
GPG Key ID: 2287C33A40DC906A
2 changed files with 278 additions and 34 deletions

View File

@ -1,6 +1,72 @@
##
# Misc agent settings
##
# Where to write/find the agent PID
#pid_file=/tmp/pgmon.pid
##
# Agent communication settings
##
# IPC socket
#ipc_socket=/tmp/pgmon.sock
# IPC communication timeout (s)
#ipc_timeout=10
# Request processing timeout (s)
#request_timeout=10
# Max size of the request queue before it blocks
#request_queue_size=100
# Max time to wait when queueing a request (s)
#request_queue_timeout=2
##
# Agent resource settings
##
# Number of worker threads
#worker_count=4
##
# Logging settings
##
# Log level for stderr logging (or 'off')
#stderr_log_level=info
# Log level for file logging (od 'off')
#file_log_level=info
# Log file
#log_file=pgmon.log
##
# DB connection settings
##
#host=localhost
host=/var/run/postgresql host=/var/run/postgresql
port=5432 #port=5432
user=postgres #user=postgres
#password=None
# Default database to connect to when none is specified for a metric
#dbname=postgres
##
# Monitoring configuration
##
# Metrics
#metrics={}
metric=max_frozen_age:value::SELECT max(age(datfrozenxid)) FROM pg_database metric=max_frozen_age:value::SELECT max(age(datfrozenxid)) FROM pg_database
metric=db_stats:row::SELECT * FROM pg_stat_database WHERE datname = '{datname}' metric=db_stats:row::SELECT * FROM pg_stat_database WHERE datname='{datname}'
metric=discover_dbs:set::SELECT datname FROM pg_database metric=discover_dbs:set::SELECT datname FROM pg_database

238
pgmon.py
View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import logging
import socket import socket
import sys import sys
import threading import threading
@ -11,6 +10,7 @@ import queue
import os import os
import signal import signal
import json import json
import logging
# #
# Errors # Errors
@ -26,9 +26,91 @@ class RequestTimeoutError(Exception):
pass pass
# #
# Global variables # Logging
#
logger = None
# Store the current log file since there is no public method to get the filename
# from a FileHandler object
current_log_file = None
# Handler objects for easy adding/removing/modifying
file_log_handler = None
stderr_log_handler = None
def init_logging(config):
"""
Initialize (or re-initialize/modify) logging
"""
global logger
global file_log_handler
global stderr_log_handler
# Get the logger object
logger = logging.getLogger(__name__)
# Create a common formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Set up or modify stderr logging
if config.stderr_log_level != 'OFF':
# Create and add the handler if it doesn't exist
if stderr_log_handler is None:
stderr_log_handler = logging.StreamHandler()
logger.addHandler(stderr_log_handler)
# Set the formatter
stderr_log_handler.setFormatter(formatter)
# Set the log level
level = logging.getLevelName(config.stderr_log_level)
stderr_log_handler.setLevel(level)
else:
if stderr_log_handler is not None:
logger.removeHandler(stderr_log_handler)
stderr_log_handler = None
# Set up or modify file logging
if config.file_log_level != 'OFF':
# Create and add the handler if it doesn't exist
if file_log_handler is None:
file_log_handler = logging.FileHandler(config.log_file, encoding='utf-8')
logger.addHandler(file_log_handler)
# Set the formatter
file_log_handler.setFormatter(formatter)
# Set the log level
level = logging.getLevelName(config.file_log_level)
file_log_handler.setLevel(level)
# Note where logs are being written
print("Logging to {}".format(config.log_file))
else:
if file_log_handler is not None:
logger.removeHandler(file_log_handler)
file_log_handler = None
#
# PID file handling
#
def write_pid_file(pid_file):
with open(pid_file, 'w') as f:
f.write("{}".format(os.getpid()))
def read_pid_file(pid_file):
with open(pid_file, 'r') as f:
return int(f.read().strip())
def remove_pid_file(pid_file):
os.unlink(pid_file)
#
# Global flags for signal handler
# #
running = True running = True
reload = False
# #
# Signal handler # Signal handler
@ -38,10 +120,14 @@ def signal_handler(sig, frame):
signal.signal(signal.SIGINT, signal.default_int_handler) signal.signal(signal.SIGINT, signal.default_int_handler)
# Signal everything to shut down # Signal everything to shut down
if sig == signal.SIGINT: if sig in [ signal.SIGINT, signal.SIGTERM, signal.SIGQUIT ]:
print("Shutting down ...") logger.info("Shutting down ...")
global running global running
running = False running = False
elif sig == signal.SIGHUP:
logger.info("Reloading config ...")
global reload
reload = True
# #
# Classes # Classes
@ -53,6 +139,8 @@ class Config:
""" """
def __init__(self, config_file, read_metrics = True): def __init__(self, config_file, read_metrics = True):
# Set defaults # Set defaults
self.pid_file = '/tmp/pgmon.pid' # PID file
self.ipc_socket = '/tmp/pgmon.sock' # IPC socket self.ipc_socket = '/tmp/pgmon.sock' # IPC socket
self.ipc_timeout = 10 # IPC communication timeout (s) self.ipc_timeout = 10 # IPC communication timeout (s)
self.request_timeout = 10 # Request processing timeout (s) self.request_timeout = 10 # Request processing timeout (s)
@ -60,6 +148,10 @@ class Config:
self.request_queue_timeout = 2 # Max time to wait when queueing a request (s) self.request_queue_timeout = 2 # Max time to wait when queueing a request (s)
self.worker_count = 4 # Number of worker threads self.worker_count = 4 # Number of worker threads
self.stderr_log_level = 'INFO' # Log level for stderr logging (or 'off')
self.file_log_level = 'INFO' # Log level for file logging (od 'off')
self.log_file = 'pgmon.log' # Log file
self.host = 'localhost' # DB host self.host = 'localhost' # DB host
self.port = 5432 # DB port self.port = 5432 # DB port
self.user = 'postgres' # DB user self.user = 'postgres' # DB user
@ -90,9 +182,11 @@ class Config:
if key == 'include': if key == 'include':
self.read_file(value, read_metric) self.read_file(value, read_metric)
elif key == 'socket': elif key == 'pid_file':
self.pid_file = value
elif key == 'ipc_socket':
self.ipc_socket = value self.ipc_socket = value
elif key == 'socket_timeout': elif key == 'ipc_timeout':
self.ipc_timeout = float(value) self.ipc_timeout = float(value)
elif key == 'request_timeout': elif key == 'request_timeout':
self.request_timeout = float(value) self.request_timeout = float(value)
@ -102,6 +196,12 @@ class Config:
self.request_queue_timeout = float(value) self.request_queue_timeout = float(value)
elif key == 'worker_count': elif key == 'worker_count':
self.worker_count = int(value) self.worker_count = int(value)
elif key == 'stderr_log_level':
self.stderr_log_level = value.upper()
elif key == 'file_log_level':
self.file_log_level = value.upper()
elif key == 'log_file':
self.log_file = value
elif key == 'host': elif key == 'host':
self.host = value self.host = value
elif key == 'port': elif key == 'port':
@ -115,6 +215,8 @@ class Config:
elif key == 'metric': elif key == 'metric':
if read_metrics: if read_metrics:
self.add_metric(value) self.add_metric(value)
else:
raise InvalidConfigError("WARNING: Unknown config: {}".format(key))
def add_metric(self, metric_def): def add_metric(self, metric_def):
(name, ret_type, version, sql) = metric_def.split(':', 3) (name, ret_type, version, sql) = metric_def.split(':', 3)
@ -170,7 +272,7 @@ class DB:
if self.conn is not None: if self.conn is not None:
self.conn.close() self.conn.close()
except psycopg2.Error as e: except psycopg2.Error as e:
print("Caught an error when closing a connection: {}".format(e)) logger.warning("Caught an error when closing a connection: {}".format(e))
self.conn = None self.conn = None
@ -322,7 +424,9 @@ class Agent:
The agent side of the connector The agent side of the connector
""" """
@staticmethod @staticmethod
def run(config, key): def run(config_file, key):
config = Config(config_file, read_metrics = False)
# Connect to the socket # Connect to the socket
ipc = IPC(config, 'agent') ipc = IPC(config, 'agent')
try: try:
@ -346,22 +450,81 @@ class Server:
""" """
The server side of the connector The server side of the connector
""" """
@staticmethod
def run(config): def __init__(self, config_file):
self.config_file = config_file
# Load config
self.config = Config(config_file)
# Write pid file
# Note: we record the PID file here so it can't be changed with reload
self.pid_file = self.config.pid_file
write_pid_file(self.pid_file)
# Initialize logging
init_logging(self.config)
# Set up the signal handler # Set up the signal handler
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
# Create reqest queue # Create reqest queue
req_queue = queue.Queue(config.request_queue_size) self.req_queue = queue.Queue(self.config.request_queue_size)
# Spawn worker threads # Spawn worker threads
self.workers = self.spawn_workers(self.config)
def spawn_workers(self, config):
logger.info("Spawning {} workers".format(config.worker_count))
workers = [None] * config.worker_count workers = [None] * config.worker_count
for i in range(config.worker_count): for i in range(config.worker_count):
workers[i] = Worker(config, req_queue) workers[i] = Worker(config, self.req_queue)
workers[i].start() workers[i].start()
return workers
def retire_workers(self, workers):
logger.info("Retiring {} workers".format(len(workers)))
for worker in workers:
worker.active = False
for worker in workers:
worker.join()
def reload_config(self):
# Clear the reload flag
global reload
reload = False
# Read new config
new_config = Config(self.config_file)
# Re-init logging in case settings changed
init_logging(new_config)
# Spawn new workers
new_workers = self.spawn_workers(new_config)
# Replace workers
old_workers = self.workers
self.workers = new_workers
# Retire old workers
self.retire_workers(old_workers)
# Adjust other settings
# TODO
self.config = new_config
def run(self):
logger.info("Server starting")
# Listen on ipc socket # Listen on ipc socket
ipc = IPC(config, 'server') ipc = IPC(self.config, 'server')
while True: while True:
# Wait for a request connection # Wait for a request connection
@ -374,6 +537,13 @@ class Server:
if not running: if not running:
break break
# Check if we're supposed to reload the config
if reload:
try:
self.reload_config()
except Exceptioin as e:
logger.ERROR("Reload failed: {}".format(e))
# If we just timed out waiting for a request, go back to waiting # If we just timed out waiting for a request, go back to waiting
if conn is None: if conn is None:
continue continue
@ -393,11 +563,11 @@ class Server:
(k, v) = arg.split('=', 1) (k, v) = arg.split('=', 1)
args_dict[k] = v args_dict[k] = v
except socket.timeout: except socket.timeout:
print("IPC communication timeout receiving request") logger.warning("IPC communication timeout receiving request")
conn.close() conn.close()
continue continue
except Exception: except Exception:
print("Received invalid request: '{}'".format(key)) logger.warning("Received invalid request: '{}'".format(key))
ipc.send(conn, "ERROR: Invalid key") ipc.send(conn, "ERROR: Invalid key")
conn.close() conn.close()
continue continue
@ -407,21 +577,23 @@ class Server:
# Queue the request # Queue the request
try: try:
req_queue.put(req, timeout=config.request_queue_timeout) self.req_queue.put(req, timeout=self.config.request_queue_timeout)
except: except:
print("Failed to queue request") logger.warning("Failed to queue request")
req.set_result("ERROR: Queue timeout") req.set_result("ERROR: Queue timeout")
continue continue
# Spawn a thread to wait for the result # Spawn a thread to wait for the result
r = Responder(config, ipc, conn, req) r = Responder(self.config, ipc, conn, req)
r.start() r.start()
# Join worker threads # Join worker threads
for worker in workers: self.retire_workers(self.workers)
worker.join()
print("Good bye") # Clean up the PID file
remove_pid_file(self.pid_file)
logger.info("Good bye")
class Worker(threading.Thread): class Worker(threading.Thread):
@ -434,6 +606,8 @@ class Worker(threading.Thread):
self.config = config self.config = config
self.queue = queue self.queue = queue
self.active = True
def run(self): def run(self):
while True: while True:
# Wait for a request # Wait for a request
@ -443,7 +617,8 @@ class Worker(threading.Thread):
req = None req = None
# Check if we're supposed to exit # Check if we're supposed to exit
if not running: if not self.active:
logger.info("Worker exiting: tid={}".format(self.native_id))
break break
# If we got here because we waited too long for a request, go back to waiting # If we got here because we waited too long for a request, go back to waiting
@ -462,7 +637,7 @@ class Worker(threading.Thread):
pg_version = self.config.get_pg_version() pg_version = self.config.get_pg_version()
except Exception as e: except Exception as e:
req.set_result("Failed to retrieve database version") req.set_result("Failed to retrieve database version")
print("Error: {}".format(e)) logger.error("Failed to get Postgresql version: {}".format(e))
continue continue
# Get the query to use # Get the query to use
@ -489,7 +664,7 @@ class Worker(threading.Thread):
except: except:
pass pass
req.set_result("Failed to query database") req.set_result("Failed to query database")
print("Error: {}".format(e)) logger.error("Database error: {}".format(e))
continue continue
# Set the result on the request # Set the result on the request
@ -507,8 +682,7 @@ class Worker(threading.Thread):
try: try:
db.close() db.close()
except Exception as e: except Exception as e:
print("Failed to close database connection") logger.debug("Failed to close database connection: {}".format(e))
print("Error: {}".format(e))
class Responder(threading.Thread): class Responder(threading.Thread):
""" """
@ -534,6 +708,7 @@ def main():
# Operational mode # Operational mode
parser.add_argument('-s', '--server', action='store_true') parser.add_argument('-s', '--server', action='store_true')
parser.add_argument('-r', '--reload', action='store_true')
# Agent options # Agent options
parser.add_argument('key', nargs='?') parser.add_argument('key', nargs='?')
@ -541,11 +716,14 @@ def main():
args = parser.parse_args() args = parser.parse_args()
if args.server: if args.server:
config = Config(args.config) server = Server(args.config)
Server.run(config) server.run()
else: elif args.reload:
config = Config(args.config, read_metrics = False) config = Config(args.config, read_metrics = False)
Agent.run(config, args.key) pid = read_pid_file(config.pid_file)
os.kill(pid, signal.SIGHUP)
else:
Agent.run(args.config, args.key)
if __name__ == '__main__': if __name__ == '__main__':