Commit 1c26a9ac authored by Shane Hathaway's avatar Shane Hathaway

Merged shane-activity-monitoring-branch.

parent 828c9df6
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""ZODB transfer activity monitoring
$Id: ActivityMonitor.py,v 1.2 2002/06/10 20:20:44 shane Exp $"""
__version__='$Revision: 1.2 $'[11:-2]
import time
class ActivityMonitor:
"""ZODB load/store activity monitor
This simple implementation just keeps a small log in memory
and iterates over the log when getActivityAnalysis() is called.
It assumes that log entries are added in chronological sequence,
which is only guaranteed because DB.py holds a lock when calling
the closedConnection() method.
"""
def __init__(self, history_length=3600):
self.history_length = history_length # Number of seconds
self.log = [] # [(time, loads, stores)]
def closedConnection(self, conn):
log = self.log
now = time.time()
loads, stores = conn.getTransferCounts(1)
log.append((now, loads, stores))
self.trim(now)
def trim(self, now):
log = self.log
cutoff = now - self.history_length
n = 0
loglen = len(log)
while n < loglen and log[n][0] < cutoff:
n = n + 1
if n:
del log[:n]
def setHistoryLength(self, history_length):
self.history_length = history_length
self.trim(time.time())
def getHistoryLength(self):
return self.history_length
def getActivityAnalysis(self, start=0, end=0, divisions=10):
res = []
log = self.log
now = time.time()
if start == 0:
start = now - self.history_length
if end == 0:
end = now
for n in range(divisions):
res.append({
'start': start + (end - start) * n / divisions,
'end': start + (end - start) * (n + 1) / divisions,
'loads': 0,
'stores': 0,
})
div = res[0]
div_start = div['start']
div_end = div['end']
div_index = 0
total_loads = 0
total_stores = 0
for t, loads, stores in self.log:
if t < start:
# We could use a binary search to find the start.
continue
elif t > end:
# We could use a binary search to find the end also.
break
while t > div_end:
div['loads'] = total_loads
div['stores'] = total_stores
total_loads = 0
total_stores = 0
div_index = div_index + 1
if div_index < divisions:
div = res[div_index]
div_start = div['start']
div_end = div['end']
total_loads = total_loads + loads
total_stores = total_stores + stores
div['stores'] = div['stores'] + total_stores
div['loads'] = div['loads'] + total_loads
return res
......@@ -13,8 +13,8 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.66 2002/04/15 18:42:50 jeremy Exp $"""
__version__='$Revision: 1.66 $'[11:-2]
$Id: Connection.py,v 1.67 2002/06/10 20:20:44 shane Exp $"""
__version__='$Revision: 1.67 $'[11:-2]
from cPickleCache import PickleCache, MUCH_RING_CHECKING
from POSException import ConflictError, ReadConflictError
......@@ -83,6 +83,8 @@ class Connection(ExportImport.ExportImport):
self._invalid=d.has_key
self._committed=[]
self._code_timestamp = global_code_timestamp
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
def _cache_items(self):
# find all items on the lru list
......@@ -383,6 +385,7 @@ class Connection(ExportImport.ExportImport):
dump(state)
p=file(1)
s=dbstore(oid,serial,p,version,transaction)
self._store_count = self._store_count + 1
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
......@@ -486,6 +489,7 @@ class Connection(ExportImport.ExportImport):
try:
p, serial = self._storage.load(oid, self._version)
self._load_count = self._load_count + 1
# XXX this is quite conservative!
# We need, however, to avoid reading data from a transaction
......@@ -692,6 +696,17 @@ class Connection(ExportImport.ExportImport):
def getDebugInfo(self): return self._debug_info
def setDebugInfo(self, *args): self._debug_info=self._debug_info+args
def getTransferCounts(self, clear=0):
"""Returns the number of objects loaded and stored.
Set the clear argument to reset the counters.
"""
res = (self._load_count, self._store_count)
if clear:
self._load_count = 0
self._store_count = 0
return res
######################################################################
# Just plain weird. Don't try this at home kids.
......
......@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.41 2002/04/15 18:42:51 jeremy Exp $"""
__version__='$Revision: 1.41 $'[11:-2]
$Id: DB.py,v 1.42 2002/06/10 20:20:44 shane Exp $"""
__version__='$Revision: 1.42 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
......@@ -33,7 +33,8 @@ class DB(UndoLogCompatible.UndoLogCompatible):
or more connections, which manage object spaces. Most of the actual work
of managing objects is done by the connections.
"""
klass = Connection
klass = Connection # Class to use for connections
_activity_monitor = None
def __init__(self, storage,
pool_size=7,
......@@ -124,6 +125,9 @@ class DB(UndoLogCompatible.UndoLogCompatible):
"""Return a connection to the pool"""
self._a()
try:
am = self._activity_monitor
if am is not None:
am.closedConnection(connection)
version=connection._version
pools,pooll=self._pools
pool, allocated, pool_lock = pools[version]
......@@ -486,6 +490,9 @@ class DB(UndoLogCompatible.UndoLogCompatible):
})
return r
def getActivityMonitor(self):
return self._activity_monitor
def pack(self, t=None, days=0):
if t is None: t=time()
t=t-(days*86400)
......@@ -515,7 +522,10 @@ class DB(UndoLogCompatible.UndoLogCompatible):
def setPoolSize(self, v):
self._pool_size=v
def setActivityMonitor(self, am):
self._activity_monitor = am
def setVersionCacheDeactivateAfter(self, v):
self._version_cache_deactivate_after=v
for ver in self._pools[0].keys():
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the default activity monitor.
See ZODB/ActivityMonitor.py
$Id: testActivityMonitor.py,v 1.2 2002/06/10 20:20:44 shane Exp $
"""
import unittest
import time
from ZODB.ActivityMonitor import ActivityMonitor
class FakeConnection:
loads = 0
stores = 0
def _transferred(self, loads, stores):
self.loads = self.loads + loads
self.stores = self.stores + stores
def getTransferCounts(self, clear=0):
res = self.loads, self.stores
if clear:
self.loads = self.stores = 0
return res
class Tests(unittest.TestCase):
def testAddLogEntries(self):
am = ActivityMonitor(history_length=3600)
self.assertEqual(len(am.log), 0)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
c._transferred(3, 7)
am.closedConnection(c)
self.assertEqual(len(am.log), 2)
def testTrim(self):
am = ActivityMonitor(history_length=0.1)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
time.sleep(0.2)
c._transferred(3, 7)
am.closedConnection(c)
self.assert_(len(am.log) <= 1)
def testSetHistoryLength(self):
am = ActivityMonitor(history_length=3600)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
time.sleep(0.2)
c._transferred(3, 7)
am.closedConnection(c)
self.assertEqual(len(am.log), 2)
am.setHistoryLength(0.1)
self.assertEqual(am.getHistoryLength(), 0.1)
self.assert_(len(am.log) <= 1)
def testActivityAnalysis(self):
am = ActivityMonitor(history_length=3600)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
c._transferred(3, 7)
am.closedConnection(c)
res = am.getActivityAnalysis(start=0, end=0, divisions=10)
lastend = 0
for n in range(9):
div = res[n]
self.assertEqual(div['stores'], 0)
self.assertEqual(div['loads'], 0)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
lastend = div['end']
div = res[9]
self.assertEqual(div['stores'], 9)
self.assertEqual(div['loads'], 4)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
def test_suite():
return unittest.makeSuite(Tests)
if __name__=='__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment