monitor.py 4.83 KB
Newer Older
1 2 3 4 5 6
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
Jim Fulton's avatar
Jim Fulton committed
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8 9 10 11 12 13 14 15
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.

16
$Id$
17 18 19 20 21 22
"""

import asyncore
import socket
import time
import types
23
import logging
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40

import ZEO

class StorageStats:
    """Per-storage usage statistics."""

    def __init__(self):
        self.loads = 0
        self.stores = 0
        self.commits = 0
        self.aborts = 0
        self.active_txns = 0
        self.clients = 0
        self.verifying_clients = 0
        self.lock_time = None
        self.conflicts = 0
        self.conflicts_resolved = 0
41
        self.start = time.ctime()
42

43 44 45 46 47 48 49 50 51 52 53 54 55 56
    def parse(self, s):
        # parse the dump format
        lines = s.split("\n")
        for line in lines:
            field, value = line.split(":", 1)
            if field == "Server started":
                self.start = value
            elif field == "Clients":
                self.clients = int(value)
            elif field == "Clients verifying":
                self.verifying_clients = int(value)
            elif field == "Active transactions":
                self.active_txns = int(value)
            elif field == "Commit lock held for":
57
                # This assumes
58 59 60 61 62 63 64 65 66 67 68 69 70 71
                self.lock_time = time.time() - int(value)
            elif field == "Commits":
                self.commits = int(value)
            elif field == "Aborts":
                self.aborts = int(value)
            elif field == "Loads":
                self.loads = int(value)
            elif field == "Stores":
                self.stores = int(value)
            elif field == "Conflicts":
                self.conflicts = int(value)
            elif field == "Conflicts resolved":
                self.conflicts_resolved = int(value)

72
    def dump(self, f):
73
        print >> f, "Server started:", self.start
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
        print >> f, "Clients:", self.clients
        print >> f, "Clients verifying:", self.verifying_clients
        print >> f, "Active transactions:", self.active_txns
        if self.lock_time:
            howlong = time.time() - self.lock_time
            print >> f, "Commit lock held for:", int(howlong)
        print >> f, "Commits:", self.commits
        print >> f, "Aborts:", self.aborts
        print >> f, "Loads:", self.loads
        print >> f, "Stores:", self.stores
        print >> f, "Conflicts:", self.conflicts
        print >> f, "Conflicts resolved:", self.conflicts_resolved

class StatsClient(asyncore.dispatcher):

    def __init__(self, sock, addr):
        asyncore.dispatcher.__init__(self, sock)
        self.buf = []
        self.closed = 0

    def close(self):
        self.closed = 1
        # The socket is closed after all the data is written.
        # See handle_write().

    def write(self, s):
        self.buf.append(s)

    def writable(self):
        return len(self.buf)

    def readable(self):
        return 0

    def handle_write(self):
        s = "".join(self.buf)
        self.buf = []
        n = self.socket.send(s)
        if n < len(s):
            self.buf.append(s[:n])
114

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
        if self.closed and not self.buf:
            asyncore.dispatcher.close(self)

class StatsServer(asyncore.dispatcher):

    StatsConnectionClass = StatsClient

    def __init__(self, addr, stats):
        asyncore.dispatcher.__init__(self)
        self.addr = addr
        self.stats = stats
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
131 132
        logger = logging.getLogger('ZEO.monitor')
        logger.info("listening on %s", repr(self.addr))
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error:
            return
        f = self.StatsConnectionClass(sock, addr)
        self.dump(f)
        f.close()

    def dump(self, f):
        print >> f, "ZEO monitor server version %s" % ZEO.version
        print >> f, time.ctime()
        print >> f

        L = self.stats.keys()
        L.sort()
        for k in L:
            stats = self.stats[k]
            print >> f, "Storage:", k
            stats.dump(f)
            print >> f