Commit ac3195d5 authored by Barry Warsaw's avatar Barry Warsaw

Lots of changes, maybe not all of them good ;)

Added a module docstring as a usage() string.  Also add --help/-h.

Rewrote two of the regexps as verbose re's for readability <wink>.
Use named groups.

Use True/False where appropriate.

Txn: Added a `voters' attribute which is a list of clients seen doing
a vote.  This is cleared on abort or finish.  Thus if we see a
transaction with a non-empty voters list, we know that that
transaction was blocked and it was the first voter that grabbed the
lock.  This client (as an address) is displayed in the report.

call_*(): Pass the client address into the call, as grep'd out of the
zrpc-conn part of the line.  Only call_tpc_vote() really cares about
this.

Most controversial: process the entire file, line by line.
Alternatives, if necessary:
    - go back to the original progressive skip backwards approach
    - add an incremental reader (recording file positions)
    - use as a filter
parent 9c324cc5
#! /usr/bin/env python #! /usr/bin/env python
"""Report on the number of currently waiting clients in the ZEO queue.""" """Report on the number of currently waiting clients in the ZEO queue.
Usage: zeoqueue.py [options] logfile
Options:
-h / --help
Print this help text and exit.
-v
Verbose output
"""
import getopt
import re import re
import sys import sys
import time import time
import getopt
# pick arbitrary buffer size that isn't too big COMMASPACE = ', '
BUFSIZE = 8 * 1024 * 1024
try:
rx_time = re.compile('(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d)') True, False
except NameError:
True = 1
False = 0
tcre = re.compile(r"""
(?P<ymd>
\d{4}- # year
\d{2}- # month
\d{2}) # day
T # separator
(?P<hms>
\d{2}: # hour
\d{2}: # minute
\d{2}) # second
""", re.VERBOSE)
ccre = re.compile(r"""
zrpc-conn:(?P<addr>\d+.\d+.\d+.\d+:\d+)\s+
calling\s+
(?P<method>
\w+) # the method
\( # args open paren
\' # string quote start
(?P<tid>
\S+) # first argument -- usually the tid
\' # end of string
(?P<rest>
.*) # rest of line
""", re.VERBOSE)
wcre = re.compile(r"""Clients waiting: (?P<num>\d+)""")
def parse_time(line): def parse_time(line):
"""Return the time portion of a zLOG line in seconds or None.""" """Return the time portion of a zLOG line in seconds or None."""
mo = rx_time.match(line) mo = tcre.match(line)
if mo is None: if mo is None:
return None return None
date, time_ = mo.group(1, 2) date, time_ = mo.group('ymd', 'hms')
date_l = [int(elt) for elt in date.split('-')] date_l = [int(elt) for elt in date.split('-')]
time_l = [int(elt) for elt in time_.split(':')] time_l = [int(elt) for elt in time_.split(':')]
return int(time.mktime(date_l + time_l + [0, 0, 0])) return int(time.mktime(date_l + time_l + [0, 0, 0]))
class Txn: class Txn:
"""Track status of single transaction.""" """Track status of single transaction."""
def __init__(self, tid): def __init__(self, tid):
self.tid = tid self.tid = tid
self.hint = None self.hint = None
...@@ -31,13 +76,16 @@ class Txn: ...@@ -31,13 +76,16 @@ class Txn:
self.vote = None self.vote = None
self.abort = None self.abort = None
self.finish = None self.finish = None
self.voters = []
def isactive(self): def isactive(self):
if self.begin and not (self.abort or self.finish): if self.begin and not (self.abort or self.finish):
return 1 return True
else: else:
return 0 return False
class Status: class Status:
"""Track status of ZEO server by replaying log records. """Track status of ZEO server by replaying log records.
...@@ -46,6 +94,7 @@ class Status: ...@@ -46,6 +94,7 @@ class Status:
- The last committed transaction. - The last committed transaction.
- The last committed or aborted transaction. - The last committed or aborted transaction.
- The last transaction that got the lock but didn't finish. - The last transaction that got the lock but didn't finish.
- The client address doing the first vote of a transaction.
- The number of currently active transactions. - The number of currently active transactions.
- The number of reported queued transactions. - The number of reported queued transactions.
- Client restarts. - Client restarts.
...@@ -78,6 +127,7 @@ class Status: ...@@ -78,6 +127,7 @@ class Status:
def reset(self): def reset(self):
self.commit = None self.commit = None
self.commit_or_abort = None self.commit_or_abort = None
self.last_unfinished = None
self.n_active = 0 self.n_active = 0
self.n_blocked = 0 self.n_blocked = 0
self.n_conns = 0 self.n_conns = 0
...@@ -88,88 +138,58 @@ class Status: ...@@ -88,88 +138,58 @@ class Status:
# The status report will always be complete if we encounter an # The status report will always be complete if we encounter an
# explicit restart. # explicit restart.
if self.t_restart is not None: if self.t_restart is not None:
return 1 return True
# If we haven't seen a restart, assume that seeing a finished # If we haven't seen a restart, assume that seeing a finished
# transaction is good enough. # transaction is good enough.
return self.commit is not None return self.commit is not None
def report(self):
print "Blocked transactions:", self.n_blocked
if not VERBOSE:
return
if self.t_restart:
print "Server started:", time.ctime(self.t_restart)
if self.commit is not None:
t = self.commit_or_abort.finish
if t is None:
t = self.commit_or_abort.abort
print "Last finished transaction:", time.ctime(t)
# the blocked transaction should be the first one that calls vote
L = [(txn.begin, txn) for txn in self.txns.values()]
L.sort()
for x, txn in L:
if txn.isactive():
began = txn.begin
print "Blocked transaction began at:", time.ctime(began)
print "Hint:", txn.hint
print "Idle time: %d sec" % int(time.time() - began)
break
def process(self, line): def process(self, line):
if line.find("calling") != -1: if line.find("calling") != -1:
self.process_call(line) self.process_call(line)
elif line.find("connect") != -1: elif line.find("connect") != -1:
self.process_connect(line) self.process_connect(line)
elif line.find("locked") != -1:
# test for "locked" because word may start with "B" or "b" # test for "locked" because word may start with "B" or "b"
elif line.find("locked") != -1:
self.process_block(line) self.process_block(line)
elif line.find("Starting") != -1: elif line.find("Starting") != -1:
self.process_start(line) self.process_start(line)
rx_call = re.compile("calling (\w+)\(\'(\S+)\'(.*)")
def process_call(self, line): def process_call(self, line):
mo = self.rx_call.search(line) mo = ccre.search(line)
if mo is None: if mo is None:
return return
called_method = mo.group(1) called_method = mo.group('method')
# XXX exit earlier if we've got zeoLoad, because it's the most # XXX exit earlier if we've got zeoLoad, because it's the most
# frequently called method and we don't use it. # frequently called method and we don't use it.
if called_method == "zeoLoad": if called_method == "zeoLoad":
return return
t = parse_time(line) t = parse_time(line)
meth = getattr(self, "call_%s" % called_method, None) meth = getattr(self, "call_%s" % called_method, None)
if meth is None: if meth is None:
return return
tid = mo.group(2) client = mo.group('addr')
rest = mo.group(3) tid = mo.group('tid')
meth(t, tid, rest) rest = mo.group('rest')
meth(t, client, tid, rest)
def process_connect(self, line): def process_connect(self, line):
pass pass
rx_waiting = re.compile("Clients waiting: (\d+)")
def process_block(self, line): def process_block(self, line):
mo = self.rx_waiting.search(line) mo = wcre.search(line)
if mo is None: if mo is None:
# assume that this was a restart message for the last blocked # assume that this was a restart message for the last blocked
# transaction. # transaction.
self.n_blocked = 0 self.n_blocked = 0
else: else:
self.n_blocked = int(mo.group(1)) self.n_blocked = int(mo.group('num'))
def process_start(self, line): def process_start(self, line):
if line.find("Starting ZEO server") != -1: if line.find("Starting ZEO server") != -1:
self.reset() self.reset()
self.t_restart = parse_time(line) self.t_restart = parse_time(line)
def call_tpc_begin(self, t, tid, rest): def call_tpc_begin(self, t, client, tid, rest):
txn = Txn(tid) txn = Txn(tid)
txn.begin = t txn.begin = t
if rest[0] == ',': if rest[0] == ',':
...@@ -180,22 +200,24 @@ class Status: ...@@ -180,22 +200,24 @@ class Status:
txn.hint = rest txn.hint = rest
self.txns[tid] = txn self.txns[tid] = txn
self.n_active += 1 self.n_active += 1
self.last_unfinished = txn
def call_vote(self, t, tid, rest): def call_vote(self, t, client, tid, rest):
txn = self.txns.get(tid) txn = self.txns.get(tid)
if txn is None: if txn is None:
print "Oops!" print "Oops!"
txn = self.txns[tid] = Txn(tid) txn = self.txns[tid] = Txn(tid)
txn.vote = t txn.vote = t
txn.voters.append(client)
def call_tpc_abort(self, t, tid, rest): def call_tpc_abort(self, t, client, tid, rest):
txn = self.txns.get(tid) txn = self.txns.get(tid)
if txn is None: if txn is None:
print "Oops!" print "Oops!"
txn = self.txns[tid] = Txn(tid) txn = self.txns[tid] = Txn(tid)
txn.abort = t txn.abort = t
txn.voters = []
self.n_active -= 1 self.n_active -= 1
if self.commit_or_abort: if self.commit_or_abort:
# delete the old transaction # delete the old transaction
try: try:
...@@ -204,14 +226,14 @@ class Status: ...@@ -204,14 +226,14 @@ class Status:
pass pass
self.commit_or_abort = txn self.commit_or_abort = txn
def call_tpc_finish(self, t, tid, rest): def call_tpc_finish(self, t, client, tid, rest):
txn = self.txns.get(tid) txn = self.txns.get(tid)
if txn is None: if txn is None:
print "Oops!" print "Oops!"
txn = self.txns[tid] = Txn(tid) txn = self.txns[tid] = Txn(tid)
txn.finish = t txn.finish = t
txn.voters = []
self.n_active -= 1 self.n_active -= 1
if self.commit: if self.commit:
# delete the old transaction # delete the old transaction
try: try:
...@@ -226,38 +248,72 @@ class Status: ...@@ -226,38 +248,72 @@ class Status:
pass pass
self.commit = self.commit_or_abort = txn self.commit = self.commit_or_abort = txn
def process_from(f, pos): def report(self):
s = Status() print "Blocked transactions:", self.n_blocked
f.seek(-pos, 2) if not VERBOSE:
f.readline() return
for line in f.readlines(BUFSIZE): if self.t_restart:
s.process(line) print "Server started:", time.ctime(self.t_restart)
return s
if self.commit is not None:
t = self.commit_or_abort.finish
if t is None:
t = self.commit_or_abort.abort
print "Last finished transaction:", time.ctime(t)
# the blocked transaction should be the first one that calls vote
L = [(txn.begin, txn) for txn in self.txns.values()]
L.sort()
for x, txn in L:
if txn.isactive():
began = txn.begin
if txn.voters:
print "Blocked client (first vote):", txn.voters[0]
print "Blocked transaction began at:", time.ctime(began)
print "Hint:", txn.hint
print "Idle time: %d sec" % int(time.time() - began)
break
def usage(code, msg=''):
print >> sys.stderr, __doc__
if msg:
print >> sys.stderr, msg
sys.exit(code)
def main(): def main():
global VERBOSE global VERBOSE
VERBOSE = 0 VERBOSE = 0
opts, args = getopt.getopt(sys.argv[1:], 'v')
for k, v in opts: try:
if k == '-v': opts, args = getopt.getopt(sys.argv[1:], 'vh', ['help'])
except getopt.error, msg:
usage(1, msg)
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt == '-v':
VERBOSE += 1 VERBOSE += 1
if not args:
usage(1, 'logfile is required')
if len(args) > 1:
usage(1, 'too many arguments: %s' % COMMASPACE.join(args))
path = args[0] path = args[0]
f = open(path, "rb") f = open(path, "rb")
s = Status()
# Start at pos bytes from the end of the file and read forwards. while True:
# If we read enough log data to have a complete snapshot of the line = f.readline()
# server state, stop and print a report. If not, move twice as if not line:
# far from the end of the file and repeat.
pos = 16 * 1024
while 1:
s = process_from(f, pos)
if s.iscomplete():
break break
pos *= 2 s.process(line)
s.report() s.report()
if __name__ == "__main__": if __name__ == "__main__":
main() main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment