From 39c34ee31f300d06a0b4d77561d869be472dd60c Mon Sep 17 00:00:00 2001 From: James Campbell Date: Sun, 15 Mar 2026 23:54:27 -0400 Subject: [PATCH] wip --- requirements.yml | 3 +- sample-config/pgmon.yml | 1 + src/pgmon.py | 279 +++++++++++++++++++++++++++++++++------- 3 files changed, 234 insertions(+), 49 deletions(-) diff --git a/requirements.yml b/requirements.yml index cd220f4..92ac5c8 100644 --- a/requirements.yml +++ b/requirements.yml @@ -1,3 +1,4 @@ psycopg2 pyyaml -fusepy +llfuse +requests diff --git a/sample-config/pgmon.yml b/sample-config/pgmon.yml index 5d09b5c..14669e9 100644 --- a/sample-config/pgmon.yml +++ b/sample-config/pgmon.yml @@ -44,6 +44,7 @@ dbhost: 127.0.0.1 # Database port #dbport: 5432 +dbport: 54315 # Default database to connect to when none is specified for a metric #dbname: 'postgres' diff --git a/src/pgmon.py b/src/pgmon.py index a6bf0dc..6d8139f 100755 --- a/src/pgmon.py +++ b/src/pgmon.py @@ -15,6 +15,8 @@ import argparse import logging import re import stat +import errno +import requests from decimal import Decimal @@ -35,7 +37,7 @@ from psycopg2.pool import ThreadedConnectionPool from pwd import getpwnam from grp import getgrnam -from fuse import FUSE, Operations, LoggingMixIn, fuse_exit +import llfuse VERSION = "1.1.0-rc1" @@ -141,7 +143,7 @@ class InvalidDataError(Exception): # Default config settings DEFAULT_CONFIG = { # The name for this agent instance (controles the FUSE directory name) - "name": 'pgmon', + "name": "pgmon", # Base directory for FUSE file system "fuse_base": "/run/pgmon", # Mode to set on FUSE files (in octal) @@ -798,69 +800,203 @@ def test_queries(): return 0 -class PGMonFuse(LoggingMixIn, Operations): +class PGMonFuse(llfuse.Operations): """ This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for requests, processing them, and responding. """ def __init__(self): - self.builtin = ['agent_version', 'latest_version_info'] + super().__init__() + self.builtin = ["agent_version", "latest_version_info", "sleep"] + self.update_files() + # Dictionary holdig context information for open file handles + self.inodes = dict() + self.file_handles = dict() + + @staticmethod + def gen_attrs(inode, is_dir): + + attrs = llfuse.EntryAttributes() + if is_dir: + attrs.st_mode = Context.config["fuse_mode"] | stat.S_IXUSR | stat.S_IFDIR + else: + attrs.st_mode = Context.config["fuse_mode"] | stat.S_IFREG + attrs.st_nlink = 1 + attrs.st_size = 1024 + attrs.st_uid = getpwnam(Context.config["fuse_owner"]).pw_uid + attrs.st_gid = getgrnam(Context.config["fuse_group"]).gr_gid + + attrs.st_ino = inode - def getattr(self, path, fh=None): - attrs = { - 'st_mode': (Context.config['fuse_mode'] | stat.S_IFREG), - 'st_nlink': 1, - 'st_size': 1024, - 'st_uid': getpwnam(Context.config['fuse_owner']).pw_uid, - 'st_gid': getgrnam(Context.config['fuse_group']).gr_gid, - } - if path == "/": - # If we're looking at the directory itself, add execute permissions for the owner and identify it as a directory - attrs['st_mode'] = Context.config['fuse_mode'] | stat.S_IXUSR | stat.S_IFDIR return attrs - def readdir(self, path, fh): - return ['.', '..'] + self.builtin + list(Context.config["metrics"].keys()) + def update_files(self): + self.files = self.builtin + sorted(list(Context.config["metrics"].keys())) -# def open(self, path, fh): -# fh.direct_io = 1 -# pass + @staticmethod + def path_to_inode(path): + """ + Provide a consistent inode for a given file + """ + return hash(path) & 0xFFFFFFFFFFFFFFFF - def read(self, path, size, offset, fh): - # Split the path into the expected components: ////... - parts = path.split('/') + def lookup(self, inode_p, name, ctx=None): + """ """ + print(f"Lookup called with: inode_p={inode_p} name={name}") - # Note: the first element will be empty. If we got less than two, we - # don't have a valid request. - if len(parts) < 2: - return ''.encode('utf-8') + # We only have a single level, so if the parent inode is anything else + # then it doesn't exist + if inode_p != llfuse.ROOT_INODE: + raise llfuse.FUSEError(errno.ENOENT) - res = None - metric_name = parts[1] - metric_args = {} + # Generate the inode for this file (ie: query + args) + inode = self.path_to_inode(name) - for arg in parts[2:]: - key, value = arg.split('=') - metric_args[key] = value + # Get the file info if we already have it, create it otherwise + try: + file = self.inodes[inode] + except KeyError: + # Separate out the name components + parts = name.decode("utf-8").split(":", 1) + base_name = parts[0] + try: + arg_str = parts[1] + except IndexError: + arg_str = "" + # Make sure it's a valid request + if base_name not in self.files: + raise llfuse.FUSEError(errno.ENOENT) + + # Split any key/value args + args = {} + for arg in arg_str.split(","): + if arg == "": + continue + print(f"Splitting: {arg}") + key, value = arg.split("=", 1) + args[key] = value + + file = { + "name": base_name, + "args": args, + "error": None, + "attrs": None, + "count": 0, + } + + self.inodes[inode] = file + + if not file["attrs"]: + # Fuse shouldn't be calling this for the base directory, so it + # should be safe to assume we're dealing with a file. + file["attrs"] = self.gen_attrs(inode, False) + + # Increase the lookup count + file["count"] += 1 + + return file["attrs"] + + def getattr(self, inode, ctx=None): + print(f"getattr called with: inode={inode}") + + if inode == llfuse.ROOT_INODE: + # If we're looking at the directory itself + return self.gen_attrs(inode, True) + + try: + return self.inodes[inode]["attrs"] + except KeyError: + print(f"Getattr called without a lookup first for: inode={inode}") + raise llfuse.FUSEError(errno.ENOENT) + + def opendir(self, inode, ctx): + """ + Note: We only allow one directory level + """ + print(f"opendir called with: inode={inode}") + + if inode != llfuse.ROOT_INODE: + raise llfuse.FUSEError(errno.ENOENT) + return inode + + def readdir(self, fh, off): + print(f"readdir called with: fh={fh} off={off}") + + # Something has gone wrong if this was called for anything other than the top level directory + assert fh == llfuse.ROOT_INODE + + for i, entry in enumerate(self.files[off:]): + print(f" yielding: {entry.encode('utf8')}") + yield ( + entry.encode("utf-8"), + self.lookup(llfuse.ROOT_INODE, entry.encode("utf-8")), + i + off + 1, + ) + + def new_file_handle(self, inode=None): + """ + Pick an unused file handle (int) and create an entry in file_handles for it + + returns: the file handle number + """ + # Find the first unused number + new_fh = 0 + for fh in sorted(self.file_handles.keys()): + if new_fh == fh: + new_fh += 1 + else: + break + + # Create the empty entry in the + self.file_handles[new_fh] = { + "inode": inode, + "data": None, + } + + return new_fh + + def open(self, inode, flags, ctx): + print(f"open called with: inode={inode} flags={flags}") + + fh = self.new_file_handle(inode) + + # Get the data associated with the file handle + file = self.file_handles[fh] + + # Get the data associated with the file name and parameters + data = self.inodes[inode] + + # Pull out the things we need to run the query + metric_name = data["name"] + metric_args = data["args"] + + # Populate the data for the file if metric_name == "agent_version": - res = VERSION + file["data"] = VERSION.encode("utf-8") + elif metric_name == "sleep": + seconds = metric_args.get("seconds", "10") + with llfuse.lock_released: + time.sleep(int(seconds)) + file["data"] = seconds.encode("utf-8") elif metric_name == "latest_version_info": try: get_latest_version() - res = json.dumps( - { - "latest": Context.latest_version, - "supported": 1 if Context.release_supported else 0, - } - ) + file["data"] = json.dumps( + { + "latest": Context.latest_version, + "supported": 1 if Context.release_supported else 0, + } + ).encode("utf-8") except LatestVersionCheckError as e: Context.log.error( "Failed to retrieve latest version information: %s", e ) - res = "ERROR: Failed to retrieve latest version info" + file["data"] = "ERROR: Failed to retrieve latest version info".encode( + "utf-8" + ) else: # Get the dbname. If none was provided, use the default from the # config. @@ -868,21 +1004,49 @@ class PGMonFuse(LoggingMixIn, Operations): # Sample the metric try: - res = sample_metric(dbname, metric_name, metric_args) + file["data"] = sample_metric(dbname, metric_name, metric_args).encode( + "utf-8" + ) except UnknownMetricError: Context.log.error("Unknown metric: %s", metric_name) - res = "ERROR: Unknown metric" + file["data"] = "ERROR: Unknown metric".encode("utf-8") except MetricVersionError: Context.log.error("Failed to find an query version for %s", metric_name) - res = "ERROR: Unsupported version" + file["data"] = "ERROR: Unsupported version".encode("utf-8") except UnhappyDBError: Context.log.info("Database %s is unhappy, please be patient", dbname) - res = "ERROR: Database unavailable" + file["data"] = "ERROR: Database unavailable".encode("utf-8") except Exception as e: # pylint: disable=broad-exception-caught Context.log.error("Error running query: %s", e) - res = "ERROR: Unexpected error: {}".format(e) + file["data"] = "ERROR: Unexpected error: {}".format(e).encode("utf-8") - return res.encode('utf-8') + return fh + + def read(self, fh, off, size): + print(f"read called with: fh={fh} off={off} size={size}") + + file = self.file_handles[fh] + + return file["data"][off : off + size] + + def forget(self, inode_list): + """ + Note: Per the dcumenation, this function must not raise an exception! + """ + print(f"forget called with: inode_list={inode_list}") + + for inode, nlookup in inode_list: + try: + self.inodes[inode]["count"] -= nlookup + if self.inodes[inode]["count"] <= 0: + del self.inodes[inode] + except Exception as e: + Context.log.error("Error forgetting inode %s: %s", inode, e) + + def release(self, fh): + print(f"release called with: fh={fh}") + + del self.file_handles[fh] def main(): @@ -934,7 +1098,25 @@ def main(): os.makedirs(Context.config["fuse_base"]) # Create the FUSE filesystem - FUSE(PGMonFuse(), Context.config["fuse_base"], foreground=True, default_permissions=True) + pgmon_fuse = PGMonFuse() + fuse_options = set(llfuse.default_options) + fuse_options.add("fsname=pgmon") + # fuse_options.add('direct_io') + # if options.debug_fuse: + # fuse_options.add('allow_others') + try: + llfuse.init(pgmon_fuse, Context.config["fuse_base"], fuse_options) + except: + llfuse.close() + return -1 + + try: + llfuse.main(workers=1) + except: + llfuse.close() + raise + + llfuse.close() # Clean up PostgreSQL connections # TODO: Improve this ... not sure it actually closes all the connections cleanly @@ -945,5 +1127,6 @@ def main(): print("Good bye.") + if __name__ == "__main__": main()