Move all includes back to global, modify agent arguments
This commit is contained in:
parent
3abbc1d83b
commit
3a937ca3bd
66
pgmon.py
66
pgmon.py
@ -5,19 +5,12 @@ import logging
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
|
||||
if "pytest" in sys.modules:
|
||||
# Conditional modules are needed for tests, so import them if this is a test
|
||||
import pytest
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pytest_imports():
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import queue
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import queue
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
|
||||
#
|
||||
# Errors
|
||||
@ -72,7 +65,7 @@ class Config:
|
||||
self.user = 'postgres' # DB user
|
||||
self.password = None # DB password
|
||||
self.dbname = 'postgres' # DB name
|
||||
|
||||
|
||||
self.metrics = {} # Metrics
|
||||
|
||||
# Dynamic values
|
||||
@ -90,7 +83,7 @@ class Config:
|
||||
continue
|
||||
elif line == '':
|
||||
continue
|
||||
|
||||
|
||||
(key, value) = line.split('=', 1)
|
||||
if value is None:
|
||||
raise InvalidConfigError("{}: {}", config_file, line)
|
||||
@ -127,7 +120,7 @@ class Config:
|
||||
(name, ret_type, version, sql) = metric_def.split(':', 3)
|
||||
if sql is None:
|
||||
raise InvalidConfigError
|
||||
|
||||
|
||||
if sql.startswith('file:'):
|
||||
(_,path) = sql.split(':', 1)
|
||||
with open(path, 'r') as f:
|
||||
@ -171,7 +164,7 @@ class DB:
|
||||
with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||||
cur.execute(sql, args)
|
||||
return(cur.fetchall())
|
||||
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
if self.conn is not None:
|
||||
@ -270,7 +263,7 @@ class IPC:
|
||||
except OSError:
|
||||
if os.path.exists(self.config.ipc_socket):
|
||||
raise
|
||||
|
||||
|
||||
# Create the socket
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
# Set a timeout for accepting connections to be able to catch signals
|
||||
@ -285,7 +278,7 @@ class IPC:
|
||||
|
||||
else:
|
||||
raise RuntimeError
|
||||
|
||||
|
||||
def connect(self):
|
||||
"""
|
||||
Establish a connection to a socket
|
||||
@ -336,7 +329,7 @@ class Agent:
|
||||
conn = ipc.connect()
|
||||
|
||||
# Send a request
|
||||
ipc.send(conn, "{},{}".format(key, ",".join(args)))
|
||||
ipc.send(conn, "{},{}".format(key, args))
|
||||
|
||||
# Wait for a response
|
||||
res = ipc.recv(conn)
|
||||
@ -355,13 +348,6 @@ class Server:
|
||||
"""
|
||||
@staticmethod
|
||||
def run(config):
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import queue
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
|
||||
# Set up the signal handler
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
@ -376,14 +362,14 @@ class Server:
|
||||
|
||||
# Listen on ipc socket
|
||||
ipc = IPC(config, 'server')
|
||||
|
||||
|
||||
while True:
|
||||
# Wait for a request connection
|
||||
try:
|
||||
conn = ipc.accept()
|
||||
except socket.timeout:
|
||||
conn = None
|
||||
|
||||
|
||||
# See if we should exit
|
||||
if not running:
|
||||
break
|
||||
@ -397,10 +383,10 @@ class Server:
|
||||
|
||||
# Receive ipc request (csv)
|
||||
try:
|
||||
(key, args_csv) = req_csv.split(',', 1)
|
||||
(key, args_str) = req_csv.split(',', 1)
|
||||
args_dict = {}
|
||||
if args_csv != "":
|
||||
for (k, v) in [a.split('=', 1) for a in args_csv.split(',')]:
|
||||
if args_str!= "":
|
||||
for (k, v) in [a.split('=', 1) for a in args_str.split(';')]:
|
||||
args_dict[k] = v
|
||||
except socket.timeout:
|
||||
print("IPC communication timeout receiving request")
|
||||
@ -421,7 +407,7 @@ class Server:
|
||||
print("Failed to queue request")
|
||||
req.set_result("ERROR: Queue timeout")
|
||||
continue
|
||||
|
||||
|
||||
# Spawn a thread to wait for the result
|
||||
r = Responder(config, ipc, conn, req)
|
||||
r.start()
|
||||
@ -543,10 +529,10 @@ def main():
|
||||
|
||||
# Operational mode
|
||||
parser.add_argument('-m', '--mode', choices=['agent', 'server'], required=True)
|
||||
|
||||
|
||||
# Agent options
|
||||
parser.add_argument('-k', '--key')
|
||||
parser.add_argument('-a', '--args', nargs='*', default=[])
|
||||
parser.add_argument('-a', '--args')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@ -590,8 +576,8 @@ class TestRequest:
|
||||
req1.set_result('blah')
|
||||
assert not req1.complete.locked()
|
||||
assert 'blah' == req1.get_result()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestMetric:
|
||||
def test_metric_creation(self):
|
||||
@ -615,7 +601,7 @@ class TestMetric:
|
||||
# Make sure added versions are correct
|
||||
assert m1.versions[0].sql == 'default'
|
||||
assert m1.versions[120003].sql == 'v12.3'
|
||||
|
||||
|
||||
def test_metric_get_version(self):
|
||||
# Test retrieving metric versions
|
||||
m1 = Metric('foo', 'value')
|
||||
@ -630,8 +616,8 @@ class TestMetric:
|
||||
|
||||
# Make sure cache is set
|
||||
assert m1.cached is not None
|
||||
assert m1.cached_version is 110003
|
||||
|
||||
assert m1.cached_version == 110003
|
||||
|
||||
# Make sure returned value changes with version
|
||||
assert m1.get_version(120000).sql == 'v12.0'
|
||||
assert m1.get_version(150005).sql == 'v12.0'
|
||||
|
||||
1
pgmon_userparameter.conf
Normal file
1
pgmon_userparameter.conf
Normal file
@ -0,0 +1 @@
|
||||
UserParameter=pgmon[*],/usr/local/bin/pgmon.py -c /etc/zabbix/pgmon.cfg -a "$1"
|
||||
Loading…
Reference in New Issue
Block a user