Compare commits

...

2 Commits

Author SHA1 Message Date
39c34ee31f
wip 2026-03-15 23:54:27 -04:00
65f01ea826
Implement FUSE interface using pyfuse 2026-02-04 00:36:54 -05:00
3 changed files with 293 additions and 77 deletions

View File

@ -1,2 +1,4 @@
psycopg
psycopg2
pyyaml
llfuse
requests

View File

@ -1,8 +1,22 @@
# The address the agent binds to
#address: 127.0.0.1
# The name for this agent instance (controls the FUSE directory name)
#name: pgmon
# The port the agent listens on for requests
#port: 5400
# Mode to set on FUSE files (in octal)
#fuse_mode: 0o400
# Owner to set on FUSE files
#fuse_owner: zabbix
fuse_owner: james
# Group to set on FUSE files
#fuse_group: zabbix
fuse_group: james
# User the agent runs as
agent_user: james
# Group the agent runs as
agent_group: james
# Min PostgreSQL connection pool size (per database)
#min_pool_size: 0
@ -16,21 +30,28 @@
# Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels)
# Possible values are: debug, info, warning, error, critical
#log_level: 'error'
log_level: 'info'
# Base directory for FUSE file system
fuse_base: /tmp/pgmon
# Database user to connect as
#dbuser: 'postgres'
# Database host
#dbhost: '/var/run/postgresql'
dbhost: 127.0.0.1
# Database port
#dbport: 5432
dbport: 54315
# Default database to connect to when none is specified for a metric
#dbname: 'postgres'
# SSL connection mode
#ssl_mode: require
ssl_mode: allow
# Timeout for getting a connection slot from a pool
#pool_slot_timeout: 5

View File

@ -14,6 +14,9 @@ import signal
import argparse
import logging
import re
import stat
import errno
import requests
from decimal import Decimal
@ -23,9 +26,6 @@ from contextlib import contextmanager
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Lock
import yaml
@ -34,7 +34,10 @@ import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
import requests
from pwd import getpwnam
from grp import getgrnam
import llfuse
VERSION = "1.1.0-rc1"
@ -71,9 +74,6 @@ class Context:
# Running state (used to gracefully shut down)
running = True
# The http server object
httpd = None
# Where the config file lives
config_file = None
@ -142,10 +142,20 @@ class InvalidDataError(Exception):
# Default config settings
DEFAULT_CONFIG = {
# The address the agent binds to
"address": "127.0.0.1",
# The port the agent listens on for requests
"port": 5400,
# The name for this agent instance (controls the FUSE directory name)
"name": "pgmon",
# Base directory for FUSE file system
"fuse_base": "/run/pgmon",
# Mode to set on FUSE files (in octal)
"fuse_mode": 0o400,
# Owner to set on FUSE files
"fuse_owner": "zabbix",
# Group to set on FUSE files
"fuse_group": "zabbix",
# The user the agent runs as
"agent_user": "pgmon",
# The group the agent runs as
"agent_group": "pgmon",
# Min PostgreSQL connection pool size (per database)
"min_pool_size": 0,
# Max PostgreSQL connection pool size (per database)
@ -349,11 +359,9 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
Context.log.info("Shutting down ...")
Context.running = False
if Context.httpd is not None:
Context.httpd.socket.close()
# Signal a reload
if sig == signal.SIGHUP:
if sig in [signal.SIGHUP, signal.SIGUSR1]:
Context.log.warning("Received config reload signal")
read_config(Context.config_file)
@ -792,89 +800,253 @@ def test_queries():
return 0
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
class PGMonFuse(llfuse.Operations):
"""
This is our request handling server. It is responsible for listening for
This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for
requests, processing them, and responding.
"""
def log_request(self, code="-", size="-"):
"""
Override to suppress standard request logging
"""
def __init__(self):
super().__init__()
self.builtin = ["agent_version", "latest_version_info", "sleep"]
self.update_files()
def do_GET(self): # pylint: disable=invalid-name
# Dictionaries holding context information for known inodes and open file handles
self.inodes = dict()
self.file_handles = dict()
@staticmethod
def gen_attrs(inode, is_dir):
    """
    Build the llfuse.EntryAttributes describing one inode.

    Params:
        inode:  The inode number to store in st_ino
        is_dir: True to describe a directory (directory type bit plus owner
                execute permission), False to describe a regular file

    Returns:
        A populated llfuse.EntryAttributes instance

    The permission bits come from the "fuse_mode" config setting, and the
    owner/group are resolved from the "fuse_owner" / "fuse_group" names on
    every call. getpwnam/getgrnam raise KeyError when the name is unknown.
    """
    # File type (and, for directories, the owner execute bit) layered on top
    # of the configured permission bits
    if is_dir:
        type_bits = stat.S_IXUSR | stat.S_IFDIR
    else:
        type_bits = stat.S_IFREG

    entry = llfuse.EntryAttributes()
    entry.st_mode = Context.config["fuse_mode"] | type_bits
    entry.st_nlink = 1
    # Fixed size reported for every entry
    # NOTE(review): presumably a placeholder since content is only produced
    # when a file is opened -- confirm
    entry.st_size = 1024

    owner = getpwnam(Context.config["fuse_owner"])
    entry.st_uid = owner.pw_uid
    group = getgrnam(Context.config["fuse_group"])
    entry.st_gid = group.gr_gid

    entry.st_ino = inode
    return entry
def update_files(self):
    """
    Rebuild the list of file names exposed in the directory.

    The list is the built-in names (in their declared order) followed by
    the configured metric names in sorted order.
    """
    builtin_names = self.builtin
    metric_names = sorted(Context.config["metrics"])
    self.files = builtin_names + metric_names
@staticmethod
def path_to_inode(path):
    """
    Provide a consistent inode for a given file

    Params:
        path: The file name (any hashable value)

    Returns:
        The built-in hash() of the name reduced to an unsigned 64-bit value

    Note: Python salts hash() for str and bytes per interpreter process
    (unless PYTHONHASHSEED is set), so the value is only stable within a
    single run of the agent.
    """
    unsigned_64_mask = 0xFFFFFFFFFFFFFFFF
    digest = hash(path)
    return digest & unsigned_64_mask
def lookup(self, inode_p, name, ctx=None):
""" """
print(f"Lookup called with: inode_p={inode_p} name={name}")
# We only have a single level, so if the parent inode is anything else
# then it doesn't exist
if inode_p != llfuse.ROOT_INODE:
raise llfuse.FUSEError(errno.ENOENT)
# Generate the inode for this file (ie: query + args)
inode = self.path_to_inode(name)
# Get the file info if we already have it, create it otherwise
try:
self._handle_request()
except BrokenPipeError:
Context.log.error("Client disconnected, exiting handler")
file = self.inodes[inode]
except KeyError:
# Separate out the name components
parts = name.decode("utf-8").split(":", 1)
base_name = parts[0]
try:
arg_str = parts[1]
except IndexError:
arg_str = ""
def _handle_request(self):
"""
Request handler
"""
# Parse the URL
parsed_path = urlparse(self.path)
metric_name = parsed_path.path.strip("/")
parsed_query = parse_qs(parsed_path.query)
# Make sure it's a valid request
if base_name not in self.files:
raise llfuse.FUSEError(errno.ENOENT)
# Split any key/value args
args = {}
for arg in arg_str.split(","):
if arg == "":
continue
print(f"Splitting: {arg}")
key, value = arg.split("=", 1)
args[key] = value
file = {
"name": base_name,
"args": args,
"error": None,
"attrs": None,
"count": 0,
}
self.inodes[inode] = file
if not file["attrs"]:
# Fuse shouldn't be calling this for the base directory, so it
# should be safe to assume we're dealing with a file.
file["attrs"] = self.gen_attrs(inode, False)
# Increase the lookup count
file["count"] += 1
return file["attrs"]
def getattr(self, inode, ctx=None):
    """
    Return the attributes for an inode.

    The root directory's attributes are generated on every call. Any other
    inode must already be present in self.inodes; otherwise ENOENT is
    raised.

    Params:
        inode: The inode number being queried
        ctx:   Request context (unused)

    Raises:
        llfuse.FUSEError(ENOENT) if the inode is not known
    """
    print(f"getattr called with: inode={inode}")

    if inode != llfuse.ROOT_INODE:
        try:
            entry = self.inodes[inode]
            return entry["attrs"]
        except KeyError:
            print(f"Getattr called without a lookup first for: inode={inode}")
            raise llfuse.FUSEError(errno.ENOENT)

    # If we're looking at the directory itself
    return self.gen_attrs(inode, True)
def opendir(self, inode, ctx):
    """
    Open a directory and return its handle.

    Note: We only allow one directory level, so the root inode is the only
    directory that can be opened. Its inode number doubles as the handle.

    Raises:
        llfuse.FUSEError(ENOENT) for any inode other than the root
    """
    print(f"opendir called with: inode={inode}")

    if inode == llfuse.ROOT_INODE:
        return inode

    raise llfuse.FUSEError(errno.ENOENT)
def readdir(self, fh, off):
    """
    List the entries of the top-level directory, starting at offset off.

    Params:
        fh:  The directory handle (always the root inode, see opendir)
        off: Index into self.files of the first entry to return

    Yields:
        (name, attrs, next_offset) tuples, where name is the UTF-8 encoded
        file name, attrs is its llfuse.EntryAttributes and next_offset is
        the offset to resume from after this entry.

    Raises:
        llfuse.FUSEError(EBADF) if fh is not the root directory handle

    Attributes are produced without calling lookup(): lookup() increments
    the per-inode count that forget() later decrements, and a directory
    listing must not inflate that count or the entry is never dropped.
    NOTE(review): based on the llfuse Operations documentation, which does
    not list readdir among the calls that increase the lookup count --
    confirm against the llfuse version in use.
    """
    print(f"readdir called with: fh={fh} off={off}")

    # Something has gone wrong if this was called for anything other than
    # the top level directory. Raise a real error instead of using assert,
    # which is stripped when Python runs with -O.
    if fh != llfuse.ROOT_INODE:
        raise llfuse.FUSEError(errno.EBADF)

    for next_off, entry in enumerate(self.files[off:], start=off + 1):
        name = entry.encode("utf-8")
        print(f" yielding: {name}")

        # Reuse attributes cached by an earlier lookup if there are any,
        # otherwise generate them without registering the inode.
        inode = self.path_to_inode(name)
        known = self.inodes.get(inode)
        attrs = known["attrs"] if known else None
        if attrs is None:
            attrs = self.gen_attrs(inode, False)

        yield (name, attrs, next_off)
def new_file_handle(self, inode=None):
    """
    Allocate the lowest unused file handle number and register it.

    Params:
        inode: The inode the handle refers to (optional)

    Returns:
        The new file handle number (int)

    The new entry in self.file_handles records the inode and starts with
    no data attached.
    """
    # Walk up from zero until we hit a number that isn't in use
    handle = 0
    while handle in self.file_handles:
        handle += 1

    # Create the empty entry in the file handle table
    self.file_handles[handle] = dict(inode=inode, data=None)

    return handle
def open(self, inode, flags, ctx):
print(f"open called with: inode={inode} flags={flags}")
fh = self.new_file_handle(inode)
# Get the data associated with the file handle
file = self.file_handles[fh]
# Get the data associated with the file name and parameters
data = self.inodes[inode]
# Pull out the things we need to run the query
metric_name = data["name"]
metric_args = data["args"]
# Populate the data for the file
if metric_name == "agent_version":
self._reply(200, VERSION)
file["data"] = VERSION.encode("utf-8")
elif metric_name == "sleep":
seconds = metric_args.get("seconds", "10")
with llfuse.lock_released:
time.sleep(int(seconds))
file["data"] = seconds.encode("utf-8")
elif metric_name == "latest_version_info":
try:
get_latest_version()
self._reply(
200,
json.dumps(
{
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
),
)
file["data"] = json.dumps(
{
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
).encode("utf-8")
except LatestVersionCheckError as e:
Context.log.error(
"Failed to retrieve latest version information: %s", e
)
self._reply(503, "Failed to retrieve latest version info")
file["data"] = "ERROR: Failed to retrieve latest version info".encode(
"utf-8"
)
else:
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Get the dbname. If none was provided, use the default from the
# config.
dbname = args.get("dbname", Context.config["dbname"])
dbname = metric_args.get("dbname", Context.config["dbname"])
# Sample the metric
try:
self._reply(200, sample_metric(dbname, metric_name, args))
file["data"] = sample_metric(dbname, metric_name, metric_args).encode(
"utf-8"
)
except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name)
self._reply(404, "Unknown metric")
file["data"] = "ERROR: Unknown metric".encode("utf-8")
except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name)
self._reply(404, "Unsupported version")
file["data"] = "ERROR: Unsupported version".encode("utf-8")
except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname)
self._reply(503, "Database unavailable")
file["data"] = "ERROR: Database unavailable".encode("utf-8")
except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e)
self._reply(500, "Unexpected error: {}".format(e))
file["data"] = "ERROR: Unexpected error: {}".format(e).encode("utf-8")
def _reply(self, code, content):
"""
Send a reply to the client
"""
self.send_response(code)
self.send_header("Content-type", "application/json")
self.end_headers()
return fh
self.wfile.write(bytes(content, "utf-8"))
def read(self, fh, off, size):
    """
    Return a slice of the data held for an open file handle.

    Params:
        fh:   The file handle number
        off:  Offset of the first byte to return
        size: Maximum number of bytes to return

    Returns:
        The requested portion of the handle's data (shorter, or empty, when
        the range runs past the end)
    """
    print(f"read called with: fh={fh} off={off} size={size}")

    payload = self.file_handles[fh]["data"]
    end = off + size
    return payload[off:end]
def forget(self, inode_list):
    """
    Decrease lookup counts and drop inodes that are no longer referenced.

    Params:
        inode_list: Iterable of (inode, nlookup) pairs, where nlookup is the
                    amount to subtract from that inode's lookup count

    Note: Per the documentation, this function must not raise an exception!
    Failures for individual inodes are logged and otherwise ignored.
    """
    print(f"forget called with: inode_list={inode_list}")

    for ino, released in inode_list:
        try:
            entry = self.inodes[ino]
            entry["count"] -= released
            # Nothing references this inode any more, so stop tracking it
            if entry["count"] <= 0:
                del self.inodes[ino]
        except Exception as err:
            Context.log.error("Error forgetting inode %s: %s", ino, err)
def release(self, fh):
    """
    Discard the state held for a file handle once it has been closed.

    Params:
        fh: The file handle number to drop

    Raises:
        KeyError if the handle is not known
    """
    print(f"release called with: fh={fh}")
    self.file_handles.pop(fh)
def main():
@ -916,23 +1088,44 @@ def main():
sys.exit(1)
sys.exit(0)
# Set up the http server to receive requests
server_address = (Context.config["address"], Context.config["port"])
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
# Handle requests.
Context.log.info("Listening on port %s...", Context.config["port"])
while Context.running:
Context.httpd.handle_request()
# Ensure the mount point exists
if not os.path.exists(Context.config["fuse_base"]):
os.makedirs(Context.config["fuse_base"])
# Create the FUSE filesystem
pgmon_fuse = PGMonFuse()
fuse_options = set(llfuse.default_options)
fuse_options.add("fsname=pgmon")
# fuse_options.add('direct_io')
# if options.debug_fuse:
# fuse_options.add('allow_others')
try:
llfuse.init(pgmon_fuse, Context.config["fuse_base"], fuse_options)
except:
llfuse.close()
return -1
try:
llfuse.main(workers=1)
except:
llfuse.close()
raise
llfuse.close()
# Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly
for pool in Context.connections.values():
pool.close()
pool.closeall()
logging.shutdown()
print("Good bye.")
if __name__ == "__main__":