This commit is contained in:
James Campbell 2026-03-15 23:54:27 -04:00
parent 65f01ea826
commit 39c34ee31f
Signed by: james
GPG Key ID: 2287C33A40DC906A
3 changed files with 234 additions and 49 deletions

View File

@ -1,3 +1,4 @@
psycopg2
pyyaml
fusepy
llfuse
requests

View File

@ -44,6 +44,7 @@ dbhost: 127.0.0.1
# Database port
#dbport: 5432
dbport: 54315
# Default database to connect to when none is specified for a metric
#dbname: 'postgres'

View File

@ -15,6 +15,8 @@ import argparse
import logging
import re
import stat
import errno
import requests
from decimal import Decimal
@ -35,7 +37,7 @@ from psycopg2.pool import ThreadedConnectionPool
from pwd import getpwnam
from grp import getgrnam
from fuse import FUSE, Operations, LoggingMixIn, fuse_exit
import llfuse
VERSION = "1.1.0-rc1"
@ -141,7 +143,7 @@ class InvalidDataError(Exception):
# Default config settings
DEFAULT_CONFIG = {
# The name for this agent instance (controles the FUSE directory name)
"name": 'pgmon',
"name": "pgmon",
# Base directory for FUSE file system
"fuse_base": "/run/pgmon",
# Mode to set on FUSE files (in octal)
@ -798,69 +800,203 @@ def test_queries():
return 0
class PGMonFuse(LoggingMixIn, Operations):
class PGMonFuse(llfuse.Operations):
"""
This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for
requests, processing them, and responding.
"""
def __init__(self):
self.builtin = ['agent_version', 'latest_version_info']
super().__init__()
self.builtin = ["agent_version", "latest_version_info", "sleep"]
self.update_files()
# Dictionary holdig context information for open file handles
self.inodes = dict()
self.file_handles = dict()
@staticmethod
def gen_attrs(inode, is_dir):
attrs = llfuse.EntryAttributes()
if is_dir:
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IXUSR | stat.S_IFDIR
else:
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IFREG
attrs.st_nlink = 1
attrs.st_size = 1024
attrs.st_uid = getpwnam(Context.config["fuse_owner"]).pw_uid
attrs.st_gid = getgrnam(Context.config["fuse_group"]).gr_gid
attrs.st_ino = inode
def getattr(self, path, fh=None):
attrs = {
'st_mode': (Context.config['fuse_mode'] | stat.S_IFREG),
'st_nlink': 1,
'st_size': 1024,
'st_uid': getpwnam(Context.config['fuse_owner']).pw_uid,
'st_gid': getgrnam(Context.config['fuse_group']).gr_gid,
}
if path == "/":
# If we're looking at the directory itself, add execute permissions for the owner and identify it as a directory
attrs['st_mode'] = Context.config['fuse_mode'] | stat.S_IXUSR | stat.S_IFDIR
return attrs
def readdir(self, path, fh):
return ['.', '..'] + self.builtin + list(Context.config["metrics"].keys())
def update_files(self):
self.files = self.builtin + sorted(list(Context.config["metrics"].keys()))
# def open(self, path, fh):
# fh.direct_io = 1
# pass
@staticmethod
def path_to_inode(path):
"""
Provide a consistent inode for a given file
"""
return hash(path) & 0xFFFFFFFFFFFFFFFF
def read(self, path, size, offset, fh):
# Split the path into the expected components: /<metric>/<arg1>/<arg2>/...
parts = path.split('/')
def lookup(self, inode_p, name, ctx=None):
""" """
print(f"Lookup called with: inode_p={inode_p} name={name}")
# Note: the first element will be empty. If we got less than two, we
# don't have a valid request.
if len(parts) < 2:
return ''.encode('utf-8')
# We only have a single level, so if the parent inode is anything else
# then it doesn't exist
if inode_p != llfuse.ROOT_INODE:
raise llfuse.FUSEError(errno.ENOENT)
res = None
metric_name = parts[1]
metric_args = {}
# Generate the inode for this file (ie: query + args)
inode = self.path_to_inode(name)
for arg in parts[2:]:
key, value = arg.split('=')
metric_args[key] = value
# Get the file info if we already have it, create it otherwise
try:
file = self.inodes[inode]
except KeyError:
# Separate out the name components
parts = name.decode("utf-8").split(":", 1)
base_name = parts[0]
try:
arg_str = parts[1]
except IndexError:
arg_str = ""
# Make sure it's a valid request
if base_name not in self.files:
raise llfuse.FUSEError(errno.ENOENT)
# Split any key/value args
args = {}
for arg in arg_str.split(","):
if arg == "":
continue
print(f"Splitting: {arg}")
key, value = arg.split("=", 1)
args[key] = value
file = {
"name": base_name,
"args": args,
"error": None,
"attrs": None,
"count": 0,
}
self.inodes[inode] = file
if not file["attrs"]:
# Fuse shouldn't be calling this for the base directory, so it
# should be safe to assume we're dealing with a file.
file["attrs"] = self.gen_attrs(inode, False)
# Increase the lookup count
file["count"] += 1
return file["attrs"]
def getattr(self, inode, ctx=None):
print(f"getattr called with: inode={inode}")
if inode == llfuse.ROOT_INODE:
# If we're looking at the directory itself
return self.gen_attrs(inode, True)
try:
return self.inodes[inode]["attrs"]
except KeyError:
print(f"Getattr called without a lookup first for: inode={inode}")
raise llfuse.FUSEError(errno.ENOENT)
def opendir(self, inode, ctx):
"""
Note: We only allow one directory level
"""
print(f"opendir called with: inode={inode}")
if inode != llfuse.ROOT_INODE:
raise llfuse.FUSEError(errno.ENOENT)
return inode
def readdir(self, fh, off):
print(f"readdir called with: fh={fh} off={off}")
# Something has gone wrong if this was called for anything other than the top level directory
assert fh == llfuse.ROOT_INODE
for i, entry in enumerate(self.files[off:]):
print(f" yielding: {entry.encode('utf8')}")
yield (
entry.encode("utf-8"),
self.lookup(llfuse.ROOT_INODE, entry.encode("utf-8")),
i + off + 1,
)
def new_file_handle(self, inode=None):
"""
Pick an unused file handle (int) and create an entry in file_handles for it
returns: the file handle number
"""
# Find the first unused number
new_fh = 0
for fh in sorted(self.file_handles.keys()):
if new_fh == fh:
new_fh += 1
else:
break
# Create the empty entry in the
self.file_handles[new_fh] = {
"inode": inode,
"data": None,
}
return new_fh
def open(self, inode, flags, ctx):
print(f"open called with: inode={inode} flags={flags}")
fh = self.new_file_handle(inode)
# Get the data associated with the file handle
file = self.file_handles[fh]
# Get the data associated with the file name and parameters
data = self.inodes[inode]
# Pull out the things we need to run the query
metric_name = data["name"]
metric_args = data["args"]
# Populate the data for the file
if metric_name == "agent_version":
res = VERSION
file["data"] = VERSION.encode("utf-8")
elif metric_name == "sleep":
seconds = metric_args.get("seconds", "10")
with llfuse.lock_released:
time.sleep(int(seconds))
file["data"] = seconds.encode("utf-8")
elif metric_name == "latest_version_info":
try:
get_latest_version()
res = json.dumps(
{
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
)
file["data"] = json.dumps(
{
"latest": Context.latest_version,
"supported": 1 if Context.release_supported else 0,
}
).encode("utf-8")
except LatestVersionCheckError as e:
Context.log.error(
"Failed to retrieve latest version information: %s", e
)
res = "ERROR: Failed to retrieve latest version info"
file["data"] = "ERROR: Failed to retrieve latest version info".encode(
"utf-8"
)
else:
# Get the dbname. If none was provided, use the default from the
# config.
@ -868,21 +1004,49 @@ class PGMonFuse(LoggingMixIn, Operations):
# Sample the metric
try:
res = sample_metric(dbname, metric_name, metric_args)
file["data"] = sample_metric(dbname, metric_name, metric_args).encode(
"utf-8"
)
except UnknownMetricError:
Context.log.error("Unknown metric: %s", metric_name)
res = "ERROR: Unknown metric"
file["data"] = "ERROR: Unknown metric".encode("utf-8")
except MetricVersionError:
Context.log.error("Failed to find an query version for %s", metric_name)
res = "ERROR: Unsupported version"
file["data"] = "ERROR: Unsupported version".encode("utf-8")
except UnhappyDBError:
Context.log.info("Database %s is unhappy, please be patient", dbname)
res = "ERROR: Database unavailable"
file["data"] = "ERROR: Database unavailable".encode("utf-8")
except Exception as e: # pylint: disable=broad-exception-caught
Context.log.error("Error running query: %s", e)
res = "ERROR: Unexpected error: {}".format(e)
file["data"] = "ERROR: Unexpected error: {}".format(e).encode("utf-8")
return res.encode('utf-8')
return fh
def read(self, fh, off, size):
print(f"read called with: fh={fh} off={off} size={size}")
file = self.file_handles[fh]
return file["data"][off : off + size]
def forget(self, inode_list):
"""
Note: Per the dcumenation, this function must not raise an exception!
"""
print(f"forget called with: inode_list={inode_list}")
for inode, nlookup in inode_list:
try:
self.inodes[inode]["count"] -= nlookup
if self.inodes[inode]["count"] <= 0:
del self.inodes[inode]
except Exception as e:
Context.log.error("Error forgetting inode %s: %s", inode, e)
def release(self, fh):
print(f"release called with: fh={fh}")
del self.file_handles[fh]
def main():
@ -934,7 +1098,25 @@ def main():
os.makedirs(Context.config["fuse_base"])
# Create the FUSE filesystem
FUSE(PGMonFuse(), Context.config["fuse_base"], foreground=True, default_permissions=True)
pgmon_fuse = PGMonFuse()
fuse_options = set(llfuse.default_options)
fuse_options.add("fsname=pgmon")
# fuse_options.add('direct_io')
# if options.debug_fuse:
# fuse_options.add('allow_others')
try:
llfuse.init(pgmon_fuse, Context.config["fuse_base"], fuse_options)
except:
llfuse.close()
return -1
try:
llfuse.main(workers=1)
except:
llfuse.close()
raise
llfuse.close()
# Clean up PostgreSQL connections
# TODO: Improve this ... not sure it actually closes all the connections cleanly
@ -945,5 +1127,6 @@ def main():
print("Good bye.")
if __name__ == "__main__":
main()