Compare commits
2 Commits
17b55d3c46
...
39c34ee31f
| Author | SHA1 | Date | |
|---|---|---|---|
| 39c34ee31f | |||
| 65f01ea826 |
@ -1,2 +1,4 @@
|
|||||||
psycopg
|
psycopg2
|
||||||
pyyaml
|
pyyaml
|
||||||
|
llfuse
|
||||||
|
requests
|
||||||
|
|||||||
@ -1,8 +1,22 @@
|
|||||||
# The address the agent binds to
|
# # The name for this agent instance (controles the FUSE directory name)
|
||||||
#address: 127.0.0.1
|
#name: pgmon
|
||||||
|
|
||||||
# The port the agent listens on for requests
|
# Mode to set on FUSE files (in octal)
|
||||||
#port: 5400
|
#fuse_mode: 0o400
|
||||||
|
|
||||||
|
# Owner to set on FUSE files
|
||||||
|
#fuse_owner: zabbix
|
||||||
|
fuse_owner: james
|
||||||
|
|
||||||
|
# Group to set on FUSE files
|
||||||
|
#fuse_group: zabbix
|
||||||
|
fuse_group: james
|
||||||
|
|
||||||
|
# User the agent runs as
|
||||||
|
agent_user: james
|
||||||
|
|
||||||
|
# Groupthe agent runs as
|
||||||
|
agent_group: james
|
||||||
|
|
||||||
# Min PostgreSQL connection pool size (per database)
|
# Min PostgreSQL connection pool size (per database)
|
||||||
#min_pool_size: 0
|
#min_pool_size: 0
|
||||||
@ -16,21 +30,28 @@
|
|||||||
# Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels)
|
# Log level for stderr logging (see https://docs.python.org/3/library/logging.html#logging-levels)
|
||||||
# Possible values are: debug, info, warning, error, critical
|
# Possible values are: debug, info, warning, error, critical
|
||||||
#log_level: 'error'
|
#log_level: 'error'
|
||||||
|
log_level: 'info'
|
||||||
|
|
||||||
|
# Base directory for FUSE file system
|
||||||
|
fuse_base: /tmp/pgmon
|
||||||
|
|
||||||
# Database user to connect as
|
# Database user to connect as
|
||||||
#dbuser: 'postgres'
|
#dbuser: 'postgres'
|
||||||
|
|
||||||
# Database host
|
# Database host
|
||||||
#dbhost: '/var/run/postgresql'
|
#dbhost: '/var/run/postgresql'
|
||||||
|
dbhost: 127.0.0.1
|
||||||
|
|
||||||
# Database port
|
# Database port
|
||||||
#dbport: 5432
|
#dbport: 5432
|
||||||
|
dbport: 54315
|
||||||
|
|
||||||
# Default database to connect to when none is specified for a metric
|
# Default database to connect to when none is specified for a metric
|
||||||
#dbname: 'postgres'
|
#dbname: 'postgres'
|
||||||
|
|
||||||
# SSL connection mode
|
# SSL connection mode
|
||||||
#ssl_mode: require
|
#ssl_mode: require
|
||||||
|
ssl_mode: allow
|
||||||
|
|
||||||
# Timeout for getting a connection slot from a pool
|
# Timeout for getting a connection slot from a pool
|
||||||
#pool_slot_timeout: 5
|
#pool_slot_timeout: 5
|
||||||
|
|||||||
337
src/pgmon.py
337
src/pgmon.py
@ -14,6 +14,9 @@ import signal
|
|||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
import stat
|
||||||
|
import errno
|
||||||
|
import requests
|
||||||
|
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
|
||||||
@ -23,9 +26,6 @@ from contextlib import contextmanager
|
|||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from http.server import BaseHTTPRequestHandler
|
|
||||||
from http.server import ThreadingHTTPServer
|
|
||||||
|
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
@ -34,7 +34,10 @@ import psycopg2
|
|||||||
from psycopg2.extras import RealDictCursor
|
from psycopg2.extras import RealDictCursor
|
||||||
from psycopg2.pool import ThreadedConnectionPool
|
from psycopg2.pool import ThreadedConnectionPool
|
||||||
|
|
||||||
import requests
|
from pwd import getpwnam
|
||||||
|
from grp import getgrnam
|
||||||
|
|
||||||
|
import llfuse
|
||||||
|
|
||||||
|
|
||||||
VERSION = "1.1.0-rc1"
|
VERSION = "1.1.0-rc1"
|
||||||
@ -71,9 +74,6 @@ class Context:
|
|||||||
# Running state (used to gracefully shut down)
|
# Running state (used to gracefully shut down)
|
||||||
running = True
|
running = True
|
||||||
|
|
||||||
# The http server object
|
|
||||||
httpd = None
|
|
||||||
|
|
||||||
# Where the config file lives
|
# Where the config file lives
|
||||||
config_file = None
|
config_file = None
|
||||||
|
|
||||||
@ -142,10 +142,20 @@ class InvalidDataError(Exception):
|
|||||||
|
|
||||||
# Default config settings
|
# Default config settings
|
||||||
DEFAULT_CONFIG = {
|
DEFAULT_CONFIG = {
|
||||||
# The address the agent binds to
|
# The name for this agent instance (controles the FUSE directory name)
|
||||||
"address": "127.0.0.1",
|
"name": "pgmon",
|
||||||
# The port the agent listens on for requests
|
# Base directory for FUSE file system
|
||||||
"port": 5400,
|
"fuse_base": "/run/pgmon",
|
||||||
|
# Mode to set on FUSE files (in octal)
|
||||||
|
"fuse_mode": 0o400,
|
||||||
|
# Owner to set on FUSE files
|
||||||
|
"fuse_owner": "zabbix",
|
||||||
|
# Group to set on FUSE files
|
||||||
|
"fuse_group": "zabbix",
|
||||||
|
# The user the agent runs as
|
||||||
|
"agent_user": "pgmon",
|
||||||
|
# The group the agent runs as
|
||||||
|
"agent_group": "pgmon",
|
||||||
# Min PostgreSQL connection pool size (per database)
|
# Min PostgreSQL connection pool size (per database)
|
||||||
"min_pool_size": 0,
|
"min_pool_size": 0,
|
||||||
# Max PostgreSQL connection pool size (per database)
|
# Max PostgreSQL connection pool size (per database)
|
||||||
@ -349,11 +359,9 @@ def signal_handler(sig, frame): # pylint: disable=unused-argument
|
|||||||
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
|
if sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
|
||||||
Context.log.info("Shutting down ...")
|
Context.log.info("Shutting down ...")
|
||||||
Context.running = False
|
Context.running = False
|
||||||
if Context.httpd is not None:
|
|
||||||
Context.httpd.socket.close()
|
|
||||||
|
|
||||||
# Signal a reload
|
# Signal a reload
|
||||||
if sig == signal.SIGHUP:
|
if sig in [signal.SIGHUP, signal.SIGUSR1]:
|
||||||
Context.log.warning("Received config reload signal")
|
Context.log.warning("Received config reload signal")
|
||||||
read_config(Context.config_file)
|
read_config(Context.config_file)
|
||||||
|
|
||||||
@ -792,89 +800,253 @@ def test_queries():
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
class PGMonFuse(llfuse.Operations):
|
||||||
"""
|
"""
|
||||||
This is our request handling server. It is responsible for listening for
|
This is our FUSE filesystem for requests from Zabbix. It is responsible for listening for
|
||||||
requests, processing them, and responding.
|
requests, processing them, and responding.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def log_request(self, code="-", size="-"):
|
def __init__(self):
|
||||||
"""
|
super().__init__()
|
||||||
Override to suppress standard request logging
|
self.builtin = ["agent_version", "latest_version_info", "sleep"]
|
||||||
"""
|
self.update_files()
|
||||||
|
|
||||||
def do_GET(self): # pylint: disable=invalid-name
|
# Dictionary holdig context information for open file handles
|
||||||
|
self.inodes = dict()
|
||||||
|
self.file_handles = dict()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def gen_attrs(inode, is_dir):
|
||||||
|
|
||||||
|
attrs = llfuse.EntryAttributes()
|
||||||
|
if is_dir:
|
||||||
|
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IXUSR | stat.S_IFDIR
|
||||||
|
else:
|
||||||
|
attrs.st_mode = Context.config["fuse_mode"] | stat.S_IFREG
|
||||||
|
attrs.st_nlink = 1
|
||||||
|
attrs.st_size = 1024
|
||||||
|
attrs.st_uid = getpwnam(Context.config["fuse_owner"]).pw_uid
|
||||||
|
attrs.st_gid = getgrnam(Context.config["fuse_group"]).gr_gid
|
||||||
|
|
||||||
|
attrs.st_ino = inode
|
||||||
|
|
||||||
|
return attrs
|
||||||
|
|
||||||
|
def update_files(self):
|
||||||
|
self.files = self.builtin + sorted(list(Context.config["metrics"].keys()))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def path_to_inode(path):
|
||||||
"""
|
"""
|
||||||
Handle a request. This is just a wrapper around the actual handler
|
Provide a consistent inode for a given file
|
||||||
code to keep things more readable.
|
|
||||||
"""
|
"""
|
||||||
|
return hash(path) & 0xFFFFFFFFFFFFFFFF
|
||||||
|
|
||||||
|
def lookup(self, inode_p, name, ctx=None):
|
||||||
|
""" """
|
||||||
|
print(f"Lookup called with: inode_p={inode_p} name={name}")
|
||||||
|
|
||||||
|
# We only have a single level, so if the parent inode is anything else
|
||||||
|
# then it doesn't exist
|
||||||
|
if inode_p != llfuse.ROOT_INODE:
|
||||||
|
raise llfuse.FUSEError(errno.ENOENT)
|
||||||
|
|
||||||
|
# Generate the inode for this file (ie: query + args)
|
||||||
|
inode = self.path_to_inode(name)
|
||||||
|
|
||||||
|
# Get the file info if we already have it, create it otherwise
|
||||||
try:
|
try:
|
||||||
self._handle_request()
|
file = self.inodes[inode]
|
||||||
except BrokenPipeError:
|
except KeyError:
|
||||||
Context.log.error("Client disconnected, exiting handler")
|
# Separate out the name components
|
||||||
|
parts = name.decode("utf-8").split(":", 1)
|
||||||
|
base_name = parts[0]
|
||||||
|
try:
|
||||||
|
arg_str = parts[1]
|
||||||
|
except IndexError:
|
||||||
|
arg_str = ""
|
||||||
|
|
||||||
def _handle_request(self):
|
# Make sure it's a valid request
|
||||||
"""
|
if base_name not in self.files:
|
||||||
Request handler
|
raise llfuse.FUSEError(errno.ENOENT)
|
||||||
"""
|
|
||||||
# Parse the URL
|
|
||||||
parsed_path = urlparse(self.path)
|
|
||||||
metric_name = parsed_path.path.strip("/")
|
|
||||||
parsed_query = parse_qs(parsed_path.query)
|
|
||||||
|
|
||||||
|
# Split any key/value args
|
||||||
|
args = {}
|
||||||
|
for arg in arg_str.split(","):
|
||||||
|
if arg == "":
|
||||||
|
continue
|
||||||
|
print(f"Splitting: {arg}")
|
||||||
|
key, value = arg.split("=", 1)
|
||||||
|
args[key] = value
|
||||||
|
|
||||||
|
file = {
|
||||||
|
"name": base_name,
|
||||||
|
"args": args,
|
||||||
|
"error": None,
|
||||||
|
"attrs": None,
|
||||||
|
"count": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
self.inodes[inode] = file
|
||||||
|
|
||||||
|
if not file["attrs"]:
|
||||||
|
# Fuse shouldn't be calling this for the base directory, so it
|
||||||
|
# should be safe to assume we're dealing with a file.
|
||||||
|
file["attrs"] = self.gen_attrs(inode, False)
|
||||||
|
|
||||||
|
# Increase the lookup count
|
||||||
|
file["count"] += 1
|
||||||
|
|
||||||
|
return file["attrs"]
|
||||||
|
|
||||||
|
def getattr(self, inode, ctx=None):
|
||||||
|
print(f"getattr called with: inode={inode}")
|
||||||
|
|
||||||
|
if inode == llfuse.ROOT_INODE:
|
||||||
|
# If we're looking at the directory itself
|
||||||
|
return self.gen_attrs(inode, True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return self.inodes[inode]["attrs"]
|
||||||
|
except KeyError:
|
||||||
|
print(f"Getattr called without a lookup first for: inode={inode}")
|
||||||
|
raise llfuse.FUSEError(errno.ENOENT)
|
||||||
|
|
||||||
|
def opendir(self, inode, ctx):
|
||||||
|
"""
|
||||||
|
Note: We only allow one directory level
|
||||||
|
"""
|
||||||
|
print(f"opendir called with: inode={inode}")
|
||||||
|
|
||||||
|
if inode != llfuse.ROOT_INODE:
|
||||||
|
raise llfuse.FUSEError(errno.ENOENT)
|
||||||
|
return inode
|
||||||
|
|
||||||
|
def readdir(self, fh, off):
|
||||||
|
print(f"readdir called with: fh={fh} off={off}")
|
||||||
|
|
||||||
|
# Something has gone wrong if this was called for anything other than the top level directory
|
||||||
|
assert fh == llfuse.ROOT_INODE
|
||||||
|
|
||||||
|
for i, entry in enumerate(self.files[off:]):
|
||||||
|
print(f" yielding: {entry.encode('utf8')}")
|
||||||
|
yield (
|
||||||
|
entry.encode("utf-8"),
|
||||||
|
self.lookup(llfuse.ROOT_INODE, entry.encode("utf-8")),
|
||||||
|
i + off + 1,
|
||||||
|
)
|
||||||
|
|
||||||
|
def new_file_handle(self, inode=None):
|
||||||
|
"""
|
||||||
|
Pick an unused file handle (int) and create an entry in file_handles for it
|
||||||
|
|
||||||
|
returns: the file handle number
|
||||||
|
"""
|
||||||
|
# Find the first unused number
|
||||||
|
new_fh = 0
|
||||||
|
for fh in sorted(self.file_handles.keys()):
|
||||||
|
if new_fh == fh:
|
||||||
|
new_fh += 1
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Create the empty entry in the
|
||||||
|
self.file_handles[new_fh] = {
|
||||||
|
"inode": inode,
|
||||||
|
"data": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
return new_fh
|
||||||
|
|
||||||
|
def open(self, inode, flags, ctx):
|
||||||
|
print(f"open called with: inode={inode} flags={flags}")
|
||||||
|
|
||||||
|
fh = self.new_file_handle(inode)
|
||||||
|
|
||||||
|
# Get the data associated with the file handle
|
||||||
|
file = self.file_handles[fh]
|
||||||
|
|
||||||
|
# Get the data associated with the file name and parameters
|
||||||
|
data = self.inodes[inode]
|
||||||
|
|
||||||
|
# Pull out the things we need to run the query
|
||||||
|
metric_name = data["name"]
|
||||||
|
metric_args = data["args"]
|
||||||
|
|
||||||
|
# Populate the data for the file
|
||||||
if metric_name == "agent_version":
|
if metric_name == "agent_version":
|
||||||
self._reply(200, VERSION)
|
file["data"] = VERSION.encode("utf-8")
|
||||||
|
elif metric_name == "sleep":
|
||||||
|
seconds = metric_args.get("seconds", "10")
|
||||||
|
with llfuse.lock_released:
|
||||||
|
time.sleep(int(seconds))
|
||||||
|
file["data"] = seconds.encode("utf-8")
|
||||||
elif metric_name == "latest_version_info":
|
elif metric_name == "latest_version_info":
|
||||||
try:
|
try:
|
||||||
get_latest_version()
|
get_latest_version()
|
||||||
self._reply(
|
file["data"] = json.dumps(
|
||||||
200,
|
{
|
||||||
json.dumps(
|
"latest": Context.latest_version,
|
||||||
{
|
"supported": 1 if Context.release_supported else 0,
|
||||||
"latest": Context.latest_version,
|
}
|
||||||
"supported": 1 if Context.release_supported else 0,
|
).encode("utf-8")
|
||||||
}
|
|
||||||
),
|
|
||||||
)
|
|
||||||
except LatestVersionCheckError as e:
|
except LatestVersionCheckError as e:
|
||||||
Context.log.error(
|
Context.log.error(
|
||||||
"Failed to retrieve latest version information: %s", e
|
"Failed to retrieve latest version information: %s", e
|
||||||
)
|
)
|
||||||
self._reply(503, "Failed to retrieve latest version info")
|
file["data"] = "ERROR: Failed to retrieve latest version info".encode(
|
||||||
|
"utf-8"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Note: parse_qs returns the values as a list. Since we always expect
|
|
||||||
# single values, just grab the first from each.
|
|
||||||
args = {key: values[0] for key, values in parsed_query.items()}
|
|
||||||
|
|
||||||
# Get the dbname. If none was provided, use the default from the
|
# Get the dbname. If none was provided, use the default from the
|
||||||
# config.
|
# config.
|
||||||
dbname = args.get("dbname", Context.config["dbname"])
|
dbname = metric_args.get("dbname", Context.config["dbname"])
|
||||||
|
|
||||||
# Sample the metric
|
# Sample the metric
|
||||||
try:
|
try:
|
||||||
self._reply(200, sample_metric(dbname, metric_name, args))
|
file["data"] = sample_metric(dbname, metric_name, metric_args).encode(
|
||||||
|
"utf-8"
|
||||||
|
)
|
||||||
except UnknownMetricError:
|
except UnknownMetricError:
|
||||||
Context.log.error("Unknown metric: %s", metric_name)
|
Context.log.error("Unknown metric: %s", metric_name)
|
||||||
self._reply(404, "Unknown metric")
|
file["data"] = "ERROR: Unknown metric".encode("utf-8")
|
||||||
except MetricVersionError:
|
except MetricVersionError:
|
||||||
Context.log.error("Failed to find an query version for %s", metric_name)
|
Context.log.error("Failed to find an query version for %s", metric_name)
|
||||||
self._reply(404, "Unsupported version")
|
file["data"] = "ERROR: Unsupported version".encode("utf-8")
|
||||||
except UnhappyDBError:
|
except UnhappyDBError:
|
||||||
Context.log.info("Database %s is unhappy, please be patient", dbname)
|
Context.log.info("Database %s is unhappy, please be patient", dbname)
|
||||||
self._reply(503, "Database unavailable")
|
file["data"] = "ERROR: Database unavailable".encode("utf-8")
|
||||||
except Exception as e: # pylint: disable=broad-exception-caught
|
except Exception as e: # pylint: disable=broad-exception-caught
|
||||||
Context.log.error("Error running query: %s", e)
|
Context.log.error("Error running query: %s", e)
|
||||||
self._reply(500, "Unexpected error: {}".format(e))
|
file["data"] = "ERROR: Unexpected error: {}".format(e).encode("utf-8")
|
||||||
|
|
||||||
def _reply(self, code, content):
|
return fh
|
||||||
"""
|
|
||||||
Send a reply to the client
|
|
||||||
"""
|
|
||||||
self.send_response(code)
|
|
||||||
self.send_header("Content-type", "application/json")
|
|
||||||
self.end_headers()
|
|
||||||
|
|
||||||
self.wfile.write(bytes(content, "utf-8"))
|
def read(self, fh, off, size):
|
||||||
|
print(f"read called with: fh={fh} off={off} size={size}")
|
||||||
|
|
||||||
|
file = self.file_handles[fh]
|
||||||
|
|
||||||
|
return file["data"][off : off + size]
|
||||||
|
|
||||||
|
def forget(self, inode_list):
|
||||||
|
"""
|
||||||
|
Note: Per the dcumenation, this function must not raise an exception!
|
||||||
|
"""
|
||||||
|
print(f"forget called with: inode_list={inode_list}")
|
||||||
|
|
||||||
|
for inode, nlookup in inode_list:
|
||||||
|
try:
|
||||||
|
self.inodes[inode]["count"] -= nlookup
|
||||||
|
if self.inodes[inode]["count"] <= 0:
|
||||||
|
del self.inodes[inode]
|
||||||
|
except Exception as e:
|
||||||
|
Context.log.error("Error forgetting inode %s: %s", inode, e)
|
||||||
|
|
||||||
|
def release(self, fh):
|
||||||
|
print(f"release called with: fh={fh}")
|
||||||
|
|
||||||
|
del self.file_handles[fh]
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -916,23 +1088,44 @@ def main():
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
# Set up the http server to receive requests
|
|
||||||
server_address = (Context.config["address"], Context.config["port"])
|
|
||||||
Context.httpd = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
|
|
||||||
|
|
||||||
# Set up the signal handler
|
# Set up the signal handler
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
signal.signal(signal.SIGINT, signal_handler)
|
||||||
signal.signal(signal.SIGHUP, signal_handler)
|
signal.signal(signal.SIGHUP, signal_handler)
|
||||||
|
signal.signal(signal.SIGUSR1, signal_handler)
|
||||||
|
|
||||||
# Handle requests.
|
# Ensure the mount point exists
|
||||||
Context.log.info("Listening on port %s...", Context.config["port"])
|
if not os.path.exists(Context.config["fuse_base"]):
|
||||||
while Context.running:
|
os.makedirs(Context.config["fuse_base"])
|
||||||
Context.httpd.handle_request()
|
|
||||||
|
# Create the FUSE filesystem
|
||||||
|
pgmon_fuse = PGMonFuse()
|
||||||
|
fuse_options = set(llfuse.default_options)
|
||||||
|
fuse_options.add("fsname=pgmon")
|
||||||
|
# fuse_options.add('direct_io')
|
||||||
|
# if options.debug_fuse:
|
||||||
|
# fuse_options.add('allow_others')
|
||||||
|
try:
|
||||||
|
llfuse.init(pgmon_fuse, Context.config["fuse_base"], fuse_options)
|
||||||
|
except:
|
||||||
|
llfuse.close()
|
||||||
|
return -1
|
||||||
|
|
||||||
|
try:
|
||||||
|
llfuse.main(workers=1)
|
||||||
|
except:
|
||||||
|
llfuse.close()
|
||||||
|
raise
|
||||||
|
|
||||||
|
llfuse.close()
|
||||||
|
|
||||||
# Clean up PostgreSQL connections
|
# Clean up PostgreSQL connections
|
||||||
# TODO: Improve this ... not sure it actually closes all the connections cleanly
|
# TODO: Improve this ... not sure it actually closes all the connections cleanly
|
||||||
for pool in Context.connections.values():
|
for pool in Context.connections.values():
|
||||||
pool.close()
|
pool.closeall()
|
||||||
|
|
||||||
|
logging.shutdown()
|
||||||
|
|
||||||
|
print("Good bye.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user