Implement FUSE interface using pyfuse

This commit is contained in:
James Campbell 2026-02-04 00:36:54 -05:00
parent 17b55d3c46
commit 65f01ea826
Signed by: james
GPG Key ID: 2287C33A40DC906A
3 changed files with 108 additions and 77 deletions

View File

@ -1,2 +1,3 @@
psycopg psycopg2
pyyaml pyyaml
fusepy

View File

@ -1,8 +1,22 @@
# The address the agent binds to # # The name for this agent instance (controles the FUSE directory name)
#address: 127.0.0.1 #name: pgmon
# The port the agent listens on for requests # Mode to set on FUSE files (in octal)
#port: 5400 #fuse_mode: 0o400
# Owner to set on FUSE files
#fuse_owner: zabbix
fuse_owner: james
# Group to set on FUSE files
#fuse_group: zabbix
fuse_group: james
# User the agent runs as
agent_user: james
# Groupthe agent runs as
agent_group: james
# Min PostgreSQL connection pool size (per database) # Min PostgreSQL connection pool size (per database)
#min_pool_size: 0 #min_pool_size: 0
@ -16,12 +30,17 @@
# Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels) # Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels)
# Possible values are: debug, info, warning, error, critical # Possible values are: debug, info, warning, error, critical
#log_level: 'error' #log_level: 'error'
log_level: 'info'
# Base directory for FUSE file system
fuse_base: /tmp/pgmon
# Database user to connect as # Database user to connect as
#dbuser: 'postgres' #dbuser: 'postgres'
# Database host # Database host
#dbhost: '/var/run/postgresql' #dbhost: '/var/run/postgresql'
dbhost: 127.0.0.1
# Database port # Database port
#dbport: 5432 #dbport: 5432
@ -31,6 +50,7 @@
# SSL connection mode # SSL connection mode
#ssl_mode: require #ssl_mode: require
ssl_mode: allow
# Timeout for getting a connection slot from a pool # Timeout for getting a connection slot from a pool
#pool_slot_timeout: 5 #pool_slot_timeout: 5

View File

@ -14,6 +14,7 @@ import signal
import argparse import argparse
import logging import logging
import re import re
import stat
from decimal import Decimal from decimal import Decimal
@ -23,9 +24,6 @@ from contextlib import contextmanager
from datetime import datetime, timedelta from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Lock from threading import Lock
import yaml import yaml
@ -34,7 +32,10 @@ import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool from psycopg2.pool import ThreadedConnectionPool
import requests from pwd import getpwnam
from grp import getgrnam
from fuse import FUSE, Operations, LoggingMixIn, fuse_exit
VERSION = "1.1.0-rc1" VERSION = "1.1.0-rc1"
@ -71,9 +72,6 @@ class Context:
# Running state (used to gracefully shut down) # Running state (used to gracefully shut down)
running = True running = True
# The http server object
httpd = None
# Where the config file lives # Where the config file lives
config_file = None config_file = None
@ -142,10 +140,20 @@ class InvalidDataError(Exception):
# Default config settings # Default config settings
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
# The address the agent binds to # The name for this agent instance (controles the FUSE directory name)
"address": "127.0.0.1", "name": 'pgmon',
# The port the agent listens on for requests # Base directory for FUSE file system
"port": 5400, "fuse_base": "/run/pgmon",
# Mode to set on FUSE files (in octal)
"fuse_mode": 0o400,
# Owner to set on FUSE files
"fuse_owner": "zabbix",
# Group to set on FUSE files
"fuse_group": "zabbix",
# The user the agent runs as
"agent_user": "pgmon",
# The group the agent runs as
"agent_group": "pgmon",
# Min PostgreSQL connection pool size (per database) # Min PostgreSQL connection pool size (per database)
"min_pool_size": 0, "min_pool_size": 0,
# Max PostgreSQL connection pool size (per database) # Max PostgreSQL connection pool size (per database)
@ -349,11 +357,9 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]: if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
Context.log.info("Shutting down ...") Context.log.info("Shutting down ...")
Context.running = False Context.running = False
if Context.httpd is not None:
Context.httpd.socket.close()
# Signal a reload # Signal a reload
if sig == signal.SIGHUP: if sig in [signal.SIGHUP, signal.SIGUSR1]:
Context.log.warning("Received config reload signal") Context.log.warning("Received config reload signal")
read_config(Context.config_file) read_config(Context.config_file)
@ -792,89 +798,91 @@ def test_queries():
return 0 return 0
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class PGMonFuse(LoggingMixIn, Operations):
""" """
This is our request handling server. It is responsible for listening for This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for
requests, processing them, and responding. requests, processing them, and responding.
""" """
def log_request(self, code="-", size="-"): def __init__(self):
""" self.builtin = ['agent_version', 'latest_version_info']
Override to suppress standard request logging
"""
def do_GET(self): # pylint: disable=invalid-name
"""
Handle a request. This is just a wrapper around the actual handler
code to keep things more readable.
"""
try:
self._handle_request()
except BrokenPipeError:
Context.log.error("Client disconnected, exiting handler")
def _handle_request(self): def getattr(self, path, fh=None):
""" attrs = {
Request handler 'st_mode': (Context.config['fuse_mode'] | stat.S_IFREG),
""" 'st_nlink': 1,
# Parse the URL 'st_size': 1024,
parsed_path = urlparse(self.path) 'st_uid': getpwnam(Context.config['fuse_owner']).pw_uid,
metric_name = parsed_path.path.strip("/") 'st_gid': getgrnam(Context.config['fuse_group']).gr_gid,
parsed_query = parse_qs(parsed_path.query) }
if path == "/":
# If we're looking at the directory itself, add execute permissions for the owner and identify it as a directory
attrs['st_mode'] = Context.config['fuse_mode'] | stat.S_IXUSR | stat.S_IFDIR
return attrs
def readdir(self, path, fh):
return ['.', '..'] + self.builtin + list(Context.config["metrics"].keys())
# def open(self, path, fh):
# fh.direct_io = 1
# pass
def read(self, path, size, offset, fh):
# Split the path into the expected components: /<metric>/<arg1>/<arg2>/...
parts = path.split('/')
# Note: the first element will be empty. If we got less than two, we
# don't have a valid request.
if len(parts) < 2:
return ''.encode('utf-8')
res = None
metric_name = parts[1]
metric_args = {}
for arg in parts[2:]:
key, value = arg.split('=')
metric_args[key] = value
if metric_name == "agent_version": if metric_name == "agent_version":
self._reply(200, VERSION) res = VERSION
elif metric_name == "latest_version_info": elif metric_name == "latest_version_info":
try: try:
get_latest_version() get_latest_version()
self._reply( res = json.dumps(
200,
json.dumps(
{ {
"latest": Context.latest_version, "latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0, "supported": 1 if Context.release_supported else 0,
} }
), )
)
except LatestVersionCheckError as e: except LatestVersionCheckError as e:
Context.log.error( Context.log.error(
"Failed to retrieve latest version information: %s", e "Failed to retrieve latest version information: %s", e
) )
self._reply(503, "Failed to retrieve latest version info") res = "ERROR: Failed to retrieve latest version info"
else: else:
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Get the dbname. If none was provided, use the default from the # Get the dbname. If none was provided, use the default from the
# config. # config.
dbname = args.get("dbname", Context.config["dbname"]) dbname = metric_args.get("dbname", Context.config["dbname"])
# Sample the metric # Sample the metric
try: try:
self._reply(200, sample_metric(dbname, metric_name, args)) res = sample_metric(dbname, metric_name, metric_args)
except UnknownMetricError: except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name) Context.log.error("Unknown metric: %s", metric_name)
self._reply(404, "Unknown metric") res = "ERROR: Unknown metric"
except MetricVersionError: except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name) Context.log.error("Failed to find an query version for %s", metric_name)
self._reply(404, "Unsupported version") res = "ERROR: Unsupported version"
except UnhappyDBError: except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname) Context.log.info("Database %s is unhappy, please be patient", dbname)
self._reply(503, "Database unavailable") res = "ERROR: Database unavailable"
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e) Context.log.error("Error running query: %s", e)
self._reply(500, "Unexpected error: {}".format(e)) res = "ERROR: Unexpected error: {}".format(e)
def _reply(self, code, content): return res.encode('utf-8')
"""
Send a reply to the client
"""
self.send_response(code)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(content, "utf-8"))
def main(): def main():
@ -916,24 +924,26 @@ def main():
sys.exit(1) sys.exit(1)
sys.exit(0) sys.exit(0)
# Set up the http server to receive requests
server_address = (Context.config["address"], Context.config["port"])
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler # Set up the signal handler
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler) signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
# Handle requests. # Ensure the mount point exists
Context.log.info("Listening on port %s...", Context.config["port"]) if not os.path.exists(Context.config["fuse_base"]):
while Context.running: os.makedirs(Context.config["fuse_base"])
Context.httpd.handle_request()
# Create the FUSE filesystem
FUSE(PGMonFuse(), Context.config["fuse_base"], foreground=True, default_permissions=True)
# Clean up PostgreSQL connections # Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly # TODO: Improve this ... not sure it actually closes all the connections cleanly
for pool in Context.connections.values(): for pool in Context.connections.values():
pool.close() pool.closeall()
logging.shutdown()
print("Good bye.")
if __name__ == "__main__": if __name__ == "__main__":
main() main()