diff --git a/pgmon.cfg b/pgmon.cfg index 955e4a8..ab7131c 100644 --- a/pgmon.cfg +++ b/pgmon.cfg @@ -1,6 +1,72 @@ +## +# Misc agent settings +## + +# Where to write/find the agent PID +#pid_file=/tmp/pgmon.pid + +## +# Agent communication settings +## + +# IPC socket +#ipc_socket=/tmp/pgmon.sock + +# IPC communication timeout (s) +#ipc_timeout=10 + +# Request processing timeout (s) +#request_timeout=10 + +# Max size of the request queue before it blocks +#request_queue_size=100 + +# Max time to wait when queueing a request (s) +#request_queue_timeout=2 + +## +# Agent resource settings +## + +# Number of worker threads +#worker_count=4 + +## +# Logging settings +## + +# Log level for stderr logging (or 'off') +#stderr_log_level=info + +# Log level for file logging (od 'off') +#file_log_level=info + +# Log file +#log_file=pgmon.log + +## +# DB connection settings +## + +#host=localhost host=/var/run/postgresql -port=5432 -user=postgres +#port=5432 +#user=postgres +#password=None + +# Default database to connect to when none is specified for a metric +#dbname=postgres + +## +# Monitoring configuration +## + +# Metrics +#metrics={} + + + metric=max_frozen_age:value::SELECT max(age(datfrozenxid)) FROM pg_database -metric=db_stats:row::SELECT * FROM pg_stat_database WHERE datname = '{datname}' +metric=db_stats:row::SELECT * FROM pg_stat_database WHERE datname='{datname}' metric=discover_dbs:set::SELECT datname FROM pg_database + diff --git a/pgmon.py b/pgmon.py index 3eef00e..e7f80e7 100755 --- a/pgmon.py +++ b/pgmon.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import argparse -import logging import socket import sys import threading @@ -11,6 +10,7 @@ import queue import os import signal import json +import logging # # Errors @@ -26,9 +26,91 @@ class RequestTimeoutError(Exception): pass # -# Global variables +# Logging +# + +logger = None + +# Store the current log file since there is no public method to get the filename +# from a FileHandler object +current_log_file = None 
+ +# Handler objects for easy adding/removing/modifying +file_log_handler = None +stderr_log_handler = None + +def init_logging(config): + """ + Initialize (or re-initialize/modify) logging + """ + global logger + global file_log_handler + global stderr_log_handler + + # Get the logger object + logger = logging.getLogger(__name__) + + # Create a common formatter + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + + # Set up or modify stderr logging + if config.stderr_log_level != 'OFF': + # Create and add the handler if it doesn't exist + if stderr_log_handler is None: + stderr_log_handler = logging.StreamHandler() + logger.addHandler(stderr_log_handler) + + # Set the formatter + stderr_log_handler.setFormatter(formatter) + + # Set the log level + level = logging.getLevelName(config.stderr_log_level) + stderr_log_handler.setLevel(level) + else: + if stderr_log_handler is not None: + logger.removeHandler(stderr_log_handler) + stderr_log_handler = None + + # Set up or modify file logging + if config.file_log_level != 'OFF': + # Create and add the handler if it doesn't exist + if file_log_handler is None: + file_log_handler = logging.FileHandler(config.log_file, encoding='utf-8') + logger.addHandler(file_log_handler) + + # Set the formatter + file_log_handler.setFormatter(formatter) + + # Set the log level + level = logging.getLevelName(config.file_log_level) + file_log_handler.setLevel(level) + + # Note where logs are being written + print("Logging to {}".format(config.log_file)) + else: + if file_log_handler is not None: + logger.removeHandler(file_log_handler) + file_log_handler = None + +# +# PID file handling +# +def write_pid_file(pid_file): + with open(pid_file, 'w') as f: + f.write("{}".format(os.getpid())) + +def read_pid_file(pid_file): + with open(pid_file, 'r') as f: + return int(f.read().strip()) + +def remove_pid_file(pid_file): + os.unlink(pid_file) + +# +# Global flags for signal handler # running = True +reload = False # # 
Signal handler @@ -38,10 +120,14 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal.default_int_handler) # Signal everything to shut down - if sig == signal.SIGINT: - print("Shutting down ...") + if sig in [ signal.SIGINT, signal.SIGTERM, signal.SIGQUIT ]: + logger.info("Shutting down ...") global running running = False + elif sig == signal.SIGHUP: + logger.info("Reloading config ...") + global reload + reload = True # # Classes @@ -53,13 +139,19 @@ class Config: """ def __init__(self, config_file, read_metrics = True): # Set defaults - self.ipc_socket = '/tmp/pgmon.sock' # IPC socket + self.pid_file = '/tmp/pgmon.pid' # PID file + + self.ipc_socket = '/tmp/pgmon.sock' # IPC socket self.ipc_timeout = 10 # IPC communication timeout (s) self.request_timeout = 10 # Request processing timeout (s) self.request_queue_size = 100 # Max size of the request queue before it blocks self.request_queue_timeout = 2 # Max time to wait when queueing a request (s) self.worker_count = 4 # Number of worker threads + self.stderr_log_level = 'INFO' # Log level for stderr logging (or 'off') + self.file_log_level = 'INFO' # Log level for file logging (od 'off') + self.log_file = 'pgmon.log' # Log file + self.host = 'localhost' # DB host self.port = 5432 # DB port self.user = 'postgres' # DB user @@ -90,9 +182,11 @@ class Config: if key == 'include': self.read_file(value, read_metric) - elif key == 'socket': + elif key == 'pid_file': + self.pid_file = value + elif key == 'ipc_socket': self.ipc_socket = value - elif key == 'socket_timeout': + elif key == 'ipc_timeout': self.ipc_timeout = float(value) elif key == 'request_timeout': self.request_timeout = float(value) @@ -102,6 +196,12 @@ class Config: self.request_queue_timeout = float(value) elif key == 'worker_count': self.worker_count = int(value) + elif key == 'stderr_log_level': + self.stderr_log_level = value.upper() + elif key == 'file_log_level': + self.file_log_level = value.upper() + elif key == 'log_file': + 
self.log_file = value elif key == 'host': self.host = value elif key == 'port': @@ -115,6 +215,8 @@ class Config: elif key == 'metric': if read_metrics: self.add_metric(value) + else: + raise InvalidConfigError("WARNING: Unknown config: {}".format(key)) def add_metric(self, metric_def): (name, ret_type, version, sql) = metric_def.split(':', 3) @@ -170,7 +272,7 @@ class DB: if self.conn is not None: self.conn.close() except psycopg2.Error as e: - print("Caught an error when closing a connection: {}".format(e)) + logger.warning("Caught an error when closing a connection: {}".format(e)) self.conn = None @@ -322,7 +424,9 @@ class Agent: The agent side of the connector """ @staticmethod - def run(config, key): + def run(config_file, key): + config = Config(config_file, read_metrics = False) + # Connect to the socket ipc = IPC(config, 'agent') try: @@ -346,22 +450,81 @@ class Server: """ The server side of the connector """ - @staticmethod - def run(config): + + def __init__(self, config_file): + self.config_file = config_file + + # Load config + self.config = Config(config_file) + + # Write pid file + # Note: we record the PID file here so it can't be changed with reload + self.pid_file = self.config.pid_file + write_pid_file(self.pid_file) + + # Initialize logging + init_logging(self.config) + # Set up the signal handler signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) # Create reqest queue - req_queue = queue.Queue(config.request_queue_size) + self.req_queue = queue.Queue(self.config.request_queue_size) # Spawn worker threads + self.workers = self.spawn_workers(self.config) + + def spawn_workers(self, config): + logger.info("Spawning {} workers".format(config.worker_count)) + workers = [None] * config.worker_count for i in range(config.worker_count): - workers[i] = Worker(config, req_queue) + workers[i] = Worker(config, self.req_queue) workers[i].start() + return workers + + def retire_workers(self, workers): + 
logger.info("Retiring {} workers".format(len(workers))) + + for worker in workers: + worker.active = False + + for worker in workers: + worker.join() + + def reload_config(self): + # Clear the reload flag + global reload + reload = False + + # Read new config + new_config = Config(self.config_file) + + # Re-init logging in case settings changed + init_logging(new_config) + + # Spawn new workers + new_workers = self.spawn_workers(new_config) + + # Replace workers + old_workers = self.workers + self.workers = new_workers + + # Retire old workers + self.retire_workers(old_workers) + + # Adjust other settings + # TODO + + self.config = new_config + + def run(self): + logger.info("Server starting") + # Listen on ipc socket - ipc = IPC(config, 'server') + ipc = IPC(self.config, 'server') while True: # Wait for a request connection @@ -374,6 +537,13 @@ class Server: if not running: break + # Check if we're supposed to reload the config + if reload: + try: + self.reload_config() + except Exception as e: + logger.error("Reload failed: {}".format(e)) + # If we just timed out waiting for a request, go back to waiting if conn is None: continue @@ -393,11 +563,11 @@ class Server: (k, v) = arg.split('=', 1) args_dict[k] = v except socket.timeout: - print("IPC communication timeout receiving request") + logger.warning("IPC communication timeout receiving request") conn.close() continue except Exception: - print("Received invalid request: '{}'".format(key)) + logger.warning("Received invalid request: '{}'".format(key)) ipc.send(conn, "ERROR: Invalid key") conn.close() continue @@ -407,21 +577,23 @@ class Server: # Queue the request try: - req_queue.put(req, timeout=config.request_queue_timeout) + self.req_queue.put(req, timeout=self.config.request_queue_timeout) except: - print("Failed to queue request") + logger.warning("Failed to queue request") req.set_result("ERROR: Queue timeout") continue # Spawn a thread to wait for the result - r = Responder(config, ipc, conn, req) + r = 
Responder(self.config, ipc, conn, req) r.start() # Join worker threads - for worker in workers: - worker.join() + self.retire_workers(self.workers) - print("Good bye") + # Clean up the PID file + remove_pid_file(self.pid_file) + + logger.info("Good bye") class Worker(threading.Thread): @@ -434,6 +606,8 @@ class Worker(threading.Thread): self.config = config self.queue = queue + self.active = True + def run(self): while True: # Wait for a request @@ -443,7 +617,8 @@ class Worker(threading.Thread): req = None # Check if we're supposed to exit - if not running: + if not self.active: + logger.info("Worker exiting: tid={}".format(self.native_id)) break # If we got here because we waited too long for a request, go back to waiting @@ -462,7 +637,7 @@ class Worker(threading.Thread): pg_version = self.config.get_pg_version() except Exception as e: req.set_result("Failed to retrieve database version") - print("Error: {}".format(e)) + logger.error("Failed to get Postgresql version: {}".format(e)) continue # Get the query to use @@ -489,7 +664,7 @@ class Worker(threading.Thread): except: pass req.set_result("Failed to query database") - print("Error: {}".format(e)) + logger.error("Database error: {}".format(e)) continue # Set the result on the request @@ -507,8 +682,7 @@ class Worker(threading.Thread): try: db.close() except Exception as e: - print("Failed to close database connection") - print("Error: {}".format(e)) + logger.debug("Failed to close database connection: {}".format(e)) class Responder(threading.Thread): """ @@ -534,6 +708,7 @@ def main(): # Operational mode parser.add_argument('-s', '--server', action='store_true') + parser.add_argument('-r', '--reload', action='store_true') # Agent options parser.add_argument('key', nargs='?') @@ -541,11 +716,14 @@ def main(): args = parser.parse_args() if args.server: - config = Config(args.config) - Server.run(config) - else: + server = Server(args.config) + server.run() + elif args.reload: config = Config(args.config, 
read_metrics = False) - Agent.run(config, args.key) + pid = read_pid_file(config.pid_file) + os.kill(pid, signal.SIGHUP) + else: + Agent.run(args.config, args.key) if __name__ == '__main__':