Commit 754556b0 authored by Fred Drake's avatar Fred Drake

Merge pull request #11 from zopefoundation/ruok

Get status information via the zeo port and a nagios monitor
parents dd55476f 63135c37
...@@ -6,6 +6,10 @@ Changelog ...@@ -6,6 +6,10 @@ Changelog
- Add support for Python 3.4. - Add support for Python 3.4.
- Added a new ``ruok`` client protocol for getting server status on
the ZEO port without creating a full-blown client connection and
without logging in the server log.
- Log errors on server side even if using multi threaded delay. - Log errors on server side even if using multi threaded delay.
......
...@@ -131,6 +131,7 @@ setup(name="ZEO", ...@@ -131,6 +131,7 @@ setup(name="ZEO",
runzeo = ZEO.runzeo:main runzeo = ZEO.runzeo:main
zeopasswd = ZEO.zeopasswd:main zeopasswd = ZEO.zeopasswd:main
zeoctl = ZEO.zeoctl:main zeoctl = ZEO.zeoctl:main
zeo-nagios = ZEO.nagios:main
""", """,
include_package_data = True, include_package_data = True,
) )
...@@ -753,7 +753,7 @@ class ZEOStorage: ...@@ -753,7 +753,7 @@ class ZEOStorage:
self._iterators.pop(iid, None) self._iterators.pop(iid, None)
def server_status(self): def server_status(self):
return self.server.server_status(self) return self.server.server_status(self.storage_id)
def set_client_label(self, label): def set_client_label(self, label):
self.log_label = str(label)+' '+_addr_label(self.connection.addr) self.log_label = str(label)+' '+_addr_label(self.connection.addr)
...@@ -992,7 +992,7 @@ class StorageServer: ...@@ -992,7 +992,7 @@ class StorageServer:
zstorage = self.ZEOStorageClass(self, self.read_only) zstorage = self.ZEOStorageClass(self, self.read_only)
c = self.ManagedServerConnectionClass(sock, addr, zstorage, self) c = self.ManagedServerConnectionClass(sock, addr, zstorage, self)
log("new connection %s: %s" % (addr, repr(c))) log("new connection %s: %s" % (addr, repr(c)), logging.DEBUG)
return c return c
def register_connection(self, storage_id, conn): def register_connection(self, storage_id, conn):
...@@ -1303,14 +1303,19 @@ class StorageServer: ...@@ -1303,14 +1303,19 @@ class StorageServer:
with self._lock: with self._lock:
return bool([i for i in waiting if i[0] is zeostore]) return bool([i for i in waiting if i[0] is zeostore])
def server_status(self, zeostore): def server_status(self, storage_id):
storage_id = zeostore.storage_id
status = self.stats[storage_id].__dict__.copy() status = self.stats[storage_id].__dict__.copy()
status['connections'] = len(status['connections']) status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id]) status['waiting'] = len(self._waiting[storage_id])
status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive() status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive()
status['last-transaction'] = (
self.storages[storage_id].lastTransaction().encode('hex'))
return status return status
def ruok(self):
return dict((storage_id, self.server_status(storage_id))
for storage_id in self.storages)
def _level_for_waiting(waiting): def _level_for_waiting(waiting):
if len(waiting) > 9: if len(waiting) > 9:
return logging.CRITICAL return logging.CRITICAL
......
from __future__ import print_function
##############################################################################
#
# Copyright (c) 2011 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""%prog [options] address
Where the address is an IPV6 address of the form: [addr]:port, an IPV4
address of the form: addr:port, or the name of a unix-domain socket file.
"""
import json
import optparse
import os
import re
import socket
import struct
import sys
import time
NO_TRANSACTION = '0'*16
nodiff_names = 'active_txns connections waiting'.split()
diff_names = 'aborts commits conflicts conflicts_resolved loads stores'.split()
per_times = dict(seconds=1.0, minutes=60.0, hours=3600.0, days=86400.0)
def new_metric(metrics, storage_id, name, value):
if storage_id == '1':
label = name
else:
if ' ' in storage_id:
label = "'%s:%s'" % (storage_id, name)
else:
label = "%s:%s" % (storage_id, name)
metrics.append("%s=%s" % (label, value))
def result(messages, metrics=(), status=None):
if metrics:
messages[0] += '|' + metrics[0]
if len(metrics) > 1:
messages.append('| ' + '\n '.join(metrics[1:]))
print('\n'.join(messages))
return status
def error(message):
return result((message, ), (), 2)
def warn(message):
return result((message, ), (), 1)
def check(addr, output_metrics, status, per):
m = re.match(r'\[(\S+)\]:(\d+)$', addr)
if m:
addr = m.group(1), int(m.group(2))
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
else:
m = re.match(r'(\S+):(\d+)$', addr)
if m:
addr = m.group(1), int(m.group(2))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
s.connect(addr)
except socket.error as err:
return error("Can't connect %s" % err)
fp = s.makefile()
fp.write('\x00\x00\x00\x04ruok')
fp.flush()
proto = fp.read(struct.unpack(">I", fp.read(4))[0])
datas = fp.read(struct.unpack(">I", fp.read(4))[0])
fp.close()
s.close()
data = json.loads(datas)
if not data:
return warn("No storages")
metrics = []
messages = []
level = 0
if output_metrics:
for storage_id, sdata in data.items():
for name in nodiff_names:
new_metric(metrics, storage_id, name, sdata[name])
if status:
now = time.time()
if os.path.exists(status):
dt = now - os.stat(status).st_mtime
if dt > 0: # sanity :)
with open(status) as f: # Read previous
old = json.loads(f.read())
dt /= per_times[per]
for storage_id, sdata in data.items():
sdata['sameple-time'] = now
if storage_id in old:
sold = old[storage_id]
for name in diff_names:
v = (sdata[name] - sold[name]) / dt
new_metric(metrics, storage_id, name, v)
with open(status, 'w') as f: # save current
f.write(json.dumps(data))
for storage_id, sdata in data.items():
if sdata['last-transaction'] == NO_TRANSACTION:
messages.append("Empty storage %r" % storage_id)
level = max(level, 1)
if not messages:
messages.append('OK')
return result(messages, metrics, level or None)
def main(args=None):
if args is None:
args = sys.argv[1:]
parser = optparse.OptionParser(__doc__)
parser.add_option(
'-m', '--output-metrics', action="store_true",
help ="Output metrics."
)
parser.add_option(
'-s', '--status-path',
help ="Path to status file, needed to get rate metrics"
)
parser.add_option(
'-u', '--time-units', type='choice', default='minutes',
choices=['seconds', 'minutes', 'hours', 'days'],
help ="Time unit for rate metrics"
)
(options, args) = parser.parse_args(args)
[addr] = args
return check(
addr, options.output_metrics, options.status_path, options.time_units)
if __name__ == '__main__':
main()
ZEO Nagios plugin
=================
ZEO includes a script that provides a nagios monitor plugin:
>>> import pkg_resources, time
>>> nagios = pkg_resources.load_entry_point(
... 'ZEO', 'console_scripts', 'zeo-nagios')
In it's simplest form, the script just checks if it can get status:
>>> import ZEO
>>> addr, stop = ZEO.server('test.fs')
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr])
Empty storage u'1'
1
The storage was empty. In that case, the monitor warned as much.
Let's add some data:
>>> ZEO.DB(addr).close()
>>> nagios([saddr])
OK
If we stop the server, we'll error:
>>> stop()
>>> nagios([saddr])
Can't connect [Errno 61] Connection refused
2
Metrics
-------
The monitor will optionally output server metric data. There are 2
kinds of metrics it can output, level and rate metric. If we use the
-m/--output-metrics option, we'll just get rate metrics:
>>> addr, stop = ZEO.server('test.fs')
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m'])
OK|active_txns=0
| connections=0
waiting=0
We only got the metrics that are levels, like current number of
connections. If we want rate metrics, we need to be able to save
values from run to run. We need to use the -s/--status-path option to
specify the name of a file for status information:
>>> nagios([saddr, '-m', '-sstatus'])
OK|active_txns=0
| connections=0
waiting=0
We still didn't get any rate metrics, because we've only run once.
Let's actually do something with the database and then make another
sample.
>>> db = ZEO.DB(addr)
>>> nagios([saddr, '-m', '-sstatus'])
OK|active_txns=0
| connections=1
waiting=0
aborts=0.0
commits=0.0
conflicts=0.0
conflicts_resolved=0.0
loads=81.226297803
stores=0.0
Note that this time, we saw that there was a connection.
The ZEO.nagios module provides a check function that can be used by
other monitors (e.g. that get address data from ZooKeeper). It takes:
- Address string,
- Metrics flag.
- Status file name (or None), and
- Time units for rate metrics
::
>>> import ZEO.nagios
>>> ZEO.nagios.check(saddr, True, 'status', 'seconds')
OK|active_txns=0
| connections=1
waiting=0
aborts=0.0
commits=0.0
conflicts=0.0
conflicts_resolved=0.0
loads=0.0
stores=0.0
>>> db.close()
>>> stop()
Multi-storage servers
---------------------
A ZEO server can host multiple servers. (This is a feature that will
likely be dropped in the future.) When this is the case, the monitor
profixes metrics with a storage id.
>>> addr, stop = ZEO.server(
... storage_conf = """
... <mappingstorage first>
... </mappingstorage>
... <mappingstorage second>
... </mappingstorage>
... """)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'second'|second:active_txns=0
Empty storage u'first'
| second:connections=0
second:waiting=0
first:active_txns=0
first:connections=0
first:waiting=0
1
>>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'second'|second:active_txns=0
Empty storage u'first'
| second:connections=0
second:waiting=0
first:active_txns=0
first:connections=0
first:waiting=0
second:aborts=0.0
second:commits=0.0
second:conflicts=0.0
second:conflicts_resolved=0.0
second:loads=0.0
second:stores=0.0
first:aborts=0.0
first:commits=0.0
first:conflicts=0.0
first:conflicts_resolved=0.0
first:loads=0.0
first:stores=0.0
1
>>> stop()
...@@ -35,6 +35,7 @@ import doctest ...@@ -35,6 +35,7 @@ import doctest
import logging import logging
import os import os
import persistent import persistent
import pprint
import re import re
import shutil import shutil
import signal import signal
...@@ -1299,7 +1300,6 @@ def test_server_status(): ...@@ -1299,7 +1300,6 @@ def test_server_status():
>>> addr, _ = start_server(zeo_conf=dict(transaction_timeout=1)) >>> addr, _ = start_server(zeo_conf=dict(transaction_timeout=1))
>>> db = ZEO.DB(addr) >>> db = ZEO.DB(addr)
>>> import pprint
>>> pprint.pprint(db.storage.server_status(), width=40) >>> pprint.pprint(db.storage.server_status(), width=40)
{'aborts': 0, {'aborts': 0,
'active_txns': 0, 'active_txns': 0,
...@@ -1307,6 +1307,7 @@ def test_server_status(): ...@@ -1307,6 +1307,7 @@ def test_server_status():
'conflicts': 0, 'conflicts': 0,
'conflicts_resolved': 0, 'conflicts_resolved': 0,
'connections': 1, 'connections': 1,
'last-transaction': '03ac11b771fa1c00',
'loads': 1, 'loads': 1,
'lock_time': None, 'lock_time': None,
'start': 'Tue May 4 10:55:20 2010', 'start': 'Tue May 4 10:55:20 2010',
...@@ -1318,6 +1319,35 @@ def test_server_status(): ...@@ -1318,6 +1319,35 @@ def test_server_status():
>>> db.close() >>> db.close()
""" """
def test_ruok():
"""
You can also get server status using the ruok protocol.
>>> addr, _ = start_server(zeo_conf=dict(transaction_timeout=1))
>>> db = ZEO.DB(addr) # force a transaction :)
>>> import json, socket, struct
>>> s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> s.connect(addr)
>>> _ = s.send(struct.pack(">I", 4)+"ruok")
>>> proto = s.recv(struct.unpack(">I", s.recv(4))[0])
>>> pprint.pprint(json.loads(s.recv(struct.unpack(">I", s.recv(4))[0])))
{u'1': {u'aborts': 0,
u'active_txns': 0,
u'commits': 1,
u'conflicts': 0,
u'conflicts_resolved': 0,
u'connections': 1,
u'last-transaction': u'03ac11cd11372499',
u'loads': 1,
u'lock_time': None,
u'start': u'Sun Jan 4 09:37:03 2015',
u'stores': 1,
u'timeout-thread-is-alive': True,
u'verifying_clients': 0,
u'waiting': 0}}
>>> db.close(); s.close()
"""
def client_labels(): def client_labels():
""" """
When looking at server logs, for servers with lots of clients coming When looking at server logs, for servers with lots of clients coming
...@@ -1754,10 +1784,14 @@ def test_suite(): ...@@ -1754,10 +1784,14 @@ def test_suite():
zeo = unittest.TestSuite() zeo = unittest.TestSuite()
zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack)) zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack))
patterns = [ patterns = [
(re.compile(r"'start': '[^\n]+'"), 'start'), (re.compile(r"u?'start': u?'[^\n]+'"), 'start'),
(re.compile(r"u?'last-transaction': u?'[0-9a-f]+'"),
'last-transaction'),
(re.compile("ZODB.POSException.ConflictError"), "ConflictError"), (re.compile("ZODB.POSException.ConflictError"), "ConflictError"),
(re.compile("ZODB.POSException.POSKeyError"), "POSKeyError"), (re.compile("ZODB.POSException.POSKeyError"), "POSKeyError"),
(re.compile("ZEO.Exceptions.ClientStorageError"), "ClientStorageError"), (re.compile("ZEO.Exceptions.ClientStorageError"), "ClientStorageError"),
(re.compile(r"\[Errno \d+\]"), '[Errno N]'),
(re.compile(r"loads=\d+\.\d+"), 'loads=42.42'),
] ]
if not PY3: if not PY3:
patterns.append((re.compile("^'(blob[^']*)'"), r"b'\1'")) patterns.append((re.compile("^'(blob[^']*)'"), r"b'\1'"))
...@@ -1781,7 +1815,7 @@ def test_suite(): ...@@ -1781,7 +1815,7 @@ def test_suite():
'zeo-fan-out.test', 'zdoptions.test', 'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test', 'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt', 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
'dynamic_server_ports.test', 'new_addr.test', 'dynamic_server_ports.test', 'new_addr.test', '../nagios.rst',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns), checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function}, globs={'print_function': print_function},
......
...@@ -351,6 +351,7 @@ statistics using the server_status method: ...@@ -351,6 +351,7 @@ statistics using the server_status method:
'conflicts': 0, 'conflicts': 0,
'conflicts_resolved': 0, 'conflicts_resolved': 0,
'connections': 11, 'connections': 11,
'last-transaction': '0000000000000000',
'loads': 0, 'loads': 0,
'lock_time': 1272653598.693882, 'lock_time': 1272653598.693882,
'start': 'Fri Apr 30 14:53:18 2010', 'start': 'Fri Apr 30 14:53:18 2010',
......
...@@ -167,7 +167,8 @@ def main(): ...@@ -167,7 +167,8 @@ def main():
elif opt == '-S': elif opt == '-S':
suicide = False suicide = False
elif opt == '-v': elif opt == '-v':
ZEO.zrpc.connection.Connection.current_protocol = arg.encode('ascii') ZEO.zrpc.connection.Connection.current_protocol = arg.encode(
'ascii')
zo = ZEO.runzeo.ZEOOptions() zo = ZEO.runzeo.ZEOOptions()
zo.realize(["-C", configfile]) zo.realize(["-C", configfile])
...@@ -181,11 +182,11 @@ def main(): ...@@ -181,11 +182,11 @@ def main():
else: else:
test_addr = addr + '-test' test_addr = addr + '-test'
log(label, 'creating the storage server') log(label, 'creating the storage server')
storage = zo.storages[0].open()
mon_addr = None mon_addr = None
if zo.monitor_address: if zo.monitor_address:
mon_addr = zo.monitor_address mon_addr = zo.monitor_address
server = ZEO.runzeo.create_server({"1": storage}, zo) storages = dict((s.name or '1', s.open()) for s in zo.storages)
server = ZEO.runzeo.create_server(storages, zo)
try: try:
log(label, 'creating the test server, keep: %s', keep) log(label, 'creating the test server, keep: %s', keep)
...@@ -194,8 +195,9 @@ def main(): ...@@ -194,8 +195,9 @@ def main():
if e[0] != errno.EADDRINUSE: if e[0] != errno.EADDRINUSE:
raise raise
log(label, 'addr in use, closing and exiting') log(label, 'addr in use, closing and exiting')
storage.close() for storage in storages.values():
cleanup(storage) storage.close()
cleanup(storage)
sys.exit(2) sys.exit(2)
t.register_socket(server.dispatcher) t.register_socket(server.dispatcher)
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
import asyncore import asyncore
import errno import errno
import json
import sys import sys
import threading import threading
import logging import logging
...@@ -631,8 +632,13 @@ class ManagedServerConnection(Connection): ...@@ -631,8 +632,13 @@ class ManagedServerConnection(Connection):
self.message_output(self.current_protocol) self.message_output(self.current_protocol)
def recv_handshake(self, proto): def recv_handshake(self, proto):
Connection.recv_handshake(self, proto) if proto == 'ruok':
self.obj.notifyConnected(self) self.message_output(json.dumps(self.mgr.ruok()))
self.poll()
Connection.close(self)
else:
Connection.recv_handshake(self, proto)
self.obj.notifyConnected(self)
def close(self): def close(self):
self.obj.notifyDisconnected() self.obj.notifyDisconnected()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment