Compare commits

...

2 Commits

Author SHA1 Message Date
39c34ee31f
wip 2026-03-15 23:54:27 -04:00
65f01ea826
Implement FUSE interface using pyfuse 2026-02-04 00:36:54 -05:00
3 changed files with 293 additions and 77 deletions

View File

@ -1,2 +1,4 @@
psycopg psycopg2
pyyaml pyyaml
llfuse
requests

View File

@ -1,8 +1,22 @@
# The address the agent binds to # # The name for this agent instance (controles the FUSE directory name)
#address: 127.0.0.1 #name: pgmon
# The port the agent listens on for requests # Mode to set on FUSE files (in octal)
#port: 5400 #fuse_mode: 0o400
# Owner to set on FUSE files
#fuse_owner: zabbix
fuse_owner: james
# Group to set on FUSE files
#fuse_group: zabbix
fuse_group: james
# User the agent runs as
agent_user: james
# Groupthe agent runs as
agent_group: james
# Min PostgreSQL connection pool size (per database) # Min PostgreSQL connection pool size (per database)
#min_pool_size: 0 #min_pool_size: 0
@ -16,21 +30,28 @@
# Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels) # Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels)
# Possible values are: debug, info, warning, error, critical # Possible values are: debug, info, warning, error, critical
#log_level: 'error' #log_level: 'error'
log_level: 'info'
# Base directory for FUSE file system
fuse_base: /tmp/pgmon
# Database user to connect as # Database user to connect as
#dbuser: 'postgres' #dbuser: 'postgres'
# Database host # Database host
#dbhost: '/var/run/postgresql' #dbhost: '/var/run/postgresql'
dbhost: 127.0.0.1
# Database port # Database port
#dbport: 5432 #dbport: 5432
dbport: 54315
# Default database to connect to when none is specified for a metric # Default database to connect to when none is specified for a metric
#dbname: 'postgres' #dbname: 'postgres'
# SSL connection mode # SSL connection mode
#ssl_mode: require #ssl_mode: require
ssl_mode: allow
# Timeout for getting a connection slot from a pool # Timeout for getting a connection slot from a pool
#pool_slot_timeout: 5 #pool_slot_timeout: 5

View File

@ -14,6 +14,9 @@ import signal
import argparse import argparse
import logging import logging
import re import re
import stat
import errno
import requests
from decimal import Decimal from decimal import Decimal
@ -23,9 +26,6 @@ from contextlib import contextmanager
from datetime import datetime, timedelta from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Lock from threading import Lock
import yaml import yaml
@ -34,7 +34,10 @@ import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool from psycopg2.pool import ThreadedConnectionPool
import requests from pwd import getpwnam
from grp import getgrnam
import llfuse
VERSION = "1.1.0-rc1" VERSION = "1.1.0-rc1"
@ -71,9 +74,6 @@ class Context:
# Running state (used to gracefully shut down) # Running state (used to gracefully shut down)
running = True running = True
# The http server object
httpd = None
# Where the config file lives # Where the config file lives
config_file = None config_file = None
@ -142,10 +142,20 @@ class InvalidDataError(Exception):
# Default config settings # Default config settings
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
# The address the agent binds to # The name for this agent instance (controles the FUSE directory name)
"address": "127.0.0.1", "name": "pgmon",
# The port the agent listens on for requests # Base directory for FUSE file system
"port": 5400, "fuse_base": "/run/pgmon",
# Mode to set on FUSE files (in octal)
"fuse_mode": 0o400,
# Owner to set on FUSE files
"fuse_owner": "zabbix",
# Group to set on FUSE files
"fuse_group": "zabbix",
# The user the agent runs as
"agent_user": "pgmon",
# The group the agent runs as
"agent_group": "pgmon",
# Min PostgreSQL connection pool size (per database) # Min PostgreSQL connection pool size (per database)
"min_pool_size": 0, "min_pool_size": 0,
# Max PostgreSQL connection pool size (per database) # Max PostgreSQL connection pool size (per database)
@ -349,11 +359,9 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]: if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
Context.log.info("Shutting down ...") Context.log.info("Shutting down ...")
Context.running = False Context.running = False
if Context.httpd is not None:
Context.httpd.socket.close()
# Signal a reload # Signal a reload
if sig == signal.SIGHUP: if sig in [signal.SIGHUP, signal.SIGUSR1]:
Context.log.warning("Received config reload signal") Context.log.warning("Received config reload signal")
read_config(Context.config_file) read_config(Context.config_file)
@ -792,89 +800,253 @@ def test_queries():
return 0 return 0
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class PGMonFuse(llfuse.Operations):
""" """
This is our request handling server. It is responsible for listening for This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for
requests, processing them, and responding. requests, processing them, and responding.
""" """
def log_request(self, code="-", size="-"): def __init__(self):
""" super().__init__()
Override to suppress standard request logging self.builtin = ["agent_version", "latest_version_info", "sleep"]
""" self.update_files()
def do_GET(self): # pylint: disable=invalid-name # Dictionary holdig context information for open file handles
self.inodes = dict()
self.file_handles = dict()
@staticmethod
def gen_attrs(inode, is_dir):
attrs = llfuse.EntryAttributes()
if is_dir:
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IXUSR | stat.S_IFDIR
else:
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IFREG
attrs.st_nlink = 1
attrs.st_size = 1024
attrs.st_uid = getpwnam(Context.config["fuse_owner"]).pw_uid
attrs.st_gid = getgrnam(Context.config["fuse_group"]).gr_gid
attrs.st_ino = inode
return attrs
def update_files(self):
self.files = self.builtin + sorted(list(Context.config["metrics"].keys()))
@staticmethod
def path_to_inode(path):
""" """
Handle a request. This is just a wrapper around the actual handler Provide a consistent inode for a given file
code to keep things more readable.
""" """
return hash(path) & 0xFFFFFFFFFFFFFFFF
def lookup(self, inode_p, name, ctx=None):
""" """
print(f"Lookup called with: inode_p={inode_p} name={name}")
# We only have a single level, so if the parent inode is anything else
# then it doesn't exist
if inode_p != llfuse.ROOT_INODE:
raise llfuse.FUSEError(errno.ENOENT)
# Generate the inode for this file (ie: query + args)
inode = self.path_to_inode(name)
# Get the file info if we already have it, create it otherwise
try: try:
self._handle_request() file = self.inodes[inode]
except BrokenPipeError: except KeyError:
Context.log.error("Client disconnected, exiting handler") # Separate out the name components
parts = name.decode("utf-8").split(":", 1)
base_name = parts[0]
try:
arg_str = parts[1]
except IndexError:
arg_str = ""
def _handle_request(self): # Make sure it's a valid request
""" if base_name not in self.files:
Request handler raise llfuse.FUSEError(errno.ENOENT)
"""
# Parse the URL
parsed_path = urlparse(self.path)
metric_name = parsed_path.path.strip("/")
parsed_query = parse_qs(parsed_path.query)
# Split any key/value args
args = {}
for arg in arg_str.split(","):
if arg == "":
continue
print(f"Splitting: {arg}")
key, value = arg.split("=", 1)
args[key] = value
file = {
"name": base_name,
"args": args,
"error": None,
"attrs": None,
"count": 0,
}
self.inodes[inode] = file
if not file["attrs"]:
# Fuse shouldn't be calling this for the base directory, so it
# should be safe to assume we're dealing with a file.
file["attrs"] = self.gen_attrs(inode, False)
# Increase the lookup count
file["count"] += 1
return file["attrs"]
def getattr(self, inode, ctx=None):
print(f"getattr called with: inode={inode}")
if inode == llfuse.ROOT_INODE:
# If we're looking at the directory itself
return self.gen_attrs(inode, True)
try:
return self.inodes[inode]["attrs"]
except KeyError:
print(f"Getattr called without a lookup first for: inode={inode}")
raise llfuse.FUSEError(errno.ENOENT)
def opendir(self, inode, ctx):
"""
Note: We only allow one directory level
"""
print(f"opendir called with: inode={inode}")
if inode != llfuse.ROOT_INODE:
raise llfuse.FUSEError(errno.ENOENT)
return inode
def readdir(self, fh, off):
print(f"readdir called with: fh={fh} off={off}")
# Something has gone wrong if this was called for anything other than the top level directory
assert fh == llfuse.ROOT_INODE
for i, entry in enumerate(self.files[off:]):
print(f" yielding: {entry.encode('utf8')}")
yield (
entry.encode("utf-8"),
self.lookup(llfuse.ROOT_INODE, entry.encode("utf-8")),
i + off + 1,
)
def new_file_handle(self, inode=None):
"""
Pick an unused file handle (int) and create an entry in file_handles for it
returns: the file handle number
"""
# Find the first unused number
new_fh = 0
for fh in sorted(self.file_handles.keys()):
if new_fh == fh:
new_fh += 1
else:
break
# Create the empty entry in the
self.file_handles[new_fh] = {
"inode": inode,
"data": None,
}
return new_fh
def open(self, inode, flags, ctx):
print(f"open called with: inode={inode} flags={flags}")
fh = self.new_file_handle(inode)
# Get the data associated with the file handle
file = self.file_handles[fh]
# Get the data associated with the file name and parameters
data = self.inodes[inode]
# Pull out the things we need to run the query
metric_name = data["name"]
metric_args = data["args"]
# Populate the data for the file
if metric_name == "agent_version": if metric_name == "agent_version":
self._reply(200, VERSION) file["data"] = VERSION.encode("utf-8")
elif metric_name == "sleep":
seconds = metric_args.get("seconds", "10")
with llfuse.lock_released:
time.sleep(int(seconds))
file["data"] = seconds.encode("utf-8")
elif metric_name == "latest_version_info": elif metric_name == "latest_version_info":
try: try:
get_latest_version() get_latest_version()
self._reply( file["data"] = json.dumps(
200,
json.dumps(
{ {
"latest": Context.latest_version, "latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0, "supported": 1 if Context.release_supported else 0,
} }
), ).encode("utf-8")
)
except LatestVersionCheckError as e: except LatestVersionCheckError as e:
Context.log.error( Context.log.error(
"Failed to retrieve latest version information: %s", e "Failed to retrieve latest version information: %s", e
) )
self._reply(503, "Failed to retrieve latest version info") file["data"] = "ERROR: Failed to retrieve latest version info".encode(
"utf-8"
)
else: else:
# Note: parse_qs returns the values as a list. Since we always expect
# single values, just grab the first from each.
args = {key: values[0] for key, values in parsed_query.items()}
# Get the dbname. If none was provided, use the default from the # Get the dbname. If none was provided, use the default from the
# config. # config.
dbname = args.get("dbname", Context.config["dbname"]) dbname = metric_args.get("dbname", Context.config["dbname"])
# Sample the metric # Sample the metric
try: try:
self._reply(200, sample_metric(dbname, metric_name, args)) file["data"] = sample_metric(dbname, metric_name, metric_args).encode(
"utf-8"
)
except UnknownMetricError: except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name) Context.log.error("Unknown metric: %s", metric_name)
self._reply(404, "Unknown metric") file["data"] = "ERROR: Unknown metric".encode("utf-8")
except MetricVersionError: except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name) Context.log.error("Failed to find an query version for %s", metric_name)
self._reply(404, "Unsupported version") file["data"] = "ERROR: Unsupported version".encode("utf-8")
except UnhappyDBError: except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname) Context.log.info("Database %s is unhappy, please be patient", dbname)
self._reply(503, "Database unavailable") file["data"] = "ERROR: Database unavailable".encode("utf-8")
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e) Context.log.error("Error running query: %s", e)
self._reply(500, "Unexpected error: {}".format(e)) file["data"] = "ERROR: Unexpected error: {}".format(e).encode("utf-8")
def _reply(self, code, content): return fh
"""
Send a reply to the client
"""
self.send_response(code)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(content, "utf-8")) def read(self, fh, off, size):
print(f"read called with: fh={fh} off={off} size={size}")
file = self.file_handles[fh]
return file["data"][off : off + size]
def forget(self, inode_list):
"""
Note: Per the dcumenation, this function must not raise an exception!
"""
print(f"forget called with: inode_list={inode_list}")
for inode, nlookup in inode_list:
try:
self.inodes[inode]["count"] -= nlookup
if self.inodes[inode]["count"] <= 0:
del self.inodes[inode]
except Exception as e:
Context.log.error("Error forgetting inode %s: %s", inode, e)
def release(self, fh):
print(f"release called with: fh={fh}")
del self.file_handles[fh]
def main(): def main():
@ -916,23 +1088,44 @@ def main():
sys.exit(1) sys.exit(1)
sys.exit(0) sys.exit(0)
# Set up the http server to receive requests
server_address = (Context.config["address"], Context.config["port"])
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
# Set up the signal handler # Set up the signal handler
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler) signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
# Handle requests. # Ensure the mount point exists
Context.log.info("Listening on port %s...", Context.config["port"]) if not os.path.exists(Context.config["fuse_base"]):
while Context.running: os.makedirs(Context.config["fuse_base"])
Context.httpd.handle_request()
# Create the FUSE filesystem
pgmon_fuse = PGMonFuse()
fuse_options = set(llfuse.default_options)
fuse_options.add("fsname=pgmon")
# fuse_options.add('direct_io')
# if options.debug_fuse:
# fuse_options.add('allow_others')
try:
llfuse.init(pgmon_fuse, Context.config["fuse_base"], fuse_options)
except:
llfuse.close()
return -1
try:
llfuse.main(workers=1)
except:
llfuse.close()
raise
llfuse.close()
# Clean up PostgreSQL connections # Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly # TODO: Improve this ... not sure it actually closes all the connections cleanly
for pool in Context.connections.values(): for pool in Context.connections.values():
pool.close() pool.closeall()
logging.shutdown()
print("Good bye.")
if __name__ == "__main__": if __name__ == "__main__":