From 65f01ea826c6fcd0a3ed43129cf7a7a768d0231f Mon Sep 17 00:00:00 2001 From: James Campbell Date: Wed, 4 Feb 2026 00:36:54 -0500 Subject: [PATCH] Implement FUSE interface using pyfuse --- requirements.yml | 3 +- sample-config/pgmon.yml | 28 ++++++-- src/pgmon.py | 154 +++++++++++++++++++++------------------- 3 files changed, 108 insertions(+), 77 deletions(-) diff --git a/requirements.yml b/requirements.yml index b2cce68..cd220f4 100644 --- a/requirements.yml +++ b/requirements.yml @@ -1,2 +1,3 @@ -psycopg +psycopg2 pyyaml +fusepy diff --git a/sample-config/pgmon.yml b/sample-config/pgmon.yml index bbe49dc..5d09b5c 100644 --- a/sample-config/pgmon.yml +++ b/sample-config/pgmon.yml @@ -1,8 +1,22 @@ -# The address the agent binds to -#address: 127.0.0.1 +# # The name for this agent instance (controles the FUSE directory name) +#name: pgmon -# The port the agent listens on for requests -#port: 5400 +# Mode to set on FUSE files (in octal) +#fuse_mode: 0o400 + +# Owner to set on FUSE files +#fuse_owner: zabbix +fuse_owner: james + +# Group to set on FUSE files +#fuse_group: zabbix +fuse_group: james + +# User the agent runs as +agent_user: james + +# Groupthe agent runs as +agent_group: james # Min PostgreSQL connection pool size (per database) #min_pool_size: 0 @@ -16,12 +30,17 @@ # Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels) # Possible values are: debug, info, warning, error, critical #log_level: 'error' +log_level: 'info' + +# Base directory for FUSE file system +fuse_base: /tmp/pgmon # Database user to connect as #dbuser: 'postgres' # Database host #dbhost: '/var/run/postgresql' +dbhost: 127.0.0.1 # Database port #dbport: 5432 @@ -31,6 +50,7 @@ # SSL connection mode #ssl_mode: require +ssl_mode: allow # Timeout for getting a connection slot from a pool #pool_slot_timeout: 5 diff --git a/src/pgmon.py b/src/pgmon.py index e13f435..a6bf0dc 100755 --- a/src/pgmon.py +++ b/src/pgmon.py @@ -14,6 +14,7 @@ import signal import argparse import logging import re +import stat from decimal import Decimal @@ -23,9 +24,6 @@ from contextlib import contextmanager from datetime import datetime, timedelta -from http.server import BaseHTTPRequestHandler -from http.server import ThreadingHTTPServer - from threading import Lock import yaml @@ -34,7 +32,10 @@ import psycopg2 from psycopg2.extras import RealDictCursor from psycopg2.pool import ThreadedConnectionPool -import requests +from pwd import getpwnam +from grp import getgrnam + +from fuse import FUSE, Operations, LoggingMixIn, fuse_exit VERSION = "1.1.0-rc1" @@ -71,9 +72,6 @@ class Context: # Running state (used to gracefully shut down) running = True - # The http server object - httpd = None - # Where the config file lives config_file = None @@ -142,10 +140,20 @@ class InvalidDataError(Exception): # Default config settings DEFAULT_CONFIG = { - # The address the agent binds to - "address": "127.0.0.1", - # The port the agent listens on for requests - "port": 5400, + # The name for this agent instance (controles the FUSE directory name) + "name": 'pgmon', + # Base directory for FUSE file system + "fuse_base": "/run/pgmon", + # Mode to set on FUSE files (in octal) + "fuse_mode": 0o400, + # Owner to set on FUSE files + "fuse_owner": "zabbix", + # Group to set on FUSE files + "fuse_group": "zabbix", + # The user the agent runs as + "agent_user": "pgmon", + # The group the agent runs as + "agent_group": "pgmon", # Min PostgreSQL connection pool size (per database) "min_pool_size": 0, # Max PostgreSQL connection pool size (per database) @@ -349,11 +357,9 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]: Context.log.info("Shutting down ...") Context.running = False - if Context.httpd is not None: - Context.httpd.socket.close() # Signal a reload - if sig == signal.SIGHUP: + if sig in [signal.SIGHUP, signal.SIGUSR1]: Context.log.warning("Received config reload signal") read_config(Context.config_file) @@ -792,89 +798,91 @@ def test_queries(): return 0 -class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): +class PGMonFuse(LoggingMixIn, Operations): """ - This is our request handling server. It is responsible for listening for + This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for requests, processing them, and responding. """ - def log_request(self, code="-", size="-"): - """ - Override to suppress standard request logging - """ + def __init__(self): + self.builtin = ['agent_version', 'latest_version_info'] - def do_GET(self): # pylint: disable=invalid-name - """ - Handle a request. This is just a wrapper around the actual handler - code to keep things more readable. - """ - try: - self._handle_request() - except BrokenPipeError: - Context.log.error("Client disconnected, exiting handler") - def _handle_request(self): - """ - Request handler - """ - # Parse the URL - parsed_path = urlparse(self.path) - metric_name = parsed_path.path.strip("/") - parsed_query = parse_qs(parsed_path.query) + def getattr(self, path, fh=None): + attrs = { + 'st_mode': (Context.config['fuse_mode'] | stat.S_IFREG), + 'st_nlink': 1, + 'st_size': 1024, + 'st_uid': getpwnam(Context.config['fuse_owner']).pw_uid, + 'st_gid': getgrnam(Context.config['fuse_group']).gr_gid, + } + if path == "/": + # If we're looking at the directory itself, add execute permissions for the owner and identify it as a directory + attrs['st_mode'] = Context.config['fuse_mode'] | stat.S_IXUSR | stat.S_IFDIR + return attrs + + def readdir(self, path, fh): + return ['.', '..'] + self.builtin + list(Context.config["metrics"].keys()) + +# def open(self, path, fh): +# fh.direct_io = 1 +# pass + + def read(self, path, size, offset, fh): + # Split the path into the expected components: ////... + parts = path.split('/') + + # Note: the first element will be empty. If we got less than two, we + # don't have a valid request. + if len(parts) < 2: + return ''.encode('utf-8') + + res = None + metric_name = parts[1] + metric_args = {} + + for arg in parts[2:]: + key, value = arg.split('=') + metric_args[key] = value if metric_name == "agent_version": - self._reply(200, VERSION) + res = VERSION elif metric_name == "latest_version_info": try: get_latest_version() - self._reply( - 200, - json.dumps( + res = json.dumps( { "latest": Context.latest_version, "supported": 1 if Context.release_supported else 0, } - ), - ) + ) except LatestVersionCheckError as e: Context.log.error( "Failed to retrieve latest version information: %s", e ) - self._reply(503, "Failed to retrieve latest version info") + res = "ERROR: Failed to retrieve latest version info" else: - # Note: parse_qs returns the values as a list. Since we always expect - # single values, just grab the first from each. - args = {key: values[0] for key, values in parsed_query.items()} - # Get the dbname. If none was provided, use the default from the # config. - dbname = args.get("dbname", Context.config["dbname"]) + dbname = metric_args.get("dbname", Context.config["dbname"]) # Sample the metric try: - self._reply(200, sample_metric(dbname, metric_name, args)) + res = sample_metric(dbname, metric_name, metric_args) except UnknownMetricError: Context.log.error("Unknown metric: %s", metric_name) - self._reply(404, "Unknown metric") + res = "ERROR: Unknown metric" except MetricVersionError: Context.log.error("Failed to find an query version for %s", metric_name) - self._reply(404, "Unsupported version") + res = "ERROR: Unsupported version" except UnhappyDBError: Context.log.info("Database %s is unhappy, please be patient", dbname) - self._reply(503, "Database unavailable") + res = "ERROR: Database unavailable" except Exception as e: # pylint: disable=broad-exception-caught Context.log.error("Error running query: %s", e) - self._reply(500, "Unexpected error: {}".format(e)) + res = "ERROR: Unexpected error: {}".format(e) - def _reply(self, code, content): - """ - Send a reply to the client - """ - self.send_response(code) - self.send_header("Content-type", "application/json") - self.end_headers() - - self.wfile.write(bytes(content, "utf-8")) + return res.encode('utf-8') def main(): @@ -916,24 +924,26 @@ def main(): sys.exit(1) sys.exit(0) - # Set up the http server to receive requests - server_address = (Context.config["address"], Context.config["port"]) - Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler) - # Set up the signal handler signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGHUP, signal_handler) + signal.signal(signal.SIGUSR1, signal_handler) - # Handle requests. - Context.log.info("Listening on port %s...", Context.config["port"]) - while Context.running: - Context.httpd.handle_request() + # Ensure the mount point exists + if not os.path.exists(Context.config["fuse_base"]): + os.makedirs(Context.config["fuse_base"]) + + # Create the FUSE filesystem + FUSE(PGMonFuse(), Context.config["fuse_base"], foreground=True, default_permissions=True) # Clean up PostgreSQL connections # TODO: Improve this ... not sure it actually closes all the connections cleanly for pool in Context.connections.values(): - pool.close() + pool.closeall() + logging.shutdown() + + print("Good bye.") if __name__ == "__main__": main()